このトピックでは、ApsaraDB RDS for MySQLコネクタの使用方法について説明します。
ApsaraDB RDS for MySQLは、MySQLのブランチに基づいて開発されており、優れたパフォーマンスを提供します。 ApsaraDB RDS for MySQLは、ダブル11の高ボリュームな同時トラフィックを処理した実績のあるソリューションです。 ApsaraDB RDS for MySQLは、ホワイトリスト設定、バックアップとリストア、透過的データ暗号化(TDE)、データ移行、インスタンス、アカウント、データベースの管理などの基本機能を提供します。 ApsaraDB RDS for MySQLの詳細については、ApsaraDB RDS for MySQLデータベースをご参照ください。
ApsaraDB RDS for MySQLコネクタは、今後サポートされなくなります。 ApsaraDB RDS for MySQLコネクタの代わりにMySQLコネクタを使用することをお勧めします。 MySQLコネクタの使用方法の詳細については、MySQLコネクタをご参照ください。
次の表に、ApsaraDB RDS for MySQLコネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルタイプ | シンクテーブルとディメンションテーブル |
実行モード | バッチモードとストリーミングモード |
データ形式 | 該当なし |
メトリック |
説明 メトリックの詳細については、メトリックをご参照ください。 |
APIタイプ | SQL |
シンクテーブルでのデータの更新または削除 | サポートされています |
前提条件
ApsaraDB RDS for MySQLデータベースとApsaraDB RDS for MySQLテーブルが作成されていること。 詳細については、ApsaraDB RDS for MySQLのデータベースとアカウントを作成するをご参照ください。
ApsaraDB RDS for MySQLデータベースのIPアドレスホワイトリストが設定されていること。 詳細については、データベースクライアントまたはCLIを使用してApsaraDB RDS for MySQLインスタンスに接続するをご参照ください。
制限事項
Ververica Runtime(VVR) 2.0.0以降を使用するRealtime Compute for Apache Flinkのみが、ApsaraDB RDS for MySQLコネクタをサポートしています。
ApsaraDB RDS for MySQLコネクタは、ApsaraDB RDS for MySQLデータベースのみをサポートしています。
少なくとも1回のセマンティクスを使用できます。 ApsaraDB RDS for MySQLシンクテーブルにプライマリキーが含まれている場合、冪等性を使用してデータの正確性を確保できます。
高パフォーマンスと安定性を確保するために、Realtime Compute for Apache Flinkの最新バージョンを使用することをお勧めします。 たとえば、VVR 6.X以降を使用するRealtime Compute for Apache Flinkを使用できます。
注意事項
ApsaraDB RDS for MySQLコネクタは、今後廃止される予定です。 MySQLコネクタがビジネス要件を満たせる場合は、MySQLコネクタを使用することをお勧めします。 詳細については、MySQLコネクタをご参照ください。
構文
ApsaraDB RDS for MySQLシンクテーブルを作成するためのステートメント
CREATE TABLE rds_sink( id INT, num BIGINT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='rds', 'tableName'='your-table-name', // テーブル名 'userName'='your-user-name', // ユーザー名 'password'='your-password', // パスワード 'url'='your-url' // URL );
説明ApsaraDB RDS for MySQLコネクタは、出力データの各行をSQLステートメントに変換し、そのステートメントを実行してシンクテーブルにデータを書き込みます。 シンクテーブルにプライマリキーが含まれていない場合、ApsaraDB RDS for MySQLコネクタは
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);
ステートメントを実行します。 シンクテーブルにプライマリキーが含まれている場合、ApsaraDB RDS for MySQLコネクタはINSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;
ステートメントを実行します。 物理テーブルにプライマリキー制約に加えて一意インデックス制約が含まれており、プライマリキーは異なるが同じ一意インデックスを持つ2つのレコードが物理テーブルに挿入されると、一意インデックスの競合によりダウンストリームデータが上書きされます。 これにより、データが失われます。ApsaraDB RDS for MySQLデータベースで自動インクリメントプライマリキーが指定されている場合、Flink DDLステートメントで自動インクリメントフィールドを宣言することはできません。 データの書き込み中に、データベースは自動的に自動インクリメントフィールドを設定します。 ApsaraDB RDS for MySQLコネクタは、自動インクリメントフィールドを含むデータの書き込みまたは削除にのみ使用でき、データの更新には使用できません。
ApsaraDB RDS for MySQLディメンションテーブルを作成するためのステートメント
CREATE TABLE rds_dim( id1 INT, id2 VARCHAR ) WITH ( 'connector'='rds', 'tableName'='your-table-name', // テーブル名 'userName'='your-user-name', // ユーザー名 'password'='your-password', // パスワード 'url'='your-url', // URL 'cache'='NONE' // キャッシュ );
WITH句のパラメータ
共通パラメータ
パラメータ
説明
データ型
必須
デフォルト値
備考
connector
テーブルのタイプ。
STRING
はい
デフォルト値なし
値をrdsに設定します。
tableName
メタテーブルの名前。
STRING
はい
デフォルト値なし
該当なし。
userName
データベースへのアクセスに使用するユーザー名。
STRING
はい
デフォルト値なし
該当なし。
password
データベースへのアクセスに使用するパスワード。
STRING
はい
デフォルト値なし
該当なし。
url
テーブルへのアクセスに使用するURL。
STRING
はい
デフォルト値なし
ApsaraDB RDS for MySQLデータベースの仮想プライベートクラウド(VPC)エンドポイント。 値は内部エンドポイントです。 詳細については、ApsaraDB RDS for MySQLインスタンスの内部エンドポイントとパブリックエンドポイント、およびポート番号を表示および変更するをご参照ください。
URLは
jdbc:mysql://<内部エンドポイント>:<ポート番号>/<データベース名>
形式です。説明シンクテーブルを作成する場合は、システムパフォーマンスを向上させるために、URLの末尾に?rewriteBatchedStatements=trueを追加する必要があります。
maxRetryTimes
ディメンションテーブルでデータのクエリに失敗した場合、またはシンクテーブルにデータを書き込むのに失敗した場合に実行できる再試行の最大回数。
INTEGER
いいえ
VVR 4.0.7以降を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値は10です。
VVR 4.0.6以前を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値は3です。
該当なし。
シンクテーブル専用のパラメータ
パラメータ
説明
データ型
必須
デフォルト値
備考
batchSize
一度に書き込むことができるデータレコードの数。
INTEGER
いいえ
VVR 4.0.7以降を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値は4096です。
VVR 4.0.0~4.0.6を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値は5000です。
VVR 3.X以前を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値は100です。
該当なし。
bufferSize
メモリにキャッシュできるデータレコードの最大数。 batchSizeまたはbufferSizeパラメータで指定されたしきい値に達すると、書き込み操作がトリガーされます。
INTEGER
いいえ
10000
VVR 4.0.7以降を使用するRealtime Compute for Apache Flinkのみがこのパラメータをサポートしています。
このパラメータは、プライマリキーを指定した後にのみ有効になります。
flushIntervalMs
メモリバッファをフラッシュする間隔。 指定された期間内にキャッシュされたデータレコードの数がbatchSizeまたはbufferSizeパラメータで指定された上限に達しない場合、システムはキャッシュされたすべてのデータをシンクテーブルに自動的に書き込みます。
INTEGER
いいえ
VVR 4.0.7以降を使用するRealtime Compute for Apache Flinkの場合、デフォルト値は2000です。
VVR 4.0.0~4.0.6を使用するRealtime Compute for Apache Flinkの場合、デフォルト値は0です。
VVR 3.X以前を使用するRealtime Compute for Apache Flinkの場合、デフォルト値は1000です。
このパラメータのデフォルト値が0であるバージョンでこのパラメータを設定しないと、少量のデータがシンクテーブルに書き込まれない場合があります。 この問題を解決するには、Realtime Compute for Apache Flinkの新しいバージョンを使用することをお勧めします。
ignoreDelete
削除操作を無視するかどうかを指定します。
BOOLEAN
いいえ
false
Flink SQLを使用すると、削除操作が発生する場合があります。 複数の出力演算子がプライマリキーに基づいて同じシンクテーブルの異なるフィールドを更新する場合、データ結果が正しくない可能性があります。
たとえば、あるタスクでデータレコードが削除され、別のタスクでデータレコードの一部のフィールドのみが更新されるとします。 この場合、更新されていないフィールドの値は、フィールドが削除されているため、nullまたはデフォルト値になります。 削除操作を回避するには、ignoreDeleteパラメータをtrueに設定します。
connectionMaxActive
データベース接続プールのサイズ。
INTEGER
いいえ
40
VVR 4.0.7以降を使用するRealtime Compute for Apache Flinkのみがこのパラメータをサポートしています。
データベース接続プールへのアクセスがタイムアウトした場合、プール内のデータベース接続の数が不足している可能性があります。 データベース接続プールのサイズを増やすことができます。
データベースでサポートされている並列接続の最大数が少ない場合は、接続プールのサイズを小さくするか、演算子の並列度を下げることができます。
ディメンションテーブル専用のパラメータ
パラメータ
説明
データ型
必須
デフォルト値
備考
cache
ディメンションテーブルのキャッシュポリシー。
STRING
いいえ
VVR 4.0.6より前のバージョンを使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値はNONEです。
VVR 4.0.6以降を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値はALLです。
ApsaraDB RDS for MySQLコネクタは、ディメンションテーブルに対してNone、LRU、ALLのキャッシュポリシーをサポートしています。 キャッシュポリシーの詳細については、背景情報をご参照ください。
cacheSize
キャッシュできるデータレコードの行の最大数。
INTEGER
いいえ
100000
cacheパラメータをLRUに設定する場合は、cacheSizeパラメータを設定する必要があります。
cacheパラメータをNONEまたはALLに設定する場合は、cacheSizeパラメータを設定する必要はありません。
cacheTTLMs
キャッシュのタイムアウト期間。
LONG
いいえ
cacheパラメータをNONEに設定する場合は、cacheTTLMsパラメータを設定する必要はありません。 これは、キャッシュエントリが期限切れにならないことを示します。
cacheパラメータをLRUに設定する場合は、cacheTTLMsパラメータでキャッシュのタイムアウト期間を指定します。 デフォルトでは、キャッシュエントリは期限切れになりません。
cacheパラメータをALLに設定する場合は、cacheTTLMsパラメータでシステムがキャッシュをリロードする間隔を指定します。 デフォルトでは、キャッシュはリロードされません。
単位: ミリ秒。
maxJoinRows
プライマリテーブルの各データレコードがディメンションテーブルのデータにマッピングされた後に返される結果の最大数。
INTEGER
いいえ
1024
プライマリテーブルとディメンションテーブルを結合する場合、プライマリテーブルの入力データレコードがディメンションテーブルのデータレコードにマッピングされた後に返される結果の数は、このパラメータによって制限されます。
プライマリテーブルのデータレコードがディメンションテーブルの最大n個のデータレコードに対応すると推定できる場合は、
maxJoinRows
パラメータをnに設定して、Realtime Compute for Apache Flinkの効率的なマッチングを確保できます。
データ型マッピング
Flinkのデータ型 | ApsaraDB RDS for MySQLのデータ型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
TINYINT(1) 説明 ディメンションテーブルのみがこのマッピングをサポートしています。 | BOOLEAN |
SMALLINT | SMALLINT |
SMALLINT | TINYINT UNSIGNED |
INT | INT |
INT | SMALLINT UNSIGNED |
BIGINT | BIGINT |
BIGINT | INT UNSIGNED |
DECIMAL(20,0) | BIGINT UNSIGNED |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
VARBINARY | VARBINARY |
サンプルコード
シンクテーブルのサンプルコード
CREATE TEMPORARY TABLE datagen_source( `name` VARCHAR, `age` INT ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE rds_sink( `name` VARCHAR, `age` INT ) WITH ( 'connector'='rds', 'password'='your-password', // パスワード 'tableName'='your-tablename', // テーブル名 'url'='your-url', // URL 'userName'='your-username' // ユーザー名 ); INSERT INTO rds_sink SELECT * FROM datagen_source;
ディメンションテーブルのサンプルコード
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE rds_dim( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='rds', 'password'='<yourPassword>', // パスワード 'tableName'='<yourTablename>', // テーブル名 'url'='jdbc:mysql://xxx', // URL 'userName'='<yourUsername>' // ユーザー名 ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector'='blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a=H.a;