すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:DataX を使用したデータのインポート

最終更新日:Apr 09, 2025

このトピックでは、DataX Doris Writer を使用して ApsaraDB for SelectDB インスタンスにデータを同期する方法について説明します。

概要

DataX は、MySQL や Oracle などのリレーショナルデータベース、Hadoop Distributed File System(HDFS)、Hive、MaxCompute、HBase、FTP など、異なるデータソース間のオフライン同期のために Alibaba グループが提供するオープンソースツールです。DataX は、安定した効率的なデータ同期機能を提供します。DataX を使用してアップストリームデータソースからデータを読み取り、DataX Doris Writer を使用して ApsaraDB for SelectDB にデータを書き込むことができます。

前提条件

  • Maven 環境がインストールされていること。

  • Python 3.6 以降がインストールされていること。

次の例は、Linux で DataX を使用して MySQL データベースから ApsaraDB for SelectDB インスタンスにデータをインポートする方法を示しています。

手順 1:DataX 環境を構成する

  1. DataX パッケージをダウンロードします。詳細については、GitHub の doris をご覧ください。

  2. DataX パッケージ内の init-env.sh スクリプトを実行して、DataX 開発環境を構成します。

    sh init-env.sh
  3. mysqlreader プラグインと doriswriter プラグインをコンパイルします。

    1. DataX プロジェクトをコンパイルします。

      cd DataX/
      mvn package assembly:assembly -Dmaven.test.skip=true

      コンパイル結果は、target/datax/datax/. ディレクトリに保存されます。

      説明

      hdfsreader、hdfswriter、ossreader、osswriter プラグインを使用する場合は、追加の JAR パッケージが必要です。これらのプラグインが不要な場合は、DataX/pom.xml ファイルからこれらのプラグインのモジュールを削除できます。

    2. mysqlreader プラグインと selectdbwriter プラグインを個別にコンパイルします。

      mvn clean install -pl plugin-rdbms-util,mysqlreader,doriswriter -DskipTests

手順 2:インポートするデータを生成する

  1. MySQL のテストテーブルを作成します。

    CREATE TABLE `employees` (
      `emp_no` int NOT NULL,
      `birth_date` date NOT NULL,
      `first_name` varchar(14) NOT NULL,
      `last_name` varchar(16) NOT NULL,
      `gender` enum('M','F') NOT NULL,
      `hire_date` date NOT NULL,
      PRIMARY KEY (`emp_no`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
  2. Data Management(DMS)を使用してテストデータを生成します。詳細については、「テストデータを生成する」をご参照ください。

手順 3: ApsaraDB for SelectDB インスタンス

  1. MySQL プロトコルを介して ApsaraDB for SelectDB インスタンスに接続します。詳細については、「インスタンスへの接続」をご参照ください。

  2. テストデータベースとテストテーブルを作成します。

    1. 次のステートメントを実行して、テストデータベースを作成します。

      CREATE DATABASE test_db;
    2. 次のステートメントを実行して、テストテーブルを作成します。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

手順 4:DataX を使用して MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する

  1. ApsaraDB for SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントを申請またはリリースする」をご参照ください。

  2. DataX ホストのパブリック IP アドレスをインスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを設定する」をご参照ください。

  3. mysqlToSelectDB.json 構成ファイルを作成し、ファイルにインポートジョブに関する情報を構成します。サンプルコード:

    {
      "job":{
        "content":[
          {
            "reader":{
                "name": "mysqlreader",
                "parameter": {
                    "column": [
                        "emp_no",
                        "birth_date",
                        "first_name",
                        "last_name",
                        "gender",
                        "hire_date"
                    ],
                    "where": "emp_no>0", // emp_no が 0 より大きい場合
                    "connection": [
                        {
                            "jdbcUrl": [
                                "jdbc:mysql://host:port/test_db?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
                            ],
                            "table": [
                                "employees"
                            ]
                        }
                    ],
                    "password": "123456",
                    "splitPk": "emp_no",
                    "username": "admin"
                }
            },
            "writer":{
              "name":"doriswriter",
              "parameter":{
                "loadUrl":[
                  "selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080"
                ],
                "loadProps":{
                  "format":"json",
                  "strip_outer_array":"true"
                },
                "column":[
                    "emp_no",
                    "birth_date",
                    "first_name",
                    "last_name",
                    "gender",
                    "hire_date"
                ],
                "username":"admin",
                "password":"123456",
                "postSql":[
    
                ],
                "preSql":[
    
                ],
                "connection":[
                  {
                    "jdbcUrl":"jdbc:mysql://selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:9030/test_db",
                    "table":[
                      "employees"
                    ],
                    "selectedDatabase":"test_db"
                  }
                ],
                "maxBatchRows":1000000,
                "batchSize":536870912000
              }
            }
          }
        ],
        "setting":{
          "errorLimit":{
            "percentage":0.02,
            "record":0
          },
          "speed":{
            "channel":5
          }
        }
      }
    }

  4. パラメーター

    パラメーター

    必須

    デフォルト値

    説明

    jdbcUrl

    はい

    なし

    ApsaraDB for SelectDB インスタンスに接続するために使用する Java Database Connectivity(JDBC) URL。形式は jdbc:mysql://<ip>:<port> です。

    ApsaraDB for SelectDBインスタンスのエンドポイントを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンし、[インスタンスの詳細] ページに移動します。[基本情報] ページの [ネットワーク情報] セクションで、エンドポイントと MySQL ポート番号を確認します。

    ip:インスタンスの仮想プライベートクラウド(VPC)エンドポイントまたはパブリックエンドポイント。

    port:インスタンスの MySQL ポート番号。

    例:jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

    説明

    DataX ホストと ApsaraDB for SelectDB インスタンスが同じ VPC 内にある場合は、インスタンスの VPC エンドポイントを使用します。DataX ホストと ApsaraDB for SelectDB インスタンスが異なる VPC 内にある場合は、インスタンスのパブリックエンドポイントを使用します。

    loadUrl

    はい

    なし

    HTTP 経由で ApsaraDB for SelectDB インスタンスに接続するために使用する URL。形式は <ip>:<port> です。

    ApsaraDB for SelectDBインスタンスのエンドポイントを取得するには、次の操作を実行します。 ApsaraDB for SelectDB コンソールにログオンし、[インスタンスの詳細] ページに移動します。 [基本情報] ページの [ネットワーク情報] セクションで、エンドポイントと HTTP ポート番号を確認します。

    ip:インスタンスの VPC エンドポイントまたはパブリックエンドポイント。

    port:インスタンスの HTTP ポート番号。

    例:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

    説明

    DataX ホストと ApsaraDB for SelectDB インスタンスが同じ VPC 内にある場合は、インスタンスの VPC エンドポイントを使用します。DataX ホストと ApsaraDB for SelectDB インスタンスが異なる VPC 内にある場合は、インスタンスのパブリックエンドポイントを使用します。

    username

    はい

    なし

    ApsaraDB for SelectDB インスタンスに接続するために使用するユーザー名。

    password

    はい

    なし

    ApsaraDB for SelectDB インスタンスに接続するために使用するユーザー名のパスワード。

    connection.selectedDatabase

    はい

    なし

    ApsaraDB for SelectDB インスタンスでデータを書き込むデータベースの名前。

    connection.table

    はい

    なし

    ApsaraDB for SelectDB インスタンスでデータを書き込むテーブルの名前。

    column

    はい

    なし

    書き込み先のテーブルのフィールド。これらのフィールドは、生成された JSON 形式のデータで使用されます。複数のフィールドはカンマ(,)で区切ります。例:"column": ["id","name","age"]

    preSql

    いいえ

    なし

    書き込み先のテーブルにデータを書き込む前に実行される標準ステートメント。

    postSql

    いいえ

    なし

    書き込み先のテーブルにデータを書き込んだ後に実行される標準ステートメント。

    maxBatchRows

    いいえ

    500000

    書き込み先のテーブルにバッチで書き込むことができる最大行数。このパラメーターは、batchSize パラメーターと組み合わせて使用され、バッチで書き込むことができるデータ量を指定します。データのバッチがいずれかのしきい値に達すると、このバッチのデータのインポートが開始されます。

    batchSize

    いいえ

    104857600

    書き込み先のテーブルにバッチで書き込むことができる最大データ量。このパラメーターは、maxBatchRows パラメーターと組み合わせて使用され、バッチで書き込むことができるデータ量を指定します。データのバッチがいずれかのしきい値に達すると、このバッチのデータのインポートが開始されます。デフォルト値:100 MB。

    maxRetries

    いいえ

    3

    データのバッチを書き込み先のテーブルにインポートできなかった場合に許可される最大再試行回数。

    labelPrefix

    いいえ

    datax_doris_writer_

    インポートされたデータの各バッチのラベルプレフィックス。一意のラベルは、ラベルプレフィックスと UUID で構成され、データが繰り返しインポートされないようにします。

    loadProps

    いいえ

    なし

    Stream Load ジョブと同じリクエストパラメーター。詳細については、「Stream Load を使用したデータのインポート」トピックの「パラメーター」セクションをご参照ください。format パラメーターは、インポートするデータの形式を指定します。デフォルトでは、インポートするデータは CSV 形式です。JSON 形式でデータをインポートできます。詳細については、このトピックの「データ型を変換する」セクションをご参照ください。

    flushInterval

    いいえ

    30000

    データをバッチで書き込む間隔。デフォルト値:30000。単位:ミリ秒。

  5. 次のコマンドを実行して、ジョブを送信します。

    cd target/datax/datax/bin
    python datax.py ../mysqlToSelectDB.json

データ型を変換する

デフォルトでは、インポートされたデータは文字列に変換されます。\t は列区切り文字として使用され、\n は行区切り文字として使用されて、ApsaraDB for SelectDB インスタンスにインポートされる CSV ファイルに変換されます。デフォルトでは、データは CSV 形式でインポートされます。列区切り文字または行区切り文字を変更する場合は、loadProps パラメーターのパラメーターを変更します。サンプルコード:

"loadProps": {
    "format": "csv",
    "column_separator": "\\x01",
    "line_delimiter": "\\x02"
}

JSON 形式でデータをインポートする場合は、loadProps パラメーターの format パラメーターを変更します。サンプルコード:

"loadProps": {
    "format": "json",
    "strip_outer_array": true
}