ApsaraDB for SelectDB は SeaTunnel と統合されています。SeaTunnel SelectDB Sink を使用して、テーブルデータを ApsaraDB for SelectDB にインポートできます。このトピックでは、SeaTunnel SelectDB Sink を使用して ApsaraDB for SelectDB にデータを同期する方法について説明します。
概要
SeaTunnel は、大量データのリアルタイム同期をサポートする、使いやすく高性能な分散データ統合プラットフォームです。 SeaTunnel を使用して、MySQL、Hive、Kafka などのデータソースから大量のデータを読み取り、SeaTunnel SelectDB Sink を使用して ApsaraDB for SelectDB に書き込むことができます。
前提条件
SeaTunnel 2.3.1 以降がインストールされていること。
しくみ
SeaTunnel を使用すると、アップストリームデータを JSON または CSV 形式で ApsaraDB for SelectDB に書き込むことができます。シンクの設定は、データ形式によって異なります。
JSON 形式
sink {
SelectDB {
load-url="ip:http_port" // ロードURL
jdbc-url="ip:mysql_port" // JDBC URL
cluster-name="Cluster" // クラスタ名
table.identifier="test_db.test_table" // テーブル名
username="admin" // ユーザー名
password="****" // パスワード
selectdb.config {
file.type="json" // ファイルタイプ
}
}
}
CSV 形式
sink {
SelectDB {
load-url="ip:http_port" // ロードURL
jdbc-url="ip:mysql_port" // JDBC URL
cluster-name="Cluster" // クラスタ名
table.identifier="test_db.test_table" // テーブル名
username="admin" // ユーザー名
password="****" // パスワード
selectdb.config {
file.type="csv" // ファイルタイプ
file.column_separator="," // 列区切り文字
file.line_delimiter="\n" // 行区切り文字
}
}
}
次の表に、シンク設定のパラメーターを示します。
パラメーター | 必須 | 説明 |
load-url | はい | ApsaraDB for SelectDB インスタンスにアクセスするために使用されるエンドポイントと HTTP ポート。 ApsaraDB for SelectDB インスタンスの仮想プライベートクラウド (VPC) エンドポイントまたはパブリックエンドポイントと HTTP ポートを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンし、情報を確認するインスタンスの [インスタンスの詳細] ページに移動します。[基本情報] ページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [HTTP ポート] パラメーターの値を表示します。 例: |
jdbc-url | はい | ApsaraDB for SelectDB インスタンスにアクセスするために使用されるエンドポイントと MySQL ポート。 ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと MySQL ポートを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンし、情報を確認するインスタンスの [インスタンスの詳細] ページに移動します。[基本情報] ページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [mysql ポート] パラメーターの値を表示します。 例: |
cluster-name | はい | ApsaraDB for SelectDB インスタンス内のクラスタの名前。ApsaraDB for SelectDB インスタンスには複数のクラスタが含まれている場合があります。ビジネス要件に基づいてクラスタを選択します。 |
username | はい | ApsaraDB for SelectDB インスタンスに接続するために使用されるユーザー名。 |
password | はい | ApsaraDB for SelectDB インスタンスに接続するために使用されるパスワード。 |
table.identifier | はい | ApsaraDB for SelectDB インスタンス内のテーブルの名前。 |
selectdb.config | はい | データインポートジョブの設定。
|
sink.enable-delete | いいえ | 一括削除機能を有効にするかどうかを指定します。この機能は、一意キーモデルでのみサポートされています。 |
sink.buffer-size | いいえ | 最大バッファサイズ。単位: バイト。デフォルト値は 10 MB 相当です。バッファサイズが上限を超えると、バッファ内のすべてのコンテンツがオブジェクトストレージサービス (OSS) にフラッシュされます。デフォルト値を使用することをお勧めします。 |
sink.buffer-count | いいえ | バッファリングできるデータレコードの最大数。デフォルト値: 10000。バッファリングされるデータレコードの数が上限を超えると、バッファ内のすべてのコンテンツが OSS にフラッシュされます。デフォルト値を使用することをお勧めします。 |
sink.max-retries | いいえ | コミットフェーズでの最大再試行回数。デフォルト値: 3。 |
sink.enable-2pc | いいえ | 2 フェーズコミットモードを有効にするかどうかを指定します。2 フェーズコミットモードを有効にして、exactly-once セマンティクスを確保できます。デフォルト値: true。 |
例
この例では、SeaTunnel を使用して、アップストリーム MySQL データベースから ApsaraDB for SelectDB にデータをインポートします。次の表に、この例のソフトウェアバージョンを示します。
環境 | バージョン |
Java Development Kit (JDK) | 1.8 |
SeaTunnel | 2.3.3 |
ApsaraDB for SelectDB | 3.0.4 |
環境を準備する
SeaTunnel を設定します。
SeaTunnel インストールパッケージをダウンロードして解凍します。この例では、SeaTunnel インストールパッケージ apache-seatunnel-2.3.3-bin.tar.gz を使用します。
wget https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz tar -xzvf apache-seatunnel-2.3.3-bin.tar.gz
SEATUNNEL_HOME/config/plugin_config ファイルを変更します。必要なコネクタのみを残します。
--connectors-v2-- connector-cdc-mysql // 必要なコネクタ connector-selectdb-cloud // 必要なコネクタ connector-jdbc // 必要なコネクタ connector-fake // 必要なコネクタ connector-console // 必要なコネクタ connector-assert // 必要なコネクタ --end--
SeaTunnel コネクタプラグインをインストールします。
sh bin/install-plugin.sh
MySQL ドライバーをダウンロードし、SEATUNNEL_HOME/jar ディレクトリに配置します。
cd lib/ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
インポートするデータを構築します。この例では、インポート用に MySQL データベースに少量のサンプルデータが構築されます。
MySQL データベースにテストテーブルを作成します。
CREATE TABLE `employees` ( `emp_no` INT NOT NULL, `birth_date` DATE NOT NULL, `first_name` VARCHAR(14) NOT NULL, `last_name` VARCHAR(16) NOT NULL, `gender` ENUM('M','F') NOT NULL, `hire_date` DATE NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
データ管理 (DMS) を使用してテストデータを生成します。詳細については、「テストデータを生成する」をご参照ください。
ApsaraDB for SelectDB インスタンスを設定します。
ApsaraDB for SelectDB インスタンスを作成します。詳細については、「インスタンスを作成する」をご参照ください。
MySQL プロトコルを介して ApsaraDB for SelectDB インスタンスに接続します。詳細については、「インスタンスへの接続」をご参照ください。
テストデータベースとテストテーブルを作成します。
テストデータベースを作成します。
CREATE DATABASE test_db;
テストテーブルを作成します。
USE test_db; CREATE TABLE employees ( emp_no INT NOT NULL, birth_date DATE, first_name VARCHAR(20), last_name VARCHAR(20), gender CHAR(2), hire_date DATE ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
ApsaraDB for SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントを申請またはリリースする」をご参照ください。
SeaTunnel のパブリック IP アドレスを ApsaraDB for SelectDB インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを設定する」をご参照ください。
ローカル SeaTunnel エンジンを使用して、MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する
mysqlToSelectDB.conf
設定ファイルを作成し、ジョブ情報を設定します。env { execution.parallelism = 2 // 並列度 job.mode = "BATCH" // ジョブモード checkpoint.interval = 10000 // チェックポイント間隔 } source{ jdbc { url = "jdbc:mysql://host:ip/test_db" // MySQLのURL driver = "com.mysql.cj.jdbc.Driver" // MySQLのドライバー user = "admin" // ユーザー名 password = "****" // パスワード query = "select * from employees" // クエリ } } sink { SelectDBCloud { load-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:8080" // ロードURL jdbc-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:9030" // JDBC URL cluster-name="new_cluster" // クラスタ名 table.identifier="test_db.employees" // テーブル名 username="admin" // ユーザー名 password="****" // パスワード selectdb.config { file.type="json" // ファイルタイプ } } }
ジョブを送信します。
sh ./bin/seatunnel.sh --config ./mysqlToSelectDB.conf -e local