このトピックでは、DataX を使用して Tablestore からローカルの CSV ファイルにデータをエクスポートする方法について説明します。 Tablestore のデータテーブルと時系列テーブルからデータをエクスポートできます。
背景情報
DataX は、異種データソース間でオフライン同期を行うためのツールです。 MySQL、Oracle、HDFS、Hive、Tablestore など、さまざまなデータソース間で効率的かつ安定した同期をサポートしています。
前提条件
Tablestore インスタンスのエンドポイントと名前、および Tablestore 内のソーステーブルの情報が取得されている。
Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ペアが作成されている。 詳細については、「AccessKey ペアを作成する」をご参照ください。
重要セキュリティ上の理由から、RAM ユーザーとして Tablestore の機能を使用することをお勧めします。 RAM ユーザーを作成し、
AliyunOTSFullAccessポリシーを RAM ユーザーにアタッチすることで Tablestore を管理する権限をユーザーに付与し、RAM ユーザーの AccessKey ペアを作成できます。 詳細については、「RAM ユーザーの AccessKey ペアを使用して Tablestore にアクセスする」をご参照ください。
手順
このトピックの例では、Alibaba Cloud Linux 3.2104 LTS 64 ビットまたは Ubuntu 22.04 64 ビットを実行する Elastic Compute Service (ECS) インスタンスを使用しています。 詳細については、「ECS とは」をご参照ください。
ステップ 1:依存関係をインストールする
Python 2 または Python 3 をインストールします。
Python 3 は、Alibaba Cloud Linux または Ubuntu を実行する ECS インスタンスにプリインストールされています。 他のオペレーティングシステムを実行する ECS インスタンスの場合は、Python を自分でインストールする必要があります。
JDK 1.8 以降をインストールします。 JDK 1.8 をインストールすることをお勧めします。
このステップでは、Alibaba Cloud Linux または Ubuntu を実行する ECS インスタンスに JDK 1.8 をインストールする方法について説明します。 他のオペレーティングシステムを実行する ECS インスタンスの場合は、JDK を自分でインストールする必要があります。
Alibaba Cloud Linux
yum -y install java-1.8.0-openjdk-devel.x86_64Ubuntu
apt update && apt upgrade apt install openjdk-8-jdk
ステップ 2:DataX をダウンロードする
DataX パッケージをダウンロードします。
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gzパッケージを展開します。
tar -zxvf datax.tar.gz
DataX を自分でコンパイルする方法については、「DataX インストールガイド」をご参照ください。
ステップ 3:構成ファイルを作成する
DataX の
binディレクトリに移動します。cd datax/bin構成ファイルを作成します。
vimを使用する場合は、コマンドを適宜置き換えてください。vi tablestore_to_csv.json次のサンプルコードは、構成ファイルの内容を示しています。 同期の要件と実際の状況に基づいてパラメータを変更してください。
データテーブルからデータをエクスポートする
ordersという名前のソースデータテーブルのスキーマについては、「付録 1:データテーブル」をご参照ください。{ "job": { "setting": { "speed": { "channel": 1 // データ同期タスクのコア並列ユニット。各チャネルは、データの読み取りと書き込みを行う独立したスレッドに対応します。チャネル数を変更することで、タスクの並行性を制御できます。これは、システムのパフォーマンスとリソース消費に影響します。 }, "errorLimit": { "record": 0, "percentage": 0 } }, "content": [ { "reader": { "name": "otsreader", "parameter": { "endpoint": "https://<YOUR-INSTANCE>.<YOUR-REGION>.ots.aliyuncs.com", // Tablestore インスタンスのエンドポイント。ECS インスタンスから Tablestore にアクセスする場合は、VPC エンドポイントを使用することをお勧めします。これにより、インターネット経由のアウトバウンドトラフィックの料金が発生せず、ネットワークパフォーマンスとセキュリティが向上します。 "accessId": "<YOUR-ACCESS-KEY-ID>", // Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。 "accessKey": "<YOUR-ACCESS-KEY-SECRET>", // Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。 "instanceName": "<YOUR-INSTANCE>", // Tablestore インスタンスの名前。 "table": "orders", // Tablestore 内のソーステーブルの名前。 "range": { // begin パラメータと end パラメータは、Tablestore データテーブルからデータを読み取るときの各プライマリキー列の開始値と終了値を指定します。begin パラメータと end パラメータの値は JSON 配列です。Tablestore から読み取るデータの範囲は、左閉右開区間です。開始プライマリキーと終了プライマリキーは、有効なプライマリキー、または INF_MIN タイプと INF_MAX タイプで構成される仮想ポイントである必要があります。仮想ポイントの列数は、プライマリキーの列数と同じである必要があります。INF_MIN は負の無限大を示し、どの値よりも小さくなります。INF_MAX は正の無限大を示し、どの値よりも大きくなります。このパラメータは、データテーブルをソーステーブルとして使用する場合に使用されます。このパラメータは省略可能です。デフォルト値は、読み取りが負の無限大から開始されることを示します。 "begin": [ { "type": "INF_MIN" } ], "end": [ { "type": "INF_MAX" } ] }, "column": [ // エクスポートする列の配列。 { "name": "order_id" }, { "name": "user_id" }, { "name": "sku_id" }, { "name": "price" }, { "name": "num" }, { "name": "total_price" }, { "name": "order_status" }, { "name": "create_time" }, { "name": "modified_time" } ] } }, "writer": { "name": "txtfilewriter", "parameter": { "path": "/tmp/export/", // システム内のエクスポートファイルのパス。 "fileName": "output.csv", // エクスポートファイルの名前。ファイル名拡張子を含みます。例:output.csv。 "writeMode": "truncate", // TxtFileWriter がデータを書き込む前にデータを処理するモード。有効な値:truncate:データを書き込む前に、指定されたディレクトリにある fileName パラメータの値で始まる名前のすべてのファイルをクリアします。append:データを書き込む前にデータを処理しません。データは、fileName パラメータを使用して指定された名前のファイルに直接書き込まれます。これにより、ファイル名が競合しないようになります。nonConflict:ディレクトリに fileName パラメータの値で始まる名前のファイルが存在する場合、エラーを報告して操作を中止します。 "fileFormat": "csv" // 出力ファイルの形式。有効な値:csv および text。 } } } ] } }時系列テーブルからデータをエクスポートする
vehicles_timeseriesdataという名前のソース時系列テーブルのスキーマについては、「付録 2:時系列テーブル」をご参照ください。{ "job": { "setting": { "speed": { "channel": 1 // データ同期タスクのコア並列ユニット。各チャネルはデータの読み取りと書き込みを行う独立したスレッドに対応します。チャネル数を変更することでタスクの並行性を制御でき、システムのパフォーマンスとリソース消費に影響します。 } }, "content": [ { "reader": { "name": "otsreader", "parameter": { "endpoint": "https://<YOUR-INSTANCE>.<YOUR-REGION>.ots.aliyuncs.com", // Tablestore インスタンスのエンドポイント。 ECS インスタンスから Tablestore にアクセスする場合は、VPC エンドポイントを使用することをお勧めします。これにより、インターネット経由のアウトバウンドトラフィックの料金が発生せず、ネットワークパフォーマンスとセキュリティが向上します。 "accessId": "<YOUR-ACCESS-KEY-ID>", // Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID "accessKey": "<YOUR-ACCESS-KEY-SECRET>", // Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット "instanceName": "<YOUR-INSTANCE>", // Tablestore インスタンスの名前 "table": "vehicles_timeseriesdata", // Tablestore 内のソーステーブルの名前 "mode": "normal", "newVersion": "true", "isTimeseriesTable": "true", "measurementName": "measurement_1", // 読み取る時系列のメトリック名。このパラメータが設定されていない場合、テーブル内のすべてのデータが読み取られます。このパラメータは、時系列テーブルをソーステーブルとして使用する場合に使用されます。 "timeRange": { // begin パラメータと end パラメータは、データを読み取る開始タイムスタンプと終了タイムスタンプを指定します。 begin パラメータと end パラメータの値は JSON 配列です。 Tablestore から読み取るデータの範囲は左閉右開区間です。タイムスタンプの単位はミリ秒です。このパラメータは、時系列テーブルをソーステーブルとして使用する場合に使用されます。このパラメータは省略可能です。デフォルト値は、すべてのバージョンが読み取られることを示します。 "begin": 0, "end": 1750000000000 }, "column": [ // エクスポートする列の配列 { "name": "_m_name" }, { "name": "_data_source" }, { "name": "_tags" }, { "name": "_time" }, { "name": "vin_id", "type": "STRING" }, { "name": "mfrs", "type": "STRING" }, { "name": "model", "type": "STRING" }, { "name": "speed", "type": "DOUBLE" }, { "name": "gps", "type": "STRING" }, { "name": "mileage", "type": "DOUBLE" }, { "name": "emission", "type": "DOUBLE" } ] } }, "writer": { "name": "txtfilewriter", "parameter": { "path": "/tmp/export/", // システム内のエクスポートファイルのパス "fileName": "output.csv", // エクスポートファイルの名前。ファイル名拡張子を含みます。例: output.csv "writeMode": "truncate", // TxtFileWriter がデータを書き込む前にデータを処理するモード。有効な値: truncate:データを書き込む前に、指定されたディレクトリにある fileName パラメータの値で始まる名前のすべてのファイルをクリアします。 append:データを書き込む前にデータを処理しません。データは、fileName パラメータを使用して指定された名前のファイルに直接書き込まれます。これにより、ファイル名が競合しないようになります。 nonConflict:ディレクトリに fileName パラメータの値で始まる名前のファイルが存在する場合、エラーを報告して操作を中止します。 "fileFormat": "csv" // 出力ファイルの形式。有効な値: csv および text } } } ] } }次の表は、変更が必要な Tablestore Reader のパラメータについて説明しています。
パラメータ
説明
channel
データ同期タスクのコア並列ユニット。
各チャネルは、データの読み取りと書き込みを行う独立したスレッドに対応します。チャネル数を変更することで、タスクの並行性を制御できます。これは、システムのパフォーマンスとリソース消費に影響します。
endpoint
Tablestore インスタンスの エンドポイント。
説明ECS インスタンスから Tablestore にアクセスする場合は、VPC エンドポイントを使用することをお勧めします。これにより、インターネット経由のアウトバウンドトラフィックの料金が発生せず、ネットワークパフォーマンスとセキュリティが向上します。
accessId
Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。
accessKey
Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。
instanceName
Tablestore インスタンスの名前。
tableName
Tablestore 内のソーステーブルの名前。
column
エクスポートする列の配列。
range
begin パラメータと end パラメータは、Tablestore データテーブルからデータを読み取るときの各プライマリキー列の開始値と終了値を指定します。begin パラメータと end パラメータの値は JSON 配列です。Tablestore から読み取るデータの範囲は、左閉右開区間です。
開始プライマリキーと終了プライマリキーは、有効なプライマリキー、または
INF_MINタイプとINF_MAXタイプで構成される仮想ポイントである必要があります。仮想ポイントの列数は、プライマリキーの列数と同じである必要があります。INF_MINは負の無限大を示し、どの値よりも小さくなります。INF_MAXは正の無限大を示し、どの値よりも大きくなります。説明このパラメータは、データテーブルをソーステーブルとして使用する場合に使用されます。このパラメータは省略可能です。デフォルト値は、読み取りが負の無限大から開始されることを示します。
measurementName
読み取る時系列のメトリック名。このパラメータが構成されていない場合、テーブル内のすべてのデータが読み取られます。
説明このパラメータは、時系列テーブルをソーステーブルとして使用する場合に使用されます。
timeRange
begin パラメータと end パラメータは、データを読み取る開始タイムスタンプと終了タイムスタンプを指定します。begin パラメータと end パラメータの値は JSON 配列です。Tablestore から読み取るデータの範囲は、左閉右開区間です。タイムスタンプの単位はミリ秒です。
説明このパラメータは、時系列テーブルをソーステーブルとして使用する場合に使用されます。このパラメータは省略可能です。デフォルト値は、すべてのバージョンが読み取られることを示します。
次の表は、変更が必要な TxtFileWriter のパラメータについて説明しています。
パラメータ
説明
path
システム内のエクスポートファイルのパス。
fileName
エクスポートされたファイルの名前(ファイル名拡張子を含む)。例:
output.csv。writeMode
TxtFileWriter がデータを書き込む前にデータを処理するモード。有効な値:
truncate:データを書き込む前に、指定されたディレクトリにある fileName パラメータの値で始まる名前のすべてのファイルをクリアします。
append:データを書き込む前にデータを処理しません。データは、fileName パラメータを使用して指定された名前のファイルに直接書き込まれます。これにより、ファイル名が競合しないようになります。
nonConflict:ディレクトリに fileName パラメータの値で始まる名前のファイルが存在する場合、エラーを報告して操作を中止します。
fileFormat
出力ファイルの形式。有効な値:
csvおよびtext。
ステップ 4:DataX タスクを実行する
次のコマンドを実行して、データエクスポートタスクの実行を開始します。
python3 datax.py tablestore_to_csv.jsonタスクが完了すると、全体の実行ステータスが出力されます。
2025-03-19 17:21:05.146 [job-0] INFO StandAloneJobContainerCommunicator - Total 200000 records, 23086634 bytes | Speed 1.10MB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.222s | All Task WaitReaderTime 18.392s | Percentage 100.00% 2025-03-19 17:21:05.147 [job-0] INFO JobContainer - Task Start Time : 2025-03-19 17:20:43 Task End Time : 2025-03-19 17:21:05 Task Duration : 21s Average Transfer Speed : 1.10MB/s Record Write Speed : 10000rec/s Total Records Read : 200000 Failed Records : 0エクスポート結果を確認します。
タスクが完了したら、指定されたパス(
/tmp/export/など)を確認して、エクスポートされた CSV ファイルが期待どおりであることを確認します。output.csv__d737aec2_c9e3_4489_a5d7_361f44c998ce説明エクスポートされた CSV ファイル名には、複数のスレッドによって生成されたファイルを区別するためにランダムなサフィックスが付加されます。標準の CSV ファイルを取得するには、このサフィックスを手動で削除する必要があります。
付録 - ソーステーブルのサンプル
付録 1:データテーブル
サンプルデータテーブルの名前は orders です。次の表は、データテーブルのスキーマについて説明しています。
フィールド名 | タイプ | 説明 |
order_id (プライマリキー列) | String | 注文の ID。 |
user_id | String | ユーザーの ID。 |
sku_id | String | 商品の ID。 |
price | Double | 購入した商品の単価。 |
num | Integer | 購入した商品の数量。 |
total_price | Double | 注文の合計金額。 |
order_status | String | 注文のステータス。 |
create_time | String | 注文が作成された日時。 |
modified_time | String | 注文が最後に変更された日時。 |
付録 2:時系列テーブル
サンプル時系列テーブルの名前は vehicles_timeseriesdata です。次の表は、時系列テーブルのスキーマについて説明しています。
フィールド名 | タイプ | 説明 |
_m_name | String | 時系列データの物理量またはメトリックの名前。 |
_data_source | String | 時系列のデータソースの識別子。このフィールドは空のままにすることができます。 |
_tags | String | 時系列のタグ。 |
_time | Integer | データが報告された日時。 |
vin_id | String | 車両識別番号 (VIN)。 |
mfrs | String | 製造元。 |
model | String | 車両のモデル。 |
speed | Double | 車両の現在の速度。 |
gps | String |
|
mileage | Double | 車両の現在の走行距離。 |
emission | Double | 排出量。 |