Jika topik Kafka Anda mengumpulkan data yang memerlukan analitik offline atau penyimpanan di gudang data, Anda dapat mengalirkannya langsung ke MaxCompute dengan membuat konektor sink. Konektor ini secara terus-menerus mengekspor pesan dari topik tertentu dan menuliskannya ke tabel MaxCompute, dengan opsi partisi berbasis waktu untuk kueri yang lebih efisien.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Instans ApsaraMQ for Kafka di Wilayah yang didukung
Topik pada instans ApsaraMQ for Kafka yang menghasilkan data untuk diekspor
ID kelompok konsumen pada instans ApsaraMQ for Kafka (buat yang baru atau gunakan kelompok yang sudah ada dan tidak sedang digunakan)
Proyek MaxCompute
Pasangan AccessKey Alibaba Cloud (ID AccessKey dan Rahasia AccessKey) dengan izin untuk mengakses MaxCompute
Kumpulkan nilai-nilai berikut sebelum memulai. Anda memerlukannya saat mengonfigurasi konektor:
| Nilai | Deskripsi | Contoh |
|---|---|---|
| ID instans ApsaraMQ for Kafka | Instans yang menghasilkan data | alikafka_post-cn-9hdsbdhd**** |
| Nama topik | Topik sumber | guide-sink-topic |
| ID kelompok konsumen | Kelompok konsumen untuk konektor | GID_EVENTBRIDGE_xxx |
| ID AccessKey | Kredensial untuk mengakses MaxCompute | LTAI5tXxx |
| Rahasia AccessKey | Kredensial untuk mengakses MaxCompute | xXxXxXx |
| Nama proyek MaxCompute | Proyek tujuan | test_compute |
| Nama tabel MaxCompute | Tabel tujuan | kafka_to_maxcompute |
Langkah 1: Buat tabel MaxCompute
Buat tabel tujuan di client MaxCompute. Untuk detailnya, lihat Buat tabel.
Dengan partisi — tambahkan kolom kunci partisi bernama time dengan tipe STRING:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING, valueName STRING, valueAge BIGINT) PARTITIONED by (time STRING);Tanpa partisi:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING, valueName STRING, valueAge BIGINT);Tip: Partisi mengorganisasi data ke dalam segmen berbasis waktu, yang meningkatkan performa kueri dan mengurangi biaya pemindaian untuk tabel besar. Aktifkan partisi jika Anda mengharapkan volume data tinggi atau perlu melakukan kueri berdasarkan rentang waktu. Lewati partisi untuk tabel kecil atau ketika penyaringan berbasis waktu tidak diperlukan.
Setelah pernyataan berhasil dijalankan, verifikasi tabel tersebut di halaman Tables di Konsol MaxCompute.


Langkah 2: Buat dan mulai konektor
Masuk ke ApsaraMQ for Kafka console. Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.
Di panel navigasi sebelah kiri, pilih Connector Ecosystem Integration > Tasks.
Di halaman Tasks, klik Create Task.
Di halaman Create Task, masukkan Task Name dan Description, lalu konfigurasikan bagian-bagian berikut.
Konfigurasikan sumber
Di langkah Source, atur Data Provider ke ApsaraMQ for Kafka dan konfigurasikan parameter berikut. Klik Next Step setelah selesai.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| Region | Wilayah tempat instans ApsaraMQ for Kafka berada. | China (Hangzhou) |
| ApsaraMQ for Kafka Instance | ID instans. | alikafka_post-cn-9hdsbdhd**** |
| Topic | Topik sumber yang menghasilkan data untuk diekspor. | guide-sink-topic |
| Group ID | Kelompok konsumen untuk konektor. Pilih Quickly Create untuk menghasilkan kelompok otomatis dalam format GID_EVENTBRIDGE_xxx, atau pilih Use Existing Group untuk memilih kelompok yang sudah ada dan tidak sedang digunakan. Memilih kelompok yang sedang digunakan akan memengaruhi langganan pesan yang ada. | Use Existing Group |
| Consumer Offset | Latest Offset: mulai dari pesan terbaru. Earliest Offset: mulai dari pesan tertua. | Latest Offset |
| Network Configuration | Pilih Self-managed Internet untuk transmisi data lintas batas. Jika tidak, pilih Basic Network. | Basic Network |
| Data Format | Format pengkodean untuk data biner dari sumber. Json (default): JSON terenkripsi UTF-8 dalam muatan. Text: string terenkripsi UTF-8. Binary: string terenkripsi Base64. | Json |
| Messages | Jumlah maksimum pesan per pemanggilan fungsi. Pesan dikirim ketika backlog mencapai nilai ini. Nilai valid: 1 hingga 10000. | 2000 |
| Interval (Unit: Seconds) | Interval waktu untuk memanggil fungsi dan mengirim pesan agregasi ke Function Compute. Nilai valid: 0 hingga 15. Nilai 0 mengirim pesan segera setelah agregasi. | 3 |
Konfigurasikan penyaringan dan transformasi
Di langkah Filtering, tentukan pola data untuk menyaring pesan. Untuk detailnya, lihat Event patterns.
Di langkah Transformation, tentukan metode pembersihan data untuk operasi seperti pemisahan, pemetaan, pengayaan, dan perutean dinamis. Untuk detailnya, lihat Gunakan Function Compute untuk melakukan pembersihan pesan.
Konfigurasikan sink
Di langkah Sink, atur Service Type ke MaxCompute acs.maxcompute dan konfigurasikan parameter berikut.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| AccessKey ID | ID AccessKey untuk mengakses MaxCompute. | yourAccessKeyID |
| AccessKey Secret | Rahasia AccessKey untuk mengakses MaxCompute. | yourAccessKeySecret |
| MaxCompute Project Name | Nama proyek MaxCompute. | test_compute |
| MaxCompute Table Name | Nama tabel MaxCompute yang dibuat di Langkah 1. | kafka_to_maxcompute |
| MaxCompute Table Input Parameter | Setelah memilih tabel, nama dan tipe kolom akan ditampilkan. Atur Value Extraction Rule untuk setiap kolom menggunakan ekspresi JSONPath. Lihat Aturan ekstraksi nilai untuk detailnya. | $.data.topic |
| Partition Dimension | Disable: tanpa partisi. Enable: partisi data berdasarkan waktu. Jika diaktifkan, konfigurasikan Partition Value menggunakan variabel waktu {yyyy}, {MM}, {dd}, {HH}, {mm} (huruf sensitif) atau konstanta. | Enable, {yyyy}-{MM}-{dd}.{HH}:{mm}.suffix |
| Network Configuration | VPC: kirimkan pesan melalui virtual private cloud (VPC). Internet: kirimkan pesan melalui jaringan publik. | Internet |
| VPC | ID VPC. Diperlukan hanya jika Network Configuration diatur ke VPC. | vpc-bp17fapfdj0dwzjkd**** |
| vSwitch | ID vSwitch. Diperlukan hanya jika Network Configuration diatur ke VPC. | vsw-bp1gbjhj53hdjdkg**** |
| Security Group | ID security group. Diperlukan hanya jika Network Configuration diatur ke VPC. | test_group |
Aturan ekstraksi nilai
Setiap pesan yang dikirimkan ke konektor mengikuti struktur CloudEvents. Gunakan ekspresi JSONPath untuk memetakan bidang pesan ke kolom tabel MaxCompute.
Contoh pesan:
{
"data": {
"topic": "t_test",
"partition": 2,
"offset": 1,
"timestamp": 1717048990499,
"headers": {
"headers": [],
"isReadOnly": false
},
"key": "MaxCompute-K1",
"value": "MaxCompute-V1"
},
"id": "9b05fc19-9838-4990-bb49-ddb942307d3f-2-1",
"source": "acs:alikafka",
"specversion": "1.0",
"type": "alikafka:Topic:Message",
"datacontenttype": "application/json; charset=utf-8",
"time": "2024-05-30T06:03:10.499Z",
"aliyunaccountid": "1413397765616316"
}Contoh aturan ekstraksi untuk tabel kafka_to_maxcompute:
| Kolom | Tipe | Aturan ekstraksi nilai | Nilai yang diekstraksi |
|---|---|---|---|
topic | STRING | $.data.topic | t_test |
valueName | STRING | $.data.value | MaxCompute-V1 |
valueAge | BIGINT | $.data.offset | 1 |
Konfigurasikan kebijakan pengulangan
Di bagian Task Property, konfigurasikan kebijakan pengulangan untuk dorongan event yang gagal dan metode penanganan kesalahan. Untuk detailnya, lihat Kebijakan pengulangan dan antrian surat mati.
Simpan dan verifikasi status konektor
Klik Save. Di halaman Tasks, temukan konektor tersebut. Saat kolom Status berubah dari Starting menjadi Running, konektor aktif dan mengalirkan data.
Langkah 3: Verifikasi pengiriman data
Kirim pesan uji dan pastikan pesan tersebut tiba di tabel MaxCompute.
Di halaman Tasks, temukan konektor dan klik nama topik sumber di kolom Event Source.
Di halaman Detail Topik, klik Send Message.
Di panel Start to Send and Consume Message, masukkan pesan uji dan klik OK.

Di Konsol MaxCompute, kueri partisi untuk memastikan data telah tiba:
show PARTITIONS kafka_to_maxcompute;
Kueri data di partisi target. Ganti nilai
timedengan nilai partisi aktual dari langkah sebelumnya. Jika kueri mengembalikan data pesan uji, konektor berfungsi dengan benar.SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";