MaxCompute では、Delta テーブルを活用することで、数分以内にリアルタイムなデータ書き込みおよびプライマリキーによる更新が可能となり、データインジェストからクエリ利用可能までの遅延を 5~10 分に短縮できます。
従来のバッチパイプラインでは、データが翌日になって初めて可視化されるため、ウイルス的コンテンツ周辺の顧客行動ログ、コメント、評価、いいねなどの時間依存性の高いイベントには対応できません。ニアリアルタイムインジェストでは、増分データを数分以内に Delta テーブルへ同期します。すでに MaxCompute のオペレーショナルデータストア (ODS) レイヤーへ書き込む本番タスクをお持ちの場合、そのタスクを変更することなく、Delta テーブルの UPSERT 機能を活用してデータをインジェストできます。UPSERT により重複レコードが防止され、ストレージ効率が向上し、ストレージコストも削減されます。
ニアリアルタイムインジェストの仕組み
このソリューションでは、Flink コネクタを用いて、トンネルサービスが管理するアップサートセッション経由でストリーミングデータを MaxCompute Delta テーブルへ書き込みます。
Flink データを Delta テーブルへ書き込む
Flink コネクタは、以下の 6 ステップのプロセスを経て、データを MaxCompute Delta テーブルへ書き込みます。

| ステップ | 説明 |
|---|---|
| 1 | データはプライマリキーでグループ化され、テーブルへ並列で書き込まれます。代替として、以下の条件を満たす場合、パーティションキー列でデータをグループ化することも可能です:多数のパーティションへ同時に書き込みが発生する、データが各パーティションに均等に分散している、テーブルのバケット数が 10 未満である。 |
| 2 | UpsertWriterTask がデータの所属パーティションを解析し、UpsertOperatorCoordinator へリクエストを送信します。UpsertOperatorCoordinator は、当該パーティションへのリアルタイム書き込み用にアップサートセッションを作成します。 |
| 3 | UpsertOperatorCoordinator がアップサートセッション ID を UpsertWriterTask へ返却します。 |
| 4 | UpsertWriterTask は、受信したセッションに基づき Upsert Writer を作成し、MaxCompute トンネルサーバーへ接続して継続的にデータを書き込みます。ファイルキャッシュモードでは、データが Flink ノードのローカルディスク上にバッファーされ、ファイルサイズがしきい値に達したとき、またはチェックポイント開始時にトンネルサーバーへ転送されます。 |
| 5 | チェックポイント開始時に、Upsert Writer がすべてのデータをトンネルサーバーへ送信し、コミットをトリガーします。コミット成功後にデータが可視化されます。 |
| 6 | 自動メジャーコンパクションが有効化されている場合、UpsertOperatorCoordinator は、パーティションのコミット数がしきい値を超えると、ストレージサービスに対してメジャーコンパクション操作を開始します。 警告 メジャーコンパクションは、テーブルのデータ量に応じてリアルタイムデータインポートの遅延を増加させる可能性があります。自動メジャーコンパクションは慎重にご使用ください。 |
詳細については、「Flink を使用して Delta テーブルにデータを書き込む」をご参照ください。
スループット向上のための UPSERT パラメーター調整
デフォルトの UPSERT パラメーターは、ほとんどのワークロードで動作しますが、特定のスループット目標を達成したり、高パーティション数下でのパフォーマンスを安定化させたりするために調整可能です。パラメーターの完全なリファレンスについては、「UPSERT 文のパラメーター」をご参照ください。
ベースライン:バケット数および sink の並列度
スループットの上限を決定する 2 つのパラメーターがあります:
バケット数:推定最大書き込みスループットは 1 MB/s × バケット数 です。持続的なデータインジェストレートに基づき設定してください。
`sink.parallelism`: 最適なパフォーマンスを得るには、この値をバケット数と同じ値に設定します。最低限、バケット数は
sink.parallelismの整数倍である必要があります。
各 sink ノードに割り当てられるバケット数は、バケット数 ÷ sink.parallelism です。
非パーティション化テーブル
使用タイミング:データにパーティションキー列が存在しない、または単一の論理パーティションへ書き込む場合です。
sink.parallelism を増加させてもスループットが向上しない場合、ボトルネックは sink ノードよりも上流にある可能性があります。まず、上流のデータ処理パイプラインを最適化してください。
upsert.writer.buffer-size ÷ buckets-per-sink-node の値が 128 KB を下回ると、ネットワーク転送効率が低下します。パフォーマンスを回復するには、upsert.writer.buffer-size を増加させてください。
スループットを向上させるには、upsert.flush.concurrent(デフォルト:2)を増加させてください。増加に伴うパフォーマンス変化をモニターしてください。あまりにも大きな値を設定すると、複数のバケットが同時にフラッシュされ、ネットワーク輻輳を引き起こし、全体のスループットが低下します。
少数のパーティション
使用タイミング:少数のパーティションへ同時に書き込む場合です。
上記の非パーティション化テーブルに関するガイドラインを適用し、さらに以下の点を検討してください:
チェックポイント実行時、各パーティションへの書き込みは独立してコミットされるため、全体のスループットが制限される可能性があります。
各 sink ノードの最大バッファーメモリは
upsert.writer.buffer-size × パーティション数です。メモリ不足(OOM)エラーが発生した場合は、upsert.writer.buffer-sizeを減少させてください。チェックポイント時のコミットを並列化するには、
upsert.commit.thread-num(デフォルト:16)を増加させてください。ただし、32を超えないようご注意ください。これを超えると、過剰な同時実行による問題が発生し、パフォーマンスが低下します。
多数のパーティション(ファイルキャッシュモード)
使用タイミング:多数のパーティションへ同時に書き込み、かつチェックポイントのコミット時間がボトルネックとなっている場合です。
上記の少数パーティションに関するガイドラインを適用し、さらに以下の点を検討してください:
各パーティションのデータはローカルファイルにキャッシュされ、チェックポイント実行時に MaxCompute へ並列で書き込まれます。
sink.file-cached.writer.num(デフォルト:16)は、単一の sink ノードが同時に書き込むパーティション数を制御します。この値を32を超えて設定しないでください。実効的な同時書き込みバケット数は
sink.file-cached.writer.num × upsert.flush.concurrentです。両パラメーターを連動して調整し、ネットワーク輻輳を回避できるよう、積の値を十分に小さく保ってください。
ファイルキャッシュモードのパラメーター一覧については、「ファイルキャッシュモードでのデータ書き込みパラメーター」をご参照ください。
主要パラメーターのリファレンス
| パラメーター | デフォルト | 推奨最大値 | 説明 |
|---|---|---|---|
sink.parallelism | — | — | sink ノードの並列度。バケット数と等しく設定してください |
upsert.writer.buffer-size | — | — | バケットあたりのバッファーサイズ。各バケットあたりのスループットが 128 KB を下回る場合、増加させてください |
upsert.flush.concurrent | 2 | — | 同時にフラッシュされるバケット数。徐々に増加させ、ネットワーク輻輳をモニターしてください |
upsert.commit.thread-num | 16 | 32 | チェックポイント実行時のパーティションコミットを並列化するスレッド数。32 を超えると、過剰な同時実行による問題が発生し、スループットが低下します |
sink.file-cached.writer.num | 16 | 32 | ファイルキャッシュモードにおける同時パーティション書き込み数。32 を超えると、ネットワーク輻輳によりスループットが低下します |
パラメーター調整でも改善しない場合
パラメーター調整後もスループット目標が達成できない場合:
各プロジェクトのパブリックトンネルリソースグループには、クォータが設定されています。このクォータに達すると、書き込み要求が拒否され、実効スループットが低下します。専用トンネルリソースグループへの切り替え、または同時実行数の削減をご検討ください。
コネクタへデータを供給する上流のデータ処理パイプラインがボトルネックとなっている可能性があります。上流パイプラインのプロファイリングと最適化を行ってください。
耐障害性とエラー処理
本番環境への導入前に、以下の障害モードに対応するパイプライン設計を行ってください。
チェックポイントの完了前に期限切れになる
エラー: Checkpoint xxx expired before completing
単一のチェックポイント間隔内で書き込まれるパーティション数が多すぎ、コミットフェーズがチェックポイントタイムアウトを超過しています。
解決方法:
Flink のチェックポイント間隔を延長し、コミットフェーズに必要な時間を確保してください。
sink.file-cached.enableをtrueに設定して、ファイルキャッシュモードを有効化してください。
ファイルキャッシュモードのパラメーターについては、「付録:新バージョン Flink コネクタのパラメーター」をご参照ください。
OperatorEvent の喪失およびタスクのフェールオーバー発生
エラー: org.apache.flink.util.FlinkException: An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency.
JobManager と TaskManager 間の通信が中断されました。タスクは自動的に再試行されます。同様のエラーが繰り返される場合は、タスクのリソースを増加させて接続の安定化を図ってください。
TIMESTAMP データ書き込み後の 8 時間のタイムスタンプオフセット
Flink の TIMESTAMP 型にはタイムゾーン情報が含まれません。MaxCompute では、受信した TIMESTAMP 値を UTC+0 として扱い、読み取り時にプロジェクトで設定されたタイムゾーンへ変換します。そのため、UTC+8 を使用するプロジェクトでは、見た目上 8 時間のオフセットが発生します。
MaxCompute の sink テーブル内の TIMESTAMP カラムを、TIMESTAMP_LTZ に置き換えてください。TIMESTAMP_LTZ はパイプライン全体でタイムゾーン情報を保持するため、読み取り時に変換によるオフセットが発生しません。
データ書き込み中の Tengine エラー
エラー: Tengine から提供される HTML ページに Sorry, the page you are looking for is currently unavailable. というメッセージが表示されます。
トンネルサービスが一時的に利用不可となっています。サービスの復旧を待ってください。Flink タスクは自動的に再試行を行い、トンネルサービスが復旧すると書き込みを再開します。
SlotExceeded:書き込みクォータの超過
エラー: java.io.IOException: RequestId=xxxxxx, ErrorCode=SlotExceeded, ErrorMessage=Your slot quota is exceeded.
同時書き込みスロット数がプロジェクトのクォータを超過しています。書き込みの同時実行数(sink.parallelism)を減少させるか、専用トンネルリソースグループの並列度を増加させて、利用可能なクォータを拡大してください。