DataX は、Alibaba Group が提供するオープンソースのオフラインデータ同期ツールです。MySQL や Oracle などのリレーショナルデータベース、Hadoop 分散ファイルシステム (HDFS)、Hive、MaxCompute、HBase、FTP などからデータを読み取ることができます。ApsaraDB for SelectDB へデータを書き込むには、DataX Doris Writer を使用します。
データフロー:
ソース(例:MySQL) → mysqlreader → DataX チャンネル → doriswriter → ApsaraDB for SelectDB前提条件
開始する前に、以下の条件を満たしていることを確認してください。
Maven がインストール済みであること
Python 3.6 以降がインストール済みであること
ApsaraDB for SelectDB インスタンス
MySQL から ApsaraDB for SelectDB へのデータインポート
以下の例では、Linux 環境で MySQL をソースとして使用し、完全なデータインポート手順を説明します。
ステップ 1:DataX 環境の構築
GitHub 上の Doris 拡張から DataX パッケージをダウンロードします。
初期化スクリプトを実行します。
sh init-env.shDataX プロジェクトをコンパイルします。
必要に応じて
hdfsreader、hdfswriter、ossreader、またはosswriterプラグインを使用する場合は、追加の JAR パッケージが必要です。これらのプラグインをスキップする場合は、コンパイル前にDataX/pom.xmlから対応するモジュールを削除してください。cd DataX/ mvn package assembly:assembly -Dmaven.test.skip=trueコンパイル後の出力は、
target/datax/datax/ディレクトリに保存されます。mysqlreaderおよびdoriswriterプラグインを個別にコンパイルします。mvn clean install -pl plugin-rdbms-util,mysqlreader,doriswriter -DskipTests
ステップ 2:MySQL におけるテストデータの準備
MySQL でテストテーブルを作成します。
CREATE TABLE `employees` ( `emp_no` int NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;Data Management (DMS) を使用してテストデータを生成します。
ステップ 3:ApsaraDB for SelectDB における送信先テーブルの作成
MySQL プロトコル経由で ApsaraDB for SelectDB のインスタンスに接続します。詳細については、「インスタンスへの接続」をご参照ください。
データベースおよび送信先テーブルを作成します。
CREATE DATABASE test_db; USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
ステップ 4:DataX インポートジョブの実行
ApsaraDB for SelectDB のインスタンスに対してパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントの申請またはリリース」をご参照ください。
DataX ホストのパブリック IP アドレスを、インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。
ジョブ構成ファイル
mysqlToSelectDB.jsonを作成します。{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "where": "emp_no>0", "connection": [ { "jdbcUrl": [ "jdbc:mysql://host:port/test_db?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" ], "table": ["employees"] } ], "password": "123456", "splitPk": "emp_no", "username": "admin" } }, "writer": { "name": "doriswriter", "parameter": { "loadUrl": [ "selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080" ], "loadProps": { "format": "json", "strip_outer_array": "true" }, "column": [ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "username": "admin", "password": "123456", "postSql": [], "preSql": [], "connection": [ { "jdbcUrl": "jdbc:mysql://selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:9030/test_db", "table": ["employees"], "selectedDatabase": "test_db" } ], "maxBatchRows": 1000000, "batchSize": 536870912000 } } } ], "setting": { "errorLimit": { "percentage": 0.02, "record": 0 }, "speed": { "channel": 5 } } } }ジョブを実行します。
cd target/datax/datax/bin python datax.py ../mysqlToSelectDB.json
パラメーター
以下の表では、doriswriter のパラメーターについて説明します。jdbcUrl および loadUrl パラメーターには、ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントを指定できます。DataX ホストとインスタンスが同一 VPC 内にある場合は、VPC エンドポイントを使用してください。それ以外の場合は、パブリックエンドポイントを使用します。
エンドポイントを確認するには、ApsaraDB for SelectDB コンソールにログインし、[インスタンス詳細] ページに移動して、[基本情報] タブの [ネットワーク情報] セクションを確認します。
| パラメーター | 必須 | デフォルト値 | 説明 |
|---|---|---|---|
jdbcUrl | はい | — | インスタンスの Java Database Connectivity (JDBC) URL。形式: jdbc:mysql://<エンドポイント>:<MySQL ポート>/<データベース>。例:jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030/test_db |
loadUrl | はい | — | Stream Load の HTTP エンドポイント。形式:<エンドポイント>:<HTTP ポート>。例:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080 |
username | はい | — | インスタンスのユーザー名。 |
password | はい | — | ユーザー名に対応するパスワード。 |
connection.selectedDatabase | はい | — | ターゲットデータベース名。 |
connection.table | はい | — | ターゲットテーブル名。 |
column | はい | — | データを書き込む列。複数の列をカンマで区切ります。例:["id", "name", "age"]。 |
preSql | いいえ | — | データ書き込み前に実行する SQL ステートメント。 |
postSql | いいえ | — | データ書き込み後に実行する SQL ステートメント。 |
maxBatchRows | いいえ | 500000 | バッチあたりの最大行数。maxBatchRows または batchSize のいずれかが最初に達すると、バッチがフラッシュされます。 |
batchSize | いいえ | 104857600(100 MB) | バッチあたりの最大データサイズ。batchSize または maxBatchRows のいずれかが最初に達すると、バッチがフラッシュされます。 |
maxRetries | いいえ | 3 | バッチインポート失敗時の最大再試行回数。 |
labelPrefix | いいえ | datax_doris_writer_ | インポートラベルのプレフィックス。各バッチでは、このプレフィックスと UUID を組み合わせた一意のラベルが使用され、重複インポートを防止します。 |
loadProps | いいえ | — | その他の Stream Load パラメーター。サポートされるパラメーターの完全な一覧については、「パラメーター」セクション(「Stream Load を使用したデータインポート」)をご参照ください。 |
flushInterval | いいえ | 30000 ms | バッチ書き込み間隔。 |
データ形式の設定
デフォルトでは、DataX Doris Writer はデータを CSV 形式に変換し、列区切り文字として \t、行区切り文字として \n を使用します。
カスタム区切り文字の使用
区切り文字を変更するには、loadProps で設定します。\x01 や \x02 のような非表示文字を使用することで、タブや改行を含むデータとの競合を回避できます。
"loadProps": {
"format": "csv",
"column_separator": "\\x01",
"line_delimiter": "\\x02"
}JSON 形式の使用
代わりに JSON 形式でデータをインポートするには、以下のように設定します。
"loadProps": {
"format": "json",
"strip_outer_array": true
}