すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ApsaraDB RDS for MySQL

最終更新日:Mar 27, 2026
重要

ApsaraDB RDS for MySQL コネクタは、将来的にサポートされなくなります。代わりに MySQL コネクタ を使用してください。

ApsaraDB RDS for MySQL コネクタを使用すると、Flink SQL の出力を ApsaraDB RDS for MySQL の結果テーブルに書き込んだり、ストリームを ApsaraDB RDS for MySQL のディメンションテーブルに対して結合したりできます。

サポートされるテーブルタイプ: 結果テーブル・ディメンションテーブル

サポートされる実行モード: バッチモード・ストリーミングモード

API タイプ: SQL

結果テーブルでのデータの更新と削除: サポートされています

前提条件

開始する前に、以下が準備できていることを確認してください。

制限事項

  • Ververica Runtime (VVR) 2.0.0 以降を使用した Realtime Compute for Apache Flink が必要です。最高のパフォーマンスと安定性を得るには、VVR 6.X 以降を使用してください。

  • ApsaraDB RDS for MySQL データベースのみがサポートされています。

  • コネクタは at-least-once セマンティクスを使用します。結果テーブルにプライマリキーがある場合、べき等性によりデータの正確性が保証されます。

仕組み

結果テーブルの書き込み動作

各出力行は、結果テーブルに書き込まれる前に SQL ステートメントに変換されます。

  • プライマリキーなしINSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...); を実行します。

  • プライマリキーありINSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...) ON DUPLICATE KEY UPDATE col1 = VALUES(col1), col2 = VALUES(col2), ...; を実行します。

一意なインデックスの競合: 物理テーブルにプライマリキーに加えて一意なインデックス制約がある場合、異なるプライマリキーで同じ一意なインデックス値を持つ 2 つの行を挿入すると、先の行が上書きされ、データ損失が発生します。

自動インクリメントのプライマリキー: Flink DDL で自動インクリメントフィールドを宣言しないでください。データベースがこれらの値を自動的に割り当てます。コネクタは自動インクリメントフィールドを持つ行の書き込みと削除はできますが、更新はできません。

ディメンションテーブルのキャッシュポリシー

コネクタは、ディメンションテーブルのルックアップに対して 3 つのキャッシュポリシーをサポートしています。

ポリシー 動作 使用するケース
NONE キャッシュなし — すべてのルックアップでデータベースに直接クエリを実行します 低レイテンシー要件、小規模なデータセット
LRU タスクマネージャーごとに、最近使用された固定数の行をキャッシュします 大規模なテーブルで頻繁にアクセスされるサブセット
ALL テーブル全体をメモリにロードし、定期的に再読み込みします 小規模で静的なリファレンス テーブル

構文

結果テーブル

CREATE TABLE rds_sink (
  id  INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'  = 'rds',
  'tableName'  = '<your-table-name>',
  'userName'   = '<your-user-name>',
  'password'   = '<your-password>',
  'url'        = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);
説明

結果テーブルの書き込みスループットを向上させるには、url の値に ?rewriteBatchedStatements=true を追加してください。

ディメンションテーブル

CREATE TABLE rds_dim (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName'  = '<your-user-name>',
  'password'  = '<your-password>',
  'url'       = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>',
  'cache'     = 'NONE'
);

WITH 句のパラメーター

共通パラメーター

パラメーター 必須 デフォルト値 説明
connector STRING はい 次のように設定します:rds
tableName STRING はい ApsaraDB RDS for MySQL の物理テーブルの名前
userName STRING はい データベースのユーザー名
password STRING はい データベースのパスワード
url STRING はい データベースの Virtual Private Cloud (VPC) エンドポイント。フォーマットは jdbc:mysql://<internal-endpoint>:<port>/<database-name> です。結果テーブルの場合、?rewriteBatchedStatements=true を追加します。エンドポイントの詳細については、「ApsaraDB RDS for MySQL インスタンスの内部およびパブリックエンドポイントとポート番号の表示と変更
maxRetryTimes INTEGER いいえ 10 (VVR 4.0.7 以降)、3 (VVR 4.0.6 以前) 失敗したディメンションテーブルのルックアップまたは結果テーブルへの書き込みの最大リトライ回数

シンクテーブル パラメーター

パラメーター 必須 デフォルト値 説明
batchSize INTEGER いいえ 4096 (VVR 4.0.7 以降)、5000 (VVR 4.0.0–4.0.6)、100 (VVR 3.x 以前) バッチごとに書き込まれる行数
bufferSize INTEGER いいえ 10000 書き込みがトリガーされる前にメモリにキャッシュされる最大行数。VVR 4.0.7 以降でサポートされています。プライマリキーが定義されている場合にのみ有効です
flushIntervalMs INTEGER いいえ 2000 (VVR 4.0.7 以降)、0 (VVR 4.0.0–4.0.6)、1000 (VVR 3.x 以前) batchSize または bufferSize のしきい値に達したかどうかに関わらず、バッファーが結果テーブルにフラッシュされる間隔 (ミリ秒単位)。0 (VVR 4.0.0–4.0.6 のデフォルト) に設定すると、少量のバッファーデータが書き込まれない可能性があるため、これを避けるには新しい VVR バージョンにアップグレードしてください
ignoreDelete BOOLEAN いいえ false 削除操作をスキップするには true に設定します。複数の演算子が同じ行の異なるフィールドを更新する場合に便利です。この設定がないと、ある演算子での削除に続いて別の演算子で部分的な更新が行われると、更新されなかったフィールドが null またはそのデフォルト値になります
connectionMaxActive INTEGER いいえ 40 接続プールのサイズ。VVR 4.0.7 以降でサポートされています。接続プールのタイムアウトが発生する場合はこの値を増やし、データベースが同時接続数を制限している場合は減らしてください

ディメンションテーブルのパラメーター

パラメーター 必須 デフォルト値 説明
cache STRING いいえ NONE (VVR 4.0.6 未満)、ALL (VVR 4.0.6 以降) キャッシュポリシー。有効な値: NONELRUALL。「キャッシュポリシー
cacheSize INTEGER いいえ 100000 キャッシュする行の最大数。cacheLRU に設定されている場合は必須です。NONE および ALL
cacheTTLMs LONG いいえ NONELRU では有効期限なし。ALL キャッシュの TTL (Time-to-Live) (ミリ秒単位)。LRU の場合、この期間が過ぎると行は期限切れになります。ALL の場合、この間隔でキャッシュ全体が再読み込みされます
maxJoinRows INTEGER いいえ 1024 入力行ごとに一致するディメンションテーブルの最大行数。不要なスキャンを避けるために、プライマリテーブルの行ごとに予想されるディメンション行の最大数に設定してください

メトリック

結果テーブルは以下のメトリックを公開します。ディメンションテーブルにはメトリックはありません。

メトリック 説明
numRecordsOut 書き込まれた合計行数
numRecordsOutPerSecond 1 秒あたりに書き込まれた行数
numBytesOut 書き込まれた合計バイト数
numBytesOutPerSecond 1 秒あたりに書き込まれたバイト数
currentSendTime 現在の書き込みレイテンシー
numRecordsOutErrors 書き込みエラーの合計数

メトリックの定義については、「メトリック」をご参照ください。

データ型のマッピング

Flink 型 ApsaraDB RDS for MySQL 型
BOOLEAN BOOLEAN
TINYINT TINYINT
TINYINT(1) (ディメンションテーブルのみ) BOOLEAN
SMALLINT SMALLINT
SMALLINT TINYINT UNSIGNED
INT INT
INT SMALLINT UNSIGNED
BIGINT BIGINT
BIGINT INT UNSIGNED
DECIMAL(20, 0) BIGINT UNSIGNED
FLOAT FLOAT
DECIMAL DECIMAL
DOUBLE DOUBLE
DATE DATE
TIME TIME
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
VARBINARY VARBINARY

結果テーブルの例

次の例では、DataGen ソースから読み取り、ApsaraDB RDS for MySQL の結果テーブルに書き込みます。

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age`  INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_sink (
  `name` VARCHAR,
  `age`  INT
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName'  = '<your-user-name>',
  'password'  = '<your-password>',
  'url'       = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);

INSERT INTO rds_sink
SELECT * FROM datagen_source;

ディメンションテーブルの例

次の例では、テンポラル結合を使用して、ストリームを ApsaraDB RDS for MySQL のディメンションテーブルに対して結合します。

CREATE TEMPORARY TABLE datagen_source (
  a          INT,
  b          BIGINT,
  c          STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName'  = '<your-user-name>',
  'password'  = '<your-password>',
  'url'       = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;

よくある質問

次のステップ