MaxCompute menyediakan konektor Flink baru. Anda dapat menggunakannya untuk menulis data dari Flink ke tabel standar maupun Delta di MaxCompute. Topik ini menjelaskan kemampuan konektor Flink baru serta prosedur utama untuk menulis data ke MaxCompute.
Informasi latar belakang
Mode penulisan yang didukung
Konektor Flink baru mendukung penulisan data ke MaxCompute dalam mode upsert atau insert. Dalam mode upsert, data dapat ditulis dengan salah satu dari dua cara berikut:
Dikelompokkan berdasarkan kunci primer
Dikelompokkan berdasarkan bidang partisi
Jika tabel memiliki banyak partisi, Anda dapat mengelompokkan data berdasarkan bidang partisi. Perhatikan bahwa metode ini dapat menyebabkan kesenjangan data.
Untuk informasi lebih lanjut tentang proses penulisan data dan konfigurasi parameter yang direkomendasikan untuk konektor Flink dalam mode upsert, lihat Ingesti Data Real-time ke gudang data.
Saat mengonfigurasi pekerjaan Flink untuk menulis data ke MaxCompute, Anda dapat mengatur parameter konektor Flink untuk menentukan mode penulisan. Untuk daftar lengkap parameter konektor, lihat Lampiran: Semua parameter konektor Flink baru.
Atur interval checkpoint untuk pekerjaan upsert Flink menjadi lebih dari 3 menit. Jika tidak, efisiensi penulisan mungkin rendah dan banyak file kecil akan dihasilkan.
Tabel berikut memetakan tipe data bidang antara MaxCompute dan Realtime Compute for Apache Flink.
Tipe data Flink
Tipe data MaxCompute
CHAR(p)
CHAR(p)
VARCHAR(p)
VARCHAR(p)
STRING
STRING
BOOLEAN
BOOLEAN
TINYINT
TINYINT
SMALLINT
SMALLINT
INT
INT
BIGINT
LONG
FLOAT
FLOAT
DOUBLE
DOUBLE
DECIMAL(p, s)
DECIMAL(p, s)
DATE
DATE
TIMESTAMP(9) WITHOUT TIME ZONE, TIMESTAMP_LTZ(9)
TIMESTAMP
TIMESTAMP(3) WITHOUT TIME ZONE, TIMESTAMP_LTZ(3)
DATETIME
BYTES
BINARY
ARRAY<T>
LIST<T>
MAP<K, V>
MAP<K, V>
ROW
STRUCT
CatatanTipe data TIMESTAMP Flink tidak mencakup zona waktu, sedangkan tipe data TIMESTAMP MaxCompute mencakup zona waktu. Perbedaan ini menyebabkan selisih waktu 8 jam. Gunakan TIMESTAMP_LTZ(9) untuk menyelaraskan timestamp.
-- FlinkSQL CREATE TEMPORARY TABLE odps_source( id BIGINT NOT NULL COMMENT 'id', created_time TIMESTAMP NOT NULL COMMENT 'Waktu pembuatan', updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT 'Waktu pembaruan', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', ... );
Menulis data dari kluster Flink open source yang dikelola sendiri ke MaxCompute
Persiapan: Buat tabel MaxCompute.
Pertama, buat tabel MaxCompute tempat Anda dapat menulis data Flink. Contoh berikut menunjukkan cara membuat tabel Delta non-partisi dan tabel Delta terpartisi untuk mendemonstrasikan proses utama penulisan data Flink ke MaxCompute. Untuk informasi tentang properti tabel, lihat Parameter tabel Delta.
-- Buat tabel Delta non-partisi. CREATE TABLE mf_flink_tt ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ; -- Buat tabel Delta partisi. CREATE TABLE mf_flink_tt_part ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) partitioned by (dd string, hh string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;Buat kluster open source Flink. Versi Flink 1.13, 1.15, 1.16, dan 1.17 didukung. Pilih konektor Flink yang sesuai dengan versi Flink Anda:
CatatanKonektor Flink untuk versi 1.16 dapat digunakan untuk Flink 1.17.
Topik ini menggunakan Konektor Flink 1.13 sebagai contoh. Unduh paket ke lingkungan lokal Anda dan ekstrak.
Unduh konektor Flink dan tambahkan ke paket kluster Flink.
Unduh paket JAR konektor Flink ke lingkungan lokal Anda.
Tambahkan paket JAR konektor Flink ke direktori lib dari paket instalasi Flink yang telah diekstrak.
mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
Mulai layanan Flink.
cd $FLINK_HOME/bin ./start-cluster.shMulai klien SQL Flink.
cd $FLINK_HOME/bin ./sql-client.shBuat tabel Flink dan konfigurasikan parameter konektor Flink.
Anda dapat membuat tabel Flink dan mengonfigurasi parameter menggunakan Flink SQL atau API DataStream Flink. Bagian berikut memberikan contoh inti untuk kedua metode tersebut.
Gunakan Flink SQL
Buka editor Flink SQL dan jalankan perintah berikut untuk membuat tabel serta mengonfigurasi parameter.
-- Daftarkan tabel non-partisi yang sesuai di Flink SQL. CREATE TABLE mf_flink ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI****************', 'odps.access.key'='********************', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' ); -- Daftarkan tabel terpartisi yang sesuai di Flink SQL. CREATE TABLE mf_flink_part ( id BIGINT, name STRING, age INT, status BOOLEAN, dd STRING, hh STRING, PRIMARY KEY(id) NOT ENFORCED ) PARTITIONED BY (`dd`,`hh`) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt_part', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI****************', 'odps.access.key'='********************', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' );Tulis data ke tabel Flink dan kueri tabel MaxCompute untuk memverifikasi bahwa data berhasil ditulis.
-- Masukkan data ke tabel non-partisi di klien Flink SQL. INSERT INTO mf_flink VALUES (1,'Danny',27, false); -- Kueri data di MaxCompute dan periksa hasilnya. SELECT * FROM mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 27 | false | +------------+------+------+--------+ -- Masukkan data ke tabel non-partisi di klien Flink SQL. INSERT INTO mf_flink VALUES (1,'Danny',28, false); -- Kueri data di MaxCompute dan periksa hasilnya. SELECT * FROM mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 28 | false | +------------+------+------+--------+ -- Masukkan data ke tabel terpartisi di klien Flink SQL. INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01'); -- Kueri data di MaxCompute dan periksa hasilnya. SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 27 | false | 01 | 01 | +------------+------+------+--------+----+----+ -- Masukkan data ke tabel terpartisi di klien Flink SQL. INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01'); -- Kueri data di MaxCompute dan periksa hasilnya. SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 30 | false | 01 | 01 | +------------+------+------+--------+----+----+
Gunakan DataStream API
Saat menggunakan API DataStream, pertama-tama tambahkan dependensi berikut.
<dependency> <groupId>com.aliyun.odps</groupId> <artifactId>flink-connector-maxcompute</artifactId> <version>xxx</version> <scope>system</scope> <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath> </dependency>CatatanGanti xxx dengan nomor versi sebenarnya.
Kode contoh berikut menunjukkan cara membuat tabel dan mengonfigurasi parameter.
package com.aliyun.odps.flink.examples; import org.apache.flink.configuration.Configuration; import org.apache.flink.odps.table.OdpsOptions; import org.apache.flink.odps.util.OdpsConf; import org.apache.flink.odps.util.OdpsPipeline; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; public class Examples { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(120 * 1000); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env); Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table"); DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class); Configuration config = new Configuration(); config.set(OdpsOptions.SINK_OPERATION, "upsert"); config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8); config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100); OdpsConf odpsConfig = new OdpsConf("accessid", "accesskey", "endpoint", "project", "tunnel endpoint"); OdpsPipeline.Builder builder = OdpsPipeline.builder(); builder.projectName("sql2_isolation_2a") .tableName("user_ledger_portfolio") .partition("") .configuration(config) .odpsConf(odpsConfig) .sink(input, false); env.execute(); } }
Menulis data dari Flink yang sepenuhnya dikelola di Alibaba Cloud ke MaxCompute
Persiapan: Buat tabel MaxCompute.
Pertama, buat tabel MaxCompute tempat Anda dapat menulis data Flink. Contoh berikut menunjukkan cara membuat tabel Delta.
SET odps.sql.type.system.odps2=true; DROP TABLE mf_flink_upsert; CREATE TABLE mf_flink_upsert ( c1 int not null, c2 string, gt timestamp, primary key (c1) ) PARTITIONED BY (ds string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;Masuk ke Konsol Realtime Compute for Apache Flink dan lihat informasi konektor Flink. Konektor Flink telah dimuat sebelumnya pada Platform Ververica (VVP) untuk Flink yang sepenuhnya dikelola di Alibaba Cloud.
Gunakan pekerjaan Flink SQL untuk membuat tabel Flink dan menghasilkan data Flink real-time. Setelah Anda mengembangkan pekerjaan tersebut, Anda dapat menerapkannya.
Di halaman pengembangan pekerjaan Flink, buat dan edit pekerjaan Flink SQL. Contoh berikut membuat tabel sumber Flink dan tabel sink Flink sementara. Contoh ini juga mencakup logika untuk secara otomatis menghasilkan data real-time dan menuliskannya ke tabel sumber. Logika pekerjaan kemudian menulis data dari tabel sumber ke tabel sink sementara. Untuk informasi lebih lanjut tentang pengembangan pekerjaan SQL, lihat Peta pengembangan pekerjaan.
-- Buat tabel sumber Flink. CREATE TEMPORARY TABLE fake_src_table ( c1 int, c2 VARCHAR, gt AS CURRENT_TIMESTAMP ) WITH ( 'connector' = 'faker', 'fields.c2.expression' = '#{superhero.name}', 'rows-per-second' = '100', 'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}' ); -- Buat tabel sink Flink sementara. CREATE TEMPORARY TABLE test_c_d_g ( c1 int, c2 VARCHAR, gt TIMESTAMP, ds varchar, PRIMARY KEY(c1) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_upsert', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI****************', 'odps.access.key'='********************', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj', 'upsert.write.bucket.num'='64' ); -- Logika komputasi Flink INSERT INTO test_c_d_g SELECT c1 AS c1, c2 AS c2, gt AS gt, date_format(gt, 'yyyyMMddHH') AS ds FROM fake_src_table;Di mana:
odps.end.point: Gunakan Titik akhir jaringan internal untuk wilayah yang sesuai.upsert.write.bucket.num: Nilai parameter ini harus sama dengan nilai properti write.bucket.num dari tabel Delta di MaxCompute.Kueri data di MaxCompute untuk memverifikasi bahwa data Flink berhasil ditulis.
SELECT * FROM mf_flink_upsert WHERE ds=2023061517; -- Hasil: Karena data Flink dihasilkan secara acak, hasil kueri Anda di MaxCompute mungkin berbeda dari contoh. +------+----+------+----+ | c1 | c2 | gt | ds | +------+----+------+----+ | 0 | Skaar | 2023-06-16 01:59:41.116 | 2023061517 | | 21 | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 | | 104 | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 | | 126 | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
Lampiran: Semua parameter konektor Flink baru
Parameter Dasar
Parameter
Diperlukan
Nilai default
Deskripsi
connector
Ya
Tidak ada
Jenis konektor. Atur ke
MaxCompute.odps.project.name
Ya
Tidak ada
Nama proyek MaxCompute.
odps.access.id
Ya
Tidak ada
ID AccessKey Akun Alibaba Cloud Anda. Anda dapat melihat informasi ini di halaman Pasangan AccessKey.
odps.access.key
Ya
Tidak ada
Rahasia AccessKey Akun Alibaba Cloud Anda. Anda dapat melihat informasi ini di halaman Pasangan AccessKey.
odps.end.point
Ya
Tidak ada
Titik akhir MaxCompute. Untuk Titik akhir MaxCompute di berbagai Wilayah, lihat Titik akhir.
odps.tunnel.end.point
Tidak
Tidak ada
Titik akhir publik layanan Tunnel. Jika Anda tidak mengonfigurasi Titik akhir Tunnel, lalu lintas akan secara otomatis diarahkan ke Titik akhir Tunnel yang sesuai dengan jaringan tempat layanan MaxCompute berada. Jika Anda mengonfigurasi Titik akhir Tunnel, lalu lintas akan diarahkan ke Titik akhir yang ditentukan dan pengarahan otomatis dinonaktifkan.
Untuk Titik akhir Tunnel di berbagai Wilayah dan jenis jaringan, lihat Titik akhir.
odps.tunnel.quota.name
Tidak
Tidak ada
Nama kuota Tunnel yang digunakan untuk mengakses MaxCompute.
table.name
Ya
Tidak ada
Nama tabel MaxCompute. Formatnya adalah
[project.][schema.]table.odps.namespace.schema
Tidak
false
Menentukan apakah akan menggunakan model tiga lapisan. Untuk informasi lebih lanjut tentang model tiga lapisan, lihat Operasi skema.
sink.operation
Ya
insert
Mode penulisan. Nilai yang valid:
insertdanupsert.CatatanHanya tabel Delta MaxCompute yang mendukung penulisan upsert.
sink.parallelism
Tidak
Tidak ada
Tingkat paralelisme untuk penulisan. Jika tidak diatur, tingkat paralelisme data masukan digunakan secara default.
CatatanPastikan bahwa nilai properti tabel
write.bucket.nummerupakan kelipatan bilangan bulat dari nilai parameter ini. Hal ini memastikan kinerja penulisan optimal dan menghemat memori paling banyak pada node sink.sink.meta.cache.time
Tidak
400
Ukuran cache metadata.
sink.meta.cache.expire.time
Tidak
1200
Periode timeout untuk cache metadata dalam satuan detik (s).
sink.coordinator.enable
Tidak
Ya.
Menentukan apakah akan mengaktifkan mode koordinator.
Parameter Partisi
Parameter
Diperlukan
Nilai default
Deskripsi
sink.partition
Tidak
Tidak ada
Nama partisi yang akan ditulis.
Jika Anda menggunakan partisi dinamis, ini adalah nama partisi induk dari partisi dinamis.
sink.partition.default-value
Tidak
__DEFAULT_PARTITION__
Nama partisi default yang digunakan untuk partisi dinamis.
sink.dynamic-partition.limit
Tidak
100
Saat menulis ke partisi dinamis, ini adalah jumlah maksimum partisi yang dapat diimpor secara bersamaan dalam satu checkpoint.
CatatanJangan meningkatkan nilai ini secara signifikan. Menulis ke terlalu banyak partisi secara bersamaan dapat menyebabkan error kehabisan memori (OOM) pada node sink. Jika jumlah partisi konkuren untuk penulisan melebihi ambang batas, pekerjaan penulisan gagal.
sink.group-partition.enable
Tidak
false
Saat menulis ke partisi dinamis, menentukan apakah akan mengelompokkan data berdasarkan partisi.
sink.partition.assigner.class
Tidak
Tidak ada
Kelas implementasi PartitionAssigner.
Parameter untuk menulis dalam mode FileCached
Jika Anda memiliki banyak partisi dinamis, Anda dapat menggunakan mode cache file. Anda dapat menggunakan parameter berikut untuk mengonfigurasi informasi file cache untuk penulisan data.
Parameter
Diperlukan
Nilai default
Deskripsi
sink.file-cached.enable
Tidak
false
Menentukan apakah akan mengaktifkan penulisan dalam mode FileCached. Nilai yang valid:
false: Menonaktifkan mode.
true: Mengaktifkan mode.
CatatanGunakan mode cache file saat terdapat banyak partisi dinamis.
sink.file-cached.tmp.dirs
Tidak
./local
Direktori default untuk file cache dalam mode cache file.
sink.file-cached.writer.num
Tidak
16
Jumlah thread konkuren untuk satu tugas mengunggah data dalam mode cache file.
CatatanJangan meningkatkan nilai ini secara signifikan. Menulis ke terlalu banyak partisi secara bersamaan dapat menyebabkan error OOM.
sink.bucket.check-interval
Tidak
60000
Interval untuk memeriksa ukuran file dalam mode cache file. Satuan: milidetik (ms).
sink.file-cached.rolling.max-size
Tidak
16 M
Ukuran maksimum satu file cache dalam mode cache file.
Jika file melebihi ukuran ini, datanya diunggah ke server.
sink.file-cached.memory
Tidak
64 M
Ukuran maksimum memori off-heap yang digunakan untuk menulis file dalam mode cache file.
sink.file-cached.memory.segment-size
Tidak
128 KB
Ukuran buffer yang digunakan untuk menulis file dalam mode cache file.
sink.file-cached.flush.always
Tidak
true
Dalam mode cache file, menentukan apakah akan menggunakan cache saat menulis file.
sink.file-cached.write.max-retries
Tidak
3
Jumlah percobaan ulang untuk mengunggah data dalam mode cache file.
Parameter untuk penulisan
insertatauupsertParameter penulisan Upsert
Parameter
Diperlukan
Nilai default
Deskripsi
upsert.writer.max-retries
Tidak
3
Jumlah percobaan ulang jika Upsert Writer gagal menulis ke bucket.
upsert.writer.buffer-size
Tidak
64 m
Ukuran cache untuk satu Upsert Writer di Flink.
CatatanSaat ukuran buffer total semua bucket mencapai ambang batas, sistem secara otomatis mengosongkan data ke server.
Satu Upsert Writer dapat menulis ke beberapa bucket secara bersamaan. Tingkatkan nilai ini untuk meningkatkan efisiensi penulisan.
Jika Anda menulis ke banyak partisi, terdapat risiko error OOM. Dalam kasus ini, pertimbangkan untuk mengurangi nilai ini.
upsert.writer.bucket.buffer-size
Tidak
1 m
Ukuran cache untuk satu bucket di Flink. Jika server Flink kekurangan sumber daya memori, Anda dapat mengurangi nilai ini.
upsert.write.bucket.num
Ya
Tidak ada
Jumlah bucket untuk tabel sink. Nilai ini harus sama dengan nilai properti
write.bucket.numdari tabel sink.upsert.write.slot-num
Tidak
1
Jumlah slot Tunnel yang digunakan oleh satu sesi.
upsert.commit.max-retries
Tidak
3
Jumlah percobaan ulang untuk commit sesi upsert.
upsert.commit.thread-num
Tidak
16
Tingkat paralelisme untuk commit sesi upsert.
Jangan mengatur parameter ini ke nilai yang besar. Jumlah commit konkuren yang tinggi meningkatkan konsumsi sumber daya, yang dapat menyebabkan masalah kinerja atau penggunaan sumber daya berlebihan.
upsert.major-compact.min-commits
Tidak
100
Jumlah minimum commit yang diperlukan untuk memicu kompaksi mayor.
upsert.commit.timeout
Tidak
600
Periode timeout untuk commit sesi upsert menunggu. Satuan: detik (s).
upsert.major-compact.enable
Tidak
false
Menentukan apakah akan mengaktifkan pemadatan utama.
upsert.flush.concurrent
Tidak
2
Jumlah maksimum bucket yang dapat ditulis secara bersamaan dalam satu partisi.
CatatanSetiap kali data dalam bucket dikosongkan, satu slot Tunnel digunakan.
CatatanUntuk informasi lebih lanjut tentang konfigurasi parameter yang direkomendasikan untuk penulisan upsert, lihat Konfigurasi parameter yang direkomendasikan untuk penulisan upsert.
Parameter penulisan Insert
Parameter
Diperlukan
Nilai default
Deskripsi
insert.commit.thread-num
Tidak
16
Tingkat paralelisme untuk sesi commit.
insert.arrow-writer.enable
Tidak
false
Menentukan apakah akan menggunakan format Arrow.
insert.arrow-writer.batch-size
Tidak
512
Jumlah maksimum baris dalam satu batch Arrow.
insert.arrow-writer.flush-interval
Tidak
100000
Interval di mana writer mengosongkan data. Satuan: milidetik (ms).
insert.writer.buffer-size
Tidak
64 M
Ukuran cache untuk penulis yang di-buffer.