このトピックでは、Tablestore からローカルの CSV ファイルへ DataX を使用してデータをエクスポートする方法について説明します。Tablestore のデータテーブルおよび時系列テーブルからデータをエクスポートできます。
背景情報
DataX は、MySQL、Oracle、HDFS、Hive、Tablestore など、異種データソース間のデータ転送をサポートするオープンソースのオフラインデータ同期ツールです。
作業を開始する前に、以下の点にご注意ください。
エクスポートされたファイル名:DataX はエクスポートされた CSV ファイル名にランダムなサフィックスを付加します(例:
output.csv__d737aec2_c9e3_4489_a5d7_361f44c998ce)。標準的な CSV ファイルを得るには、エクスポート後に手動でサフィックスを削除してください。ネットワークコスト:Elastic Compute Service (ECS) インスタンス上で DataX を実行する場合は、アウトバウンドトラフィック料金を回避し、ネットワークパフォーマンスを向上させるために VPC エンドポイントを使用してください。
前提条件
Tablestore インスタンスのエンドポイントとインスタンス名、および Tablestore 内のソーステーブルの情報が取得済みである必要があります。
-
ご利用の Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ペアが作成済みである必要があります。詳細については、「アクセスキー ペアの作成」をご参照ください。
重要セキュリティ上の理由から、RAM ユーザーとして Tablestore を使用することを推奨します。RAM ユーザーを作成し、その RAM ユーザーに
AliyunOTSFullAccessポリシーをアタッチしたうえで、RAM ユーザーの AccessKey ペアを作成してください。詳細については、「RAM ユーザーの AccessKey ペアを使用して Tablestore にアクセスする」をご参照ください。
操作手順
このトピックの例では、Alibaba Cloud Linux 3.2104 LTS 64 ビットまたは Ubuntu 22.04 64 ビットを実行している ECS インスタンスを使用しています。詳細については、「ECS とは」をご参照ください。
ステップ 1:依存関係のインストール
-
Python 2 または Python 3 をインストールします。
Alibaba Cloud Linux または Ubuntu を実行している ECS インスタンスには、Python 3 がプリインストール済みです。その他のオペレーティングシステムの場合は、手動で Python をインストールしてください。
-
JDK 1.8 以降をインストールします。JDK 1.8 を推奨します。
以下のコマンドは、Alibaba Cloud Linux または Ubuntu に JDK 1.8 をインストールするものです。その他のオペレーティングシステムの場合は、手動で JDK をインストールしてください。
Alibaba Cloud Linux
yum -y install java-1.8.0-openjdk-devel.x86_64Ubuntu
apt update && apt upgrade apt install openjdk-8-jdk
ステップ 2:DataX のダウンロード
-
DataX パッケージをダウンロードします。
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz -
パッケージを展開します。
tar -zxvf datax.tar.gz
ソースコードから DataX をコンパイルする場合は、「DataX インストールガイド」をご参照ください。
ステップ 3:設定ファイルの作成
-
DataX の
binディレクトリに移動します。cd datax/bin -
設定ファイルを作成します。
vi tablestore_to_csv.json以下の例は、各テーブルタイプごとの設定ファイルの内容を示しています。必要に応じてパラメーターを変更してください。
データテーブルからデータをエクスポートする
ソースデータテーブル(名前:
orders)のスキーマ情報については、「付録 1:データテーブル」をご参照ください。{ "job": { "setting": { "speed": { "channel": 1 }, "errorLimit": { "record": 0, "percentage": 0 } }, "content": [ { "reader": { "name": "otsreader", "parameter": { "endpoint": "https://<YOUR-INSTANCE>.<YOUR-REGION>.ots.aliyuncs.com", "accessId": "<YOUR-ACCESS-KEY-ID>", "accessKey": "<YOUR-ACCESS-KEY-SECRET>", "instanceName": "<YOUR-INSTANCE>", "table": "orders", "range": { "begin": [ { "type": "INF_MIN" } ], "end": [ { "type": "INF_MAX" } ] }, "column": [ { "name": "order_id" }, { "name": "user_id" }, { "name": "sku_id" }, { "name": "price" }, { "name": "num" }, { "name": "total_price" }, { "name": "order_status" }, { "name": "create_time" }, { "name": "modified_time" } ] } }, "writer": { "name": "txtfilewriter", "parameter": { "path": "/tmp/export/", "fileName": "output.csv", "writeMode": "truncate", "fileFormat": "csv" } } } ] } }時系列テーブルからデータをエクスポートする
ソース時系列テーブル(名前:
vehicles_timeseriesdata)のスキーマ情報については、「付録 2:時系列テーブル」をご参照ください。{ "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "otsreader", "parameter": { "endpoint": "https://<YOUR-INSTANCE>.<YOUR-REGION>.ots.aliyuncs.com", "accessId": "<YOUR-ACCESS-KEY-ID>", "accessKey": "<YOUR-ACCESS-KEY-SECRET>", "instanceName": "<YOUR-INSTANCE>", "table": "vehicles_timeseriesdata", "mode": "normal", "newVersion": "true", "isTimeseriesTable": "true", "measurementName": "measurement_1", "timeRange": { "begin": 0, "end": 1750000000000 }, "column": [ { "name": "_m_name" }, { "name": "_data_source" }, { "name": "_tags" }, { "name": "_time" }, { "name": "vin_id", "type": "STRING" }, { "name": "mfrs", "type": "STRING" }, { "name": "model", "type": "STRING" }, { "name": "speed", "type": "DOUBLE" }, { "name": "gps", "type": "STRING" }, { "name": "mileage", "type": "DOUBLE" }, { "name": "emission", "type": "DOUBLE" } ] } }, "writer": { "name": "txtfilewriter", "parameter": { "path": "/tmp/export/", "fileName": "output.csv", "writeMode": "truncate", "fileFormat": "csv" } } } ] } }-
以下の表は、変更が必要な Tablestore Reader のパラメーターを説明しています。
パラメーター
適用対象
説明
channel
両方
並列読み取り/書き込みスレッド数。各チャンネルは独立したスレッドです。この値を増やすと同時実行数とリソース消費が増加します。
endpoint
両方
Tablestore インスタンスのエンドポイント。ECS インスタンスから Tablestore にアクセスする場合は、アウトバウンドトラフィック料金を回避し、パフォーマンスとセキュリティを向上させるために VPC エンドポイントを使用してください。
accessId
両方
ご利用の Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。
accessKey
両方
ご利用の Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。
instanceName
両方
Tablestore インスタンスの名前。
tableName
両方
Tablestore 内のソーステーブルの名前。
column
両方
エクスポートするカラムの配列。
range
データテーブル
読み取るプライマリキー範囲。
beginおよびend配列は、それぞれプライマリキーカラムごとに 1 つの値を指定します。範囲は左閉右開です。テーブル全体を読み取るには、INF_MIN(負の無限大)およびINF_MAX(正の無限大)を使用します。仮想ポイントのカラム数はプライマリキーのカラム数と同じである必要があります。このパラメーターは省略可能で、デフォルトでは負の無限大から読み取ります。measurementName
時系列テーブル
読み取る時系列のメトリック名。設定しない場合、テーブル内のすべてのデータが読み取られます。
timeRange
時系列テーブル
読み取るタイムスタンプ範囲(ミリ秒単位)。範囲は左閉右開です。このパラメーターは省略可能で、デフォルトではすべてのバージョンが読み取られます。
-
以下の表は、変更が必要な TxtFileWriter のパラメーターを説明しています。
パラメーター
説明
path
エクスポートされたファイルを保存するローカルディレクトリ。
fileName
エクスポートされたファイルのベース名(拡張子を含む)。例:
output.csv。writeMode
TxtFileWriter が書き込み前に既存のファイルをどのように処理するかを指定します。有効な値は以下のとおりです。
-
truncate:書き込み前に、ターゲットディレクトリ内でfileName値で始まるすべてのファイルを削除します。 -
append:前処理を行わずにファイルに直接書き込み、ファイル名の競合が発生しないように保証します。 -
nonConflict:ディレクトリ内にfileName値で始まるファイルがすでに存在する場合、エラーを報告して中止します。
fileFormat
出力ファイル形式。有効な値:
csvおよびtext。 -
-
ステップ 4:DataX ジョブの実行
-
エクスポートジョブを実行します。
python3 datax.py tablestore_to_csv.jsonジョブが完了すると、以下のようなまとめが表示されます。
2025-03-19 17:21:05.146 [job-0] INFO StandAloneJobContainerCommunicator - Total 200000 records, 23086634 bytes | Speed 1.10MB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.222s | All Task WaitReaderTime 18.392s | Percentage 100.00% 2025-03-19 17:21:05.147 [job-0] INFO JobContainer - Task Start Time : 2025-03-19 17:20:43 Task End Time : 2025-03-19 17:21:05 Task Duration : 21s Average Transfer Speed : 1.10MB/s Record Write Speed : 10000rec/s Total Records Read : 200000 Failed Records : 0 -
エクスポート結果を確認します。
出力ディレクトリ(例:
/tmp/export/)を確認し、エクスポートされたファイルがあることを確認します。ファイル名にはランダムなサフィックスが含まれています。output.csv__d737aec2_c9e3_4489_a5d7_361f44c998ceファイルの内容をプレビューするには、以下のコマンドを実行します。
head -5 output.csv__d737aec2_c9e3_4489_a5d7_361f44c998ce出力は以下のようになります。
order_id,user_id,sku_id,price,num,total_price,order_status,create_time,modified_time ORD-001,USR-100,SKU-200,29.99,2,59.98,completed,2025-01-01 08:00:00,2025-01-02 10:00:00 ORD-002,USR-101,SKU-201,49.99,1,49.99,pending,2025-01-02 09:00:00,2025-01-02 09:00:00標準的な CSV ファイル名を得るために、サフィックスを削除します。
mv output.csv__d737aec2_c9e3_4489_a5d7_361f44c998ce output.csv説明DataX は、並列スレッドによって書き込まれたファイルを区別するためにランダムなサフィックスを付加します。標準的な CSV ファイル名を得るには、サフィックスを削除してください。
付録:サンプルソーステーブル
付録 1:データテーブル
サンプルデータテーブルの名前は orders です。以下の表は、データテーブルのスキーマを説明しています。
|
フィールド名 |
型 |
説明 |
|
order_id (プライマリキー列) |
String |
注文の ID。 |
|
user_id |
String |
ユーザーの ID。 |
|
sku_id |
String |
プロダクトの ID。 |
|
price |
Double |
購入したプロダクトの単位価格。 |
|
num |
Integer |
購入したプロダクトの数量。 |
|
total_price |
Double |
注文の合計金額。 |
|
order_status |
String |
注文のステータス。 |
|
create_time |
String |
注文が作成された時刻。 |
|
modified_time |
String |
注文が最後に変更された時刻。 |
付録 2:時系列テーブル
サンプル時系列テーブルの名前は vehicles_timeseriesdata です。以下の表は、時系列テーブルのスキーマを説明しています。
|
フィールド名 |
型 |
説明 |
|
_m_name |
String |
時系列データにおける物理量またはメトリックの名前。 |
|
_data_source |
String |
時系列のデータソース識別子。このフィールドは空白のままにできます。 |
|
_tags |
String |
時系列のタグ。 |
|
_time |
Integer |
データが報告された時刻。 |
|
vin_id |
String |
車両の車体識別番号(VIN)。 |
|
mfrs |
String |
メーカー。 |
|
model |
String |
車両のモデル。 |
|
speed |
Double |
車両の現在速度。 |
|
gps |
String |
|
|
mileage |
Double |
車両の現在走行距離。 |
|
emission |
Double |
排出量値。 |