データソース、ネットワーク環境、およびリソースを準備した後、ソースの単一テーブルまたはデータベースからターゲットに増分データを同期するためのリアルタイム同期タスクを作成できます。このトピックでは、ソースの単一テーブルまたはデータベースからターゲットにリアルタイムで増分データを同期するためのリアルタイム同期タスクを作成する方法と、タスクの実行中の情報を表示する方法について説明します。
前提条件
使用するデータソースが準備されていること。同期タスクを構成する前に、データの読み取り元となるデータソースとデータの書き込み先となるデータソースを準備する必要があります。このようにして、データ同期タスクを構成するときに、データソースを選択できます。リアルタイム同期とデータソースの追加をサポートするデータソースタイプの詳細については、「リアルタイム同期をサポートするデータソースタイプ」をご参照ください。
説明データソースを追加する前に理解しておく必要のある項目の詳細については、「概要」をご参照ください。
Data Integration 専用リソースグループ、またはビジネス要件を満たすサーバーレスリソースグループが購入され、リソースグループがワークスペースに関連付けられていること。詳細については、「サーバーレスリソースグループを使用する」または「Data Integration 専用リソースグループを作成して使用する」をご参照ください。
リソースグループとデータソース間にネットワーク接続が確立されていること。詳細については、「ネットワーク接続ソリューション」をご参照ください。
注意事項
以下の状況では、リアルタイム同期タスクの同期オフセットを手動で指定する必要があります。
タスクが中断された後にリアルタイム同期タスクを再開する場合、タスクが中断された時点を手動でタスクの同期オフセットとして設定する必要があります。このようにして、リアルタイム同期タスクが再起動された後、タスクはその時点からデータの同期を開始できます。
リアルタイム同期タスクの実行中にデータが失われた場合、または例外が発生した場合、タスクの同期オフセットを、データがターゲットへの書き込みを開始した時点よりも前の時点に手動でリセットする必要があります。これにより、データの整合性を確保できます。
ターゲットテーブルまたはフィールドマッピングに関連する構成の変更など、リアルタイム同期タスクの構成を変更した後、同期データの精度を確保するために、タスクの同期オフセットを手動で指定する必要があります。
同期タスクの実行中に、同期オフセットが正しくないか存在しないことを示すエラーがシステムから報告された場合は、次の方法を参照して問題を解決できます。
同期オフセットのリセット: 同期タスクがリアルタイム同期タスクの場合、タスクの同期オフセットをリセットし、ソースデータベースで使用可能な最も古い同期オフセットを選択できます。
バイナリログの保存期間の変更: 同期タスクで使用されるソースデータベースの同期オフセットの期限が切れている場合は、ソースデータベースのバイナリログの保存期間を変更できます。たとえば、保存期間を 7 日間に設定できます。
データの再同期: 同期タスクの実行中にデータが失われた場合は、同期タスクを実行して完全データを再同期するか、バッチ同期タスクを構成して失われたデータを手動で同期できます。
DataStudio ページに移動する
リアルタイム同期タスクを作成および構成するには、DataWorks コンソールで DataStudio ページに移動する必要があります。
DataStudio ページに移動します。
DataWorks コンソール にログオンします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。
ステップ 1: リアルタイム同期タスクを作成する
ワークフローを作成します。詳細については、「ワークフローを作成する」をご参照ください。
リアルタイム同期タスクを作成します。
次のいずれかの方法を使用して、リアルタイム同期タスクを作成できます。
方法 1: DataStudio ページの [スケジュール済みワークフロー] ペインで、[ビジネスフロー] セクションから目的のワークフローを見つけ、ワークフローの名前をクリックします。次に、[data Integration] を右クリックし、
を選択します。方法 2: DataStudio ページの [スケジュール済みワークフロー] ペインで、[ビジネスフロー] セクションから目的のワークフローを見つけ、ワークフローの名前をダブルクリックします。表示されるワークフロー構成タブの上部ツールバーにある [ノードの作成] をクリックします。[data Integration] セクションで、[リアルタイム同期] を右側のキャンバスにドラッグします。
[ノードの作成] ダイアログボックスで、次の表に示すパラメーターを構成します。
パラメーター
説明
ノードタイプ
タスクのタイプ。デフォルト値: リアルタイム同期。
同期方法
単一テーブルから増分データを同期するために使用されるリアルタイム同期タスクを作成する場合は、このパラメーターを エンドツーエンド ETL に設定します。この方法では、1 つ以上のテーブルから単一のターゲットテーブルに増分データを同期できます。
説明この同期方法を使用する場合、データは 1 つのターゲットテーブルにのみ書き込むことができます。複数のターゲットテーブルにデータを書き込む場合は、次のいずれかのソリューションを使用できます。
データ同期中にデータをフィルタリング、文字列の置換、またはデータのマスキングを行う場合は、複数のリアルタイム同期タスクを作成し、各タスクを使用して増分データを単一のテーブルにリアルタイムで同期できます。
複数のソーステーブルから複数のターゲットテーブルに増分データを同期する場合は、複数のリアルタイム同期タスクを作成できます。特定タイプのデータソースの場合、リアルタイム同期タスクを作成して、データベースからのすべての増分データを同期することもできます。
一度に完全データを同期してから、増分データをリアルタイムでターゲットに同期する場合は、Data Integration で同期タスクを作成できます。詳細については、「Data Integration で同期タスクを構成する」をご参照ください。
データベースからのすべての増分データを同期するために使用されるリアルタイム同期タスクを作成する場合は、Maxcompute への移行 など、データベースの変更を同期するために使用されるデータ同期方法を選択します。
パス
リアルタイム同期タスクを保存するディレクトリ。
名前
タスク名は 128 文字以内で、文字、数字、アンダースコア (_)、およびピリオド (.) のみを含めることができます。
ステップ 2: リソースグループを構成する
リアルタイム同期タスクを実行するには、サーバーレスリソースグループまたは Data Integration 専用リソースグループのみを使用できます。リアルタイム同期タスクのリソースグループを構成するには、次の操作を実行します。作成したタスクの名前をダブルクリックします。タスクの構成タブの右側のナビゲーションウィンドウで、[基本構成] タブをクリックします。[基本構成] タブで、[リソースグループ] ドロップダウンリストから、データソースに接続されているリソースグループを選択します。
リソースグループを作成済みであるにもかかわらず、リソースグループが表示されない場合は、リソースグループがワークスペースに関連付けられているかどうかを確認する必要があります。詳細については、「サーバーレス リソースグループを使用する」または「データ統合専用リソースグループを作成して使用する」をご参照ください。
リアルタイム同期タスクとバッチ同期タスクは、異なるリソースグループで実行することをお勧めします。同じリソースグループでタスクを実行すると、2 つのタスクがリソースを競合し、互いに影響を及ぼします。たとえば、タスクで使用される CPU リソース、メモリリソース、およびネットワークが互いに影響を及ぼす可能性があります。この場合、バッチ同期タスクが遅くなったり、リアルタイム同期タスクが遅延したりする可能性があります。さらに悪いことに、リソース不足のためにメモリ不足 (OOM) エラーが発生する可能性があります。
サーバーレスリソースグループを使用して同期タスクを実行する場合は、同期タスクの実行に使用できる CU 数の制限を指定できます。リソース不足のために同期タスクでメモリ不足 (OOM) エラーが報告された場合は、上限を適切に変更できます。
ステップ 3: リアルタイム同期タスクを構成する
単一テーブルから増分データを同期するためのリアルタイム同期タスクを構成する
ソースを構成します。
リアルタイム同期タスクの構成タブの [入力] セクションで、目的のソースタイプを右側のキャンバスにドラッグします。
ソースタイプをクリックし、表示されるパネルでパラメーターを構成します。
単一テーブルから増分データを同期するために使用されるリアルタイム同期タスクでサポートされているソースタイプと、関連ソースの構成方法については、次のトピックを参照してください。
オプション: データ変換コンポーネントを構成します。
データ同期の際にソースデータを目的の形式に変換する場合は、データ変換コンポーネントを構成できます。
リアルタイム同期タスクの構成タブの [変換] セクションで、目的のデータ変換コンポーネントを右側のキャンバスにドラッグします。
単一テーブルから増分データを同期するために使用されるリアルタイム同期タスクでは、次のデータ変換コンポーネントがサポートされています。
データフィルタリング: データフィルタリングコンポーネントを使用して、フィールドサイズなどの特定のルールに基づいてソースのデータをフィルタリングできます。ルールに一致するデータのみが保持されます。
文字列置換: 文字列置換コンポーネントを使用して、STRING データタイプのフィールド値を置換できます。
データマスキング: データマスキングコンポーネントを使用して、リアルタイム同期タスクで指定された単一ソーステーブルの機密データをマスキングし、タスクがマスキングされたデータを指定されたデータベースに保存できるようにすることができます。
コンポーネントをクリックし、表示されるパネルでパラメーターを構成します。
ターゲットを構成します。
リアルタイム同期タスクの構成タブの [出力] セクションで、目的のターゲットタイプを右側のキャンバスにドラッグします。
ターゲットタイプをクリックし、表示されるパネルでパラメーターを構成します。
単一テーブルから増分データを同期するために使用されるリアルタイム同期タスクでサポートされているターゲットタイプと、関連ターゲットの構成方法については、次のトピックを参照してください。
ソースをターゲットに接続します。
ソースとターゲットを構成した後、線を描画して接続できます。このようにして、構成に基づいてデータソース間でデータを同期できます。
データベースから増分データを同期するためのリアルタイム同期タスクを構成する
データを読み取るテーブルを選択し、マッピングルールを構成します。
[ソースと同期ルールの構成] ステップの [ソース] セクションで、[タイプ] パラメーターと [データソース] パラメーターを構成します。
データを読み取るテーブルを選択します。
[同期するソーステーブルの選択] セクションでは、選択したデータソースのすべてのテーブルが [ソーステーブル] リストに表示されます。[ソーステーブル] リストからすべてまたは一部のテーブルを選択し、
アイコンをクリックして、テーブルを [選択したテーブル] リストに移動できます。
重要選択したテーブルにプライマリキーがない場合、テーブル内のデータをリアルタイムで同期することはできません。
ソーステーブルの名前とターゲットテーブルの名前のマッピングルールを構成します。
増分データを同期するソースデータベースとテーブルを選択すると、リアルタイム同期タスクは、データベースとテーブルのデータを、ソースデータベースとテーブルと同じ名前のターゲットスキーマとテーブルに自動的に書き込みます。そのようなターゲットスキーマまたはテーブルが存在しない場合、システムはターゲットにスキーマまたはテーブルを自動的に作成します。[テーブル/データベース名にマッピングルールを設定する] セクションでマッピングルールを構成して、データを書き込むターゲットスキーマまたはテーブルの名前を指定できます。マッピングルールでターゲットテーブル名を指定して、複数のソーステーブルのデータを同じテーブルに書き込むことができます。また、マッピングルールでプレフィックスを指定して、ソースデータベースとは異なるプレフィックスで始まる名前のデータベース、またはソーステーブルとは異なるプレフィックスで始まる名前のテーブルにデータを書き込むこともできます。
ソーステーブル名とターゲットテーブル名の変換ルール: このタイプのマッピングルールでは、正規表現を使用して、データを書き込むターゲットテーブルの名前をソーステーブルの名前にマッピングできます。
例 1: プレフィックス doc_ で始まる名前のソーステーブルから、プレフィックス pre_ で始まる名前のターゲットテーブルにデータを同期します。
例 2: 複数のソーステーブルから同じターゲットテーブルにデータを同期します。
table_01、table_02、および table_03 から my_table に増分データを同期するには、[ソーステーブル名とターゲットテーブル名の変換ルール] タイプのマッピングルールを構成し、[ソース] を table.* に、[ターゲット] を my_table に設定します。
ターゲットテーブル名のルール: このタイプのマッピングルールでは、組み込み変数を使用して、データを書き込むターゲットテーブルの名前を指定し、ターゲットテーブルの名前にプレフィックスとサフィックスを追加できます。次の組み込み変数がサポートされています。
${db_table_name_src_transed}: [ソーステーブル名とターゲットテーブル名の変換ルール] タイプのマッピングルールに基づいてマッピングされたターゲットテーブルの名前
${db_name_src_transed}: [ソースデータベース名とターゲットスキーマ名の変換ルール] タイプのマッピングルールに基づいてマッピングされたターゲットスキーマの名前
${ds_name_src}: ソースの名前
たとえば、pre_${db_table_name_src_transed}_post を構成して、前の例で生成されたテーブル名 my_table を pre_my_table_post に変換できます。
ソースデータベース名とターゲットスキーマ名の変換ルール: このタイプのマッピングルールでは、正規表現を使用して、データを書き込むターゲットスキーマの名前を指定できます。
例: プレフィックス doc_ で始まる名前のソーススキーマから、プレフィックス pre_ で始まる名前のターゲットスキーマにデータを同期します。
ターゲットを選択し、ターゲットテーブルまたはターゲットトピックを構成します。
[ターゲットテーブルの設定] ステップで、[ターゲット] の基本情報を構成します。たとえば、[書き込みモード] パラメーターと [時間による自動パーティション分割] パラメーターを構成できます。必要な構成は、データソースタイプによって異なります。DataWorks コンソールに表示されるパラメーターが優先されます。
[ソーステーブルとターゲットテーブルのマッピングを更新] をクリックして、ソーステーブルをターゲットテーブルにマッピングします。
ターゲットスキーマとターゲットテーブルのカスタム名を指定できます。また、[操作] 列の [追加フィールドの編集] をクリックして、ターゲットテーブルに追加フィールドを追加し、追加フィールドに定数または変数を値として割り当てることもできます。必要な構成は、データソースタイプによって異なります。DataWorks コンソールに表示されるパラメーターが優先されます。
説明多数のテーブルからデータを同期する場合、マッピングに時間がかかる場合があります。
オプション: テーブルレベルで DML 処理ルールを構成します。
DataWorks では、一部の同期タスクのテーブルレベルの DML 処理ルールを構成できます。ソーステーブルで実行される INSERT、UPDATE、および DELETE 操作に対して生成されるメッセージの処理ルールを構成できます。
説明DML 操作によって生成されたデータ変更の同期のサポートは、ターゲットタイプによって異なります。DataWorks コンソールで同期タスクを構成するときに、同期タスクが DML 処理ルールの構成をサポートしているかどうかを確認できます。詳細については、「サポートされている DML 操作と DDL 操作」をご参照ください。
DDL 処理ルールを構成します。
DDL 操作は、ソーステーブルで実行される場合があります。リアルタイム同期タスクを構成するときに、ビジネス要件に基づいて、さまざまな DDL 操作に対して生成されるメッセージの処理ルールを構成できます。DDL 操作によって生成されたデータ変更の同期のサポートは、ターゲットタイプによって異なります。詳細については、「サポートされている DML 操作と DDL 操作」をご参照ください。特定のターゲットタイプの処理ルールを構成することもできます。特定のターゲットタイプの処理ルールを構成するには、次の手順を実行します。[data Integration] ページの左側のナビゲーションウィンドウで、 を選択します。[リアルタイム同期の DDL メッセージの処理ポリシー] ページで、DDL 処理ルールを構成します。次の表に、さまざまなタイプの DDL メッセージの処理ルールを示します。
DDL メッセージタイプ
処理ルール
CreateTable
DataWorks は、次のルールに基づいて関連タイプの DDL メッセージを処理します。
通常処理: DataWorks は DDL メッセージをターゲットに送信します。次に、ターゲットは DDL メッセージを処理します。ターゲットによって DDL メッセージへの応答方法は異なります。そのため、DataWorks はメッセージをターゲットに送信するだけです。
無視: DataWorks は DDL メッセージを破棄し、ターゲットに配信しません。
アラート: DataWorks は DDL メッセージを破棄し、リアルタイム同期ログにアラートを生成します。アラートは、実行エラーが原因でメッセージが破棄されたことを示します。
エラー: DataWorks はリアルタイム同期タスクを終了し、タスクステータスを [失敗] に設定します。
DropTable
AddColumn
DropColumn
RenameTable
RenameColumn
ChangeColumn
TruncateTable
リアルタイム同期タスクの実行に必要なリソースを構成します。
ソースからのデータの読み取りとターゲットへのデータの書き込みに使用できる並列スレッドの最大数を指定できます。
データ同期中にダーティデータを許可するかどうかを指定できます。
許可しない: データ同期中にダーティデータレコードが生成された場合、リアルタイム同期タスクは失敗します。
許可する: データ同期中にダーティデータレコードが生成された場合、ダーティデータレコードはターゲットに書き込まれずに無視され、リアルタイム同期タスクは実行を続けます。
[完了] をクリックします。
ステップ 4: リアルタイム同期タスクをコミットしてデプロイする
上部ツールバーの
アイコンをクリックして、タスクを保存します。
上部ツールバーの
アイコンをクリックして、タスクをコミットします。
[送信] ダイアログボックスで、[変更の説明] フィールドに説明を入力します。
[確認] をクリックします。
標準モードのワークスペースを使用する場合は、タスクをコミットした後に、リアルタイム同期タスクを本番環境にデプロイする必要があります。リアルタイム同期タスクをデプロイするには、DataStudio ページの上部ナビゲーションバーにある [デプロイ] をクリックします。詳細については、「タスクをデプロイする」をご参照ください。
次のステップ
リアルタイム同期タスクの構成が完了したら、オペレーションセンターの [リアルタイム同期タスク] ページでタスクを開始および管理できます。[リアルタイム同期タスク] ページに移動するには、次の操作を実行します。DataWorks コンソールにログオンし、[オペレーションセンター] ページに移動します。[オペレーションセンター] ページの左側のナビゲーションウィンドウで、リアルタイム同期タスクの O&M」をご参照ください。
を選択します。詳細については、「付録
タスクの移行
DataStudio ページで単一テーブルからデータを同期するためのリアルタイム同期タスクを構成した後、タスクの構成タブで [data Integration に移行] をクリックして、タスクを [data Integration] ページに移行できます。
次のタイプのリアルタイム同期タスクのみを移行できます。
単一の Kafka トピックから MaxCompute にデータを同期するために使用されるリアルタイム同期タスク
単一の Kafka トピックから Hologres にデータを同期するために使用されるリアルタイム同期タスク
DataStudio ページの [スケジュール済みワークフロー] ペインで、単一テーブルからデータを同期するために使用される目的のリアルタイム同期タスクを見つけ、タスク名をダブルクリックしてタスクの構成テーブルに移動し、構成タブで [data Integration に移行] をクリックしてタスクを移行します。
DataStudio ページの左上隅にある
アイコンをクリックし、 を選択します。[同期タスク] ページが表示されます。[同期タスク] ページで、DataStudio から移行されたリアルタイム同期タスクを表示します。
リアルタイム同期タスクを Data Integration に移行した後、オペレーションセンターに移動することなく、Data Integration でタスクの O&M 操作を直接実行できます。リアルタイム同期タスクは、オペレーションセンターには表示されません。移行操作は、保存されているタスク構成と実行中のタスクには影響しません。
DataStudio からリアルタイム同期タスクを移行した後、元のタスクは DataStudio のゴミ箱に移動されます。タスクの変更と O&M 操作は、Data Integration の [同期タスク] ページでのみ実行できます。
DataStudio で操作を実行する
DataStudio でリアルタイム同期タスクを作成する場合は、次の手順を参照できます。