All Products
Search
Document Center

Realtime Compute for Apache Flink:Konektor MaxCompute

Last Updated:Mar 11, 2026

Topik ini menjelaskan cara menggunakan konektor MaxCompute.

Latar Belakang

MaxCompute (sebelumnya dikenal sebagai ODPS) adalah platform komputasi yang cepat dan sepenuhnya terkelola untuk gudang data berskala besar. MaxCompute mampu memproses data dalam skala exabyte serta menyediakan solusi penyimpanan dan komputasi untuk data terstruktur dalam jumlah besar di gudang data, lengkap dengan layanan analitik dan pemodelan.

Tabel berikut menjelaskan kemampuan yang didukung oleh konektor MaxCompute.

Item

Deskripsi

Jenis yang didukung

Tabel sumber, tabel dimensi, dan tabel sink

Mode operasi

Mode streaming dan mode batch

Format data

N/A

Metrik

Metrik

  • Source

    numRecordsIn

    numRecordsInPerSecond

    numBytesIn

    numBytesInPerSecond

  • Sink

    numRecordsOut

    numRecordsOutPerSecond

    numBytesOut

    numBytesOutPerSecond

  • Tabel dimensi

    dim.odps.cacheSize

Catatan

Untuk informasi selengkapnya, lihat Metrik pemantauan.

Jenis API

DataStream API dan SQL API

Pembaruan atau penghapusan data pada tabel sink

Jika MaxCompute Batch Tunnel atau MaxCompute Streaming Tunnel digunakan, data hanya dapat dimasukkan ke tabel sink. Jika MaxCompute Upsert Tunnel digunakan, data pada tabel sink dapat diperbarui atau dihapus, serta data baru dapat dimasukkan ke tabel sink.

Prasyarat

Tabel MaxCompute telah dibuat. Untuk informasi selengkapnya tentang cara membuat tabel MaxCompute, lihat Buat tabel.

Batasan

  • Konektor MaxCompute hanya mendukung semantik at-least-once.

    Catatan

    Semantik at-least-once digunakan untuk mencegah kehilangan data. Dalam kondisi tertentu, data duplikat mungkin ditulis ke MaxCompute, tergantung pada tunnel yang digunakan. Untuk informasi lebih lanjut mengenai MaxCompute Tunnel, lihat bagian "Bagaimana cara memilih tunnel data?" dalam topik FAQ tentang penyimpanan hulu dan hilir.

  • Secara default, sumber beroperasi dalam mode penuh dan hanya membaca data dari partisi yang ditentukan oleh opsi partition. Setelah semua data dari partisi tersebut selesai dibaca, pekerjaan berhenti dan tidak memantau partisi baru.

    Untuk memantau partisi baru secara berkelanjutan, buat sumber inkremental dengan menentukan opsi startPartition dalam klausa WITH.

    Catatan
    • Setiap kali tabel dimensi diperbarui, tabel tersebut memeriksa partisi terbaru.

    • Setelah tabel sumber mulai berjalan, data yang baru ditambahkan ke partisi tidak akan dibaca. Disarankan agar Anda menjalankan penerapan setelah partisi berisi data lengkap.

SQL

Konektor MaxCompute dapat digunakan sebagai tabel sumber, dimensi, atau sink dalam pekerjaan berbasis SQL.

Sintaksis

CREATE TEMPORARY TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

Opsi konektor

  • Umum

    Opsi

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    connector

    Jenis tabel.

    STRING

    Ya

    Tidak ada nilai default

    Atur nilainya ke odps.

    endpoint

    Titik akhir MaxCompute.

    STRING

    Ya

    Tidak ada nilai default

    Untuk informasi selengkapnya, lihat Endpoint.

    tunnelEndpoint

    Titik akhir MaxCompute Tunnel.

    STRING

    Tidak

    Tidak ada nilai default

    Untuk informasi selengkapnya, lihat Endpoint.

    Catatan

    Jika opsi ini tidak ditentukan, MaxCompute mengalokasikan koneksi tunnel berdasarkan layanan Server Load Balancer (SLB).

    project

    Nama proyek MaxCompute.

    STRING

    Ya

    Tidak ada nilai default

    N/A.

    schemaName

    Nama skema MaxCompute.

    STRING

    Tidak

    Tidak ada nilai default

    Opsi ini hanya wajib jika fitur skema MaxCompute diaktifkan. Dalam hal ini, Anda harus mengatur opsi ini ke nama skema tabel MaxCompute. Untuk informasi selengkapnya, lihat Operasi skema.

    Catatan

    Hanya VVR 8.0.6 atau versi lebih baru yang mendukung opsi ini.

    tableName

    Nama tabel MaxCompute.

    STRING

    Ya

    Tidak ada nilai default

    N/A.

    accessId

    ID AccessKey yang digunakan untuk mengakses MaxCompute.

    STRING

    Ya

    Tidak ada nilai default

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat informasi ID AccessKey dan Rahasia AccessKey akun?

    Penting

    Untuk melindungi pasangan AccessKey Anda, kami menyarankan Anda mengonfigurasi ID AccessKey dengan menggunakan variabel. Untuk informasi selengkapnya, lihat Kelola variabel.

    accessKey

    Rahasia AccessKey yang digunakan untuk mengakses MaxCompute.

    STRING

    Ya

    Tidak ada nilai default

    partition

    Nama partisi dalam tabel MaxCompute.

    STRING

    Tidak

    Tidak ada nilai default

    Anda tidak perlu menentukan opsi ini untuk tabel MaxCompute non-partisi atau sumber inkremental.

    Catatan

    Untuk informasi selengkapnya tentang cara menentukan opsi partition untuk tabel MaxCompute berpartisi, lihat bagian "Bagaimana cara mengonfigurasi opsi partition saat membaca atau menulis data ke partisi?" dalam topik FAQ tentang penyimpanan hulu dan hilir.

    compressAlgorithm

    Algoritma kompresi yang digunakan oleh MaxCompute Tunnel.

    STRING

    Tidak

    SNAPPY

    Nilai yang valid:

    • RAW (tanpa kompresi)

    • ZLIB

    • SNAPPY

      Dibandingkan dengan ZLIB, SNAPPY dapat meningkatkan throughput secara signifikan. Dalam skenario pengujian, throughput meningkat sekitar 50%.

    quotaName

    Nama kuota untuk grup resource Tunnel eksklusif MaxCompute.

    STRING

    Tidak

    Tidak ada nilai default

    Anda dapat menentukan opsi ini untuk menggunakan grup resource Tunnel eksklusif MaxCompute.

    Penting
    • Hanya VVR 8.0.3 atau versi lebih baru yang mendukung opsi ini.

    • Jika Anda menentukan opsi ini, Anda harus menghapus opsi tunnelEndpoint. Jika tidak, tunnel yang ditentukan oleh opsi tunnelEndpoint akan digunakan.

  • Khusus sumber

    Opsi

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    maxPartitionCount

    Jumlah maksimum partisi dari mana data dapat dibaca.

    INTEGER

    Tidak

    100

    Jika jumlah partisi dari mana data dibaca melebihi nilai opsi ini, pesan kesalahan berikut muncul: "The number of matched partitions exceeds the default limit".

    Penting

    Membaca dari jumlah partisi yang berlebihan dapat membebani MaxCompute dan memperlambat startup pekerjaan. Jika diperlukan untuk kebutuhan bisnis Anda, sesuaikan nilai opsi ini secara manual.

    useArrow

    Menentukan apakah akan menggunakan format Arrow untuk membaca data.

    BOOLEAN

    Tidak

    false

    Format Arrow dapat digunakan untuk memanggil operasi API penyimpanan MaxCompute.

    Penting
    • Opsi ini hanya berlaku dalam penerapan batch.

    • Hanya VVR 8.0.8 atau versi lebih baru yang mendukung opsi ini.

    splitSize

    Ukuran data yang dapat ditarik sekaligus saat menggunakan format Arrow untuk membaca data.

    MEMORYSIZE

    Tidak

    256 MB

    Hanya VVR 8.0.8 atau versi lebih baru yang mendukung opsi ini.

    Penting

    Opsi ini hanya berlaku dalam penerapan batch.

    compressCodec

    Algoritma kompresi yang digunakan saat menggunakan format Arrow untuk membaca data.

    STRING

    Tidak

    ""

    Nilai yang valid:

    • "" (tanpa kompresi)

    • ZSTD

    • LZ4_FRAME

    Dibandingkan tanpa kompresi, throughput dapat ditingkatkan jika Anda menentukan algoritma kompresi.

    Penting
    • Opsi ini hanya berlaku dalam penerapan batch.

    • Hanya VVR 8.0.8 atau versi lebih baru yang mendukung opsi ini.

    dynamicLoadBalance

    Menentukan apakah akan mengaktifkan alokasi shard secara dinamis.

    BOOLEAN

    Tidak

    false

    Nilai yang valid:

    • true

    • false

    Alokasi shard secara dinamis dapat meningkatkan kinerja pemrosesan operator yang berbeda pada Realtime Compute for Apache Flink dan mengurangi waktu total yang diperlukan untuk membaca dari MaxCompute. Namun, hal ini dapat menyebabkan kesenjangan data karena jumlah total data yang dibaca oleh operator yang berbeda tidak konsisten.

    Penting
    • Opsi ini hanya berlaku dalam penerapan batch.

    • Hanya VVR 8.0.8 atau versi lebih baru yang mendukung opsi ini.

  • Opsi khusus untuk tabel sumber MaxCompute inkremental

    Sumber tabel inkremental memantau partisi baru dengan melakukan polling berkala ke server MaxCompute untuk mendapatkan semua informasi partisi. Sebelum sumber mulai membaca data dari partisi baru, penulisan data ke partisi tersebut harus selesai. Untuk informasi selengkapnya, lihat bagian "Apa yang harus saya lakukan jika sumber tabel MaxCompute inkremental mendeteksi partisi baru saat data masih ditulis ke partisi tersebut?" dalam topik FAQ tentang penyimpanan hulu dan hilir. Anda dapat mengonfigurasi opsi startPartition untuk menentukan partisi awal dari mana data dibaca. Hanya data dalam partisi yang urutan abjadnya lebih besar dari atau sama dengan urutan abjad partisi yang ditentukan oleh opsi startPartition yang akan dibaca. Misalnya, urutan abjad partisi year=2023,month=10 lebih kecil daripada urutan abjad partisi year=2023,month=9. Dalam kasus ini, Anda dapat menambahkan nol di depan angka bulan dalam nama partisi yang dideklarasikan dalam kode untuk memastikan urutan abjad partisi valid. Dengan cara ini, Anda dapat mengubah nilai opsi partition dari year=2023,month=9 menjadi year=2023,month=09.

    Opsi

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    startPartition

    Partisi awal dari mana data inkremental dibaca.

    STRING

    Ya

    Tidak ada nilai default

    • Jika Anda menentukan opsi ini, sumber inkremental digunakan. Akibatnya, opsi partition diabaikan.

    • Jika tabel sumber adalah tabel berpartisi multi-level, Anda harus mengonfigurasi nilai setiap kolom partisi dalam urutan menurun berdasarkan level partisi.

    Catatan

    Untuk informasi selengkapnya tentang cara menentukan opsi startPartition, lihat bagian "Bagaimana cara mengonfigurasi opsi startPartition untuk tabel sumber MaxCompute inkremental?" dalam topik FAQ tentang penyimpanan hulu dan hilir.

    subscribeIntervalInSec

    Interval polling ke MaxCompute untuk mendapatkan informasi partisi.

    INTEGER

    Tidak

    30

    Unit: detik.

    modifiedTableOperation

    Operasi yang dilakukan ketika data dalam partisi dimodifikasi selama pembacaan partisi.

    Enum (NONE, SKIP)

    Tidak

    NONE

    Sesi unduhan disimpan dalam checkpoint. Setiap kali Anda melanjutkan sesi dari checkpoint, Realtime Compute for Apache Flink mencoba melanjutkan progres pembacaan dari sesi tersebut. Namun, sesi tersebut tidak tersedia karena data dalam partisi dimodifikasi. Dalam kasus ini, penerapan akan terus-menerus dimulai ulang. Untuk mengatasi masalah ini, Anda dapat menentukan opsi ini. Nilai yang valid:

    • NONE: Jika Anda mengatur opsi ini ke NONE, Anda harus mengubah nilai opsi startPartition agar urutan abjad partisi yang ditentukan oleh opsi startPartition lebih besar dari urutan abjad partisi yang tidak tersedia, lalu memulai penerapan tanpa state.

    • SKIP: Jika Anda tidak ingin memulai penerapan tanpa state, Anda dapat mengatur opsi ini ke SKIP. Dalam hal ini, Realtime Compute for Apache Flink melewati partisi yang tidak tersedia saat mencoba melanjutkan sesi dari checkpoint.

    Penting
    • Hanya VVR 8.0.3 atau versi lebih baru yang mendukung opsi ini.

    • Jika Anda mengatur opsi ini ke NONE atau SKIP, data yang telah dibaca dari partisi yang dimodifikasi dipertahankan, sedangkan data yang belum dibaca diabaikan.

  • Khusus sink

    Opsi

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    useStreamTunnel

    Menentukan apakah akan menggunakan MaxCompute Streaming Tunnel untuk mengunggah data.

    BOOLEAN

    Tidak

    false

    Nilai yang valid:

    • true: MaxCompute Streaming Tunnel digunakan untuk mengunggah data.

    • false: MaxCompute Batch Tunnel digunakan untuk mengunggah data.

    Catatan

    Untuk informasi selengkapnya tentang cara memilih tunnel, lihat bagian "Bagaimana cara memilih tunnel data?" dalam topik FAQ tentang penyimpanan hulu dan hilir.

    flushIntervalMs

    Interval flush dalam buffer writer di MaxCompute Tunnel.

    LONG

    Tidak

    30000 (30 detik)

    Data dibuffer dan diflush secara batch sesuai interval yang ditentukan oleh flushIntervalMs.

    • Streaming Tunnel: Data yang diflush langsung tersedia di tabel MaxCompute tujuan.

    • Batch Tunnel: Data yang diflush hanya tersedia setelah operasi checkpointing selesai. Kami menyarankan Anda mengatur opsi ini ke 0 untuk menonaktifkan fitur flushing terjadwal.

    Unit: milidetik.

    Catatan

    Opsi ini dapat digunakan bersama opsi batchSize. Operasi flush dipicu ketika kondisi yang ditentukan oleh opsi batchSize atau opsi flushIntervalMs terpenuhi.

    batchSize

    Ukuran buffer MaxCompute Tunnel.

    LONG

    Tidak

    67108864 (64 MB)

    Sink MaxCompute memasukkan data ke buffer. Kemudian, sink MaxCompute menulis data dalam buffer ke tabel MaxCompute tujuan ketika ukuran data buffer melebihi nilai yang ditentukan oleh opsi batchSize.

    Unit: byte.

    Catatan

    Opsi ini dapat digunakan bersama opsi flushIntervalMs. Operasi flush dipicu ketika kondisi yang ditentukan oleh opsi batchSize atau opsi flushIntervalMs terpenuhi.

    numFlushThreads

    Jumlah thread yang digunakan untuk flush data dalam buffer writer di MaxCompute Tunnel.

    INTEGER

    Tidak

    1

    Setiap sink MaxCompute membuat jumlah thread sesuai nilai opsi numFlushThreads untuk flush data. Jika nilai opsi ini lebih dari 1, data dalam partisi berbeda dapat diflush secara bersamaan. Hal ini meningkatkan efisiensi operasi flush.

    slotNum

    Jumlah slot Tunnel yang digunakan oleh MaxCompute untuk menerima data dari Flink.

    INTEGER

    Tidak

    0

    Untuk informasi tentang batasan jumlah slot, lihat Ikhtisar layanan transmisi data dalam dokumentasi MaxCompute.

    dynamicPartitionLimit

    Jumlah maksimum partisi dinamis tempat data dapat ditulis.

    INTEGER

    Tidak

    100

    Jika jumlah partisi dinamis tempat data ditulis dari sink antara dua checkpoint melebihi nilai opsi dynamicPartitionLimit, pesan kesalahan berikut muncul: "Too many dynamic partitions".

    Penting

    Jika data ditulis ke sejumlah besar partisi tabel MaxCompute, beban kerja pada layanan MaxCompute tinggi, sehingga memperlambat checkpointing dan flushing. Untuk mencegah masalah ini, Anda perlu memeriksa apakah data perlu ditulis ke banyak partisi. Jika bisnis Anda memerlukan penulisan data ke banyak partisi, tingkatkan nilai opsi dynamicPartitionLimit secara manual.

    retryTimes

    Jumlah maksimum retry permintaan ke server MaxCompute.

    INTEGER

    Tidak

    3

    Layanan MaxCompute mungkin tidak tersedia untuk periode singkat saat Anda membuat sesi, mengirim sesi, atau data diflush. Jika layanan MaxCompute tidak tersedia, server MaxCompute diminta berdasarkan konfigurasi opsi ini.

    sleepMillis

    Interval retry.

    INTEGER

    Tidak

    1000

    Satuan: milidetik.

    enableUpsert

    Menentukan apakah akan menggunakan MaxCompute Upsert Tunnel untuk mengunggah data.

    BOOLEAN

    Tidak

    false

    Nilai yang valid:

    • true: MaxCompute Upsert Tunnel digunakan untuk memproses data INSERT, UPDATE_AFTER, dan DELETE di Realtime Compute for Apache Flink.

    • false: MaxCompute Batch Tunnel atau MaxCompute Streaming Tunnel yang ditentukan oleh opsi useStreamTunnel digunakan untuk memproses data INSERT dan UPDATE_AFTER di Realtime Compute for Apache Flink.

    Penting
    • Jika terjadi masalah seperti error, kegagalan penerapan, atau gangguan pemrosesan jangka panjang saat sink MaxCompute melakukan commit sesi dalam mode upsert, kami menyarankan Anda mengatur opsi Parallelism operator sink ke nilai kurang dari atau sama dengan 10.

    • Hanya VVR 8.0.6 atau versi lebih baru yang mendukung opsi ini.

    upsertAsyncCommit

    Menentukan apakah akan menggunakan mode asinkron saat sink MaxCompute melakukan commit sesi dalam mode upsert.

    BOOLEAN

    Tidak

    false

    Nilai yang valid:

    • true: Mode asinkron digunakan. Jika Anda menggunakan mode asinkron, waktu yang dibutuhkan untuk commit sesi berkurang, tetapi data yang ditulis setelah sesi dikomit tidak dapat langsung diquery.

    • false: Mode sinkron digunakan secara default. Saat sink MaxCompute melakukan commit sesi, sistem menunggu hingga server memproses sesi tersebut.

    Catatan

    Hanya VVR 8.0.6 atau versi lebih baru yang mendukung opsi ini.

    upsertCommitTimeoutMs

    Waktu timeout untuk commit sesi oleh sink MaxCompute dalam mode upsert.

    INTEGER

    Tidak

    120000

    (120 detik)

    Unit: milidetik.

    Catatan

    Hanya VVR 8.0.6 atau versi lebih baru yang mendukung opsi ini.

    sink.operation

    Mode operasi penulisan untuk tabel Delta.

    STRING

    Tidak

    insert

    Nilai yang valid:

    • insert: Data ditulis ke tabel dalam mode append.

    • upsert: Data diperbarui.

    Catatan

    Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    sink.parallelism

    Tingkat paralelisme saat menulis data ke tabel Delta.

    INTEGER

    Tidak

    None

    • Tingkat paralelisme penulisan data. Jika Anda tidak mengonfigurasi opsi ini, paralelisme data hulu digunakan secara default.

    • Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    Penting

    Pastikan nilai opsi write.bucket.num merupakan kelipatan bulat dari nilai opsi sink.parallelism. Hal ini membantu memastikan kinerja penulisan optimal dan menghemat memori node sink secara efisien.

    sink.file-cached.enable

    Menentukan apakah akan mengaktifkan mode cache file saat menulis data ke partisi dinamis tabel Delta.

    BOOLEAN

    Tidak

    false

    Nilai yang valid:

    • true: Mode cache file diaktifkan.

    • false: Mode cache file dinonaktifkan.

    Jika Anda mengaktifkan mode cache file, jumlah file kecil yang ditulis ke server berkurang. Namun, latensi penulisan menjadi lebih tinggi. Kami menyarankan Anda mengaktifkan mode cache file saat tabel sink memiliki tingkat paralelisme tinggi.

    Catatan

    Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    sink.file-cached.writer.num

    Jumlah thread yang digunakan untuk mengunggah data secara konkuren dalam satu task dalam mode cache file.

    INTEGER

    Tidak

    16

    • Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.

    • Kami menyarankan Anda tidak menaikkan nilai opsi ini ke angka yang terlalu besar. Jika data ditulis ke banyak partisi secara bersamaan, error out of memory (OOM) dapat terjadi.

      Catatan

      Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    sink.bucket.check-interval

    Interval pemeriksaan ukuran file dalam mode cache file. Unit: milidetik.

    INTEGER

    Tidak

    60000

    • Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.

    • Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    sink.file-cached.rolling.max-size

    Nilai maksimum file cache tunggal dalam mode cache file.

    MEMORYSIZE

    Tidak

    16 MB

    • Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.

    • Jika ukuran file melebihi nilai opsi ini, data file diunggah ke server.

      Catatan

      Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    sink.file-cached.memory

    Ukuran maksimum memori off-heap yang digunakan untuk menulis data ke file dalam mode cache file.

    MEMORYSIZE

    Tidak

    64 MB

    • Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.

    • Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    sink.file-cached.memory.segment-size

    Ukuran buffer yang digunakan untuk menulis data ke file dalam mode cache file.

    MEMORYSIZE

    Tidak

    128 KB

    • Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.

    • Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    sink.file-cached.flush.always

    Menentukan apakah cache digunakan untuk menulis data ke file dalam mode cache file.

    BOOLEAN

    Tidak

    true

    • Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.

    • Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    sink.file-cached.write.max-retries

    Jumlah retry untuk mengunggah data dalam mode cache file.

    INTEGER

    Tidak

    3

    • Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.

    • Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    upsert.writer.max-retries

    Jumlah maksimum retry untuk menulis data ke bucket dalam sesi Upsert Writer.

    INTEGER

    Tidak

    3

    Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    upsert.writer.buffer-size

    Ukuran buffer sesi Upsert Writer di Realtime Compute for Apache Flink.

    MEMORYSIZE

    Tidak

    64 MB

    • Ketika ukuran buffer total semua bucket mencapai ambang batas yang ditentukan, sistem secara otomatis memperbarui data ke server.

    Catatan

    Data dalam sesi Upsert Writer dapat ditulis ke beberapa bucket secara bersamaan. Kami menyarankan Anda menaikkan nilai opsi ini untuk meningkatkan efisiensi penulisan.

    Jika data ditulis ke banyak partisi, error OOM dapat terjadi. Untuk mencegah hal ini, Anda dapat menurunkan nilai opsi ini.

    • Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    upsert.writer.bucket.buffer-size

    Ukuran buffer bucket tunggal di Realtime Compute for Apache Flink.

    MEMORYSIZE

    Tidak

    1 MB

    • Jika sumber daya memori server Flink tidak mencukupi, Anda dapat menurunkan nilai opsi ini.

    • Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    upsert.write.bucket.num

    Jumlah bucket untuk tabel tempat data ditulis.

    INTEGER

    Ya

    None

    • Nilai opsi ini harus sama dengan nilai opsi write.bucket.num yang dikonfigurasi untuk tabel Delta tempat data ditulis.

    • Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    upsert.write.slot-num

    Jumlah slot Tunnel yang digunakan dalam sesi.

    INTEGER

    Tidak

    1

    Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    upsert.commit.max-retries

    Jumlah maksimum retry untuk commit sesi upsert.

    INTEGER

    Tidak

    3

    Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    upsert.commit.thread-num

    Tingkat paralelisme commit sesi upsert.

    INTEGER

    Tidak

    16

    • Kami menyarankan Anda tidak menaikkan nilai opsi ini ke angka yang terlalu besar. Jika terlalu banyak commit sesi upsert dilakukan secara bersamaan, konsumsi sumber daya meningkat. Hal ini dapat menyebabkan masalah kinerja atau konsumsi sumber daya berlebihan.

    • Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    upsert.commit.timeout

    Waktu timeout untuk commit sesi upsert. Unit: detik.

    INTEGER

    Tidak

    600

    Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    upsert.flush.concurrent

    Jumlah maksimum bucket tempat data dalam partisi dapat ditulis secara bersamaan.

    INTEGER

    Tidak

    2

    • Setiap kali data dalam bucket direfresh, satu slot Tunnel digunakan.

    • Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    insert.commit.thread-num

    Tingkat paralelisme sesi commit.

    INTEGER

    Tidak

    16

    Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    insert.arrow-writer.enable

    Menentukan apakah akan menggunakan format Arrow.

    BOOLEAN

    Tidak

    false

    Nilai yang valid:

    • true: Format Arrow digunakan.

    • false: Format Arrow tidak digunakan.

    Catatan

    Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    insert.arrow-writer.batch-size

    Jumlah maksimum baris dalam batch data berformat Arrow.

    INTEGER

    Tidak

    512

    Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    insert.arrow-writer.flush-interval

    Interval flush writer. Unit: milidetik.

    INTEGER

    Tidak

    100000

    Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    insert.writer.buffer-size

    Ukuran cache untuk buffered writer.

    MEMORYSIZE

    Tidak

    64 MB

    Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.

    upsert.partial-column.enable

    Menentukan apakah hanya akan memperbarui data di kolom tertentu.

    BOOLEAN

    Tidak

    false

    Opsi ini hanya berlaku untuk tabel sink yang menulis data ke Tabel Delta MaxCompute. Untuk informasi selengkapnya, lihat Perbarui data di kolom tertentu dalam dokumentasi MaxCompute.

    Nilai yang valid:

    • true

    • false

    Perilaku pembaruan data bergantung pada apakah sink memiliki catatan dengan primary key yang sama dengan data baru.

    • Jika tabel sink berisi data dengan primary key yang sama, bidang yang sesuai diperbarui berdasarkan primary key tersebut. Secara spesifik, bidang yang ditentukan ditimpa dengan nilai baru jika nilainya bukan null.

    • Jika tabel sink tidak berisi catatan dengan primary key yang sama, catatan baru akan ditambahkan. Nilai baru akan dimasukkan untuk kolom yang ditentukan, sedangkan null akan dimasukkan untuk semua kolom lainnya.

    Catatan

    Hanya VVR 8.0.11 atau versi lebih baru yang mendukung opsi ini.

  • Khusus tabel dimensi

    Saat penerapan dimulai, tabel dimensi menarik data lengkap dari partisi yang ditentukan oleh opsi partition. Opsi ini mendukung fungsi max_pt(). Jika cache dimuat ulang setelah entri cache kedaluwarsa, data dari partisi terbaru yang ditentukan oleh opsi partition akan diurai ulang. Jika opsi partition diatur ke max_two_pt(), tabel dimensi dapat menarik data dari dua partisi. Jika tidak, hanya data dari satu partisi yang dapat ditarik.

    Opsi

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    cache

    Kebijakan cache.

    STRING

    Ya

    Tidak ada nilai default

    Anda harus mengatur opsi cache ke ALL untuk tabel dimensi dan secara eksplisit mendeklarasikan pengaturan ini dalam pernyataan DDL. Jika jumlah data dalam tabel remote kecil dan terdapat banyak missing key, kami menyarankan Anda mengatur opsi ini ke ALL. Sumber dan tabel dimensi tidak dapat diasosiasikan berdasarkan klausa ON.

    ALL: menunjukkan bahwa semua data dalam tabel dimensi dicache. Sebelum sistem menjalankan penerapan, sistem memuat semua data dalam tabel dimensi ke cache. Dengan demikian, cache digunakan untuk semua query selanjutnya pada tabel dimensi. Jika tidak ada key yang cocok, sistem tidak dapat menemukan catatan data dalam cache. Sistem memuat ulang semua data dalam cache setelah entri cache kedaluwarsa.

    Catatan
    • Jika opsi cache diatur ke ALL, Anda harus menambah memori node join karena sistem memuat data tabel dimensi secara asinkron. Kami menyarankan Anda menambah ukuran memori minimal empat kali lipat dari jumlah data dalam tabel remote. Ukuran memori terkait dengan algoritma kompresi penyimpanan MaxCompute.

    • Jika tabel dimensi berisi banyak data, Anda dapat menggunakan petunjuk SHUFFLE_HASH untuk mendistribusikan data secara merata ke setiap subtask. Untuk informasi selengkapnya, lihat bagian "Bagaimana cara menggunakan petunjuk SHUFFLE_HASH untuk tabel dimensi?" dalam topik FAQ tentang penyimpanan hulu dan hilir.

    • Jika Anda menggunakan tabel dimensi berukuran sangat besar, garbage collection (GC) JVM yang sering dapat menyebabkan exception penerapan. Untuk mengatasi masalah ini, Anda dapat menambah memori node tempat tabel dimensi di-join dengan tabel lain. Jika masalah tetap berlanjut, kami menyarankan Anda mengonversi tabel dimensi menjadi tabel dimensi key-value yang mendukung kebijakan cache least recently used (LRU). Misalnya, Anda dapat menggunakan tabel dimensi ApsaraDB for HBase sebagai tabel dimensi key-value.

    cacheSize

    Jumlah maksimum baris data yang dapat dicache.

    LONG

    Tidak

    100000

    Jika jumlah catatan data dalam tabel dimensi melebihi nilai opsi cacheSize, pesan kesalahan berikut muncul: "Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit" .

    Penting

    Jika tabel dimensi berisi banyak catatan data, sejumlah besar memori heap JVM dikonsumsi. Dalam kasus ini, kecepatan startup penerapan dan kecepatan pembaruan tabel dimensi melambat. Untuk mencegah masalah ini, Anda perlu memeriksa apakah banyak catatan data perlu dicache. Jika bisnis Anda memerlukan caching banyak catatan data dalam tabel dimensi, tingkatkan nilai opsi ini secara manual.

    cacheTTLMs

    Waktu timeout cache.

    LONG

    Tidak

    Long.MAX_VALUE

    Unit: milidetik.

    cacheReloadTimeBlackList

    Periode waktu saat cache tidak direfresh. Cache tidak direfresh selama periode waktu yang ditentukan oleh opsi ini.

    STRING

    Tidak

    Tidak ada nilai default

    Opsi ini berlaku untuk acara promosi online berskala besar seperti jam puncak aktivitas. Anda dapat menentukan opsi ini untuk mencegah penerapan menjadi tidak stabil saat cache direfresh. Untuk informasi selengkapnya tentang cara menentukan opsi ini, lihat bagian "Bagaimana cara mengonfigurasi opsi CacheReloadTimeBlackList?" dalam topik FAQ tentang penyimpanan hulu dan hilir.

    maxLoadRetries

    Jumlah maksimum retry untuk refresh cache. Saat data pertama kali ditarik saat penerapan dimulai, cache direfresh. Jika jumlah retry melebihi nilai opsi ini, penerapan gagal dijalankan.

    INTEGER

    Tidak

    10

    N/A.

Pemetaan tipe data

Untuk informasi selengkapnya tentang tipe data yang didukung oleh MaxCompute, lihat Sistem tipe data MaxCompute versi 2.0.

Tipe data MaxCompute

Tipe data Realtime Compute for Apache Flink

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

TIMESTAMP_NTZ

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

Penting

Jika tabel fisik MaxCompute berisi field bertipe data komposit bersarang (ARRAY, MAP, atau STRUCT, dll.) dan field bertipe JSON, Anda harus menentukan tblproperties('columnar.nested.type'='true') saat membuat tabel fisik MaxCompute agar Realtime Compute for Apache Flink dapat membaca dan menulis data ke tabel tersebut dengan benar.

Flink CDC (pratinjau publik)

Konektor MaxCompute dapat digunakan sebagai sink ingesti data dalam pekerjaan berbasis YAML.

Persyaratan mesin VVR

VVR 11.1 atau versi lebih baru

Sintaksis

source:
  type: xxx

sink:
   type: maxcompute
   name: MaxComputeSinkaccess-id: ${your_accessId}
   access-key: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}buckets-num: 8

Opsi konfigurasi

Opsi

Wajib?

Nilai default

Tipe data

Deskripsi

type

Ya

Tidak ada nilai default.

String

Konektor yang digunakan. Atur ke maxcompute.

name

Tidak

Tidak ada nilai default.

String

Nama sink.

access-id

Ya

Tidak ada nilai default.

String

ID AccessKey Akun Alibaba Cloud atau Pengguna RAM Anda. Dapatkan di Konsol Resource Access Management.

access-key

Ya

Tidak ada nilai default.

String

Rahasia AccessKey Anda.

endpoint

Ya

Tidak ada nilai default.

String

Titik akhir MaxCompute Anda. Konfigurasikan berdasarkan wilayah tempat proyek MaxCompute Anda berada dan metode koneksi jaringan. Untuk informasi selengkapnya, lihat Endpoint.

project

Ya

Tidak ada nilai default.

String

Nama proyek MaxCompute Anda. Lakukan langkah-langkah berikut untuk mendapatkannya:

  1. Login ke Konsol MaxCompute.

  2. Di panel navigasi sebelah kiri, pilih Workspace>Projects.

  3. Temukan proyek MaxCompute Anda dan salin namanya.

tunnel.endpoint

Tidak

Tidak ada nilai default.

String

Titik akhir MaxCompute Tunnel. Titik akhir ini biasanya disimpulkan secara otomatis oleh MaxCompute dari pengaturan endpoint. Namun, Anda harus menentukannya secara eksplisit di lingkungan jaringan khusus, seperti dengan server proxy.

quota.name

Tidak

Tidak ada nilai default.

String

Nama kuota grup resource eksklusif. Jika Anda tidak menentukan opsi ini secara eksplisit, grup resource bersama digunakan.

sts-token

Tidak

Tidak ada nilai default.

String

Token STS Peran RAM Anda. Opsi ini wajib untuk otentikasi identitas jika Anda menggunakan Peran RAM untuk mengakses MaxCompute.

buckets-num

Tidak

16

Integer

Jumlah bucket untuk tabel Delta MaxCompute yang dibuat secara otomatis. Untuk informasi selengkapnya, lihat Gudang data near real-time.

compress.algorithm

Tidak

zlib

String

Algoritma kompresi data. Nilai yang valid:

  • raw: Data tidak dikompresi.

  • zlib

  • snappy

total.buffer-size

Tidak

64 MB

String

Ukuran buffer dalam memori. Untuk tabel berpartisi, buffer ini berlaku di tingkat partisi. Untuk tabel non-partisi, berlaku di tingkat tabel. Buffer untuk partisi atau tabel yang berbeda bersifat independen. Saat buffer mencapai kapasitas, datanya diflush ke MaxCompute.

bucket.buffer-size

Tidak

4 MB

String

Ukuran buffer dalam memori untuk bucket. Opsi ini hanya berlaku saat data ditulis ke tabel Delta MaxCompute. Buffer untuk bucket yang berbeda bersifat independen. Saat buffer mencapai kapasitas, datanya diflush ke MaxCompute.

commit.thread-num

Tidak

16

Integer

Jumlah maksimum partisi atau tabel yang dapat dikomit secara konkuren selama checkpointing.

flush.concurrent-num

Tidak

4

Integer

Menentukan jumlah maksimum bucket tempat Flink dapat secara konkuren melakukan flush data. Opsi ini hanya berlaku saat data ditulis ke tabel Delta MaxCompute.

Pemetaan lokasi tabel

Saat konektor memicu pembuatan tabel otomatis di MaxCompute, lokasi dipetakan sebagai berikut:

Penting

Jika fitur skema dinonaktifkan untuk proyek MaxCompute Anda, konektor akan mengabaikan tableId.namespace. Dalam hal ini, hanya satu database atau ekuivalen logisnya yang diingesti ke MaxCompute. Misalnya, saat data diingesti dari MySQL ke MaxCompute, hanya satu database MySQL yang dimasukkan.

Lokasi MySQL

Abstraksi di Flink CDC

Lokasi MaxCompute

N/A

Proyek dalam file konfigurasi

Proyek

Database

TableId.namespace

Skema

Catatan

Jika skema dinonaktifkan untuk proyek MaxCompute Anda, pengaturan ini diabaikan.

Tabel

TableId.tableName

Tabel

Pemetaan tipe data

Tipe Flink CDC

Tipe MaxCompute

CHAR

STRING

VARCHAR

STRING

BOOLEAN

BOOLEAN

BINARY/VARBINARY

BINARY

DECIMAL

DECIMAL

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

TIME_WITHOUT_TIME_ZONE

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_NTZ

TIMESTAMP_WITH_LOCAL_TIME_ZONE(Precision>3)

TIMESTAMP

TIMESTAMP_WITH_LOCAL_TIME_ZONE(Precision<=3)

DATETIME

TIMESTAMP_WITH_TIME_ZONE(Precision>3)

TIMESTAMP

TIMESTAMP_WITH_TIME_ZONE(Precision<=3)

DATETIME

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT

Contoh

SQL API

Tabel sumber

Baca semua data dalam partisi tertentu

Baca semua data dalam partisi yang ditentukan oleh opsi partition.

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=201809*'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
   cid,
   COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
Baca data inkremental

Baca data mulai dari partisi yang ditentukan oleh startPartition dan pantau catatan baru secara berkelanjutan.

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Mulai membaca dari partisi 20180905.
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;

Tabel sink

Tulis ke partisi statis

Tulis ke partisi yang ditentukan oleh opsi partition:

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905' -- Data ditulis ke partisi 20180905.
);

INSERT INTO odps_sink
SELECT
  id, len, content
FROM datagen_source;
Tulis ke partisi secara dinamis

Tulis data ke partisi secara dinamis berdasarkan opsi partition:

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR,
  c TIMESTAMP
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id  INT,
  len INT,
  content VARCHAR,
  ds VARCHAR -- Tentukan secara eksplisit kolom partisi dinamis.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds' -- Jangan tentukan nilai untuk ds. Data ditulis ke partisi berbeda berdasarkan nilai field ds.
);

INSERT INTO odps_sink
SELECT
   id,
   len,
   content,
   DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;

Tabel dimensi

Key bernilai tunggal

Tentukan primary key saat setiap key memiliki nilai unik:

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED  -- Tentukan primary key.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
Key bernilai ganda

Jangan tentukan primary key saat key dapat memiliki beberapa nilai:

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR
  -- Menentukan primary key tidak diperlukan.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream API

Penting
  • Jika Anda ingin memanggil DataStream API untuk membaca atau menulis data, gunakan konektor DataStream jenis terkait. Untuk informasi selengkapnya tentang cara mengonfigurasi konektor DataStream, lihat Integrasikan konektor DataStream.

  • Untuk melindungi kekayaan intelektual, VVR 6.0.6 atau versi lebih baru mendukung debugging lokal program DataStream menggunakan konektor MaxCompute hingga 30 menit. Sesi debugging yang lebih lama akan mengakibatkan program dihentikan dengan error. Untuk informasi selengkapnya, lihat Debug konektor secara lokal.

  • Membaca data dari Tabel Delta MaxCompute tidak didukung. Tabel Delta adalah tabel yang dibuat dengan primary key tertentu dan properti transactional=true. Untuk informasi selengkapnya, lihat Istilah.

Disarankan untuk mendeklarasikan tabel MaxCompute menggunakan pernyataan SQL saat menggunakan konektor DataStream MaxCompute. Anda dapat memanggil operasi Table API untuk mengakses tabel MaxCompute atau memanggil operasi DataStream API untuk mengakses stream data.

Hubungkan ke tabel sumber

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 

Hubungkan ke sink

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")");
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.),
    Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();

XML

Dependensi Maven konektor MaxCompute mencakup kelas yang diperlukan untuk membuat sumber lengkap, sumber inkremental, sink, dan tabel dimensi. Konektor DataStream MaxCompute versi berbeda tersedia di repositori pusat Maven.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>

Referensi