All Products
Search
Document Center

ApsaraMQ for Kafka:Buat konektor sink MaxCompute

Last Updated:Mar 12, 2026

Jika topik Kafka Anda mengumpulkan data yang memerlukan analitik offline atau penyimpanan di gudang data, Anda dapat mengalirkannya langsung ke MaxCompute dengan membuat konektor sink. Konektor ini secara terus-menerus mengekspor pesan dari topik tertentu dan menuliskannya ke tabel MaxCompute, dengan opsi partisi berbasis waktu untuk kueri yang lebih efisien.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Instans ApsaraMQ for Kafka di Wilayah yang didukung

  • Topik pada instans ApsaraMQ for Kafka yang menghasilkan data untuk diekspor

  • ID kelompok konsumen pada instans ApsaraMQ for Kafka (buat yang baru atau gunakan kelompok yang sudah ada dan tidak sedang digunakan)

  • Proyek MaxCompute

  • Pasangan AccessKey Alibaba Cloud (ID AccessKey dan Rahasia AccessKey) dengan izin untuk mengakses MaxCompute

Kumpulkan nilai-nilai berikut sebelum memulai. Anda memerlukannya saat mengonfigurasi konektor:

NilaiDeskripsiContoh
ID instans ApsaraMQ for KafkaInstans yang menghasilkan dataalikafka_post-cn-9hdsbdhd****
Nama topikTopik sumberguide-sink-topic
ID kelompok konsumenKelompok konsumen untuk konektorGID_EVENTBRIDGE_xxx
ID AccessKeyKredensial untuk mengakses MaxComputeLTAI5tXxx
Rahasia AccessKeyKredensial untuk mengakses MaxComputexXxXxXx
Nama proyek MaxComputeProyek tujuantest_compute
Nama tabel MaxComputeTabel tujuankafka_to_maxcompute

Langkah 1: Buat tabel MaxCompute

Buat tabel tujuan di client MaxCompute. Untuk detailnya, lihat Buat tabel.

Dengan partisi — tambahkan kolom kunci partisi bernama time dengan tipe STRING:

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING, valueName STRING, valueAge BIGINT) PARTITIONED by (time STRING);

Tanpa partisi:

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING, valueName STRING, valueAge BIGINT);
Tip: Partisi mengorganisasi data ke dalam segmen berbasis waktu, yang meningkatkan performa kueri dan mengurangi biaya pemindaian untuk tabel besar. Aktifkan partisi jika Anda mengharapkan volume data tinggi atau perlu melakukan kueri berdasarkan rentang waktu. Lewati partisi untuk tabel kecil atau ketika penyaringan berbasis waktu tidak diperlukan.

Setelah pernyataan berhasil dijalankan, verifikasi tabel tersebut di halaman Tables di Konsol MaxCompute.

Table creation resultTables page with the new table

Langkah 2: Buat dan mulai konektor

  1. Masuk ke ApsaraMQ for Kafka console. Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.

  2. Di panel navigasi sebelah kiri, pilih Connector Ecosystem Integration > Tasks.

  3. Di halaman Tasks, klik Create Task.

  4. Di halaman Create Task, masukkan Task Name dan Description, lalu konfigurasikan bagian-bagian berikut.

Konfigurasikan sumber

Di langkah Source, atur Data Provider ke ApsaraMQ for Kafka dan konfigurasikan parameter berikut. Klik Next Step setelah selesai.

ParameterDeskripsiContoh
RegionWilayah tempat instans ApsaraMQ for Kafka berada.China (Hangzhou)
ApsaraMQ for Kafka InstanceID instans.alikafka_post-cn-9hdsbdhd****
TopicTopik sumber yang menghasilkan data untuk diekspor.guide-sink-topic
Group IDKelompok konsumen untuk konektor. Pilih Quickly Create untuk menghasilkan kelompok otomatis dalam format GID_EVENTBRIDGE_xxx, atau pilih Use Existing Group untuk memilih kelompok yang sudah ada dan tidak sedang digunakan. Memilih kelompok yang sedang digunakan akan memengaruhi langganan pesan yang ada.Use Existing Group
Consumer OffsetLatest Offset: mulai dari pesan terbaru. Earliest Offset: mulai dari pesan tertua.Latest Offset
Network ConfigurationPilih Self-managed Internet untuk transmisi data lintas batas. Jika tidak, pilih Basic Network.Basic Network
Data FormatFormat pengkodean untuk data biner dari sumber. Json (default): JSON terenkripsi UTF-8 dalam muatan. Text: string terenkripsi UTF-8. Binary: string terenkripsi Base64.Json
MessagesJumlah maksimum pesan per pemanggilan fungsi. Pesan dikirim ketika backlog mencapai nilai ini. Nilai valid: 1 hingga 10000.2000
Interval (Unit: Seconds)Interval waktu untuk memanggil fungsi dan mengirim pesan agregasi ke Function Compute. Nilai valid: 0 hingga 15. Nilai 0 mengirim pesan segera setelah agregasi.3

Konfigurasikan penyaringan dan transformasi

  1. Di langkah Filtering, tentukan pola data untuk menyaring pesan. Untuk detailnya, lihat Event patterns.

  2. Di langkah Transformation, tentukan metode pembersihan data untuk operasi seperti pemisahan, pemetaan, pengayaan, dan perutean dinamis. Untuk detailnya, lihat Gunakan Function Compute untuk melakukan pembersihan pesan.

Konfigurasikan sink

Di langkah Sink, atur Service Type ke MaxCompute acs.maxcompute dan konfigurasikan parameter berikut.

ParameterDeskripsiContoh
AccessKey IDID AccessKey untuk mengakses MaxCompute.yourAccessKeyID
AccessKey SecretRahasia AccessKey untuk mengakses MaxCompute.yourAccessKeySecret
MaxCompute Project NameNama proyek MaxCompute.test_compute
MaxCompute Table NameNama tabel MaxCompute yang dibuat di Langkah 1.kafka_to_maxcompute
MaxCompute Table Input ParameterSetelah memilih tabel, nama dan tipe kolom akan ditampilkan. Atur Value Extraction Rule untuk setiap kolom menggunakan ekspresi JSONPath. Lihat Aturan ekstraksi nilai untuk detailnya.$.data.topic
Partition DimensionDisable: tanpa partisi. Enable: partisi data berdasarkan waktu. Jika diaktifkan, konfigurasikan Partition Value menggunakan variabel waktu {yyyy}, {MM}, {dd}, {HH}, {mm} (huruf sensitif) atau konstanta.Enable, {yyyy}-{MM}-{dd}.{HH}:{mm}.suffix
Network ConfigurationVPC: kirimkan pesan melalui virtual private cloud (VPC). Internet: kirimkan pesan melalui jaringan publik.Internet
VPCID VPC. Diperlukan hanya jika Network Configuration diatur ke VPC.vpc-bp17fapfdj0dwzjkd****
vSwitchID vSwitch. Diperlukan hanya jika Network Configuration diatur ke VPC.vsw-bp1gbjhj53hdjdkg****
Security GroupID security group. Diperlukan hanya jika Network Configuration diatur ke VPC.test_group

Aturan ekstraksi nilai

Setiap pesan yang dikirimkan ke konektor mengikuti struktur CloudEvents. Gunakan ekspresi JSONPath untuk memetakan bidang pesan ke kolom tabel MaxCompute.

Contoh pesan:

{
  "data": {
    "topic": "t_test",
    "partition": 2,
    "offset": 1,
    "timestamp": 1717048990499,
    "headers": {
      "headers": [],
      "isReadOnly": false
    },
    "key": "MaxCompute-K1",
    "value": "MaxCompute-V1"
  },
  "id": "9b05fc19-9838-4990-bb49-ddb942307d3f-2-1",
  "source": "acs:alikafka",
  "specversion": "1.0",
  "type": "alikafka:Topic:Message",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2024-05-30T06:03:10.499Z",
  "aliyunaccountid": "1413397765616316"
}

Contoh aturan ekstraksi untuk tabel kafka_to_maxcompute:

KolomTipeAturan ekstraksi nilaiNilai yang diekstraksi
topicSTRING$.data.topict_test
valueNameSTRING$.data.valueMaxCompute-V1
valueAgeBIGINT$.data.offset1

Konfigurasikan kebijakan pengulangan

Di bagian Task Property, konfigurasikan kebijakan pengulangan untuk dorongan event yang gagal dan metode penanganan kesalahan. Untuk detailnya, lihat Kebijakan pengulangan dan antrian surat mati.

Simpan dan verifikasi status konektor

Klik Save. Di halaman Tasks, temukan konektor tersebut. Saat kolom Status berubah dari Starting menjadi Running, konektor aktif dan mengalirkan data.

Langkah 3: Verifikasi pengiriman data

Kirim pesan uji dan pastikan pesan tersebut tiba di tabel MaxCompute.

  1. Di halaman Tasks, temukan konektor dan klik nama topik sumber di kolom Event Source.

  2. Di halaman Detail Topik, klik Send Message.

  3. Di panel Start to Send and Consume Message, masukkan pesan uji dan klik OK.

    Send message panel

  4. Di Konsol MaxCompute, kueri partisi untuk memastikan data telah tiba:

       show PARTITIONS kafka_to_maxcompute;

    Partition query result

  5. Kueri data di partisi target. Ganti nilai time dengan nilai partisi aktual dari langkah sebelumnya. Jika kueri mengembalikan data pesan uji, konektor berfungsi dengan benar.

       SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";

    Partition data query result