全部产品
Search
文档中心

MaxCompute:Impor data Kafka ke MaxCompute dalam mode offline atau real-time

更新时间:Jul 06, 2025

Integrasi antara MaxCompute dan Kafka menyediakan pemrosesan serta analitik data yang efisien dan andal. Integrasi ini cocok untuk skenario yang memerlukan pemrosesan real-time, aliran data berskala besar, dan analitik data kompleks. Topik ini menjelaskan cara mengimpor data dari ApsaraMQ for Kafka dan Kafka yang dikelola sendiri ke MaxCompute, termasuk contoh penggunaan.

Impor data dari ApsaraMQ for Kafka ke MaxCompute

MaxCompute terintegrasi dengan ApsaraMQ for Kafka, memungkinkan Anda menggunakan konektor sink MaxCompute yang disediakan oleh ApsaraMQ for Kafka untuk mengimpor data topik tertentu ke tabel MaxCompute secara langsung tanpa memerlukan alat pihak ketiga atau pengembangan kustom. Untuk informasi lebih lanjut tentang cara membuat konektor sink MaxCompute, lihat Buat Konektor Sink MaxCompute.

Impor data dari Kafka Apache yang dikelola sendiri ke MaxCompute

Prasyarat

  • Layanan Kafka versi 2.2 atau lebih baru telah diterapkan, dan topik Kafka telah dibuat. Kami merekomendasikan penggunaan layanan Kafka versi 3.4.0.

  • Proyek MaxCompute dan tabel MaxCompute telah dibuat. Untuk informasi lebih lanjut, lihat Buat Proyek MaxCompute dan Buat Tabel.

Peringatan

Layanan Kafka-connector memungkinkan penulisan data Kafka tipe TEXT, CSV, JSON, atau FLATTEN ke MaxCompute. Perhatikan hal berikut saat menulis berbagai jenis data Kafka. Untuk informasi lebih lanjut tentang tipe data, lihat deskripsi parameter format.

  • Tabel berikut menjelaskan persyaratan untuk tabel MaxCompute tempat data Kafka tipe TEXT atau JSON ditulis.

    Nama bidang

    Tipe data

    Diperlukan

    topic

    STRING

    Ya.

    partition

    BIGINT

    Ya.

    offset

    BIGINT

    Ya.

    key

    • Jika Anda menulis data Kafka tipe TEXT, bidang tersebut harus bertipe STRING.

    • Jika Anda menulis data Kafka tipe JSON, bidang tersebut dapat bertipe STRING atau JSON berdasarkan pengaturan tipe data dari data yang ditulis.

    Bidang ini diperlukan jika Anda perlu menyinkronkan kunci dalam pesan Kafka ke tabel MaxCompute. Untuk informasi lebih lanjut tentang mode di mana pesan Kafka disinkronkan ke MaxCompute, lihat deskripsi parameter mode.

    value

    • Jika Anda menulis data Kafka tipe TEXT, bidang tersebut harus bertipe STRING.

    • Jika Anda menulis data Kafka tipe JSON, bidang tersebut dapat bertipe STRING atau JSON berdasarkan pengaturan tipe data dari data yang ditulis.

    Bidang ini diperlukan jika Anda perlu menyinkronkan nilai dalam pesan Kafka ke tabel MaxCompute. Untuk informasi lebih lanjut tentang mode di mana pesan Kafka disinkronkan ke MaxCompute, lihat deskripsi parameter mode.

    pt

    STRING (bidang partisi)

    Ya.

  • Jika Anda menulis data Kafka tipe FLATTEN atau CSV ke MaxCompute, bidang yang tercantum dalam tabel berikut harus disertakan dan memiliki tipe data yang sesuai. Anda juga dapat mengonfigurasi bidang kustom berdasarkan data yang ditulis.

    Nama bidang

    Tipe data

    topic

    STRING

    partition

    BIGINT

    offset

    BIGINT

    pt

    STRING (bidang partisi)

    • Untuk data Kafka tipe CSV, urutan bidang kustom dan tipe bidang dalam tabel MaxCompute harus sesuai dengan data Kafka agar data dapat ditulis dengan benar.

    • Untuk data Kafka tipe FLATTEN, nama bidang kustom dalam tabel MaxCompute harus sesuai dengan nama bidang dalam data Kafka agar data dapat ditulis dengan benar.

      Sebagai contoh, jika data Kafka tipe FLATTEN yang ingin Anda tulis adalah {"A":a,"B":"b","C":{"D":"d","E":"e"}}, Anda dapat menjalankan pernyataan berikut untuk membuat tabel MaxCompute guna menyimpan data tersebut.

      CREATE TABLE IF NOT EXISTS table_flatten(
       topic STRING,
       `partition` BIGINT,
       `offset` BIGINT,
       A BIGINT,
       B STRING,
       C JSON
      ) PARTITIONED BY (pt STRING);

Konfigurasikan dan mulai layanan Kafka-connector

  1. Di lingkungan Linux, jalankan perintah berikut di CLI atau klik tautan unduhan untuk mengunduh paket kafka-connector-2.0.jar:

    wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar

    Untuk mencegah konflik dependensi, kami sarankan Anda membuat subfolder seperti connector di $KAFKA_HOME/libs untuk menyimpan paket kafka-connector-2.0.jar.

    Catatan

    Jika lingkungan penyebaran paket kafka-connector-2.0.jar tidak sama dengan lingkungan penyebaran data Kafka, Anda harus mengonfigurasi dan memulai layanan Kafka-connector sesuai petunjuk di aliware-kafka-demos.

  2. Di direktori $KAFKA_HOME/config, konfigurasikan file connect-distributed.properties.

    Tambahkan konfigurasi berikut ke file connect-distributed.properties.

    ## Tambahkan konten berikut:
    plugin.path=<KAFKA_HOME>/libs/connector
    
    ## Perbarui nilai parameter key.converter dan value.converter.
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter  
  3. Jalankan perintah berikut di direktori $KAFKA_HOME/ untuk memulai layanan Kafka-connector:

    ## Jalankan perintah berikut:
    bin/connect-distributed.sh config/connect-distributed.properties &

Konfigurasikan dan mulai tugas Kafka-connector

  1. Buat dan konfigurasikan file odps-sink-connector.json dan unggah file odps-sink-connector.json ke lokasi mana pun.

    Kode dan tabel berikut menjelaskan isi serta parameter file odps-sink-connector.json.

    {
      "name": "Nama tugas konektor Kafka",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "your_topic",
        "endpoint": "endpoint",
        "tunnel_endpoint": "your_tunnel endpoint",
        "project": "project",
        "schema":"default",
        "table": "your_table",
        "account_type": "tipe akun (STS atau ALIYUN)",
        "access_id": "id akses",
        "access_key": "kunci akses",
        "account_id": "id akun untuk sts",
        "sts.endpoint": "sts endpoint",
        "region_id": "id wilayah untuk sts",
        "role_name": "nama peran untuk sts",
        "client_timeout_ms": "Periode valid Token STS (ms)",
        "format": "TEXT",
        "mode": "KEY",
        "partition_window_type": "MINUTE",
        "use_streaming": false,
        "buffer_size_kb": 65536,
        "sink_pool_size":"150",
        "record_batch_size":"8000",
        "runtime.error.topic.name":"topik kafka saat terjadi kesalahan runtime",
        "runtime.error.topic.bootstrap.servers":"server bootstrap topik antrian kesalahan kafka",
        "skip_error":"false"
      }
    }
    • Parameter Umum

      Parameter

      Diperlukan

      Deskripsi

      name

      Ya

      Nama tugas. Nama tersebut harus unik.

      connector.class

      Ya

      Nama kelas layanan Kafka-connector. Nilai default: com.aliyun.odps.kafka.connect.MaxComputeSinkConnector.

      tasks.max

      Ya

      Jumlah maksimum proses konsumen dalam layanan Kafka-connector. Nilai tersebut harus berupa bilangan bulat lebih besar dari 0.

      topics

      Ya

      Nama topik Kafka.

      endpoint

      Ya

      Endpoint MaxCompute.

      Anda harus mengonfigurasi parameter ini berdasarkan wilayah dan tipe koneksi jaringan yang dipilih saat Anda membuat proyek MaxCompute. Untuk informasi lebih lanjut tentang endpoint tipe jaringan yang berbeda di setiap wilayah, lihat Endpoints.

      tunnel_endpoint

      Tidak

      Endpoint publik MaxCompute Tunnel.

      Jika Anda tidak mengonfigurasi parameter ini, lalu lintas secara otomatis diarahkan ke endpoint Tunnel yang sesuai dengan jaringan tempat MaxCompute berada. Jika Anda mengonfigurasi parameter ini, lalu lintas diarahkan ke endpoint yang ditentukan dan pengarahan otomatis tidak dilakukan.

      Untuk informasi lebih lanjut tentang endpoint Tunnel tipe jaringan yang berbeda di setiap wilayah, lihat Endpoints.

      project

      Ya

      Nama proyek MaxCompute yang ingin Anda akses.

      schema

      Tidak

      • Parameter ini diperlukan jika proyek MaxCompute tujuan memiliki model skema tiga lapis. Nilai default: default.

      • Jika proyek MaxCompute tujuan tidak memiliki model skema tiga lapis, Anda tidak perlu mengonfigurasi parameter ini.

      Untuk informasi lebih lanjut tentang skema, lihat Operasi terkait skema.

      table

      Ya

      Nama tabel di proyek MaxCompute tujuan.

      format

      Tidak

      Format pesan yang ditulis. Nilai yang valid:

      • TEXT: string. Ini adalah nilai default.

      • BINARY: larik byte.

      • CSV: daftar string yang dipisahkan oleh koma (,).

      • JSON: string JSON. Untuk informasi lebih lanjut tentang tipe data JSON MaxCompute, lihat Petunjuk penggunaan tipe JSON MaxCompute (versi beta).

      • FLATTEN: string JSON. Kunci dan nilai dalam string JSON diurai dan ditulis ke tabel MaxCompute yang ditentukan. Kunci dalam string JSON harus sesuai dengan nama kolom dalam tabel MaxCompute.

      Untuk informasi lebih lanjut tentang cara mengimpor pesan dalam format yang berbeda, lihat Contoh.

      mode

      Tidak

      Mode di mana pesan disinkronkan ke MaxCompute. Nilai yang valid:

      • KEY: Hanya kunci pesan yang disimpan dan ditulis ke tabel MaxCompute tujuan.

      • VALUE: Hanya nilai pesan yang disimpan dan ditulis ke tabel MaxCompute tujuan.

      • DEFAULT: Baik kunci maupun nilai pesan disimpan dan ditulis ke tabel MaxCompute tujuan. Ini adalah nilai default.

        Jika Anda mengatur parameter ini ke DEFAULT, hanya data tipe TEXT atau BINARY yang dapat ditulis.

      partition_window_type

      Tidak

      Data dipartisi berdasarkan waktu sistem. Nilai yang valid: DAY, HOUR, dan MINUTE. Nilai default: HOUR.

      use_streaming

      Tidak

      Menentukan apakah akan menggunakan Streaming Tunnel. Nilai yang valid:

      • false: Streaming Tunnel tidak digunakan. Ini adalah nilai default.

      • true: Streaming Tunnel digunakan.

      buffer_size_kb

      Tidak

      Ukuran buffer internal penulis partisi odps. Unit: KB. Ukuran default adalah 65.536 KB.

      sink_pool_size

      Tidak

      Jumlah maksimum thread untuk penulisan multi-thread. Nilai defaultnya adalah jumlah core CPU di sistem.

      record_batch_size

      Tidak

      Jumlah maksimum pesan yang dapat dikirim secara bersamaan oleh sebuah thread dalam tugas Kafka-connector.

      skip_error

      Tidak

      Menentukan apakah akan melewati rekaman yang dihasilkan saat terjadi kesalahan yang tidak diketahui. Nilai yang valid:

      • false: Rekaman tidak dilewati. Ini adalah nilai default.

      • true: Rekaman dilewati.

        Catatan
        • Jika skip_error diatur ke false dan parameter runtime.error.topic.name tidak dikonfigurasi, operasi penulisan data selanjutnya dihentikan, proses diblokir, dan pengecualian dicatat saat terjadi kesalahan yang tidak diketahui.

        • Jika skip_error diatur ke true dan parameter runtime.error.topic.name tidak dikonfigurasi, proses penulisan data terus menulis data, dan data abnormal dibuang.

        • Jika parameter skip_error diatur ke false dan parameter runtime.error.topic.name dikonfigurasi, proses penulisan data terus menulis data, dan data abnormal dicatat di topik yang ditentukan oleh runtime.error.topic.name topic.

        Untuk lebih banyak contoh tentang pemrosesan data abnormal, lihat Pemrosesan data abnormal.

      runtime.error.topic.name

      Tidak

      Nama topik Kafka tempat data ditulis saat terjadi kesalahan yang tidak diketahui.

      runtime.error.topic.bootstrap.servers

      Tidak

      Alamat dalam konfigurasi bootstrap-servers. Alamat-alamat tersebut adalah alamat broker Kafka tempat data ditulis saat terjadi kesalahan yang tidak diketahui.

      account_type

      Ya

      Metode yang digunakan untuk mengakses layanan MaxCompute tujuan. Nilai yang valid: STS dan ALIYUN. Nilai default: ALIYUN.

      Anda harus mengonfigurasi parameter kredensial akses yang berbeda untuk metode yang berbeda untuk mengakses MaxCompute. Untuk informasi lebih lanjut, lihat Akses MaxCompute menggunakan metode ALIYUN dan Akses MaxCompute menggunakan metode STS dalam topik ini.

    • Akses MaxCompute menggunakan metode ALIYUN: Konfigurasikan parameter berikut selain parameter umum.

      Parameter

      Deskripsi

      access_id

      ID AccessKey akun Alibaba Cloud Anda atau pengguna RAM dalam akun Alibaba Cloud.

      Anda dapat memperoleh ID AccessKey dari halaman Pasangan AccessKey.

      access_key

      Rahasia AccessKey yang sesuai dengan ID AccessKey.

    • Akses MaxCompute menggunakan metode STS: Konfigurasikan parameter berikut selain parameter umum.

      Parameter

      Deskripsi

      account_id

      ID akun yang digunakan untuk mengakses proyek MaxCompute tujuan. Anda dapat melihat ID akun Anda di Pusat Akun.

      region_id

      ID wilayah tempat proyek MaxCompute tujuan berada. Untuk informasi lebih lanjut tentang ID setiap wilayah, lihat Endpoints.

      role_name

      Nama peran yang digunakan untuk mengakses proyek MaxCompute tujuan. Anda dapat melihat nama peran di halaman Peran.

      client_timeout_ms

      Interval pembaruan token STS. Unit: milidetik. Nilai default: 11.

      sts.endpoint

      Endpoint layanan STS yang diperlukan saat Anda menggunakan token STS untuk otentikasi identitas.

      Untuk informasi lebih lanjut tentang endpoint tipe jaringan yang berbeda di setiap wilayah, lihat Endpoints.

  2. Jalankan perintah berikut untuk memulai tugas Kafka-connector:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json

Contoh

Menulis data tipe TEXT

  1. Persiapkan Data.

    • Buat tabel MaxCompute menggunakan Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute.

      CREATE TABLE IF NOT EXISTS table_text(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value STRING
      ) PARTITIONED BY (pt STRING);
    • Buat data Kafka.

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka bernama topic_text.

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text

      Jalankan perintah berikut untuk membuat pesan Kafka:

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true
      >123    abc
      >456    edf
  2. (Opsional) Mulai layanan Kafka-connector. Untuk informasi lebih lanjut, lihat Konfigurasikan dan Mulai Layanan Kafka-connector.

    Catatan

    Jika layanan Kafka-connector telah dimulai, lewati langkah ini.

  3. Buat dan konfigurasikan file odps-sink-connector.json, lalu unggah file tersebut ke lokasi mana pun. Dalam contoh ini, file odps-sink-connector.json diunggah ke direktori $KAFKA_HOME/config.

    Kode berikut menampilkan isi file odps-sink-connector.json. Untuk detail lebih lanjut tentang file odps-sink-connector.json, lihat Konfigurasikan dan mulai tugas Kafka-connector.

    {
        "name": "odps-test-text",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_text",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",
          "schema":"default",
          "table": "table_text",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyID>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"TEXT",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
  4. Jalankan perintah berikut untuk memulai tugas Kafka-connector:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verifikasi Hasilnya.

    Jalankan perintah berikut pada Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute untuk memeriksa hasil penulisan data:

    set odps.sql.allow.fullscan=true;
    select * from table_text;

    Hasil berikut dikembalikan:

    # Nilai mode dalam file konfigurasi odps-sink-connector.json adalah VALUE. Oleh karena itu, hanya nilai yang disimpan dan bidang kunci adalah NULL.
    
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | topic_text | 0      | 0          | NULL | abc   | 07-13-2023 21:13 |
    | topic_text | 0      | 1          | NULL | edf   | 07-13-2023 21:13 |
    +-------+------------+------------+-----+-------+----+

Menulis data tipe CSV

  1. Persiapkan Data.

    • Buat tabel MaxCompute tujuan menggunakan Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute.

      CREATE TABLE IF NOT EXISTS table_csv(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        region STRING
      ) PARTITIONED BY (pt STRING);
    • Buat data Kafka.

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka bernama topic_csv.

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv

      Jalankan perintah berikut untuk membuat pesan Kafka:

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true
      >123	1103,zhangsan,china
      >456	1104,lisi,usa
  2. (Opsional) Mulai layanan Kafka-connector. Untuk informasi lebih lanjut, lihat Konfigurasikan dan Mulai Layanan Kafka-connector.

    Catatan

    Jika layanan Kafka-connector telah dimulai, lewati langkah ini.

  3. Buat dan konfigurasikan file odps-sink-connector.json, lalu unggah file tersebut ke lokasi mana pun. Dalam contoh ini, file odps-sink-connector.json diunggah ke direktori $KAFKA_HOME/config.

    Kode berikut menampilkan isi file odps-sink-connector.json. Untuk detail lebih lanjut tentang file odps-sink-connector.json, lihat Konfigurasikan dan mulai tugas Kafka-connector.

    {
        "name": "odps-test-csv",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_csv",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_csv",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "format":"CSV",
          "mode":"VALUE",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. Jalankan perintah berikut untuk memulai tugas Kafka-connector:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verifikasi Hasilnya.

    Jalankan perintah berikut pada Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute untuk memeriksa hasil penulisan data:

    set odps.sql.allow.fullscan=true;
    select * from table_csv;

    Hasil berikut dikembalikan:

    +-------+------------+------------+------------+------+--------+----+
    | topic | partition  | offset     | id         | name | region | pt |
    +-------+------------+------------+------------+------+--------+----+
    | csv_test | 0       | 0          | 1103       | zhangsan | china  | 07-14-2023 00:10 |
    | csv_test | 0       | 1          | 1104       | lisi | usa    | 07-14-2023 00:10 |
    +-------+------------+------------+------------+------+--------+----+

Menulis data tipe JSON

  1. Persiapkan Data.

    • Buat tabel MaxCompute tujuan menggunakan Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute.

      CREATE TABLE IF NOT EXISTS table_json(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value JSON
      ) PARTITIONED BY (pt STRING);
    • Buat data Kafka.

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka bernama topic_json.

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json

      Jalankan perintah berikut untuk membuat pesan Kafka:

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true
      >123    {"id":123,"name":"json-1","region":"beijing"}                         
      >456    {"id":456,"name":"json-2","region":"hangzhou"}
  2. (Opsional) Mulai layanan Kafka-connector. Untuk informasi lebih lanjut, lihat Konfigurasikan dan Mulai Layanan Kafka-connector.

    Catatan

    Jika layanan Kafka-connector telah dimulai, lewati langkah ini.

  3. Buat dan konfigurasikan file odps-sink-connector.json, lalu unggah file tersebut ke lokasi mana pun. Dalam contoh ini, file odps-sink-connector.json diunggah ke direktori $KAFKA_HOME/config.

    Kode berikut menampilkan isi file odps-sink-connector.json. Untuk detail lebih lanjut tentang file odps-sink-connector.json, lihat Konfigurasikan dan mulai tugas Kafka-connector.

    {
        "name": "odps-test-json",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_json",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_json",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"JSON",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. Jalankan perintah berikut untuk memulai tugas Kafka-connector:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verifikasi Hasilnya.

    Jalankan perintah berikut pada Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute untuk memeriksa hasil penulisan data:

    set odps.sql.allow.fullscan=true;
    select * from table_json;

    Hasil berikut dikembalikan:

    # Tulis data JSON ke bidang value.
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | Topic_json | 0      | 0          | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 |
    | Topic_json | 0      | 1          | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 |
    +-------+------------+------------+-----+-------+----+

Menulis data tipe FLATTEN

  1. Persiapkan Data.

    • Buat tabel MaxCompute tujuan menggunakan Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute.

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • Buat data Kafka.

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka bernama topic_flatten.

      ./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten

      Jalankan perintah berikut untuk membuat pesan Kafka:

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true
      >123  {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}}                         
      >456  {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}

  2. (Opsional) Mulai layanan Kafka-connector. Untuk informasi lebih lanjut, lihat Konfigurasikan dan Mulai Layanan Kafka-connector.

    Catatan

    Jika layanan Kafka-connector telah dimulai, lewati langkah ini.

  3. Buat dan konfigurasikan file odps-sink-connector.json, lalu unggah file tersebut ke lokasi mana pun. Dalam contoh ini, file odps-sink-connector.json diunggah ke direktori $KAFKA_HOME/config.

    Kode berikut menampilkan isi file odps-sink-connector.json. Untuk detail lebih lanjut tentang file odps-sink-connector.json, lihat Konfigurasikan dan mulai tugas Kafka-connector.

    {
        "name": "odps-test-flatten",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_flatten",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_flatten",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"FLATTEN",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. Jalankan perintah berikut untuk memulai tugas Kafka-connector:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verifikasi Hasilnya.

    Jalankan perintah berikut pada Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute untuk memeriksa hasil penulisan data:

    set odps.sql.allow.fullscan=true;
    select * from table_flatten;

    Hasil berikut dikembalikan:

    # Data JSON diurai dan ditulis ke tabel MaxCompute. Bidang exteninfo dalam format JSON bertingkat dapat berupa bidang JSON.
    +-------+------------+--------+-----+------+------------+----+
    | topic | partition  | offset | id  | name | extendinfo | pt |
    +-------+------------+--------+-----+------+------------+----+
    | topic_flatten | 0   | 0      | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 |
    | topic_flatten | 0   | 1      | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 |
    +-------+------------+--------+-----+------+------------+----+

Memproses data abnormal

  1. Persiapkan Data.

    • Buat tabel MaxCompute tujuan menggunakan Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute.

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • Buat data Kafka.

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka:

      • topic_abnormal

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormal
      • runtime_error

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error
        Catatan

        Jika terjadi kesalahan yang tidak diketahui saat menulis data, data abnormal akan ditulis ke topik runtime_error. Pada umumnya, kesalahan yang tidak diketahui terjadi karena format data Kafka tidak sesuai dengan format tabel MaxCompute.

      Jalankan perintah berikut untuk membuat pesan Kafka:

      Dalam pesan berikut, format data satu pesan tidak sama dengan format tabel MaxCompute.

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true
      
      >100  {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}}                         
      >101  {"id":101,"name":"json-4","extendinfos":"null"}
      >102	{"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}} 
  2. (Opsional) Mulai layanan Kafka-connector. Untuk informasi lebih lanjut, lihat Konfigurasikan dan Mulai Layanan Kafka-connector.

    Catatan

    Jika layanan Kafka-connector telah dimulai, lewati langkah ini.

  3. Buat dan konfigurasikan file odps-sink-connector.json, lalu unggah file tersebut ke lokasi mana pun. Dalam contoh ini, file odps-sink-connector.json diunggah ke direktori $KAFKA_HOME/config.

    Kode berikut menampilkan isi file odps-sink-connector.json. Untuk detail lebih lanjut tentang file odps-sink-connector.json, lihat Konfigurasikan dan mulai tugas Kafka-connector.

    {
      "name": "odps-test-runtime-error",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_abnormal",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema":"default",
        "table": "test_flatten",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode":"VALUE",
        "format":"FLATTEN",
        "sink_pool_size":"150",
        "record_batch_size":"9000",
        "buffer_size_kb":"600000",
        "runtime.error.topic.name":"runtime_error",
        "runtime.error.topic.bootstrap.servers":"http://XXXX",
        "skip_error":"false"
      }
    }
    
  4. Jalankan perintah berikut untuk memulai tugas Kafka-connector:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verifikasi Hasilnya.

    • Kueri Data dalam Tabel MaxCompute.

      Jalankan perintah berikut pada Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute untuk memeriksa hasil penulisan data:

      set odps.sql.allow.fullscan=true;
      select * from table_flatten;

      Hasil berikut dikembalikan:

      # Dua rekaman terakhir ditampilkan. Hal ini disebabkan parameter skip_error diatur ke true. Data dengan id 101 tidak ditulis ke tabel MaxCompute, dan rekaman selanjutnya tidak diblokir dari penulisan ke tabel MaxCompute.
      +-------+------------+------------+------------+------+------------+----+
      | topic | partition  | offset     | id         | name | extendinfo | pt |
      +-------+------------+------------+------------+------+------------+----+
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 2          | 100        | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 4          | 102        | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      +-------+------------+------------+------------+------+------------+----+
    • Kueri Pesan dalam Topik runtime_error.

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk melihat hasil penulisan pesan:

      sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning

      Hasil berikut dikembalikan:

      # Data abnormal ditulis ke topik runtime_error.
      {"id":101,"name":"json-4","extendinfos":"null"}