MaxCompute は、新しい Flink コネクタプラグインを提供します。このプラグインを使用して、Flink から MaxCompute の標準テーブルおよび Delta テーブルにデータを書き込むことができます。このトピックでは、新しい Flink コネクタの機能と、MaxCompute にデータを書き込むための主な手順について説明します。
背景情報
サポートされている書き込みモード
新しい Flink コネクタは、upsert または insert モードで MaxCompute にデータを書き込むことをサポートしています。upsert モードでは、次の 2 つの方法のいずれかでデータを書き込むことができます。
プライマリキーによるグループ化
パーティションフィールドによるグループ化
テーブルに多数のパーティションがある場合は、パーティションフィールドでデータをグループ化できます。このメソッドはデータスキューを引き起こす可能性があることに注意してください。
upsert モードでの Flink コネクタのデータ書き込みプロセスと推奨パラメーター構成の詳細については、「データウェアハウスへのリアルタイムデータインジェスト」をご参照ください。
Flink ジョブを構成して MaxCompute にデータを書き込む際に、Flink コネクタのパラメーターを設定して書き込みモードを指定できます。コネクタパラメーターの完全なリストについては、「付録: 新しい Flink コネクタのすべてのパラメーター」をご参照ください。
Flink upsert ジョブのチェックポイント間隔を 3 分以上に設定してください。そうしないと、書き込み効率が低下し、多数の小さなファイルが生成される可能性があります。
次の表は、MaxCompute と Realtime Compute for Apache Flink の間のフィールドデータ型のマッピングを示しています。
Flink データ型
MaxCompute データ型
CHAR(p)
CHAR(p)
VARCHAR(p)
VARCHAR(p)
STRING
STRING
BOOLEAN
BOOLEAN
TINYINT
TINYINT
SMALLINT
SMALLINT
INT
INT
BIGINT
LONG
FLOAT
FLOAT
DOUBLE
DOUBLE
DECIMAL(p, s)
DECIMAL(p, s)
DATE
DATE
TIMESTAMP(9) WITHOUT TIME ZONE, TIMESTAMP_LTZ(9)
TIMESTAMP
TIMESTAMP(3) WITHOUT TIME ZONE, TIMESTAMP_LTZ(3)
DATETIME
BYTES
BINARY
ARRAY<T>
LIST<T>
MAP<K, V>
MAP<K, V>
ROW
STRUCT
説明Flink の TIMESTAMP データ型にはタイムゾーンが含まれていませんが、MaxCompute の TIMESTAMP データ型には含まれています。この違いにより、8 時間の時間のずれが生じます。タイムスタンプを合わせるには、TIMESTAMP_LTZ(9) を使用してください。
-- FlinkSQL CREATE TEMPORARY TABLE odps_source( id BIGINT NOT NULL COMMENT 'id', created_time TIMESTAMP NOT NULL COMMENT '作成時間', updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT '更新時間', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', ... );
セルフマネージドのオープンソース Flink クラスターから MaxCompute へのデータ書き込み
準備: MaxCompute テーブルの作成
まず、Flink データを書き込むことができる MaxCompute テーブルを作成します。次の例では、パーティション化されていない Delta テーブルとパーティション化された Delta テーブルを作成して、Flink データを MaxCompute に書き込む主なプロセスを示します。テーブルのプロパティについては、「Delta テーブルのパラメーター」をご参照ください。
-- パーティション化されていない Delta テーブルを作成します。 CREATE TABLE mf_flink_tt ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ; -- パーティション化された Delta テーブルを作成します。 CREATE TABLE mf_flink_tt_part ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) partitioned by (dd string, hh string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;オープンソースの Flink クラスターを構築します。Flink バージョン 1.13、1.15、1.16、および 1.17 がサポートされています。お使いの Flink バージョンに対応する Flink コネクタを選択してください:
説明バージョン 1.16 用の Flink コネクタは Flink 1.17 で使用できます。
このトピックでは、Flink コネクタ 1.13 を例として使用します。パッケージをローカル環境にダウンロードして解凍してください。
Flink コネクタをダウンロードし、Flink クラスターパッケージに追加します。
Flink コネクタの JAR パッケージをローカル環境にダウンロードします。
Flink コネクタの JAR パッケージを、解凍した Flink インストールパッケージの lib ディレクトリに追加します。
mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
Flink サービスを開始します。
cd $FLINK_HOME/bin ./start-cluster.shFlink SQL クライアントを起動します。
cd $FLINK_HOME/bin ./sql-client.shFlink テーブルを作成し、Flink コネクタのパラメーターを構成します。
Flink SQL または Flink DataStream API のいずれかを使用して、Flink テーブルを作成し、パラメーターを構成できます。以下のセクションでは、両方のメソッドの主要な例を示します。
Flink SQL の使用
Flink SQL エディターに移動し、次のコマンドを実行してテーブルを作成し、パラメーターを構成します。
-- Flink SQL に対応するパーティション化されていないテーブルを登録します。 CREATE TABLE mf_flink ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI****************', 'odps.access.key'='********************', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' ); -- Flink SQL に対応するパーティション化されたテーブルを登録します。 CREATE TABLE mf_flink_part ( id BIGINT, name STRING, age INT, status BOOLEAN, dd STRING, hh STRING, PRIMARY KEY(id) NOT ENFORCED ) PARTITIONED BY (`dd`,`hh`) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt_part', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI****************', 'odps.access.key'='********************', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' );Flink テーブルにデータを書き込み、MaxCompute テーブルをクエリして、データが正常に書き込まれたことを確認します。
-- Flink SQL クライアントでパーティション化されていないテーブルにデータを挿入します。 INSERT INTO mf_flink VALUES (1,'Danny',27, false); -- MaxCompute でデータをクエリし、結果を確認します。 SELECT * FROM mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 27 | false | +------------+------+------+--------+ -- Flink SQL クライアントでパーティション化されていないテーブルにデータを挿入します。 INSERT INTO mf_flink VALUES (1,'Danny',28, false); -- MaxCompute でデータをクエリし、結果を確認します。 SELECT * FROM mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 28 | false | +------------+------+------+--------+ -- Flink SQL クライアントでパーティション化されたテーブルにデータを挿入します。 INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01'); -- MaxCompute でデータをクエリし、結果を確認します。 SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 27 | false | 01 | 01 | +------------+------+------+--------+----+----+ -- Flink SQL クライアントでパーティション化されたテーブルにデータを挿入します。 INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01'); -- MaxCompute でデータをクエリし、結果を確認します。 SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 30 | false | 01 | 01 | +------------+------+------+--------+----+----+
DataStream API を使用する
DataStream API を使用する場合は、まず次の依存関係を追加します。
<dependency> <groupId>com.aliyun.odps</groupId> <artifactId>flink-connector-maxcompute</artifactId> <version>xxx</version> <scope>system</scope> <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath> </dependency>説明xxx を実際のバージョン番号に置き換えてください。
次のサンプルコードは、テーブルを作成してパラメーターを構成する方法を示しています。
package com.aliyun.odps.flink.examples; // Flink と MaxCompute の連携の例 import org.apache.flink.configuration.Configuration; import org.apache.flink.odps.table.OdpsOptions; import org.apache.flink.odps.util.OdpsConf; import org.apache.flink.odps.util.OdpsPipeline; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; public class Examples { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(120 * 1000); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env); Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table"); // ソーステーブルからデータを読み込む DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class); Configuration config = new Configuration(); config.set(OdpsOptions.SINK_OPERATION, "upsert"); // upsert モードを設定 config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8); // commit スレッド数を設定 config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100); // メジャーコンパクションの最小 commit 回数を設定 OdpsConf odpsConfig = new OdpsConf("accessid", // AccessKey ID "accesskey", // AccessKey Secret "endpoint", // MaxCompute エンドポイント "project", // MaxCompute プロジェクト名 "tunnel endpoint"); // Tunnel エンドポイント OdpsPipeline.Builder builder = OdpsPipeline.builder(); builder.projectName("sql2_isolation_2a") // プロジェクト名 .tableName("user_ledger_portfolio") // テーブル名 .partition("") // パーティション .configuration(config) // 構成 .odpsConf(odpsConfig) // ODPS 構成 .sink(input, false); // データをシンクに書き込む env.execute(); } }
Alibaba Cloud 上のフルマネージド Flink から MaxCompute へのデータ書き込み
準備: MaxCompute テーブルの作成
まず、Flink データを書き込むことができる MaxCompute テーブルを作成します。次の例では、Delta テーブルを作成する方法を示します。
SET odps.sql.type.system.odps2=true; DROP TABLE mf_flink_upsert; CREATE TABLE mf_flink_upsert ( c1 int not null, c2 string, gt timestamp, primary key (c1) ) PARTITIONED BY (ds string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;Realtime Compute for Apache Flink コンソールにログインし、Flink コネクタの情報を表示します。Flink コネクタは、Alibaba Cloud 上のフルマネージド Flink の Ververica Platform (VVP) にプリロードされています。
Flink SQL ジョブを使用して Flink テーブルを作成し、リアルタイムの Flink データを生成します。ジョブを開発した後、デプロイできます。
Flink ジョブ開発ページで、Flink SQL ジョブを作成および編集します。次の例では、Flink ソーステーブルと一時的な Flink シンクテーブルを作成します。この例には、リアルタイムデータを自動的に生成してソーステーブルに書き込むロジックも含まれています。その後、ジョブログロジックはソーステーブルから一時的なシンクテーブルにデータを書き込みます。SQL ジョブ開発の詳細については、「ジョブ開発マップ」をご参照ください。
-- Flink ソーステーブルを作成します。 CREATE TEMPORARY TABLE fake_src_table ( c1 int, c2 VARCHAR, gt AS CURRENT_TIMESTAMP ) WITH ( 'connector' = 'faker', 'fields.c2.expression' = '#{superhero.name}', 'rows-per-second' = '100', 'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}' ); -- 一時的な Flink シンクテーブルを作成します。 CREATE TEMPORARY TABLE test_c_d_g ( c1 int, c2 VARCHAR, gt TIMESTAMP, ds varchar, PRIMARY KEY(c1) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_upsert', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI****************', 'odps.access.key'='********************', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj', 'upsert.write.bucket.num'='64' ); -- Flink 計算ロジック INSERT INTO test_c_d_g SELECT c1 AS c1, c2 AS c2, gt AS gt, date_format(gt, 'yyyyMMddHH') AS ds FROM fake_src_table;各項目の意味は次のとおりです。
odps.end.point: 対応するリージョンの内部ネットワークエンドポイントを使用します。upsert.write.bucket.num: このパラメーターの値は、MaxCompute の Delta テーブルの write.bucket.num プロパティの値と同じでなければなりません。MaxCompute でデータをクエリして、Flink データが正常に書き込まれたことを確認します。
SELECT * FROM mf_flink_upsert WHERE ds=2023061517; -- 結果: Flink データはランダムに生成されるため、MaxCompute でのクエリ結果は例と異なる場合があります。 +------+----+------+----+ | c1 | c2 | gt | ds | +------+----+------+----+ | 0 | Skaar | 2023-06-16 01:59:41.116 | 2023061517 | | 21 | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 | | 104 | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 | | 126 | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
付録: 新しい Flink コネクタのすべてのパラメーター
基本パラメータ
パラメータ
必須
デフォルト値
説明
connector
はい
なし
コネクタタイプ。これを
MaxComputeに設定します。odps.project.name
はい
なし
MaxCompute プロジェクトの名前。
odps.access.id
はい
なし
Alibaba Cloud アカウントの AccessKey ID。この情報は AccessKey ペアページ で表示できます。
odps.access.key
はい
なし
Alibaba Cloud アカウントの AccessKey Secret。この情報は AccessKey ペアページ で表示できます。
odps.end.point
はい
なし
MaxCompute のエンドポイント。さまざまなリージョンにおける MaxCompute のエンドポイントについては、「エンドポイント」をご参照ください。
odps.tunnel.end.point
いいえ
なし
Tunnel サービスのパブリックエンドポイント。Tunnel エンドポイントを構成しない場合、トラフィックは MaxCompute サービスが配置されているネットワークに対応する Tunnel エンドポイントに自動的にルーティングされます。Tunnel エンドポイントを構成する場合、トラフィックは指定されたエンドポイントにルーティングされ、自動ルーティングは無効になります。
さまざまなリージョンとネットワークタイプに対応する Tunnel エンドポイントについては、「エンドポイント」をご参照ください。
odps.tunnel.quota.name
いいえ
なし
MaxCompute にアクセスするために使用される Tunnel クォータの名前。
table.name
はい
なし
MaxCompute テーブルの名前。フォーマットは
[project.][schema.]tableです。odps.namespace.schema
いいえ
false
3 層モデルを使用するかどうかを指定します。3 層モデルの詳細については、「スキーマ操作」をご参照ください。
sink.operation
はい
insert
書き込みモード。有効な値:
insertおよびupsert。説明MaxCompute Delta テーブルのみが upsert 書き込みをサポートしています。
sink.parallelism
いいえ
なし
書き込みの並列処理の次数。これが設定されていない場合、デフォルトで入力データの並列処理の次数が使用されます。
説明write.bucket.numテーブルプロパティの値がこのパラメーターの値の整数倍であることを確認してください。これにより、最適な書き込みパフォーマンスが確保され、シンクノードのメモリを最大限に節約できます。sink.meta.cache.time
いいえ
400
メタデータキャッシュのサイズ。
sink.meta.cache.expire.time
いいえ
1200
メタデータキャッシュのタイムアウト期間 (秒)。
sink.coordinator.enable
いいえ
はい。
コーディネーターモードを有効にするかどうかを指定します。
パーティションパラメータ
パラメータ
必須
デフォルト値
説明
sink.partition
いいえ
なし
書き込み先のパーティションの名前。
動的パーティションを使用する場合、これは動的パーティションの親パーティションの名前です。
sink.partition.default-value
いいえ
__DEFAULT_PARTITION__
動的パーティションで使用するデフォルトのパーティション名。
sink.dynamic-partition.limit
いいえ
100
動的パーティションに書き込む場合、これは単一のチェックポイントで同時にインポートできるパーティションの最大数です。
説明この値を大幅に増やさないでください。同時に多数のパーティションに書き込むと、シンクノードでメモリ不足 (OOM) エラーが発生する可能性があります。書き込みの同時パーティション数がしきい値を超えると、書き込みジョブは失敗します。
sink.group-partition.enable
いいえ
false
動的パーティションに書き込むときに、パーティションごとにデータをグループ化するかどうかを指定します。
sink.partition.assigner.class
いいえ
なし
PartitionAssigner 実装クラス。
FileCached モードでの書き込み用パラメーター
多数の動的パーティションがある場合は、ファイルキャッシュモードを使用できます。次のパラメーターを使用して、データ書き込み用のキャッシュファイル情報を構成できます。
パラメータ
必須
デフォルト値
説明
sink.file-cached.enable
いいえ
false
FileCached モードでの書き込みを有効にするかどうかを指定します。有効な値:
false: モードを無効にします。
true: モードを有効にします。
説明動的パーティションが多数ある場合は、ファイルキャッシュモードを使用してください。
sink.file-cached.tmp.dirs
いいえ
./local
ファイルキャッシュモードでのキャッシュファイルのデフォルトディレクトリ。
sink.file-cached.writer.num
いいえ
16
ファイルキャッシュモードでデータをアップロードするための単一タスクの同時スレッド数。
説明この値を大幅に増やさないでください。同時に多数のパーティションに書き込むと、OOM エラーが発生する可能性があります。
sink.bucket.check-interval
いいえ
60000
ファイルキャッシュモードでファイルサイズをチェックする間隔。単位: ミリ秒 (ms)。
sink.file-cached.rolling.max-size
いいえ
16 M
ファイルキャッシュモードでの単一キャッシュファイルの最大サイズ。
ファイルがこのサイズを超えると、そのデータはサーバーにアップロードされます。
sink.file-cached.memory
いいえ
64 M
ファイルキャッシュモードでファイルの書き込みに使用されるオフヒープメモリの最大サイズ。
sink.file-cached.memory.segment-size
いいえ
128 KB
ファイルキャッシュモードでファイルの書き込みに使用されるバッファーのサイズ。
sink.file-cached.flush.always
いいえ
true
ファイルキャッシュモードで、ファイルの書き込み時にキャッシュを使用するかどうかを指定します。
sink.file-cached.write.max-retries
いいえ
3
ファイルキャッシュモードでのデータアップロードの再試行回数。
insertまたはupsert書き込みのパラメーターUpsert 書き込みパラメーター
パラメータ
必須
デフォルト値
説明
upsert.writer.max-retries
いいえ
3
Upsert Writer がバケットへの書き込みに失敗した場合の再試行回数。
upsert.writer.buffer-size
いいえ
64 m
Flink の単一の Upsert Writer のキャッシュサイズ。
説明すべてのバケットの合計バッファーサイズがしきい値に達すると、システムは自動的にデータをサーバーにフラッシュします。
単一の Upsert Writer は、同時に複数のバケットに書き込むことができます。この値を増やすと、書き込み効率が向上します。
多数のパーティションに書き込む場合、OOM エラーのリスクがあります。この場合、この値を減らすことを検討してください。
upsert.writer.bucket.buffer-size
いいえ
1 m
Flink の単一バケットのキャッシュサイズ。Flink サーバーのメモリリソースが不足している場合は、この値を減らすことができます。
upsert.write.bucket.num
はい
なし
シンクテーブルのバケット数。この値は、シンクテーブルの
write.bucket.numプロパティの値と同じでなければなりません。upsert.write.slot-num
いいえ
1
単一セッションで使用される Tunnel スロットの数。
upsert.commit.max-retries
いいえ
3
upsert セッションコミットの再試行回数。
upsert.commit.thread-num
いいえ
16
upsert セッションコミットの並列処理の次数。
このパラメーターを大きな値に設定しないでください。コミットの同時実行数が多いとリソース消費量が増加し、パフォーマンスの問題や過剰なリソース使用量が発生する可能性があります。
upsert.major-compact.min-commits
いいえ
100
メジャーコンパクションをトリガーするために必要なコミットの最小数。
upsert.commit.timeout
いいえ
600
upsert セッションコミットが待機するタイムアウト期間。単位: 秒 (s)。
upsert.major-compact.enable
いいえ
false
メジャーコンパクションを有効にするかどうかを指定します。
upsert.flush.concurrent
いいえ
2
単一パーティションで同時に書き込むことができるバケットの最大数。
説明バケット内のデータがフラッシュされるたびに、Tunnel スロットが占有されます。
説明upsert 書き込みの推奨パラメーター構成の詳細については、「Upsert 書き込みの推奨パラメーター構成」をご参照ください。
Insert 書き込みパラメーター
パラメータ
必須
デフォルト値
説明
insert.commit.thread-num
いいえ
16
コミットセッションの並列処理の次数。
insert.arrow-writer.enable
いいえ
false
Arrow 形式を使用するかどうかを指定します。
insert.arrow-writer.batch-size
いいえ
512
Arrow バッチの最大行数。
insert.arrow-writer.flush-interval
いいえ
100000
ライターがデータをフラッシュする間隔。単位: ミリ秒 (ms)。
insert.writer.buffer-size
いいえ
64 M
バッファリングされたライターのキャッシュサイズ。