すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:データをデータウェアハウスにリアルタイムで取り込む

最終更新日:Jan 17, 2025

データウェアハウス内の中程度および非常に時間の影響を受けるデータのビジネス要件を満たすために、MaxComputeは、デルタテーブルに基づいて数分以内にリアルタイムのデータ書き込みとプライマリキーの更新をサポートしています。 これにより、データウェアハウスのデータ更新効率が大幅に向上します。

データ書き込みシナリオ

従来のリレーショナルデータベースまたはオフラインデータ分析方法を使用して、コメント、評価、いいねなどの突然のイベントやホットなイベントに関連する顧客行動ログを処理する場合、ほとんどの場合、データは翌日にしか分析できません。 このようなシナリオには、高いリソース消費、高いコスト、データ遅延、複雑な更新などの問題が含まれます。

これらの問題に対処するために、リアルタイムでデータをデータウェアハウスに取り込むソリューションを使用できます。 このソリューションは、増分データを数分以内にDeltaテーブルに同期するのに役立ちます。 このように、データ書き込みからデータクエリまでの待ち時間は5〜10分に制限されます。 これにより、データ分析の適時性が大幅に向上します。 運用タスクを実行してMaxCompute運用データストア (ODS) レイヤーの標準テーブルにデータを同期する場合、DeltaテーブルのUPSERT機能を使用して、運用タスクが変換されるリスクを回避できます。 この機能は、データをDeltaテーブルに効果的に同期し、重複データが格納されるのを防ぎます。 これにより、ストレージ効率が向上し、ストレージコストが削減されます。

image

FlinkデータをDeltaテーブルに書き込む

このトピックでは、Flinkコネクタを使用してMaxComputeのDeltaテーブルにリアルタイムでデータを書き込む方法について説明します。

image

データの書き込みプロセスを次の表に示します。

いいえ

説明

1

データは主キーによってグループ化され、同時にテーブルに書き込まれます。

書き込みスループットを向上させるために、パーティションキー列でデータをグループ化し、次の条件が満たされた場合にテーブルパーティションにデータを書き込むこともできます。(1) データを多数のパーティションに同時に書き込む必要があります。 (2) データはパーティションに均等に分散されます。 (3) テーブルのバケット数が少ない。 たとえば、テーブルのデータを格納するバケットは10個未満です。

2

UpsertWriterTaskは、データを受信した後、データが属するパーティションを解析し、UpsertOperatorCoordinatorに要求を送信します。 次に、UpsertOperatorCoordinatorは、パーティションにデータをリアルタイムで書き込むためのupsertセッションを作成します。

3

UpsertOperatorCoordinatorは、作成されたupsertセッションのIDをUpsertWriterTaskに返します。

4

UpsertWriterTaskは、Upsertセッションに基づいてupsert Writerを作成し、MaxComputeのTunnel Serverに接続して、テーブルにデータを継続的に書き込みます。

ファイルキャッシュモードが有効になっている場合、データ送信中にデータが最初にFlinkのローカルディスクのキャッシュに入ります。 データファイルのサイズが特定のしきい値に達するか、またはチェックポイントプロセスが開始するまで、データはトンネルサーバに送信されます。

5

チェックポイントプロセスが開始されると、Upsert WriterはすべてのデータをTunnel Serverに送信し、UpsertOperatorCoordinatorにリクエストを送信してコミット操作をトリガーします。 コミット操作が成功すると、データが表示されます。

6

自動メジャーコンパクションが有効になっている場合、パーティションのコミット数が特定のしきい値を超えると、UpsertOperatorCoordinatorはStorage Serviceに対してメジャーコンパクション操作を開始します。

説明

この操作は、テーブルデータのサイズに基づいて、リアルタイムデータインポートのための待ち時間を引き起こす可能性があります。 したがって、自動メジャーコンパクションは慎重に使用する必要があります。

MaxComputeのDeltaテーブルにFlinkデータを書き込む方法の詳細については、「Flinkを使用してDeltaテーブルにデータを書き込む」をご参照ください。

UPSERTステートメントのパラメーター設定の提案

UPSERTステートメントのパラメーター設定を調整して、システムのスループットとリアルタイムデータ書き込みのパフォーマンスを向上させ、さまざまなビジネス要件を満たすようにシステムの安定性を確保できます。 UPSERTステートメントのパラメーターの詳細については、「UPSERTステートメントのパラメーター」をご参照ください。

共通キーパラメータの設定

  • テーブルのバケットの数は、テーブルの最大書き込み同時実行性に影響を与え、合計書き込みスループットを決定することができる。 1メガバイト/秒 × テーブルのバケット数の式に基づいて、合計書き込みスループットを計算することを推奨します。

    達成できる実際のスループットは、sink.parallelismなどの特定のパラメータに関連しています。 詳細については、「テーブル形式とデータガバナンス」をご参照ください。

  • sink.parallelismパラメーターは、データ書き込み用のシンクノードの並列処理を指定します。 パフォーマンスを向上させるために、テーブルのバケット数をこのパラメーターの値の整数倍に設定することをお勧めします。 理論的には、sink.parallelismパラメーターの値がテーブルのバケット数と同じである場合、最適なパフォーマンスを実現できます。

非パーティションテーブルのスループット向上のためのパラメーター設定

  • 書き込み同時実行性を高めるようにsink.parallelismパラメーターを設定してもスループットが改善されない場合、シンクノードのアップストリームデータ処理リンクが非効率的になる可能性があります。 全体的なパフォーマンスを向上させるために、データ処理リンクを最適化することを推奨します。

  • テーブルのバケット数がsink.parallelismパラメーターの値の整数倍の場合、1つのシンクノードでデータが書き込まれるバケット数は、次の式に基づいて計算されます。テーブルのバケット数 /sink.parallelism. テーブルのバケット数が多すぎると、パフォーマンスに悪影響を及ぼします。 テーブルのバケット数とsink.parallelismパラメーターの値を優先的に調整することをお勧めします。 upsert.writer.buffer-sizeパラメータの値を、単一のシンクノードによってデータが書き込まれるバケットの数で割った値が、特定の閾値 (128 KBなど) 未満である場合、ネットワーク伝送効率が低下する可能性がある。 ネットワークパフォーマンスを向上させるには、upsert.writer.buffer-sizeパラメーターの値を大きくすることを推奨します。

  • upsert.flush.concurrentパラメーターは、データが同時にフラッシュされるバケットの数を指定します。 このパラメーターのデフォルト値は2です。 スループットを向上させるには、このパラメーターの値を増やし、パフォーマンスの向上を確認します。

    説明

    このパラメーターを大きすぎる値に設定すると、同時に過剰な数のバケットにデータが書き込まれる可能性があります。 これは、ネットワーク輻輳を引き起こし、全体的なスループットを低下させる。 したがって、システムを安定して効率的に運用するためには、このパラメーターの値をビジネス要件に基づいて適切な値に調整する必要があります。

少数のパーティションへの同時書き込みのスループットを向上させるためのパラメーター設定

このシナリオでは、「共通キーパラメーターの設定」および「パーティション分割されていないテーブルのスループット改善のためのパラメーター設定」セクションを参照できます。 次の点に注意することもできます。

  • データは、単一のシンクノードによって複数のパーティションに書き込まれます。 チェックポイントプロセス中、各パーティションへのデータ書き込みは独立してコミットされる。 したがって、全体的な書き込みスループットが影響を受ける可能性があります。

  • 単一のシンクノードのバッファデータの最大メモリは、upsert.writer.buffer-size × パーティション数の式に基づいて計算されます。 メモリ不足 (OOM) エラーが発生した場合は、メモリ使用量が上限を超えないように、upsert.writer.buffer-sizeパラメーターの値を小さくすることを推奨します。

  • チェックポイントプロセス中のコミット操作に必要な時間を短縮するために、e upsert.com mit.thread-numパラメーターの値を大きくすることができます。 このパラメーターのデフォルト値は16です。これは、パーティションのコミット操作を同時に実行するために16個のスレッドが使用されることを示します。

    説明

    システムパフォーマンスを向上させるために、mit.thread-numパラメーターe upsert.comの値を増やすことができます。 ただし、過度の同時実行による問題を回避するため、このパラメーターの値を32を超える値に上げないでください。

を増やすためのパラメーター設定スループット多数のパーティションへの同時書き込みの実行 (ファイルキャッシュモード)

このシナリオでは、「少数のパーティションへの同時書き込みのスループットを向上させるためのパラメーター設定」セクションを参照できます。 次の点に注意することもできます。

  • 各パーティションのデータはローカルファイルにキャッシュされ、チェックポイントプロセス中にMaxComputeに同時に書き込まれます。

  • sink.file-cached.writer.numパラメーターのデフォルト値は16です。 このパラメーターの値を大きくすると、単一のシンクノードによって同時にデータが書き込まれるパーティションの数を増やすことができます。 このパラメーターを32より大きい値に設定しないことを推奨します。 データが同時に書き込まれるバケットの数は、次の式に基づいて設定することを推奨します。sink.file-cached.writer.num × upsert.flush.concurrent. ただし、sink.file-cached.writer.numパラメーターを大きすぎる値に設定しないでください。 そうしないと、ネットワーク輻輳が発生し、全体のスループットが低下する。

説明

ファイルキャッシュモードでデータを書き込むためのパラメーターの詳細については、「ファイルキャッシュモードでのデータ書き込みのパラメーター」をご参照ください。

その他の提案

上記の提案に基づいてパラメーター設定を調整した後、スループット要件を満たすことができない場合、またはスループットが不安定な場合は、次の要因を考慮してください。

  • プロジェクトごとに無料で使用できるパブリックトンネルリソースグループは制限されています。 上限に達すると、データを書き込むことができない。 これは全体的なスループットを低下させる。

  • コネクタのアップストリームデータ処理リンクは非効率的であり、全体的なスループットが低くなる。 全体的なパフォーマンスを向上させるために、データ処理リンクを最適化することを推奨します。

よくある質問

Flink関連の問題

  • 問題1:

    • 問題の説明: エラーメッセージ「Checkpoint xxx expired before completing」が表示されます。

    • 原因: チェックポイントプロセスがタイムアウトしました。 ほとんどの場合、この問題は、チェックポイントプロセス中にデータが過剰な数のパーティションに書き込まれるために発生します。

    • 解決策:

  • 問題2:

    • 問題の説明: エラーメッセージ "org.apache.flink.util.FlinkException: OperatorCoordinatorからタスクへのOperatorEventが失われました。 一貫性を確保するためにタスクフェイルオーバーをトリガーします。が表示されます。

    • 原因: JobManagerとTaskManagerの間の通信が異常です。 タスクが自動的に再試行されます。

    • 解決策: タスクの安定性を確保するために、タスクリソースの数を増やすことを推奨します。

データ書き込みの問題

  • 問題1:

    • 問題の説明: TIMESTAMPタイプのデータがMaxComputeに書き込まれた後、8時間の時間オフセットが発生します。

    • 原因: FlinkのTIMESTAMPタイプのデータにタイムゾーン情報が含まれていません。 また、MaxComputeにデータを書き込むときにタイムゾーンは変換されません。 したがって、データはゼロタイムゾーンデータと見なされます。 ただし、MaxComputeは、MaxComputeがデータを読み取るときに、プロジェクトのタイムゾーンに基づいてデータを変換します。

    • 解決方法: MaxComputeシンクテーブルのTIMESTAMPタイプのデータをTIMESTAMP_LTZタイプのデータに置き換えます。

トンネル関連の問題

  • 問題1:

    • 問題の説明: データの書き込み時にTengine関連のエラーが発生します。 次のエラーメッセージが表示されます。

      <body>
      <h1>An error occurred.</h1>
      <p>Sorry, the page you are looking for is currently unavailable.<br/>
      Please try again later.</p>
      <p>If you are the system administrator of this resource then you should check
      the <a href="http://nginx.org/r/error_log">error log</a> for details.</p>
      <p><em>Faithfully yours, tengine.</em></p>
      </body>
      </html>
    • 原因: トンネルサービスは一時的に利用できません。

    • 解決策: Tunnelサービスが復元されるまで待ちます。 その後、タスクは正常に再試行できます。

  • 問題2:

    • 問題の説明: エラーメッセージ「java.io.IOException: RequestId=xxxxxx, ErrorCode=SlotExceeded, ErrorMessage=あなたのスロットクォータを超えています。」が表示されます。

    • 原因: 書き込みクォータが上限を超えています。 排他的トンネルリソースグループの書き込み同時実行を減らすか、並列処理を増やす必要があります。

    • 解決策:

      • 使用する必要があるシステムリソースの数を減らすために、書き込みの同時実行性を減らします。

      • 排他的トンネルリソースグループの並列性を高め、より高いデータ書き込み要件を満たすために処理能力を向上させます。