Dalam kluster PolarDB for PostgreSQL (Distributed Edition), performa kueri sangat bergantung pada cara Anda menulis pernyataan SQL. Pernyataan SQL yang ditulis dengan baik dapat memaksimalkan manfaat dari arsitektur terdistribusi dengan menurunkan komputasi ke node data (DNs) untuk eksekusi paralel, sehingga menghindari transfer data yang tidak perlu dan overhead koordinasi. Topik ini menjelaskan beberapa metode praktis untuk mengoptimalkan kueri.
Metode optimasi
Skenario kueri | Metode optimasi | Prinsip inti |
Kueri titik atau pemindaian rentang kecil pada satu tabel | Gunakan kolom distribusi dalam klausa | Pemangkasan shard: Menentukan satu shard atau beberapa shard yang menyimpan data tersebut, sehingga menghindari pemindaian semua shard. |
Penggabungan pada beberapa tabel besar |
| Penurunan komputasi: Menurunkan operasi JOIN ke setiap node data untuk eksekusi lokal, sehingga menghindari overhead jaringan lintas-node. |
Pengelompokan dan agregasi (GROUP BY) | Sertakan kolom distribusi dalam klausa | Penurunan agregasi: Menyelesaikan sebagian besar operasi agregasi secara lokal di node data; node klien hanya melakukan ringkasan akhir. |
Kueri kompleks atau JOIN non-kolokasi | Tulis ulang kueri menggunakan Common Table Expressions (CTEs). | Mengurangi redistribusi data: Gunakan CTE untuk terlebih dahulu menyaring tabel besar dan menghasilkan set hasil kecil, lalu lakukan operasi terdistribusi berikutnya pada set yang lebih kecil ini. Hal ini mengurangi jumlah data yang ditransfer melalui jaringan. |
Gunakan kolom distribusi untuk pemangkasan shard (optimasi kueri satu tabel)
Pemangkasan shard adalah metode optimasi paling dasar dan efisien untuk kueri terdistribusi. Ketika kondisi kueri mencakup pemeriksaan kesetaraan pada kolom distribusi, pengoptimal dapat langsung menentukan lokasi shard unik yang berisi data tersebut, sehingga performanya setara dengan kueri single-node.
Prinsip:
Saat menjalankan kueri, pengoptimal terdistribusi memeriksa klausa WHERE. Jika ditemukan kondisi pada kolom distribusi, seperti t2.a = 1000, pengoptimal menghitung nilai hash dari 1000 untuk menentukan shard fisik tepat yang menyimpan data tersebut. Selanjutnya, pengoptimal memangkas semua shard lain yang tidak relevan dan hanya mengirim tugas kueri ke node data yang berisi shard target. Proses ini disebut pemangkasan shard.
Contoh:
Buat tabel
t2dan gunakan kolomasebagai kolom distribusi.CREATE TABLE t2 (a INT, b INT); SELECT create_distributed_table('t2', 'a');Kueri data di mana
a = 1000. Kueri titik ini langsung diarahkan ke satu shard, yaitut2_102134dalam contoh ini.EXPLAIN (COSTS OFF) SELECT * FROM t2 WHERE t2.a = 1000;Hasil berikut dikembalikan:
QUERY PLAN ---------------------------------------------------------- Custom Scan (PolarCluster Adaptive) Task Count: 1 -- Jumlah tugas adalah 1, yang berarti kueri diarahkan ke satu shard. Tasks Shown: All -> Task Node: host=10.188.92.147 port=3003 dbname=testdb -> Seq Scan on t2_102134 t2 Filter: (a = 1000) (7 rows)
Turunkan operasi JOIN menggunakan grup kolokasi dan tabel replikasi (optimasi penggabungan multi-tabel)
Dalam lingkungan terdistribusi, menghindari JOIN lintas-node adalah kunci untuk meningkatkan performa. Anda dapat menurunkan komputasi JOIN dengan dua cara berikut:
Gunakan grup kolokasi: Jika dua atau lebih tabel besar digabungkan berdasarkan kolom distribusi yang sama, Anda dapat menempatkannya dalam grup kolokasi yang sama. Hal ini memastikan bahwa data terkait berada di node yang sama, sehingga memungkinkan JOIN lokal.
Gunakan tabel replikasi: Jika sebuah tabel yang terlibat dalam JOIN memenuhi kondisi berikut, Anda dapat membuatnya sebagai tabel replikasi. Tabel replikasi memiliki replika di setiap node data, yang juga memungkinkan JOIN lokal.
Tidak cocok menggunakan kunci gabungan sebagai kolom distribusi. Misalnya, kolom lain mungkin telah dipilih sebagai kolom distribusi.
Tabel tersebut jarang dimodifikasi dan perlu digabungkan dengan beberapa tabel lain.
Prinsip:
Ketika pengoptimal menemukan bahwa dua tabel dalam JOIN berada dalam kolokasi atau salah satunya adalah tabel replikasi, dan kondisi JOIN berada pada kolom distribusi, pengoptimal menentukan bahwa baris data terkait semuanya berada di node data yang sama. Oleh karena itu, pengoptimal menurunkan seluruh operasi JOIN sebagai tugas ke setiap node data. Setiap node menyelesaikan operasi JOIN secara lokal dan paralel. Akhirnya, node klien (CN) hanya perlu mengumpulkan dan merangkum hasil dari setiap DN. Proses ini menghindari transfer sejumlah besar data mentah antar node untuk operasi JOIN, sehingga menghasilkan performa kueri yang tinggi.
Contoh:
Gunakan grup kolokasi
Buat tabel
colocation_t1dancolocation_t2. Gunakan kolomasebagai kolom distribusi untuk keduanya, dan pastikan keduanya berada dalam kolokasi.CREATE TABLE colocation_t1 (a INT, b INT); SELECT create_distributed_table('colocation_t1', 'a'); CREATE TABLE colocation_t2 (a INT, b INT); SELECT create_distributed_table('colocation_t2', 'a');Jalankan kueri JOIN. Rencana eksekusi berikut menunjukkan bahwa operasi JOIN diturunkan ke DN untuk dieksekusi.
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN colocation_t2 ON colocation_t1.a = colocation_t2.a;Hasil berikut dikembalikan:
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 -- Tugas didistribusikan ke semua shard. Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join -- Operasi JOIN dieksekusi di dalam node data (DN). Merge Cond: (colocation_t1.a = colocation_t2.a) -> Sort Sort Key: colocation_t1.a -> Seq Scan on colocation_t1_102137 colocation_t1 -> Sort Sort Key: colocation_t2.a -> Seq Scan on colocation_t2_102141 colocation_t2 (13 rows)Subkueri: Penggunaan grup kolokasi juga efektif untuk subkueri. Jika keluaran subkueri digabungkan dengan tabel terdistribusi, dan kunci gabungan juga merupakan kolom distribusi tabel dalam subkueri tersebut, subkueri tersebut juga diturunkan.
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN (SELECT a FROM colocation_t2) sub ON colocation_t1.a = sub.a;Hasil berikut dikembalikan:
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join Merge Cond: (colocation_t1.a = colocation_t2.a) -> Sort Sort Key: colocation_t1.a -> Seq Scan on colocation_t1_102137 colocation_t1 -> Sort Sort Key: colocation_t2.a -> Seq Scan on colocation_t2_102141 colocation_t2 (13 rows)
Gunakan tabel replikasi
Buat tabel
colocation_t3dancolocation_t4. Gunakan kolomasebagai kolom distribusi untuk tabelcolocation_t3, dan buat tabelcolocation_t4sebagai tabel replikasi.CREATE TABLE colocation_t3 (a INT, b INT); SELECT create_distributed_table('colocation_t3', 'a'); CREATE TABLE colocation_t4 (a INT, b INT); SELECT create_reference_table('colocation_t4');Jalankan kueri JOIN. Rencana eksekusi berikut menunjukkan bahwa operasi JOIN diturunkan ke DN untuk dieksekusi.
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t3 JOIN colocation_t4 ON colocation_t3.a = colocation_t4.b;Hasil berikut dikembalikan:
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join Merge Cond: (colocation_t3.a = colocation_t4.b) -> Sort Sort Key: colocation_t3.a -> Seq Scan on colocation_t3_102145 colocation_t3 -> Sort Sort Key: colocation_t4.b -> Seq Scan on colocation_t4_102149 colocation_t4 (13 rows)
Turunkan operasi agregasi dan pengurutan
Untuk operasi agregasi dan pengurutan, menurunkan komputasi ke node data dapat secara signifikan mengurangi beban pada CN dan mengurangi lalu lintas jaringan.
Penurunan agregasi
Ketika
KEYdalam klausaGROUP BYmencakup kolom distribusi, fungsi agregat sepertiSUMdanCOUNTsepenuhnya diturunkan ke node data.Ketika
KEYdalam klausaGROUP BYtidak mencakup kolom distribusi, pengoptimal menggunakan strategi dua tahap. Pertama, melakukan pra-agregasi di node data, lalu melakukan agregasi akhir di node klien.Tahap 1 (Agregasi lokal di DN): Setiap node data pertama-tama melakukan operasi agregasi pada shard datanya sendiri untuk menghasilkan hasil antara lokal.
Tahap 2 (Agregasi akhir di CN): Node klien mengumpulkan hasil lokal dari semua DN. Set hasil ini biasanya kecil. CN kemudian melakukan agregasi kedua untuk menghitung hasil global akhir. Metode ini mengurangi jumlah data yang harus ditransfer ke node klien melalui jaringan.
Contoh:
Buat tabel
group_tdan gunakan kolomasebagai kolom distribusi.CREATE TABLE group_t (a INT, b INT); SELECT create_distributed_table('group_t', 'a');Ketika kolom distribusi disertakan, agregasi sepenuhnya diturunkan. Fungsi agregat dieksekusi sepenuhnya di DN. Rencana eksekusi adalah sebagai berikut:
EXPLAIN (COSTS OFF) SELECT a, SUM(b) FROM group_t GROUP BY a;Hasil berikut dikembalikan:
QUERY PLAN --------------------------------------------------------- Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> HashAggregate Group Key: a -> Seq Scan on group_t_102150 group_t (8 rows)Ketika kolom distribusi tidak disertakan, strategi dua tahap digunakan. Fungsi agregat dieksekusi dua kali: sekali di DN dan sekali di CN.
EXPLAIN (COSTS OFF) SELECT SUM(a) FROM group_t;Hasil berikut dikembalikan:
QUERY PLAN --------------------------------------------------------------- Aggregate -> Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Aggregate -> Seq Scan on group_t_102150 group_t (8 rows)
Penurunan pengurutan
Klausa ORDER BY ... LIMIT N juga dapat diturunkan ke node data dalam banyak skenario. Setiap node pertama-tama mengembalikan hasil top-N-nya sendiri. Node klien kemudian melakukan pengurutan akhir pada hasil antara ini. Metode ini sangat mengurangi jumlah data yang perlu diurutkan.
Sebagai contoh, jika tidak ada klausa GROUP BY, atau jika KEY dari klausa GROUP BY mencakup kolom distribusi, operasi ORDER BY ... LIMIT N diturunkan ke shard. CN kemudian menjalankan operasi ORDER BY ... LIMIT N lagi pada hasil yang dikembalikan dari setiap shard.
Contoh:
Buat tabel
order_tdan gunakan kolomasebagai kolom distribusi.CREATE TABLE order_t (a INT, b INT); SELECT create_distributed_table('order_t', 'a');Operasi
ORDER BY ... LIMIT Nditurunkan. Rencana eksekusi adalah sebagai berikut:EXPLAIN (COSTS OFF) SELECT * FROM order_t ORDER BY a LIMIT 1;Hasil berikut dikembalikan:
QUERY PLAN ------------------------------------------------------------------------ Limit -> Sort Sort Key: remote_scan.a -> Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Limit -> Sort Sort Key: a -> Seq Scan on order_t_102154 order_t (12 rows)
Gunakan CTE untuk menulis ulang kueri kompleks
Untuk JOIN kompleks yang tidak dapat dioptimalkan dengan grup kolokasi—misalnya ketika kondisi JOIN tidak berada pada kolom distribusi—pengoptimal mungkin memilih rencana eksekusi yang tidak efisien secara default. Sebagai contoh, pengoptimal mungkin menarik semua data dari satu tabel ke node klien lalu mengirimkannya kembali ke node data. Dalam kasus ini, Anda dapat mengoptimalkan kueri secara manual menggunakan CTE (WITH clause).
Prinsip:
Pertama, Anda dapat menggunakan CTE untuk menerapkan filter selektif tinggi pada tabel besar. Langkah ini menghasilkan set hasil antara yang kecil. Kemudian, Anda dapat menggabungkan set hasil kecil ini dengan tabel lain. Metode ini dapat secara signifikan mengurangi jumlah data yang perlu ditransfer antar node.
Contoh:
Persiapkan data: Buat tabel
cte_t1dancte_t2, dan gunakan kolomasebagai kolom distribusi untuk keduanya. Buat indeks lokal pada kolombdari tabelcte_t1.-- Izinkan pengoptimal kueri menghasilkan rencana eksekusi redistribusi data untuk JOIN antara tabel atau kolom non-kolokasi. SET polar_cluster.enable_recursively_plan_non_colocated_relations TO ON; -- Buat tabel standar CREATE TABLE cte_t1(a INT, b INT); CREATE TABLE cte_t2(a INT, b INT); -- Ubah menjadi tabel terdistribusi SELECT create_distributed_table('cte_t1', 'a'); SELECT create_distributed_table('cte_t2', 'a'); -- Buat indeks lokal CREATE INDEX local_idx_t1_b ON cte_t1(b); -- Masukkan data uji INSERT INTO cte_t1(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i; INSERT INTO cte_t2(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i; -- Kumpulkan dan perbarui statistik untuk kedua tabel ini. ANALYZE cte_t1, cte_t2;Sebelum optimasi: Jika Anda menjalankan kueri JOIN non-kolokasi secara langsung, kueri menghasilkan rencana eksekusi berikut. Kueri pertama-tama melakukan pemindaian tabel penuh pada
cte_t2, mengembalikan hasil ke CN, lalu mengirimkannya kembali ke DN untuk operasi JOIN.EXPLAIN ANALYZE SELECT * FROM cte_t1, cte_t2 WHERE cte_t1.a = cte_t2.b AND cte_t1.b = 1;Hasil berikut dikembalikan:
QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------- Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=16) (actual time=334.511..334.519 rows=100 loops=1) -> Distributed Subplan 46_1 Subplan Duration: 1488.57 ms Intermediate Data Size: 17 MB Result destination: Send to 2 nodes -> Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=8) (actual time=419.049..505.639 rows=1000000 loops=1) Task Count: 4 Tuple data received from nodes: 7813 kB Tasks Shown: One of 4 -> Task Tuple data received from node: 1950 kB Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Seq Scan on cte_t2_102165 cte_t2 (cost=0.00..4032.51 rows=249651 width=8) (actual time=0.004..12.029 rows=249651 l oops=1) Planning Time: 0.013 ms Execution Time: 48.372 ms Planning Time: 0.000 ms Execution Time: 531.747 ms Task Count: 4 Tuple data received from nodes: 1600 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 480 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Hash Join (cost=2.71..10950.10 rows=1 width=16) (actual time=179.605..329.055 rows=30 loops=1) Hash Cond: (intermediate_result.b = cte_t1.a) -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..7197.27 rows=1000000 width=8) (actual time=179 .565..278.108 rows=1000000 loops=1) -> Hash (cost=2.68..2.68 rows=3 width=8) (actual time=0.024..0.025 rows=3 loops=1) Buckets: 1024 Batches: 1 Memory Usage: 9kB -> Index Scan using local_idx_t1_b_102161 on cte_t1_102161 cte_t1 (cost=0.42..2.68 rows=3 width=8) (actual time=0.021 ..0.021 rows=3 loops=1) Index Cond: (b = 1) Planning Time: 0.365 ms Execution Time: 332.955 ms Planning Time: 0.625 ms Execution Time: 1823.139 ms (34 rows)Setelah optimasi: Anda dapat menggunakan CTE untuk terlebih dahulu menyaring tabel
cte_t1. Hal ini memaksa pemindaian indeks padacte_t1dan mengembalikan set hasil kecil ke CN. Metode ini mengurangi waktu pemindaian dan jumlah data yang ditransfer melalui jaringan.EXPLAIN ANALYZE WITH s AS (SELECT * FROM cte_t1 WHERE cte_t1.b = 1) SELECT * FROM s, cte_t2 WHERE s.a = cte_t2.b;Kueri yang dioptimalkan puluhan kali lebih cepat karena menghindari redistribusi data skala besar.
QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------- Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=16) (actual time=38.689..38.693 rows=100 loops=1) -> Distributed Subplan 49_1 Subplan Duration: 0.94 ms Intermediate Data Size: 180 bytes Result destination: Send to 2 nodes -> Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=8) (actual time=0.760..0.761 rows=10 loops=1) Task Count: 4 Tuple data received from nodes: 80 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 32 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Index Scan using local_idx_t1_b_102159 on cte_t1_102159 cte_t1 (cost=0.42..2.68 rows=3 width=8) (actual time=0.005 ..0.005 rows=4 loops=1) Index Cond: (b = 1) Planning Time: 0.020 ms Execution Time: 0.014 ms Planning Time: 0.000 ms Execution Time: 0.779 ms Task Count: 4 Tuple data received from nodes: 1600 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 496 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Gather (cost=1000.20..4872.21 rows=3148 width=16) (actual time=0.450..19.685 rows=31 loops=1) Workers Planned: 1 Workers Launched: 1 -> Hash Join (cost=0.20..3557.41 rows=1852 width=16) (actual time=5.368..13.807 rows=16 loops=2) Hash Cond: (cte_t2.b = intermediate_result.a) -> Parallel Seq Scan on cte_t2_102163 cte_t2 (cost=0.00..3005.84 rows=146984 width=8) (actual time=0.005..6.544 rows= 124936 loops=2) -> Hash (cost=0.07..0.07 rows=10 width=8) (actual time=1.451..1.452 rows=10 loops=2) Buckets: 1024 Batches: 1 Memory Usage: 9kB -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..0.07 rows=10 width=8) (actual time =1.444..1.448 rows=10 loops=2) Planning Time: 0.068 ms Execution Time: 19.729 ms Planning Time: 0.514 ms Execution Time: 39.672 ms (37 rows)
SELECT ... FOR UPDATE batasan
Karena kompleksitas koordinasi kunci global, pernyataan SELECT ... FOR UPDATE tidak mendukung eksekusi lintas-shard. Jika kondisi kueri tidak dapat membatasi cakupan ke satu shard saja, pernyataan tersebut akan mengembalikan kesalahan.
SELECT * FROM t1 WHERE a > 1 FOR UPDATE;
ERROR: FOR UPDATE is not supported for query of more than one shard