本文介紹如何使用DataX工具將Tablestore中的資料匯出至本地CSV檔案,Table Store中的資料表或時序表均可作為源表進行匯出操作。
背景資訊
DataX是一個異構資料來源離線同步工具,支援多種資料來源(如 MySQL、Oracle、HDFS、Hive 、Tablestore等)之間的高效穩定同步。
準備工作
擷取Table Store的服務地址、執行個體名稱和源表資訊。
建立阿里雲帳號(主帳號)或RAM使用者的AccessKey。
重要出於安全考慮,強烈建議您通過RAM使用者使用Table Store功能。您可以建立RAM使用者、授予該使用者管理Table Store許可權(
AliyunOTSFullAccess)並為該RAM使用者建立AccessKey。具體操作,請參見使用RAM使用者存取金鑰訪問Table Store。
操作步驟
本文以Elastic Compute Service(作業系統:Alibaba Cloud Linux 3.2104 LTS 64位、Ubuntu 22.04 64位)為例介紹具體操作。
步驟一:安裝依賴
安裝Python(Python 2和Python 3均可)。
ECS的Alibaba Cloud Linux和Ubuntu系統已內建Python 3,其他系統請自行安裝。
安裝JDK(1.8及以上,推薦1.8)。
本文介紹在ECS的Alibaba Cloud Linux或Ubuntu系統中安裝JDK 1.8的方法,其他系統請自行安裝。
Alibaba Cloud Linux
yum -y install java-1.8.0-openjdk-devel.x86_64Ubuntu
apt update && apt upgrade apt install openjdk-8-jdk
步驟二:下載DataX
下載DataX工具包。
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz解壓工具包。
tar -zxvf datax.tar.gz
如需自行編譯DataX,請參見DataX安裝指引。
步驟三:編寫設定檔
進入DataX的
bin目錄。cd datax/bin建立設定檔。如果您使用的是
vim,請自行替換命令。vi tablestore_to_csv.json設定檔樣本如下,請根據您的同步需求和實際情況,調整檔案中的相關參數。
匯出資料表
資料表源表
orders的結構資訊,請參見附錄1:資料表。{ "job": { "setting": { "speed": { "channel": 1 }, "errorLimit": { "record": 0, "percentage": 0 } }, "content": [ { "reader": { "name": "otsreader", "parameter": { "endpoint": "https://<YOUR-INSTANCE>.<YOUR-REGION>.ots.aliyuncs.com", "accessId": "<YOUR-ACCESS-KEY-ID>", "accessKey": "<YOUR-ACCESS-KEY-SECRET>", "instanceName": "<YOUR-INSTANCE>", "table": "orders", "range": { "begin": [ { "type": "INF_MIN" } ], "end": [ { "type": "INF_MAX" } ] }, "column": [ { "name": "order_id" }, { "name": "user_id" }, { "name": "sku_id" }, { "name": "price" }, { "name": "num" }, { "name": "total_price" }, { "name": "order_status" }, { "name": "create_time" }, { "name": "modified_time" } ] } }, "writer": { "name": "txtfilewriter", "parameter": { "path": "/tmp/export/", "fileName": "output.csv", "writeMode": "truncate", "fileFormat": "csv" } } } ] } }匯出時序表
時序表源表
vehicles_timeseriesdata的結構資訊,請參見附錄2:時序表。{ "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "otsreader", "parameter": { "endpoint": "https://<YOUR-INSTANCE>.<YOUR-REGION>.ots.aliyuncs.com", "accessId": "<YOUR-ACCESS-KEY-ID>", "accessKey": "<YOUR-ACCESS-KEY-SECRET>", "instanceName": "<YOUR-INSTANCE>", "table": "vehicles_timeseriesdata", "mode": "normal", "newVersion": "true", "isTimeseriesTable": "true", "measurementName": "measurement_1", "timeRange": { "begin": 0, "end": 1750000000000 }, "column": [ { "name": "_m_name" }, { "name": "_data_source" }, { "name": "_tags" }, { "name": "_time" }, { "name": "vin_id", "type": "STRING" }, { "name": "mfrs", "type": "STRING" }, { "name": "model", "type": "STRING" }, { "name": "speed", "type": "DOUBLE" }, { "name": "gps", "type": "STRING" }, { "name": "mileage", "type": "DOUBLE" }, { "name": "emission", "type": "DOUBLE" } ] } }, "writer": { "name": "txtfilewriter", "parameter": { "path": "/tmp/export/", "fileName": "output.csv", "writeMode": "truncate", "fileFormat": "csv" } } } ] } }OTSReader需要替換的參數說明如下:
參數名稱
說明
channel
資料同步任務的核心並行單元。
每個channel對應一個獨立的資料讀取與寫入線程,通過調整channel數量可以控制任務的並發度,進而影響系統效能和資源消耗。
endpoint
Tablestore執行個體的服務地址。
說明如果通過Elastic Compute Service訪問 Tablestore,推薦使用VPC地址。不會產生外網下行流量費用,同時提供更高的網路效能和安全性。
accessId
阿里雲帳號或RAM使用者的AccessKey ID。
accessKey
阿里雲帳號或RAM使用者的AccessKey Secret。
instanceName
Tablestore執行個體的名稱。
tableName
Tablestore的源表名稱。
column
需要匯出的列數組。
range
begin和end分別為資料讀取的起始主鍵和結束主鍵,格式為JSON數組,讀取範圍為左閉右開的區間。
起始主鍵和結束主鍵需要是有效主鍵或者是由
INF_MIN和INF_MAX類型組成的虛擬點,虛擬點的列數必須與主鍵相同。其中INF_MIN表示無限小,任何類型的值都比它大;INF_MAX表示無限大,任何類型的值都比它小。說明資料表作為源表時使用此參數。此參數為可選項,預設值表示從無限小開始讀取。
measurementName
需要讀取時間軸的度量名稱。如果未配置,則將讀取全表資料。
說明時序表作為源表時使用此參數。
timeRange
begin和end分別為資料讀取的起始時間戳記和結束時間戳記,格式為JSON數組,讀取範圍為左閉右開的區間。時間戳記單位為毫秒。
說明時序表作為源表時使用此參數。此參數為可選項,預設值表示讀取全部版本。
TxtFileWriter需要替換的參數說明如下:
參數名稱
說明
path
匯出檔案在系統中的路徑資訊。
fileName
匯出檔案的名稱,需包含副檔名。例如
output.csv。writeMode
TxtFileWriter在寫入資料前的處理模式,取值範圍如下:
truncate:寫入前清理指定目錄下所有以fileName為首碼的檔案。
append:寫入前不進行任何處理,直接使用filename寫入,並保證檔案名稱不衝突。
nonConflict:如果目錄下存在以 fileName 為首碼的檔案,則報錯並中止操作。
fileFormat
檔案寫出的格式,支援
csv和text兩種類型。
步驟四:執行DataX任務
執行以下命令開始匯出資料。
python3 datax.py tablestore_to_csv.json任務結束後,將列印整體運行情況。
2025-03-19 17:21:05.146 [job-0] INFO StandAloneJobContainerCommunicator - Total 200000 records, 23086634 bytes | Speed 1.10MB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.222s | All Task WaitReaderTime 18.392s | Percentage 100.00% 2025-03-19 17:21:05.147 [job-0] INFO JobContainer - 任務啟動時刻 : 2025-03-19 17:20:43 任務結束時刻 : 2025-03-19 17:21:05 任務總計耗時 : 21s 任務平均流量 : 1.10MB/s 記錄寫入速度 : 10000rec/s 讀出記錄總數 : 200000 讀寫失敗總數 : 0驗證匯出結果。
任務執行完成後,請檢查指定的路徑(如
/tmp/export/),確認匯出的CSV檔案是否符合預期。output.csv__d737aec2_c9e3_4489_a5d7_361f44c998ce說明匯出的CSV檔案名稱將附加隨機尾碼,以便區分多線程產生的檔案。需要您手動移除此尾碼,以獲得標準的CSV檔案。
附錄-源表範例
附錄1:資料表
資料表範例的名稱為orders,表結構資訊請參見下表。
欄位名稱 | 類型 | 描述 |
order_id(主鍵) | String | 訂單ID。 |
user_id | String | 使用者ID。 |
sku_id | String | 商品ID。 |
price | Double | 商品購買單價。 |
num | Integer | 商品購買數量。 |
total_price | Double | 訂單總價。 |
order_status | String | 訂單狀態。 |
create_time | String | 訂單建立時間。 |
modified_time | String | 最後修改時間。 |
附錄2:時序表
時序表範例的名稱為vehicles_timeseriesdata,表結構資訊請參見下表。
欄位名稱 | 類型 | 描述 |
_m_name | String | 度量名稱,表示時間軸資料所度量的物理量或者監控指標的名稱。 |
_data_source | String | 資料來源,表示產生時間軸的資料來源標識,可以為空白。 |
_tags | String | 時間軸的標籤資訊。 |
_time | Integer | 資料上報時間。 |
vin_id | String | 車輛識別代碼,即車架號,用於唯一標識車輛。 |
mfrs | String | 生產廠商。 |
model | String | 車輛型號。 |
speed | Double | 當前速度。 |
gps | String | 車輛GPS座標,格式為 |
mileage | Double | 當前裡程數。 |
emission | Double | 排放值。 |