Integrasi MaxCompute dengan Kafka menyediakan kemampuan pemrosesan dan analitik data yang efisien serta andal, cocok untuk skenario yang memerlukan pemrosesan real-time, aliran data berskala besar, dan analitik data kompleks. Topik ini menjelaskan cara menulis data dari Message Queue for Apache Kafka dan instans Kafka self-managed ke MaxCompute, serta menyediakan contoh terperinci untuk instans Kafka self-managed.
Menulis data Kafka ke MaxCompute: Kafka fully managed Alibaba Cloud
MaxCompute terintegrasi erat dengan Message Queue for Apache Kafka. Anda dapat menggunakan MaxCompute Sink Connector untuk Message Queue for Apache Kafka guna mengimpor data secara berkelanjutan dari topik tertentu ke tabel MaxCompute tanpa memerlukan alat pihak ketiga atau pengembangan kustom. Untuk informasi selengkapnya, lihat Buat MaxCompute Sink Connector.
Menulis data Kafka ke MaxCompute: Kafka open source self-managed
Prasyarat
Anda telah menerapkan Kafka V2.2 atau versi yang lebih baru dan membuat topik Kafka. Disarankan menggunakan versi 3.4.0.
Anda telah membuat proyek dan tabel MaxCompute. Untuk informasi selengkapnya, lihat Buat proyek MaxCompute dan Buat tabel.
Catatan
Konektor Kafka mendukung penulisan data dalam format TEXT, CSV, JSON, dan FLATTEN. Catatan berikut berlaku untuk setiap format. Untuk informasi selengkapnya tentang tipe data, lihat Deskripsi tipe data.
Saat menulis data Kafka dalam format TEXT atau JSON ke MaxCompute, tabel MaxCompute harus memenuhi persyaratan berikut:
Nama bidang
Tipe bidang
Bidang tetap
topic
STRING
Ya
partition
BIGINT
Ya
offset
BIGINT
Ya
key
Saat menulis data Kafka TEXT, tipe bidang harus STRING.
Saat menulis data Kafka JSON, tipe bidang dapat berupa STRING atau JSON, tergantung pada data yang ditulis.
Bidang ini tetap untuk menyinkronkan key dari pesan Kafka ke tabel MaxCompute. Untuk informasi selengkapnya tentang mode penyinkronan pesan Kafka ke MaxCompute, lihat mode.
value
Saat menulis data Kafka TEXT, tipe bidang harus STRING.
Saat menulis data Kafka JSON, tipe bidang dapat berupa STRING atau JSON, tergantung pada data yang ditulis.
Bidang ini tetap untuk menyinkronkan value dari pesan Kafka ke tabel MaxCompute. Untuk informasi selengkapnya tentang mode penyinkronan pesan Kafka ke MaxCompute, lihat mode.
pt
STRING (bidang partisi)
Ya
Saat menulis data Kafka dalam format FLATTEN atau CSV ke MaxCompute, tabel harus mencakup bidang dan tipe data berikut. Anda dapat menentukan bidang lain berdasarkan data yang ditulis.
Nama bidang
Tipe bidang
topic
STRING
partition
BIGINT
offset
BIGINT
pt
STRING (bidang partisi)
Saat menulis data Kafka dalam format CSV ke tabel MaxCompute, urutan dan tipe data bidang kustom di tabel MaxCompute harus sesuai dengan kolom dalam data Kafka agar operasi penulisan berhasil.
Saat menulis data Kafka dalam format FLATTEN ke tabel MaxCompute, nama bidang kustom di tabel MaxCompute harus sesuai dengan nama bidang dalam data Kafka agar operasi penulisan berhasil.
Sebagai contoh, jika data Kafka FLATTEN adalah
{"A":a,"B":"b","C":{"D":"d","E":"e"}}, tabel MaxCompute harus dikonfigurasi sebagai berikut.CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, A BIGINT, B STRING, C JSON ) PARTITIONED BY (pt STRING);
Konfigurasikan dan jalankan layanan konektor Kafka
Contoh ini menggunakan lingkungan Linux. Di jendela perintah, unduh paket
kafka-connector-2.0.jardengan menjalankan perintah berikut atau menggunakan tautan unduh.wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jarUntuk mencegah konflik dependensi, buat subfolder, seperti
connector, di direktori$KAFKA_HOME/libsdan letakkan paketkafka-connector-2.0.jardi dalamnya.CatatanJika paket
kafka-connector-2.0.jartidak kompatibel dengan lingkungan penerapan Kafka Anda, lihat Konfigurasikan Kafka-connector untuk informasi selengkapnya tentang cara mengonfigurasi dan menjalankan layananKafka-connector.Di direktori
$KAFKA_HOME/config, konfigurasikan fileconnect-distributed.properties.Tambahkan konten berikut ke file
connect-distributed.properties.## Tambahkan konten berikut plugin.path=<KAFKA_HOME>/libs/connector ## Perbarui nilai parameter key.converter dan value.converter key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverterDi direktori
$KAFKA_HOME/, jalankan perintah berikut untuk memulai layananKafka-connector.## Perintah mulai bin/connect-distributed.sh config/connect-distributed.properties &
Konfigurasikan dan jalankan task konektor Kafka
Buat dan konfigurasikan file konfigurasi
odps-sink-connector.json. Kemudian, unggah fileodps-sink-connector.jsonke lokasi apa pun.Konten dan parameter file konfigurasi
odps-sink-connector.jsondijelaskan pada bagian berikut.{ "name": "Nama task konektor Kafka", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "your_topic", "endpoint": "endpoint", "tunnel_endpoint": "your_tunnel endpoint", "project": "project", "schema":"default", "table": "your_table", "account_type": "tipe akun (STS atau ALIYUN)", "access_id": "access id", "access_key": "access key", "account_id": "account id untuk sts", "sts.endpoint": "sts endpoint", "region_id": "region id untuk sts", "role_name": "nama role untuk sts", "client_timeout_ms": "Periode valid Token STS (ms)", "format": "TEXT", "mode": "KEY", "partition_window_type": "MINUTE", "use_streaming": false, "buffer_size_kb": 65536, "sink_pool_size":"150", "record_batch_size":"8000", "runtime.error.topic.name":"topik kafka saat terjadi error waktu proses", "runtime.error.topic.bootstrap.servers":"server bootstrap kafka dari antrian topik error", "skip_error":"false" } }Parameter umum
Parameter
Wajib
Deskripsi
name
Ya
Nama task. Nama harus unik.
connector.class
Ya
Nama kelas untuk memulai layanan
Kafka connector. Nilai default adalahcom.aliyun.odps.kafka.connect.MaxComputeSinkConnector.tasks.max
Ya
Jumlah maksimum proses consumer dalam
Kafka connector. Nilainya harus bilangan bulat lebih besar dari 0.topics
Ya
Nama topik Kafka.
endpoint
Ya
Titik akhir layanan MaxCompute.
Anda harus mengonfigurasi titik akhir berdasarkan wilayah dan jenis konektivitas jaringan yang dipilih saat membuat proyek MaxCompute. Untuk daftar titik akhir tiap wilayah dan jaringan, lihat Titik akhir.
tunnel_endpoint
Tidak
Titik akhir publik layanan Tunnel.
Jika Anda tidak mengonfigurasi titik akhir Tunnel, tunnel akan secara otomatis mengarah ke titik akhir Tunnel yang sesuai dengan jaringan tempat layanan MaxCompute berada. Jika Anda mengonfigurasi titik akhir Tunnel, konfigurasi Anda akan diprioritaskan dan pengarahan otomatis dinonaktifkan.
Untuk daftar titik akhir tiap wilayah dan jaringan, lihat Titik akhir.
project
Ya
Nama proyek MaxCompute target.
schema
Tidak
Parameter ini wajib jika proyek MaxCompute target dikonfigurasi dengan model skema tiga lapis. Nilai default adalah default.
Parameter ini tidak diperlukan jika proyek MaxCompute target tidak dikonfigurasi dengan model skema tiga lapis.
Untuk informasi selengkapnya tentang skema, lihat Operasi skema.
table
Ya
Nama tabel di proyek MaxCompute target.
format
Tidak
Format pesan yang akan ditulis. Nilai yang valid:
TEXT (default): Pesan berupa string.
BINARY: Pesan berupa array byte.
CSV: Pesan berupa string dengan nilai yang dipisahkan koma (,).
JSON: Pesan berupa string dalam tipe data JSON. Untuk informasi selengkapnya tentang tipe data JSON MaxCompute, lihat Tipe data JSON.
FLATTEN: Pesan berupa string dalam tipe data JSON. Kunci dan nilai dalam string JSON diurai dan ditulis ke kolom yang sesuai di tabel MaxCompute. Kunci dalam data JSON harus sesuai dengan nama kolom di tabel MaxCompute.
Untuk contoh impor pesan dalam berbagai format, lihat Contoh penggunaan.
mode
Tidak
Mode untuk menyinkronkan pesan ke MaxCompute. Nilai yang valid:
KEY: Hanya menyimpan key pesan dan menulisnya ke tabel MaxCompute target.
VALUE: Hanya menyimpan value pesan dan menulisnya ke tabel MaxCompute target.
DEFAULT (default): Menyimpan key dan value pesan, lalu menulis keduanya ke tabel MaxCompute target.
Dalam mode DEFAULT, hanya format data TEXT dan BINARY yang didukung.
partition_window_type
Tidak
Memartisi data berdasarkan waktu sistem. Nilai yang valid: DAY, HOUR (default), dan MINUTE.
use_streaming
Tidak
Menentukan apakah akan menggunakan saluran data streaming. Nilai yang valid:
false (default): Dinonaktifkan.
true: Diaktifkan.
buffer_size_kb
Tidak
Ukuran buffer internal untuk penulis partisi odps, dalam KB. Nilai default adalah 65536 KB.
sink_pool_size
Tidak
Jumlah maksimum thread untuk penulisan multi-threaded. Nilai default adalah jumlah core CPU di sistem.
record_batch_size
Tidak
Jumlah maksimum pesan yang dapat dikirim secara paralel oleh satu thread dalam satu task konektor Kafka sekaligus.
skip_error
Tidak
Menentukan apakah akan melewati catatan yang menyebabkan error tidak dikenal. Nilai yang valid:
false (default): Tidak melewati catatan tersebut.
true: Melewati catatan tersebut.
CatatanJika skip_error diatur ke false dan parameter runtime.error.topic.name tidak dikonfigurasi, proses akan berhenti menulis data saat terjadi error tidak dikenal. Proses terblokir, dan exception dicatat di log.
Jika skip_error diatur ke true dan runtime.error.topic.name tidak dikonfigurasi, proses penulisan data berlanjut, dan data abnormal dibuang.
Jika skip_error diatur ke false dan runtime.error.topic.name dikonfigurasi, proses penulisan data berlanjut, dan data abnormal dicatat di topik yang ditentukan oleh runtime.error.topic.name.
Untuk contoh penanganan data abnormal, lihat Contoh penanganan data abnormal.
runtime.error.topic.name
Tidak
Nama topik Kafka tempat data yang menyebabkan error tidak dikenal selama operasi penulisan ditulis.
runtime.error.topic.bootstrap.servers
Tidak
Alamat server bootstrap instans Kafka tempat data yang menyebabkan error tidak dikenal selama operasi penulisan ditulis.
account_type
Ya
Metode akses ke layanan MaxCompute target. Nilai yang valid adalah STS dan ALIYUN. Nilai default adalah ALIYUN.
Metode akses yang berbeda memerlukan parameter kredensial akses yang berbeda. Untuk informasi selengkapnya, lihat Akses MaxCompute menggunakan metode ALIYUN dan Akses MaxCompute menggunakan metode STS.
Selain parameter umum, Anda juga harus mengonfigurasi parameter berikut.
Nama Parameter
Deskripsi
access_id
ID AccessKey Akun Alibaba Cloud atau Pengguna RAM Anda.
Anda dapat memperoleh ID AccessKey di halaman Manajemen AccessKey.
access_key
Rahasia AccessKey yang sesuai dengan ID AccessKey.
Selain parameter umum, Anda juga harus mengonfigurasi parameter berikut.
Parameter
Deskripsi
account_id
ID akun yang digunakan untuk mengakses proyek MaxCompute target. Anda dapat melihat ID akun Anda di Pusat Akun.
region_id
ID wilayah proyek MaxCompute target. Untuk ID tiap wilayah, lihat Titik akhir.
role_name
Nama role yang digunakan untuk mengakses proyek MaxCompute target. Anda dapat melihat nama role di halaman Roles.
client_timeout_ms
Interval refresh untuk token Security Token Service (STS), dalam milidetik (ms). Nilai default adalah 11 ms.
sts.endpoint
Titik akhir layanan STS yang diperlukan untuk autentikasi identitas menggunakan token keamanan sementara (STS).
Untuk daftar titik akhir tiap wilayah dan jaringan, lihat Titik akhir.
Jalankan perintah berikut untuk memulai task migrasi data konektor Kafka.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json
Contoh penggunaan
Menulis data TEXT
Persiapkan data.
Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute dan membuat tabel target.
CREATE TABLE IF NOT EXISTS table_text( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value STRING ) PARTITIONED BY (pt STRING);Buat data Kafka.
Di direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka. Contoh ini menggunakantopic_textsebagai nama topik.sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_textJalankan perintah berikut untuk membuat pesan Kafka.
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true >123 abc >456 edf
(Opsional) Jalankan layanan
Kafka-connector. Untuk informasi selengkapnya, lihat Konfigurasikan dan jalankan layanan konektor Kafka.CatatanJika layanan
Kafka-connectorsudah berjalan, Anda dapat melewati langkah ini.Buat dan konfigurasikan file
odps-sink-connector.json. Kemudian, unggah fileodps-sink-connector.jsonke lokasi apa pun, misalnya ke path$KAFKA_HOME/config.Kode berikut memberikan contoh file
odps-sink-connector.json. Untuk informasi selengkapnya tentang fileodps-sink-connector.json, lihat Konfigurasikan dan jalankan task konektor Kafka.{ "name": "odps-test-text", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_text", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_text", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"TEXT", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }Jalankan perintah berikut untuk memulai task migrasi data konektor Kafka.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.jsonVerifikasi hasil.
Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute, lalu jalankan perintah berikut untuk mengkueri data dan memverifikasi hasil.
set odps.sql.allow.fullscan=true; select * from table_text;Output berikut dikembalikan:
# Karena parameter mode dalam file konfigurasi odps-sink-connector.json diatur ke VALUE, hanya konten value yang disimpan. Bidang key bernilai NULL. +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | topic_text | 0 | 0 | NULL | abc | 07-13-2023 21:13 | | topic_text | 0 | 1 | NULL | edf | 07-13-2023 21:13 | +-------+------------+------------+-----+-------+----+
Menulis data CSV
Persiapkan data.
Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute dan membuat tabel target.
CREATE TABLE IF NOT EXISTS table_csv( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, region STRING ) PARTITIONED BY (pt STRING);Tulis data ke Kafka.
Di direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka bernamatopic_csv.sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csvJalankan perintah berikut untuk membuat pesan Kafka.
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true >123 1103,zhangsan,china >456 1104,lisi,usa
(Opsional) Jalankan layanan
Kafka-connector. Untuk informasi selengkapnya, lihat Konfigurasikan dan jalankan layanan konektor Kafka.CatatanJika layanan
Kafka-connectorsudah berjalan, Anda dapat melewati langkah ini.Buat dan konfigurasikan file
odps-sink-connector.json, lalu unggah fileodps-sink-connector.jsonke lokasi apa pun. Topik ini menggunakan path$KAFKA_HOME/configsebagai contoh.Kode berikut memberikan contoh file
odps-sink-connector.json. Untuk informasi selengkapnya tentang fileodps-sink-connector.json, lihat Konfigurasikan dan jalankan task konektor Kafka.{ "name": "odps-test-csv", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_csv", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_csv", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "format":"CSV", "mode":"VALUE", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }Jalankan perintah berikut untuk memulai task migrasi data konektor Kafka.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.jsonVerifikasi hasil.
Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute, lalu jalankan perintah berikut untuk mengkueri data dan memverifikasi hasil.
set odps.sql.allow.fullscan=true; select * from table_csv;Output berikut dikembalikan:
+-------+------------+------------+------------+------+--------+----+ | topic | partition | offset | id | name | region | pt | +-------+------------+------------+------------+------+--------+----+ | csv_test | 0 | 0 | 1103 | zhangsan | china | 07-14-2023 00:10 | | csv_test | 0 | 1 | 1104 | lisi | usa | 07-14-2023 00:10 | +-------+------------+------------+------------+------+--------+----+
Menulis data JSON
Persiapkan data.
Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute dan membuat tabel target.
CREATE TABLE IF NOT EXISTS table_json( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value JSON ) PARTITIONED BY (pt STRING);Buat data Kafka.
Di direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka. Contoh ini menggunakantopic_jsonsebagai nama topik.sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_jsonJalankan perintah berikut untuk membuat pesan Kafka.
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true >123 {"id":123,"name":"json-1","region":"beijing"} >456 {"id":456,"name":"json-2","region":"hangzhou"}
(Opsional) Jalankan layanan
Kafka-connector. Untuk informasi selengkapnya, lihat Konfigurasikan dan jalankan layanan konektor Kafka.CatatanJika layanan
Kafka-connectorsudah berjalan, Anda dapat melewati langkah ini.Buat dan konfigurasikan file
odps-sink-connector.json. Kemudian, unggah fileodps-sink-connector.jsonke lokasi apa pun, misalnya ke path$KAFKA_HOME/config.Kode berikut memberikan contoh file
odps-sink-connector.json. Untuk informasi selengkapnya tentang fileodps-sink-connector.json, lihat Konfigurasikan dan jalankan task konektor Kafka.{ "name": "odps-test-json", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_json", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_json", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"JSON", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }Jalankan perintah berikut untuk memulai task migrasi data konektor Kafka.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.jsonVerifikasi hasil.
Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute, lalu jalankan perintah berikut untuk mengkueri data dan memverifikasi hasil.
set odps.sql.allow.fullscan=true; select * from table_json;Output berikut dikembalikan:
# Data JSON berhasil ditulis ke bidang value. +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | Topic_json | 0 | 0 | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 | | Topic_json | 0 | 1 | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 | +-------+------------+------------+-----+-------+----+
Menulis data FLATTEN
Persiapkan data.
Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute dan membuat tabel target.
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);Buat data Kafka.
Di direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka. Contoh ini menggunakantopic_flattensebagai nama topik../kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flattenJalankan perintah berikut untuk membuat pesan Kafka.
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true >123 {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}} >456 {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}
(Opsional) Jalankan layanan
Kafka-connector. Untuk informasi selengkapnya, lihat Konfigurasikan dan jalankan layanan konektor Kafka.CatatanJika layanan
Kafka-connectorsudah berjalan, Anda dapat melewati langkah ini.Buat dan konfigurasikan file
odps-sink-connector.json, lalu unggah fileodps-sink-connector.jsonke lokasi apa pun. Topik ini menggunakan path$KAFKA_HOME/configsebagai contoh.Kode berikut memberikan contoh file
odps-sink-connector.json. Untuk informasi selengkapnya tentang fileodps-sink-connector.json, lihat Konfigurasikan dan jalankan task konektor Kafka.{ "name": "odps-test-flatten", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_flatten", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_flatten", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }Jalankan perintah berikut untuk memulai task konektor Kafka.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.jsonVerifikasi hasil.
Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute, lalu jalankan perintah berikut untuk mengkueri data dan memverifikasi hasil.
set odps.sql.allow.fullscan=true; select * from table_flatten;Berikut ini menunjukkan hasilnya:
# Data JSON diurai dan ditulis ke tabel MaxCompute, dengan extendinfo sebagai bidang JSON yang mendukung nesting. +-------+------------+--------+-----+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+--------+-----+------+------------+----+ | topic_flatten | 0 | 0 | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 | | topic_flatten | 0 | 1 | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 | +-------+------------+--------+-----+------+------------+----+
Contoh penanganan data abnormal
Persiapkan data.
Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute dan membuat tabel target.
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);Buat data Kafka.
Di direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka.Topik
topic_abnormal.sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormalTopik pesan untuk exception
runtime_error.sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_errorCatatanJika terjadi error selama operasi penulisan data, data abnormal ditulis ke topik
runtime_error. Jenis error ini biasanya disebabkan oleh ketidaksesuaian antara data Kafka dan skema tabel MaxCompute.
Jalankan perintah berikut untuk membuat pesan Kafka.
Salah satu pesan dalam perintah berikut tidak sesuai dengan skema tabel MaxCompute target.
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true >100 {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}} >101 {"id":101,"name":"json-4","extendinfos":"null"} >102 {"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}}
(Opsional) Jalankan layanan
Kafka-connector. Untuk informasi selengkapnya, lihat Konfigurasikan dan jalankan layanan konektor Kafka.CatatanJika layanan
Kafka-connectorsudah berjalan, Anda dapat melewati langkah ini.Buat dan konfigurasikan file
odps-sink-connector.json, lalu unggah fileodps-sink-connector.jsonke lokasi apa pun. Topik ini menggunakan path$KAFKA_HOME/configsebagai contoh.Kode berikut memberikan contoh file
odps-sink-connector.json. Untuk informasi selengkapnya tentang fileodps-sink-connector.json, lihat Konfigurasikan dan jalankan task konektor Kafka.{ "name": "odps-test-runtime-error", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_abnormal", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "test_flatten", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000", "runtime.error.topic.name":"runtime_error", "runtime.error.topic.bootstrap.servers":"http://XXXX", "skip_error":"false" } }Jalankan perintah berikut untuk memulai task konektor Kafka.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.jsonVerifikasi hasil.
Kueri data tabel MaxCompute
Gunakan client lokal (odpscmd) atau alat lain yang dapat menjalankan perintah SQL MaxCompute untuk terhubung ke MaxCompute, lalu jalankan perintah berikut untuk mengkueri data dan memverifikasi hasil.
set odps.sql.allow.fullscan=true; select * from table_flatten;Output berikut dikembalikan:
# Seperti yang terlihat dari hasil, data dengan ID 101 tidak ditulis ke MaxCompute karena tidak sesuai dengan skema tabel. # Karena parameter runtime.error.topic.name dikonfigurasi, proses tidak terblokir, dan data berikutnya ditulis berhasil. +-------+------------+------------+------------+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+------------+------------+------+------------+----+ | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 | | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 | | flatten_test | 0 | 2 | 100 | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 4 | 102 | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | +-------+------------+------------+------------+------+------------+----+Kueri pesan di topik
runtime_errorDi direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk melihat pesan.sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginningHasil berikut dikembalikan:
# Data abnormal berhasil ditulis ke antrian pesan runtime_error. {"id":101,"name":"json-4","extendinfos":"null"}