ソースデータベースが Data Transmission Service (DTS) でネイティブにサポートされていない場合、またはセキュリティ上の制約により DTS がソースデータベースに直接接続できない場合、データ転送 SDK を使用して、データを手動で DTS にプッシュできます。その後、DTS はそのデータを宛先の AnalyticDB for PostgreSQL (ADB for PostgreSQL) インスタンスへ同期します。
データ転送を利用するタイミング
以下の状況でデータ転送をご利用ください:
サポートされていないソースデータベース:ソースが DTS でネイティブにサポートされていないデータベース(例:他の中国系ベンダーが提供するデータベース)である場合。
非標準のデータ型:ログデータなど、取り込み前にカスタムエンコーディングを必要とする特殊なデータ型である場合。
直接アクセスが制限されている場合:セキュリティ上の理由から、ソースデータベースが認証情報やネットワークエンドポイントを DTS に対して直接公開できない場合。
ソースデータベースがすでに DTS でサポートされている場合は、代わりに標準的なデータ同期タスクを使用してください。「DTS でサポートされているデータベースのリスト」を参照してください。
仕組み
データ転送は、以下の 2 段階のセットアップで実行されます:
DTS コンソールでデータ転送インスタンスを作成します。これにより、SDK からデータを受信するチャネルがプロビジョニングされます。
SDK を構成して起動します。インスタンス作成後、インスタンスの詳細ページから接続パラメーターを取得し、SDK を構成してデータのプッシュを開始します。
DTS は、この転送チャネルから受け取ったデータを、宛先の ADB for PostgreSQL インスタンスへ同期します。
前提条件
開始する前に、以下の条件を満たしていることをご確認ください:
データを受信する ADB for PostgreSQL インスタンス。宛先としてサポートされるのは ADB for PostgreSQL のみです。「インスタンスの作成」をご参照ください。
宛先インスタンス内にデータベースおよびスキーマが作成済みであること。本ガイドでは、スキーマ名を dts_deliver_test とします。「ベクトルデータを SQL を使用してインポートする」の「データのインポート」セクションをご参照ください。
データ転送インスタンスを所有するアカウントの AccessKey ID および AccessKey Secret。『AccessKey ペアの作成』をご参照ください。
(条件付き)アクセス方法として Express Connect、VPN Gateway、または Smart Access Gateway を選択する場合、事前に VPN Gateway 経由での DTS アクセスを設定してください。「VPN Gateway を使用したデータセンターと DTS の接続」をご参照ください。
コードベース内にデータ転送 SDK を実装する開発能力
注意事項
データ転送 SDK を使用してデータソースから DTS へデータを転送するには、エンコーディング機能が必要です。
宛先の ADB for PostgreSQL インスタンス内のスキーマ名は、データオブジェクト削除の構成ステップで入力したデータベース名および SDK の
dbNameパラメーターの値と完全に一致させる必要があります。不一致の場合、宛先データベースがデータを受信できなくなります。データ転送インスタンスを作成した後は、シャード数を変更できません。
データ転送インスタンスの作成直後に、データ転送 SDK をすぐに起動してください。SDK の起動が遅れると、増分データが収集されず、インスタンスが失敗します。
課金
「課金概要」をご参照ください。
データ転送インスタンスの作成
Data Synchronization Tasks(データ同期タスク)ページへ移動します。
Data Management (DMS) コンソール にログインします。
上部ナビゲーションバーで、Data + AI をクリックします。
左側ナビゲーションウィンドウで、DTS (DTS) > データ同期 を選択します。
操作は、DMS コンソールのモードおよびレイアウトによって異なる場合があります。「シンプルモード」および「DMS コンソールのレイアウトとスタイルのカスタマイズ」を参照してください。また、新しい DTS コンソールの「データ同期タスクページ」に直接移動することもできます。
データ同期インスタンスが配置されているリージョンを選択します。
新しい DTS コンソールでは、上部ナビゲーションバーからリージョンを選択します。
タスクの作成 をクリックします。タスク作成ウィザードで、ソースおよび宛先データベースを構成します。
セクション パラメーター 説明 該当なし タスク名 DTS タスクの名前です。DTS が自動的に名前を生成します。タスクを容易に識別できるよう、意味のある名前を指定してください。名前は一意である必要はありません。 ソースデータベース DMS データベースインスタンスの選択 既存のインスタンスは選択しないでください。データ転送タスク用に、ソースパラメーターを手動で構成します。 データベースタイプ データ転送 を選択します。 アクセス方法 ソースへのアクセス方法です。本例では、パブリック IP アドレス を選択しています。ただし、Express Connect、VPN Gateway、または Smart Access Gateway を選択した場合は、VPC および vSwitch のパラメーターも構成してください。 インスタンスリージョン ソースデータベースが配置されているリージョンです。リストに表示されていない場合は、地理的に最も近いリージョンを選択してください。 宛先データベース DMS データベースインスタンスの選択 既存のデータベースインスタンスを選択するか、空白のままにして下記のパラメーターを手動で構成してください。既存のインスタンスを選択した場合、DTS がパラメーターを自動的に設定します。 データベースタイプ AnalyticDB for PostgreSQL を選択します。 アクセス方法 Alibaba Cloud インスタンス を選択します。 インスタンスリージョン 宛先の ADB for PostgreSQL インスタンスが配置されているリージョンです。 インスタンス ID 宛先の ADB for PostgreSQL インスタンスの ID です。 データベース名 データを受信する宛先インスタンス内のデータベース名です。 データベースアカウント 宛先インスタンスのアカウントです。このアカウントは、宛先データベースに対して読み書き権限を持っている必要があります。「データベースアカウントの作成と管理」をご参照ください。 データベースパスワード データベースアカウントのパスワードです。 「[接続性のテストと続行]」をクリックします。DTS は、自動的にそのサーバーの CIDR ブロックを Alibaba Cloud データベースインスタンスのホワイトリストおよび ECS でホストされるデータベースのセキュリティグループルールに追加します。データセンター内またはサードパーティプロバイダーがホストする自己管理データベースの場合、DTS サーバーの CIDR ブロックをデータベースのホワイトリストに手動で追加します。詳細については、「DTS サーバーの CIDR ブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。
警告DTS サーバーの CIDR ブロックをデータベースのホワイトリストまたはセキュリティグループルールに追加すると、セキュリティリスクが生じる可能性があります。続行する前に、認証情報の強化、公開ポートの制限、API 呼び出しの認証、ホワイトリストの定期的な監査などの予防措置を講じてください。あるいは、Express Connect、VPN Gateway、または Smart Access Gateway を介してデータベースを DTS に接続することもできます。
転送対象のオブジェクトおよび高度な設定を構成します。
パラメーター 説明 競合テーブルの処理モード [事前チェックとエラー報告]:送信先にソースと同じ名前のテーブルが存在するかをチェックします。同一の名前が存在する場合、事前チェックは失敗し、タスクを開始できません。送信先のテーブルを削除または名前変更せずに名前衝突を解決するには、オブジェクト名マッピングを使用します。詳しくは、「オブジェクト名のマップ」をご参照ください。[エラーを無視して続行]:同一のテーブル名に関する事前チェックをスキップします。ソースと送信先のスキーマが一致し、レコードが同じプライマリキーまたは一意キーの値を持つ場合、完全同期では既存の送信先レコードが保持され、増分同期では上書きされます。スキーマが異なる場合、初期化が失敗するか、一部の列のみが同期される可能性があります。注意して続行してください。 宛先インスタンスにおけるオブジェクト名の大文字小文字の設定 送信先でのデータベース、テーブル、およびカラム名の大文字・小文字の使用を制御します。デフォルトは DTS デフォルトポリシー です。宛先インスタンスでのオブジェクト名の大文字・小文字の指定 をご参照ください。 データオブジェクトの構成を削除する 転送するソースデータベースおよびテーブルを構成します:1. ライブラリの追加 をクリックします。新規データベース ダイアログボックスで、ソースデータベース名を入力します。この名前は、宛先の ADB for PostgreSQL インスタンスのスキーマ名と一致する必要があります。本例では、dts_deliver_test を入力します。別のデータベースを追加するには、既存のエントリの横にある 追加 をクリックします。2. OK をクリックします。3. データベースの横にある
アイコンをクリックしてリストを展開します。4. テーブル の横にある テーブルの追加 をクリックします。テーブルの追加 ダイアログボックスで、テーブル名を入力します。テーブル名は、SDK の tableNameパラメーターの値と一致する必要があります。本例では、tab1、tab2、tab3 を入力します。5. OK をクリックします。6. (任意)テーブルおよびカラム名のマッピングを構成するには、テーブルの横にある 編集 をクリックし、テーブル名 を変更してテーブルのマッピングを設定します。次に、すべてのテーブルを同期 のチェックを外し、カラム名 および カラム名のマッピング を変更してカラムのマッピングを設定します。カラム名 の値は、nameに対応し、これはcreateFieldメソッド(FakeSource.java内)の引数です。カラム名のマッピング の値は、宛先インスタンスにおけるカラム名です。さらにカラムを追加するには、
をクリックします。完了したら OK をクリックします。次へ:高度な設定 をクリックし、高度なパラメーターを構成します。
パラメーター 説明 タスクスケジューリング用の専用クラスター デフォルトでは、DTS がタスクを共有クラスターにスケジュールします。専用クラスターを使用するには、別途購入してください。「DTS 専用クラスターとは? アラートの設定 アラート機能を有効にするかどうか。[いいえ]:アラート機能が無効です。[はい]:アラートのしきい値と通知設定を設定します。詳細については、「DTS タスクを作成するときにモニタリングとアラートを設定する」をご参照ください。 接続失敗時の再試行時間 接続失敗後の DTS の再試行時間を分単位で指定します。有効な値:10~1440。デフォルト:720。30 分を超える値を設定してください。この時間内に DTS が再接続できた場合、タスクは再開されます。それ以外の場合は、タスクが失敗します。 説明複数のタスクが同じソースまたは宛先を共有する場合、設定された再試行時間のうち最も短いものがすべてのタスクに適用されます。再試行期間中もインスタンスの課金が発生します。
その他の問題発生時の再試行時間 DDL または DML 操作失敗後の DTS の再試行時間を分単位で指定します。有効な値:1~1440。デフォルト:10。10 分を超える値を設定してください。この値は、接続失敗時の再試行時間 よりも短くする必要があります。 増分データ同期のスロットル機能の有効化 増分同期のスロットル機能を有効にするかどうかです。有効にした場合、増分データ移行の RPS および 増分データ移行の BPS を構成して、宛先への負荷を軽減します。 環境タグ DTS インスタンスを識別するためのタグです。ご利用の環境に応じて選択してください。 ETL の構成 ETL (抽出·変換·書き出し) を有効にするかどうかを指定します。 はい: コードエディタにデータ処理文を入力します。 詳細については、「データ移行またはデータ同期タスクで ETL を設定する」をご参照ください。 いいえ: ETL は無効になります。 詳細については、「ETL とは タスクを保存し、事前チェックを実行します。
この構成の API パラメーターをプレビューするには、次へ:タスク設定の保存と事前チェック の上にマウスを合わせ、OpenAPI パラメーターのプレビュー をクリックします。
次へ:タスク設定の保存と事前チェック をクリックします。
DTS はタスク開始前に事前チェックを実行します。事前チェックが失敗した場合は、各失敗項目の横にある 詳細の表示 をクリックし、問題を修正して 再事前チェック をクリックしてください。安全に無視できる項目についてアラートがトリガーされた場合は、アラートの詳細の確認 をクリックし、ダイアログボックスで 無視 をクリックして OK をクリックし、その後 再事前チェック をクリックしてください。アラートを無視すると、データの不整合が発生する可能性があります。
成功率 が 100% になるまで待機し、次へ:インスタンスの購入 をクリックします。
インスタンスの購入 ページで、課金およびインスタンス設定を構成します。
パラメーター 説明 課金方法 サブスクリプション:固定期間の前払い方式です。長期利用に適しています。従量課金:時間単位で課金されます。短期利用に適しています。不要になったらインスタンスを解放して、不要な課金を回避してください。 シャード数 宛先トピック内のパーティション数です。インスタンス作成後は変更できません。 リソースグループ設定 インスタンスのリソースグループです。デフォルト: デフォルトリソースグループ。「Resource Management とは? インスタンスクラス 同期仕様であり、パフォーマンスを決定します。「データ同期インスタンスのインスタンスクラス」をご参照ください。 サブスクリプション期間 サブスクリプション課金方法でのみ利用可能です。有効な値:1~9 か月、または 1、2、3、5 年です。 Data Transmission Service(従量課金)サービス利用規約 をお読みになり、同意してください。
購入して開始 をクリックし、確認ダイアログボックスで OK をクリックします。
タスクはタスクリストに表示されます。データ転送インスタンスの作成直後に、データ転送 SDK をすぐに起動してください。
SDK の構成と起動
インスタンスを作成した後、接続パラメーターを取得して SDK を構成します。
ステップ 1:SDK の依存関係を追加
IDE(例:IntelliJ IDEA)でプロジェクトを開き、pom.xml に以下の依存関係を追加します:
<dependency>
<groupId>com.aliyun.dts.deliver</groupId>
<artifactId>dts-deliver-client</artifactId>
<version>1.0.0</version>
</dependency>最新バージョンについては、「dts-deliver-client」ページをご参照ください。
ステップ 2:サンプルコードのダウンロードと接続パラメーターの取得
GitHub の「dts-deliver-test」からサンプルコードをダウンロードします。DtsDeliverTest.java(dts-deliver-test フォルダ内)を起点としてご利用ください。
FakeSource.java内のreadメソッドには、データソース実装の例が示されています。nameフィールド(createField内)は、ソーステーブルのカラム名です。実際のデータソースに応じてエンコーディングを実装してください。
接続パラメーターを取得するには、Data Synchronization Tasks(データ同期タスク)ページに移動し、データ転送インスタンスの ID をクリックして、左側ペインの 基本情報 を選択します。転送チャネル情報 セクションに以下の値が表示されます。
| パラメーター | 説明 | 取得場所 |
|---|---|---|
ip:port | データ転送インスタンスのエンドポイント | 転送チャネル情報 で、パブリックエンドポイント または VPC エンドポイント の横にある コピー をクリックします。ソースデータベースがデータ転送インスタンスと同じ VPC 内にある場合のみ、VPC エンドポイント を使用してください。 |
ak | インスタンスを所有するアカウントの AccessKey ID | 「AccessKey ペアの作成」および「RAM ユーザーの AccessKey ペア情報の表示」をご参照ください。 |
secret | AccessKey Secret | 上記と同様です。 |
dts_job_id | データ転送インスタンスのタスク ID(インスタンス ID ではありません) | API を呼び出します。応答から Shipped Topic(配信トピック) の値を取得し、_vpc_ と _data_delivery_ の間の部分文字列を抽出します。たとえば、Shipped Topic(配信トピック) が cn_hangzhou_vpc_cxti86dc11z*_data_delivery_version2 の場合、dts_job_id は cxti86dc11z* となります。 |
topic | データ転送インスタンスの宛先トピック | 転送チャネル情報 で、コピー を Shipped Topic(配信トピック) の横からクリックします。 |
partition | 宛先トピック内のシャード数 | 転送チャネル情報 で、シャード数を確認します。 |
region | データ転送インスタンスが配置されているリージョン | [配送チャネル情報] で、[インスタンスリージョン] を表示します。 |
dbName | ソースデータベース名。宛先の ADB for PostgreSQL インスタンスのスキーマ名と一致する必要があります。 | 本例では:dts_deliver_test。 |
tableName | ソーステーブル名。 データオブジェクト削除の構成 で構成したテーブル名と一致する必要があります。 | 本例では:tab1、tab2、tab3。 |
ステップ 3:SDK の起動とオフセットの設定
現在時刻を記録した後、データ転送 SDK を起動します。
データ転送インスタンスの現在のオフセットを、SDK の起動時に更新します。詳細については、「データ同期または移行インスタンスの現在のオフセットを変更する」をご参照ください。
デフォルトでは、現在のオフセット は 増分書き込み モジュールの開始時刻であり、SDK の開始時刻ではありません。正しいポイントからデータを収集するために、オフセットを SDK の開始時刻に変更してください。
ステップ 4:データフローの検証
宛先データベースへ同期されたデータを確認します。