このトピックでは、AnalyticDB for MySQL 3.0 コネクタの使用方法について説明します。
背景情報
AnalyticDB for MySQL 3.0 は、データベースとビッグデータ技術を統合した、クラウドネイティブのエンタープライズクラスのデータウェアハウジングサービスです。高スループット、リアルタイムのデータ書き込み、更新、削除、低レイテンシーのリアルタイムデータ分析、および複雑な抽出、変換、ロード (ETL) 操作をサポートします。AnalyticDB for MySQL は、アップストリームおよびダウンストリームのエコシステムツールと互換性があり、エンタープライズクラスのレポートシステム、データウェアハウス、およびデータサービスエンジンを構築するために使用できます。
次の表に、AnalyticDB for MySQL 3.0 コネクタがサポートする機能を示します。
項目 | 説明 |
サポートされているタイプ | ソーステーブル、ディメンションテーブル、および結果テーブル 説明 Ververica Runtime (VVR) 8.0.4 以降を使用する Realtime Compute for Apache Flink のみがソーステーブルをサポートします。ソーステーブルのパラメーターと構成の詳細については、「Flink を使用してバイナリログをサブスクライブする」をご参照ください。ディメンションテーブルと結果テーブルのパラメーターの詳細については、「WITH 句のパラメーター」をご参照ください。 |
実行モード | ストリーミングモードとバッチモード |
データフォーマット | 該当なし |
固有のメトリック | 該当なし |
API タイプ | SQL |
結果テーブルでのデータの更新または削除 | はい |
前提条件
AnalyticDB for MySQL クラスターとテーブルが作成されていること。詳細については、「クラスターの作成」および「CREATE TABLE」をご参照ください。
AnalyticDB for MySQL クラスターにホワイトリストが設定されていること。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。
構文
CREATE TEMPORARY TABLE adb_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);Flink データ定義言語 (DDL) で定義されたプライマリキーは、列名を含め、AnalyticDB for MySQL データベースの物理テーブルのプライマリキーと一致する必要があります。不一致があると、データの正確性に影響を与える可能性があります。
WITH パラメーター
一般パラメーター
パラメーター
説明
データの型
必須
デフォルト値
備考
connector
結果テーブルのタイプ。
String
はい
デフォルト値なし
値を adb3.0 に設定します。
url
データベースの Java Database Connectivity (JDBC) URL。
String
はい
デフォルト値なし
AnalyticDB for MySQL データベースの JDBC 接続 URL。URL は jdbc:mysql://<endpoint>:<port>/<databaseName> フォーマットです。
endpoint と port: [AnalyticDB for MySQL コンソール] にログインします。対応するクラスターの名前をクリックします。[クラスター情報] ページで、[ネットワーク情報] セクションからエンドポイントとポートを取得します。
databaseName: AnalyticDB for MySQL データベースの名前。
userName
データベースにアクセスするためのユーザー名。
String
はい
デフォルト値なし
該当なし。
password
データベースのパスワード。
String
はい
デフォルト値なし
該当なし。
tableName
データベース内のテーブルの名前。
String
はい
デフォルト値なし
該当なし。
maxRetryTimes
データの書き込みまたは読み取りが失敗した場合の最大再試行回数。
Integer
いいえ
10
該当なし。
結果テーブル固有のパラメーター
パラメーター
注意
データの型
必須
デフォルト値
備考
batchSize
バッチ書き込みあたりのレコード数。
Integer
いいえ
1000
このパラメーターは、プライマリキーを指定した後にのみ有効になります。
bufferSize
メモリにキャッシュするデータレコードの最大数。書き込みは、batchSize または bufferSize のいずれかのしきい値に達したときにトリガーされます。
Integer
いいえ
1000
このパラメーターは、プライマリキーを指定した後にのみ有効になります。
flushIntervalMs
キャッシュがクリアされる間隔。この期間が経過してもキャッシュ内のデータが出力条件を満たさない場合、システムはキャッシュされたすべてのデータを自動的に出力します。
Integer
いいえ
3000
単位: ミリ秒。
ignoreDelete
削除操作を無視するかどうかを指定します。
Boolean
いいえ
false
有効な値:
true: 削除操作を無視します。
false: 削除操作を受け入れます。
replaceMode
DDL 文でプライマリキーが定義されている場合にデータを挿入するパターン。
VVR 11.2 以降の場合、タイプは文字列です。
11.2 より前の VVR の場合、タイプはブール値です。
いいえ
Ververica Runtime (VVR) 11.2 以降の場合、デフォルト値は replace です。
11.2 より前の VVR の場合、デフォルト値は true です。
Ververica Runtime (VVR) 11.2 以降では、次の値を使用できます:
replace:
replace into構文を使用してデータを書き込みます。プライマリキーが重複している場合、新しいデータ行が既存の行を置き換えます。upsert:
insert into on duplicate key update構文を使用してデータを書き込みます。この構文は、プライマリキーが存在しない場合は新しい行を挿入し、プライマリキーが存在する場合は既存の行を更新します。たとえば、AnalyticDB for MySQL テーブルに a (プライマリキー)、b、c、d の 4 つのフィールドがあり、結果テーブルが a と b のフィールドのデータのみを提供する場合、重複したプライマリキーが見つかったときに更新されるのは b フィールドのみです。c と d フィールドは変更されません。insert:
insert ignore into構文を使用してデータを書き込みます。プライマリキーが重複している場合、最初のデータ入力が保持され、後続の入力は無視されます。
VVR 11.2 より前のバージョンでは、ブール値のみがサポートされます:
true: replace 値と同じです。
false: upsert 値と同じです。
注意: VVR 11.2 以降は、以前のバージョンの true および false 値と互換性があります。
説明このパラメーターは、AnalyticDB for MySQL 3.1.3.5 以降でのみサポートされます。
このパラメーターは、結果テーブルの DDL でプライマリキーが定義されている場合にのみ有効になります。結果テーブルの DDL でプライマリキーが定義されていない場合、データは常に
insert ignore into構文を使用して挿入されます。
excludeUpdateColumns
同じプライマリキーを持つデータを更新するときにスキップするフィールド。
String
いいえ
空の文字列
更新で複数のフィールドを無視するには、フィールド名をコンマ (,) で区切ります。例:
excludeUpdateColumns='column1,column2'。たとえば、結果テーブルに a (プライマリキー)、b、c、d の 4 つのフィールドがあるとします。`excludeUpdateColumns` を `'c,d'` に設定します。プライマリキーが一意の場合、システムは a、b、c、d の 4 つのフィールドすべての値を挿入します。プライマリキーが重複している場合、システムは b フィールドのみを更新します。c と d フィールドの値は変更されません。
説明このパラメーターは、`replaceMode` が `'upsert'` または `'false'` に設定されている場合にのみ有効になります。
無視するフィールドの名前を 1 行に記述します。改行は使用しないでください。
connectionMaxActive
スレッドプールの最大サイズ。
Integer
いいえ
40
該当なし。
ディメンションテーブルのみ
パラメーター
説明
データの型
必須
デフォルト値
注意
cache
キャッシュポリシー。
String
いいえ
ALL
AnalyticDB for MySQL 3.0 ディメンションテーブルは、次の 3 つのキャッシュポリシーをサポートしています:
None: キャッシュなし。
LRU: ディメンションテーブルのデータの一部をキャッシュします。ソーステーブルの各レコードについて、システムはまずキャッシュを検索します。レコードが見つからない場合、システムは物理ディメンションテーブルを検索します。
ALL (デフォルト): ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルからすべてのデータをキャッシュにロードします。その後、ディメンションテーブルのすべてのルックアップはキャッシュを使用して実行されます。キーがキャッシュに見つからない場合、そのキーは存在しないと見なされます。キャッシュの有効期限が切れると、システムはキャッシュ全体を再読み込みします。
これは、リモートテーブルが小さく、ソーステーブルとディメンションテーブルの間の結合で多くのキーミスが発生するシナリオに適しています。
説明キャッシュポリシーを ALL に設定した場合は、ノードのメモリサイズを監視して、メモリ不足 (OOM) エラーを防ぎます。
システムはディメンションテーブルのデータを非同期にロードするため、ALL キャッシュポリシーを使用する場合は、ディメンションテーブル結合ノードのメモリを増やしてください。メモリサイズは、リモートテーブルのデータサイズの 2 倍である必要があります。
cacheSize
キャッシュできるデータレコードの最大数。
Integer
いいえ
100000
cacheSize パラメーターは LRU キャッシュに関連しています。キャッシュが LRU に設定されている場合は、cacheSize パラメーターを設定する必要があります。
cacheTTLMs
キャッシュのタイムアウト (ミリ秒単位)。
Integer
いいえ
Long.MAX_VALUE
cacheTTLMs パラメーターは、cache パラメーターが LRU または ALL に設定されている場合にのみ適用されます。
cache が LRU に設定されている場合、cacheTTLMs はキャッシュのタイムアウト期間を指定します。デフォルト値は
Long.MAX_VALUEで、キャッシュエントリが期限切れにならないことを意味します。cache が ALL に設定されている場合、cacheTTLMs は物理テーブルからデータを再読み込みする間隔を指定します。デフォルト値は
Long.MAX_VALUEで、データが再読み込みされないことを意味します。
説明cache が None に設定されている場合は、cacheTTLMs を設定しないでください。この設定はキャッシュを無効にするため、キャッシュのタイムアウトは必要ありません。
maxJoinRows
プライマリテーブルの各レコードに対するディメンションテーブルからの最大一致数。
Integer
いいえ
1024
プライマリテーブルのレコードがディメンションテーブルの最大 n 個のレコードに一致する場合、効率的なリアルタイムコンピューティングのために maxJoinRows='n' を設定します。
説明結合中、このパラメーターは、プライマリテーブルの各レコードに対してディメンションテーブルが返すことができる一致レコードの数を制限します。
型マッピング
AnalyticDB for MySQL 3.0 データの型 | Flink データの型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) or NUMERIC(p, s) | DECIMAL(p, s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
例
シンクテーブル
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO adb_sink SELECT * FROM datagen_source;ディメンションテーブル
CREATE TEMPORARY TABLE datagen_source( `a` INT, `b` VARCHAR, `c` STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_dim ( `a` INT, `b` VARCHAR, `c` VARCHAR ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); p CREATE TEMPORARY TABLE blackhole_sink( `a` INT, `b` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;