このトピックでは、MaxComputeからMySQLデータソースにデータを同期するための同期タスクを作成する方法について説明します。
前提条件
ApsaraDB RDS for MySQLインスタンスが作成され、インスタンスのIDが取得されていること。ApsaraDB RDSコンソールでインスタンスのホワイトリストが設定されていること。詳細については、ApsaraDB RDS for MySQLインスタンスの作成をご参照ください。
説明[サーバーレスリソースグループ] を使用してApsaraDB RDS for MySQLデータソース用に構成された同期タスクを実行する場合は、次の点に注意してください。
仮想プライベートクラウド(VPC)経由でデータソースにアクセスする場合は、サーバーレスリソースグループが関連付けられているvSwitchのCIDRブロックをデータソースのIPアドレスホワイトリストに追加する必要があります。
インターネット経由でデータソースにアクセスする場合は、[サーバーレスリソースグループ] が関連付けられているVPCに構成されているElastic IPアドレス(EIP)をデータソースのIPアドレスホワイトリストに追加する必要があります。
詳細については、「IPアドレスホワイトリストの設定」をご参照ください。
odps_result という名前のテーブルが、データを同期するApsaraDB RDS for MySQLデータベースに作成されていること。次のステートメントを実行することで、テーブルを作成できます。
CREATE TABLE `ODPS_RESULT` ( `education` varchar(255) NULL , `num` int(10) NULL );テーブルの作成後、
desc odps_result;ステートメントを実行してテーブルの詳細を表示できます。result_table という名前の結果テーブルが準備されていること。詳細については、「テーブルの作成とデータのアップロード」をご参照ください。
start という名前のゼロロードノードと insert_data という名前のODPS SQLノードが作成されていること。詳細については、「ワークフローの作成」をご参照ください。
背景情報
Data Integrationを使用して、ビジネスシステムで生成されたビジネスデータをDataWorksワークスペースに定期的に同期できます。 SQLタスクを作成してデータを計算し、Data Integrationを使用して計算結果を指定されたデータソースに定期的に同期して、さらに表示または使用できます。
Data Integrationでは、ApsaraDB RDS、MySQL、SQL Server、PostgreSQL、MaxCompute、ApsaraDB for Memcache(OCS)、PolarDB-X 1.0、Object Storage Service(OSS)、Oracle、File Transfer Protocol(FTP)、Dameng(DM)、Hadoop Distributed File System(HDFS)、MongoDBなど、さまざまなデータソースとのデータのインポートとエクスポートが可能です。 データソースの種類の詳細については、「サポートされているデータソースの種類と同期操作」をご参照ください。
手順 1:データソースを追加する
データソースを追加できるのはワークスペース管理者のみです。他のロールが割り当てられているメンバーは、データソースを表示することしかできません。
SettingCenterページに移動します。
DataWorksコンソール にログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションペインで、 を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターに移動] をクリックします。
MySQLデータソースを追加します。
SettingCenterページの左側のナビゲーションペインで、 を選択して、データソースページに移動します。
データソースページの左上隅にある [データソースの追加] をクリックします。[データソースの追加] ダイアログボックスで、[mysql] をクリックします。
MySQLデータソースに関する情報を設定します。
[mysqlデータソースの追加] ページで、パラメーターを設定します。この例では、[構成モード] パラメーターを [alibaba Cloudインスタンスモード] に設定します。
次の表に、MySQLデータソースを追加するための主要なパラメーターを示します。
パラメーター
説明
適用環境
データソースが使用される環境。有効な値:[開発環境] および [本番環境]。
構成モード
データソースを追加するモード。[alibaba Cloudインスタンスモード] を選択します。
Alibaba Cloudアカウント
[現在のalibaba Cloudアカウント] を選択します。
リージョン
データソースが存在するリージョン。
インスタンス
作成したApsaraDB RDS for MySQLインスタンスを選択します。インスタンスを選択した後、[最新のアドレスを取得] をクリックして、インスタンスに関する情報を表示できます。
インスタンスが使用できない場合は、ApsaraDB RDSコンソール でインスタンスを作成できます。
データベース名、ユーザー名、パスワード
デフォルトのApsaraDB RDS for MySQLデータベースの名前、およびデータベースにログインするために使用するユーザー名とパスワード。パスワードにはアットマーク (@) を使用しないでください。
次の説明は、MySQLデータソースを使用する同期タスクを設定するための手順を示しています。
MySQLデータソースを使用するデータベースレベルのリアルタイムまたはバッチ同期タスクを構成する場合、ApsaraDB RDS for MySQLインスタンスでアクセス権限を持つ1つ以上のデータベースを選択できます。
バッチ同期タスクを構成するときに複数のデータベースを選択する場合は、データベースごとにデータソースを追加する必要があります。
認証方法
[認証なし] を選択します。
セカンダリインスタンスの設定
ApsaraDB RDS for MySQLインスタンスがプライマリインスタンスであり、セカンダリ読み取り専用インスタンスがある場合は、[セカンダリインスタンスの設定] をオンにして、セカンダリインスタンスのIDを選択できます。これにより、プライマリインスタンスへの影響を防ぎ、プライマリインスタンスのパフォーマンスを確保できます。プライマリインスタンスに複数のセカンダリ読み取り専用インスタンスがある場合は、読み取り専用インスタンスの1つのデータのみが読み取られます。
説明この機能は、サーバーレスリソースグループのみをサポートしています。
選択したデータソースとリソースグループ間のネットワーク接続をテストします。
[接続設定] セクションで [データ統合] と [データスケジューリング] をそれぞれクリックし、使用するデータ統合のリソースグループとスケジューリングのリソースグループを見つけ、[接続ステータス] 列の [ネットワーク接続のテスト] をクリックします。接続ステータスが [接続済み] の場合、リソースグループはデータソースに接続されています。
説明同期タスクは、特定の種類のリソースグループを1つだけ使用できます。
同期タスクが想定どおりに実行されるようにするには、同期タスクが実行されるすべてのリソースグループの種類とデータソース間のネットワーク接続をテストする必要があります。
データソースがネットワーク接続テストに合格した場合は、[作成完了] をクリックします。
手順 2:同期タスクを作成および構成する
このセクションでは、write_result という名前の同期ノードを作成して同期タスクを生成する方法について説明します。このタスクは、result_table テーブルのデータをMySQLデータソースに同期するために使用されます。手順:
DataStudioページに移動します。
DataWorksコンソール にログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションペインで、 を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。
バッチ同期ノードを作成します。
DataStudioページの [スケジュール済みワークフロー] ペインで、
アイコンをクリックし、 を選択して、write_resultという名前のバッチ同期ノードを作成し、同期タスクを生成します。特定のリソースグループとデータソース間のネットワーク接続を確立します。
同期ノードの構成タブの [ネットワーク接続とリソースグループの設定] ステップで、ソースに [maxcompute(odps)] を、ターゲットに [mysql] を選択し、同期タスクの実行に使用するリソースグループを選択して、リソースグループとデータソース間のネットワーク接続をテストします。ネットワーク接続の確立方法の詳細については、「ネットワーク接続ソリューション」をご参照ください。
[次へ] をクリックして、[タスクの設定] ステップに進みます。
データ同期の情報を設定します。
ソースとターゲットを設定します。
ソーステーブルとして result_table テーブルを選択し、ターゲットテーブルとして odps_result テーブルを選択します。ビジネス要件に基づいて他のパラメーターを設定します。詳細については、「コードレスUIを使用したバッチ同期タスクの設定」をご参照ください。
ソースフィールドとターゲットフィールド間のマッピングを設定します。
ソースとターゲットを設定した後、ソースフィールドとターゲットフィールド間のマッピングを設定する必要があります。マッピングを設定した後、バッチ同期タスクは、マッピングに基づいて、ソースフィールドの値を同じデータ型のターゲットフィールドに書き込みます。ソーステーブルのフィールドとターゲットテーブルのフィールドが1対1でマッピングされていることを確認してください。
チャネル制御ポリシーを設定します。
パラメーター
説明
タスクの予想最大同時実行数
バッチ同期タスクがソースからデータを読み取るか、ターゲットにデータを書き込むために使用する並列スレッドの最大数。
説明データ同期中に使用される並列スレッドの実際の数は、Data Integration専用リソースグループの仕様により、指定されたしきい値以下になる場合があります。 Data Integration専用リソースグループの料金は、使用される並列スレッドの数に基づいて課金されます。詳細については、「パフォーマンスメトリック」をご参照ください。
DataWorksは、スケジューリング用のリソースグループを使用して、Data Integrationのバッチ同期タスクをData Integration用のリソースグループに発行し、Data Integration用のリソースグループを使用してタスクを実行します。スケジューリング用のリソースグループを使用してバッチ同期タスクをスケジュールするための料金は、タスクの数に基づいて課金されます。タスク発行メカニズムの詳細については、「リソースグループ管理」をご参照ください。
同期レート
スロットリングを有効にするかどうかを指定します。
スロットリングを有効にする場合は、ソースへの読み取り負荷が大きくなりすぎないように、最大転送レートを指定できます。このパラメーターの最小値は 1 MB/s です。
スロットリングを有効にしない場合、データは、指定された最大並列スレッド数に基づいて、ハードウェアで許可される最大転送レートで転送されます。
説明帯域幅はData Integrationによって提供されるメトリックであり、Elastic Network Interface(ENI)の実際のトラフィックを表すものではありません。ほとんどの場合、ENIトラフィックはチャネルトラフィックの1~2倍です。実際のENIトラフィックは、データストレージシステムのシリアル化によって異なります。
ダーティデータレコードのポリシー
許可されるダーティデータレコードの最大数。
重要データ同期中に大量のダーティデータが生成されると、データ同期の全体的な速度に影響します。
このパラメーターが設定されていない場合、データ同期中にダーティデータレコードが許可され、ダーティデータレコードが生成されてもバッチ同期タスクは引き続き実行できます。
このパラメーターを 0 に設定すると、ダーティデータレコードは許可されません。データ同期中にダーティデータレコードが生成されると、バッチ同期タスクは失敗します。
このパラメーターに 0 より大きい値を指定すると、次のようになります。
データ同期中に生成されるダーティデータレコードの数が指定した値以下の場合、ダーティデータレコードは無視され、ターゲットに書き込まれず、バッチ同期タスクは引き続き実行されます。
データ同期中に生成されるダーティデータレコードの数が指定した値を超えると、バッチ同期タスクは失敗します。
説明ダーティデータとは、ビジネスにとって意味のないデータ、指定されたデータ型と一致しないデータ、またはデータ同期中に例外が発生するデータのことです。単一のデータレコードをターゲットに書き込むときに例外が発生した場合、そのデータレコードはダーティデータと見なされます。ターゲットに書き込めなかったデータレコードは、ダーティデータと見なされます。
たとえば、バッチ同期タスクがソースのVARCHAR型のデータをターゲットのINT型のフィールドに書き込もうとすると、データ変換エラーが発生し、データはターゲットに書き込まれません。この場合、データはダーティデータです。バッチ同期タスクを構成するときに、ダーティデータを許可するかどうかを制御できます。また、データ同期中に許可されるダーティデータレコードの最大数を指定することもできます。生成されたダーティデータレコードの数が指定した上限を超えると、バッチ同期タスクは失敗して終了します。
分散実行
バッチ同期タスクの分散実行モードを有効にするかどうかを指定します。
バッチ同期タスクの分散実行モードを有効にすると、システムはタスクをスライスに分割し、複数のElastic Compute Service(ECS)インスタンスに分散して並列実行します。この場合、ECSインスタンスが多いほど、データ同期の速度が速くなります。
バッチ同期タスクの分散実行モードを有効にしない場合、指定された最大並列スレッド数は、単一のECSインスタンスでタスクを実行するためにのみ使用されます。
データ同期の高いパフォーマンスが求められる場合は、分散実行モードを使用してバッチ同期タスクを実行できます。分散実行モードでバッチ同期タスクを実行すると、ECSインスタンスのフラグメントリソースを活用できます。これにより、リソース使用率が向上します。
重要専用リソースグループを使用し、リソースグループにECSインスタンスが1つしかない場合は、分散実行モードでバッチ同期タスクを実行しないことをお勧めします。
1つのECSインスタンスでデータ転送速度のビジネス要件を満たせる場合は、分散実行モードを有効にする必要はありません。これにより、タスクの実行モードを簡素化できます。
分散実行モードは、指定した最大並列スレッド数が 8 以上の場合にのみ有効にできます。
バッチ同期タスクの分散実行モードを有効にすると、より多くのリソースが使用されます。バッチ同期タスクの実行中にメモリ不足(OOM)エラーが報告された場合は、分散実行モードを無効にすることができます。
同期タスクのスケジューリング依存関係を設定します。
目的のワークフローの名前をダブルクリックします。ワークフローの構成タブで、insert_data ノードを write_result ノードの祖先ノードとして設定します。
設定が完了したら、トップツールバーの
アイコンをクリックしてタスクを保存します。
手順 3:同期タスクをコミットおよびデプロイする
同期タスクを保存した後、同期タスクが属するワークフローの構成タブに戻ります。ワークフローの構成タブのトップツールバーにある
アイコンをクリックして、同期タスクをスケジューリングシステムにコミットします。スケジューリングシステムは、設定に基づいて翌日以降のスケジュール時刻にノードを自動的に実行します。
次のステップ
これで、特定のデータソースにデータを同期するための同期タスクを作成する方法を学習しました。次のチュートリアルに進むことができます。次のチュートリアルでは、同期タスクのスケジューリングプロパティとスケジューリング依存関係を設定する方法を学習します。詳細については、「タスクのスケジューリングプロパティとスケジューリング依存関係の設定」をご参照ください。
参考資料
バッチ同期タスクの設定方法の詳細については、「コードレスUIを使用したバッチ同期タスクの設定」をご参照ください。