Topik ini menjelaskan skenario umum kesenjangan data di MaxCompute dan menyediakan solusi yang sesuai.
MapReduce
Untuk memahami kesenjangan data, Anda perlu terlebih dahulu memahami MapReduce. MapReduce adalah framework komputasi terdistribusi yang menerapkan metode divide-and-conquer dengan membagi masalah besar atau kompleks menjadi submasalah yang lebih kecil dan mudah dikelola, menyelesaikannya secara paralel, lalu menggabungkan hasilnya menjadi solusi akhir. Dibandingkan dengan framework pemrograman paralel tradisional, MapReduce menawarkan toleransi kesalahan yang lebih tinggi, kemudahan penggunaan, serta ekstensibilitas yang lebih baik. Saat menggunakan MapReduce untuk pemrograman paralel, Anda tidak perlu mengelola isu non-pemrograman pada kluster terdistribusi, seperti penyimpanan data, komunikasi antar-node, dan mekanisme transmisi, sehingga sangat menyederhanakan pemrograman terdistribusi.
Gambar berikut menunjukkan alur kerja MapReduce.
Ketidakseimbangan data
Kesenjangan data sering terjadi selama tahap reduce. Mapper biasanya membagi data secara merata berdasarkan file input. Kesenjangan data muncul ketika data dalam suatu tabel didistribusikan secara tidak merata di antara worker yang berbeda. Distribusi yang tidak merata ini menyebabkan beberapa worker menyelesaikan komputasinya dengan cepat, sementara yang lain membutuhkan waktu jauh lebih lama. Di lingkungan produksi, sebagian besar data bersifat miring dan sering kali mengikuti aturan 80/20—misalnya, 20% Pengguna aktif di forum mungkin menghasilkan 80% postingan, atau 20% pengguna menghasilkan 80% traffic website. Di era data besar, dengan pertumbuhan volume data yang eksponensial, kesenjangan data dapat berdampak signifikan terhadap kinerja program terdistribusi. Gejala umumnya adalah progres eksekusi pekerjaan tersangkut di 99%.
Cara mengidentifikasi kesenjangan data
Di MaxCompute, Anda dapat menggunakan Logview untuk mengidentifikasi kesenjangan data. Langkah-langkah berikut menjelaskan prosesnya:
Di Fuxi Jobs, urutkan pekerjaan berdasarkan latency secara descending dan pilih tahap pekerjaan dengan waktu proses terlama.
Di daftar Fuxi Instance untuk Fuxi Stage tersebut, urutkan task berdasarkan latency secara descending. Pilih task dengan waktu proses yang jauh lebih lama dari rata-rata—biasanya task pertama dalam daftar—lalu lihat log StdOut-nya.
Gunakan informasi dari StdOut untuk melihat graf eksekusi pekerjaan yang sesuai.
Gunakan informasi kunci dari graf eksekusi pekerjaan untuk menemukan potongan kode SQL yang menyebabkan kesenjangan data.
Contoh berikut menunjukkan cara menggunakan metode ini.
Temukan log Logview dari log operasional task. Untuk informasi selengkapnya, lihat Titik masuk Logview.

Untuk mengidentifikasi masalah dengan cepat, buka antarmuka Logview, urutkan task Fuxi berdasarkan Latency secara descending, lalu pilih task dengan waktu proses terlama.

Task
R31_26_27memiliki waktu proses terlama. Klik taskR31_26_27untuk membuka halaman detail instans, seperti yang ditunjukkan pada gambar berikut.
Nilai Latency: {min:00:00:06, avg:00:00:13, max:00:26:40}menunjukkan bahwa untuk semua instans task ini, waktu proses minimum adalah6 s, rata-rata waktu proses adalah13 s, dan waktu proses maksimum adalah26 menit 40 detik. Anda dapat mengurutkan berdasarkanLatency(waktu proses instans) secara descending. Anda akan melihat empat instans dengan waktu proses panjang. MaxCompute menganggap instans Fuxi sebagai long-tailed jika waktu prosesnya lebih dari dua kali rata-rata. Artinya, setiap instans task dengan waktu proses lebih dari26 sdianggap long-tailed. Dalam kasus ini, 21 instans memiliki waktu proses lebih dari26 s. Kehadiran instans long-tailed tidak selalu menunjukkan skew task. Anda juga perlu membandingkan nilaiavgdanmaxdari waktu proses instans. Jika nilaimaxjauh lebih besar daripada nilaiavg, hal ini menunjukkan kesenjangan data yang parah. Task ini memerlukan penanganan.Klik ikon
di kolom StdOut untuk melihat log output, seperti yang ditunjukkan pada gambar berikut. 
Setelah mengidentifikasi masalah, buka tab Job Details. Klik kanan
R31_26_27dan pilih Expand All untuk memperluas task. Untuk informasi selengkapnya, lihat Gunakan Logview 2.0 untuk melihat informasi pekerjaan.
Periksa langkah sebelum StreamLineRead22, yaituStreamLineWriter21. Langkah ini mengungkap kunci yang menyebabkan kesenjangan data:new_uri_path_structure,cookie_x5check_userid, dancookie_userid. Gunakan informasi ini untuk menemukan potongan SQL yang menyebabkan kesenjangan data.
Pemecahan masalah skew data dan solusi
Penyebab paling umum kesenjangan data adalah sebagai berikut:
Join
GroupBy
Count (Distinct)
ROW_NUMBER (TopN)
Dynamic partition
Frekuensi kemunculan, dari yang paling hingga paling jarang, adalah: JOIN > GroupBy > Count(Distinct) > ROW_NUMBER > Dynamic partition.
Join
Kesenjangan data dari operasi join dapat terjadi dalam berbagai situasi, seperti menggabungkan tabel besar dengan tabel kecil, menggabungkan tabel besar dengan tabel menengah, atau ketika nilai hot key menyebabkan long tail.
Menggabungkan tabel besar dengan tabel kecil.
Contoh skew data
Pada contoh berikut,
t1adalah tabel besar, sedangkant2dant3adalah tabel kecil.SELECT t1.ip ,t1.is_anon ,t1.user_id ,t1.user_agent ,t1.referer ,t2.ssl_ciphers ,t3.shop_province_name ,t3.shop_city_name FROM <viewtable> t1 LEFT OUTER JOIN <other_viewtable> t2 ON t1.header_eagleeye_traceid = t2.eagleeye_traceid LEFT OUTER JOIN ( SELECT shop_id ,city_name AS shop_city_name ,province_name AS shop_province_name FROM <tenanttable> WHERE ds = MAX_PT('<tenanttable>') AND is_valid = 1 ) t3 ON t1.shopid = t3.shop_idSolusi
Gunakan sintaks MAPJOIN HINT, seperti pada contoh berikut.
SELECT /*+ mapjoin(t2,t3)*/ t1.ip ,t1.is_anon ,t1.user_id ,t1.user_agent ,t1.referer ,t2.ssl_ciphers ,t3.shop_province_name ,t3.shop_city_name FROM <viewtable> t1 LEFT OUTER JOIN (<other_viewtable>) t2 ON t1.header_eagleeye_traceid = t2.eagleeye_traceid LEFT OUTER JOIN ( SELECT shop_id ,city_name AS shop_city_name ,province_name AS shop_province_name FROM <tenanttable> WHERE ds = MAX_PT('<tenanttable>') AND is_valid = 1 ) t3 ON t1.shopid = t3.shop_idCatatan
Saat mereferensikan tabel kecil atau subkueri, Anda harus menggunakan alias-nya.
MapJoin mendukung penggunaan subkueri sebagai tabel kecil.
Pada MapJoin, Anda dapat menggunakan non-equi joins atau menghubungkan beberapa kondisi dengan
OR. Anda juga dapat menghitung Produk Kartesius dengan menghilangkan klausaONdan menggunakanmapjoin on 1 = 1, misalnya,select /*+ mapjoin(a) */ a.id from shop a join table_name b on 1=1;. Namun, operasi ini dapat menyebabkan pembengkakan data.Pada MapJoin, pisahkan beberapa tabel kecil dengan koma (
,), seperti/*+ mapjoin(a,b,c)*/.Selama tahap map, MapJoin memuat semua data dari tabel yang ditentukan ke dalam memori. Oleh karena itu, tabel yang ditentukan harus kecil. Total memori yang digunakan oleh tabel setelah dimuat tidak boleh melebihi 512 MB. Karena MaxCompute menggunakan penyimpanan terkompresi, ukuran data tabel kecil meningkat signifikan setelah dimuat ke memori. Batas 512 MB berlaku untuk ukuran setelah data dimuat. Anda dapat meningkatkan batas ini hingga maksimal 8192 MB dengan mengatur parameter berikut.
SET odps.sql.mapjoin.memory.max=2048;
Batasan operasi MapJoin
Pada
LEFT OUTER JOIN, tabel kiri harus merupakan tabel besar.Pada
RIGHT OUTER JOIN, tabel kanan harus merupakan tabel besar.FULL OUTER JOINtidak didukung.Pada
INNER JOIN, baik tabel kiri maupun kanan dapat menjadi tabel besar.MapJoin mendukung maksimal 128 tabel kecil. Kesalahan sintaks akan terjadi jika Anda menentukan lebih dari itu.
Menggabungkan tabel besar dengan tabel menengah.
Contoh skew data
Pada contoh berikut,
t0adalah tabel besar dant1adalah tabel menengah.SELECT request_datetime ,host ,URI ,eagleeye_traceid FROM <viewtable> t0 LEFT JOIN ( SELECT traceid, eleme_uid, isLogin_is FROM <servicetable> WHERE ds = '${today}' AND hh = '${hour}' ) t1 ON t0.eagleeye_traceid = t1.traceid WHERE ds = '${today}' AND hh = '${hour}'Solusi
Gunakan hint DISTRIBUTED MAPJOIN untuk mengatasi kesenjangan data, seperti pada contoh berikut.
SELECT /*+distmapjoin(t1)*/ request_datetime ,host ,URI ,eagleeye_traceid FROM <viewtable> t0 LEFT JOIN ( SELECT traceid, eleme_uid, isLogin_is FROM <servicetable> WHERE ds = '${today}' AND hh = '${hour}' ) t1 ON t0.eagleeye_traceid = t1.traceid WHERE ds = '${today}' AND hh = '${hour}'
Long tail yang disebabkan oleh nilai hot key pada join.
Contoh kesenjangan data
Pada contoh berikut, bidang
eleme_uidberisi banyak nilai hot key, yang dapat menyebabkan kesenjangan data.SELECT eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> )t1 LEFT JOIN( SELECT eleme_uid, ... FROM <customertable> ) t2 ON t1.eleme_uid = t2.eleme_uid;Solusi
Anda dapat menggunakan salah satu dari empat metode berikut untuk mengatasi masalah ini.
No.
Solusi
Deskripsi
Solusi 1
Manual Hot Value Splitting
Analisis dan identifikasi nilai hot key. Filter catatan dengan nilai hot key dari tabel utama. Pertama, lakukan MapJoin pada catatan tersebut. Kemudian, lakukan MergeJoin pada catatan yang tersisa dengan nilai non-hot key. Terakhir, gabungkan hasil kedua join tersebut.
Solusi 2
Atur parameter SkewJoin
set odps.sql.skewjoin=true;.Solusi 3
SkewJoin Hint
Gunakan hint:
/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/. Metode SkewJoin Hint menambahkan langkah ekstra untuk menemukan kunci yang miring, yang meningkatkan waktu proses kueri. Jika Anda sudah mengetahui kunci yang miring, Anda dapat mengatur parameter SkewJoin untuk menghemat waktu.Solusi 4
Lakukan modulo equal join dengan tabel multiplier
Gunakan tabel multiplier.
Pisahkan nilai hot key secara manual.
Metode ini melibatkan analisis dan identifikasi nilai hot key. Pertama, filter catatan yang berisi nilai hot key dari tabel utama dan lakukan MapJoin pada catatan tersebut. Kemudian, lakukan MergeJoin pada catatan yang tersisa yang berisi nilai non-hot key. Terakhir, gabungkan hasil kedua join tersebut. Kode berikut memberikan contohnya:
SELECT /*+ MAPJOIN (t2) */ eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> WHERE eleme_uid = <skewed_value> )t1 LEFT JOIN( SELECT eleme_uid, ... FROM <customertable> WHERE eleme_uid = <skewed_value> ) t2 ON t1.eleme_uid = t2.eleme_uid UNION ALL SELECT eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> WHERE eleme_uid != <skewed_value> )t3 LEFT JOIN( SELECT eleme_uid, ... FROM <customertable> WHERE eleme_uid != <skewed_value> ) t4 ON t3.eleme_uid = t4.eleme_uidAtur parameter SkewJoin.
Ini adalah solusi umum. MaxCompute menyediakan parameter
set odps.sql.skewjoin=true;untuk mengaktifkan fitur SkewJoin. Namun, hanya mengaktifkan SkewJoin saja tidak memengaruhi eksekusi task. Anda juga harus mengatur parameterodps.sql.skewinfoagar fitur ini berlaku. Parameterodps.sql.skewinfomenentukan detail untuk optimasi join. Berikut adalah contoh sintaks perintahnya.SET odps.sql.skewjoin=true; SET odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")]; --skewed_src adalah tabel traffic, dan skewed_value adalah nilai hot key.Contoh berikut menunjukkan cara menggunakan perintah tersebut:
--Untuk satu nilai miring pada satu bidang SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")]; --Untuk beberapa nilai miring pada satu bidang SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")];SkewJoin Hint.
Untuk mengeksekusi MapJoin dalam pernyataan
SELECT, gunakan hint berikut:/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/. Dalam hint ini,table_nameadalah tabel yang miring,column_nameadalah kolom yang miring, danvalueadalah nilai kunci yang miring. Contoh berikut menunjukkan cara menggunakan hint ini.--Metode 1: Hint nama tabel. Perhatikan bahwa Anda memberi hint alias tabel. SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1; --Metode 2: Hint nama tabel dan kolom yang menurut Anda mungkin miring. Misalnya, kolom c0 dan c1 pada tabel a memiliki kesenjangan data. SELECT /*+ skewjoin(a(c0, c1)) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2; --Metode 3: Hint nama tabel dan kolom, serta berikan nilai kunci yang miring. Jika tipe adalah STRING, sertakan nilai dalam tanda kutip. Misalnya, nilai untuk (a.c0=1 dan a.c1="2") dan (a.c0=3 dan a.c1="4") keduanya memiliki kesenjangan data. SELECT /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;CatatanMetode SkewJoin hint yang langsung menentukan nilai lebih efisien daripada memisahkan nilai hot key secara manual atau mengatur parameter SkewJoin tanpa menentukan nilai.
Jenis join yang didukung oleh SkewJoin Hint:
Inner Join: Anda dapat memberi hint pada salah satu sisi join.
Left Join, Semi Join, dan Anti Join: Anda hanya dapat memberi hint pada tabel kiri.
Right Join: Anda hanya dapat memberi hint pada tabel kanan.
Full Join: Skew Join Hint tidak didukung.
Berikan hint hanya pada join yang dipastikan memiliki kesenjangan data karena hint tersebut menjalankan operasi agregasi, yang memerlukan biaya.
Untuk join yang diberi hint, tipe data kunci join kiri harus cocok dengan tipe data kunci join kanan. Jika tidak, hint SkewJoin tidak berfungsi. Misalnya, pada contoh sebelumnya, tipe
a.c0harus cocok dengan tipeb.c0, dan tipea.c1harus cocok dengan tipeb.c1. Anda dapat menggunakan CAST dalam subkueri untuk memastikan tipe kunci join konsisten. Contoh berikut menunjukkan cara melakukannya:CREATE TABLE T0(c0 int, c1 int, c2 int, c3 int); CREATE TABLE T1(c0 string, c1 int, c2 int); --Metode 1: SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON cast(a.c0 AS string) = cast(b.c0 AS string) AND a.c1 = b.c1; --Metode 2: SELECT /*+ skewjoin(b) */ * FROM (SELECT cast(a.c0 AS string) AS c00 FROM T0 a) b JOIN T1 c ON b.c00 = c.c0;Setelah Anda menambahkan hint SkewJoin, pengoptimal menjalankan operasi agregasi untuk mengambil 20 nilai hot key teratas. Nilai
20adalah nilai default. Anda dapat mengubah nilai ini dengan menjalankanset odps.optimizer.skew.join.topk.num = xx;.Hint SkewJoin hanya mendukung pemberian hint pada satu sisi join.
Join yang diberi hint harus memiliki kondisi
left key = right keydan tidak mendukung join Produk Kartesius.Anda tidak dapat menambahkan hint SkewJoin ke join yang sudah memiliki hint MapJoin.
Lakukan modulo-equi-join dengan tabel multiplier.
Logika solusi ini berbeda dari tiga solusi sebelumnya. Ini bukan pendekatan divide-and-conquer. Sebaliknya, solusi ini menggunakan tabel multiplier yang memiliki satu kolom bertipe data INT. Nilainya berkisar dari 1 hingga N, di mana N ditentukan oleh tingkat kesenjangan data. Tabel multiplier ini digunakan untuk memperluas tabel perilaku pengguna sebanyak N kali. Kemudian, saat melakukan join, Anda dapat menggunakan ID pengguna dan
numbersebagai kunci asosiasi. Kesenjangan data awal, yang disebabkan oleh distribusi data hanya berdasarkan ID pengguna, berkurang menjadi1/Ndari tingkat semula karena penambahan kondisinumber. Namun, metode ini juga menyebabkan pembengkakan data sebanyak N kali.SELECT eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> )t1 LEFT JOIN( SELECT /*+mapjoin(<multipletable>)*/ eleme_uid, number ... FROM <customertable> JOIN <multipletable> ) t2 ON t1.eleme_uid = t2.eleme_uid AND mod(t1.<value_col>,10)+1 = t2.number;Untuk mengatasi masalah pembengkakan data, Anda dapat membatasi perluasan hanya pada catatan yang memiliki nilai hot key di kedua tabel. Catatan dengan nilai non-hot key tetap tidak berubah. Pertama, temukan catatan dengan nilai hot key. Kemudian, proses tabel traffic dan tabel perilaku pengguna secara terpisah dengan menambahkan kolom baru bernama
eleme_uid_join. Jika ID pengguna adalah nilai hot key, gunakanCONCATuntuk menambahkan bilangan bulat positif yang ditetapkan secara acak (misalnya, dari 0 hingga 1.000). Jika bukan nilai hot key, pertahankan ID pengguna asli. Saat menggabungkan kedua tabel, gunakan kolomeleme_uid_join. Metode ini meningkatkan multiplier untuk nilai hot key guna mengurangi skew dan menghindari pembengkakan data yang tidak perlu untuk nilai non-hot key. Namun, logika ini sepenuhnya mengubah SQL logika bisnis asli sehingga tidak direkomendasikan.
GroupBy
Contoh berikut menunjukkan pseudocode dengan klausa GroupBy.
SELECT shop_id
,sum(is_open) AS business_days
FROM table_xxx_di
WHERE dt BETWEEN '${bizdate_365}' AND '${bizdate}'
GROUP BY shop_id;Jika terjadi kesenjangan data, Anda dapat mengatasinya dengan salah satu dari tiga solusi berikut:
No. | Solusi | Deskripsi |
Solusi 1 | Atur parameter anti-skew untuk Group By |
|
Solusi 2 | Tambahkan bilangan acak | Pisahkan kunci yang menyebabkan long tail. |
Solusi 3 | Buat tabel rolling | Kurangi biaya dan tingkatkan efisiensi. |
Solusi 1: Atur parameter anti-skew untuk `GROUP BY`.
SET odps.sql.groupby.skewindata=true;Solusi 2: Tambahkan bilangan acak.
Berbeda dengan Solusi 1, solusi ini mengharuskan Anda menulis ulang pernyataan SQL. Menambahkan bilangan acak untuk memisahkan kunci yang menyebabkan long tail adalah cara efektif untuk mengatasi long tail `GROUP BY`.
Untuk pernyataan SQL
SELECT Key, COUNT(*) AS Cnt FROM TableName GROUP BY Key;, jika combiner tidak digunakan, node map mengacak data ke node reduce. Node reduce kemudian melakukan operasi `COUNT`. Rencana eksekusi yang sesuai adalahM->R.Jika Anda telah menemukan kunci yang menyebabkan long tail, Anda dapat mendistribusikan ulang pekerjaan untuk kunci tersebut sebagai berikut:
-- Asumsikan kunci yang menyebabkan long tail adalah KEY001. SELECT a.Key ,SUM(a.Cnt) AS Cnt FROM(SELECT Key ,COUNT(*) AS Cnt FROM <TableName> GROUP BY Key ,CASE WHEN KEY = 'KEY001' THEN Hash(Random()) % 50 ELSE 0 END ) a GROUP BY a.Key;Setelah perubahan, rencana eksekusi menjadi
M->R->R. Meskipun jumlah langkah eksekusi meningkat, kunci long-tail diproses dalam dua langkah, yang dapat mengurangi waktu proses keseluruhan. Konsumsi resource dan efisiensi waktu mirip dengan Solusi 1. Namun, dalam skenario dunia nyata, long tail sering disebabkan oleh lebih dari satu kunci. Mengingat biaya menemukan kunci long-tail dan menulis ulang pernyataan SQL, Solusi 1 lebih hemat biaya.Buat tabel rolling.
Inti dari solusi ini adalah mengurangi biaya dan meningkatkan efisiensi. Persyaratan utamanya adalah mengambil data merchant dari tahun lalu. Untuk task online, membaca semua partisi dari
T-1hinggaT-365setiap kali membuang banyak resource. Anda dapat membuat tabel rolling untuk mengurangi jumlah partisi yang dibaca tanpa memengaruhi pengambilan data dari tahun lalu. Contoh berikut menunjukkan caranya.Pertama, inisialisasi data bisnis merchant selama 365 hari yang diagregasi menggunakan Group By. Tandai tanggal pembaruan data dan simpan data dalam tabel bernama
a. Untuk task online berikutnya, Anda dapat menggabungkan tabelT-2adengan tabeltable_xxx_di, lalu gunakan Group By. Dengan cara ini, jumlah partisi yang dibaca setiap hari berkurang dari 365 menjadi 2. Duplikasi kunci utamashopidsangat berkurang, yang juga mengurangi konsumsi resource.-- Buat tabel rolling. CREATE TABLE IF NOT EXISTS m_xxx_365_df ( shop_id STRING COMMENT, last_update_ds COMMENT, 365d_open_days COMMENT ) PARTITIONED BY ( ds STRING COMMENT 'Partisi tanggal' )LIFECYCLE 7; -- Asumsikan 365d dari 1 Mei 2021 hingga 1 Mei 2022. Inisialisasi tabel terlebih dahulu. INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501') SELECT shop_id, max(ds) as last_update_ds, sum(is_open) AS 365d_open_days FROM table_xxx_di WHERE dt BETWEEN '20210501' AND '20220501' GROUP BY shop_id; -- Kemudian, task online yang akan dijalankan adalah sebagai berikut. INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}') SELECT aa.shop_id, aa.last_update_ds, 365d_open_days - COALESCE(is_open, 0) AS 365d_open_days -- Hilangkan rolling tak terbatas hari bisnis. FROM ( SELECT shop_id, max(last_update_ds) AS last_update_ds, sum(365d_open_days) AS 365d_open_days FROM ( SELECT shop_id, ds AS last_update_ds, sum(is_open) AS 365d_open_days FROM table_xxx_di WHERE ds = '${bizdate}' GROUP BY shop_id UNION ALL SELECT shop_id, last_update_ds, 365d_open_days FROM m_xxx_365_df WHERE dt = '${bizdate_2}' AND last_update_ds >= '${bizdate_365}' GROUP BY shop_id ) GROUP BY shop_id ) AS aa LEFT JOIN ( SELECT shop_id, is_open FROM table_xxx_di WHERE ds = '${bizdate_366}' ) AS bb ON aa.shop_id = bb.shop_id;
Count (Distinct)
Asumsikan sebuah tabel memiliki distribusi data berikut.
ds (partisi) | cnt (jumlah catatan) |
20220416 | 73025514 |
20220415 | 2292806 |
20220417 | 2319160 |
Menggunakan pernyataan berikut dapat menyebabkan kesenjangan data:
SELECT ds
,COUNT(DISTINCT shop_id) AS cnt
FROM demo_data0
GROUP BY ds;Solusi berikut tersedia:
No. | Solusi | Deskripsi |
Solusi 1 | Optimasi pengaturan parameter |
|
Solusi 2 | Agregasi dua tahap umum | Gabungkan bilangan acak ke nilai bidang partisi. |
Solusi 3 | Mirip dengan agregasi dua tahap | Pertama, kelompokkan berdasarkan dua bidang |
Solusi 1: Optimasi dengan mengatur parameter.
Atur parameter berikut.
SET odps.sql.groupby.skewindata=true;Solusi 2: Gunakan agregasi dua tahap umum.
Jika data di bidang
shop_idtidak seragam, Anda tidak dapat menggunakan Solusi 1 untuk optimasi. Metode yang lebih umum adalah menggabungkan bilangan acak ke nilai bidang partisi.-- Metode 1: Gabungkan bilangan acak. CONCAT(ROUND(RAND(),1)*10,'_', ds) AS rand_ds SELECT SPLIT_PART(rand_ds, '_',2) ds ,COUNT(*) id_cnt FROM ( SELECT rand_ds ,shop_id FROM demo_data0 GROUP BY rand_ds,shop_id ) GROUP BY SPLIT_PART(rand_ds, '_',2); -- Metode 2: Tambahkan bidang bilangan acak. ROUND(RAND(),1)*10 AS randint10 SELECT ds ,COUNT(*) id_cnt FROM (SELECT ds ,randint10 ,shop_id FROM demo_data0 GROUP BY ds,randint10,shop_id ) GROUP BY ds;Solusi 3: Gunakan metode yang mirip dengan agregasi dua tahap.
Jika data di kedua bidang GroupBy dan Distinct seragam, Anda dapat mengoptimalkan kueri dengan terlebih dahulu mengelompokkan berdasarkan dua bidang (`ds` dan `shop_id`) lalu menggunakan perintah
count(distinct).SELECT ds ,COUNT(*) AS cnt FROM(SELECT ds ,shop_id FROM demo_data0 GROUP BY ds ,shop_id ) GROUP BY ds;
ROW_NUMBER (TopN)
Contoh berikut menunjukkan top 10.
SELECT main_id
,type
FROM (SELECT main_id
,type
,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
FROM <data_demo2>
) A
WHERE A.rn <= 10;Jika terjadi kesenjangan data, Anda dapat mengatasinya dengan salah satu metode berikut:
No. | Solusi | Deskripsi |
Solusi 1 | Agregasi dua tahap menggunakan SQL | Tambahkan kolom acak atau gabungkan bilangan acak dan gunakan sebagai parameter dalam partisi. |
Solusi 2 | Agregasi dua tahap menggunakan fungsi agregat yang didefinisikan pengguna (UDAF) | Optimalkan menggunakan UDAF dengan antrian prioritas min-heap. |
Solusi 1: Gunakan agregasi dua tahap dengan SQL.
Untuk membuat data di setiap grup partisi seuniform mungkin selama tahap map, Anda dapat menambahkan kolom acak dan menggunakannya sebagai parameter dalam partisi.
SELECT main_id ,type FROM (SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn FROM (SELECT main_id ,type FROM (SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn FROM (SELECT main_id ,type ,ceil(110 * rand()) % 11 AS src_pt FROM data_demo2 ) ) B WHERE B.rn <= 10 ) ) A WHERE A.rn <= 10; -- 2. Sesuaikan bilangan acak. SELECT main_id ,type FROM (SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn FROM (SELECT main_id ,type FROM(SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn FROM (SELECT main_id ,type ,ceil(10 * rand()) AS src_pt FROM data_demo2 ) ) B WHERE B.rn <= 10 ) ) A WHERE A.rn <= 10;Solusi 2: Gunakan agregasi dua tahap dengan UDAF.
Metode SQL dapat menghasilkan kode yang panjang dan sulit dipelihara. Dalam kasus ini, Anda dapat menggunakan fungsi agregat yang didefinisikan pengguna (UDAF) dengan antrian prioritas min-heap untuk optimasi. Artinya, pada tahap
iterate, hanya N item teratas yang diproses. Pada tahapmerge, hanya N elemen yang digabungkan. Prosesnya sebagai berikut.iterate: Dorong K elemen pertama. Untuk elemen berikutnya, terus-menerus bandingkan dengan elemen teratas minimum dan tukar elemen dalam heap.merge: Menggabungkan dua heap dan mengembalikan K elemen teratas secara in-place.terminate: Kembalikan heap sebagai array.Dalam pernyataan SQL, pisahkan array menjadi baris.
@annotate('* -> array<string>') class GetTopN(BaseUDAF): def new_buffer(self): return [[], None] def iterate(self, buffer, order_column_val, k): # heapq.heappush(buffer, order_column_val) # buffer = [heapq.nlargest(k, buffer), k] if not buffer[1]: buffer[1] = k if len(buffer[0]) < k: heapq.heappush(buffer[0], order_column_val) else: heapq.heappushpop(buffer[0], order_column_val) def merge(self, buffer, pbuffer): first_buffer, first_k = buffer second_buffer, second_k = pbuffer k = first_k or second_k merged_heap = first_buffer + second_buffer merged_heap.sort(reverse=True) merged_heap = merged_heap[0: k] if len(merged_heap) > k else merged_heap buffer[0] = merged_heap buffer[1] = k def terminate(self, buffer): return buffer[0] SET odps.sql.python.version=cp37; SELECT main_id,type_val FROM ( SELECT main_id ,get_topn(type, 10) AS type_array FROM data_demo2 GROUP BY main_id ) LATERAL VIEW EXPLODE(type_array)type_ar AS type_val;
Dynamic partition
Dynamic partitioning adalah fitur yang memungkinkan Anda menentukan kolom kunci partisi saat memasukkan data ke tabel partisi, tetapi tanpa memberikan nilai spesifik. Sebaliknya, kolom yang sesuai dalam klausa Select menyediakan nilai partisi. Oleh karena itu, Anda tidak tahu partisi mana yang akan dibuat sebelum pernyataan SQL dijalankan. Anda hanya dapat menentukan partisi yang dibuat setelah pernyataan SQL dijalankan, berdasarkan nilai kolom partisi. Untuk informasi selengkapnya, lihat Masukkan atau timpa data ke partisi dinamis (DYNAMIC PARTITION). Berikut adalah contoh SQL.
CREATE TABLE total_revenues (revenue bigint) partitioned BY (region string);
INSERT OVERWRITE TABLE total_revenues PARTITION(region)
SELECT total_price AS revenue, region
FROM sale_detail;Dalam banyak skenario, membuat tabel dengan partisi dinamis dapat menyebabkan kesenjangan data. Jika terjadi kesenjangan data, Anda dapat menggunakan solusi berikut untuk mengatasinya.
No. | Solusi | Deskripsi |
Solusi 1 | Optimasi konfigurasi parameter | Optimalkan melalui konfigurasi parameter. |
Solusi 2 | Optimasi pemangkasan partisi | Temukan partisi dengan banyak catatan, pangkas, dan masukkan secara terpisah. |
Solusi 1: Optimasi dengan mengonfigurasi parameter.
Dynamic partitioning memungkinkan Anda memasukkan data yang memenuhi kondisi berbeda ke partisi berbeda. Fitur ini menghilangkan kebutuhan untuk beberapa operasi Insert Overwrite ke tabel, terutama saat ada banyak partisi, dan dapat sangat menyederhanakan kode Anda. Namun, dynamic partitioning juga dapat membuat terlalu banyak file kecil.
Contoh kesenjangan data
Ambil SQL sederhana berikut sebagai contoh.
INSERT INTO TABLE part_test PARTITION(ds) SELECT * FROM part_test;Asumsikan ada K instans Map dan N partisi target.
ds=1 cfile1 ds=2 ... X ds=3 cfilek ... ds=nDalam kasus paling ekstrem,
K*Nfile kecil dapat dihasilkan. Terlalu banyak file kecil dapat memberikan tekanan manajemen yang besar pada sistem file. Oleh karena itu, MaxCompute menangani partisi dinamis dengan memperkenalkan tahap reduce tambahan. Fitur ini menetapkan partisi target yang sama ke satu instans reduce atau beberapa instans reduce untuk menulis. Metode ini menghindari pembuatan terlalu banyak file kecil. Operasi reduce ini selalu merupakan operasi reduce task terakhir. Di MaxCompute, fitur ini diaktifkan secara default, artinya parameter berikut diatur ke `true`.SET odps.sql.reshuffle.dynamicpt=true;Mengaktifkan fitur ini secara default menyelesaikan masalah terlalu banyak file kecil dan mencegah task gagal karena satu instans menghasilkan terlalu banyak file. Namun, fitur ini juga memperkenalkan masalah baru, seperti kesenjangan data dan konsumsi resource komputasi oleh operasi reduce tambahan. Oleh karena itu, Anda harus menyeimbangkan faktor-faktor ini dengan hati-hati.
Solusi
Tujuan mengaktifkan
set odps.sql.reshuffle.dynamicpt=true;dan memperkenalkan tahap reduce tambahan adalah untuk menyelesaikan masalah terlalu banyak file kecil. Namun, jika jumlah partisi target kecil dan tidak ada risiko membuat terlalu banyak file kecil, mengaktifkan fitur ini secara default membuang resource komputasi dan mengurangi kinerja. Dalam situasi ini, Anda dapat menonaktifkan fitur ini dengan mengaturset odps.sql.reshuffle.dynamicpt=false;untuk meningkatkan kinerja secara signifikan. Berikut adalah contohnya.INSERT OVERWRITE TABLE ads_tb_cornucopia_pool_d PARTITION (ds, lv, tp) SELECT /*+ mapjoin(t2) */ '20150503' AS ds, t1.lv AS lv, t1.type AS tp FROM (SELECT ... FROM tbbi.ads_tb_cornucopia_user_d WHERE ds = '20150503' AND lv IN ('flat', '3rd') AND tp = 'T' AND pref_cat2_id > 0 ) t1 JOIN (SELECT ... FROM tbbi.ads_tb_cornucopia_auct_d WHERE ds = '20150503' AND tp = 'T' AND is_all = 'N' AND cat2_id > 0 ) t2 ON t1.pref_cat2_id = t2.cat2_id;Jika Anda menggunakan parameter default untuk kode di atas, seluruh task membutuhkan waktu sekitar 1 jam 30 menit untuk dijalankan. Task reduce terakhir membutuhkan waktu sekitar 1 jam 20 menit, yang kira-kira
90%dari total waktu proses. Memperkenalkan task reduce tambahan membuat distribusi data setiap instans reduce sangat tidak merata, yang menyebabkan long tail.
Untuk contoh di atas, analisis jumlah historis partisi dinamis yang dibuat menunjukkan bahwa hanya sekitar dua partisi dinamis yang dibuat setiap hari. Oleh karena itu, Anda dapat dengan aman mengatur
set odps.sql.reshuffle.dynamicpt=false;. Task kemudian dapat diselesaikan hanya dalam 9 menit. Dalam kasus ini, mengatur parameter ini kefalsedapat meningkatkan kinerja secara signifikan, menghemat waktu dan resource komputasi, serta memberikan manfaat marjinal tinggi hanya dengan mengatur satu parameter.Optimasi ini tidak terbatas pada task besar yang memakan waktu lama dan mengonsumsi banyak resource. Untuk task kecil dan biasa yang berjalan cepat dan mengonsumsi sedikit resource, Anda dapat mengatur parameter
odps.sql.reshuffle.dynamicptkefalseselama task tersebut menggunakan partisi dinamis dan jumlah partisi dinamis tidak besar. Pengaturan ini menghemat resource dan meningkatkan kinerja dalam semua kasus.Node yang memenuhi ketiga kondisi berikut dapat dioptimalkan, terlepas dari durasi task.
Menggunakan partisi dinamis
Jumlah partisi dinamis <= 50
Tidak memiliki `set odps.sql.reshuffle.dynamicpt=false;`
Tingkat urgensi pengaturan parameter ini untuk suatu node ditentukan oleh waktu eksekusi instans Fuxi terakhir, yang diidentifikasi oleh bidang
diag_level, berdasarkan aturan berikut:Last_Fuxi_Inst_Timelebih dari 30 menit:Diag_Level=4 ('Critical').Last_Fuxi_Inst_Timeantara 20 dan 30 menit:Diag_Level=3 ('High').Last_Fuxi_Inst_Timeantara 10 dan 20 menit:Diag_Level=2 ('Medium').Last_Fuxi_Inst_Timekurang dari 10 menit:Diag_Level=1 ('Low').
Solusi 2: Pemangkasan teroptimasi.
Untuk mengatasi kesenjangan data yang terjadi pada tahap map saat memasukkan data ke partisi dinamis, Anda dapat menemukan partisi dengan banyak catatan, memangkasnya, lalu memasukkannya secara terpisah. Berdasarkan skenario aktual, Anda dapat memodifikasi pengaturan parameter untuk tahap map seperti pada contoh berikut:
SET odps.sql.mapper.split.size=128; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi;Hasil menunjukkan bahwa pemindaian tabel penuh dilakukan. Untuk mengoptimalkan kinerja lebih lanjut, Anda dapat menonaktifkan job Reduce yang dihasilkan sistem sebagai berikut:
SET odps.sql.reshuffle.dynamicpt=false ; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi;Untuk mengatasi kesenjangan data yang terjadi pada tahap map saat memasukkan data ke partisi dinamis, Anda dapat mengidentifikasi partisi dengan banyak catatan, mengisolasi, lalu memasukkannya secara terpisah. Langkah-langkahnya sebagai berikut.
Gunakan perintah berikut untuk mencari partisi yang berisi banyak catatan.
SELECT ds ,hh ,COUNT(*) AS cnt FROM dwd_alsc_ent_shop_info_hi GROUP BY ds ,hh ORDER BY cnt DESC;Beberapa partisi tersebut adalah sebagai berikut:
ds
hh
cnt
20200928
17
1052800
20191017
17
1041234
20210928
17
1034332
20190328
17
1000321
20210504
1
19
20191003
20
18
20200522
1
18
20220504
1
18
Gunakan perintah berikut untuk memfilter dan memasukkan data ke partisi yang diidentifikasi di atas, dan lakukan operasi insert terpisah untuk partisi yang berisi banyak catatan.
SET odps.sql.reshuffle.dynamicpt=false ; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi WHERE CONCAT(ds,hh) NOT IN ('2020092817','2019101717','2021092817','2019032817'); SET odps.sql.reshuffle.dynamicpt=false ; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi WHERE CONCAT(ds,hh) IN ('2020092817','2019101717','2021092817','2019032817'); SELECT ds ,hh,COUNT(*) AS cnt FROM dwd_alsc_ent_shop_info_hi GROUP BY ds,hh ORDER BY cnt DESC;