DataWorks のデータ統合機能を使用して、Tablestore から MaxCompute に完全データをエクスポートし、オフラインでの分析と処理を行います。DataWorks は、ビッグデータ開発とガバナンスのためのプラットフォームです。
前提条件
データをエクスポートする前に、以下の準備を完了してください。
Tablestore のソーステーブルに関する情報を取得します。これには、インスタンス名、インスタンスのエンドポイント、リージョン ID が含まれます。
データストレージの宛先として使用する MaxCompute プロジェクトを作成します。
ご利用の Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey を作成します。アカウントまたはユーザーには、Tablestore と MaxCompute へのアクセス権限が必要です。
DataWorks を有効化し、ご利用の MaxCompute または Tablestore インスタンスが配置されているリージョンに ワークスペースを作成します。
サーバーレスリソースグループを作成し、ワークスペースにアタッチします。課金の詳細については、「サーバーレスリソースグループの課金」をご参照ください。
DataWorks ワークスペースと Tablestore インスタンスが異なるリージョンにある場合は、VPC ピアリング接続を作成して、クロスリージョンネットワーク接続を有効にする必要があります。
操作手順
以下の手順に従って、Tablestore から MaxCompute への完全データのエクスポートを設定します。
ステップ 1:Tablestore データソースの追加
DataWorks で Tablestore データソースを設定して、ソースデータテーブルに接続します。
DataWorks コンソールにログインします。ターゲットリージョンに切り替えます。左側のナビゲーションウィンドウで、を選択します。ドロップダウンリストからワークスペースを選択し、[データ統合に移動] をクリックします。
左側のナビゲーションウィンドウで、[データソース] をクリックします。
[データソース] ページで、[データソースの追加] をクリックします。
[データソースの追加] ダイアログボックスで、データソースタイプとして [Tablestore] を検索して選択します。
[OTS データソースの追加] ダイアログボックスで、次の表に従ってデータソースパラメーターを設定します。
パラメーター
説明
データソース名
データソース名は、文字、数字、アンダースコア (_) の組み合わせである必要があります。数字またはアンダースコア (_) で始めることはできません。
データソースの説明
データソースの簡単な説明。説明は 80 文字以内で入力してください。
リージョン
Tablestore インスタンスが存在するリージョンを選択します。
Tablestore インスタンス名
Tablestore インスタンスの名前。
エンドポイント
Tablestore インスタンスのエンドポイント。VPC アドレスを使用します。
AccessKey ID
Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey Secret。
AccessKey シークレット
リソースグループの接続性をテストします。
データソースを作成する際は、リソースグループの接続性をテストして、同期タスク用のリソースグループがデータソースに接続できることを確認する必要があります。そうしないと、データ同期タスクを実行できません。
[接続設定] セクションで、リソースグループの [接続ステータス] 列にある [ネットワーク接続のテスト] をクリックします。
接続性テストに合格したら、[完了] をクリックします。新しいデータソースがデータソースリストに表示されます。
接続性テストが失敗した場合は、[ネットワーク接続性診断ツール] を使用して問題をトラブルシューティングします。
ステップ 2:MaxCompute データソースの追加
データエクスポートの宛先として使用する MaxCompute データソースを設定します。
再度 [データソースの追加] をクリックします。 データソースタイプとして [MaxCompute] を選択し、パラメーターを設定します。
パラメーター
説明
データソース名
名前は、文字、数字、アンダースコア (_) で構成する必要があります。数字またはアンダースコア (_) で始めることはできません。
データソースの説明
データソースの簡単な説明。説明は 80 文字以内で入力してください。
認証方式
デフォルト値は [Alibaba Cloud アカウントおよび Alibaba Cloud RAM ロール] です。この値は変更できません。
Alibaba Cloud アカウント
現在の Alibaba Cloud アカウント: 指定されたリージョンで、現在のアカウントの [MaxCompute プロジェクト名] と [デフォルトのアクセス ID] を選択します。
他の Alibaba Cloud アカウント:指定されたリージョンで、他のアカウントの [Alibaba Cloud アカウントの UID]、[MaxCompute プロジェクト名]、および [RAM ロール] を入力します。
リージョン
MaxCompute プロジェクトが配置されているリージョン。
エンドポイント
デフォルト値は [自動調整] です。 必要に応じて [カスタム設定] を選択することもできます。
パラメーターを設定し、接続性テストが成功したら、[完了] をクリックしてデータソースを追加します。
ステップ 3:バッチ同期タスクの設定
データ同期タスクを作成して、Tablestore から MaxCompute へのデータ転送ルールとフィールドマッピングを定義します。
タスクノードの作成
[データ開発] ページに移動します。
DataWorks コンソールにログインします。
上部のナビゲーションバーで、リソースグループとリージョンを選択します。
左側のナビゲーションウィンドウで、 をクリックします。
ターゲットワークスペースを選択し、[Data Studio に移動] をクリックします。
DataStudio コンソールで、[ワークスペースディレクトリ] の右側にある
アイコンをクリックし、 を選択します。[ノードの作成] ダイアログボックスで、[パス] を選択します。データソースを [Tablestore] に、宛先を [MaxCompute(ODPS)] に設定します。[名前] を入力し、[OK] をクリックします。
同期タスクの設定
[ワークスペースディレクトリ] で、新しいバッチ同期タスクノードをクリックして開きます。同期タスクは、コードレス UI またはコードエディタを使用して設定できます。
コードレス UI (デフォルト)
コードレス UI で以下の項目を設定します。
データソース: ソースと宛先のデータソースを選択します。
ランタイムリソース: リソースグループを選択します。選択すると、システムはデータソースの接続性を自動的にテストします。
データソース:
テーブル: ドロップダウンリストからソースデータテーブルを選択します。
プライマリキー範囲 (開始): データを読み取るプライマリキー範囲の開始を JSON 配列フォーマットで指定します。
inf_minは無限小の値を表します。たとえば、プライマリキーが
int型のid列と、string型のname列で構成されている場合、構成例は次のとおりです。特定のプライマリキー範囲
完全データ
[ { "type": "int", "value": "000" }, { "type": "string", "value": "aaa" } ][ { "type": "inf_min" }, { "type": "inf_min" } ]プライマリキー範囲 (終了): データを読み取るプライマリキー範囲の終了を指定します。値は JSON 配列フォーマットで指定する必要があります。
inf_maxは無限大の値を表します。例えば、プライマリキーが
idという名前のint型カラムと、nameという名前のstring型カラムで構成される場合、構成例は次のとおりです。特定のプライマリキー範囲
完全データ
[ { "type": "int", "value": "999" }, { "type": "string", "value": "zzz" } ][ { "type": "inf_max" }, { "type": "inf_max" } ]分割構成:カスタムの分割構成情報を JSON 配列フォーマットで指定します。ほとんどの場合、このパラメーターを構成する必要はありません。このパラメーターを構成しない場合は、
[]に設定します。Tablestore のデータストレージでホットスポットが発生し、Tablestore Reader の自動分割ポリシーが効果的でない場合は、カスタムの分割ルールを使用します。このルールは、開始と終了のプライマリキー範囲内の分割点を指定します。シャードキーのみを設定する必要があり、すべてのプライマリキーを指定する必要はありません。
送信先: 次の項目を設定します。その他のパラメーターはデフォルト値のままにするか、必要に応じて変更します。
本番プロジェクト名: 宛先データソースに関連付けられている MaxCompute プロジェクトの名前です。このパラメーターは自動的に入力されます。
Tunnel リソースグループ:デフォルト値は [共通転送リソース] です。このリソースグループは MaxCompute の無料クォータを使用します。必要に応じて、専用の Tunnel リソースグループを選択してください。
テーブル: 宛先テーブルを選択します。[宛先テーブルスキーマを生成] をクリックして、宛先テーブルを自動的に作成します。
パーティション: 同期データが保存されるパーティションを指定します。このパラメーターは日次増分同期に使用できます。
書き込みモード: 書き込み前に既存のデータをクリアするか、テーブルにデータを追加するかを選択します。
宛先フィールドマッピング: ソーステーブルから宛先テーブルへのフィールドマッピングを設定します。システムは、ソーステーブルに基づいてフィールドを自動的にマッピングします。必要に応じてマッピングを変更できます。
構成が完了したら、ページ上部の[保存]をクリックします。
コードエディタ
エディターでスクリプトを編集するには、ページの上部にある [コードエディター] をクリックします。
次のサンプルスクリプトは、ソースデータテーブルに関するものです。このテーブルのプライマリキーは、idという名前のint列と、nameという名前のstring列で構成されます。属性列は、ageという名前のintフィールドです。ご使用のスクリプトでは、datasourceパラメーターとtableパラメーターの値を実際の値に置き換えてください。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "ots",
"parameter": {
"datasource": "source_data",
"column": [
{
"name": "id",
"type": "INTEGER"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "INTEGER"
}
],
"range": {
"begin": [
{
"type": "inf_min"
},
{
"type": "inf_min"
}
],
"end": [
{
"type": "inf_max"
},
{
"type": "inf_max"
}
],
"split": []
},
"table": "source_table",
"newVersion": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"partition": "pt=${bizdate}",
"truncate": true,
"datasource": "target_data",
"tunnelQuota": "default",
"column": [
"id",
"name",
"age"
],
"emptyAsNull": false,
"guid": null,
"table": "source_table",
"consistencyCommit": false
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}スクリプトの編集が完了したら、ページの上部にある[保存]をクリックします。
同期タスクの実行
ページの右側にある[デバッグ設定]をクリックし、タスクを実行するリソースグループを選択して、[スクリプトパラメーター]を追加します。
bizdate:MaxCompute 宛先テーブルのデータパーティション。例:
20251120。
ページの上部にある[実行]をクリックして、同期タスクを開始します。
ステップ 4:同期結果の表示
同期タスクが完了したら、ログで実行ステータスを表示し、DataWorks コンソールで同期されたデータを確認します。
ページ下部でタスクの実行ステータスと結果を表示します。以下のログ情報は、同期タスクが正常に実行されたことを示しています。
2025-11-18 11:16:23 INFO Shell run successfully! 2025-11-18 11:16:23 INFO Current task status: FINISH 2025-11-18 11:16:23 INFO Cost time is: 77.208s宛先テーブルで同期されたデータを表示します。
DataWorks コンソールに移動します。 左側のナビゲーションウィンドウで、 をクリックします。 次に、[データマップに移動] をクリックして、宛先テーブルとそのデータを表示します。
