Integrasi antara MaxCompute dan Kafka menyediakan pemrosesan serta analitik data yang efisien dan andal. Integrasi ini cocok untuk skenario yang memerlukan pemrosesan real-time, aliran data berskala besar, dan analitik data kompleks. Topik ini menjelaskan cara mengimpor data dari ApsaraMQ for Kafka dan Kafka yang dikelola sendiri ke MaxCompute, termasuk contoh penggunaan.
Impor data dari ApsaraMQ for Kafka ke MaxCompute
MaxCompute terintegrasi dengan ApsaraMQ for Kafka, memungkinkan Anda menggunakan konektor sink MaxCompute yang disediakan oleh ApsaraMQ for Kafka untuk mengimpor data topik tertentu ke tabel MaxCompute secara langsung tanpa memerlukan alat pihak ketiga atau pengembangan kustom. Untuk informasi lebih lanjut tentang cara membuat konektor sink MaxCompute, lihat Buat Konektor Sink MaxCompute.
Impor data dari Kafka Apache yang dikelola sendiri ke MaxCompute
Prasyarat
Layanan Kafka versi 2.2 atau lebih baru telah diterapkan, dan topik Kafka telah dibuat. Kami merekomendasikan penggunaan layanan Kafka versi 3.4.0.
Proyek MaxCompute dan tabel MaxCompute telah dibuat. Untuk informasi lebih lanjut, lihat Buat Proyek MaxCompute dan Buat Tabel.
Peringatan
Layanan Kafka-connector memungkinkan penulisan data Kafka tipe TEXT, CSV, JSON, atau FLATTEN ke MaxCompute. Perhatikan hal berikut saat menulis berbagai jenis data Kafka. Untuk informasi lebih lanjut tentang tipe data, lihat deskripsi parameter format.
Tabel berikut menjelaskan persyaratan untuk tabel MaxCompute tempat data Kafka tipe TEXT atau JSON ditulis.
Nama bidang
Tipe data
Diperlukan
topic
STRING
Ya.
partition
BIGINT
Ya.
offset
BIGINT
Ya.
key
Jika Anda menulis data Kafka tipe TEXT, bidang tersebut harus bertipe STRING.
Jika Anda menulis data Kafka tipe JSON, bidang tersebut dapat bertipe STRING atau JSON berdasarkan pengaturan tipe data dari data yang ditulis.
Bidang ini diperlukan jika Anda perlu menyinkronkan kunci dalam pesan Kafka ke tabel MaxCompute. Untuk informasi lebih lanjut tentang mode di mana pesan Kafka disinkronkan ke MaxCompute, lihat deskripsi parameter mode.
value
Jika Anda menulis data Kafka tipe TEXT, bidang tersebut harus bertipe STRING.
Jika Anda menulis data Kafka tipe JSON, bidang tersebut dapat bertipe STRING atau JSON berdasarkan pengaturan tipe data dari data yang ditulis.
Bidang ini diperlukan jika Anda perlu menyinkronkan nilai dalam pesan Kafka ke tabel MaxCompute. Untuk informasi lebih lanjut tentang mode di mana pesan Kafka disinkronkan ke MaxCompute, lihat deskripsi parameter mode.
pt
STRING (bidang partisi)
Ya.
Jika Anda menulis data Kafka tipe FLATTEN atau CSV ke MaxCompute, bidang yang tercantum dalam tabel berikut harus disertakan dan memiliki tipe data yang sesuai. Anda juga dapat mengonfigurasi bidang kustom berdasarkan data yang ditulis.
Nama bidang
Tipe data
topic
STRING
partition
BIGINT
offset
BIGINT
pt
STRING (bidang partisi)
Untuk data Kafka tipe CSV, urutan bidang kustom dan tipe bidang dalam tabel MaxCompute harus sesuai dengan data Kafka agar data dapat ditulis dengan benar.
Untuk data Kafka tipe FLATTEN, nama bidang kustom dalam tabel MaxCompute harus sesuai dengan nama bidang dalam data Kafka agar data dapat ditulis dengan benar.
Sebagai contoh, jika data Kafka tipe FLATTEN yang ingin Anda tulis adalah
{"A":a,"B":"b","C":{"D":"d","E":"e"}}, Anda dapat menjalankan pernyataan berikut untuk membuat tabel MaxCompute guna menyimpan data tersebut.CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, A BIGINT, B STRING, C JSON ) PARTITIONED BY (pt STRING);
Konfigurasikan dan mulai layanan Kafka-connector
Di lingkungan Linux, jalankan perintah berikut di CLI atau klik tautan unduhan untuk mengunduh paket
kafka-connector-2.0.jar:wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jarUntuk mencegah konflik dependensi, kami sarankan Anda membuat subfolder seperti
connectordi$KAFKA_HOME/libsuntuk menyimpan paketkafka-connector-2.0.jar.CatatanJika lingkungan penyebaran paket
kafka-connector-2.0.jartidak sama dengan lingkungan penyebaran data Kafka, Anda harus mengonfigurasi dan memulai layananKafka-connectorsesuai petunjuk di aliware-kafka-demos.Di direktori
$KAFKA_HOME/config, konfigurasikan fileconnect-distributed.properties.Tambahkan konfigurasi berikut ke file
connect-distributed.properties.## Tambahkan konten berikut: plugin.path=<KAFKA_HOME>/libs/connector ## Perbarui nilai parameter key.converter dan value.converter. key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverterJalankan perintah berikut di direktori
$KAFKA_HOME/untuk memulai layananKafka-connector:## Jalankan perintah berikut: bin/connect-distributed.sh config/connect-distributed.properties &
Konfigurasikan dan mulai tugas Kafka-connector
Buat dan konfigurasikan file
odps-sink-connector.jsondan unggah fileodps-sink-connector.jsonke lokasi mana pun.Kode dan tabel berikut menjelaskan isi serta parameter file
odps-sink-connector.json.{ "name": "Nama tugas konektor Kafka", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "your_topic", "endpoint": "endpoint", "tunnel_endpoint": "your_tunnel endpoint", "project": "project", "schema":"default", "table": "your_table", "account_type": "tipe akun (STS atau ALIYUN)", "access_id": "id akses", "access_key": "kunci akses", "account_id": "id akun untuk sts", "sts.endpoint": "sts endpoint", "region_id": "id wilayah untuk sts", "role_name": "nama peran untuk sts", "client_timeout_ms": "Periode valid Token STS (ms)", "format": "TEXT", "mode": "KEY", "partition_window_type": "MINUTE", "use_streaming": false, "buffer_size_kb": 65536, "sink_pool_size":"150", "record_batch_size":"8000", "runtime.error.topic.name":"topik kafka saat terjadi kesalahan runtime", "runtime.error.topic.bootstrap.servers":"server bootstrap topik antrian kesalahan kafka", "skip_error":"false" } }Parameter Umum
Parameter
Diperlukan
Deskripsi
name
Ya
Nama tugas. Nama tersebut harus unik.
connector.class
Ya
Nama kelas layanan
Kafka-connector. Nilai default:com.aliyun.odps.kafka.connect.MaxComputeSinkConnector.tasks.max
Ya
Jumlah maksimum proses konsumen dalam layanan
Kafka-connector. Nilai tersebut harus berupa bilangan bulat lebih besar dari 0.topics
Ya
Nama topik Kafka.
endpoint
Ya
Endpoint MaxCompute.
Anda harus mengonfigurasi parameter ini berdasarkan wilayah dan tipe koneksi jaringan yang dipilih saat Anda membuat proyek MaxCompute. Untuk informasi lebih lanjut tentang endpoint tipe jaringan yang berbeda di setiap wilayah, lihat Endpoints.
tunnel_endpoint
Tidak
Endpoint publik MaxCompute Tunnel.
Jika Anda tidak mengonfigurasi parameter ini, lalu lintas secara otomatis diarahkan ke endpoint Tunnel yang sesuai dengan jaringan tempat MaxCompute berada. Jika Anda mengonfigurasi parameter ini, lalu lintas diarahkan ke endpoint yang ditentukan dan pengarahan otomatis tidak dilakukan.
Untuk informasi lebih lanjut tentang endpoint Tunnel tipe jaringan yang berbeda di setiap wilayah, lihat Endpoints.
project
Ya
Nama proyek MaxCompute yang ingin Anda akses.
schema
Tidak
Parameter ini diperlukan jika proyek MaxCompute tujuan memiliki model skema tiga lapis. Nilai default: default.
Jika proyek MaxCompute tujuan tidak memiliki model skema tiga lapis, Anda tidak perlu mengonfigurasi parameter ini.
Untuk informasi lebih lanjut tentang skema, lihat Operasi terkait skema.
table
Ya
Nama tabel di proyek MaxCompute tujuan.
format
Tidak
Format pesan yang ditulis. Nilai yang valid:
TEXT: string. Ini adalah nilai default.
BINARY: larik byte.
CSV: daftar string yang dipisahkan oleh koma (,).
JSON: string JSON. Untuk informasi lebih lanjut tentang tipe data JSON MaxCompute, lihat Petunjuk penggunaan tipe JSON MaxCompute (versi beta).
FLATTEN: string JSON. Kunci dan nilai dalam string JSON diurai dan ditulis ke tabel MaxCompute yang ditentukan. Kunci dalam string JSON harus sesuai dengan nama kolom dalam tabel MaxCompute.
Untuk informasi lebih lanjut tentang cara mengimpor pesan dalam format yang berbeda, lihat Contoh.
mode
Tidak
Mode di mana pesan disinkronkan ke MaxCompute. Nilai yang valid:
KEY: Hanya kunci pesan yang disimpan dan ditulis ke tabel MaxCompute tujuan.
VALUE: Hanya nilai pesan yang disimpan dan ditulis ke tabel MaxCompute tujuan.
DEFAULT: Baik kunci maupun nilai pesan disimpan dan ditulis ke tabel MaxCompute tujuan. Ini adalah nilai default.
Jika Anda mengatur parameter ini ke DEFAULT, hanya data tipe TEXT atau BINARY yang dapat ditulis.
partition_window_type
Tidak
Data dipartisi berdasarkan waktu sistem. Nilai yang valid: DAY, HOUR, dan MINUTE. Nilai default: HOUR.
use_streaming
Tidak
Menentukan apakah akan menggunakan Streaming Tunnel. Nilai yang valid:
false: Streaming Tunnel tidak digunakan. Ini adalah nilai default.
true: Streaming Tunnel digunakan.
buffer_size_kb
Tidak
Ukuran buffer internal penulis partisi odps. Unit: KB. Ukuran default adalah 65.536 KB.
sink_pool_size
Tidak
Jumlah maksimum thread untuk penulisan multi-thread. Nilai defaultnya adalah jumlah core CPU di sistem.
record_batch_size
Tidak
Jumlah maksimum pesan yang dapat dikirim secara bersamaan oleh sebuah thread dalam tugas Kafka-connector.
skip_error
Tidak
Menentukan apakah akan melewati rekaman yang dihasilkan saat terjadi kesalahan yang tidak diketahui. Nilai yang valid:
false: Rekaman tidak dilewati. Ini adalah nilai default.
true: Rekaman dilewati.
CatatanJika skip_error diatur ke false dan parameter runtime.error.topic.name tidak dikonfigurasi, operasi penulisan data selanjutnya dihentikan, proses diblokir, dan pengecualian dicatat saat terjadi kesalahan yang tidak diketahui.
Jika skip_error diatur ke true dan parameter runtime.error.topic.name tidak dikonfigurasi, proses penulisan data terus menulis data, dan data abnormal dibuang.
Jika parameter skip_error diatur ke false dan parameter runtime.error.topic.name dikonfigurasi, proses penulisan data terus menulis data, dan data abnormal dicatat di topik yang ditentukan oleh runtime.error.topic.name topic.
Untuk lebih banyak contoh tentang pemrosesan data abnormal, lihat Pemrosesan data abnormal.
runtime.error.topic.name
Tidak
Nama topik Kafka tempat data ditulis saat terjadi kesalahan yang tidak diketahui.
runtime.error.topic.bootstrap.servers
Tidak
Alamat dalam konfigurasi bootstrap-servers. Alamat-alamat tersebut adalah alamat broker Kafka tempat data ditulis saat terjadi kesalahan yang tidak diketahui.
account_type
Ya
Metode yang digunakan untuk mengakses layanan MaxCompute tujuan. Nilai yang valid: STS dan ALIYUN. Nilai default: ALIYUN.
Anda harus mengonfigurasi parameter kredensial akses yang berbeda untuk metode yang berbeda untuk mengakses MaxCompute. Untuk informasi lebih lanjut, lihat Akses MaxCompute menggunakan metode ALIYUN dan Akses MaxCompute menggunakan metode STS dalam topik ini.
Akses MaxCompute menggunakan metode ALIYUN: Konfigurasikan parameter berikut selain parameter umum.
Parameter
Deskripsi
access_id
ID AccessKey akun Alibaba Cloud Anda atau pengguna RAM dalam akun Alibaba Cloud.
Anda dapat memperoleh ID AccessKey dari halaman Pasangan AccessKey.
access_key
Rahasia AccessKey yang sesuai dengan ID AccessKey.
Akses MaxCompute menggunakan metode STS: Konfigurasikan parameter berikut selain parameter umum.
Parameter
Deskripsi
account_id
ID akun yang digunakan untuk mengakses proyek MaxCompute tujuan. Anda dapat melihat ID akun Anda di Pusat Akun.
region_id
ID wilayah tempat proyek MaxCompute tujuan berada. Untuk informasi lebih lanjut tentang ID setiap wilayah, lihat Endpoints.
role_name
Nama peran yang digunakan untuk mengakses proyek MaxCompute tujuan. Anda dapat melihat nama peran di halaman Peran.
client_timeout_ms
Interval pembaruan token STS. Unit: milidetik. Nilai default: 11.
sts.endpoint
Endpoint layanan STS yang diperlukan saat Anda menggunakan token STS untuk otentikasi identitas.
Untuk informasi lebih lanjut tentang endpoint tipe jaringan yang berbeda di setiap wilayah, lihat Endpoints.
Jalankan perintah berikut untuk memulai tugas Kafka-connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json
Contoh
Menulis data tipe TEXT
Persiapkan Data.
Buat tabel MaxCompute menggunakan Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute.
CREATE TABLE IF NOT EXISTS table_text( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value STRING ) PARTITIONED BY (pt STRING);Buat data Kafka.
Di direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka bernamatopic_text.sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_textJalankan perintah berikut untuk membuat pesan Kafka:
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true >123 abc >456 edf
(Opsional) Mulai layanan
Kafka-connector. Untuk informasi lebih lanjut, lihat Konfigurasikan dan Mulai Layanan Kafka-connector.CatatanJika layanan
Kafka-connectortelah dimulai, lewati langkah ini.Buat dan konfigurasikan file
odps-sink-connector.json, lalu unggah file tersebut ke lokasi mana pun. Dalam contoh ini, fileodps-sink-connector.jsondiunggah ke direktori$KAFKA_HOME/config.Kode berikut menampilkan isi file
odps-sink-connector.json. Untuk detail lebih lanjut tentang fileodps-sink-connector.json, lihat Konfigurasikan dan mulai tugas Kafka-connector.{ "name": "odps-test-text", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_text", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_text", "account_type": "ALIYUN", "access_id": "<yourAccessKeyID>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"TEXT", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }Jalankan perintah berikut untuk memulai tugas Kafka-connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.jsonVerifikasi Hasilnya.
Jalankan perintah berikut pada Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute untuk memeriksa hasil penulisan data:
set odps.sql.allow.fullscan=true; select * from table_text;Hasil berikut dikembalikan:
# Nilai mode dalam file konfigurasi odps-sink-connector.json adalah VALUE. Oleh karena itu, hanya nilai yang disimpan dan bidang kunci adalah NULL. +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | topic_text | 0 | 0 | NULL | abc | 07-13-2023 21:13 | | topic_text | 0 | 1 | NULL | edf | 07-13-2023 21:13 | +-------+------------+------------+-----+-------+----+
Menulis data tipe CSV
Persiapkan Data.
Buat tabel MaxCompute tujuan menggunakan Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute.
CREATE TABLE IF NOT EXISTS table_csv( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, region STRING ) PARTITIONED BY (pt STRING);Buat data Kafka.
Di direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka bernamatopic_csv.sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csvJalankan perintah berikut untuk membuat pesan Kafka:
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true >123 1103,zhangsan,china >456 1104,lisi,usa
(Opsional) Mulai layanan
Kafka-connector. Untuk informasi lebih lanjut, lihat Konfigurasikan dan Mulai Layanan Kafka-connector.CatatanJika layanan
Kafka-connectortelah dimulai, lewati langkah ini.Buat dan konfigurasikan file
odps-sink-connector.json, lalu unggah file tersebut ke lokasi mana pun. Dalam contoh ini, fileodps-sink-connector.jsondiunggah ke direktori$KAFKA_HOME/config.Kode berikut menampilkan isi file
odps-sink-connector.json. Untuk detail lebih lanjut tentang fileodps-sink-connector.json, lihat Konfigurasikan dan mulai tugas Kafka-connector.{ "name": "odps-test-csv", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_csv", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_csv", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "format":"CSV", "mode":"VALUE", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }Jalankan perintah berikut untuk memulai tugas Kafka-connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.jsonVerifikasi Hasilnya.
Jalankan perintah berikut pada Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute untuk memeriksa hasil penulisan data:
set odps.sql.allow.fullscan=true; select * from table_csv;Hasil berikut dikembalikan:
+-------+------------+------------+------------+------+--------+----+ | topic | partition | offset | id | name | region | pt | +-------+------------+------------+------------+------+--------+----+ | csv_test | 0 | 0 | 1103 | zhangsan | china | 07-14-2023 00:10 | | csv_test | 0 | 1 | 1104 | lisi | usa | 07-14-2023 00:10 | +-------+------------+------------+------------+------+--------+----+
Menulis data tipe JSON
Persiapkan Data.
Buat tabel MaxCompute tujuan menggunakan Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute.
CREATE TABLE IF NOT EXISTS table_json( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value JSON ) PARTITIONED BY (pt STRING);Buat data Kafka.
Di direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka bernamatopic_json.sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_jsonJalankan perintah berikut untuk membuat pesan Kafka:
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true >123 {"id":123,"name":"json-1","region":"beijing"} >456 {"id":456,"name":"json-2","region":"hangzhou"}
(Opsional) Mulai layanan
Kafka-connector. Untuk informasi lebih lanjut, lihat Konfigurasikan dan Mulai Layanan Kafka-connector.CatatanJika layanan
Kafka-connectortelah dimulai, lewati langkah ini.Buat dan konfigurasikan file
odps-sink-connector.json, lalu unggah file tersebut ke lokasi mana pun. Dalam contoh ini, fileodps-sink-connector.jsondiunggah ke direktori$KAFKA_HOME/config.Kode berikut menampilkan isi file
odps-sink-connector.json. Untuk detail lebih lanjut tentang fileodps-sink-connector.json, lihat Konfigurasikan dan mulai tugas Kafka-connector.{ "name": "odps-test-json", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_json", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_json", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"JSON", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }Jalankan perintah berikut untuk memulai tugas Kafka-connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.jsonVerifikasi Hasilnya.
Jalankan perintah berikut pada Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute untuk memeriksa hasil penulisan data:
set odps.sql.allow.fullscan=true; select * from table_json;Hasil berikut dikembalikan:
# Tulis data JSON ke bidang value. +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | Topic_json | 0 | 0 | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 | | Topic_json | 0 | 1 | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 | +-------+------------+------------+-----+-------+----+
Menulis data tipe FLATTEN
Persiapkan Data.
Buat tabel MaxCompute tujuan menggunakan Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute.
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);Buat data Kafka.
Di direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka bernamatopic_flatten../kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flattenJalankan perintah berikut untuk membuat pesan Kafka:
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true >123 {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}} >456 {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}
(Opsional) Mulai layanan
Kafka-connector. Untuk informasi lebih lanjut, lihat Konfigurasikan dan Mulai Layanan Kafka-connector.CatatanJika layanan
Kafka-connectortelah dimulai, lewati langkah ini.Buat dan konfigurasikan file
odps-sink-connector.json, lalu unggah file tersebut ke lokasi mana pun. Dalam contoh ini, fileodps-sink-connector.jsondiunggah ke direktori$KAFKA_HOME/config.Kode berikut menampilkan isi file
odps-sink-connector.json. Untuk detail lebih lanjut tentang fileodps-sink-connector.json, lihat Konfigurasikan dan mulai tugas Kafka-connector.{ "name": "odps-test-flatten", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_flatten", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_flatten", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }Jalankan perintah berikut untuk memulai tugas Kafka-connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.jsonVerifikasi Hasilnya.
Jalankan perintah berikut pada Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute untuk memeriksa hasil penulisan data:
set odps.sql.allow.fullscan=true; select * from table_flatten;Hasil berikut dikembalikan:
# Data JSON diurai dan ditulis ke tabel MaxCompute. Bidang exteninfo dalam format JSON bertingkat dapat berupa bidang JSON. +-------+------------+--------+-----+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+--------+-----+------+------------+----+ | topic_flatten | 0 | 0 | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 | | topic_flatten | 0 | 1 | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 | +-------+------------+--------+-----+------+------------+----+
Memproses data abnormal
Persiapkan Data.
Buat tabel MaxCompute tujuan menggunakan Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute.
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);Buat data Kafka.
Di direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk membuat topik Kafka:topic_abnormalsh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormalruntime_errorsh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_errorCatatanJika terjadi kesalahan yang tidak diketahui saat menulis data, data abnormal akan ditulis ke topik
runtime_error. Pada umumnya, kesalahan yang tidak diketahui terjadi karena format data Kafka tidak sesuai dengan format tabel MaxCompute.
Jalankan perintah berikut untuk membuat pesan Kafka:
Dalam pesan berikut, format data satu pesan tidak sama dengan format tabel MaxCompute.
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true >100 {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}} >101 {"id":101,"name":"json-4","extendinfos":"null"} >102 {"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}}
(Opsional) Mulai layanan
Kafka-connector. Untuk informasi lebih lanjut, lihat Konfigurasikan dan Mulai Layanan Kafka-connector.CatatanJika layanan
Kafka-connectortelah dimulai, lewati langkah ini.Buat dan konfigurasikan file
odps-sink-connector.json, lalu unggah file tersebut ke lokasi mana pun. Dalam contoh ini, fileodps-sink-connector.jsondiunggah ke direktori$KAFKA_HOME/config.Kode berikut menampilkan isi file
odps-sink-connector.json. Untuk detail lebih lanjut tentang fileodps-sink-connector.json, lihat Konfigurasikan dan mulai tugas Kafka-connector.{ "name": "odps-test-runtime-error", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_abnormal", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "test_flatten", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000", "runtime.error.topic.name":"runtime_error", "runtime.error.topic.bootstrap.servers":"http://XXXX", "skip_error":"false" } }Jalankan perintah berikut untuk memulai tugas Kafka-connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.jsonVerifikasi Hasilnya.
Kueri Data dalam Tabel MaxCompute.
Jalankan perintah berikut pada Klien MaxCompute (odpscmd) atau alat lain yang dapat menjalankan SQL MaxCompute untuk memeriksa hasil penulisan data:
set odps.sql.allow.fullscan=true; select * from table_flatten;Hasil berikut dikembalikan:
# Dua rekaman terakhir ditampilkan. Hal ini disebabkan parameter skip_error diatur ke true. Data dengan id 101 tidak ditulis ke tabel MaxCompute, dan rekaman selanjutnya tidak diblokir dari penulisan ke tabel MaxCompute. +-------+------------+------------+------------+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+------------+------------+------+------------+----+ | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 | | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 | | flatten_test | 0 | 2 | 100 | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 4 | 102 | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | +-------+------------+------------+------------+------+------------+----+Kueri Pesan dalam Topik
runtime_error.Di direktori
$KAFKA_HOME/bin/, jalankan perintah berikut untuk melihat hasil penulisan pesan:sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginningHasil berikut dikembalikan:
# Data abnormal ditulis ke topik runtime_error. {"id":101,"name":"json-4","extendinfos":"null"}