すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ApsaraDB RDS for MySQLコネクタ

最終更新日:Jan 07, 2025

このトピックでは、ApsaraDB RDS for MySQLコネクタの使用方法について説明します。

ApsaraDB RDS for MySQLは、MySQLのブランチに基づいて開発されており、優れたパフォーマンスを提供します。 ApsaraDB RDS for MySQLは、ダブル11の高ボリュームな同時トラフィックを処理した実績のあるソリューションです。 ApsaraDB RDS for MySQLは、ホワイトリスト設定、バックアップとリストア、透過的データ暗号化(TDE)、データ移行、インスタンス、アカウント、データベースの管理などの基本機能を提供します。 ApsaraDB RDS for MySQLの詳細については、ApsaraDB RDS for MySQLデータベースをご参照ください。

重要

ApsaraDB RDS for MySQLコネクタは、今後サポートされなくなります。 ApsaraDB RDS for MySQLコネクタの代わりにMySQLコネクタを使用することをお勧めします。 MySQLコネクタの使用方法の詳細については、MySQLコネクタをご参照ください。

次の表に、ApsaraDB RDS for MySQLコネクタでサポートされている機能を示します。

項目

説明

テーブルタイプ

シンクテーブルとディメンションテーブル

実行モード

バッチモードとストリーミングモード

データ形式

該当なし

メトリック

  • ディメンションテーブルのメトリック: なし

  • シンクテーブルのメトリック:

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

    • numRecordsOutErrors

説明

メトリックの詳細については、メトリックをご参照ください。

APIタイプ

SQL

シンクテーブルでのデータの更新または削除

サポートされています

前提条件

制限事項

  • Ververica Runtime(VVR) 2.0.0以降を使用するRealtime Compute for Apache Flinkのみが、ApsaraDB RDS for MySQLコネクタをサポートしています。

  • ApsaraDB RDS for MySQLコネクタは、ApsaraDB RDS for MySQLデータベースのみをサポートしています。

  • 少なくとも1回のセマンティクスを使用できます。 ApsaraDB RDS for MySQLシンクテーブルにプライマリキーが含まれている場合、冪等性を使用してデータの正確性を確保できます。

  • 高パフォーマンスと安定性を確保するために、Realtime Compute for Apache Flinkの最新バージョンを使用することをお勧めします。 たとえば、VVR 6.X以降を使用するRealtime Compute for Apache Flinkを使用できます。

注意事項

ApsaraDB RDS for MySQLコネクタは、今後廃止される予定です。 MySQLコネクタがビジネス要件を満たせる場合は、MySQLコネクタを使用することをお勧めします。 詳細については、MySQLコネクタをご参照ください。

構文

  • ApsaraDB RDS for MySQLシンクテーブルを作成するためのステートメント

    CREATE TABLE rds_sink(
      id INT,
      num BIGINT,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH (
      'connector'='rds',
      'tableName'='your-table-name',  // テーブル名
      'userName'='your-user-name', // ユーザー名
      'password'='your-password', // パスワード
      'url'='your-url' // URL
    );
    説明
    • ApsaraDB RDS for MySQLコネクタは、出力データの各行をSQLステートメントに変換し、そのステートメントを実行してシンクテーブルにデータを書き込みます。 シンクテーブルにプライマリキーが含まれていない場合、ApsaraDB RDS for MySQLコネクタはINSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);ステートメントを実行します。 シンクテーブルにプライマリキーが含まれている場合、ApsaraDB RDS for MySQLコネクタはINSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;ステートメントを実行します。 物理テーブルにプライマリキー制約に加えて一意インデックス制約が含まれており、プライマリキーは異なるが同じ一意インデックスを持つ2つのレコードが物理テーブルに挿入されると、一意インデックスの競合によりダウンストリームデータが上書きされます。 これにより、データが失われます。

    • ApsaraDB RDS for MySQLデータベースで自動インクリメントプライマリキーが指定されている場合、Flink DDLステートメントで自動インクリメントフィールドを宣言することはできません。 データの書き込み中に、データベースは自動的に自動インクリメントフィールドを設定します。 ApsaraDB RDS for MySQLコネクタは、自動インクリメントフィールドを含むデータの書き込みまたは削除にのみ使用でき、データの更新には使用できません。

  • ApsaraDB RDS for MySQLディメンションテーブルを作成するためのステートメント

    CREATE TABLE rds_dim(
     id1 INT,
     id2 VARCHAR
    ) WITH (
      'connector'='rds',
      'tableName'='your-table-name', // テーブル名
      'userName'='your-user-name', // ユーザー名
      'password'='your-password', // パスワード
      'url'='your-url', // URL
      'cache'='NONE' // キャッシュ
    );

WITH句のパラメータ

  • 共通パラメータ

    パラメータ

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    STRING

    はい

    デフォルト値なし

    値をrdsに設定します。

    tableName

    メタテーブルの名前。

    STRING

    はい

    デフォルト値なし

    該当なし。

    userName

    データベースへのアクセスに使用するユーザー名。

    STRING

    はい

    デフォルト値なし

    該当なし。

    password

    データベースへのアクセスに使用するパスワード。

    STRING

    はい

    デフォルト値なし

    該当なし。

    url

    テーブルへのアクセスに使用するURL。

    STRING

    はい

    デフォルト値なし

    ApsaraDB RDS for MySQLデータベースの仮想プライベートクラウド(VPC)エンドポイント。 値は内部エンドポイントです。 詳細については、ApsaraDB RDS for MySQLインスタンスの内部エンドポイントとパブリックエンドポイント、およびポート番号を表示および変更するをご参照ください。

    URLはjdbc:mysql://<内部エンドポイント>:<ポート番号>/<データベース名>形式です。

    説明

    シンクテーブルを作成する場合は、システムパフォーマンスを向上させるために、URLの末尾に?rewriteBatchedStatements=trueを追加する必要があります。

    maxRetryTimes

    ディメンションテーブルでデータのクエリに失敗した場合、またはシンクテーブルにデータを書き込むのに失敗した場合に実行できる再試行の最大回数。

    INTEGER

    いいえ

    • VVR 4.0.7以降を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値は10です。

    • VVR 4.0.6以前を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値は3です。

    該当なし。

  • シンクテーブル専用のパラメータ

    パラメータ

    説明

    データ型

    必須

    デフォルト値

    備考

    batchSize

    一度に書き込むことができるデータレコードの数。

    INTEGER

    いいえ

    • VVR 4.0.7以降を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値は4096です。

    • VVR 4.0.0~4.0.6を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値は5000です。

    • VVR 3.X以前を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値は100です。

    該当なし。

    bufferSize

    メモリにキャッシュできるデータレコードの最大数。 batchSizeまたはbufferSizeパラメータで指定されたしきい値に達すると、書き込み操作がトリガーされます。

    INTEGER

    いいえ

    10000

    • VVR 4.0.7以降を使用するRealtime Compute for Apache Flinkのみがこのパラメータをサポートしています。

    • このパラメータは、プライマリキーを指定した後にのみ有効になります。

    flushIntervalMs

    メモリバッファをフラッシュする間隔。 指定された期間内にキャッシュされたデータレコードの数がbatchSizeまたはbufferSizeパラメータで指定された上限に達しない場合、システムはキャッシュされたすべてのデータをシンクテーブルに自動的に書き込みます。

    INTEGER

    いいえ

    • VVR 4.0.7以降を使用するRealtime Compute for Apache Flinkの場合、デフォルト値は2000です。

    • VVR 4.0.0~4.0.6を使用するRealtime Compute for Apache Flinkの場合、デフォルト値は0です。

    • VVR 3.X以前を使用するRealtime Compute for Apache Flinkの場合、デフォルト値は1000です。

    このパラメータのデフォルト値が0であるバージョンでこのパラメータを設定しないと、少量のデータがシンクテーブルに書き込まれない場合があります。 この問題を解決するには、Realtime Compute for Apache Flinkの新しいバージョンを使用することをお勧めします。

    ignoreDelete

    削除操作を無視するかどうかを指定します。

    BOOLEAN

    いいえ

    false

    Flink SQLを使用すると、削除操作が発生する場合があります。 複数の出力演算子がプライマリキーに基づいて同じシンクテーブルの異なるフィールドを更新する場合、データ結果が正しくない可能性があります。

    たとえば、あるタスクでデータレコードが削除され、別のタスクでデータレコードの一部のフィールドのみが更新されるとします。 この場合、更新されていないフィールドの値は、フィールドが削除されているため、nullまたはデフォルト値になります。 削除操作を回避するには、ignoreDeleteパラメータをtrueに設定します。

    connectionMaxActive

    データベース接続プールのサイズ。

    INTEGER

    いいえ

    40

    • VVR 4.0.7以降を使用するRealtime Compute for Apache Flinkのみがこのパラメータをサポートしています。

    • データベース接続プールへのアクセスがタイムアウトした場合、プール内のデータベース接続の数が不足している可能性があります。 データベース接続プールのサイズを増やすことができます。

    • データベースでサポートされている並列接続の最大数が少ない場合は、接続プールのサイズを小さくするか、演算子の並列度を下げることができます。

  • ディメンションテーブル専用のパラメータ

    パラメータ

    説明

    データ型

    必須

    デフォルト値

    備考

    cache

    ディメンションテーブルのキャッシュポリシー。

    STRING

    いいえ

    • VVR 4.0.6より前のバージョンを使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値はNONEです。

    • VVR 4.0.6以降を使用するRealtime Compute for Apache Flinkの場合、このパラメータのデフォルト値はALLです。

    ApsaraDB RDS for MySQLコネクタは、ディメンションテーブルに対してNone、LRU、ALLのキャッシュポリシーをサポートしています。 キャッシュポリシーの詳細については、背景情報をご参照ください。

    cacheSize

    キャッシュできるデータレコードの行の最大数。

    INTEGER

    いいえ

    100000

    • cacheパラメータをLRUに設定する場合は、cacheSizeパラメータを設定する必要があります。

    • cacheパラメータをNONEまたはALLに設定する場合は、cacheSizeパラメータを設定する必要はありません。

    cacheTTLMs

    キャッシュのタイムアウト期間。

    LONG

    いいえ

    • cacheパラメータをNONEに設定する場合は、cacheTTLMsパラメータを設定する必要はありません。 これは、キャッシュエントリが期限切れにならないことを示します。

    • cacheパラメータをLRUに設定する場合は、cacheTTLMsパラメータでキャッシュのタイムアウト期間を指定します。 デフォルトでは、キャッシュエントリは期限切れになりません。

    • cacheパラメータをALLに設定する場合は、cacheTTLMsパラメータでシステムがキャッシュをリロードする間隔を指定します。 デフォルトでは、キャッシュはリロードされません。

    単位: ミリ秒。

    maxJoinRows

    プライマリテーブルの各データレコードがディメンションテーブルのデータにマッピングされた後に返される結果の最大数。

    INTEGER

    いいえ

    1024

    プライマリテーブルとディメンションテーブルを結合する場合、プライマリテーブルの入力データレコードがディメンションテーブルのデータレコードにマッピングされた後に返される結果の数は、このパラメータによって制限されます。

    プライマリテーブルのデータレコードがディメンションテーブルの最大n個のデータレコードに対応すると推定できる場合は、maxJoinRowsパラメータをnに設定して、Realtime Compute for Apache Flinkの効率的なマッチングを確保できます。

データ型マッピング

Flinkのデータ型

ApsaraDB RDS for MySQLのデータ型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

TINYINT(1)

説明

ディメンションテーブルのみがこのマッピングをサポートしています。

BOOLEAN

SMALLINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

INT

INT

INT

SMALLINT UNSIGNED

BIGINT

BIGINT

BIGINT

INT UNSIGNED

DECIMAL(20,0)

BIGINT UNSIGNED

FLOAT

FLOAT

DECIMAL

DECIMAL

DOUBLE

DOUBLE

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

VARBINARY

VARBINARY

サンプルコード

  • シンクテーブルのサンプルコード

    CREATE TEMPORARY TABLE datagen_source(
     `name` VARCHAR,
     `age` INT
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE rds_sink(
     `name` VARCHAR,
     `age` INT
    ) WITH (
      'connector'='rds',
      'password'='your-password', // パスワード
      'tableName'='your-tablename', // テーブル名
      'url'='your-url', // URL
      'userName'='your-username' // ユーザー名
    );
    
    INSERT INTO rds_sink
    SELECT * FROM datagen_source;
  • ディメンションテーブルのサンプルコード

    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE rds_dim(
     a INT,
     b VARCHAR,
     c VARCHAR
    ) WITH (
     'connector'='rds',
     'password'='<yourPassword>', // パスワード
     'tableName'='<yourTablename>', // テーブル名
     'url'='jdbc:mysql://xxx', // URL
     'userName'='<yourUsername>' // ユーザー名
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    ) WITH (
     'connector'='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b FROM datagen_source AS T JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a=H.a;

FAQ