すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:SeaTunnel を使用してデータをインポートする

最終更新日:Apr 09, 2025

ApsaraDB for SelectDB は SeaTunnel と統合されています。SeaTunnel SelectDB Sink を使用して、テーブルデータを ApsaraDB for SelectDB にインポートできます。このトピックでは、SeaTunnel SelectDB Sink を使用して ApsaraDB for SelectDB にデータを同期する方法について説明します。

概要

SeaTunnel は、大量データのリアルタイム同期をサポートする、使いやすく高性能な分散データ統合プラットフォームです。 SeaTunnel を使用して、MySQL、Hive、Kafka などのデータソースから大量のデータを読み取り、SeaTunnel SelectDB Sink を使用して ApsaraDB for SelectDB に書き込むことができます。

前提条件

SeaTunnel 2.3.1 以降がインストールされていること。

しくみ

SeaTunnel を使用すると、アップストリームデータを JSON または CSV 形式で ApsaraDB for SelectDB に書き込むことができます。シンクの設定は、データ形式によって異なります。

JSON 形式

sink { 
  SelectDB { 
    load-url="ip:http_port" // ロードURL
    jdbc-url="ip:mysql_port" // JDBC URL
    cluster-name="Cluster" // クラスタ名
    table.identifier="test_db.test_table" // テーブル名
    username="admin" // ユーザー名
    password="****" // パスワード
    selectdb.config { 
      file.type="json" // ファイルタイプ
    } 
  }
}

CSV 形式

sink { 
  SelectDB { 
    load-url="ip:http_port" // ロードURL
    jdbc-url="ip:mysql_port" // JDBC URL
    cluster-name="Cluster" // クラスタ名
    table.identifier="test_db.test_table" // テーブル名
    username="admin" // ユーザー名
    password="****" // パスワード
    selectdb.config { 
      file.type="csv" // ファイルタイプ
      file.column_separator="," // 列区切り文字
      file.line_delimiter="\n" // 行区切り文字
    } 
  }
}

次の表に、シンク設定のパラメーターを示します。

パラメーター

必須

説明

load-url

はい

ApsaraDB for SelectDB インスタンスにアクセスするために使用されるエンドポイントと HTTP ポート。

ApsaraDB for SelectDB インスタンスの仮想プライベートクラウド (VPC) エンドポイントまたはパブリックエンドポイントと HTTP ポートを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンし、情報を確認するインスタンスの [インスタンスの詳細] ページに移動します。[基本情報] ページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [HTTP ポート] パラメーターの値を表示します。

例: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

jdbc-url

はい

ApsaraDB for SelectDB インスタンスにアクセスするために使用されるエンドポイントと MySQL ポート。

ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと MySQL ポートを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンし、情報を確認するインスタンスの [インスタンスの詳細] ページに移動します。[基本情報] ページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [mysql ポート] パラメーターの値を表示します。

例: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

cluster-name

はい

ApsaraDB for SelectDB インスタンス内のクラスタの名前。ApsaraDB for SelectDB インスタンスには複数のクラスタが含まれている場合があります。ビジネス要件に基づいてクラスタを選択します。

username

はい

ApsaraDB for SelectDB インスタンスに接続するために使用されるユーザー名。

password

はい

ApsaraDB for SelectDB インスタンスに接続するために使用されるパスワード。

table.identifier

はい

ApsaraDB for SelectDB インスタンス内のテーブルの名前。データベース名.テーブル名 の形式で名前を指定します。例: test_db.test_table

selectdb.config

はい

データインポートジョブの設定。

  • CSV 形式:

    selectdb.config { file.type='csv' file.column_separator=',' file.line_delimiter='\n' } // CSV形式の設定
  • JSON 形式:

    selectdb.config { file.type="json" file.strip_outer_array="false" } // JSON形式の設定

sink.enable-delete

いいえ

一括削除機能を有効にするかどうかを指定します。この機能は、一意キーモデルでのみサポートされています。

sink.buffer-size

いいえ

最大バッファサイズ。単位: バイト。デフォルト値は 10 MB 相当です。バッファサイズが上限を超えると、バッファ内のすべてのコンテンツがオブジェクトストレージサービス (OSS) にフラッシュされます。デフォルト値を使用することをお勧めします。

sink.buffer-count

いいえ

バッファリングできるデータレコードの最大数。デフォルト値: 10000。バッファリングされるデータレコードの数が上限を超えると、バッファ内のすべてのコンテンツが OSS にフラッシュされます。デフォルト値を使用することをお勧めします。

sink.max-retries

いいえ

コミットフェーズでの最大再試行回数。デフォルト値: 3。

sink.enable-2pc

いいえ

2 フェーズコミットモードを有効にするかどうかを指定します。2 フェーズコミットモードを有効にして、exactly-once セマンティクスを確保できます。デフォルト値: true。

この例では、SeaTunnel を使用して、アップストリーム MySQL データベースから ApsaraDB for SelectDB にデータをインポートします。次の表に、この例のソフトウェアバージョンを示します。

環境

バージョン

Java Development Kit (JDK)

1.8

SeaTunnel

2.3.3

ApsaraDB for SelectDB

3.0.4

環境を準備する

  1. SeaTunnel を設定します。

    1. SeaTunnel インストールパッケージをダウンロードして解凍します。この例では、SeaTunnel インストールパッケージ apache-seatunnel-2.3.3-bin.tar.gz を使用します。

      wget https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz
      tar -xzvf apache-seatunnel-2.3.3-bin.tar.gz
    2. SEATUNNEL_HOME/config/plugin_config ファイルを変更します。必要なコネクタのみを残します。

      --connectors-v2--
      connector-cdc-mysql // 必要なコネクタ
      connector-selectdb-cloud // 必要なコネクタ
      connector-jdbc // 必要なコネクタ
      connector-fake // 必要なコネクタ
      connector-console // 必要なコネクタ
      connector-assert // 必要なコネクタ
      --end--
    3. SeaTunnel コネクタプラグインをインストールします。

      sh bin/install-plugin.sh
    4. MySQL ドライバーをダウンロードし、SEATUNNEL_HOME/jar ディレクトリに配置します。

      cd lib/
      wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
  2. インポートするデータを構築します。この例では、インポート用に MySQL データベースに少量のサンプルデータが構築されます。

    1. MySQL データベースにテストテーブルを作成します。

      CREATE TABLE `employees` (
        `emp_no` INT NOT NULL,
        `birth_date` DATE NOT NULL,
        `first_name` VARCHAR(14) NOT NULL,
        `last_name` VARCHAR(16) NOT NULL,
        `gender` ENUM('M','F') NOT NULL,
        `hire_date` DATE NOT NULL,
        PRIMARY KEY (`emp_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
    2. データ管理 (DMS) を使用してテストデータを生成します。詳細については、「テストデータを生成する」をご参照ください。

  3. ApsaraDB for SelectDB インスタンスを設定します。

    1. ApsaraDB for SelectDB インスタンスを作成します。詳細については、「インスタンスを作成する」をご参照ください。

    2. MySQL プロトコルを介して ApsaraDB for SelectDB インスタンスに接続します。詳細については、「インスタンスへの接続」をご参照ください。

    3. テストデータベースとテストテーブルを作成します。

      1. テストデータベースを作成します。

        CREATE DATABASE test_db;
      2. テストテーブルを作成します。

        USE test_db;
        CREATE TABLE employees (
            emp_no       INT NOT NULL,
            birth_date   DATE,
            first_name   VARCHAR(20),
            last_name    VARCHAR(20),
            gender       CHAR(2),
            hire_date    DATE
        )
        UNIQUE KEY(`emp_no`)
        DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
    4. ApsaraDB for SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントを申請またはリリースする」をご参照ください。

    5. SeaTunnel のパブリック IP アドレスを ApsaraDB for SelectDB インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを設定する」をご参照ください。

ローカル SeaTunnel エンジンを使用して、MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する

  1. mysqlToSelectDB.conf 設定ファイルを作成し、ジョブ情報を設定します。

    env {
      execution.parallelism = 2 // 並列度
      job.mode = "BATCH" // ジョブモード
      checkpoint.interval = 10000 // チェックポイント間隔
    }
    
    source{
      jdbc {
        url = "jdbc:mysql://host:ip/test_db" // MySQLのURL
        driver = "com.mysql.cj.jdbc.Driver" // MySQLのドライバー
        user = "admin" // ユーザー名
        password = "****" // パスワード
        query = "select * from employees" // クエリ
      }
    }
     
    sink {
      SelectDBCloud {
        load-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:8080" // ロードURL
        jdbc-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:9030" // JDBC URL
        cluster-name="new_cluster" // クラスタ名
        table.identifier="test_db.employees" // テーブル名
        username="admin" // ユーザー名
        password="****" // パスワード
        selectdb.config {
            file.type="json" // ファイルタイプ
        }
      }
    }
  2. ジョブを送信します。

    sh ./bin/seatunnel.sh --config ./mysqlToSelectDB.conf -e local