このトピックでは、DataX Doris Writer を使用して ApsaraDB for SelectDB インスタンスにデータを同期する方法について説明します。
概要
DataX は、MySQL や Oracle などのリレーショナルデータベース、Hadoop Distributed File System(HDFS)、Hive、MaxCompute、HBase、FTP など、異なるデータソース間のオフライン同期のために Alibaba グループが提供するオープンソースツールです。DataX は、安定した効率的なデータ同期機能を提供します。DataX を使用してアップストリームデータソースからデータを読み取り、DataX Doris Writer を使用して ApsaraDB for SelectDB にデータを書き込むことができます。
前提条件
Maven 環境がインストールされていること。
Python 3.6 以降がインストールされていること。
例
次の例は、Linux で DataX を使用して MySQL データベースから ApsaraDB for SelectDB インスタンスにデータをインポートする方法を示しています。
手順 1:DataX 環境を構成する
DataX パッケージをダウンロードします。詳細については、GitHub の doris をご覧ください。
DataX パッケージ内の init-env.sh スクリプトを実行して、DataX 開発環境を構成します。
sh init-env.sh
mysqlreader プラグインと doriswriter プラグインをコンパイルします。
DataX プロジェクトをコンパイルします。
cd DataX/ mvn package assembly:assembly -Dmaven.test.skip=true
コンパイル結果は、target/datax/datax/. ディレクトリに保存されます。
説明hdfsreader、hdfswriter、ossreader、osswriter プラグインを使用する場合は、追加の JAR パッケージが必要です。これらのプラグインが不要な場合は、DataX/pom.xml ファイルからこれらのプラグインのモジュールを削除できます。
mysqlreader プラグインと selectdbwriter プラグインを個別にコンパイルします。
mvn clean install -pl plugin-rdbms-util,mysqlreader,doriswriter -DskipTests
手順 2:インポートするデータを生成する
MySQL のテストテーブルを作成します。
CREATE TABLE `employees` ( `emp_no` int NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
Data Management(DMS)を使用してテストデータを生成します。詳細については、「テストデータを生成する」をご参照ください。
手順 3: ApsaraDB for SelectDB インスタンス
MySQL プロトコルを介して ApsaraDB for SelectDB インスタンスに接続します。詳細については、「インスタンスへの接続」をご参照ください。
テストデータベースとテストテーブルを作成します。
次のステートメントを実行して、テストデータベースを作成します。
CREATE DATABASE test_db;
次のステートメントを実行して、テストテーブルを作成します。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
手順 4:DataX を使用して MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する
ApsaraDB for SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントを申請またはリリースする」をご参照ください。
DataX ホストのパブリック IP アドレスをインスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを設定する」をご参照ください。
mysqlToSelectDB.json
構成ファイルを作成し、ファイルにインポートジョブに関する情報を構成します。サンプルコード:{ "job":{ "content":[ { "reader":{ "name": "mysqlreader", "parameter": { "column": [ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "where": "emp_no>0", // emp_no が 0 より大きい場合 "connection": [ { "jdbcUrl": [ "jdbc:mysql://host:port/test_db?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" ], "table": [ "employees" ] } ], "password": "123456", "splitPk": "emp_no", "username": "admin" } }, "writer":{ "name":"doriswriter", "parameter":{ "loadUrl":[ "selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080" ], "loadProps":{ "format":"json", "strip_outer_array":"true" }, "column":[ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "username":"admin", "password":"123456", "postSql":[ ], "preSql":[ ], "connection":[ { "jdbcUrl":"jdbc:mysql://selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:9030/test_db", "table":[ "employees" ], "selectedDatabase":"test_db" } ], "maxBatchRows":1000000, "batchSize":536870912000 } } } ], "setting":{ "errorLimit":{ "percentage":0.02, "record":0 }, "speed":{ "channel":5 } } } }
パラメーター
パラメーター
必須
デフォルト値
説明
jdbcUrl
はい
なし
ApsaraDB for SelectDB インスタンスに接続するために使用する Java Database Connectivity(JDBC) URL。形式は
jdbc:mysql://<ip>:<port>
です。ApsaraDB for SelectDBインスタンスのエンドポイントを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンし、[インスタンスの詳細] ページに移動します。[基本情報] ページの [ネットワーク情報] セクションで、エンドポイントと MySQL ポート番号を確認します。
ip:インスタンスの仮想プライベートクラウド(VPC)エンドポイントまたはパブリックエンドポイント。
port:インスタンスの MySQL ポート番号。
例:
jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030
。説明DataX ホストと ApsaraDB for SelectDB インスタンスが同じ VPC 内にある場合は、インスタンスの VPC エンドポイントを使用します。DataX ホストと ApsaraDB for SelectDB インスタンスが異なる VPC 内にある場合は、インスタンスのパブリックエンドポイントを使用します。
loadUrl
はい
なし
HTTP 経由で ApsaraDB for SelectDB インスタンスに接続するために使用する URL。形式は
<ip>:<port>
です。ApsaraDB for SelectDBインスタンスのエンドポイントを取得するには、次の操作を実行します。 ApsaraDB for SelectDB コンソールにログオンし、[インスタンスの詳細] ページに移動します。 [基本情報] ページの [ネットワーク情報] セクションで、エンドポイントと HTTP ポート番号を確認します。
ip:インスタンスの VPC エンドポイントまたはパブリックエンドポイント。
port:インスタンスの HTTP ポート番号。
例:
selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080
。説明DataX ホストと ApsaraDB for SelectDB インスタンスが同じ VPC 内にある場合は、インスタンスの VPC エンドポイントを使用します。DataX ホストと ApsaraDB for SelectDB インスタンスが異なる VPC 内にある場合は、インスタンスのパブリックエンドポイントを使用します。
username
はい
なし
ApsaraDB for SelectDB インスタンスに接続するために使用するユーザー名。
password
はい
なし
ApsaraDB for SelectDB インスタンスに接続するために使用するユーザー名のパスワード。
connection.selectedDatabase
はい
なし
ApsaraDB for SelectDB インスタンスでデータを書き込むデータベースの名前。
connection.table
はい
なし
ApsaraDB for SelectDB インスタンスでデータを書き込むテーブルの名前。
column
はい
なし
書き込み先のテーブルのフィールド。これらのフィールドは、生成された JSON 形式のデータで使用されます。複数のフィールドはカンマ(,)で区切ります。例:
"column": ["id","name","age"]
。preSql
いいえ
なし
書き込み先のテーブルにデータを書き込む前に実行される標準ステートメント。
postSql
いいえ
なし
書き込み先のテーブルにデータを書き込んだ後に実行される標準ステートメント。
maxBatchRows
いいえ
500000
書き込み先のテーブルにバッチで書き込むことができる最大行数。このパラメーターは、batchSize パラメーターと組み合わせて使用され、バッチで書き込むことができるデータ量を指定します。データのバッチがいずれかのしきい値に達すると、このバッチのデータのインポートが開始されます。
batchSize
いいえ
104857600
書き込み先のテーブルにバッチで書き込むことができる最大データ量。このパラメーターは、maxBatchRows パラメーターと組み合わせて使用され、バッチで書き込むことができるデータ量を指定します。データのバッチがいずれかのしきい値に達すると、このバッチのデータのインポートが開始されます。デフォルト値:100 MB。
maxRetries
いいえ
3
データのバッチを書き込み先のテーブルにインポートできなかった場合に許可される最大再試行回数。
labelPrefix
いいえ
datax_doris_writer_
インポートされたデータの各バッチのラベルプレフィックス。一意のラベルは、ラベルプレフィックスと UUID で構成され、データが繰り返しインポートされないようにします。
loadProps
いいえ
なし
Stream Load ジョブと同じリクエストパラメーター。詳細については、「Stream Load を使用したデータのインポート」トピックの「パラメーター」セクションをご参照ください。format パラメーターは、インポートするデータの形式を指定します。デフォルトでは、インポートするデータは CSV 形式です。JSON 形式でデータをインポートできます。詳細については、このトピックの「データ型を変換する」セクションをご参照ください。
flushInterval
いいえ
30000
データをバッチで書き込む間隔。デフォルト値:30000。単位:ミリ秒。
次のコマンドを実行して、ジョブを送信します。
cd target/datax/datax/bin python datax.py ../mysqlToSelectDB.json
データ型を変換する
デフォルトでは、インポートされたデータは文字列に変換されます。\t
は列区切り文字として使用され、\n
は行区切り文字として使用されて、ApsaraDB for SelectDB インスタンスにインポートされる CSV
ファイルに変換されます。デフォルトでは、データは CSV 形式でインポートされます。列区切り文字または行区切り文字を変更する場合は、loadProps
パラメーターのパラメーターを変更します。サンプルコード:
"loadProps": {
"format": "csv",
"column_separator": "\\x01",
"line_delimiter": "\\x02"
}
JSON
形式でデータをインポートする場合は、loadProps
パラメーターの format
パラメーターを変更します。サンプルコード:
"loadProps": {
"format": "json",
"strip_outer_array": true
}