Data Integration は、MySQL や PolarDB などのソースから MaxCompute へのシャード化されたデータのリアルタイム同期をサポートしています。このトピックでは、Data Integration を使用してシャード化されたデータを MySQL から MaxCompute に同期する方法について説明します。
背景
実際のビジネスシナリオでは、データ同期は 1 つ以上の単純なオフラインまたはリアルタイム同期タスクを使用して完了できないことがよくあります。代わりに、複数のオフライン同期、リアルタイム同期、およびデータ処理タスクの組み合わせが必要となり、構成が非常に複雑になります。これは特に、シャード化された MySQL データベースとテーブルのシナリオで当てはまります。このシナリオでは、多くのアップストリームデータベースとテーブルを単一の MaxCompute テーブルに同時に書き込む必要があります。このシナリオで複数のタスクを構成すると、構成が複雑になり、維持が困難になります。
これらの問題点に対処するため、DataWorks Data Integration のワンクリック同期ソリューションは、ビジネスシナリオ向けに構成可能な同期タスクソリューションを提供します。このソリューションは、さまざまなデータソースのワンクリック同期をサポートしており、企業が簡単かつ迅速にデータを同期できるようにします。
シャード化されたデータを MaxCompute に同期するソリューションは、バイナリログ (Binlog) と、オフラインテーブルを生成する T+1 マージプロセスに基づくリアルタイムソリューションです。リアルタイムデータは Log テーブルに書き込まれます。Base テーブルは、日次パーティションに完全データを格納します。毎日、マージタスクは前日の Base テーブルパーティションと Log テーブルのリアルタイムデータを組み合わせて、新しい完全パーティションを作成します。データをクエリする場合、通常は Base テーブルの最新のパーティションをクエリします。
前提条件
サーバーレスリソースグループまたはデータ統合専用リソースグループを購入済みであること。
MySQL データソースと MaxCompute データソースを作成済みであること。 詳細については、「データソースの構成」をご参照ください。
MySQL: ソースが MySQL データベースの場合は、バイナリログ機能を有効にする必要があります。 詳細については、「MySQL 環境の準備」をご参照ください。
MaxCompute: MaxCompute テーブルからデータを読み取るか、MaxCompute テーブルにデータを書き込むときに、必要に応じて関連プロパティを有効にできます。 詳細については、「MaxCompute 環境の準備」をご参照ください。
リソースグループとデータソース間のネットワーク接続を確立済みであること。 詳細については、「ネットワーク接続ソリューション」をご参照ください。
制限事項
ソースデータを MaxCompute の外部テーブルに同期することはできません。
手順
1. 同期タスクのタイプを選択する
Data Integration ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[Go To Data Integration] をクリックします。
左側のナビゲーションウィンドウで、[Sync Task] をクリックします。表示されたページで、[Create Sync Task] をクリックしてタスク作成ページに移動します。次の基本情報を構成します。
ソースと宛先:
MySQL→MaxCompute新しいタスク名: 同期タスクのカスタム名を入力します。
同期タイプ:
Real-time synchronization of sharded databases and tables to MaxCompute。
2. ネットワークとリソースを構成する
[ネットワークとリソースの構成] セクションで、同期タスクの [データソース] (複数のシャード化されたデータソース)、[リソースグループ]、および [宛先] (MaxCompute) を選択します。次に、[すべての接続をテスト] をクリックします。

ソースデータソースと宛先データソースへの接続が成功したことを確認したら、[次へ] をクリックします。
3. 基本構成
同期タスクの [ソリューション名]、[所有者]、および [ターゲットタスクの場所] をカスタマイズできます。[次へ] をクリックして、[ソースシャードテーブルの構成] ステップに進みます。
[ワークフローを自動的に作成] を選択すると、システムはワークスペースにビジネスワークフローを自動的に作成します。ワークフローの名前は、「oneclick」+「ソースデータソース名」+「to」+「宛先データソース名」の形式になります。
4. ソースシャードテーブルを構成する
シャード化されたデータ同期リンクの最小単位は論理テーブルです。1 つの論理テーブルは、一連の物理データベースとテーブルルール、および 1 つの宛先ベーステーブルに対応します。論理テーブルを自動的に生成する機能により、論理テーブルの構成手順を大幅に簡素化できます。ほとんどの場合、この機能を使用してソースデータソースをスキャンし、ほとんどの構成操作を簡素化できます。自動スキャンの結果が期待どおりでない場合は、軽微な修正を行うだけで済みます。複雑な物理データベースとテーブルルールについては、手動で論理テーブルを追加できます。
論理テーブルを自動的に生成する
[論理テーブルを自動生成] をクリックします。データソースリストで、論理テーブルを生成するためにスキャンするデータソースを選択します。

[論理テーブル生成ルールの構成] セクションで、[ルールの追加] をクリックします。[プリセットルール] を選択するか、[手動で入力] を選択してカスタムルールを入力できます。

[実行を開始] をクリックします。
マージが完了したら、[次へ] をクリックして [宛先テーブルの設定] ステップに進みます。
論理テーブルを手動で追加する
[論理テーブルを自動生成] の結果が期待どおりでない場合は、論理テーブルのマッチング ルールを編集できます。また、[論理テーブルを手動で追加] をクリックして、スキャンされなかった論理テーブルを追加することもできます。
[論理テーブルを手動で追加] をクリックします。同期する必要がある論理テーブルを追加します。同じ論理テーブルのデータは、同じ宛先テーブルに同期されます。
ソースデータベースまたはソーステーブルの条件を設定します。
各同期テーブルの条件は、データベース条件とテーブル条件に分かれています。[ソーステーブル選択条件] および [ソースデータベース選択条件] セクションで、[条件の追加] をクリックして、それぞれの条件を追加します。同期タスクが実行されると、これらのルールに基づいてソースデータベースとテーブルが検索され、統合されます。これらのデータベースとテーブルは、宛先テーブルのシャード化されたソースとして機能します。
[適用] をクリックします。
データベースルールの一括編集
データベースシャーディングルールは、ソースデータベースの条件を制限できます。これらの条件をすべての同期テーブルに適用できます。
たとえば、すべてのデータが xiaobo1、xiaobo2、xiaobo3 などのデータベースからのものである場合、次の図に示すデータベースシャーディングルールの条件を追加できます。

5. 宛先テーブルを設定する
[書き込みモード] を設定します。
増分データは、MaxCompute の Log テーブルにリアルタイムで書き込まれます。その後、Log テーブルの増分データは、宛先の Base テーブルの完全データと定期的にマージされ、最終結果が Base テーブルに書き込まれます。
時間ベースの自動パーティション分割を構成します。
[時間ベースの自動パーティション分割設定] セクションでは、MaxCompute のパーティションテーブルまたは非パーティションテーブルにデータを書き込むようにタスクを構成できます。パーティションフィールドの名前を定義することもできます。この例では、データはパーティションテーブルに書き込まれ、パーティションフィールドは
dsです。説明パーティションテーブルへの書き込みを選択した場合、
アイコンをクリックして、宛先テーブルのパーティションフィールドの名前を定義できます。ソーステーブルと宛先テーブル間のマッピングを更新します。
[ソースと MaxCompute テーブルのマッピングを更新] をクリックして、宛先テーブルリストを表示します。
機能
説明
プライマリキーのないテーブルのプライマリキーを選択する
現在のソリューションではプライマリキーのないテーブルの同期がサポートされていないため、[同期プライマリキー] 列の
ボタンをクリックして [カスタムプライマリキー] を設定する必要があります。この操作は、テーブルから 1 つ以上のフィールドを選択してプライマリキーとして機能させるもので、このプライマリキーは、データを宛先に書き込む際の重複排除に使用されます。この例では、プライマリキーのないテーブルのプライマリキーとして機能するように
id列が選択されています。宛先テーブルスキーマの編集
[MaxCompute テーブル名] をクリックして、MaxCompute テーブルスキーマをプレビューします。MaxCompute テーブルを作成すると、Data Integration は自動的に
_src_info_という名前の列を追加して、データの行がどの子テーブルからのものかを識別します。_src_info_列は、ソーステーブルのプライマリキーと組み合わせることで、データの一意の行を識別するためにも使用できます。説明デフォルトでは、自動的に作成された MaxCompute テーブルのライフサイクルは 30 日間のみです。フィールドタイプのマッピングも存在する場合があります。つまり、宛先データベースにソースと一致するデータ型がない場合、同期タスクは宛先テーブルの作成時にソースフィールドを宛先の書き込み可能なフィールドタイプに自動的に一致させます。MaxCompute テーブルのライフサイクルまたは宛先テーブルのフィールドタイプのマッピングを変更するには、[MaxCompute テーブル名] 列の宛先テーブル名をクリックして変更を加えます。
テーブル作成方法の選択
[テーブルを自動作成] と [既存のテーブルを使用] をサポートしています。
[テーブル作成方法] を [既存のテーブルを使用] に設定すると、[MaxCompute ベーステーブル名] 列に自動的に作成されたテーブル名が表示されます。ドロップダウンリストから使用したいテーブルの名前を選択することもできます。
[テーブル作成方法] を [テーブルを自動作成] に設定すると、自動的に作成されたテーブル名が表示されます。テーブル名をクリックして、テーブル作成ステートメントを表示および変更できます。
この例では、[テーブルを自動作成] が選択されています。
完全同期
[完全同期] 列で、リアルタイム同期を開始する前に完全データを宛先に同期するかどうかを選択できます。
完全同期をオフにすると、対応するテーブルはオフラインの完全データ同期を行いません。これは、完全データが他の手段によってすでに宛先に同期されているシナリオに適しています。
この例では、すべてのテーブルで完全同期が有効になっています。
宛先テーブル、フィールド、およびデータの読み取り/書き込み関係が正しいことを確認したら、[次へ] をクリックします。
6. テーブルレベルの同期ルールを設定する
シャード化された MySQL データを MaxCompute に同期する場合、データ操作言語 (DML) 同期ルールを構成できます。たとえば、ソースデータベースから削除されたデータをオフラインテーブルに保持するには、次の図に示すようにルールを構成できます:
。これにより、ソーステーブルで発生する挿入、更新、または削除操作の処理ポリシーを定義できます。
通常の処理: ソース DML メッセージは処理のために宛先データソースに渡されます。
無視: メッセージは破棄され、対応する DML メッセージは宛先データソースに送信されません。対応するデータは変更されません。
条件付き通常処理: このオプションを選択すると、フィルター条件を構成できます。同期タスクは、構成したフィルター式に基づいてソースデータをフィルター処理します。条件を満たすデータは正常に処理されます。条件を満たさないデータは無視されます。
ルールを設定しない場合、デフォルトで通常の処理が使用されます。
[次へ] をクリックして、[DDL メッセージ処理ルール] ステップに進みます。
7. DDL メッセージ処理ルールを設定する
DDL 操作はソースで実行される場合があります。ページの右上隅にある [DDL 機能の構成] をクリックして、ビジネス要件に基づいてソースからの DDL メッセージを処理するルールを構成できます。
詳細については、「DDL メッセージを処理するルールを構成する」をご参照ください。
[次へ] をクリックして、[ランタイムリソース設定] ステップに進みます。
8. ランタイムリソース設定
同期リンクが作成されると、オフラインの完全データ同期とリアルタイムの増分データ同期のために個別のサブタスクが生成されます。[ランタイムリソース設定] ステップでは、オフラインとリアルタイムの両方の同期タスクのプロパティを構成する必要があります。
これらのプロパティには、リアルタイム増分同期とオフライン完全同期に使用されるリソースグループ、およびオフライン完全同期に使用されるスケジューリングリソースグループが含まれます。また、[詳細設定] をクリックして、[サブクエリ失敗時に再試行] などのパラメーターを構成することもできます。
DataWorks のオフライン同期タスクは、スケジューリングリソースグループによってデータ統合専用リソースグループに送信されます。したがって、オフライン同期タスクは、データ統合専用リソースグループのリソースに加えて、スケジューリングリソースグループのリソースを使用します。これにより、スケジューリングインスタンス料金が発生します。
9. 同期タスクを実行する
すべての構成が完了したら、ページ下部の [構成を完了] をクリックします。
ページで、作成した同期タスクを見つけます。[アクション] 列で、[送信して実行] をクリックします。
[タスクリスト] で、タスクの [名前/ID] をクリックして、詳細な実行プロセスを表示します。

その他の操作
タスクを構成した後、作成したタスクの管理、テーブルの追加または削除、モニタリングとアラートの構成、および主要な運用メトリックの表示ができます。詳細については、「完全同期タスクと増分同期タスクの O&M」をご参照ください。