All Products
Search
Document Center

DataWorks:Proses data

Last Updated:Jun 16, 2026

Topik ini menjelaskan cara menggunakan node EMR Hive di DataWorks untuk memproses data dari tabel informasi pengguna ods_user_info_d_emr dan tabel log akses ods_raw_log_d_emr. Data yang disinkronkan dari OSS diproses untuk menghasilkan data profil pengguna.

Prasyarat

Sebelum memulai, selesaikan langkah-langkah dalam Sinkronisasi data.

Langkah 1: Rancang alur kerja

Untuk mengonfigurasi dependensi antar node alur kerja, lihat Sinkronisasi data.

Klik ganda alur kerja baru untuk membuka tab konfigurasinya. Klik New Node, pilih EMR Hive, lalu seret ke kanvas di sebelah kanan. Di kotak dialog New Node, masukkan Node Name dan klik Confirm.

Buat tiga node EMR Hive dengan nama dwd_log_info_di_emr, dws_user_info_all_di_emr, dan ads_user_info_1d_emr, lalu konfigurasikan dependensi berikut.

  • dwd_log_info_di_emr: Membersihkan data log mentah dari OSS.

  • dws_user_info_all_di_emr: Menggabungkan data log yang telah dibersihkan dan informasi dasar pengguna.

  • ads_user_info_1d_emr: Menghasilkan data profil pengguna akhir.

Langkah 2: Buat fungsi

Anda harus mengurai data log mentah ke dalam format target. Topik ini menyediakan paket fungsi untuk menerjemahkan alamat IP ke wilayah. Unduh paket tersebut, daftarkan sebagai fungsi di DataWorks, lalu panggil dalam kode Anda.

Unggah resource

  1. Unduh file ip2region-emr.jar.

  2. Di DataStudio, buka alur kerja WorkShop, klik kanan EMR, lalu pilih New Resource > EMR JAR. Konfigurasikan parameter untuk resource baru tersebut dan klik New.

    Tabel berikut menjelaskan parameter utama.

    • Storage Path: Pilih Bucket OSS yang Anda tentukan untuk kluster EMR saat menyiapkan lingkungan.

    • Upload File: Pilih file ip2region-emr.jar yang telah diunduh.

    Biarkan parameter lain pada nilai default atau konfigurasikan sesuai kebutuhan.

  3. Klik ikon image.png di bilah alat untuk melakukan commit resource ke proyek engine EMR di lingkungan pengembangan.

Daftarkan fungsi

  1. Di DataStudio, buka alur kerja, klik kanan EMR, lalu pilih New Function.

  2. Di kotak dialog New Function, atur Function Name menjadi getregion, klik New, lalu konfigurasikan fungsi tersebut.

    Tabel berikut menjelaskan parameter utama.

    • Resource: Pilih ip2region-emr.jar.

    • Class Name: Masukkan org.alidata.emr.udf.Ip2Region.

    Biarkan parameter lain pada nilai default atau konfigurasikan sesuai kebutuhan.

  3. Klik ikon image.png di bilah alat untuk melakukan commit fungsi ke proyek engine EMR di lingkungan pengembangan.

Langkah 3: Konfigurasikan node EMR Hive

Konfigurasikan node dwd_log_info_di_emr

1. Edit kode

Klik ganda node dwd_log_info_di_emr untuk membuka tab konfigurasinya. Di editor kode, masukkan pernyataan SQL berikut.

Catatan

Jika beberapa engine EMR terikat ke DataStudio di ruang kerja Anda, Anda harus memilih EMR Engine. Jika hanya satu engine EMR yang terikat, Anda dapat melewati langkah ini.

-- Create an ODS-layer table.
CREATE TABLE IF NOT EXISTS dwd_log_info_di_emr (
  ip STRING COMMENT 'The IP address.',
  uid STRING COMMENT 'The user ID.',
  `time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format.',
  status STRING COMMENT 'The status code returned by the server.',
  bytes STRING COMMENT 'The number of bytes returned to the client.',
  region STRING COMMENT 'The region, which is obtained based on the IP address.',
  method STRING COMMENT 'The HTTP request type.',
  url STRING COMMENT 'The URL.',
  protocol STRING COMMENT 'The HTTP protocol version.',
  referer STRING COMMENT 'The source URL.',
  device STRING COMMENT 'The client type.',
  identity STRING COMMENT 'The access type. Valid values: crawler, feed, user, and unknown.'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

set hive.vectorized.execution.enabled = false;
INSERT OVERWRITE TABLE dwd_log_info_di_emr PARTITION (dt='${bizdate}')
SELECT ip
  , uid
  , tm
  , status
  , bytes 
  , getregion(ip) AS region -- Use the UDF to obtain the region from the IP address. 
  , regexp_extract(request, '(^[^ ]+) .*') AS method -- Use a regular expression to split the request into three fields.
  , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
  , regexp_extract(request, '.* ([^ ]+$)') AS protocol 
  , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer  -- Use a regular expression to clean the referer to obtain a more accurate URL.
  , CASE
    WHEN lower(agent) RLIKE 'android' THEN 'android' -- Obtain client and access type information from the agent.
    WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
    WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
    WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
    WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
    WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
    ELSE 'unknown'
  END AS device
  , CASE
    WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
    WHEN lower(agent) RLIKE 'feed'
    OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
    WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
    AND agent RLIKE '^[Mozilla|Opera]'
    AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
    ELSE 'unknown'
  END AS identity
  FROM (
    SELECT SPLIT(col, '##@@')[0] AS ip
    , SPLIT(col, '##@@')[1] AS uid
    , SPLIT(col, '##@@')[2] AS tm
    , SPLIT(col, '##@@')[3] AS request
    , SPLIT(col, '##@@')[4] AS status
    , SPLIT(col, '##@@')[5] AS bytes
    , SPLIT(col, '##@@')[6] AS referer
    , SPLIT(col, '##@@')[7] AS agent
    FROM ods_raw_log_d_emr
  WHERE dt = '${bizdate}'
) a;

2. Konfigurasikan properti penjadwalan

Konfigurasi ini menerapkan skenario penjadwalan harian. Pada pukul 00.30, setelah node hulu ods_raw_log_d_emr selesai menyinkronkan data dari file user_log.txt di OSS ke tabel ods_raw_log_d_emr di EMR, node tersebut memicu node dwd_log_info_di_emr. Node ini memproses data dari tabel ods_raw_log_d_emr dan menulis hasilnya ke partisi yang sesuai di tabel dwd_log_info_di_emr.

Parameter

Pengaturan

Scheduling Parameters

Di bagian Scheduling Parameters, tambahkan parameter berikut:

  • Nama: bizdate

  • Nilai: $[yyyymmdd-1]

Time attribute

Atur RUN Attribute ke Allowed Regardless of Status.

Scheduling Dependency

Di bagian Scheduling Dependency, verifikasi bahwa tabel output node dikonfigurasi dengan benar.

Formatnya adalah WorkspaceName.NodeName.

Catatan

Untuk properti waktu, jika Scheduling period diatur ke harian, Anda tidak perlu mengonfigurasi Scheduling Time secara terpisah untuk node ini. Waktu mulai harian node ini dikendalikan oleh waktu terjadwal node virtual workshop_start_emr dalam alur kerja, yang berarti node dijadwalkan berjalan hanya setelah pukul 00.30 setiap hari.

3. Simpan konfigurasi

Konfigurasikan parameter lain yang diperlukan sesuai kebutuhan. Setelah menyelesaikan konfigurasi, klik ikon image.png di bilah alat untuk menyimpan konfigurasi.

Konfigurasikan node dws_user_info_all_di_emr

1. Edit kode

Klik ganda node dws_user_info_all_di_emr untuk membuka tab konfigurasinya. Di editor kode, masukkan pernyataan SQL berikut.

Catatan

Jika beberapa engine EMR terikat ke DataStudio di ruang kerja Anda, Anda harus memilih EMR Engine. Jika hanya satu engine EMR yang terikat, Anda dapat melewati langkah ini.

-- Create a DW-layer table.
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_emr (
  uid STRING COMMENT 'The user ID.',
  gender STRING COMMENT 'The gender.',
  age_range STRING COMMENT 'The age range.',
  zodiac STRING COMMENT 'The zodiac sign.',
  region STRING COMMENT 'The region, which is obtained based on the IP address.',
  device STRING COMMENT 'The client type.',
  identity STRING COMMENT 'The access type. Valid values: crawler, feed, user, and unknown.',
  method STRING COMMENT 'The HTTP request type.',
  url STRING COMMENT 'The URL.',
  referer STRING COMMENT 'The source URL.',
  `time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format.'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE dws_user_info_all_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

INSERT OVERWRITE TABLE dws_user_info_all_di_emr PARTITION (dt='${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
  , b.gender
  , b.age_range
  , b.zodiac
  , a.region
  , a.device
  , a.identity
  , a.method
  , a.url
  , a.referer
  , a.`time`
FROM (
  SELECT *
  FROM dwd_log_info_di_emr
  WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
  SELECT *
  FROM ods_user_info_d_emr
  WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;

2. Konfigurasikan properti penjadwalan

Konfigurasi ini menerapkan skenario penjadwalan harian. Pada pukul 00.30, setelah tugas hulu ods_user_info_d_emr dan dwd_log_info_di_emr selesai, node dws_user_info_all_di_emr dipicu. Node ini menggabungkan data dari tabel ods_user_info_d_emr dan dwd_log_info_di_emr lalu menulis hasilnya ke tabel dws_user_info_all_di_emr.

Parameter

Pengaturan

Scheduling Parameters

Di bagian Scheduling Parameters, tambahkan parameter berikut:

  • Nama: bizdate

  • Nilai: $[yyyymmdd-1]

Time attribute

Atur RUN Attribute ke Allowed Regardless of Status.

Scheduling Dependency

Di bagian Scheduling Dependency, verifikasi bahwa tabel output node dikonfigurasi dengan benar.

Formatnya adalah WorkspaceName.NodeName.

Catatan

Untuk properti waktu, jika Scheduling period diatur ke harian, Anda tidak perlu mengonfigurasi Scheduling Time secara terpisah untuk node ini. Waktu mulai harian node ini dikendalikan oleh waktu terjadwal node virtual workshop_start_emr dalam alur kerja, yang berarti node dijadwalkan berjalan hanya setelah pukul 00.30 setiap hari.

3. Simpan konfigurasi

Konfigurasikan parameter lain yang diperlukan sesuai kebutuhan. Setelah menyelesaikan konfigurasi, klik ikon image.png di bilah alat untuk menyimpan konfigurasi.

Konfigurasikan node ads_user_info_1d_emr

1. Edit kode

Klik ganda node ads_user_info_1d_emr untuk membuka tab konfigurasinya. Di editor kode, masukkan pernyataan SQL berikut.

Catatan

Jika beberapa engine EMR terikat ke DataStudio di ruang kerja Anda, Anda harus memilih EMR Engine. Jika hanya satu engine EMR yang terikat, Anda dapat melewati langkah ini.

-- Create an RPT-layer table.
CREATE TABLE IF NOT EXISTS ads_user_info_1d_emr (
  uid STRING COMMENT 'The user ID.',
  region STRING COMMENT 'The region, which is obtained based on the IP address.',
  device STRING COMMENT 'The client type.',
  pv BIGINT COMMENT 'The number of page views.',
  gender STRING COMMENT 'The gender.',
  age_range STRING COMMENT 'The age range.',
  zodiac STRING COMMENT 'The zodiac sign.'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE ads_user_info_1d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

INSERT OVERWRITE TABLE ads_user_info_1d_emr PARTITION (dt='${bizdate}')
SELECT uid
  , MAX(region)
  , MAX(device)
  , COUNT(0) AS pv
  , MAX(gender)
  , MAX(age_range)
  , MAX(zodiac)
FROM dws_user_info_all_di_emr
WHERE dt = '${bizdate}'
GROUP BY uid;

2. Konfigurasikan properti penjadwalan

Setelah node hulu dws_user_info_all_di_emr selesai menggabungkan tabel ods_user_info_d_emr dan dwd_log_info_di_emr, node tersebut memicu node ads_user_info_1d_emr untuk memproses data lebih lanjut dan menghasilkan dataset akhir.

Parameter

Pengaturan

Scheduling Parameters

Di bagian Scheduling Parameters, tambahkan parameter berikut:

  • Nama: bizdate

  • Nilai: $[yyyymmdd-1]

Time attribute

Atur RUN Attribute ke Allowed Regardless of Status.

Scheduling Dependency

Di bagian Scheduling Dependency, verifikasi bahwa tabel output node dikonfigurasi dengan benar.

Formatnya adalah WorkspaceName.NodeName.

Catatan

Untuk properti waktu, jika Scheduling period diatur ke harian, Anda tidak perlu mengonfigurasi Scheduling Time secara terpisah untuk node ini. Waktu mulai harian node ini dikendalikan oleh waktu terjadwal node virtual workshop_start_emr dalam alur kerja, yang berarti node dijadwalkan berjalan hanya setelah pukul 00.30 setiap hari.

3. Simpan konfigurasi

Konfigurasikan parameter lain yang diperlukan sesuai kebutuhan. Setelah menyelesaikan konfigurasi, klik ikon image.png di bilah alat untuk menyimpan konfigurasi.

Langkah 4: Commit alur kerja

Setelah mengonfigurasi semua node dalam alur kerja, uji alur kerja untuk memastikan berjalan sesuai harapan. Setelah pengujian berhasil, commit alur kerja untuk penerapan.

  1. Di tab konfigurasi alur kerja, klik ikon 运行 untuk menjalankan alur kerja.

  2. Setelah ikon 成功 muncul di samping semua node dalam alur kerja, klik ikon 提交 untuk melakukan commit alur kerja.

  3. Di kotak dialog Submit, pilih node yang akan di-commit, pilih Ignore I/O Inconsistency Alerts, lalu klik Confirm.

  4. Setelah melakukan commit alur kerja, terapkan node-node tersebut.

    1. Di sisi kanan halaman, klik Publish untuk membuka halaman paket penyebaran.

    2. Pilih node yang akan diterapkan dan klik Deploy. Di kotak dialog Confirm Release, klik Publish.

Langkah 5: Jalankan tugas di lingkungan produksi

Setelah tugas diterapkan, sistem menghasilkan instans yang berjalan pada hari berikutnya. Anda dapat melakukan Supplementary data untuk alur kerja yang telah diterapkan guna memeriksa apakah tugas berjalan sesuai harapan di lingkungan produksi. Untuk informasi selengkapnya, lihat Kelola instans pengisian ulang data.

  1. Setelah tugas diterapkan, klik Operation and Maintenance Center di pojok kanan atas.

    Atau, buka tab konfigurasi alur kerja dan klik Go to operations di bilah alat untuk membuka halaman Operation and Maintenance Center.

  2. Di panel navigasi kiri, pilih Auto Triggered Task O&M > Auto Triggered Node untuk membuka halaman Auto Triggered Node. Klik node virtual workshop_start_emr.

  3. Di DAG di sebelah kanan, klik kanan node workshop_start_emr lalu pilih Supplementary data > Current and Descendant Nodes Retroactively.

  4. Pilih tugas yang akan diisi ulang, masukkan tanggal bisnis, lalu klik Submit and Redirect.

  5. Di halaman pengisian ulang data, klik Refresh hingga semua tugas SQL berhasil dijalankan.

Langkah berikutnya

Dalam skenario tugas terjadwal, Anda dapat mengonfigurasi pemantauan kualitas data untuk tabel output guna memverifikasi kualitas data keluaran. Untuk informasi selengkapnya, lihat Konfigurasikan pemantauan kualitas data.