全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor MaxCompute

更新时间:Nov 11, 2025

Topik ini menjelaskan cara menggunakan MaxCompute connector.

Latar Belakang

MaxCompute (sebelumnya dikenal sebagai ODPS) adalah platform komputasi cepat dan sepenuhnya terkelola untuk gudang data skala besar. MaxCompute mampu memproses data dalam skala exabyte serta menyediakan solusi penyimpanan dan pemrosesan data terstruktur dalam jumlah besar di gudang data, lengkap dengan layanan analitik dan pemodelan.

Tabel berikut menguraikan kemampuan yang didukung oleh MaxCompute connector.

Item

Deskripsi

Jenis yang didukung

Tabel sumber, tabel dimensi, dan Tabel sink

Mode menjalankan

Mode streaming dan mode Batch

Format data

T/A

Metrik

监控指标

  • Source

    numRecordsIn

    numRecordsInPerSecond

    numBytesIn

    numBytesInPerSecond

  • Sink

    numRecordsOut

    numRecordsOutPerSecond

    numBytesOut

    numBytesOutPerSecond

  • Dimension table

    dim.odps.cacheSize

Catatan

Untuk informasi selengkapnya, lihat Metrics.

Jenis API

DataStream API dan SQL API

Pembaruan atau penghapusan data di Tabel sink

Jika MaxCompute Batch Tunnel atau MaxCompute Streaming Tunnel digunakan, data hanya dapat dimasukkan ke dalam Tabel sink. Jika MaxCompute Upsert Tunnel digunakan, data di Tabel sink dapat diperbarui atau dihapus, serta data dapat dimasukkan ke dalam Tabel sink.

Prasyarat

Tabel MaxCompute telah dibuat. Untuk informasi lebih lanjut tentang cara membuat tabel MaxCompute, lihat Buat Tabel.

Batasan

  • MaxCompute connector hanya mendukung semantik setidaknya sekali.

    Catatan

    Semantik setidaknya sekali digunakan untuk mencegah kehilangan data. Dalam kasus tertentu, data duplikat mungkin ditulis ke MaxCompute. Data duplikat dapat dihasilkan berdasarkan tunnel yang Anda gunakan. Untuk informasi lebih lanjut tentang MaxCompute Tunnel, lihat bagian "Bagaimana cara memilih data tunnel?" dari topik FAQ tentang Penyimpanan Hulu dan Hilir.

  • Secara default, sumber beroperasi dalam mode penuh dan hanya membaca data dari partisi yang ditentukan oleh opsi partition. Setelah semua data dari partisi dibaca, pekerjaan selesai dan tidak memantau partisi baru.

    Untuk terus memantau partisi baru, buat sumber tambahan dengan menentukan opsi startPartition dalam klausa WITH.

    Catatan
    • Setiap kali tabel dimensi diperbarui, tabel dimensi memeriksa partisi terbaru.

    • Setelah tabel sumber mulai berjalan, ia tidak membaca data yang baru ditambahkan ke partisi. Kami sarankan Anda menjalankan penyebaran ketika partisi berisi data lengkap.

SQL

MaxCompute connector dapat digunakan sebagai sumber, dimensi, atau tabel sink dalam pekerjaan berbasis SQL.

Sintaksis

CREATE TEMPORARY TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

Opsi konektor

  • Umum

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    connector

    Jenis tabel.

    STRING

    Ya

    Tidak ada nilai default

    Tetapkan nilainya menjadi odps.

    endpoint

    Titik akhir MaxCompute.

    STRING

    Ya

    Tidak ada nilai default

    Untuk informasi lebih lanjut, lihat Titik akhir.

    tunnelEndpoint

    Titik akhir MaxCompute Tunnel.

    STRING

    Tidak

    Tidak ada nilai default

    Untuk informasi lebih lanjut, lihat Titik akhir.

    Catatan

    Jika opsi ini tidak ditentukan, MaxCompute mengalokasikan koneksi tunnel berdasarkan layanan Server Load Balancer (SLB).

    project

    Nama proyek MaxCompute.

    STRING

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    schemaName

    Nama skema MaxCompute.

    STRING

    Tidak

    Tidak ada nilai default

    Opsi ini diperlukan hanya jika fitur skema MaxCompute diaktifkan. Dalam hal ini, Anda harus menetapkan opsi ini ke nama skema tabel MaxCompute. Untuk informasi lebih lanjut, lihat Operasi terkait skema.

    Catatan

    Hanya VVR 8.0.6 atau yang lebih baru yang mendukung opsi ini.

    tableName

    Nama tabel MaxCompute.

    STRING

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    accessId

    ID AccessKey yang digunakan untuk mengakses MaxCompute.

    STRING

    Ya

    Tidak ada nilai default

    Untuk informasi lebih lanjut, lihat Bagaimana cara melihat informasi tentang ID AccessKey dan rahasia AccessKey akun?

    Penting

    Untuk melindungi pasangan AccessKey Anda, kami sarankan Anda mengonfigurasi ID AccessKey menggunakan variabel. Untuk informasi lebih lanjut, lihat Kelola variabel.

    accessKey

    Rahasia AccessKey yang digunakan untuk mengakses MaxCompute.

    STRING

    Ya

    Tidak ada nilai default

    partition

    Nama partisi dalam tabel MaxCompute.

    STRING

    Tidak

    Tidak ada nilai default

    Anda tidak perlu menentukan opsi ini untuk tabel MaxCompute non-partisi atau sumber inkremental.

    Catatan

    Untuk informasi lebih lanjut tentang cara menentukan opsi partisi untuk tabel MaxCompute yang dipartisi, lihat bagian "Bagaimana cara mengonfigurasi opsi partisi saat data dibaca dari atau ditulis ke partisi?" dari topik FAQ tentang penyimpanan hulu dan hilir.

    compressAlgorithm

    Algoritma kompresi yang digunakan oleh MaxCompute Tunnel.

    STRING

    Tidak

    SNAPPY

    Nilai valid:

    • RAW (tanpa kompresi)

    • ZLIB

    • SNAPPY

      Dibandingkan dengan ZLIB, SNAPPY dapat secara signifikan meningkatkan throughput. Dalam skenario uji, throughput meningkat sekitar 50%.

    quotaName

    Nama kuota untuk grup sumber daya eksklusif Tunnel MaxCompute.

    STRING

    Tidak

    Tidak ada nilai default

    Anda dapat menentukan opsi ini untuk menggunakan grup sumber daya eksklusif Tunnel MaxCompute.

    Penting
    • Hanya VVR 8.0.3 atau lebih baru yang mendukung opsi ini.

    • Jika Anda menentukan opsi ini, Anda harus menghapus opsi tunnelEndpoint. Jika tidak, tunnel yang ditentukan oleh opsi tunnelEndpoint akan digunakan.

  • Khusus Sumber

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    maxPartitionCount

    Jumlah maksimum partisi dari mana data dapat dibaca.

    INTEGER

    Tidak

    100

    Jika jumlah partisi dari mana data dibaca melebihi nilai opsi ini, pesan kesalahan ini muncul: "Jumlah partisi yang cocok melebihi batas default".

    Penting

    Membaca dari jumlah partisi yang berlebihan dapat membebani MaxCompute dan memperlambat startup pekerjaan. Jika diperlukan untuk kebutuhan bisnis Anda, sesuaikan secara manual nilai opsi ini.

    useArrow

    Menentukan apakah akan menggunakan format Arrow untuk membaca data.

    BOOLEAN

    Tidak

    false

    Format Arrow dapat digunakan untuk memanggil operasi API penyimpanan MaxCompute.

    Penting
    • Opsi ini hanya berlaku dalam penyebaran batch.

    • Hanya VVR 8.0.8 atau yang lebih baru yang mendukung opsi ini.

    splitSize

    Ukuran data yang dapat ditarik sekaligus saat format Arrow digunakan untuk membaca data.

    MEMORYSIZE

    Tidak

    256 MB

    Hanya VVR 8.0.8 atau yang lebih baru yang mendukung opsi ini.

    Penting

    Opsi ini hanya berlaku dalam penyebaran batch.

    compressCodec

    Algoritma kompresi yang digunakan saat format Arrow digunakan untuk membaca data.

    STRING

    Tidak

    ""

    Nilai valid:

    • "" (tanpa kompresi)

    • ZSTD

    • LZ4_FRAME

    Dibandingkan tanpa kompresi, throughput dapat ditingkatkan jika Anda menentukan algoritma kompresi.

    Penting
    • Opsi ini hanya berlaku dalam penyebaran batch.

    • Hanya VVR 8.0.8 atau yang lebih baru yang mendukung opsi ini.

    dynamicLoadBalance

    Menentukan apakah akan mengaktifkan alokasi dinamis shard.

    BOOLEAN

    Tidak

    false

    Nilai valid:

    • true

    • false

    Alokasi dinamis shard dapat meningkatkan kinerja pemrosesan operator yang berbeda dari Realtime Compute for Apache Flink dan mengurangi waktu keseluruhan yang diperlukan untuk membaca dari MaxCompute. Namun, ini dapat menyebabkan skew data karena jumlah total data yang dibaca oleh operator yang berbeda tidak konsisten.

    Penting
    • Opsi ini hanya berlaku dalam penyebaran batch.

    • Hanya VVR 8.0.8 atau yang lebih baru yang mendukung opsi ini.

  • Opsi Spesifik untuk Tabel Sumber MaxCompute Inkremental

    Tabel sumber inkremental memantau partisi baru dengan memeriksa secara berkala server MaxCompute untuk mendapatkan semua informasi partisi. Sebelum sumber mulai membaca data dari partisi baru, penulisan data di partisi tersebut harus selesai. Untuk informasi lebih lanjut, lihat bagian "Apa yang harus saya lakukan jika tabel sumber MaxCompute inkremental mendeteksi partisi baru saat data masih ditulis ke partisi?" dari topik FAQ tentang Penyimpanan Hulu dan Hilir. Anda dapat mengonfigurasi opsi startPartition untuk menentukan partisi awal dari mana data dibaca. Hanya data dalam partisi yang urutan alfabetisnya lebih besar dari atau sama dengan urutan alfabetis partisi yang ditentukan oleh opsi startPartition yang dibaca. Sebagai contoh, urutan alfabetis partisi year=2023,month=10 kurang dari urutan alfabetis partisi year=2023,month=9. Dalam hal ini, Anda dapat menambahkan nol sebelum nomor bulan pada nama partisi yang dideklarasikan dalam kode untuk memastikan bahwa urutan alfabetis partisi valid. Dengan cara ini, Anda dapat mengubah nilai opsi partisi dari year=2023,month=9 menjadi year=2023,month=09.

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    startPartition

    Partisi awal dari mana data inkremental dibaca.

    STRING

    Ya

    Tidak ada nilai default

    • Jika Anda menentukan opsi ini, sumber inkremental digunakan. Akibatnya, opsi partisi diabaikan.

    • Jika tabel sumber adalah tabel partisi multi-level, Anda harus mengonfigurasi nilai setiap kolom partisi dalam urutan menurun berdasarkan tingkat partisi.

    Catatan

    Untuk informasi lebih lanjut tentang cara menentukan opsi startPartition, lihat bagian "Bagaimana cara mengonfigurasi opsi startPartition untuk tabel sumber MaxCompute inkremental?" dari topik FAQ tentang penyimpanan hulu dan hilir.

    subscribeIntervalInSec

    Interval di mana MaxCompute dipolling untuk mendapatkan informasi tentang partisi.

    INTEGER

    Tidak

    30

    Satuan: detik.

    modifiedTableOperation

    Operasi yang dilakukan saat data dalam partisi dimodifikasi selama pembacaan partisi.

    Enum (NONE, SKIP)

    Tidak

    NONE

    Sesi unduhan disimpan dalam checkpoint. Setiap kali Anda melanjutkan sesi dari checkpoint, Realtime Compute for Apache Flink mencoba melanjutkan kemajuan pembacaan dari sesi. Namun, sesi tidak tersedia karena data dalam partisi dimodifikasi. Dalam hal ini, penyebaran di-restart berulang kali. Untuk menyelesaikan masalah ini, Anda dapat menentukan opsi ini. Nilai valid:

    • NONE: Jika Anda mengatur opsi ini ke NONE, Anda harus mengubah nilai opsi startPartition agar urutan alfabetis partisi yang ditentukan oleh opsi startPartition lebih besar dari urutan alfabetis partisi yang tidak tersedia dan mulai penyebaran tanpa status.

    • SKIP: Jika Anda tidak ingin memulai penyebaran tanpa status, Anda dapat mengatur opsi ini ke SKIP. Dalam hal ini, Realtime Compute for Apache Flink melewati partisi yang tidak tersedia saat mencoba melanjutkan sesi dari checkpoint.

    Penting
    • Hanya VVR 8.0.3 atau yang lebih baru yang mendukung opsi ini.

    • Jika Anda mengatur opsi ini ke NONE atau SKIP, data yang dibaca dari partisi tempat data dimodifikasi tetap disimpan, dan data yang belum dibaca diabaikan.

  • Khusus Sink

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    useStreamTunnel

    Menentukan apakah akan menggunakan MaxCompute Streaming Tunnel untuk mengunggah data.

    BOOLEAN

    Tidak

    false

    Nilai valid:

    • true: MaxCompute Streaming Tunnel digunakan untuk mengunggah data.

    • false: MaxCompute Batch Tunnel digunakan untuk mengunggah data.

    Catatan

    Untuk informasi lebih lanjut tentang cara memilih terowongan, lihat bagian "How do I select a data tunnel?" pada topik FAQ tentang penyimpanan hulu dan hilir.

    flushIntervalMs

    Interval di mana operasi flush dilakukan di buffer penulis dalam MaxCompute Tunnel.

    LONG

    Tidak

    30000 (30 detik)

    Data dibuffer dan di-flush secara batch pada interval yang ditentukan oleh flushIntervalMs.

    • Streaming Tunnel: Data yang di-flush langsung tersedia di tabel MaxCompute tujuan.

    • Batch Tunnel: Data yang di-flush hanya tersedia setelah operasi checkpointing selesai. Kami menyarankan Anda mengatur opsi ini ke 0 untuk menonaktifkan fitur flush terjadwal.

    Satuan: milidetik.

    Catatan

    Opsi ini dapat digunakan bersama dengan opsi batchSize. Operasi flush dipicu ketika kondisi yang ditentukan oleh opsi batchSize atau opsi flushIntervalMs terpenuhi.

    batchSize

    Ukuran buffer MaxCompute Tunnel.

    LONG

    Tidak

    67108864 (64 MB)

    MaxCompute sink menyisipkan data ke dalam buffer. Kemudian, MaxCompute sink menulis data dalam buffer ke tabel MaxCompute tujuan ketika ukuran data buffer melebihi nilai yang ditentukan oleh opsi batchSize.

    Satuan: byte.

    Catatan

    Opsi ini dapat digunakan bersama dengan opsi flushIntervalMs. Operasi flush dipicu ketika kondisi yang ditentukan oleh opsi batchSize atau flushIntervalMs terpenuhi.

    numFlushThreads

    Jumlah thread yang digunakan untuk flush data dalam buffer penulis di MaxCompute Tunnel.

    INTEGER

    Tidak

    1

    Setiap MaxCompute sink membuat jumlah thread yang ditentukan oleh opsi numFlushThreads untuk flush data. Jika nilai opsi ini lebih besar dari 1, data dalam partisi yang berbeda dapat di-flush secara bersamaan. Ini meningkatkan efisiensi operasi flush.

    slotNum

    Jumlah slot Tunnel yang digunakan oleh MaxCompute untuk menerima data dari Flink.

    INTEGER

    Tidak

    0

    Untuk informasi tentang batasan jumlah slot, lihat Ikhtisar layanan transmisi data dalam dokumentasi MaxCompute.

    dynamicPartitionLimit

    Jumlah maksimum partisi dinamis ke mana data dapat ditulis.

    INTEGER

    Tidak

    100

    Jika jumlah partisi dinamis ke mana data ditulis dari sink antara dua checkpoint melebihi nilai opsi dynamicPartitionLimit, pesan kesalahan ini muncul: "Terlalu banyak partisi dinamis".

    Penting

    Jika data ditulis ke sejumlah besar partisi tabel MaxCompute, beban kerja pada layanan MaxCompute tinggi, memperlambat checkpointing dan flushing. Untuk mencegah masalah ini, Anda perlu memeriksa apakah data perlu ditulis ke sejumlah besar partisi. Jika bisnis Anda memerlukan data untuk ditulis ke sejumlah besar partisi, secara manual tingkatkan nilai opsi dynamicPartitionLimit.

    retryTimes

    Jumlah maksimum percobaan ulang yang dapat dilakukan untuk permintaan di server MaxCompute.

    INTEGER

    Tidak

    3

    Layanan MaxCompute mungkin tidak tersedia untuk sementara waktu saat Anda membuat sesi, mengirimkan sesi, atau data di-flush. Jika layanan MaxCompute menjadi tidak tersedia, server MaxCompute diminta berdasarkan konfigurasi opsi ini.

    sleepMillis

    Interval percobaan ulang.

    INTEGER

    Tidak

    1000

    Satuan: milidetik.

    enableUpsert

    Menentukan apakah akan menggunakan MaxCompute Upsert Tunnel untuk mengunggah data.

    BOOLEAN

    Tidak

    false

    Nilai valid:

    • true: MaxCompute Upsert Tunnel digunakan untuk memproses data INSERT, UPDATE_AFTER, dan DELETE di Realtime Compute for Apache Flink.

    • false: MaxCompute Batch Tunnel atau MaxCompute Streaming Tunnel yang ditentukan oleh opsi useStreamTunnel digunakan untuk memproses data INSERT dan UPDATE_AFTER di Realtime Compute for Apache Flink.

    Penting
    • Jika masalah seperti kesalahan, kegagalan penyebaran, atau gangguan pemrosesan jangka panjang terjadi saat MaxCompute sink melakukan commit sesi dalam mode upsert, kami sarankan Anda mengatur opsi Paralelisme operator sink ke nilai yang kurang dari atau sama dengan 10.

    • Hanya VVR 8.0.6 atau yang lebih baru yang mendukung opsi ini.

    upsertAsyncCommit

    Menentukan apakah akan menggunakan mode asinkron saat MaxCompute sink melakukan commit sesi dalam mode upsert.

    BOOLEAN

    Tidak

    false

    Nilai valid:

    • true: Mode asinkron digunakan. Jika Anda menggunakan mode asinkron, waktu yang dikonsumsi untuk melakukan commit sesi berkurang tetapi data yang ditulis setelah sesi di-commit tidak dapat segera di-query.

    • false: Mode sinkron digunakan secara default. Saat MaxCompute sink melakukan commit sesi, sistem menunggu hingga server memproses sesi.

    Catatan

    Hanya VVR 8.0.6 atau yang lebih baru yang mendukung opsi ini.

    upsertCommitTimeoutMs

    Batas waktu untuk MaxCompute sink melakukan commit sesi dalam mode upsert.

    INTEGER

    Tidak

    120000

    (120 detik)

    Unit: milidetik.

    Catatan

    Hanya VVR 8.0.6 atau yang lebih baru yang mendukung opsi ini.

    sink.operation

    Mode operasi tulis untuk tabel Delta.

    STRING

    Tidak

    insert

    Nilai valid:

    • insert: Data ditulis ke tabel dalam mode tambahan.

    • upsert: Data diperbarui.

    Catatan

    Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    sink.parallelism

    Tingkat paralelisme saat data ditulis ke tabel Delta.

    INTEGER

    Tidak

    Tidak ada

    • Tingkat paralelisme penulisan data. Jika Anda tidak mengonfigurasi opsi ini, paralelisme data upstream digunakan secara default.

    • Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    Penting

    Pastikan nilai opsi write.bucket.num adalah kelipatan integral dari nilai opsi sink.parallelism. Ini membantu memastikan kinerja tulis optimal dan secara efisien menghemat memori node sink.

    sink.file-cached.enable

    Menentukan apakah akan mengaktifkan mode cache file saat data ditulis ke partisi dinamis tabel Delta.

    BOOLEAN

    Tidak

    false

    Nilai valid:

    • true: Mode cache file diaktifkan.

    • false: Mode cache file dinonaktifkan.

    Jika Anda mengaktifkan mode cache file, jumlah file kecil yang ditulis ke server berkurang. Namun, latensi penulisan yang lebih tinggi ada. Kami sarankan Anda mengaktifkan mode cache file jika tabel sink memiliki tingkat paralelisme tinggi.

    Catatan

    Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    sink.file-cached.writer.num

    Jumlah thread yang digunakan untuk mengunggah data secara bersamaan dalam tugas dalam mode cache file.

    INTEGER

    Tidak

    16

    • Opsi ini berlaku hanya jika opsi sink.file-cached.enable diatur ke true.

    • Kami sarankan Anda tidak menaikkan nilai opsi ini ke nilai besar. Jika data ditulis ke sejumlah besar partisi secara bersamaan, kesalahan memori habis (OOM) mungkin terjadi.

      Catatan

      Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    sink.bucket.check-interval

    Interval di mana ukuran file diperiksa dalam mode cache file. Satuan: milidetik.

    INTEGER

    Tidak

    60000

    • Opsi ini berlaku hanya jika opsi sink.file-cached.enable diatur ke true.

    • Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    sink.file-cached.rolling.max-size

    Nilai maksimum file cache tunggal dalam mode cache file.

    MEMORYSIZE

    Tidak

    16 MB

    • Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.

    • Jika ukuran file melebihi nilai dari opsi ini, data file akan diunggah ke server.

      Catatan

      Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    sink.file-cached.memory

    Ukuran maksimum memori di luar heap yang digunakan untuk menulis data ke file dalam mode cache file.

    MEMORYSIZE

    Tidak

    64 MB

    • Opsi ini hanya berlaku jika opsi sink.file-cached.enable disetel ke true.

    • Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    sink.file-cached.memory.segment-size

    Ukuran buffer yang digunakan untuk menulis data ke file dalam mode cache file.

    UKURANMEMORI

    Tidak

    128 KB

    • Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.

    • Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    sink.file-cached.flush.always

    Menentukan apakah cache digunakan untuk menulis data ke file dalam mode cache.

    BOOLEAN

    Tidak

    true

    • Opsi ini berlaku hanya jika opsi sink.file-cached.enable diatur ke true.

    • Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    sink.file-cached.write.max-retries

    Jumlah percobaan ulang maksimum untuk mengunggah data dalam mode cache file.

    INTEGER

    Tidak

    3

    • Opsi ini berlaku hanya jika opsi sink.file-cached.enable diatur ke true.

    • Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    upsert.writer.max-retries

    Jumlah percobaan ulang maksimum untuk menulis data ke bucket dalam sesi Upsert Writer.

    INTEGER

    Tidak

    3

    Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    upsert.writer.buffer-size

    Ukuran buffer sesi Upsert Writer di Realtime Compute for Apache Flink.

    MEMORYSIZE

    Tidak

    64 MB

    • Saat ukuran buffer total semua bucket mencapai ambang batas tertentu, sistem secara otomatis memperbarui data ke server.

    Catatan

    Data dalam sesi Upsert Writer dapat ditulis ke beberapa bucket secara bersamaan. Kami sarankan Anda meningkatkan nilai opsi ini untuk meningkatkan efisiensi penulisan.

    Jika data ditulis ke sejumlah besar partisi, kesalahan OOM mungkin terjadi. Untuk mencegah masalah ini, Anda dapat menurunkan nilai opsi ini.

    • Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    upsert.writer.bucket.buffer-size

    Ukuran buffer bucket tunggal di Realtime Compute for Apache Flink.

    MEMORYSIZE

    Tidak

    1 MB

    • Jika sumber daya memori server Flink tidak cukup, Anda dapat menurunkan nilai opsi ini.

    • Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    upsert.write.bucket.num

    Jumlah bucket untuk tabel tempat data ditulis.

    INTEGER

    Ya

    Tidak ada

    • Nilai opsi ini harus sama dengan nilai opsi write.bucket.num yang dikonfigurasikan untuk tabel Delta tempat data ditulis.

    • Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    upsert.write.slot-num

    Jumlah slot Tunnel yang digunakan dalam sesi.

    INTEGER

    Tidak

    1

    Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    upsert.commit.max-retries

    Jumlah percobaan ulang maksimum untuk commit sesi upsert.

    INTEGER

    Tidak

    3

    Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    upsert.commit.thread-num

    Tingkat paralelisme commit sesi upsert.

    INTEGER

    Tidak

    16

    • Kami sarankan Anda tidak menaikkan nilai opsi ini ke nilai besar. Jika terlalu banyak commit sesi upsert dilakukan secara bersamaan, konsumsi sumber daya meningkat. Ini dapat menyebabkan masalah kinerja atau konsumsi sumber daya berlebihan.

    • Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    upsert.commit.timeout

    Periode timeout untuk commit sesi upsert. Unit: detik.

    BILANGAN BULAT

    Tidak

    600

    Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    upsert.flush.concurrent

    Jumlah maksimum bucket ke mana data dalam partisi dapat ditulis secara bersamaan.

    BILANGAN BULAT

    Tidak

    2

    • Setiap kali data dalam bucket diperbarui, satu slot Tunnel akan digunakan.

    • Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    insert.commit.thread-num

    Tingkat paralelisme sesi commit.

    INTEGER

    Tidak

    16

    Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    insert.arrow-writer.enable

    Menentukan apakah akan menggunakan format Arrow.

    BOOLEAN

    Tidak

    false

    Nilai valid:

    • true: Format Arrow digunakan.

    • false: Format Arrow tidak digunakan.

    Catatan

    Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    insert.arrow-writer.batch-size

    Jumlah maksimum baris dalam batch data berformat Arrow.

    INTEGER

    Tidak

    512

    Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    insert.arrow-writer.flush-interval

    Interval di mana penulis melakukan flush data. Satuan: milidetik.

    INTEGER

    Tidak

    100000

    Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    insert.writer.buffer-size

    Ukuran cache untuk penulis yang dibuffer.

    MEMORYSIZE

    Tidak

    64 MB

    Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.

    upsert.partial-column.enable

    Menentukan apakah hanya memperbarui data di kolom tertentu.

    BOOLEAN

    Tidak

    false

    Opsi ini hanya berlaku untuk tabel sink yang menulis data ke tabel MaxCompute Delta. Untuk informasi lebih lanjut, lihat Memperbarui data di kolom tertentu dalam dokumentasi MaxCompute.

    Nilai valid:

    • true

    • false

    Perilaku pembaruan data bergantung pada apakah sink memiliki catatan dengan kunci utama yang sama dengan data baru.

    • Jika tabel sink berisi data dengan kunci utama yang sama, bidang yang sesuai diperbarui berdasarkan kunci utama. Secara spesifik, bidang yang ditentukan ditimpa dengan nilai baru jika nilainya bukan null.

    • Jika tabel sink tidak berisi catatan dengan kunci utama yang sama, catatan baru akan ditambahkan. Nilai baru akan dimasukkan untuk kolom yang ditentukan sementara null akan dimasukkan untuk semua kolom lainnya.

    Catatan

    Hanya VVR 8.0.11 atau yang lebih baru yang mendukung opsi ini.

  • Spesifik Tabel Dimensi

    Saat penyebaran dimulai, tabel dimensi menarik data penuh dari partisi yang ditentukan oleh opsi partisi. Opsi ini mendukung fungsi max_pt(). Jika cache dimuat ulang setelah entri cache kedaluwarsa, data partisi terbaru yang ditentukan oleh opsi partisi dianalisis ulang. Jika opsi partisi diatur ke max_two_pt(), tabel dimensi dapat menarik data dari dua partisi. Jika opsi partisi tidak diatur ke max_two_pt(), data hanya dari satu partisi dapat ditarik.

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    cache

    Kebijakan cache.

    STRING

    Ya

    Tidak ada nilai default

    Anda harus mengatur opsi cache ke ALL untuk tabel dimensi dan secara eksplisit mendeklarasikan pengaturan dalam pernyataan DDL. Jika jumlah data dalam tabel remote kecil dan sejumlah besar kunci yang hilang ada, kami sarankan Anda mengatur opsi ini ke ALL. Tabel sumber dan tabel dimensi tidak dapat diasosiasikan berdasarkan klausa ON.

    ALL: menunjukkan bahwa semua data dalam tabel dimensi disimpan dalam cache. Sebelum sistem menjalankan penyebaran, sistem memuat semua data dalam tabel dimensi ke cache. Dengan cara ini, cache dicari untuk semua query berikutnya dalam tabel dimensi. Jika tidak ada kunci yang ada, sistem tidak dapat menemukan catatan data dalam cache. Sistem memuat ulang semua data dalam cache setelah entri cache kedaluwarsa.

    Catatan
    • Jika opsi cache diatur ke ALL, Anda harus meningkatkan memori node join karena sistem secara asinkron memuat data tabel dimensi. Kami sarankan Anda meningkatkan ukuran memori setidaknya empat kali jumlah data dalam tabel remote. Ukuran memori terkait dengan algoritma kompresi penyimpanan MaxCompute.

    • Jika tabel dimensi berisi sejumlah besar data, Anda dapat menggunakan petunjuk SHUFFLE_HASH untuk mendistribusikan data secara merata ke setiap sub-tugas. Untuk informasi lebih lanjut, lihat bagian "Bagaimana cara menggunakan petunjuk SHUFFLE_HASH untuk tabel dimensi?" dari topik FAQ tentang penyimpanan hulu dan hilir.

    • Jika Anda menggunakan tabel dimensi ultra-besar, pengumpulan sampah (GC) Java virtual machine (JVM) yang sering dapat menyebabkan pengecualian penyebaran. Untuk menyelesaikan masalah ini, Anda dapat meningkatkan memori node tempat tabel dimensi digabungkan dengan tabel lain. Jika masalah tetap ada, kami sarankan Anda mengubah tabel dimensi menjadi tabel dimensi key-value yang mendukung kebijakan cache least recently used (LRU). Sebagai contoh, Anda dapat menggunakan tabel dimensi ApsaraDB for HBase sebagai tabel dimensi key-value.

    cacheSize

    Jumlah maksimum baris data yang dapat disimpan dalam cache.

    LONG

    Tidak

    100000

    Jika jumlah catatan data dalam tabel dimensi melebihi nilai opsi cacheSize, pesan kesalahan ini muncul: "Jumlah baris tabel <table-name> partisi <partition-name> melebihi batas maxRowCount" .

    Penting

    Jika sejumlah besar catatan data ada dalam tabel dimensi, sejumlah besar memori heap JVM dikonsumsi. Dalam hal ini, kecepatan startup penyebaran dan kecepatan pembaruan tabel dimensi melambat. Untuk mencegah masalah ini, Anda perlu memeriksa apakah sejumlah besar catatan data perlu disimpan dalam cache. Jika bisnis Anda memerlukan sejumlah besar catatan data disimpan dalam cache tabel dimensi, secara manual tingkatkan nilai opsi ini.

    cacheTTLMs

    Batas waktu cache.

    LONG

    Tidak

    Long.MAX_VALUE

    Satuan: milidetik.

    cacheReloadTimeBlackList

    Periode waktu selama cache tidak diperbarui. Cache tidak diperbarui selama periode waktu yang ditentukan oleh opsi ini.

    STRING

    Tidak

    Tidak ada nilai default

    Opsi ini berlaku untuk acara promosi online skala besar seperti jam sibuk aktivitas. Anda dapat menentukan opsi ini untuk mencegah penyebaran menjadi tidak stabil saat cache diperbarui. Untuk informasi lebih lanjut tentang cara menentukan opsi, lihat bagian "Bagaimana cara mengonfigurasi opsi CacheReloadTimeBlackList?" dari topik FAQ tentang penyimpanan hulu dan hilir.

    maxLoadRetries

    Jumlah maksimum percobaan ulang yang dapat dilakukan untuk memperbarui cache. Saat data pertama kali ditarik ketika penyebaran dimulai, cache diperbarui. Jika jumlah percobaan ulang melebihi nilai opsi ini, penyebaran gagal dijalankan.

    INTEGER

    Tidak

    10

    Tidak tersedia.

Pemetaan tipe data

Untuk informasi lebih lanjut tentang tipe data yang didukung oleh MaxCompute, lihat Edisi Tipe Data MaxCompute V2.0.

Tipe data MaxCompute

Tipe data Realtime Compute for Apache Flink

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(presisi, skala)

DECIMAL(presisi, skala)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

TIMESTAMP_NTZ

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

Penting

Jika tabel fisik MaxCompute berisi bidang tipe data komposit bersarang (ARRAY, MAP, atau STRUCT, dll) dan bidang tipe JSON, Anda harus menentukan tblproperties('columnar.nested.type'='true') saat membuat tabel fisik MaxCompute untuk memungkinkan Realtime Compute for Apache Flink membaca data dari dan menulis data ke tabel fisik dengan benar.

Ingesti data melalui YAML

MaxCompute connector dapat digunakan sebagai sink ingest data dalam pekerjaan berbasis YAML.

Persyaratan mesin VVR

VVR 11.1 atau lebih baru

Sintaksis

source:
  type: xxx

sink:
   type: maxcompute
   name: MaxComputeSinkaccess-id: ${your_accessId}
   access-key: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}buckets-num: 8

Opsi konfigurasi

Opsi

Diperlukan?

Nilai default

Tipe data

Deskripsi

type

Ya

Tidak ada nilai default.

String

Konektor yang akan digunakan. Atur ke maxcompute.

name

Tidak

Tidak ada nilai default.

String

Nama sink.

access-id

Ya

Tidak ada nilai default.

String

ID AccessKey akun Alibaba Cloud atau pengguna RAM Anda. Dapatkan di Konsol Manajemen Akses Sumber Daya.

access-key

Ya

Tidak ada nilai default.

String

Rahasia AccessKey Anda.

endpoint

Ya

Tidak ada nilai default.

String

Titik akhir MaxCompute Anda. Konfigurasikan berdasarkan wilayah tempat proyek MaxCompute Anda berada dan metode koneksi jaringan. Untuk informasi lebih lanjut, lihat Titik akhir.

project

Ya

Tidak ada nilai default.

String

Nama proyek MaxCompute Anda. Lakukan hal berikut untuk mendapatkannya:

  1. Masuk ke Konsol MaxCompute.

  2. Di panel navigasi sisi kiri, pilih Workspace>Projects.

  3. Temukan proyek MaxCompute Anda dan salin namanya.

tunnel.endpoint

Tidak

Tidak ada nilai default.

String

Titik akhir Tunnel MaxCompute. Titik akhir ini biasanya disimpulkan secara otomatis oleh MaxCompute dari pengaturan endpoint. Namun, Anda harus mendefinisikannya secara eksplisit di lingkungan jaringan khusus, seperti dengan server proxy.

quota.name

Tidak

Tidak ada nilai default.

String

Nama kuota dari grup sumber daya eksklusif. Jika Anda tidak secara eksplisit menentukan opsi ini, grup sumber daya bersama akan digunakan.

sts-token

Tidak

Tidak ada nilai default.

String

Token STS dari Peran RAM Anda. Opsi ini diperlukan untuk otentikasi identitas jika Anda menggunakan Peran RAM untuk mengakses MaxCompute.

buckets-num

Tidak

16

Integer

Jumlah bucket untuk tabel Delta MaxCompute yang dibuat secara otomatis. Untuk informasi lebih lanjut, lihat Gudang data hampir real-time.

compress.algorithm

Tidak

zlib

String

Algoritma kompresi data. Nilai valid:

  • raw: Data tidak dikompresi.

  • zlib

  • snappy

total.buffer-size

Tidak

64 MB

String

Ukuran buffer dalam memori. Untuk tabel yang dipartisi, buffer ini berlaku pada tingkat partisi. Untuk tabel non-partisi, berlaku pada tingkat tabel. Buffer untuk partisi atau tabel yang berbeda bersifat independen. Saat buffer mencapai kapasitas, datanya di-flush ke MaxCompute.

bucket.buffer-size

Tidak

4 MB

String

Ukuran buffer dalam memori untuk bucket. Opsi ini hanya berlaku saat data ditulis ke tabel Delta MaxCompute. Buffer untuk bucket yang berbeda bersifat independen. Saat buffer mencapai kapasitas, datanya di-flush ke MaxCompute.

commit.thread-num

Tidak

16

Integer

Jumlah maksimum partisi atau tabel yang dapat dicommit secara bersamaan selama checkpointing.

flush.concurrent-num

Tidak

4

Integer

Menentukan jumlah maksimum bucket ke mana Flink dapat secara bersamaan flush data. Opsi ini hanya berlaku saat data ditulis ke tabel Delta MaxCompute.

Pemetaan lokasi tabel

Saat konektor memicu pembuatan tabel otomatis di MaxCompute, lokasi dipetakan sebagai berikut:

Penting

Jika fitur skema dinonaktifkan untuk proyek MaxCompute Anda, konektor akan mengabaikan tableId.namespace. Dalam hal ini, hanya satu database atau setara logisnya yang dimasukkan ke dalam MaxCompute. Sebagai contoh, ketika data di-ingest dari MySQL ke MaxCompute, hanya satu database MySQL yang di-inject.

Lokasi MySQL

Abstraksi di Flink CDC

Lokasi MaxCompute

Tidak tersedia

Proyek dalam file konfigurasi

Proyek

Database

TableId.namespace

Skema

Catatan

Jika skema dinonaktifkan untuk proyek MaxCompute Anda, pengaturan ini diabaikan.

Tabel

TableId.tableName

Tabel

Pemetaan tipe data

Tipe Flink CDC

Tipe MaxCompute

CHAR

STRING

VARCHAR

STRING

BOOLEAN

BOOLEAN

BINARY/VARBINARY

BINARY

DECIMAL

DECIMAL

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

TIME_WITHOUT_TIME_ZONE

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_NTZ

TIMESTAMP_WITH_LOCAL_TIME_ZONE

TIMESTAMP

TIMESTAMP_WITH_TIME_ZONE

TIMESTAMP

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT

Contoh

API SQL

Tabel sumber

Membaca semua data dalam partisi tertentu

Baca semua data dalam partisi yang ditentukan oleh opsi partition.

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=201809*'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
   cid,
   COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
Membaca data inkremental

Baca data mulai dari partisi yang ditentukan oleh startPartition dan terus memantau catatan baru.

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Mulai membaca dari partisi 20180905.
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;

Tabel sink

Menulis ke partisi statis

Tulis ke partisi yang ditentukan oleh opsi partition:

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905' -- Data ditulis ke partisi 20180905.
);

INSERT INTO odps_sink
SELECT
  id, len, content
FROM datagen_source;
Menulis ke partisi secara dinamis

Tulis data ke partisi secara dinamis berdasarkan opsi partition:

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR,
  c TIMESTAMP
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id  INT,
  len INT,
  content VARCHAR,
  ds VARCHAR -- Secara eksplisit tentukan kolom partisi dinamis.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds' -- Jangan tentukan nilai untuk ds. Data ditulis ke partisi yang berbeda berdasarkan nilai bidang ds.
);

INSERT INTO odps_sink
SELECT
   id,
   len,
   content,
   DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;

Tabel dimensi

Kunci nilai tunggal

Tentukan kunci utama saat setiap kunci memiliki nilai unik:

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED  -- Tentukan kunci utama.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
Kunci multi-nilai

Tidak perlu menentukan kunci utama saat sebuah kunci dapat memiliki beberapa nilai:

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR
  -- Menentukan kunci utama tidak diperlukan.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

API DataStream

Penting
  • Jika Anda ingin memanggil API DataStream untuk membaca atau menulis data, Anda harus menggunakan konektor DataStream tipe terkait. Untuk informasi lebih lanjut tentang cara mengonfigurasi konektor DataStream, lihat Pengaturan Konektor DataStream.

  • Untuk melindungi kekayaan intelektual, VVR 6.0.6 atau lebih baru mendukung debugging lokal program DataStream yang mencakup konektor MaxCompute hingga 30 menit. Setiap sesi debugging yang lebih lama akan mengakibatkan program dihentikan dengan kesalahan. Untuk informasi lebih lanjut, lihat Debug Lokal Program Flink yang Mencakup Konektor.

  • Pembacaan data dari tabel Delta MaxCompute tidak didukung. Tabel Delta adalah tabel yang dibuat dengan primary key yang ditentukan dan properti transactional=true. Untuk informasi lebih lanjut, lihat Konsep Dasar.

Kami sarankan Anda mendeklarasikan tabel MaxCompute menggunakan pernyataan SQL saat menggunakan konektor DataStream MaxCompute. Anda dapat memanggil operasi API Table untuk mengakses tabel MaxCompute atau memanggil operasi API DataStream untuk mengakses aliran data.

Hubungkan ke tabel sumber

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 

Menyambungkan ke sink

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")");
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.),
    Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();

XML

Dependensi Maven dari konektor MaxCompute berisi kelas-kelas yang diperlukan untuk membuat sumber penuh, sumber tambahan, sink, dan tabel dimensi. MaxCompute DataStream Connectors dari berbagai versi disimpan di repositori pusat Maven.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>

Referensi