すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:PostgreSQL データソースからのデータ移行

最終更新日:Jun 19, 2025

Data Transmission Service (DTS)、DataWorks、Flink 変更データキャプチャ (CDC)、またはカタログを使用して、PostgreSQL データソースから ApsaraDB for SelectDB インスタンスにデータを移行できます。 PostgreSQL データソースは、セルフマネージド PostgreSQL データベース、ApsaraDB RDS for PostgreSQL インスタンス、または PolarDB for PostgreSQL クラスタです。 移行するデータ量とビジネスシナリオに基づいて、適切なソリューションを選択してデータを移行できます。

移行ソリューションの比較

DTS、DataWorks、Flink CDC、またはカタログを使用して、PostgreSQL データソースから SelectDB インスタンスにデータを移行できます。 移行できるデータは、移行ソリューションによって異なります。 ビジネス要件に基づいて適切なソリューションを選択できます。

ソリューション

履歴データの移行

増分データの同期

スキーマの移行

データベースの移行

DDL 操作の増分同期

データ検証

DTS

✔️

✔️

✔️

✔️

✔️

✔️

DataWorks

✔️

✔️

✔️

✔️

✔️

Flink CDC

✔️

✔️

✔️

✔️

✔️

カタログ

✔️

前提条件

  • PostgreSQL データソースが SelectDB インスタンスと通信できることを確認します。

手順

DTS を使用してデータを移行する

DTS を使用すると、履歴データを移行し、PostgreSQL データソースから SelectDB インスタンスに増分データを同期できます。 DTS は、スキーマの移行、DDL 操作の同期、およびデータ検証もサポートしています。

DataWorks を使用してデータを移行する

ステップ 1: データソースを追加する

データ同期タスクを開発する場合は、PostgreSQL データソースと SelectDB データソースを DataWorks に追加する必要があります。

  1. PostgreSQL データソースを追加します。 詳細については、「PostgreSQL データソース」をご参照ください。

  2. SelectDB データソースを追加します。 詳細については、「データソースの追加と管理」をご参照ください。

次の表に、SelectDB データソースのパラメーターを示します。

パラメーター

説明

[データソース名]

データソースの名前。

[ホストアドレス/ IP アドレス]

jdbc:mysql://<ip>:<port>/<dbname> 形式の JDBC (Java Database Connectivity) URL。

SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと MySQL ポートを取得するには、次の操作を実行します。 SelectDB コンソールにログインし、[インスタンスの詳細] ページに移動します。 表示されるページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [MySQL ポート] パラメーターの値を表示します。

例: jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030/test_db.

[HTTP 接続アドレス]

HTTP 経由で SelectDB インスタンスにアクセスするために使用される IP アドレスとポート番号。 形式: <ip>:<port>.

SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと HTTP ポートを取得するには、次の操作を実行します。 SelectDB コンソールにログインし、[インスタンスの詳細] ページに移動します。 表示されるページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [HTTP ポート] パラメーターの値を表示します。

例: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080.

[ユーザー名]

SelectDB インスタンスのオーナーアカウントのユーザー名。

[パスワード]

SelectDB インスタンスのオーナーアカウントのパスワード。

ステップ 2: データ同期タスクを構成する

詳細については、以下のトピックをご参照ください。

Flink CDC を使用してデータを移行する

Flink は、PostgreSQL データソースから SelectDB インスタンスにデータを移行するために使用できる Flink CDC を提供します。 Flink CDC を使用して、履歴データ、増分データ、およびスキーマを移行し、DDL 操作を同期できます。

環境を準備する

Flink 環境を構築します。 この例では、バージョン 1.16 の Flink スタンドアロンクラスタがデプロイされています。

  1. flink-1.16.3-bin-scala_2.12.tgz パッケージをダウンロードし、解凍します。このバージョンが有効期限切れの場合は、別のバージョンをダウンロードしてください。バージョンに関する詳細については、「Apache Flink」をご参照ください。サンプルコード:

    wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  2. flink-sql-connector-mysql-cdc-2.4.2 パッケージと flink-doris-connector-1.16-1.5.2 パッケージを FLINK_HOME/lib ディレクトリにダウンロードします。サンプルコード:

    説明

    データソースから完全データを同期する場合、Flink のバージョンは 1.15 以降である必要があります。異なるバージョンの Flink Doris Connector のダウンロード方法の詳細については、「org/apache/doris」をご参照ください。

    cd flink-1.16.3
    cd lib/
    wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
    wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
  3. Flink スタンドアローン クラスタを起動します。サンプル コード:

    bin/start-cluster.sh
  4. SelectDB インスタンスを作成します。詳細については、「インスタンスを作成する」をご参照ください。

  5. MySQL プロトコルを介して SelectDB インスタンスに接続します。詳細については、「インスタンスに接続する」をご参照ください。

  6. テストデータベースとテストテーブルを作成します。

    1. テストデータベースを作成します。

      CREATE DATABASE test_db;
    2. テストテーブルを作成します。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

Flink CDC ジョブを送信する

サンプルコード:

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1\
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

パラメータの説明

パラメーター

必須

説明

execution.checkpointing.interval

はい

チェックポイントの間隔。データ同期の頻度に影響します。このパラメーターを 10s に設定することをお勧めします。

parallelism.default

いいえ

Flink ジョブの並列度。並列度を適切に増やすことで、データ同期を高速化できます。

database

はい

SelectDB インスタンスでデータを同期するデータベースの名前。

including-tables

いいえ

データを同期する PostgreSQL テーブル。

複数のテーブル名を区切るには、縦棒(|)を使用できます。正規表現もサポートされています。

例:--including-tables table1|tbl.*。これは、table1 と、名前が tbl で始まるすべてのテーブルからデータを同期することを指定します。

excluding-tables

いいえ

除外するテーブル。このパラメーターは、including-tables パラメーターと同じ方法で指定できます。

postgres-conf

はい

Postgres CDC ソースの構成項目。詳細については、「Postgres CDC コネクタ」をご参照ください。hostnameusernamepasswordtable-nameschema-name、および database-name パラメーターは必須です。

sink-conf

はい

Doris シンクの構成項目。詳細については、「Flink を使用してデータをインポートする」をご参照ください。

table-conf

いいえ

SelectDB テーブルの構成項目。構成項目は、SelectDB テーブルの作成時にプロパティに含まれています。

カタログを使用してデータを移行する

SelectDB は、フェデレーテッドクエリを実行することで PostgreSQL データソースにアクセスできるカタログ機能を提供します。この方法で、PostgreSQL データソースから SelectDB インスタンスに履歴データを簡単かつ迅速に移行できます。

  1. SelectDB インスタンスに接続します。詳細については、「インスタンスに接続する」をご参照ください。

    説明

    Data Management ( DMS ) を使用して SelectDB インスタンスにログインする場合、SWITCH 文は無効になります。 MySQL クライアントを使用して SelectDB インスタンスに接続することをお勧めします。

  2. PostgreSQL データソースの JDBC カタログを作成します。

    CREATE CATALOG jdbc_postgresql PROPERTIES (
        "type"="jdbc",
        "user"="root",
        "password"="123456",
        "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/demo",
        "driver_url" = "postgresql-42.5.1.jar",
        "driver_class" = "org.postgresql.Driver",
        "checksum" = "20c8228267b6c9ce620fddb39467d3eb"
    );

    パラメータの説明

    パラメータ

    必須

    説明

    user

    はい

    PostgreSQL データソースへの接続に使用するアカウント。

    password

    はい

    PostgreSQL データソースへの接続に使用するアカウントのパスワード。

    jdbc_url

    はい

    PostgreSQL データソースへの接続に使用する JDBC URL。

    driver_url

    はい

    JDBC ドライバの JAR パッケージの名前。

    driver_class

    はい

    JDBC ドライバクラスの名前。

    lower_case_table_names

    いいえ

    外部 JDBC データソースのデータベースとテーブルの名前を小文字で同期するかどうかを指定します。

    デフォルト値: "false"

    only_specified_database

    いいえ

    特定のデータベースからのみデータを移行するかどうかを指定します。

    デフォルト値: "false"

    include_database_list

    いいえ

    データの移行元となるデータベースの名前。このパラメータは、only_specified_database=true 設定を使用する場合にのみ有効になります。複数のデータベース名はカンマ ( , ) で区切ります。データベース名は大文字と小文字が区別されます。

    デフォルト値: ""

    exclude_database_list

    いいえ

    データの移行元としないデータベースの名前。このパラメータは、only_specified_database=true 設定を使用する場合にのみ有効になります。複数のデータベース名はカンマ ( , ) で区切ります。データベース名は大文字と小文字が区別されます。

    デフォルト値: ""

    詳細については、「JDBC データソース」をご参照ください。

  3. SelectDB インスタンスにテーブルを作成し、抽出、変換、ロード ( ETL ) 構文の INSERT INTO SELECT 文を実行してデータを同期します。詳細については、「INSERT INTO を使用してデータをインポートする」をご参照ください。

    # テーブルを作成する
    CREATE TABLE selectdb_table ...
    
    # データを移行する
    INSERT INTO selectdb_table SELECT * FROM jdbc_postgresql.pg_database.pg_table;