All Products
Search
Document Center

MaxCompute:,

Last Updated:Feb 27, 2026

Integrasi MaxCompute dengan Kafka menyediakan kemampuan pemrosesan dan analitik data yang efisien serta andal, cocok untuk skenario yang memerlukan pemrosesan real-time, aliran data berskala besar, dan analitik data kompleks. Topik ini menjelaskan cara menulis data dari Message Queue for Apache Kafka dan instans Kafka self-managed ke MaxCompute, serta menyediakan contoh terperinci untuk instans Kafka self-managed.

Menulis data Kafka ke MaxCompute: Kafka fully managed Alibaba Cloud

MaxCompute terintegrasi erat dengan Message Queue for Apache Kafka. Anda dapat menggunakan MaxCompute Sink Connector untuk Message Queue for Apache Kafka guna mengimpor data secara berkelanjutan dari topik tertentu ke tabel MaxCompute tanpa memerlukan alat pihak ketiga atau pengembangan kustom. Untuk informasi selengkapnya, lihat Buat MaxCompute Sink Connector.

Menulis data Kafka ke MaxCompute: Kafka open source self-managed

Prasyarat

  • Anda telah menerapkan Kafka V2.2 atau versi yang lebih baru dan membuat topik Kafka. Disarankan menggunakan versi 3.4.0.

  • Anda telah membuat proyek dan tabel MaxCompute. Untuk informasi selengkapnya, lihat Buat proyek MaxCompute dan Buat tabel.

Catatan

Konektor Kafka mendukung penulisan data dalam format TEXT, CSV, JSON, dan FLATTEN. Catatan berikut berlaku untuk setiap format. Untuk informasi selengkapnya tentang tipe data, lihat Deskripsi tipe data.

  • Saat menulis data Kafka dalam format TEXT atau JSON ke MaxCompute, tabel MaxCompute harus memenuhi persyaratan berikut:

    Nama bidang

    Tipe bidang

    Bidang tetap

    topic

    STRING

    Ya

    partition

    BIGINT

    Ya

    offset

    BIGINT

    Ya

    key

    • Saat menulis data Kafka TEXT, tipe bidang harus STRING.

    • Saat menulis data Kafka JSON, tipe bidang dapat berupa STRING atau JSON, tergantung pada data yang ditulis.

    Bidang ini tetap untuk menyinkronkan key dari pesan Kafka ke tabel MaxCompute. Untuk informasi selengkapnya tentang mode penyinkronan pesan Kafka ke MaxCompute, lihat mode.

    value

    • Saat menulis data Kafka TEXT, tipe bidang harus STRING.

    • Saat menulis data Kafka JSON, tipe bidang dapat berupa STRING atau JSON, tergantung pada data yang ditulis.

    Bidang ini tetap untuk menyinkronkan value dari pesan Kafka ke tabel MaxCompute. Untuk informasi selengkapnya tentang mode penyinkronan pesan Kafka ke MaxCompute, lihat mode.

    pt

    STRING (bidang partisi)

    Ya

  • Saat menulis data Kafka dalam format FLATTEN atau CSV ke MaxCompute, tabel harus mencakup bidang dan tipe data berikut. Anda dapat menentukan bidang lain berdasarkan data yang ditulis.

    Nama bidang

    Tipe bidang

    topic

    STRING

    partition

    BIGINT

    offset

    BIGINT

    pt

    STRING (bidang partisi)

    • Saat menulis data Kafka dalam format CSV ke tabel MaxCompute, urutan dan tipe data bidang kustom di tabel MaxCompute harus sesuai dengan kolom dalam data Kafka agar operasi penulisan berhasil.

    • Saat menulis data Kafka dalam format FLATTEN ke tabel MaxCompute, nama bidang kustom di tabel MaxCompute harus sesuai dengan nama bidang dalam data Kafka agar operasi penulisan berhasil.

      Sebagai contoh, jika data Kafka FLATTEN adalah {"A":a,"B":"b","C":{"D":"d","E":"e"}}, tabel MaxCompute harus dikonfigurasi sebagai berikut.

      CREATE TABLE IF NOT EXISTS table_flatten(
       topic STRING,
       `partition` BIGINT,
       `offset` BIGINT,
       A BIGINT,
       B STRING,
       C JSON
      ) PARTITIONED BY (pt STRING);

Konfigurasikan dan jalankan layanan konektor Kafka

  1. Contoh ini menggunakan lingkungan Linux. Di jendela perintah, unduh paket kafka-connector-2.0.jar dengan menjalankan perintah berikut atau menggunakan tautan unduh.

    wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar

    Untuk mencegah konflik dependensi, buat subfolder, seperti connector, di direktori $KAFKA_HOME/libs dan letakkan paket kafka-connector-2.0.jar di dalamnya.

    Catatan

    Jika paket kafka-connector-2.0.jar tidak kompatibel dengan lingkungan penerapan Kafka Anda, lihat Konfigurasikan Kafka-connector untuk informasi selengkapnya tentang cara mengonfigurasi dan menjalankan layanan Kafka-connector.

  2. Di direktori $KAFKA_HOME/config, konfigurasikan file connect-distributed.properties.

    Tambahkan konten berikut ke file connect-distributed.properties.

    ## Tambahkan konten berikut
    plugin.path=<KAFKA_HOME>/libs/connector
    
    ## Perbarui nilai parameter key.converter dan value.converter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter  
  3. Di direktori $KAFKA_HOME/, jalankan perintah berikut untuk memulai layanan Kafka-connector.

    ## Perintah mulai
    bin/connect-distributed.sh config/connect-distributed.properties &

Konfigurasikan dan jalankan task konektor Kafka

  1. Buat dan konfigurasikan file konfigurasi odps-sink-connector.json. Kemudian, unggah file odps-sink-connector.json ke lokasi apa pun.

    Konten dan parameter file konfigurasi odps-sink-connector.json dijelaskan pada bagian berikut.

    {
      "name": "Nama task konektor Kafka",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "your_topic",
        "endpoint": "endpoint",
        "tunnel_endpoint": "your_tunnel endpoint",
        "project": "project",
        "schema":"default",
        "table": "your_table",
        "account_type": "tipe akun (STS atau ALIYUN)",
        "access_id": "access id",
        "access_key": "access key",
        "account_id": "account id untuk sts",
        "sts.endpoint": "sts endpoint",
        "region_id": "region id untuk sts",
        "role_name": "nama role untuk sts",
        "client_timeout_ms": "Periode valid Token STS (ms)",
        "format": "TEXT",
        "mode": "KEY",
        "partition_window_type": "MINUTE",
        "use_streaming": false,
        "buffer_size_kb": 65536,
        "sink_pool_size":"150",
        "record_batch_size":"8000",
        "runtime.error.topic.name":"topik kafka saat terjadi error waktu proses",
        "runtime.error.topic.bootstrap.servers":"server bootstrap kafka dari antrian topik error",
        "skip_error":"false"
      }
    }
    • Parameter umum

      Parameter

      Wajib

      Deskripsi

      name

      Ya

      Nama task. Nama harus unik.

      connector.class

      Ya

      Nama kelas untuk memulai layanan Kafka connector. Nilai default adalah com.aliyun.odps.kafka.connect.MaxComputeSinkConnector.

      tasks.max

      Ya

      Jumlah maksimum proses consumer dalam Kafka connector. Nilainya harus bilangan bulat lebih besar dari 0.

      topics

      Ya

      Nama topik Kafka.

      endpoint

      Ya

      Titik akhir layanan MaxCompute.

      Anda harus mengonfigurasi titik akhir berdasarkan wilayah dan jenis konektivitas jaringan yang dipilih saat membuat proyek MaxCompute. Untuk daftar titik akhir tiap wilayah dan jaringan, lihat Titik akhir.

      tunnel_endpoint

      Tidak

      Titik akhir publik layanan Tunnel.

      Jika Anda tidak mengonfigurasi titik akhir Tunnel, tunnel akan secara otomatis mengarah ke titik akhir Tunnel yang sesuai dengan jaringan tempat layanan MaxCompute berada. Jika Anda mengonfigurasi titik akhir Tunnel, konfigurasi Anda akan diprioritaskan dan pengarahan otomatis dinonaktifkan.

      Untuk daftar titik akhir tiap wilayah dan jaringan, lihat Titik akhir.

      project

      Ya

      Nama proyek MaxCompute target.

      schema

      Tidak

      • Parameter ini wajib jika proyek MaxCompute target dikonfigurasi dengan model skema tiga lapis. Nilai default adalah default.

      • Parameter ini tidak diperlukan jika proyek MaxCompute target tidak dikonfigurasi dengan model skema tiga lapis.

      Untuk informasi selengkapnya tentang skema, lihat Operasi skema.

      table

      Ya

      Nama tabel di proyek MaxCompute target.

      format

      Tidak

      Format pesan yang akan ditulis. Nilai yang valid:

      • TEXT (default): Pesan berupa string.

      • BINARY: Pesan berupa array byte.

      • CSV: Pesan berupa string dengan nilai yang dipisahkan koma (,).

      • JSON: Pesan berupa string dalam tipe data JSON. Untuk informasi selengkapnya tentang tipe data JSON MaxCompute, lihat Tipe data JSON.

      • FLATTEN: Pesan berupa string dalam tipe data JSON. Kunci dan nilai dalam string JSON diurai dan ditulis ke kolom yang sesuai di tabel MaxCompute. Kunci dalam data JSON harus sesuai dengan nama kolom di tabel MaxCompute.

      Untuk contoh impor pesan dalam berbagai format, lihat Contoh penggunaan.

      mode

      Tidak

      Mode untuk menyinkronkan pesan ke MaxCompute. Nilai yang valid:

      • KEY: Hanya menyimpan key pesan dan menulisnya ke tabel MaxCompute target.

      • VALUE: Hanya menyimpan value pesan dan menulisnya ke tabel MaxCompute target.

      • DEFAULT (default): Menyimpan key dan value pesan, lalu menulis keduanya ke tabel MaxCompute target.

        Dalam mode DEFAULT, hanya format data TEXT dan BINARY yang didukung.

      partition_window_type

      Tidak

      Memartisi data berdasarkan waktu sistem. Nilai yang valid: DAY, HOUR (default), dan MINUTE.

      use_streaming

      Tidak

      Menentukan apakah akan menggunakan saluran data streaming. Nilai yang valid:

      • false (default): Dinonaktifkan.

      • true: Diaktifkan.

      buffer_size_kb

      Tidak

      Ukuran buffer internal untuk penulis partisi odps, dalam KB. Nilai default adalah 65536 KB.

      sink_pool_size

      Tidak

      Jumlah maksimum thread untuk penulisan multi-threaded. Nilai default adalah jumlah core CPU di sistem.

      record_batch_size

      Tidak

      Jumlah maksimum pesan yang dapat dikirim secara paralel oleh satu thread dalam satu task konektor Kafka sekaligus.

      skip_error

      Tidak

      Menentukan apakah akan melewati catatan yang menyebabkan error tidak dikenal. Nilai yang valid:

      • false (default): Tidak melewati catatan tersebut.

      • true: Melewati catatan tersebut.

        Catatan
        • Jika skip_error diatur ke false dan parameter runtime.error.topic.name tidak dikonfigurasi, proses akan berhenti menulis data saat terjadi error tidak dikenal. Proses terblokir, dan exception dicatat di log.

        • Jika skip_error diatur ke true dan runtime.error.topic.name tidak dikonfigurasi, proses penulisan data berlanjut, dan data abnormal dibuang.

        • Jika skip_error diatur ke false dan runtime.error.topic.name dikonfigurasi, proses penulisan data berlanjut, dan data abnormal dicatat di topik yang ditentukan oleh runtime.error.topic.name.

        Untuk contoh penanganan data abnormal, lihat Contoh penanganan data abnormal.

      runtime.error.topic.name

      Tidak

      Nama topik Kafka tempat data yang menyebabkan error tidak dikenal selama operasi penulisan ditulis.

      runtime.error.topic.bootstrap.servers

      Tidak

      Alamat server bootstrap instans Kafka tempat data yang menyebabkan error tidak dikenal selama operasi penulisan ditulis.

      account_type

      Ya

      Metode akses ke layanan MaxCompute target. Nilai yang valid adalah STS dan ALIYUN. Nilai default adalah ALIYUN.

      Metode akses yang berbeda memerlukan parameter kredensial akses yang berbeda. Untuk informasi selengkapnya, lihat Akses MaxCompute menggunakan metode ALIYUN dan Akses MaxCompute menggunakan metode STS.

    • Selain parameter umum, Anda juga harus mengonfigurasi parameter berikut.

      Nama Parameter

      Deskripsi

      access_id

      ID AccessKey Akun Alibaba Cloud atau Pengguna RAM Anda.

      Anda dapat memperoleh ID AccessKey di halaman Manajemen AccessKey.

      access_key

      Rahasia AccessKey yang sesuai dengan ID AccessKey.

    • Selain parameter umum, Anda juga harus mengonfigurasi parameter berikut.

      Parameter

      Deskripsi

      account_id

      ID akun yang digunakan untuk mengakses proyek MaxCompute target. Anda dapat melihat ID akun Anda di Pusat Akun.

      region_id

      ID wilayah proyek MaxCompute target. Untuk ID tiap wilayah, lihat Titik akhir.

      role_name

      Nama role yang digunakan untuk mengakses proyek MaxCompute target. Anda dapat melihat nama role di halaman Roles.

      client_timeout_ms

      Interval refresh untuk token Security Token Service (STS), dalam milidetik (ms). Nilai default adalah 11 ms.

      sts.endpoint

      Titik akhir layanan STS yang diperlukan untuk autentikasi identitas menggunakan token keamanan sementara (STS).

      Untuk daftar titik akhir tiap wilayah dan jaringan, lihat Titik akhir.

  2. Jalankan perintah berikut untuk memulai task migrasi data konektor Kafka.

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json

Contoh penggunaan

Menulis data TEXT

  1. Persiapkan data.

    • Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute dan membuat tabel target.

      CREATE TABLE IF NOT EXISTS table_text(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value STRING
      ) PARTITIONED BY (pt STRING);
    • Buat data Kafka.

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka. Contoh ini menggunakan topic_text sebagai nama topik.

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text

      Jalankan perintah berikut untuk membuat pesan Kafka.

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true
      >123    abc
      >456    edf
  2. (Opsional) Jalankan layanan Kafka-connector. Untuk informasi selengkapnya, lihat Konfigurasikan dan jalankan layanan konektor Kafka.

    Catatan

    Jika layanan Kafka-connector sudah berjalan, Anda dapat melewati langkah ini.

  3. Buat dan konfigurasikan file odps-sink-connector.json. Kemudian, unggah file odps-sink-connector.json ke lokasi apa pun, misalnya ke path $KAFKA_HOME/config.

    Kode berikut memberikan contoh file odps-sink-connector.json. Untuk informasi selengkapnya tentang file odps-sink-connector.json, lihat Konfigurasikan dan jalankan task konektor Kafka.

    {
        "name": "odps-test-text",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_text",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",
          "schema":"default",
          "table": "table_text",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"TEXT",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
  4. Jalankan perintah berikut untuk memulai task migrasi data konektor Kafka.

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verifikasi hasil.

    Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute, lalu jalankan perintah berikut untuk mengkueri data dan memverifikasi hasil.

    set odps.sql.allow.fullscan=true;
    select * from table_text;

    Output berikut dikembalikan:

    # Karena parameter mode dalam file konfigurasi odps-sink-connector.json diatur ke VALUE, hanya konten value yang disimpan. Bidang key bernilai NULL.
    
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | topic_text | 0      | 0          | NULL | abc   | 07-13-2023 21:13 |
    | topic_text | 0      | 1          | NULL | edf   | 07-13-2023 21:13 |
    +-------+------------+------------+-----+-------+----+

Menulis data CSV

  1. Persiapkan data.

    • Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute dan membuat tabel target.

      CREATE TABLE IF NOT EXISTS table_csv(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        region STRING
      ) PARTITIONED BY (pt STRING);
    • Tulis data ke Kafka.

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka bernama topic_csv.

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv

      Jalankan perintah berikut untuk membuat pesan Kafka.

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true
      >123	1103,zhangsan,china
      >456	1104,lisi,usa
  2. (Opsional) Jalankan layanan Kafka-connector. Untuk informasi selengkapnya, lihat Konfigurasikan dan jalankan layanan konektor Kafka.

    Catatan

    Jika layanan Kafka-connector sudah berjalan, Anda dapat melewati langkah ini.

  3. Buat dan konfigurasikan file odps-sink-connector.json, lalu unggah file odps-sink-connector.json ke lokasi apa pun. Topik ini menggunakan path $KAFKA_HOME/config sebagai contoh.

    Kode berikut memberikan contoh file odps-sink-connector.json. Untuk informasi selengkapnya tentang file odps-sink-connector.json, lihat Konfigurasikan dan jalankan task konektor Kafka.

    {
        "name": "odps-test-csv",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_csv",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_csv",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "format":"CSV",
          "mode":"VALUE",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. Jalankan perintah berikut untuk memulai task migrasi data konektor Kafka.

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verifikasi hasil.

    Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute, lalu jalankan perintah berikut untuk mengkueri data dan memverifikasi hasil.

    set odps.sql.allow.fullscan=true;
    select * from table_csv;

    Output berikut dikembalikan:

    +-------+------------+------------+------------+------+--------+----+
    | topic | partition  | offset     | id         | name | region | pt |
    +-------+------------+------------+------------+------+--------+----+
    | csv_test | 0       | 0          | 1103       | zhangsan | china  | 07-14-2023 00:10 |
    | csv_test | 0       | 1          | 1104       | lisi | usa    | 07-14-2023 00:10 |
    +-------+------------+------------+------------+------+--------+----+

Menulis data JSON

  1. Persiapkan data.

    • Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute dan membuat tabel target.

      CREATE TABLE IF NOT EXISTS table_json(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value JSON
      ) PARTITIONED BY (pt STRING);
    • Buat data Kafka.

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka. Contoh ini menggunakan topic_json sebagai nama topik.

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json

      Jalankan perintah berikut untuk membuat pesan Kafka.

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true
      >123    {"id":123,"name":"json-1","region":"beijing"}                         
      >456    {"id":456,"name":"json-2","region":"hangzhou"}
  2. (Opsional) Jalankan layanan Kafka-connector. Untuk informasi selengkapnya, lihat Konfigurasikan dan jalankan layanan konektor Kafka.

    Catatan

    Jika layanan Kafka-connector sudah berjalan, Anda dapat melewati langkah ini.

  3. Buat dan konfigurasikan file odps-sink-connector.json. Kemudian, unggah file odps-sink-connector.json ke lokasi apa pun, misalnya ke path $KAFKA_HOME/config.

    Kode berikut memberikan contoh file odps-sink-connector.json. Untuk informasi selengkapnya tentang file odps-sink-connector.json, lihat Konfigurasikan dan jalankan task konektor Kafka.

    {
        "name": "odps-test-json",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_json",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_json",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"JSON",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. Jalankan perintah berikut untuk memulai task migrasi data konektor Kafka.

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verifikasi hasil.

    Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute, lalu jalankan perintah berikut untuk mengkueri data dan memverifikasi hasil.

    set odps.sql.allow.fullscan=true;
    select * from table_json;

    Output berikut dikembalikan:

    # Data JSON berhasil ditulis ke bidang value.
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | Topic_json | 0      | 0          | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 |
    | Topic_json | 0      | 1          | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 |
    +-------+------------+------------+-----+-------+----+

Menulis data FLATTEN

  1. Persiapkan data.

    • Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute dan membuat tabel target.

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • Buat data Kafka.

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka. Contoh ini menggunakan topic_flatten sebagai nama topik.

      ./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten

      Jalankan perintah berikut untuk membuat pesan Kafka.

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true
      >123  {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}}                         
      >456  {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}

  2. (Opsional) Jalankan layanan Kafka-connector. Untuk informasi selengkapnya, lihat Konfigurasikan dan jalankan layanan konektor Kafka.

    Catatan

    Jika layanan Kafka-connector sudah berjalan, Anda dapat melewati langkah ini.

  3. Buat dan konfigurasikan file odps-sink-connector.json, lalu unggah file odps-sink-connector.json ke lokasi apa pun. Topik ini menggunakan path $KAFKA_HOME/config sebagai contoh.

    Kode berikut memberikan contoh file odps-sink-connector.json. Untuk informasi selengkapnya tentang file odps-sink-connector.json, lihat Konfigurasikan dan jalankan task konektor Kafka.

    {
        "name": "odps-test-flatten",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_flatten",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_flatten",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"FLATTEN",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. Jalankan perintah berikut untuk memulai task konektor Kafka.

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verifikasi hasil.

    Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute, lalu jalankan perintah berikut untuk mengkueri data dan memverifikasi hasil.

    set odps.sql.allow.fullscan=true;
    select * from table_flatten;

    Berikut ini menunjukkan hasilnya:

    # Data JSON diurai dan ditulis ke tabel MaxCompute, dengan extendinfo sebagai bidang JSON yang mendukung nesting.
    +-------+------------+--------+-----+------+------------+----+
    | topic | partition  | offset | id  | name | extendinfo | pt |
    +-------+------------+--------+-----+------+------------+----+
    | topic_flatten | 0   | 0      | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 |
    | topic_flatten | 0   | 1      | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 |
    +-------+------------+--------+-----+------+------------+----+

Contoh penanganan data abnormal

  1. Persiapkan data.

    • Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute dan membuat tabel target.

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • Buat data Kafka.

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka.

      • Topik topic_abnormal.

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormal
      • Topik pesan untuk exception runtime_error.

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error
        Catatan

        Jika terjadi error selama operasi penulisan data, data abnormal ditulis ke topik runtime_error. Jenis error ini biasanya disebabkan oleh ketidaksesuaian antara data Kafka dan skema tabel MaxCompute.

      Jalankan perintah berikut untuk membuat pesan Kafka.

      Salah satu pesan dalam perintah berikut tidak sesuai dengan skema tabel MaxCompute target.

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true
      
      >100  {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}}                         
      >101  {"id":101,"name":"json-4","extendinfos":"null"}
      >102	{"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}} 
  2. (Opsional) Jalankan layanan Kafka-connector. Untuk informasi selengkapnya, lihat Konfigurasikan dan jalankan layanan konektor Kafka.

    Catatan

    Jika layanan Kafka-connector sudah berjalan, Anda dapat melewati langkah ini.

  3. Buat dan konfigurasikan file odps-sink-connector.json, lalu unggah file odps-sink-connector.json ke lokasi apa pun. Topik ini menggunakan path $KAFKA_HOME/config sebagai contoh.

    Kode berikut memberikan contoh file odps-sink-connector.json. Untuk informasi selengkapnya tentang file odps-sink-connector.json, lihat Konfigurasikan dan jalankan task konektor Kafka.

    {
      "name": "odps-test-runtime-error",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_abnormal",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema":"default",
        "table": "test_flatten",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode":"VALUE",
        "format":"FLATTEN",
        "sink_pool_size":"150",
        "record_batch_size":"9000",
        "buffer_size_kb":"600000",
        "runtime.error.topic.name":"runtime_error",
        "runtime.error.topic.bootstrap.servers":"http://XXXX",
        "skip_error":"false"
      }
    }
    
  4. Jalankan perintah berikut untuk memulai task konektor Kafka.

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verifikasi hasil.

    • Kueri data tabel MaxCompute

      Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute, lalu jalankan perintah berikut untuk mengkueri data dan memverifikasi hasil.

      set odps.sql.allow.fullscan=true;
      select * from table_flatten;

      Output berikut dikembalikan:

      # Seperti yang terlihat dari hasil, data dengan ID 101 tidak ditulis ke MaxCompute karena tidak sesuai dengan skema tabel.
      # Karena parameter runtime.error.topic.name dikonfigurasi, proses tidak terblokir, dan data berikutnya ditulis berhasil.
      +-------+------------+------------+------------+------+------------+----+
      | topic | partition  | offset     | id         | name | extendinfo | pt |
      +-------+------------+------------+------------+------+------------+----+
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 2          | 100        | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 4          | 102        | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      +-------+------------+------------+------------+------+------------+----+
    • Kueri pesan di topik runtime_error

      Di direktori $KAFKA_HOME/bin/, jalankan perintah berikut untuk melihat pesan.

      sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning

      Hasil berikut dikembalikan:

      # Data abnormal berhasil ditulis ke antrian pesan runtime_error.
      {"id":101,"name":"json-4","extendinfos":"null"}