全部产品
Search
文档中心

MaxCompute:Penyetelan skew data

更新时间:Jan 07, 2026

Topik ini menjelaskan skenario umum kesenjangan data di MaxCompute dan menyediakan solusi yang sesuai.

MapReduce

Untuk memahami kesenjangan data, Anda perlu terlebih dahulu memahami MapReduce. MapReduce adalah framework komputasi terdistribusi yang menerapkan metode divide-and-conquer dengan membagi masalah besar atau kompleks menjadi submasalah yang lebih kecil dan mudah dikelola, menyelesaikannya secara paralel, lalu menggabungkan hasilnya menjadi solusi akhir. Dibandingkan dengan framework pemrograman paralel tradisional, MapReduce menawarkan toleransi kesalahan yang lebih tinggi, kemudahan penggunaan, serta ekstensibilitas yang lebih baik. Saat menggunakan MapReduce untuk pemrograman paralel, Anda tidak perlu mengelola isu non-pemrograman pada kluster terdistribusi, seperti penyimpanan data, komunikasi antar-node, dan mekanisme transmisi, sehingga sangat menyederhanakan pemrograman terdistribusi.

Gambar berikut menunjukkan alur kerja MapReduce.MapReduce

Ketidakseimbangan data

Kesenjangan data sering terjadi selama tahap reduce. Mapper biasanya membagi data secara merata berdasarkan file input. Kesenjangan data muncul ketika data dalam suatu tabel didistribusikan secara tidak merata di antara worker yang berbeda. Distribusi yang tidak merata ini menyebabkan beberapa worker menyelesaikan komputasinya dengan cepat, sementara yang lain membutuhkan waktu jauh lebih lama. Di lingkungan produksi, sebagian besar data bersifat miring dan sering kali mengikuti aturan 80/20—misalnya, 20% Pengguna aktif di forum mungkin menghasilkan 80% postingan, atau 20% pengguna menghasilkan 80% traffic website. Di era data besar, dengan pertumbuhan volume data yang eksponensial, kesenjangan data dapat berdampak signifikan terhadap kinerja program terdistribusi. Gejala umumnya adalah progres eksekusi pekerjaan tersangkut di 99%.

Cara mengidentifikasi kesenjangan data

Di MaxCompute, Anda dapat menggunakan Logview untuk mengidentifikasi kesenjangan data. Langkah-langkah berikut menjelaskan prosesnya:判断数据倾斜

  1. Di Fuxi Jobs, urutkan pekerjaan berdasarkan latency secara descending dan pilih tahap pekerjaan dengan waktu proses terlama.

  2. Di daftar Fuxi Instance untuk Fuxi Stage tersebut, urutkan task berdasarkan latency secara descending. Pilih task dengan waktu proses yang jauh lebih lama dari rata-rata—biasanya task pertama dalam daftar—lalu lihat log StdOut-nya.

  3. Gunakan informasi dari StdOut untuk melihat graf eksekusi pekerjaan yang sesuai.

  4. Gunakan informasi kunci dari graf eksekusi pekerjaan untuk menemukan potongan kode SQL yang menyebabkan kesenjangan data.

Contoh berikut menunjukkan cara menggunakan metode ini.

  1. Temukan log Logview dari log operasional task. Untuk informasi selengkapnya, lihat Titik masuk Logview.logview

  2. Untuk mengidentifikasi masalah dengan cepat, buka antarmuka Logview, urutkan task Fuxi berdasarkan Latency secara descending, lalu pilih task dengan waktu proses terlama.Fuxi Task

  3. Task R31_26_27 memiliki waktu proses terlama. Klik task R31_26_27 untuk membuka halaman detail instans, seperti yang ditunjukkan pada gambar berikut. 时间最长业务 Nilai Latency: {min:00:00:06, avg:00:00:13, max:00:26:40} menunjukkan bahwa untuk semua instans task ini, waktu proses minimum adalah 6 s, rata-rata waktu proses adalah 13 s, dan waktu proses maksimum adalah 26 menit 40 detik. Anda dapat mengurutkan berdasarkan Latency (waktu proses instans) secara descending. Anda akan melihat empat instans dengan waktu proses panjang. MaxCompute menganggap instans Fuxi sebagai long-tailed jika waktu prosesnya lebih dari dua kali rata-rata. Artinya, setiap instans task dengan waktu proses lebih dari 26 s dianggap long-tailed. Dalam kasus ini, 21 instans memiliki waktu proses lebih dari 26 s. Kehadiran instans long-tailed tidak selalu menunjukkan skew task. Anda juga perlu membandingkan nilai avg dan max dari waktu proses instans. Jika nilai max jauh lebih besar daripada nilai avg, hal ini menunjukkan kesenjangan data yang parah. Task ini memerlukan penanganan.

  4. Klik ikon Output log di kolom StdOut untuk melihat log output, seperti yang ditunjukkan pada gambar berikut. Example output

  5. Setelah mengidentifikasi masalah, buka tab Job Details. Klik kanan R31_26_27 dan pilih Expand All untuk memperluas task. Untuk informasi selengkapnya, lihat Gunakan Logview 2.0 untuk melihat informasi pekerjaan. 展开任务 Periksa langkah sebelum StreamLineRead22, yaitu StreamLineWriter21. Langkah ini mengungkap kunci yang menyebabkan kesenjangan data: new_uri_path_structure, cookie_x5check_userid, dan cookie_userid. Gunakan informasi ini untuk menemukan potongan SQL yang menyebabkan kesenjangan data.KEY

Pemecahan masalah skew data dan solusi

Penyebab paling umum kesenjangan data adalah sebagai berikut:

  • Join

  • GroupBy

  • Count (Distinct)

  • ROW_NUMBER (TopN)

  • Dynamic partition

Frekuensi kemunculan, dari yang paling hingga paling jarang, adalah: JOIN > GroupBy > Count(Distinct) > ROW_NUMBER > Dynamic partition.

Join

Kesenjangan data dari operasi join dapat terjadi dalam berbagai situasi, seperti menggabungkan tabel besar dengan tabel kecil, menggabungkan tabel besar dengan tabel menengah, atau ketika nilai hot key menyebabkan long tail.

  • Menggabungkan tabel besar dengan tabel kecil.

    • Contoh skew data

      Pada contoh berikut, t1 adalah tabel besar, sedangkan t2 dan t3 adalah tabel kecil.

      SELECT  t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN <other_viewtable> t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
    • Solusi

      Gunakan sintaks MAPJOIN HINT, seperti pada contoh berikut.

      SELECT  /*+ mapjoin(t2,t3)*/
              t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN (<other_viewtable>) t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
      • Catatan

        • Saat mereferensikan tabel kecil atau subkueri, Anda harus menggunakan alias-nya.

        • MapJoin mendukung penggunaan subkueri sebagai tabel kecil.

        • Pada MapJoin, Anda dapat menggunakan non-equi joins atau menghubungkan beberapa kondisi dengan OR. Anda juga dapat menghitung Produk Kartesius dengan menghilangkan klausa ON dan menggunakan mapjoin on 1 = 1, misalnya, select /*+ mapjoin(a) */ a.id from shop a join table_name b on 1=1;. Namun, operasi ini dapat menyebabkan pembengkakan data.

        • Pada MapJoin, pisahkan beberapa tabel kecil dengan koma (,), seperti /*+ mapjoin(a,b,c)*/.

        • Selama tahap map, MapJoin memuat semua data dari tabel yang ditentukan ke dalam memori. Oleh karena itu, tabel yang ditentukan harus kecil. Total memori yang digunakan oleh tabel setelah dimuat tidak boleh melebihi 512 MB. Karena MaxCompute menggunakan penyimpanan terkompresi, ukuran data tabel kecil meningkat signifikan setelah dimuat ke memori. Batas 512 MB berlaku untuk ukuran setelah data dimuat. Anda dapat meningkatkan batas ini hingga maksimal 8192 MB dengan mengatur parameter berikut.

          SET odps.sql.mapjoin.memory.max=2048;
      • Batasan operasi MapJoin

        • Pada LEFT OUTER JOIN, tabel kiri harus merupakan tabel besar.

        • Pada RIGHT OUTER JOIN, tabel kanan harus merupakan tabel besar.

        • FULL OUTER JOIN tidak didukung.

        • Pada INNER JOIN, baik tabel kiri maupun kanan dapat menjadi tabel besar.

        • MapJoin mendukung maksimal 128 tabel kecil. Kesalahan sintaks akan terjadi jika Anda menentukan lebih dari itu.

  • Menggabungkan tabel besar dengan tabel menengah.

    • Contoh skew data

      Pada contoh berikut, t0 adalah tabel besar dan t1 adalah tabel menengah.

      SELECT  request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      FROM <viewtable>
          t0
      LEFT JOIN (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          FROM <servicetable>
          WHERE ds = '${today}'
          AND     hh = '${hour}'
      ) t1 ON t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
    • Solusi

      Gunakan hint DISTRIBUTED MAPJOIN untuk mengatasi kesenjangan data, seperti pada contoh berikut.

      SELECT  /*+distmapjoin(t1)*/
              request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      FROM <viewtable>
          t0
      LEFT JOIN (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          FROM <servicetable>
          WHERE ds = '${today}'
          AND     hh = '${hour}'
      ) t1 ON t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
  • Long tail yang disebabkan oleh nilai hot key pada join.

    • Contoh kesenjangan data

      Pada contoh berikut, bidang eleme_uid berisi banyak nilai hot key, yang dapat menyebabkan kesenjangan data.

      SELECT
      eleme_uid,
      ...
      FROM (
          SELECT
          eleme_uid,
          ...
          FROM <viewtable>
      )t1
      LEFT JOIN(
          SELECT
          eleme_uid,
          ...
          FROM <customertable>
      )  t2
      ON t1.eleme_uid = t2.eleme_uid;
    • Solusi

      Anda dapat menggunakan salah satu dari empat metode berikut untuk mengatasi masalah ini.

      No.

      Solusi

      Deskripsi

      Solusi 1

      Manual Hot Value Splitting

      Analisis dan identifikasi nilai hot key. Filter catatan dengan nilai hot key dari tabel utama. Pertama, lakukan MapJoin pada catatan tersebut. Kemudian, lakukan MergeJoin pada catatan yang tersisa dengan nilai non-hot key. Terakhir, gabungkan hasil kedua join tersebut.

      Solusi 2

      Atur parameter SkewJoin

      set odps.sql.skewjoin=true;.

      Solusi 3

      SkewJoin Hint

      Gunakan hint: /*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/. Metode SkewJoin Hint menambahkan langkah ekstra untuk menemukan kunci yang miring, yang meningkatkan waktu proses kueri. Jika Anda sudah mengetahui kunci yang miring, Anda dapat mengatur parameter SkewJoin untuk menghemat waktu.

      Solusi 4

      Lakukan modulo equal join dengan tabel multiplier

      Gunakan tabel multiplier.

      • Pisahkan nilai hot key secara manual.

        Metode ini melibatkan analisis dan identifikasi nilai hot key. Pertama, filter catatan yang berisi nilai hot key dari tabel utama dan lakukan MapJoin pada catatan tersebut. Kemudian, lakukan MergeJoin pada catatan yang tersisa yang berisi nilai non-hot key. Terakhir, gabungkan hasil kedua join tersebut. Kode berikut memberikan contohnya:

        SELECT
        /*+ MAPJOIN (t2) */
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid = <skewed_value>
        )t1
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid = <skewed_value>
        )  t2
        ON t1.eleme_uid = t2.eleme_uid
        UNION ALL
        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid != <skewed_value>
        )t3
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid != <skewed_value>
        )  t4
        ON t3.eleme_uid = t4.eleme_uid
      • Atur parameter SkewJoin.

        Ini adalah solusi umum. MaxCompute menyediakan parameter set odps.sql.skewjoin=true; untuk mengaktifkan fitur SkewJoin. Namun, hanya mengaktifkan SkewJoin saja tidak memengaruhi eksekusi task. Anda juga harus mengatur parameter odps.sql.skewinfo agar fitur ini berlaku. Parameter odps.sql.skewinfo menentukan detail untuk optimasi join. Berikut adalah contoh sintaks perintahnya.

        SET odps.sql.skewjoin=true;
        SET odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")];  --skewed_src adalah tabel traffic, dan skewed_value adalah nilai hot key.

        Contoh berikut menunjukkan cara menggunakan perintah tersebut:

        --Untuk satu nilai miring pada satu bidang
        SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")];
        
        --Untuk beberapa nilai miring pada satu bidang
        SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")];
      • SkewJoin Hint.

        Untuk mengeksekusi MapJoin dalam pernyataan SELECT, gunakan hint berikut: /*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/. Dalam hint ini, table_name adalah tabel yang miring, column_name adalah kolom yang miring, dan value adalah nilai kunci yang miring. Contoh berikut menunjukkan cara menggunakan hint ini.

        --Metode 1: Hint nama tabel. Perhatikan bahwa Anda memberi hint alias tabel.
        SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1;
        
        --Metode 2: Hint nama tabel dan kolom yang menurut Anda mungkin miring. Misalnya, kolom c0 dan c1 pada tabel a memiliki kesenjangan data.
        SELECT /*+ skewjoin(a(c0, c1)) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;
        
        --Metode 3: Hint nama tabel dan kolom, serta berikan nilai kunci yang miring. Jika tipe adalah STRING, sertakan nilai dalam tanda kutip. Misalnya, nilai untuk (a.c0=1 dan a.c1="2") dan (a.c0=3 dan a.c1="4") keduanya memiliki kesenjangan data.
        SELECT /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;
        Catatan

        Metode SkewJoin hint yang langsung menentukan nilai lebih efisien daripada memisahkan nilai hot key secara manual atau mengatur parameter SkewJoin tanpa menentukan nilai.

        Jenis join yang didukung oleh SkewJoin Hint:

        • Inner Join: Anda dapat memberi hint pada salah satu sisi join.

        • Left Join, Semi Join, dan Anti Join: Anda hanya dapat memberi hint pada tabel kiri.

        • Right Join: Anda hanya dapat memberi hint pada tabel kanan.

        • Full Join: Skew Join Hint tidak didukung.

        Berikan hint hanya pada join yang dipastikan memiliki kesenjangan data karena hint tersebut menjalankan operasi agregasi, yang memerlukan biaya.

        Untuk join yang diberi hint, tipe data kunci join kiri harus cocok dengan tipe data kunci join kanan. Jika tidak, hint SkewJoin tidak berfungsi. Misalnya, pada contoh sebelumnya, tipe a.c0 harus cocok dengan tipe b.c0, dan tipe a.c1 harus cocok dengan tipe b.c1. Anda dapat menggunakan CAST dalam subkueri untuk memastikan tipe kunci join konsisten. Contoh berikut menunjukkan cara melakukannya:

        CREATE TABLE T0(c0 int, c1 int, c2 int, c3 int);
        CREATE TABLE T1(c0 string, c1 int, c2 int);
        
        --Metode 1:
        SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON cast(a.c0 AS string) = cast(b.c0 AS string) AND a.c1 = b.c1;
        
        --Metode 2:
        SELECT /*+ skewjoin(b) */ * FROM (SELECT cast(a.c0 AS string) AS c00 FROM T0 a) b JOIN T1 c ON b.c00 = c.c0;

        Setelah Anda menambahkan hint SkewJoin, pengoptimal menjalankan operasi agregasi untuk mengambil 20 nilai hot key teratas. Nilai 20 adalah nilai default. Anda dapat mengubah nilai ini dengan menjalankan set odps.optimizer.skew.join.topk.num = xx;.

        • Hint SkewJoin hanya mendukung pemberian hint pada satu sisi join.

        • Join yang diberi hint harus memiliki kondisi left key = right key dan tidak mendukung join Produk Kartesius.

        • Anda tidak dapat menambahkan hint SkewJoin ke join yang sudah memiliki hint MapJoin.

      • Lakukan modulo-equi-join dengan tabel multiplier.

        Logika solusi ini berbeda dari tiga solusi sebelumnya. Ini bukan pendekatan divide-and-conquer. Sebaliknya, solusi ini menggunakan tabel multiplier yang memiliki satu kolom bertipe data INT. Nilainya berkisar dari 1 hingga N, di mana N ditentukan oleh tingkat kesenjangan data. Tabel multiplier ini digunakan untuk memperluas tabel perilaku pengguna sebanyak N kali. Kemudian, saat melakukan join, Anda dapat menggunakan ID pengguna dan number sebagai kunci asosiasi. Kesenjangan data awal, yang disebabkan oleh distribusi data hanya berdasarkan ID pengguna, berkurang menjadi 1/N dari tingkat semula karena penambahan kondisi number. Namun, metode ini juga menyebabkan pembengkakan data sebanyak N kali.

        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
        )t1
        LEFT JOIN(
            SELECT
            /*+mapjoin(<multipletable>)*/
            eleme_uid,
            number
            ...
            FROM <customertable>
            JOIN <multipletable>
        )  t2
        ON t1.eleme_uid = t2.eleme_uid
        AND mod(t1.<value_col>,10)+1 = t2.number;

        Untuk mengatasi masalah pembengkakan data, Anda dapat membatasi perluasan hanya pada catatan yang memiliki nilai hot key di kedua tabel. Catatan dengan nilai non-hot key tetap tidak berubah. Pertama, temukan catatan dengan nilai hot key. Kemudian, proses tabel traffic dan tabel perilaku pengguna secara terpisah dengan menambahkan kolom baru bernama eleme_uid_join. Jika ID pengguna adalah nilai hot key, gunakan CONCAT untuk menambahkan bilangan bulat positif yang ditetapkan secara acak (misalnya, dari 0 hingga 1.000). Jika bukan nilai hot key, pertahankan ID pengguna asli. Saat menggabungkan kedua tabel, gunakan kolom eleme_uid_join. Metode ini meningkatkan multiplier untuk nilai hot key guna mengurangi skew dan menghindari pembengkakan data yang tidak perlu untuk nilai non-hot key. Namun, logika ini sepenuhnya mengubah SQL logika bisnis asli sehingga tidak direkomendasikan.

GroupBy

Contoh berikut menunjukkan pseudocode dengan klausa GroupBy.

SELECT  shop_id
        ,sum(is_open) AS business_days
FROM    table_xxx_di
WHERE   dt BETWEEN '${bizdate_365}' AND '${bizdate}'
GROUP BY shop_id;

Jika terjadi kesenjangan data, Anda dapat mengatasinya dengan salah satu dari tiga solusi berikut:

No.

Solusi

Deskripsi

Solusi 1

Atur parameter anti-skew untuk Group By

set odps.sql.groupby.skewindata=true;.

Solusi 2

Tambahkan bilangan acak

Pisahkan kunci yang menyebabkan long tail.

Solusi 3

Buat tabel rolling

Kurangi biaya dan tingkatkan efisiensi.

  • Solusi 1: Atur parameter anti-skew untuk `GROUP BY`.

    SET odps.sql.groupby.skewindata=true;
  • Solusi 2: Tambahkan bilangan acak.

    Berbeda dengan Solusi 1, solusi ini mengharuskan Anda menulis ulang pernyataan SQL. Menambahkan bilangan acak untuk memisahkan kunci yang menyebabkan long tail adalah cara efektif untuk mengatasi long tail `GROUP BY`.

    Untuk pernyataan SQL SELECT Key, COUNT(*) AS Cnt FROM TableName GROUP BY Key;, jika combiner tidak digunakan, node map mengacak data ke node reduce. Node reduce kemudian melakukan operasi `COUNT`. Rencana eksekusi yang sesuai adalah M->R.

    Jika Anda telah menemukan kunci yang menyebabkan long tail, Anda dapat mendistribusikan ulang pekerjaan untuk kunci tersebut sebagai berikut:

    -- Asumsikan kunci yang menyebabkan long tail adalah KEY001.
    SELECT  a.Key
            ,SUM(a.Cnt) AS Cnt
    FROM(SELECT  Key
                ,COUNT(*) AS Cnt
                FROM    <TableName>
                GROUP BY Key
                ,CASE WHEN KEY = 'KEY001' THEN Hash(Random()) % 50
                 ELSE 0
                END
            ) a
    GROUP BY a.Key;

    Setelah perubahan, rencana eksekusi menjadi M->R->R. Meskipun jumlah langkah eksekusi meningkat, kunci long-tail diproses dalam dua langkah, yang dapat mengurangi waktu proses keseluruhan. Konsumsi resource dan efisiensi waktu mirip dengan Solusi 1. Namun, dalam skenario dunia nyata, long tail sering disebabkan oleh lebih dari satu kunci. Mengingat biaya menemukan kunci long-tail dan menulis ulang pernyataan SQL, Solusi 1 lebih hemat biaya.

  • Buat tabel rolling.

    Inti dari solusi ini adalah mengurangi biaya dan meningkatkan efisiensi. Persyaratan utamanya adalah mengambil data merchant dari tahun lalu. Untuk task online, membaca semua partisi dari T-1 hingga T-365 setiap kali membuang banyak resource. Anda dapat membuat tabel rolling untuk mengurangi jumlah partisi yang dibaca tanpa memengaruhi pengambilan data dari tahun lalu. Contoh berikut menunjukkan caranya.

    Pertama, inisialisasi data bisnis merchant selama 365 hari yang diagregasi menggunakan Group By. Tandai tanggal pembaruan data dan simpan data dalam tabel bernama a. Untuk task online berikutnya, Anda dapat menggabungkan tabel T-2 a dengan tabel table_xxx_di, lalu gunakan Group By. Dengan cara ini, jumlah partisi yang dibaca setiap hari berkurang dari 365 menjadi 2. Duplikasi kunci utama shopid sangat berkurang, yang juga mengurangi konsumsi resource.

    -- Buat tabel rolling.
    CREATE TABLE IF NOT EXISTS m_xxx_365_df
    (
      shop_id STRING COMMENT,
      last_update_ds COMMENT,
      365d_open_days COMMENT
    )
    PARTITIONED BY
    (
      ds STRING COMMENT 'Partisi tanggal'
    )LIFECYCLE 7;
    -- Asumsikan 365d dari 1 Mei 2021 hingga 1 Mei 2022. Inisialisasi tabel terlebih dahulu.
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501')
      SELECT shop_id,
             max(ds) as last_update_ds,
             sum(is_open) AS 365d_open_days
      FROM table_xxx_di
      WHERE dt BETWEEN '20210501' AND '20220501'
      GROUP BY shop_id;
    -- Kemudian, task online yang akan dijalankan adalah sebagai berikut.
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}')
      SELECT aa.shop_id, 
             aa.last_update_ds, 
             365d_open_days - COALESCE(is_open, 0) AS 365d_open_days -- Hilangkan rolling tak terbatas hari bisnis.
      FROM (
        SELECT shop_id, 
               max(last_update_ds) AS last_update_ds, 
               sum(365d_open_days) AS 365d_open_days
        FROM (
          SELECT shop_id,
                 ds AS last_update_ds,
                 sum(is_open) AS 365d_open_days
          FROM table_xxx_di
          WHERE ds = '${bizdate}'
          GROUP BY shop_id
          UNION ALL
          SELECT shop_id,
                 last_update_ds,
                 365d_open_days
          FROM m_xxx_365_df
          WHERE dt = '${bizdate_2}' AND last_update_ds >= '${bizdate_365}'
          GROUP BY shop_id
        )
        GROUP BY shop_id
      ) AS aa
      LEFT JOIN (
        SELECT shop_id,
               is_open
        FROM table_xxx_di
        WHERE ds = '${bizdate_366}'
      ) AS bb
      ON aa.shop_id = bb.shop_id;
                                

Count (Distinct)

Asumsikan sebuah tabel memiliki distribusi data berikut.

ds (partisi)

cnt (jumlah catatan)

20220416

73025514

20220415

2292806

20220417

2319160

Menggunakan pernyataan berikut dapat menyebabkan kesenjangan data:

SELECT  ds
        ,COUNT(DISTINCT shop_id) AS cnt
FROM    demo_data0
GROUP BY ds;

Solusi berikut tersedia:

No.

Solusi

Deskripsi

Solusi 1

Optimasi pengaturan parameter

SET odps.sql.groupby.skewindata=true;

Solusi 2

Agregasi dua tahap umum

Gabungkan bilangan acak ke nilai bidang partisi.

Solusi 3

Mirip dengan agregasi dua tahap

Pertama, kelompokkan berdasarkan dua bidang (ds+shop_id), lalu gunakan count(distinct).

  • Solusi 1: Optimasi dengan mengatur parameter.

    Atur parameter berikut.

    SET odps.sql.groupby.skewindata=true;
  • Solusi 2: Gunakan agregasi dua tahap umum.

    Jika data di bidang shop_id tidak seragam, Anda tidak dapat menggunakan Solusi 1 untuk optimasi. Metode yang lebih umum adalah menggabungkan bilangan acak ke nilai bidang partisi.

    -- Metode 1: Gabungkan bilangan acak. CONCAT(ROUND(RAND(),1)*10,'_', ds) AS rand_ds
    SELECT  SPLIT_PART(rand_ds, '_',2) ds
            ,COUNT(*) id_cnt
      FROM (
            SELECT  rand_ds
                    ,shop_id
            FROM    demo_data0
            GROUP BY rand_ds,shop_id
            )
    GROUP BY SPLIT_PART(rand_ds, '_',2);
    
    -- Metode 2: Tambahkan bidang bilangan acak. ROUND(RAND(),1)*10 AS randint10
    SELECT  ds
            ,COUNT(*) id_cnt
    FROM    (SELECT  ds
                     ,randint10
                     ,shop_id
               FROM  demo_data0
            GROUP BY ds,randint10,shop_id
            )
    GROUP BY ds;
  • Solusi 3: Gunakan metode yang mirip dengan agregasi dua tahap.

    Jika data di kedua bidang GroupBy dan Distinct seragam, Anda dapat mengoptimalkan kueri dengan terlebih dahulu mengelompokkan berdasarkan dua bidang (`ds` dan `shop_id`) lalu menggunakan perintah count(distinct).

    SELECT  ds
            ,COUNT(*) AS cnt
    FROM(SELECT  ds
                ,shop_id
                FROM    demo_data0
                GROUP BY ds ,shop_id
        )
    GROUP BY ds;

ROW_NUMBER (TopN)

Contoh berikut menunjukkan top 10.

SELECT  main_id
        ,type
FROM    (SELECT  main_id
                 ,type
                 ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
            FROM <data_demo2>
        ) A
WHERE   A.rn <= 10;

Jika terjadi kesenjangan data, Anda dapat mengatasinya dengan salah satu metode berikut:

No.

Solusi

Deskripsi

Solusi 1

Agregasi dua tahap menggunakan SQL

Tambahkan kolom acak atau gabungkan bilangan acak dan gunakan sebagai parameter dalam partisi.

Solusi 2

Agregasi dua tahap menggunakan fungsi agregat yang didefinisikan pengguna (UDAF)

Optimalkan menggunakan UDAF dengan antrian prioritas min-heap.

  • Solusi 1: Gunakan agregasi dua tahap dengan SQL.

    Untuk membuat data di setiap grup partisi seuniform mungkin selama tahap map, Anda dapat menambahkan kolom acak dan menggunakannya sebagai parameter dalam partisi.

    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                            FROM (SELECT  main_id
                                          ,type
                                          ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                                     FROM (SELECT  main_id
                                                   ,type
                                                   ,ceil(110 * rand()) % 11 AS src_pt
                                             FROM  data_demo2
                                          )
                                    ) B
                            WHERE   B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
    -- 2. Sesuaikan bilangan acak.
    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                        FROM(SELECT  main_id
                                     ,type
                                     ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                               FROM  (SELECT  main_id
                                              ,type
                                              ,ceil(10 * rand()) AS src_pt
                                              FROM    data_demo2
                                      )
                                    ) B
                            WHERE B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
  • Solusi 2: Gunakan agregasi dua tahap dengan UDAF.

    Metode SQL dapat menghasilkan kode yang panjang dan sulit dipelihara. Dalam kasus ini, Anda dapat menggunakan fungsi agregat yang didefinisikan pengguna (UDAF) dengan antrian prioritas min-heap untuk optimasi. Artinya, pada tahap iterate, hanya N item teratas yang diproses. Pada tahap merge, hanya N elemen yang digabungkan. Prosesnya sebagai berikut.

    • iterate: Dorong K elemen pertama. Untuk elemen berikutnya, terus-menerus bandingkan dengan elemen teratas minimum dan tukar elemen dalam heap.

    • merge: Menggabungkan dua heap dan mengembalikan K elemen teratas secara in-place.

    • terminate: Kembalikan heap sebagai array.

    • Dalam pernyataan SQL, pisahkan array menjadi baris.

    @annotate('* -> array<string>')
    class GetTopN(BaseUDAF):
        def new_buffer(self):
            return [[], None]
        def iterate(self, buffer, order_column_val, k):
            # heapq.heappush(buffer, order_column_val)
            # buffer = [heapq.nlargest(k, buffer), k]
            if not buffer[1]:
                buffer[1] = k
            if len(buffer[0]) < k:
                heapq.heappush(buffer[0], order_column_val)
            else:
                heapq.heappushpop(buffer[0], order_column_val)
        def merge(self, buffer, pbuffer):
            first_buffer, first_k = buffer
            second_buffer, second_k = pbuffer
            k = first_k or second_k
            merged_heap = first_buffer + second_buffer
            merged_heap.sort(reverse=True)
            merged_heap = merged_heap[0: k] if len(merged_heap) > k else merged_heap
            buffer[0] = merged_heap
            buffer[1] = k
        def terminate(self, buffer):
            return buffer[0]
    
    SET odps.sql.python.version=cp37;
    SELECT main_id,type_val FROM (
      SELECT  main_id ,get_topn(type, 10) AS type_array
      FROM data_demo2
      GROUP BY main_id
    )
    LATERAL VIEW EXPLODE(type_array)type_ar AS type_val;

Dynamic partition

Dynamic partitioning adalah fitur yang memungkinkan Anda menentukan kolom kunci partisi saat memasukkan data ke tabel partisi, tetapi tanpa memberikan nilai spesifik. Sebaliknya, kolom yang sesuai dalam klausa Select menyediakan nilai partisi. Oleh karena itu, Anda tidak tahu partisi mana yang akan dibuat sebelum pernyataan SQL dijalankan. Anda hanya dapat menentukan partisi yang dibuat setelah pernyataan SQL dijalankan, berdasarkan nilai kolom partisi. Untuk informasi selengkapnya, lihat Masukkan atau timpa data ke partisi dinamis (DYNAMIC PARTITION). Berikut adalah contoh SQL.

CREATE TABLE total_revenues (revenue bigint) partitioned BY (region string);

INSERT OVERWRITE TABLE total_revenues PARTITION(region)
SELECT total_price AS revenue, region
FROM sale_detail;

Dalam banyak skenario, membuat tabel dengan partisi dinamis dapat menyebabkan kesenjangan data. Jika terjadi kesenjangan data, Anda dapat menggunakan solusi berikut untuk mengatasinya.

No.

Solusi

Deskripsi

Solusi 1

Optimasi konfigurasi parameter

Optimalkan melalui konfigurasi parameter.

Solusi 2

Optimasi pemangkasan partisi

Temukan partisi dengan banyak catatan, pangkas, dan masukkan secara terpisah.

  • Solusi 1: Optimasi dengan mengonfigurasi parameter.

    Dynamic partitioning memungkinkan Anda memasukkan data yang memenuhi kondisi berbeda ke partisi berbeda. Fitur ini menghilangkan kebutuhan untuk beberapa operasi Insert Overwrite ke tabel, terutama saat ada banyak partisi, dan dapat sangat menyederhanakan kode Anda. Namun, dynamic partitioning juga dapat membuat terlalu banyak file kecil.

    • Contoh kesenjangan data

      Ambil SQL sederhana berikut sebagai contoh.

      INSERT INTO TABLE part_test PARTITION(ds) SELECT * FROM  part_test;

      Asumsikan ada K instans Map dan N partisi target.

                                  ds=1
      cfile1                      ds=2
      ...             X           ds=3
      cfilek                      ...
                                  ds=n

      Dalam kasus paling ekstrem, K*N file kecil dapat dihasilkan. Terlalu banyak file kecil dapat memberikan tekanan manajemen yang besar pada sistem file. Oleh karena itu, MaxCompute menangani partisi dinamis dengan memperkenalkan tahap reduce tambahan. Fitur ini menetapkan partisi target yang sama ke satu instans reduce atau beberapa instans reduce untuk menulis. Metode ini menghindari pembuatan terlalu banyak file kecil. Operasi reduce ini selalu merupakan operasi reduce task terakhir. Di MaxCompute, fitur ini diaktifkan secara default, artinya parameter berikut diatur ke `true`.

      SET odps.sql.reshuffle.dynamicpt=true;

      Mengaktifkan fitur ini secara default menyelesaikan masalah terlalu banyak file kecil dan mencegah task gagal karena satu instans menghasilkan terlalu banyak file. Namun, fitur ini juga memperkenalkan masalah baru, seperti kesenjangan data dan konsumsi resource komputasi oleh operasi reduce tambahan. Oleh karena itu, Anda harus menyeimbangkan faktor-faktor ini dengan hati-hati.

    • Solusi

      Tujuan mengaktifkan set odps.sql.reshuffle.dynamicpt=true; dan memperkenalkan tahap reduce tambahan adalah untuk menyelesaikan masalah terlalu banyak file kecil. Namun, jika jumlah partisi target kecil dan tidak ada risiko membuat terlalu banyak file kecil, mengaktifkan fitur ini secara default membuang resource komputasi dan mengurangi kinerja. Dalam situasi ini, Anda dapat menonaktifkan fitur ini dengan mengatur set odps.sql.reshuffle.dynamicpt=false; untuk meningkatkan kinerja secara signifikan. Berikut adalah contohnya.

      INSERT OVERWRITE TABLE ads_tb_cornucopia_pool_d PARTITION (ds, lv, tp)
      SELECT /*+ mapjoin(t2) */
          '20150503' AS ds,
          t1.lv AS lv,
          t1.type AS tp
      FROM
          (SELECT  ...
          FROM tbbi.ads_tb_cornucopia_user_d
          WHERE ds = '20150503'
          AND lv IN ('flat', '3rd')
          AND tp = 'T'
          AND pref_cat2_id > 0
          ) t1
      JOIN
          (SELECT ...
          FROM tbbi.ads_tb_cornucopia_auct_d
          WHERE ds = '20150503'
          AND tp = 'T'
          AND is_all = 'N'
          AND cat2_id > 0
          ) t2
      ON t1.pref_cat2_id = t2.cat2_id;

      Jika Anda menggunakan parameter default untuk kode di atas, seluruh task membutuhkan waktu sekitar 1 jam 30 menit untuk dijalankan. Task reduce terakhir membutuhkan waktu sekitar 1 jam 20 menit, yang kira-kira 90% dari total waktu proses. Memperkenalkan task reduce tambahan membuat distribusi data setiap instans reduce sangat tidak merata, yang menyebabkan long tail.

    Untuk contoh di atas, analisis jumlah historis partisi dinamis yang dibuat menunjukkan bahwa hanya sekitar dua partisi dinamis yang dibuat setiap hari. Oleh karena itu, Anda dapat dengan aman mengatur set odps.sql.reshuffle.dynamicpt=false;. Task kemudian dapat diselesaikan hanya dalam 9 menit. Dalam kasus ini, mengatur parameter ini ke false dapat meningkatkan kinerja secara signifikan, menghemat waktu dan resource komputasi, serta memberikan manfaat marjinal tinggi hanya dengan mengatur satu parameter.

    Optimasi ini tidak terbatas pada task besar yang memakan waktu lama dan mengonsumsi banyak resource. Untuk task kecil dan biasa yang berjalan cepat dan mengonsumsi sedikit resource, Anda dapat mengatur parameter odps.sql.reshuffle.dynamicpt ke false selama task tersebut menggunakan partisi dinamis dan jumlah partisi dinamis tidak besar. Pengaturan ini menghemat resource dan meningkatkan kinerja dalam semua kasus.

    Node yang memenuhi ketiga kondisi berikut dapat dioptimalkan, terlepas dari durasi task.

    • Menggunakan partisi dinamis

    • Jumlah partisi dinamis <= 50

    • Tidak memiliki `set odps.sql.reshuffle.dynamicpt=false;`

    Tingkat urgensi pengaturan parameter ini untuk suatu node ditentukan oleh waktu eksekusi instans Fuxi terakhir, yang diidentifikasi oleh bidang diag_level, berdasarkan aturan berikut:

    • Last_Fuxi_Inst_Time lebih dari 30 menit: Diag_Level=4 ('Critical').

    • Last_Fuxi_Inst_Time antara 20 dan 30 menit: Diag_Level=3 ('High').

    • Last_Fuxi_Inst_Time antara 10 dan 20 menit: Diag_Level=2 ('Medium').

    • Last_Fuxi_Inst_Time kurang dari 10 menit: Diag_Level=1 ('Low').

  • Solusi 2: Pemangkasan teroptimasi.

    Untuk mengatasi kesenjangan data yang terjadi pada tahap map saat memasukkan data ke partisi dinamis, Anda dapat menemukan partisi dengan banyak catatan, memangkasnya, lalu memasukkannya secara terpisah. Berdasarkan skenario aktual, Anda dapat memodifikasi pengaturan parameter untuk tahap map seperti pada contoh berikut:

    SET odps.sql.mapper.split.size=128;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT  *
    FROM    dwd_alsc_ent_shop_info_hi;

    Hasil menunjukkan bahwa pemindaian tabel penuh dilakukan. Untuk mengoptimalkan kinerja lebih lanjut, Anda dapat menonaktifkan job Reduce yang dihasilkan sistem sebagai berikut:

    SET odps.sql.reshuffle.dynamicpt=false ;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT *
    FROM dwd_alsc_ent_shop_info_hi;

    Untuk mengatasi kesenjangan data yang terjadi pada tahap map saat memasukkan data ke partisi dinamis, Anda dapat mengidentifikasi partisi dengan banyak catatan, mengisolasi, lalu memasukkannya secara terpisah. Langkah-langkahnya sebagai berikut.

    1. Gunakan perintah berikut untuk mencari partisi yang berisi banyak catatan.

      SELECT  ds
              ,hh
              ,COUNT(*) AS cnt
      FROM    dwd_alsc_ent_shop_info_hi
      GROUP BY ds
               ,hh
      ORDER BY cnt DESC;

      Beberapa partisi tersebut adalah sebagai berikut:

      ds

      hh

      cnt

      20200928

      17

      1052800

      20191017

      17

      1041234

      20210928

      17

      1034332

      20190328

      17

      1000321

      20210504

      1

      19

      20191003

      20

      18

      20200522

      1

      18

      20220504

      1

      18

    2. Gunakan perintah berikut untuk memfilter dan memasukkan data ke partisi yang diidentifikasi di atas, dan lakukan operasi insert terpisah untuk partisi yang berisi banyak catatan.

      SET odps.sql.reshuffle.dynamicpt=false ;
      INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) NOT IN ('2020092817','2019101717','2021092817','2019032817');
      
      SET odps.sql.reshuffle.dynamicpt=false ;
      INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) IN ('2020092817','2019101717','2021092817','2019032817');
      
      SELECT  ds
        ,hh,COUNT(*) AS cnt
       FROM dwd_alsc_ent_shop_info_hi
       GROUP BY ds,hh ORDER BY cnt DESC;