DataWorks は、リアルタイムデータ同期機能を提供します。この機能を使用すると、単一のテーブルまたはデータベース全体に対して、ソースデータベースからターゲットデータベースにデータをリアルタイムで同期できます。さまざまな入力データソースと出力データソースの組み合わせでリアルタイム同期タスクを作成し、リアルタイム増分同期を実行できます。このトピックでは、単一のテーブルまたはデータベース全体の増分データに対してリアルタイム同期タスクを作成する方法と、タスク作成後にその実行ステータスを確認する方法について説明します。
前提条件
データソースが構成されている。リアルタイム同期タスクを構成する前に、ソースデータベースとターゲットデータベースがセットアップされていることを確認する必要があります。これにより、同期タスクでデータソース名を選択して、読み取りおよび書き込み操作を制御できます。詳細については、「サポートされているデータソースと同期ソリューション」をご参照ください。
サーバーレスリソースグループが使用され、ワークスペースにアタッチされている。
リソースグループとデータソースの間にネットワーク接続が確立されている。詳細については、「ネットワーク接続ソリューション」をご参照ください。
ステップ 1: リアルタイム同期ノードの作成
次の手順に従って、リアルタイム同期ノードを作成します。
DataWorks コンソールの [Workspaces] ページに移動します。上部のナビゲーションバーで、目的のリージョンを選択します。目的のワークスペースを見つけ、[Actions] 列で を選択します。
リアルタイム同期ノードを作成します。
左側のナビゲーションウィンドウで、
アイコンをクリックします。ディレクトリツリーで [Workspace Directories] を見つけ、その横にある
アイコンをクリックし、 を選択して [ノードの作成] ダイアログボックスを開きます。[ノードの作成] ダイアログボックスで、[同期方法] を選択します。使用可能なメソッドは次のとおりです:
同期タスク構成タイプ
同期方法
単一テーブル (Topic) から単一テーブル (Topic) への ETL
データベースの変更を MaxCompute に同期する
データベースからのデータ変更を Hologres に同期する
データベースからのデータ変更を AnalyticDB For MySQL 3.0 に同期する
データベースからのデータ変更を DataHub に同期する
データベースの変更を Kafka に同期する
[ノードの作成] ダイアログボックスで、リアルタイム同期ノードの名前を入力し、[確認] をクリックしてノード構成ページに進みます。
ステップ 2: リアルタイム同期タスクの構成
リアルタイム同期ノードに構成する必要があるパラメーターは、ノードの作成時に選択した同期方法によって異なります。
単一テーブルのリアルタイム同期タスクを構成する
ノード構成ページで、次の手順に従って、単一テーブルのリアルタイム同期タスクの入力データソース、データ変換、および出力データソースを構成します:
入力データソースを構成します。
ノード構成ページの左側にある [入力] セクションで、データソースコンポーネントをキャンバスにドラッグします。

入力コンポーネントをクリックして、右側の [ノード構成] ダイアログボックスでそのパラメーターを構成します。
単一テーブル同期では、次の入力データソースタイプがサポートされています。各データソースタイプの構成の詳細については、リンク先のトピックをご参照ください。
(オプション) データ変換を構成します。
左側の [変換] セクションで、変換コンポーネントをキャンバスにドラッグし、アップストリームコンポーネントに接続します。

変換コンポーネントをクリックし、右側の [ノード構成] ダイアログボックスでそのパラメーターを構成します。
単一テーブル同期では、次の変換メソッドがサポートされています。各メソッドの構成の詳細については、リンク先のトピックをご参照ください。
データフィルタリング: フィールドサイズなどのルールに基づいてデータをフィルター処理します。指定されたルールを満たすデータのみが保持されます。
文字列の置換: 文字列フィールドの値を置き換えます。
データマスキング: リアルタイム同期中に単一テーブルのデータをマスキングし、マスキングされたデータを指定されたデータベースの場所に保存します。
説明複数の変換コンポーネントを使用して、データをフィルター処理および加工できます。
出力データソースを構成します。
左側の [出力] セクションで、ターゲット出力データソースコンポーネントをキャンバスにドラッグします。コンポーネントをそのアップストリームコンポーネントに接続します。ターゲットデータソース、テーブル、およびフィールドマッピングを構成します。ターゲットテーブルが存在しない場合は、[ワンクリックでテーブルを作成] をクリックして作成します。

出力コンポーネントをクリックして、右側の [ノード構成] ダイアログボックスでそのパラメーターを構成します。
単一テーブル同期では、次の出力データソースタイプがサポートされています。各データソースタイプの構成の詳細については、リンク先のトピックをご参照ください。
キャンバスの上にあるツールバーで、[保存] をクリックしてタスク構成を保存します。
データベース全体のリアルタイム同期タスクを構成する
ノード構成ページで、次の手順に従って、データベース全体のリアルタイム同期タスクを構成します:
同期ソースとルールを構成します。
[データソース] セクションで、[タイプ] と [データソース] を選択します。
[同期するソーステーブルの選択] セクションで、[ソーステーブル] エリアから 1 つ以上のテーブルを選択し、
アイコンをクリックして [選択したテーブル] エリアに移動します。テーブル名のマッピングルールを設定します。
説明このステップでは、同期するソースからデータベースとテーブルを選択できます。デフォルトでは、ソースデータベースまたはテーブルのデータは、同じ名前のターゲットスキーマまたはテーブルに書き込まれます。ターゲットスキーマまたはテーブルが存在しない場合、システムは自動的に作成します。また、[テーブル/データベース名マッピングルールの設定] を使用して、ターゲットスキーマまたはテーブルの名前をカスタマイズすることもできます。
[ソースとターゲットのテーブル名変換ルール]: 正規表現を使用して、ソーステーブル名をターゲットテーブル名に変換できます。
例 1: この例では、プレフィックスが
doc_のソーステーブルから、プレフィックスがpre_のターゲットテーブルにデータを同期します。
例 2: 複数のテーブルから単一のターゲットテーブルにデータを書き込みます。
ソーステーブル
table_01、table_02、およびtable_03から単一のテーブルmy_tableにデータを同期するには、[ソース] をtable_*に、[ターゲット] をmy_tableに設定して正規表現変換ルールを構成します。
[ターゲットテーブル名ルール]: 組み込み変数を使用してターゲットテーブル名を生成できます。生成された名前にプレフィックスとサフィックスを追加することもできます。使用可能な組み込み変数は次のとおりです:
${db_table_name_src_transed}: ソースとターゲットのテーブル名変換ルールに基づいて変換された後のテーブルの名前。${db_name_src_transed}: ソースデータベースとターゲットスキーマ名の変換ルールによって生成されたターゲットスキーマ名。${ds_name_src}: ソースデータソースの名前。
たとえば、前のステップでテーブル名変換ルールを使用してソーステーブルの名前を
my_tableに変更した場合、新しいテーブル名に対してさらに文字列連結を実行できます。プレフィックスとサフィックスを追加するには、前のステップの結果を表す${db_table_name_src_transed}変数を使用します。たとえば、式pre_${db_table_name_src_transed}_postは、ソーステーブルをpre_my_table_postという名前のターゲットテーブルにマッピングします。
[次へ] をクリックして、[ターゲットテーブルの設定] ページに進みます。
ターゲットテーブルを設定します。
[ターゲットテーブルの設定] または [Topic] ページで、書き込みモードやパーティション設定など、ターゲットデータソース の基本情報を構成できます。使用可能な設定は、各データソースのリアルタイム同期インターフェイスによって異なります。
[ソースとターゲットのテーブルマッピングを更新] をクリックして、ソーステーブルとターゲットテーブル間のマッピングを作成します。
[追加フィールドの編集] または [ターゲットテーブルの追加フィールドを一括編集] を使用して、定数、変数、およびその他の操作をターゲットテーブルに追加できます。構成は、各データソースのリアルタイム同期インターフェイスによって異なります。
ターゲットテーブルにプライマリキーがない場合、移行はサポートされません。代替として、[同期プライマリキー] を設定できます。
説明多数のテーブルを同期している場合、プロセスが遅くなることがあります。プロセスが完了するまでお待ちください。
[次へ] をクリックして、[テーブルレベルの同期ルールの設定] ページに進みます。
説明システムが一部のテーブルを自動的に作成する必要があると検出した場合、次のステップに進む前にテーブルが作成されるのを待つ必要があります。
(オプション) テーブルレベルの同期ルールを設定します。
一部の同期ソリューションは、カスタムのテーブルレベル DML 処理ポリシーをサポートしています。これにより、ソーステーブルで発生する挿入、更新、削除操作の処理方法を定義できます。
説明サポートされる DML 操作はデータソースによって異なる場合があります。特定の同期ソリューションが DML 処理ポリシーをサポートするかどうかは、プロダクトインターフェイスによって異なります。さまざまなデータソースでサポートされている DML 操作の詳細については、「サポートされている DML および DDL 操作」をご参照ください。
[次へ] をクリックして、[DDL メッセージの処理ポリシーの設定] ページに進みます。
DDL メッセージ処理ルールを設定します。
[DDL メッセージの処理ポリシーの設定] ページでは、ソースデータソースで発生する DDL 操作の処理方法を構成できます。リアルタイム同期中に、必要に応じてターゲットに同期されるさまざまな DDL メッセージの処理ポリシーを設定できます。サポートされる DDL 操作はデータソースによって異なります。詳細については、「サポートされている DML および DDL 操作」をご参照ください。各ターゲットデータベースタイプに対して DDL 処理ポリシーを設定できます。次の表に、DDL メッセージ処理ポリシーを示します。
DDL メッセージタイプ
処理ポリシー
テーブルの作成
DataWorks が対応するタイプの DDL メッセージを受信した場合の処理ポリシーは次のとおりです:
通常の処理: この DDL メッセージは処理のためにターゲットデータソースに渡されます。ターゲットデータソースごとに処理ポリシーが異なる場合があります。
無視: この DDL メッセージは破棄され、ターゲットデータソースには送信されません。
エラー: リアルタイム同期タスクは直ちにエラーステータスで終了します。
テーブルの削除
列の追加
列の削除
テーブル名の変更
列名の変更
列の型の変更
テーブルの切り捨て
[次へ] をクリックして、[ランタイムリソースの設定] ページに移動します。
ランタイムリソースを設定します。
: タスクがターゲットデータベースに書き込むための同時スレッドの最大数を指定します。
: 同期タスクがダーティデータを許容するかどうかを指定します。
ダーティデータを許可しない場合、実行中にダーティデータが生成されるとタスクは失敗して停止します。
ダーティデータを許可する場合、タスクはダーティデータを無視して実行を継続します。ダーティデータはターゲットに書き込まれません。
構成を完了します。
[構成の完了] をクリックして、データベース全体のリアルタイム同期タスクの構成を完了します。
ステップ 3: リアルタイム同期ノードのランタイムリソースの構成
構成が完了したら、次の手順に従ってリアルタイム同期ノードのランタイムリソースを構成します。
ノード構成ページで、右側のツールバーから [基本構成] を選択します。
[基本構成] ページで、アタッチしたサーバーレス [リソースグループ] を選択します。タスク実行のために [割り当て済み] CU の数と [同時実行数] を構成します。
ランタイムリソースを構成した後、ノード構成ページの上部にあるツールバーの [保存] ボタンをクリックして、リアルタイム同期タスクを保存します。
ステップ 4: リアルタイム同期タスクの実行
リアルタイム同期ノードを構成した後、次の手順に従ってタスクを実行します。
ノードを公開します。
タスクを実行する前に、オペレーションセンターに公開する必要があります。画面の指示に従って、リアルタイム同期ノードを公開します。詳細については、「ノードまたはワークフローを公開する」をご参照ください。
ノードを実行します。
ノードが公開された後、[本番環境に公開] セクションの下部にある [オペレーションセンターへ移動] をクリックして、[オペレーションセンター] の [リアルタイム同期タスク] リストページを開きます。
作成したリアルタイム同期タスクを見つけ、[アクション] 列の [開始] をクリックします。[開始] ページで、[オフセットのリセット] などの設定を構成し、[確認] をクリックしてタスクを開始します。
説明オフセットを手動で設定するかどうかの詳細については、「付録: オフセットのリセットについて」をご参照ください。
タスクの [ステータス] が [実行中] に変わったら、その [タスク名] をクリックして [実行情報] ページを開きます。
[実行情報] ページで、[ログ] タブをクリックしてタスクの実行ステータスを表示します。
付録: オフセットのリセットについて
次のような状況では、DataWorks リアルタイム同期タスクのオフセットを手動で設定する必要がある場合があります:
中断後のタスクの回復: タスクを再開するときに、中断した時点にオフセットを手動で設定できます。これにより、ブレークポイントから同期が再開されることが保証されます。
データの損失または例外: データが書き込まれる前の時点にオフセットをリセットして、データ整合性を確保できます。
タスク構成の調整: ターゲットテーブルまたはフィールドマッピングを変更した後、オフセットを手動で設定して同期の精度を確保できます。
オフセットエラーまたは存在しないオフセットが発生した場合は、次のいずれかの方法で問題を解決できます:
オフセットのリセット: タスクを開始するときに、ソースデータベースから利用可能な最も早いオフセットを選択します。
ログ保持期間の調整: オフセットが期限切れになった場合は、データベースのログ保持期間を調整できます。たとえば、保持期間を 7 日間に設定できます。
データ同期: データが失われた場合は、完全同期を再度実行するか、バッチ同期タスクを構成できます。