Data Transmission Service (DTS)、DataWorks、Flink 変更データキャプチャ (CDC)、またはカタログを使用して、PostgreSQL データソースから ApsaraDB for SelectDB インスタンスにデータを移行できます。 PostgreSQL データソースは、セルフマネージド PostgreSQL データベース、ApsaraDB RDS for PostgreSQL インスタンス、または PolarDB for PostgreSQL クラスタです。 移行するデータ量とビジネスシナリオに基づいて、適切なソリューションを選択してデータを移行できます。
移行ソリューションの比較
DTS、DataWorks、Flink CDC、またはカタログを使用して、PostgreSQL データソースから SelectDB インスタンスにデータを移行できます。 移行できるデータは、移行ソリューションによって異なります。 ビジネス要件に基づいて適切なソリューションを選択できます。
ソリューション | 履歴データの移行 | 増分データの同期 | スキーマの移行 | データベースの移行 | DDL 操作の増分同期 | データ検証 |
✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ✔️ | |
✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ❌ | |
✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ❌ | |
✔️ | ❌ | ❌ | ❌ | ❌ | ❌ |
前提条件
PostgreSQL データソースが SelectDB インスタンスと通信できることを確認します。
PostgreSQL データソースと SelectDB インスタンスは同じVPC (Virtual Private Cloud) にあります。 異なる VPC にある場合は、相互に通信できることを確認する必要があります。 詳細については、「SelectDB インスタンスとデータソース間の接続の確立に失敗した場合はどうすればよいですか?」をご参照ください。
PostgreSQL データソースの IP アドレスが SelectDB インスタンスの IP アドレスホワイトリストに追加されています。 詳細については、「IP アドレスホワイトリストの構成」をご参照ください。
PostgreSQL データソースでホワイトリストメカニズムがサポートされている場合は、SelectDB インスタンスの CIDR ブロックが PostgreSQL データソースの IP アドレスホワイトリストに追加されます。
SelectDB インスタンスが属する VPC 内の SelectDB インスタンスの IP アドレスを取得するには、ApsaraDB SelectDB インスタンスが属する VPC 内の IP アドレスを表示するにはどうすればよいですか? に記載されている操作を実行できます。
SelectDB インスタンスのパブリック IP アドレスを取得するには、ping コマンドを実行して SelectDB インスタンスのパブリックエンドポイントにアクセスし、インスタンスの IP アドレスを取得できます。
手順
DTS を使用してデータを移行する
DTS を使用すると、履歴データを移行し、PostgreSQL データソースから SelectDB インスタンスに増分データを同期できます。 DTS は、スキーマの移行、DDL 操作の同期、およびデータ検証もサポートしています。
データ同期の参照:
データ移行の参照:
DataWorks を使用してデータを移行する
ステップ 1: データソースを追加する
データ同期タスクを開発する場合は、PostgreSQL データソースと SelectDB データソースを DataWorks に追加する必要があります。
PostgreSQL データソースを追加します。 詳細については、「PostgreSQL データソース」をご参照ください。
SelectDB データソースを追加します。 詳細については、「データソースの追加と管理」をご参照ください。
次の表に、SelectDB データソースのパラメーターを示します。
パラメーター | 説明 |
[データソース名] | データソースの名前。 |
[ホストアドレス/ IP アドレス] |
SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと MySQL ポートを取得するには、次の操作を実行します。 SelectDB コンソールにログインし、[インスタンスの詳細] ページに移動します。 表示されるページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [MySQL ポート] パラメーターの値を表示します。 例: |
[HTTP 接続アドレス] | HTTP 経由で SelectDB インスタンスにアクセスするために使用される IP アドレスとポート番号。 形式: SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと HTTP ポートを取得するには、次の操作を実行します。 SelectDB コンソールにログインし、[インスタンスの詳細] ページに移動します。 表示されるページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [HTTP ポート] パラメーターの値を表示します。 例: |
[ユーザー名] | SelectDB インスタンスのオーナーアカウントのユーザー名。 |
[パスワード] | SelectDB インスタンスのオーナーアカウントのパスワード。 |
ステップ 2: データ同期タスクを構成する
詳細については、以下のトピックをご参照ください。
Flink CDC を使用してデータを移行する
Flink は、PostgreSQL データソースから SelectDB インスタンスにデータを移行するために使用できる Flink CDC を提供します。 Flink CDC を使用して、履歴データ、増分データ、およびスキーマを移行し、DDL 操作を同期できます。
環境を準備する
Flink 環境を構築します。 この例では、バージョン 1.16 の Flink スタンドアロンクラスタがデプロイされています。
flink-1.16.3-bin-scala_2.12.tgz パッケージをダウンロードし、解凍します。このバージョンが有効期限切れの場合は、別のバージョンをダウンロードしてください。バージョンに関する詳細については、「Apache Flink」をご参照ください。サンプルコード:
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
flink-sql-connector-mysql-cdc-2.4.2 パッケージと flink-doris-connector-1.16-1.5.2 パッケージを FLINK_HOME/lib ディレクトリにダウンロードします。サンプルコード:
説明データソースから完全データを同期する場合、Flink のバージョンは 1.15 以降である必要があります。異なるバージョンの Flink Doris Connector のダウンロード方法の詳細については、「org/apache/doris」をご参照ください。
cd flink-1.16.3 cd lib/ wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
Flink スタンドアローン クラスタを起動します。サンプル コード:
bin/start-cluster.sh
SelectDB インスタンスを作成します。詳細については、「インスタンスを作成する」をご参照ください。
MySQL プロトコルを介して SelectDB インスタンスに接続します。詳細については、「インスタンスに接続する」をご参照ください。
テストデータベースとテストテーブルを作成します。
テストデータベースを作成します。
CREATE DATABASE test_db;
テストテーブルを作成します。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
Flink CDC ジョブを送信する
サンプルコード:
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
postgres-sync-database \
--database db1\
--postgres-conf hostname=127.0.0.1 \
--postgres-conf port=5432 \
--postgres-conf username=postgres \
--postgres-conf password="123456" \
--postgres-conf database-name=postgres \
--postgres-conf schema-name=public \
--postgres-conf slot.name=test \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****
パラメータの説明
パラメーター | 必須 | 説明 |
execution.checkpointing.interval | はい | チェックポイントの間隔。データ同期の頻度に影響します。このパラメーターを 10s に設定することをお勧めします。 |
parallelism.default | いいえ | Flink ジョブの並列度。並列度を適切に増やすことで、データ同期を高速化できます。 |
database | はい | SelectDB インスタンスでデータを同期するデータベースの名前。 |
including-tables | いいえ | データを同期する PostgreSQL テーブル。 複数のテーブル名を区切るには、縦棒(|)を使用できます。正規表現もサポートされています。 例: |
excluding-tables | いいえ | 除外するテーブル。このパラメーターは、including-tables パラメーターと同じ方法で指定できます。 |
postgres-conf | はい | Postgres CDC ソースの構成項目。詳細については、「Postgres CDC コネクタ」をご参照ください。 |
sink-conf | はい | Doris シンクの構成項目。詳細については、「Flink を使用してデータをインポートする」をご参照ください。 |
table-conf | いいえ | SelectDB テーブルの構成項目。構成項目は、SelectDB テーブルの作成時にプロパティに含まれています。 |
カタログを使用してデータを移行する
SelectDB は、フェデレーテッドクエリを実行することで PostgreSQL データソースにアクセスできるカタログ機能を提供します。この方法で、PostgreSQL データソースから SelectDB インスタンスに履歴データを簡単かつ迅速に移行できます。
SelectDB インスタンスに接続します。詳細については、「インスタンスに接続する」をご参照ください。
説明Data Management ( DMS ) を使用して SelectDB インスタンスにログインする場合、
SWITCH
文は無効になります。 MySQL クライアントを使用して SelectDB インスタンスに接続することをお勧めします。PostgreSQL データソースの JDBC カタログを作成します。
CREATE CATALOG jdbc_postgresql PROPERTIES ( "type"="jdbc", "user"="root", "password"="123456", "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/demo", "driver_url" = "postgresql-42.5.1.jar", "driver_class" = "org.postgresql.Driver", "checksum" = "20c8228267b6c9ce620fddb39467d3eb" );
パラメータの説明
パラメータ
必須
説明
user
はい
PostgreSQL データソースへの接続に使用するアカウント。
password
はい
PostgreSQL データソースへの接続に使用するアカウントのパスワード。
jdbc_url
はい
PostgreSQL データソースへの接続に使用する JDBC URL。
driver_url
はい
JDBC ドライバの JAR パッケージの名前。
driver_class
はい
JDBC ドライバクラスの名前。
lower_case_table_names
いいえ
外部 JDBC データソースのデータベースとテーブルの名前を小文字で同期するかどうかを指定します。
デフォルト値:
"false"
only_specified_database
いいえ
特定のデータベースからのみデータを移行するかどうかを指定します。
デフォルト値:
"false"
include_database_list
いいえ
データの移行元となるデータベースの名前。このパラメータは、
only_specified_database=true
設定を使用する場合にのみ有効になります。複数のデータベース名はカンマ ( , ) で区切ります。データベース名は大文字と小文字が区別されます。デフォルト値:
""
exclude_database_list
いいえ
データの移行元としないデータベースの名前。このパラメータは、
only_specified_database=true
設定を使用する場合にのみ有効になります。複数のデータベース名はカンマ ( , ) で区切ります。データベース名は大文字と小文字が区別されます。デフォルト値:
""
詳細については、「JDBC データソース」をご参照ください。
SelectDB インスタンスにテーブルを作成し、抽出、変換、ロード ( ETL ) 構文の
INSERT INTO SELECT
文を実行してデータを同期します。詳細については、「INSERT INTO を使用してデータをインポートする」をご参照ください。# テーブルを作成する CREATE TABLE selectdb_table ... # データを移行する INSERT INTO selectdb_table SELECT * FROM jdbc_postgresql.pg_database.pg_table;