ApsaraDB RDS for MySQL コネクタは、将来的にサポートされなくなります。代わりに MySQL コネクタ を使用してください。
ApsaraDB RDS for MySQL コネクタを使用すると、Flink SQL の出力を ApsaraDB RDS for MySQL の結果テーブルに書き込んだり、ストリームを ApsaraDB RDS for MySQL のディメンションテーブルに対して結合したりできます。
サポートされるテーブルタイプ: 結果テーブル・ディメンションテーブル
サポートされる実行モード: バッチモード・ストリーミングモード
API タイプ: SQL
結果テーブルでのデータの更新と削除: サポートされています
前提条件
開始する前に、以下が準備できていることを確認してください。
-
ApsaraDB RDS for MySQL のデータベースとテーブル。詳細については、「ApsaraDB RDS for MySQL インスタンスのデータベースとアカウントの作成」をご参照ください。
-
データベースに設定された IP アドレスホワイトリスト。詳細については、「データベースクライアントまたは CLI を使用した ApsaraDB RDS for MySQL インスタンスへの接続」をご参照ください。
制限事項
-
Ververica Runtime (VVR) 2.0.0 以降を使用した Realtime Compute for Apache Flink が必要です。最高のパフォーマンスと安定性を得るには、VVR 6.X 以降を使用してください。
-
ApsaraDB RDS for MySQL データベースのみがサポートされています。
-
コネクタは at-least-once セマンティクスを使用します。結果テーブルにプライマリキーがある場合、べき等性によりデータの正確性が保証されます。
仕組み
結果テーブルの書き込み動作
各出力行は、結果テーブルに書き込まれる前に SQL ステートメントに変換されます。
-
プライマリキーなし —
INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...);を実行します。 -
プライマリキーあり —
INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...) ON DUPLICATE KEY UPDATE col1 = VALUES(col1), col2 = VALUES(col2), ...;を実行します。
一意なインデックスの競合: 物理テーブルにプライマリキーに加えて一意なインデックス制約がある場合、異なるプライマリキーで同じ一意なインデックス値を持つ 2 つの行を挿入すると、先の行が上書きされ、データ損失が発生します。
自動インクリメントのプライマリキー: Flink DDL で自動インクリメントフィールドを宣言しないでください。データベースがこれらの値を自動的に割り当てます。コネクタは自動インクリメントフィールドを持つ行の書き込みと削除はできますが、更新はできません。
ディメンションテーブルのキャッシュポリシー
コネクタは、ディメンションテーブルのルックアップに対して 3 つのキャッシュポリシーをサポートしています。
| ポリシー | 動作 | 使用するケース |
|---|---|---|
NONE |
キャッシュなし — すべてのルックアップでデータベースに直接クエリを実行します | 低レイテンシー要件、小規模なデータセット |
LRU |
タスクマネージャーごとに、最近使用された固定数の行をキャッシュします | 大規模なテーブルで頻繁にアクセスされるサブセット |
ALL |
テーブル全体をメモリにロードし、定期的に再読み込みします | 小規模で静的なリファレンス テーブル |
構文
結果テーブル
CREATE TABLE rds_sink (
id INT,
num BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'rds',
'tableName' = '<your-table-name>',
'userName' = '<your-user-name>',
'password' = '<your-password>',
'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);
結果テーブルの書き込みスループットを向上させるには、url の値に ?rewriteBatchedStatements=true を追加してください。
ディメンションテーブル
CREATE TABLE rds_dim (
id1 INT,
id2 VARCHAR
) WITH (
'connector' = 'rds',
'tableName' = '<your-table-name>',
'userName' = '<your-user-name>',
'password' = '<your-password>',
'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>',
'cache' = 'NONE'
);
WITH 句のパラメーター
共通パラメーター
| パラメーター | 型 | 必須 | デフォルト値 | 説明 |
|---|---|---|---|---|
connector |
STRING | はい | — | 次のように設定します:rds |
tableName |
STRING | はい | — | ApsaraDB RDS for MySQL の物理テーブルの名前 |
userName |
STRING | はい | — | データベースのユーザー名 |
password |
STRING | はい | — | データベースのパスワード |
url |
STRING | はい | — | データベースの Virtual Private Cloud (VPC) エンドポイント。フォーマットは jdbc:mysql://<internal-endpoint>:<port>/<database-name> です。結果テーブルの場合、?rewriteBatchedStatements=true を追加します。エンドポイントの詳細については、「ApsaraDB RDS for MySQL インスタンスの内部およびパブリックエンドポイントとポート番号の表示と変更 |
maxRetryTimes |
INTEGER | いいえ | 10 (VVR 4.0.7 以降)、3 (VVR 4.0.6 以前) | 失敗したディメンションテーブルのルックアップまたは結果テーブルへの書き込みの最大リトライ回数 |
シンクテーブル パラメーター
| パラメーター | 型 | 必須 | デフォルト値 | 説明 |
|---|---|---|---|---|
batchSize |
INTEGER | いいえ | 4096 (VVR 4.0.7 以降)、5000 (VVR 4.0.0–4.0.6)、100 (VVR 3.x 以前) | バッチごとに書き込まれる行数 |
bufferSize |
INTEGER | いいえ | 10000 | 書き込みがトリガーされる前にメモリにキャッシュされる最大行数。VVR 4.0.7 以降でサポートされています。プライマリキーが定義されている場合にのみ有効です |
flushIntervalMs |
INTEGER | いいえ | 2000 (VVR 4.0.7 以降)、0 (VVR 4.0.0–4.0.6)、1000 (VVR 3.x 以前) | batchSize または bufferSize のしきい値に達したかどうかに関わらず、バッファーが結果テーブルにフラッシュされる間隔 (ミリ秒単位)。0 (VVR 4.0.0–4.0.6 のデフォルト) に設定すると、少量のバッファーデータが書き込まれない可能性があるため、これを避けるには新しい VVR バージョンにアップグレードしてください |
ignoreDelete |
BOOLEAN | いいえ | false | 削除操作をスキップするには true に設定します。複数の演算子が同じ行の異なるフィールドを更新する場合に便利です。この設定がないと、ある演算子での削除に続いて別の演算子で部分的な更新が行われると、更新されなかったフィールドが null またはそのデフォルト値になります |
connectionMaxActive |
INTEGER | いいえ | 40 | 接続プールのサイズ。VVR 4.0.7 以降でサポートされています。接続プールのタイムアウトが発生する場合はこの値を増やし、データベースが同時接続数を制限している場合は減らしてください |
ディメンションテーブルのパラメーター
| パラメーター | 型 | 必須 | デフォルト値 | 説明 |
|---|---|---|---|---|
cache |
STRING | いいえ | NONE (VVR 4.0.6 未満)、ALL (VVR 4.0.6 以降) | キャッシュポリシー。有効な値: NONE、LRU、ALL。「キャッシュポリシー |
cacheSize |
INTEGER | いいえ | 100000 | キャッシュする行の最大数。cache が LRU に設定されている場合は必須です。NONE および ALL |
cacheTTLMs |
LONG | いいえ | NONE と LRU では有効期限なし。ALL |
キャッシュの TTL (Time-to-Live) (ミリ秒単位)。LRU の場合、この期間が過ぎると行は期限切れになります。ALL の場合、この間隔でキャッシュ全体が再読み込みされます |
maxJoinRows |
INTEGER | いいえ | 1024 | 入力行ごとに一致するディメンションテーブルの最大行数。不要なスキャンを避けるために、プライマリテーブルの行ごとに予想されるディメンション行の最大数に設定してください |
メトリック
結果テーブルは以下のメトリックを公開します。ディメンションテーブルにはメトリックはありません。
| メトリック | 説明 |
|---|---|
numRecordsOut |
書き込まれた合計行数 |
numRecordsOutPerSecond |
1 秒あたりに書き込まれた行数 |
numBytesOut |
書き込まれた合計バイト数 |
numBytesOutPerSecond |
1 秒あたりに書き込まれたバイト数 |
currentSendTime |
現在の書き込みレイテンシー |
numRecordsOutErrors |
書き込みエラーの合計数 |
メトリックの定義については、「メトリック」をご参照ください。
データ型のマッピング
| Flink 型 | ApsaraDB RDS for MySQL 型 |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| TINYINT(1) (ディメンションテーブルのみ) | BOOLEAN |
| SMALLINT | SMALLINT |
| SMALLINT | TINYINT UNSIGNED |
| INT | INT |
| INT | SMALLINT UNSIGNED |
| BIGINT | BIGINT |
| BIGINT | INT UNSIGNED |
| DECIMAL(20, 0) | BIGINT UNSIGNED |
| FLOAT | FLOAT |
| DECIMAL | DECIMAL |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
| VARBINARY | VARBINARY |
例
結果テーブルの例
次の例では、DataGen ソースから読み取り、ApsaraDB RDS for MySQL の結果テーブルに書き込みます。
CREATE TEMPORARY TABLE datagen_source (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE rds_sink (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'rds',
'tableName' = '<your-table-name>',
'userName' = '<your-user-name>',
'password' = '<your-password>',
'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);
INSERT INTO rds_sink
SELECT * FROM datagen_source;
ディメンションテーブルの例
次の例では、テンポラル結合を使用して、ストリームを ApsaraDB RDS for MySQL のディメンションテーブルに対して結合します。
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE rds_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'rds',
'tableName' = '<your-table-name>',
'userName' = '<your-user-name>',
'password' = '<your-password>',
'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;
よくある質問
次のステップ
-
MySQL コネクタ — このコネクタの推奨される代替手段
-
ApsaraDB RDS for MySQL — 製品概要と機能ドキュメント
-
メトリック — すべてのコネクタメトリックの定義