このトピックでは、MaxCompute コネクタの構文、WITH パラメーター、および使用例について説明します。
背景情報
MaxCompute (旧 ODPS) は、高速でフルマネージド型のエクサバイト規模のデータウェアハウスソリューションです。MaxCompute は、構造化データの大規模なバッチを保存および計算し、大規模なデータウェアハウス向けのソリューションと、分析およびモデリング向けのサービスを提供します。
次の表に、MaxCompute コネクタの特徴を示します。
カテゴリ | 詳細 |
サポートされるテーブルタイプ | ソーステーブル、ディメンションテーブル、および結果テーブル |
実行モード | ストリームモードおよびバッチモード |
データフォーマット | サポートされていません |
特定のモニタリングメトリック | |
API タイプ | Datastream および SQL |
結果テーブルのデータの更新または削除 | Batch Tunnel および Stream Tunnel モードはデータ挿入のみをサポートします。Upsert Tunnel モードはデータの挿入、更新、および削除をサポートします。 |
前提条件
MaxCompute テーブルを作成済みであること。詳細については、「テーブルの作成」をご参照ください。
制限事項
MaxCompute コネクタは、at-least-once セマンティクスのみをサポートします。
説明at-least-once セマンティクスは、データ損失を防ぐために使用されます。特定のケースでは、重複データが MaxCompute に書き込まれることがあります。重複データを引き起こす条件は、使用される MaxCompute Tunnel によって異なります。詳細については、「データチャネルの選択方法」をご参照ください。
デフォルトでは、ソーステーブルは完全データモードです。`partition` パラメーターで指定されたパーティションのデータのみを読み取ります。すべてのデータが読み取られると、ジョブは終了します。
新しいパーティションを継続的に監視するには、WITH 句で `startPartition` パラメーターを指定して、増分ソーステーブルモードを使用できます。
説明ディメンションテーブルは、各更新サイクル中に最新のパーティションをチェックするため、この制限の対象にはなりません。
ソーステーブルジョブの開始後にパーティションに新しいデータが追加された場合、その新しいデータは読み取られません。したがって、パーティション内のデータが完全になった後にのみジョブを実行する必要があります。
SQL
SQL ジョブで MaxCompute コネクタをソーステーブル、ディメンションテーブル、または結果テーブルとして使用できます。
構文
CREATE TEMPORARY TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);WITH パラメーター
一般パラメーター
パラメーター | 説明 | データの型 | 必須 | デフォルト値 | 備考 |
connector | テーブルのタイプ。 | String | はい | なし | 値は odps である必要があります。 |
endpoint | MaxCompute サービスの Endpoint。 | String | はい | なし | 詳細については、「Endpoint」をご参照ください。 |
tunnelEndpoint | MaxCompute Tunnel サービスのエンドポイント。 | String | いいえ | なし | 「Endpoint」をご参照ください。 説明 このパラメーターを指定しない場合、MaxCompute は内部のロードバランシングサービスに基づいてトンネル接続を割り当てます。 |
project | MaxCompute プロジェクトの名前。 | String | はい | なし | なし。 |
schemaName | MaxCompute スキーマの名前。 | String | いいえ | なし | MaxCompute プロジェクトでスキーマ機能が有効になっている場合は、MaxCompute テーブルを含むスキーマの名前を指定します。詳細については、「スキーマ操作」をご参照ください。 説明 このパラメーターは VVR 8.0.6 以降でのみサポートされます。 |
tableName | MaxCompute テーブルの名前。 | String | はい | なし | なし。 |
accessId | MaxCompute アカウントの AccessKey ID。 | String | はい | なし | 詳細については、「AccessKey ID と AccessKey Secret 情報を表示する方法」をご参照ください。 重要 AccessKey 情報の漏洩を避けるため、変数を使用して AccessKey の値を指定することをお勧めします。詳細については、「プロジェクト変数」をご参照ください。 |
accessKey | MaxCompute アカウントの AccessKey Secret。 | String | はい | なし | |
partition | MaxCompute パーティションの名前。 | String | いいえ | なし | このパラメーターは、パーティション分割されていないテーブルまたは増分ソーステーブルには必要ありません。 説明 パーティションテーブルの詳細については、「パーティションからの読み取りまたはパーティションへの書き込み時に Partition パラメーターを指定する方法」をご参照ください。 |
compressAlgorithm | MaxCompute Tunnel で使用される圧縮アルゴリズム。 | String | いいえ | SNAPPY | 有効な値:
|
quotaName | MaxCompute の専用 Data Transmission Service リソースのクォータ名。 | String | いいえ | なし | MaxCompute の専用 Data Transmission Service リソースを使用するには、このパラメーターを設定します。 重要
|
ソーステーブル固有のパラメーター
パラメーター | 説明 | データの型 | 必須 | デフォルト値 | 備考 |
maxPartitionCount | 読み取り可能なパーティションの最大数。 | Integer | いいえ | 100 | 読み取るパーティションの数がこの値を超えると、 重要 一度に多数のパーティションを読み取ると、MaxCompute の負荷が増加し、ジョブの起動が遅くなります。partition パラメーターが誤って設定されていないか確認してください。多数のパーティションを読み取るには、maxPartitionCount の値を手動で増やしてください。 |
useArrow | Arrow フォーマットを使用してデータを読み取るかどうかを指定します。 | Boolean | いいえ | false | Arrow フォーマットを使用すると、MaxCompute Storage API を呼び出すことができます。 重要
|
splitSize | Arrow フォーマットを使用してデータを読み取る際に一度にプルするデータのサイズ。 | MemorySize | いいえ | 256 MB | このパラメーターは VVR 8.0.8 以降でのみサポートされます。 重要 このパラメーターはバッチジョブでのみ有効です。 |
compressCodec | Arrow フォーマットでデータを読み取る際に使用される圧縮アルゴリズム。 | String | いいえ | "" | 有効な値:
圧縮アルゴリズムを指定すると、圧縮なしの場合と比較してスループットが向上します。 重要
|
dynamicLoadBalance | シャードを動的に割り当てるかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
シャードを動的に割り当てると、異なる Flink ノードの処理性能を活用し、ソーステーブルの全体的な読み取り時間を短縮できます。ただし、これにより、異なるノードで読み取られるデータ量が不整合になり、データスキューが発生する可能性があります。 重要
|
増分ソーステーブル固有のパラメーター
増分ソーステーブルは、MaxCompute サーバーを定期的にポーリングしてすべてのパーティション情報を取得することで、新しいパーティションを検出します。新しいパーティションが読み取られるとき、そのパーティションのデータは完全である必要があります。詳細については、「増分 MaxCompute ソーステーブルが不完全に書き込まれたパーティションを検出した場合の対処方法」をご参照ください。`startPartition` パラメーターを使用して開始点を指定できます。開始点以上の 辞書式順序 を持つパーティションのみが読み取られることに注意してください。たとえば、パーティション year=2023,month=10 は、パーティション year=2023,month=9 よりも辞書式順序で小さくなります。このタイプのパーティション宣言では、year=2023,month=09 のようにゼロパディングを使用して正しい辞書式順序を保証できます。
パラメーター | 説明 | データの型 | 必須 | デフォルト値 | 備考 |
startPartition | 増分読み取りのための MaxCompute パーティションの開始オフセット (包括的)。 | String | はい | なし |
説明 startPartition パラメーターの詳細については、「増分 MaxCompute の startPartition パラメーターを指定する方法」をご参照ください。 |
subscribeIntervalInSec | パーティションリストについて MaxCompute をポーリングする間隔。 | Integer | いいえ | 30 | 単位は秒です。 |
modifiedTableOperation | 読み取りプロセス中にパーティションデータが変更された場合に実行する操作。 | Enum (NONE, SKIP) | いいえ | NONE | ダウンロードセッションはチェックポイントに保存されるため、チェックポイントからの各回復は、そのセッションからの読み取りを再開しようとします。パーティションデータが変更されたためにセッションが利用できない場合、Flink ジョブは再起動ループに入ります。この場合、このパラメーターを設定できます。有効な値:
重要
|
結果テーブル固有のパラメーター
パラメーター | 説明 | データの型 | 必須 | デフォルト値 | 備考 |
useStreamTunnel | MaxCompute Stream Tunnel を使用してデータをアップロードするかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
説明 詳細については、「データトンネルの選択方法」をご参照ください。 |
flushIntervalMs | MaxCompute Tunnel Writer バッファーのフラッシュ間隔。 | Long | いいえ | 30000 (30 秒) | データは最初にバッファーに書き込まれます。バッファーがいっぱいになるか、`flushIntervalMs` に達すると、データはバッチで結果テーブルに書き込まれます。
単位はミリ秒です。 説明 このパラメーターは `batchSize` と一緒に使用できます。いずれかの条件が満たされるとデータがフラッシュされます。 |
batchSize | MaxCompute Tunnel Writer バッファーのフラッシュサイズ。 | Long | いいえ | 67108864 (64 MB) | 単位はバイトです。 レコードが書き込まれると、データはまず MaxCompute バッファーに保存されます。バッファーが特定のサイズ (`batchSize`) に達すると、バッファー内のデータは結果の MaxCompute テーブルに書き込まれます。 説明 このパラメーターは `flushIntervalMs` と一緒に使用できます。いずれかの条件が満たされるとデータがフラッシュされます。 |
numFlushThreads | MaxCompute Tunnel Writer バッファーをフラッシュするためのスレッド数。 | Integer | いいえ | 1 | 各同時 MaxCompute シンクは、データをフラッシュするために `numFlushThreads` 個のスレッドを作成します。この値が 1 より大きい場合、異なるパーティションのデータを同時にフラッシュできるため、フラッシュ効率が向上します。 |
slotNum | MaxCompute Tunnel Writer によって使用されるスロットの数。 | Integer | いいえ | 0 | スロット数の制限については、「Data Transmission Service の概要」をご参照ください。 |
dynamicPartitionLimit | 書き込む動的パーティションの最大数。 | Integer | いいえ | 100 | 2 つのチェックポイント間で結果テーブルに書き込まれる動的パーティションの数が `dynamicPartitionLimit` を超えると、 重要 一度に多数のパーティションに書き込むと、MaxCompute サービスに負荷がかかり、結果テーブルのフラッシュとジョブのチェックポイントが遅くなります。このエラーが発生した場合は、それほど多くのパーティションに書き込む必要があるかどうかを確認してください。必要な場合は、`dynamicPartitionLimit` パラメーターの値を手動で増やしてください。 |
retryTimes | MaxCompute サーバーへのリクエストの最大リトライ回数。 | Integer | いいえ | 3 | セッションの作成、セッションの送信、またはデータのフラッシュ時に、MaxCompute サービスが一時的に利用できなくなることがあります。システムはこの構成に基づいて操作をリトライします。 |
sleepMillis | 再試行間隔。 | Integer | いいえ | 1000 | 単位はミリ秒です。 |
enableUpsert | MaxCompute Upsert Tunnel を使用してデータをアップロードするかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
重要
|
upsertAsyncCommit | Upsert モードでセッションを送信する際に非同期モードを使用するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
説明 このパラメーターは VVR 8.0.6 以降でのみサポートされます。 |
upsertCommitTimeoutMs | Upsert モードでセッションを送信するためのタイムアウト期間。 | Integer | いいえ | 120000 (120 秒) | 単位はミリ秒です。 説明 このパラメーターは VVR 8.0.6 以降でのみサポートされます。 |
sink.operation | Delta Lake テーブルに書き込む際の書き込みモード。 | String | いいえ | insert | 有効な値:
説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
sink.parallelism | Delta Lake テーブルへの書き込みの並列度。 | Integer | いいえ | なし |
重要 Delta Lake テーブルの `write.bucket.num` プロパティがこの構成値の整数倍であることを確認してください。これにより、最適な書き込みパフォーマンスが提供され、シンクノードで最も多くのメモリが節約されます。 |
sink.file-cached.enable | Delta Lake テーブルの動的パーティションに書き込む際にファイルキャッシュモードを使用するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
ファイルキャッシュモードを使用すると、サーバーに書き込まれる小さなファイルの数が減りますが、データ出力のレイテンシーが増加します。結果テーブルの並列度が高い場合は、ファイルキャッシュモードを使用することをお勧めします。 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
sink.file-cached.writer.num | ファイルキャッシュモードでデータをアップロードするための同時タスクの数。 | Integer | いいえ | 16 |
説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
sink.bucket.check-interval | ファイルキャッシュモードでのファイルサイズをチェックする間隔。単位はミリ秒 (ms) です。 | Integer | いいえ | 60000 | このパラメーターは、 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
sink.file-cached.rolling.max-size | ファイルキャッシュモードでの単一キャッシュファイルの最大サイズ。 | MemorySize | いいえ | 16 M |
説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
sink.file-cached.memory | ファイルキャッシュモードでファイルを書き込むための最大オフヒープメモリサイズ。 | MemorySize | いいえ | 64 MB | このパラメーターは、 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
sink.file-cached.memory.segment-size | ファイルキャッシュモードでファイルを書き込むためのバッファーサイズ。 | MemorySize | いいえ | 128 KB | このパラメーターは、 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
sink.file-cached.flush.always | ファイルキャッシュモードでファイルを書き込むためにキャッシュを使用するかどうかを指定します。 | Boolean | いいえ | true | このパラメーターは、 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
sink.file-cached.write.max-retries | ファイルキャッシュモードでデータをアップロードするためのリトライ回数。 | Integer | いいえ | 3 | このパラメーターは、 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
upsert.writer.max-retries | Upsert Writer がバケットへの書き込みに失敗した後のリトライ回数。 | Integer | いいえ | 3 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
upsert.writer.buffer-size | Flink の単一の Upsert Writer からのデータのキャッシュサイズ。 | MemorySize | いいえ | 64 m |
説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
upsert.writer.bucket.buffer-size | Flink の単一バケットからのデータのキャッシュサイズ。 | MemorySize | いいえ | 1 m | クラスターのメモリリソースが逼迫している場合は、このパラメーター値を減らすことができます。 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
upsert.write.bucket.num | 書き込み先のテーブルのバケット数。 | Integer | はい | なし | 値は、宛先 Delta テーブルの 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
upsert.write.slot-num | 単一セッションで使用される Tunnel スロットの数。 | Integer | いいえ | 1 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
upsert.commit.max-retries | Upsert セッションコミットのリトライ回数。 | Integer | いいえ | 3 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
upsert.commit.thread-num | Upsert セッションコミットの並列度。 | Integer | いいえ | 16 | このパラメーターを大きな値に設定することはお勧めしません。同時コミット数が多いほどリソースを多く消費し、パフォーマンスの問題や過剰なリソース消費につながる可能性があります。 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
upsert.commit.timeout | Upsert セッションコミットを待機するタイムアウト期間。単位は秒 (s) です。 | Integer | いいえ | 600 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
upsert.flush.concurrent | 単一のパーティションに同時に書き込むことができるバケットの最大数。 | Integer | いいえ | 2 | バケットのデータがフラッシュされるたびに、Tunnel Slot リソースが占有されます。 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
insert.commit.thread-num | コミットセッションの並列度。 | Integer | いいえ | 16 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
insert.arrow-writer.enable | Arrow フォーマットを使用するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
insert.arrow-writer.batch-size | Arrow バッチの最大行数。 | Integer | いいえ | 512 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
insert.arrow-writer.flush-interval | ライターのフラッシュ間隔。単位はミリ秒 (ms) です。 | Integer | いいえ | 100000 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
insert.writer.buffer-size | バッファードライターのキャッシュサイズ。 | MemorySize | いいえ | 64 MB | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされます。 |
upsert.partial-column.enable | 一部の列のみを更新するかどうかを指定します。 | Boolean | いいえ | false | これは、結果テーブルのタイプが Delta Table の場合にのみ有効です。詳細については、「部分的な列の更新」をご参照ください。 有効な値:
データは、結果テーブルに更新されたデータのプライマリキーが含まれているかどうかに基づいて書き込まれます:
説明 このパラメーターは VVR 8.0.11 以降でのみサポートされます。 |
ディメンションテーブル固有のパラメーター
ジョブが開始されると、MaxCompute ディメンションテーブルは指定されたパーティションから完全データをプルします。`partition` パラメーターは `max_pt()` などの関数をサポートします。キャッシュが期限切れになり再読み込みされると、`partition` パラメーターが再解析され、最新のパーティションからデータがプルされます。`max_two_pt()` を使用する場合、ディメンションテーブルは 2 つのパーティションからデータをプルできます。それ以外の場合は、単一のパーティションしか指定できません。
パラメーター | 説明 | データの型 | 必須 | デフォルト値 | 備考 |
cache | キャッシュポリシー。 | String | はい | なし | 現在、MaxCompute ディメンションテーブルは ALL: ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルからすべてのデータをキャッシュにロードします。ディメンションテーブルへの後続のすべてのクエリは、キャッシュを介して実行されます。キャッシュにデータが見つからない場合、キーは存在しません。完全キャッシュは期限切れになった後に再ロードされます。 説明
|
cacheSize | キャッシュするデータレコードの最大数。 | Long | いいえ | 100000 | ディメンションテーブルのデータ量が `cacheSize` を超えると、 重要 大量のディメンションテーブルデータは、多くの JVM ヒープメモリを消費し、ジョブの起動とディメンションテーブルの更新を遅くします。これほど多くのデータをキャッシュする必要があるかどうかを確認してください。必要な場合は、このパラメーターの値を手動で増やしてください。 |
cacheTTLMs | キャッシュのタイムアウト期間。キャッシュ更新の間隔です。 | Long | いいえ | Long.MAX_VALUE (更新しない) | 単位はミリ秒です。 |
cacheReloadTimeBlackList | キャッシュ更新禁止時間。このパラメーターで指定された期間中、キャッシュは更新されません。 | String | いいえ | なし | このパラメーターは、プロモーションイベントのピーク時など、重要な期間中にキャッシュがリフレッシュされることによってデプロイメントが不安定になるのを防ぐために使用されます。このパラメーターの設定方法の詳細については、「CacheReloadTimeBlackList パラメーターの設定方法」をご参照ください。 |
maxLoadRetries | ジョブ開始時の初期データプルを含む、キャッシュ更新の最大リトライ回数。この回数を超えると、ジョブは失敗します。 | Integer | いいえ | 10 | なし。 |
型マッピング
MaxCompute がサポートするデータ型の詳細については、「データ型 2.0」をご参照ください。
MaxCompute 型 | Flink 型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
TIMESTAMP_NTZ | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
物理 MaxCompute テーブルにネストされた複雑な型フィールド (ARRAY、MAP、または STRUCT) と JSON 型フィールドの両方が含まれている場合は、物理 MaxCompute テーブルを作成するときに tblproperties('columnar.nested.type'='true') を指定する必要があります。これにより、Flink がデータを正しく読み書きできるようになります。
データインジェスト
MaxCompute コネクタは、YAML ベースのデータインジェストジョブで、宛先にデータを書き込むために使用できます。
制限事項
この機能は VVR 11.1 以降でのみサポートされます。
構文
source:
type: xxx
sink:
type: maxcompute
name: MaxComputeSink
access-id: ${your_accessId}
access-key: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}
buckets-num: 8設定項目
設定項目 | 必須 | デフォルト値 | タイプ | 説明 |
type | はい | なし | String | 使用するコネクタ。これを |
name | いいえ | なし | String | シンクの名前。 |
access-id | はい | なし | String | Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。AccessKey 管理 ページで AccessKey ID を取得できます。 |
access-key | はい | なし | String | AccessKey ID に対応する AccessKey Secret。 |
endpoint | はい | なし | String | MaxCompute サービスの Endpoint。MaxCompute プロジェクトを作成する際に選択したリージョンとネットワーク接続方法に基づいて、このパラメーターを設定する必要があります。各リージョンとネットワークのエンドポイント値については、「Endpoint」をご参照ください。 |
project | はい | なし | String | MaxCompute プロジェクトの名前。MaxCompute コンソールにログインし、[ワークスペース] > [プロジェクト] ページでプロジェクト名を確認できます。 |
tunnel.endpoint | いいえ | なし | String | MaxCompute Tunnel サービスのエンドポイント。これは通常、指定されたプロジェクトのリージョンに基づいて自動的にルーティングされます。プロキシを使用する場合など、特別なネットワーク環境でのみこの設定を使用してください。 |
quota.name | いいえ | なし | String | MaxCompute Tunnel の専用リソースグループの名前。このパラメーターが設定されていない場合、共有リソースグループが使用されます。詳細については、「専用 Data Transmission Service リソースグループの購入と使用」をご参照ください。 |
sts-token | いいえ | なし | String | RAM ロールによって発行された短命のセキュリティトークンサービス (STS) トークンを認証に使用する場合、このパラメーターは必須です。 |
buckets-num | いいえ | 16 | Integer | MaxCompute Delta テーブルを自動的に作成するために使用されるバケットの数。詳細については、「ほぼリアルタイムのデータウェアハウスの概要」をご参照ください。 |
compress.algorithm | いいえ | zlib | String | MaxCompute への書き込み時に使用されるデータ圧縮アルゴリズム。サポートされている値は |
total.buffer-size | いいえ | 64MB | String | メモリにバッファリングするデータの量。これはパーティションレベル、またはパーティション分割されていないテーブルの場合はテーブルレベルで測定されます。異なるパーティションまたはテーブルのバッファーは独立しています。しきい値に達すると、データは MaxCompute に書き込まれます。 |
bucket.buffer-size | いいえ | 4MB | String | メモリにバッファリングするデータの量。バケットレベルで測定されます。このパラメーターは、Delta Lake テーブルに書き込む場合にのみ有効です。異なるデータバケットのバッファーは独立しています。しきい値に達すると、バケット内のデータは MaxCompute に書き込まれます。 |
commit.thread-num | いいえ | 16 | Integer | チェックポイントフェーズ中に同時に処理できるパーティションまたはテーブルの数。 |
flush.concurrent-num | いいえ | 4 | Integer | MaxCompute にデータを書き込む際に同時に書き込むことができるバケットの数。このパラメーターは、Delta Lake テーブルに書き込む場合にのみ有効です。 |
テーブルの場所のマッピング
コネクタがテーブルを自動的に作成するとき、次のマッピングを使用してソーステーブルの場所情報を MaxCompute テーブルにマッピングします。
MaxCompute プロジェクトがスキーマモデルをサポートしていない場合、同期タスクごとに 1 つの MySQL データベースしか同期できません。たとえば、アップストリームデータソースが MySQL の場合、コネクタは `tableId.namespace` 情報を無視します。このロジックは他のデータソースにも適用されます。
データインジェストジョブのオブジェクト | MaxCompute の場所 | MySQL の場所 |
設定の Project パラメーター | プロジェクト | なし |
TableId.namespace | スキーマ (MaxCompute プロジェクトがスキーマモデルをサポートしていない場合は無視されます) | データベース |
TableId.tableName | テーブル | テーブル |
型マッピング
CDC 型 | MaxCompute 型 |
CHAR | STRING |
VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |
使用例
SQL
ソーステーブルの例
完全読み取り
デフォルトでは、ソーステーブルは完全データモードであり、partition パラメーターで指定されたパーティションからデータを読み取ります。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=201809*'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid,
COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;増分読み取り
startPartition パラメーターで指定されたパーティションからデータを増分的に読み取ります。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 20180905 に対応するパーティションから読み取りを開始します。
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;結果テーブルの例
静的パーティションへの書き込み
partition パラメーターに静的な値を指定することで、静的パーティションにデータを書き込みます。
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905' -- 静的パーティション ds=20180905 に書き込みます。
);
INSERT INTO odps_sink
SELECT
id, len, content
FROM datagen_source;動的パーティションへの書き込み
テーブルのパーティションキー列に基づいて partition を指定します。
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR,
c TIMESTAMP
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR,
ds VARCHAR -- 動的パーティション列は明示的に宣言する必要があります。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds' -- パーティション値を指定しません。これは、ds フィールドの値に基づいて異なるパーティションにデータが書き込まれることを示します。
);
INSERT INTO odps_sink
SELECT
id,
len,
content,
DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;ディメンションテーブルの例
1 対 1 ディメンションテーブル
1 対 1 ディメンションテーブルにはプライマリキーを宣言する必要があります。
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR,
PRIMARY KEY (k) NOT ENFORCED -- 1 対 1 ディメンションテーブルにはプライマリキーを宣言する必要があります。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;1 対多ディメンションテーブル
1 対多ディメンションテーブルにはプライマリキーは必要ありません。
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR
-- 1 対多ディメンションテーブルにはプライマリキーは必要ありません。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;DataStream
DataStream API を使用してデータを読み書きするには、DataStream コネクタを使用する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。
知的財産を保護するため、Ververica Runtime (VVR) 6.0.6 以降、このコネクタを使用するジョブは、ローカルデバッグセッション中に最大 30 分間実行できます。30 分後、ジョブはエラーを報告して終了します。MaxCompute コネクタを含むジョブをローカルで実行およびデバッグする方法の詳細については、「コネクタを含むジョブをローカルで実行およびデバッグする」をご参照ください。
Delta Tables からの読み取りはサポートされていません。Delta Tables は、
primary keyとtransactional=trueを指定して作成されたテーブルです。詳細については、「用語」をご参照ください。
DataStream ジョブで MaxCompute コネクタを使用する場合、まず SQL を使用して MaxCompute テーブルを宣言する必要があります。その後、Table と DataStream の間で変換することで、MaxCompute テーブルとデータストリームを接続できます。
ソーステーブルへの接続
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 結果テーブルへの接続
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();XML
MaxCompute コネクタの Maven 依存関係には、完全および増分ソーステーブル、結果テーブル、およびディメンションテーブルを構築するために必要なクラスが含まれています。MaxCompute DataStream コネクタは Maven Central Repository で入手できます。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>よくある質問
完全または増分 MaxCompute ソーステーブルは、ジョブの開始後にパーティションまたはテーブルに追加された新しいデータを読み取ることができますか?
完全または増分 MaxCompute ソーステーブルを持つジョブを一時停止し、その並列処理を変更してからジョブを再開できますか?
増分 MaxCompute ソーステーブルの startPartition パラメーターを設定するにはどうすればよいですか?
パーティションから読み取る、またはパーティションに書き込むときに Partition パラメーターを設定するにはどうすればよいですか?
MaxCompute ソーステーブルを持つジョブが開始状態でスタックしたり、データの生成に時間がかかったりするのはなぜですか?
MaxCompute 結果テーブルを持つジョブが `Invalid partition spec` エラーを返すのはなぜですか?
MaxCompute 結果テーブルを持つジョブが `No more available blockId` エラーを返すのはなぜですか?