All Products
Search
Document Center

PolarDB:Mengoptimalkan kueri pada tabel terdistribusi

Last Updated:Nov 11, 2025

Dalam kluster PolarDB for PostgreSQL (Distributed Edition), performa kueri sangat bergantung pada cara Anda menulis pernyataan SQL. Pernyataan SQL yang ditulis dengan baik dapat memaksimalkan manfaat dari arsitektur terdistribusi dengan menurunkan komputasi ke node data (DNs) untuk eksekusi paralel, sehingga menghindari transfer data yang tidak perlu dan overhead koordinasi. Topik ini menjelaskan beberapa metode praktis untuk mengoptimalkan kueri.

Metode optimasi

Skenario kueri

Metode optimasi

Prinsip inti

Kueri titik atau pemindaian rentang kecil pada satu tabel

Gunakan kolom distribusi dalam klausa WHERE untuk penyaringan kesetaraan atau rentang.

Pemangkasan shard: Menentukan satu shard atau beberapa shard yang menyimpan data tersebut, sehingga menghindari pemindaian semua shard.

Penggabungan pada beberapa tabel besar

  • Gunakan kolom distribusi yang sama dan tempatkan tabel-tabel tersebut dalam grup kolokasi.

  • Ubah tabel kecil menjadi tabel replikasi.

Penurunan komputasi: Menurunkan operasi JOIN ke setiap node data untuk eksekusi lokal, sehingga menghindari overhead jaringan lintas-node.

Pengelompokan dan agregasi (GROUP BY)

Sertakan kolom distribusi dalam klausa GROUP BY.

Penurunan agregasi: Menyelesaikan sebagian besar operasi agregasi secara lokal di node data; node klien hanya melakukan ringkasan akhir.

Kueri kompleks atau JOIN non-kolokasi

Tulis ulang kueri menggunakan Common Table Expressions (CTEs).

Mengurangi redistribusi data: Gunakan CTE untuk terlebih dahulu menyaring tabel besar dan menghasilkan set hasil kecil, lalu lakukan operasi terdistribusi berikutnya pada set yang lebih kecil ini. Hal ini mengurangi jumlah data yang ditransfer melalui jaringan.

Gunakan kolom distribusi untuk pemangkasan shard (optimasi kueri satu tabel)

Pemangkasan shard adalah metode optimasi paling dasar dan efisien untuk kueri terdistribusi. Ketika kondisi kueri mencakup pemeriksaan kesetaraan pada kolom distribusi, pengoptimal dapat langsung menentukan lokasi shard unik yang berisi data tersebut, sehingga performanya setara dengan kueri single-node.

Prinsip:

Saat menjalankan kueri, pengoptimal terdistribusi memeriksa klausa WHERE. Jika ditemukan kondisi pada kolom distribusi, seperti t2.a = 1000, pengoptimal menghitung nilai hash dari 1000 untuk menentukan shard fisik tepat yang menyimpan data tersebut. Selanjutnya, pengoptimal memangkas semua shard lain yang tidak relevan dan hanya mengirim tugas kueri ke node data yang berisi shard target. Proses ini disebut pemangkasan shard.

Contoh:

  1. Buat tabel t2 dan gunakan kolom a sebagai kolom distribusi.

    CREATE TABLE t2 (a INT, b INT);
    SELECT create_distributed_table('t2', 'a');
  2. Kueri data di mana a = 1000. Kueri titik ini langsung diarahkan ke satu shard, yaitu t2_102134 dalam contoh ini.

    EXPLAIN (COSTS OFF) SELECT * FROM t2 WHERE t2.a = 1000;

    Hasil berikut dikembalikan:

                            QUERY PLAN                        
    ----------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 1 -- Jumlah tugas adalah 1, yang berarti kueri diarahkan ke satu shard.
       Tasks Shown: All
       ->  Task
             Node: host=10.188.92.147 port=3003 dbname=testdb
             ->  Seq Scan on t2_102134 t2
                   Filter: (a = 1000)
    (7 rows)

Turunkan operasi JOIN menggunakan grup kolokasi dan tabel replikasi (optimasi penggabungan multi-tabel)

Dalam lingkungan terdistribusi, menghindari JOIN lintas-node adalah kunci untuk meningkatkan performa. Anda dapat menurunkan komputasi JOIN dengan dua cara berikut:

  • Gunakan grup kolokasi: Jika dua atau lebih tabel besar digabungkan berdasarkan kolom distribusi yang sama, Anda dapat menempatkannya dalam grup kolokasi yang sama. Hal ini memastikan bahwa data terkait berada di node yang sama, sehingga memungkinkan JOIN lokal.

  • Gunakan tabel replikasi: Jika sebuah tabel yang terlibat dalam JOIN memenuhi kondisi berikut, Anda dapat membuatnya sebagai tabel replikasi. Tabel replikasi memiliki replika di setiap node data, yang juga memungkinkan JOIN lokal.

    1. Tidak cocok menggunakan kunci gabungan sebagai kolom distribusi. Misalnya, kolom lain mungkin telah dipilih sebagai kolom distribusi.

    2. Tabel tersebut jarang dimodifikasi dan perlu digabungkan dengan beberapa tabel lain.

Prinsip:

Ketika pengoptimal menemukan bahwa dua tabel dalam JOIN berada dalam kolokasi atau salah satunya adalah tabel replikasi, dan kondisi JOIN berada pada kolom distribusi, pengoptimal menentukan bahwa baris data terkait semuanya berada di node data yang sama. Oleh karena itu, pengoptimal menurunkan seluruh operasi JOIN sebagai tugas ke setiap node data. Setiap node menyelesaikan operasi JOIN secara lokal dan paralel. Akhirnya, node klien (CN) hanya perlu mengumpulkan dan merangkum hasil dari setiap DN. Proses ini menghindari transfer sejumlah besar data mentah antar node untuk operasi JOIN, sehingga menghasilkan performa kueri yang tinggi.

Contoh:

Gunakan grup kolokasi

  1. Buat tabel colocation_t1 dan colocation_t2. Gunakan kolom a sebagai kolom distribusi untuk keduanya, dan pastikan keduanya berada dalam kolokasi.

    CREATE TABLE colocation_t1 (a INT, b INT);
    SELECT create_distributed_table('colocation_t1', 'a');
    
    CREATE TABLE colocation_t2 (a INT, b INT);
    SELECT create_distributed_table('colocation_t2', 'a');
  2. Jalankan kueri JOIN. Rencana eksekusi berikut menunjukkan bahwa operasi JOIN diturunkan ke DN untuk dieksekusi.

     EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN colocation_t2 ON colocation_t1.a = colocation_t2.a;

    Hasil berikut dikembalikan:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4 -- Tugas didistribusikan ke semua shard.
       Tasks Shown: One of 4 
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join -- Operasi JOIN dieksekusi di dalam node data (DN).
                   Merge Cond: (colocation_t1.a = colocation_t2.a)
                   ->  Sort
                         Sort Key: colocation_t1.a
                         ->  Seq Scan on colocation_t1_102137 colocation_t1
                   ->  Sort
                         Sort Key: colocation_t2.a
                         ->  Seq Scan on colocation_t2_102141 colocation_t2
    (13 rows)
  3. Subkueri: Penggunaan grup kolokasi juga efektif untuk subkueri. Jika keluaran subkueri digabungkan dengan tabel terdistribusi, dan kunci gabungan juga merupakan kolom distribusi tabel dalam subkueri tersebut, subkueri tersebut juga diturunkan.

    EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN (SELECT a FROM colocation_t2) sub ON colocation_t1.a = sub.a;

    Hasil berikut dikembalikan:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join
                   Merge Cond: (colocation_t1.a = colocation_t2.a)
                   ->  Sort
                         Sort Key: colocation_t1.a
                         ->  Seq Scan on colocation_t1_102137 colocation_t1
                   ->  Sort
                         Sort Key: colocation_t2.a
                         ->  Seq Scan on colocation_t2_102141 colocation_t2
    (13 rows)

Gunakan tabel replikasi

  1. Buat tabel colocation_t3 dan colocation_t4. Gunakan kolom a sebagai kolom distribusi untuk tabel colocation_t3, dan buat tabel colocation_t4 sebagai tabel replikasi.

    CREATE TABLE colocation_t3 (a INT, b INT);
    SELECT create_distributed_table('colocation_t3', 'a');
    
    CREATE TABLE colocation_t4 (a INT, b INT);
    SELECT create_reference_table('colocation_t4');
  2. Jalankan kueri JOIN. Rencana eksekusi berikut menunjukkan bahwa operasi JOIN diturunkan ke DN untuk dieksekusi.

    EXPLAIN (COSTS OFF) SELECT * FROM colocation_t3 JOIN colocation_t4 ON colocation_t3.a = colocation_t4.b;

    Hasil berikut dikembalikan:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join
                   Merge Cond: (colocation_t3.a = colocation_t4.b)
                   ->  Sort
                         Sort Key: colocation_t3.a
                         ->  Seq Scan on colocation_t3_102145 colocation_t3
                   ->  Sort
                         Sort Key: colocation_t4.b
                         ->  Seq Scan on colocation_t4_102149 colocation_t4
    (13 rows)

Turunkan operasi agregasi dan pengurutan

Untuk operasi agregasi dan pengurutan, menurunkan komputasi ke node data dapat secara signifikan mengurangi beban pada CN dan mengurangi lalu lintas jaringan.

Penurunan agregasi

  • Ketika KEY dalam klausa GROUP BY mencakup kolom distribusi, fungsi agregat seperti SUM dan COUNT sepenuhnya diturunkan ke node data.

  • Ketika KEY dalam klausa GROUP BY tidak mencakup kolom distribusi, pengoptimal menggunakan strategi dua tahap. Pertama, melakukan pra-agregasi di node data, lalu melakukan agregasi akhir di node klien.

    1. Tahap 1 (Agregasi lokal di DN): Setiap node data pertama-tama melakukan operasi agregasi pada shard datanya sendiri untuk menghasilkan hasil antara lokal.

    2. Tahap 2 (Agregasi akhir di CN): Node klien mengumpulkan hasil lokal dari semua DN. Set hasil ini biasanya kecil. CN kemudian melakukan agregasi kedua untuk menghitung hasil global akhir. Metode ini mengurangi jumlah data yang harus ditransfer ke node klien melalui jaringan.

Contoh:

  1. Buat tabel group_t dan gunakan kolom a sebagai kolom distribusi.

    CREATE TABLE group_t (a INT, b INT);
    SELECT create_distributed_table('group_t', 'a');
  2. Ketika kolom distribusi disertakan, agregasi sepenuhnya diturunkan. Fungsi agregat dieksekusi sepenuhnya di DN. Rencana eksekusi adalah sebagai berikut:

    EXPLAIN (COSTS OFF) SELECT a, SUM(b) FROM group_t GROUP BY a;

    Hasil berikut dikembalikan:

                           QUERY PLAN                        
    ---------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  HashAggregate
                   Group Key: a
                   ->  Seq Scan on group_t_102150 group_t
    (8 rows)
  3. Ketika kolom distribusi tidak disertakan, strategi dua tahap digunakan. Fungsi agregat dieksekusi dua kali: sekali di DN dan sekali di CN.

    EXPLAIN (COSTS OFF) SELECT SUM(a) FROM group_t;

    Hasil berikut dikembalikan:

                              QUERY PLAN                           
    ---------------------------------------------------------------
     Aggregate
       ->  Custom Scan (PolarCluster Adaptive)
             Task Count: 4
             Tasks Shown: One of 4
             ->  Task
                   Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
                   ->  Aggregate
                         ->  Seq Scan on group_t_102150 group_t
    (8 rows)

Penurunan pengurutan

Klausa ORDER BY ... LIMIT N juga dapat diturunkan ke node data dalam banyak skenario. Setiap node pertama-tama mengembalikan hasil top-N-nya sendiri. Node klien kemudian melakukan pengurutan akhir pada hasil antara ini. Metode ini sangat mengurangi jumlah data yang perlu diurutkan.

Sebagai contoh, jika tidak ada klausa GROUP BY, atau jika KEY dari klausa GROUP BY mencakup kolom distribusi, operasi ORDER BY ... LIMIT N diturunkan ke shard. CN kemudian menjalankan operasi ORDER BY ... LIMIT N lagi pada hasil yang dikembalikan dari setiap shard.

Contoh:

  1. Buat tabel order_t dan gunakan kolom a sebagai kolom distribusi.

    CREATE TABLE order_t (a INT, b INT);
    SELECT create_distributed_table('order_t', 'a');
  2. Operasi ORDER BY ... LIMIT N diturunkan. Rencana eksekusi adalah sebagai berikut:

    EXPLAIN (COSTS OFF) SELECT * FROM order_t ORDER BY a LIMIT 1;

    Hasil berikut dikembalikan:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Limit
       ->  Sort
             Sort Key: remote_scan.a
             ->  Custom Scan (PolarCluster Adaptive)
                   Task Count: 4
                   Tasks Shown: One of 4
                   ->  Task
                         Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
                         ->  Limit
                               ->  Sort
                                     Sort Key: a
                                     ->  Seq Scan on order_t_102154 order_t
    (12 rows)

Gunakan CTE untuk menulis ulang kueri kompleks

Untuk JOIN kompleks yang tidak dapat dioptimalkan dengan grup kolokasi—misalnya ketika kondisi JOIN tidak berada pada kolom distribusi—pengoptimal mungkin memilih rencana eksekusi yang tidak efisien secara default. Sebagai contoh, pengoptimal mungkin menarik semua data dari satu tabel ke node klien lalu mengirimkannya kembali ke node data. Dalam kasus ini, Anda dapat mengoptimalkan kueri secara manual menggunakan CTE (WITH clause).

Prinsip:

Pertama, Anda dapat menggunakan CTE untuk menerapkan filter selektif tinggi pada tabel besar. Langkah ini menghasilkan set hasil antara yang kecil. Kemudian, Anda dapat menggabungkan set hasil kecil ini dengan tabel lain. Metode ini dapat secara signifikan mengurangi jumlah data yang perlu ditransfer antar node.

Contoh:

  1. Persiapkan data: Buat tabel cte_t1 dan cte_t2, dan gunakan kolom a sebagai kolom distribusi untuk keduanya. Buat indeks lokal pada kolom b dari tabel cte_t1.

    -- Izinkan pengoptimal kueri menghasilkan rencana eksekusi redistribusi data untuk JOIN antara tabel atau kolom non-kolokasi.
    SET polar_cluster.enable_recursively_plan_non_colocated_relations TO ON;
    -- Buat tabel standar
    CREATE TABLE cte_t1(a INT, b INT);
    CREATE TABLE cte_t2(a INT, b INT);
    -- Ubah menjadi tabel terdistribusi
    SELECT create_distributed_table('cte_t1', 'a');
    SELECT create_distributed_table('cte_t2', 'a');
    -- Buat indeks lokal
    CREATE INDEX local_idx_t1_b ON cte_t1(b);
    -- Masukkan data uji
    INSERT INTO cte_t1(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i;
    INSERT INTO cte_t2(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i;
    -- Kumpulkan dan perbarui statistik untuk kedua tabel ini.
    ANALYZE cte_t1, cte_t2;
  2. Sebelum optimasi: Jika Anda menjalankan kueri JOIN non-kolokasi secara langsung, kueri menghasilkan rencana eksekusi berikut. Kueri pertama-tama melakukan pemindaian tabel penuh pada cte_t2, mengembalikan hasil ke CN, lalu mengirimkannya kembali ke DN untuk operasi JOIN.

    EXPLAIN ANALYZE SELECT * FROM cte_t1, cte_t2 WHERE cte_t1.a = cte_t2.b AND cte_t1.b = 1;

    Hasil berikut dikembalikan:

                                                                                       QUERY PLAN                                               
                                        
    --------------------------------------------------------------------------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=16) (actual time=334.511..334.519 rows=100 loops=1)
       ->  Distributed Subplan 46_1
             Subplan Duration: 1488.57 ms
             Intermediate Data Size: 17 MB
             Result destination: Send to 2 nodes
             ->  Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=419.049..505.639 rows=1000000 loops=1)
                   Task Count: 4
                   Tuple data received from nodes: 7813 kB
                   Tasks Shown: One of 4
                   ->  Task
                         Tuple data received from node: 1950 kB
                         Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
                         ->  Seq Scan on cte_t2_102165 cte_t2  (cost=0.00..4032.51 rows=249651 width=8) (actual time=0.004..12.029 rows=249651 l
    oops=1)
                             Planning Time: 0.013 ms
                             Execution Time: 48.372 ms
             Planning Time: 0.000 ms
             Execution Time: 531.747 ms
       Task Count: 4
       Tuple data received from nodes: 1600 bytes
       Tasks Shown: One of 4
       ->  Task
             Tuple data received from node: 480 bytes
             Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
             ->  Hash Join  (cost=2.71..10950.10 rows=1 width=16) (actual time=179.605..329.055 rows=30 loops=1)
                   Hash Cond: (intermediate_result.b = cte_t1.a)
                   ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..7197.27 rows=1000000 width=8) (actual time=179
    .565..278.108 rows=1000000 loops=1)
                   ->  Hash  (cost=2.68..2.68 rows=3 width=8) (actual time=0.024..0.025 rows=3 loops=1)
                         Buckets: 1024  Batches: 1  Memory Usage: 9kB
                         ->  Index Scan using local_idx_t1_b_102161 on cte_t1_102161 cte_t1  (cost=0.42..2.68 rows=3 width=8) (actual time=0.021
    ..0.021 rows=3 loops=1)
                               Index Cond: (b = 1)
                 Planning Time: 0.365 ms
                 Execution Time: 332.955 ms
     Planning Time: 0.625 ms
     Execution Time: 1823.139 ms
    (34 rows)
  3. Setelah optimasi: Anda dapat menggunakan CTE untuk terlebih dahulu menyaring tabel cte_t1. Hal ini memaksa pemindaian indeks pada cte_t1 dan mengembalikan set hasil kecil ke CN. Metode ini mengurangi waktu pemindaian dan jumlah data yang ditransfer melalui jaringan.

    EXPLAIN ANALYZE WITH s AS (SELECT * FROM cte_t1 WHERE cte_t1.b = 1) SELECT * FROM s, cte_t2 WHERE s.a = cte_t2.b;

    Kueri yang dioptimalkan puluhan kali lebih cepat karena menghindari redistribusi data skala besar.

                                                                                    QUERY PLAN                                                  
                                   
    --------------------------------------------------------------------------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=16) (actual time=38.689..38.693 rows=100 loops=1)
       ->  Distributed Subplan 49_1
             Subplan Duration: 0.94 ms
             Intermediate Data Size: 180 bytes
             Result destination: Send to 2 nodes
             ->  Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=0.760..0.761 rows=10 loops=1)
                   Task Count: 4
                   Tuple data received from nodes: 80 bytes
                   Tasks Shown: One of 4
                   ->  Task
                         Tuple data received from node: 32 bytes
                         Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
                         ->  Index Scan using local_idx_t1_b_102159 on cte_t1_102159 cte_t1  (cost=0.42..2.68 rows=3 width=8) (actual time=0.005
    ..0.005 rows=4 loops=1)
                               Index Cond: (b = 1)
                             Planning Time: 0.020 ms
                             Execution Time: 0.014 ms
             Planning Time: 0.000 ms
             Execution Time: 0.779 ms
       Task Count: 4
       Tuple data received from nodes: 1600 bytes
       Tasks Shown: One of 4
       ->  Task
             Tuple data received from node: 496 bytes
             Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
             ->  Gather  (cost=1000.20..4872.21 rows=3148 width=16) (actual time=0.450..19.685 rows=31 loops=1)
                   Workers Planned: 1
                   Workers Launched: 1
                   ->  Hash Join  (cost=0.20..3557.41 rows=1852 width=16) (actual time=5.368..13.807 rows=16 loops=2)
                         Hash Cond: (cte_t2.b = intermediate_result.a)
                         ->  Parallel Seq Scan on cte_t2_102163 cte_t2  (cost=0.00..3005.84 rows=146984 width=8) (actual time=0.005..6.544 rows=
    124936 loops=2)
                         ->  Hash  (cost=0.07..0.07 rows=10 width=8) (actual time=1.451..1.452 rows=10 loops=2)
                               Buckets: 1024  Batches: 1  Memory Usage: 9kB
                               ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..0.07 rows=10 width=8) (actual time
    =1.444..1.448 rows=10 loops=2)
                 Planning Time: 0.068 ms
                 Execution Time: 19.729 ms
     Planning Time: 0.514 ms
     Execution Time: 39.672 ms
    (37 rows)

SELECT ... FOR UPDATE batasan

Karena kompleksitas koordinasi kunci global, pernyataan SELECT ... FOR UPDATE tidak mendukung eksekusi lintas-shard. Jika kondisi kueri tidak dapat membatasi cakupan ke satu shard saja, pernyataan tersebut akan mengembalikan kesalahan.

SELECT * FROM t1 WHERE a > 1 FOR UPDATE;
ERROR:  FOR UPDATE is not supported for query of more than one shard