このトピックでは、Data Transmission Service(DTS)を使用して、PolarDB for PostgreSQL(Oracle互換)クラスターから Alibaba Cloud Message Queue for Apache Kafka へデータを同期する方法について説明します。
前提条件
ソース PolarDB for PostgreSQL(Oracle互換)クラスターの wal_level パラメーターを logical に設定します。この設定により、論理デコーディングに必要な情報がウォールアヘッドログ(WAL)に追加されます。詳細については、「クラスターのパラメーターの設定」をご参照ください。
宛先の Alibaba Cloud Message Queue for Apache Kafka インスタンスのストレージ領域を、ソースの PolarDB for PostgreSQL(Oracle互換) インスタンスで使用されているストレージ領域より大きく設定します。
説明サポート対象のソースおよび宛先データベースのバージョンについては、「同期シナリオの概要」をご参照ください。
同期データを受信するための Topic を、宛先の Alibaba Cloud Message Queue for Apache Kafka インスタンス内に作成します。詳細については、「ステップ 1:Topic の作成」をご参照ください。
注意事項
種別 | 説明 |
ソースデータベースの制限事項 |
|
その他の制限事項 |
|
課金
同期タイプ | 料金 |
スキーマ同期および完全データ同期 | 無料です。 |
増分データ同期 | 課金対象です。詳細については、「課金の概要」をご参照ください。 |
増分同期でサポートされる SQL 操作
操作タイプ | SQL 文 |
DML | INSERT、UPDATE、DELETE |
DDL |
説明 以下のシナリオでは、DDL 文は同期されません:
|
データベースアカウントの権限
データベース | 権限要件 | アカウントの作成および権限付与方法 |
PolarDB for PostgreSQL(Oracle互換)クラスター | 特権アカウント |
操作手順
宛先リージョンの同期タスク一覧ページに移動します。以下のいずれかの方法を使用できます:
DTS コンソールから
Data Transmission Service(DTS)コンソール にログインします。
左側のナビゲーションウィンドウで、データ同期 をクリックします。
ページの左上隅で、同期インスタンスが配置されているリージョンを選択します。
DMS コンソールから
説明実際の操作は、DMS コンソールのモードおよびレイアウトによって異なる場合があります。詳細については、「シンプルモード」および「DMS インターフェイスのレイアウトおよびスタイルのカスタマイズ」をご参照ください。
Data Management(DMS) にログインします。
トップメニューバーで、 を選択します。
データ同期タスク の右側で、同期インスタンスが配置されているリージョンを選択します。
タスクの作成 をクリックして、タスク設定ページを開きます。
ソースおよび宛先データベースを設定します。
説明宛先の Alibaba Cloud Message Queue for Apache Kafka インスタンスのパラメーターを取得する方法については、「Message Queue for Apache Kafka インスタンスのパラメーター設定」をご参照ください。
カテゴリ
設定
説明
なし
タスク名
DTS が自動的にタスク名を生成します。識別しやすいように、説明的な名前を指定することを推奨します。名前は一意である必要はありません。
移行元データベース
既存の接続情報の選択
システムに追加済みのデータベースインスタンス(新規作成または保存済み)を使用する場合、ドロップダウンリストからデータベースインスタンスを選択します。データベース情報は自動的に設定されます。
説明DMS コンソールでは、この設定項目の名称は DMS データベースインスタンスの選択 です。
データベースインスタンスをシステムに追加していない場合、または既に追加済みのインスタンスを使用する必要がない場合は、以下のデータベース情報を手動で設定します。
データベースタイプ
PolarDB (Oracle と互換性) を選択します。
アクセス方法
Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
ソースの PolarDB for PostgreSQL(Oracle互換)クラスターが配置されているリージョンを選択します。
Alibaba Cloud アカウント間でデータを複製
この例では、現在の Alibaba Cloud アカウントに属するデータベースインスタンスを使用します。× を選択します。
インスタンス ID
ソースの PolarDB for PostgreSQL(Oracle互換)クラスターの ID を選択します。
データベース名
同期対象オブジェクトを含むソースの PolarDB for PostgreSQL(Oracle互換)クラスター内のデータベース名を入力します。
データベースアカウント
ソースの PolarDB for PostgreSQL(Oracle互換)クラスターのデータベースアカウントを入力します。権限要件については、「データベースアカウントの権限」をご参照ください。
データベースのパスワード
データベースアカウントに対応するパスワードを入力します。
移行先データベース
既存の接続情報の選択
システムに追加済みのデータベースインスタンス(新規作成または保存済み)を使用する場合、ドロップダウンリストからデータベースインスタンスを選択します。データベース情報は自動的に設定されます。
説明DMS コンソールでは、この設定項目の名称は DMS データベースインスタンスの選択 です。
データベースインスタンスをシステムに追加していない場合、または既に追加済みのインスタンスを使用する必要がない場合は、以下のデータベース情報を手動で設定します。
データベースタイプ
Kafka を選択します。
アクセス方法
Express Connect、VPN Gateway、または Smart Access Gateway を選択します。
説明ここでは、Alibaba Cloud Message Queue for Apache Kafka インスタンスを、データ同期用の自己管理 Kafka データベースとして構成します。
インスタンスのリージョン
宛先の Alibaba Cloud Message Queue for Apache Kafka インスタンスが配置されているリージョンを選択します。
接続中の VPC
宛先の Alibaba Cloud Message Queue for Apache Kafka インスタンスの VPC ID を選択します。
ドメイン名または IP アドレス
宛先の Alibaba Cloud Message Queue for Apache Kafka インスタンスのデフォルトエンドポイントから任意の IP アドレスを入力します。
ポート番号
宛先の Alibaba Cloud Message Queue for Apache Kafka インスタンスのサービスポートを入力します。デフォルトポートは 9092 です。
データベースアカウント
この例では、このフィールドは不要です。
データベースのパスワード
Kafka のバージョン
ご利用の Kafka インスタンスと一致するバージョンを選択します。
暗号化
業務およびセキュリティ要件に応じて、非暗号化 または SCRAM-SHA-256 を選択します。
トピック
ドロップダウンリストから、データを受信する Topic を選択します。
Kafka スキーマレジストリの使用
Kafka Schema Registry は、Avro スキーマを保存および取得するための RESTful インターフェイスを提供するメタデータサービス層です。
×:Kafka Schema Registry を使用しません。
○:Kafka Schema Registry を使用します。スキーマレジストリの URL または IP アドレス テキストボックスに、Avro スキーマが登録されている Kafka Schema Registry の URL または IP アドレスを入力します。
設定を完了したら、ページ下部の 接続をテストして続行 をクリックします。
説明DTS サーバーの IP アドレス CIDR ブロックを、ソースおよび宛先データベースのセキュリティ設定に追加して、DTS サーバーからのアクセスを許可してください。これは自動的または手動で実行できます。詳細については、「DTS サーバーの IP アドレス CIDR ブロックをホワイトリストに追加」をご参照ください。
ソースまたはターゲットデータベースが自己管理データベース(アクセス方法 が Alibaba Cloud インスタンス でない場合)である場合、接続テスト を DTS サーバーの CIDR ブロック ダイアログボックスでクリックする必要があります。
タスクオブジェクトを設定します。
オブジェクト設定 ページで、同期対象のオブジェクトを設定します。
設定
説明
同期タイプ
増分データ同期 が選択されています。デフォルトでは、スキーマ同期 および 完全データ同期 も選択する必要があります。事前チェックが完了すると、DTS はソースインスタンスから選択されたオブジェクトの完全データ同期を宛先クラスターに対して実行します。これは、その後の増分データ同期のベースラインデータとして機能します。
説明宛先 Kafka インスタンスの アクセス方法 が Alibaba Cloud インスタンス の場合、スキーマ同期 はサポートされていません。
競合するテーブルの処理モード
エラーの事前チェックと報告:宛先データベースに同名のテーブルが存在するかどうかをチェックします。同名のテーブルが存在しない場合、事前チェックは合格します。同名のテーブルが存在する場合、事前チェックは失敗し、データ同期タスクは開始されません。
説明宛先データベースで同名のテーブルを削除または名前変更できない場合、別のテーブル名にマッピングできます。詳細については、「テーブルおよび列名のマッピング」をご参照ください。
エラーを無視して続行:宛先データベースにおける同名テーブルのチェックをスキップします。
警告エラーを無視して続行 を選択すると、データ不整合が発生し、業務にリスクを及ぼす可能性があります。例:
テーブルスキーマが同一であり、宛先データベースのレコードとソースデータベースのレコードが同じプライマリキーまたは一意キー値を持つ場合:
完全同期中:DTS は宛先クラスター内のレコードを保持し、ソースデータベースからの対応するレコードは同期されません。
増分同期中:ソースデータベースからのレコードが宛先データベースのレコードを上書きします。
テーブルスキーマが異なる場合、初期データ同期が失敗する可能性があります。これにより、一部の列データのみが同期されるか、完全に同期が失敗する可能性があります。慎重に進めてください。
Kafka のデータ形式
Kafka インスタンスへの同期に必要なデータ保存形式を選択します。
Canal JSON を選択した場合、「Canal JSON の説明」でパラメーターの説明および例をご確認ください。
説明現在、中国(青島)および中国(北京)リージョンのみが Canal JSON をサポートしています。
DTS Avro を選択した場合、DTS Avro スキーマ定義に基づいてデータを解析する必要があります。詳細については、「DTS Avro スキーマ定義」および「DTS Avro デシリアライゼーションのサンプルコード」をご参照ください。
Shareplex JSON を選択した場合、「Shareplex JSON」でパラメーターの説明および例をご確認ください。
Kafka 圧縮形式
Kafka メッセージの圧縮形式を必要に応じて選択します。
LZ4(デフォルト):圧縮率が低く、圧縮速度が高い。
GZIP:圧縮率が高く、圧縮速度が低い。
説明この形式は、より多くの CPU リソースを消費します。
Snappy:圧縮率および圧縮速度ともに中程度。
Kafka パーティションへのデータ転送ポリシー
必要に応じて、戦略 を選択します。
メッセージ肯定応答メカニズム
必要に応じて、メッセージ確認応答メカニズム を選択します。
DDL 情報を格納するトピック
ドロップダウンリストから、DDL 情報を格納する Topic を選択します。
説明Topic を選択しない場合、DDL 情報はデフォルトでデータを受信する Topic に格納されます。
移行先インスタンスでのオブジェクト名の大文字化
宛先インスタンスに同期されるデータベース、テーブル、および列オブジェクト名の大文字小文字の処理ポリシーを設定できます。デフォルトでは、DTS のデフォルトポリシー が選択されています。また、ソースおよび宛先データベースのデフォルトポリシーを使用することもできます。詳細については、「宛先オブジェクト名の大文字小文字の処理ポリシー」をご参照ください。
ソースオブジェクト
ソースオブジェクト ボックスで同期対象のオブジェクトをクリックし、
をクリックして 選択中のオブジェクト ボックスに移動します。説明同期対象の選択粒度はテーブル単位です。
選択中のオブジェクト
この例では、追加の設定は不要です。マッピング機能を使用して、ソーステーブルの宛先 Kafka インスタンスにおける Topic 名、パーティション数、およびパーティションキーを設定できます。詳細については、「マッピング情報」をご参照ください。
説明オブジェクト名マッピング機能を使用した場合、このオブジェクトに依存する他のオブジェクトの同期が失敗する可能性があります。
増分同期で実行する SQL 操作を選択するには、選択中のオブジェクト 内で同期対象のオブジェクトを右クリックし、表示されるダイアログボックスで必要な SQL 操作を選択します。
詳細設定へ をクリックして、高度なパラメーターを設定します。
設定
説明
タスクのスケジュールに使用する専用クラスターの選択
デフォルトでは、DTS は共有クラスター上でタスクをスケジュールし、クラスターを選択する必要はありません。より安定したパフォーマンスを得るには、DTS 同期タスクを実行する専用クラスターを購入できます。詳細については、「DTS 専用クラスターとは?」をご参照ください。
失敗した接続の再試行時間
同期タスクが開始された後、ソースまたは宛先データベースへの接続が失敗した場合、DTS はエラーを報告し、即座に接続のリトライを開始します。デフォルトのリトライ時間は 720 分です。また、10 分から 1,440 分までの範囲でカスタムのリトライ時間を指定できます。リトライ時間は 30 分以上に設定することを推奨します。指定された時間内に DTS がデータベースに再接続できた場合、同期タスクは自動的に再開されます。それ以外の場合、タスクは失敗します。
説明複数の DTS インスタンス(例:インスタンス A およびインスタンス B)が同じソースまたは宛先を共有しており、インスタンス A のネットワークリトライ時間を 30 分、インスタンス B のネットワークリトライ時間を 60 分に設定した場合、両方のインスタンスで短い方の 30 分が使用されます。
接続リトライ期間中も DTS はタスク実行時間に対して課金を行うため、ビジネス要件に応じてリトライ時間をカスタマイズするか、ソースおよび宛先データベースインスタンスがリリースされた直後に DTS インスタンスをリリースすることを推奨します。
移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。
同期タスクが開始された後、ソースまたは宛先データベースで接続性以外の問題(例:DDL または DML 実行例外)が発生した場合、DTS はエラーを報告し、即座に継続的なリトライ操作を開始します。デフォルトのリトライ時間は 10 分です。また、1 分から 1,440 分までの範囲でカスタムのリトライ時間を指定できます。リトライ時間は 10 分以上に設定することを推奨します。設定されたリトライ時間内に該当操作が成功した場合、同期タスクは自動的に再開されます。それ以外の場合、タスクは失敗します。
重要移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 の値は、失敗した接続の再試行時間 の値より小さくする必要があります。
完全同期レートを制限するかどうか
完全同期ステージ中、DTS はソースおよび宛先データベースの読み取り/書き込みリソースを消費し、データベースの負荷が上昇する可能性があります。ソースおよび宛先データベースの負荷を軽減するため、1 秒あたりのソースデータベースのクエリ率 QPS、1 秒あたりの完全移行の行数 RPS、および 1 秒あたりの完全移行データ量 (MB) BPS パラメーターを設定することで、完全同期タスクのレート制限を設定できます。
説明この設定項目は、同期タイプ が 完全データ同期 に設定されている場合にのみ利用可能です。
同期インスタンスが実行中の状態でも、完全同期レートの調整 を行うことができます。
増分同期率を制限するかどうか
増分同期タスクのレート制限も設定できます。宛先データベースへの負荷を軽減するため、1 秒あたりの増分同期の行数 RPS および 1 秒あたりの増分同期データ量 (MB) BPS を設定します。
環境タグ
必要に応じて、インスタンスを識別する環境ラベルを選択します。この例では、選択は不要です。
ETL 機能の設定
抽出・変換・書き出し(ETL)機能を有効化するかどうかを選択します。詳細については、「ETL とは?」をご参照ください。有効な値は以下のとおりです。
-
○:ETL 機能を有効化します。コードエディタにデータ処理文を入力します。詳細については、「データ移行またはデータ同期タスクにおける ETL の設定」をご参照ください。
-
×:ETL 機能を無効化します。
監視アラート
アラートの設定を行うかどうかを指定します。同期が失敗した場合、または遅延が指定されたしきい値を超えた場合、アラート連絡先に通知が送信されます。
×:アラートは設定されません。
○:アラートしきい値を設定し、アラート通知 を指定してアラートを設定します。詳細については、「タスク設定中のモニタリングおよびアラートの設定」をご参照ください。
タスクを保存し、事前チェックを実行します。
このインスタンスの API パラメーターを表示するには、次:タスク設定の保存と事前チェック ボタンにマウスを合わせ、ポップアップで表示される OpenAPI パラメーターのプレビュー をクリックします。
API パラメーターの表示が完了したら、ページ下部の 次:タスク設定の保存と事前チェック をクリックします。
説明同期ジョブが開始される前に、DTS は事前チェックを実行します。すべての事前チェック項目が合格した場合にのみ、ジョブを開始できます。
事前チェックが失敗した場合、失敗した項目の横にある 詳細を表示 をクリックします。指示に従って問題を修正し、再度事前チェックを実行します。
事前チェックで警告が返された場合:
チェック項目が失敗し、無視できない場合、項目の横にある 詳細を表示 をクリックします。指示に従って問題を修正し、再度事前チェックを実行します。
無視可能なチェック項目については、アラートの詳細を確認、無視、OK、および 再度事前チェックを実行 の順にクリックして、警告をスキップし、事前チェックを再実行できます。警告項目を無視した場合、データ不整合などの問題が発生し、業務にリスクを及ぼす可能性があります。
インスタンスを購入します。
成功率 が 100 % の場合、次:インスタンスの購入 をクリックします。
購入 ページで、データ同期インスタンスの課金方法およびリンク仕様を選択します。以下の表で、これらのパラメーターについて詳しく説明します。
カテゴリ
パラメーター
説明
新しいインスタンスクラス
課金方法
サブスクリプション:インスタンス作成時に支払いを行います。長期的なニーズに適しており、従量課金よりもコスト効率が優れています。サブスクリプション期間が長いほど、割引率が高くなります。
従量課金:時間単位で課金されます。短期的なニーズに適しています。使用後すぐにインスタンスをリリースすることで、コストを節約できます。
リソースグループの構成
インスタンスが所属するリソースグループです。デフォルトはデフォルトリソースグループです。詳細については、「Resource Management とは?」をご参照ください。
リンク仕様
DTS は、さまざまなパフォーマンスレベルの同期仕様を提供しています。同期リンク仕様は、同期レートに影響を与えます。ビジネスシナリオに応じて仕様を選択できます。詳細については、「データ同期リンク仕様」をご参照ください。
サブスクリプション期間
サブスクリプションモードでは、サブスクリプションインスタンスの期間および数量を選択します。月額サブスクリプションは 1~9 ヶ月、年額サブスクリプションは 1、2、3、または 5 年から選択できます。
説明このオプションは、課金方法が サブスクリプション の場合にのみ利用可能です。
設定を完了したら、「Data Transmission Service (従量課金) 利用規約」を読み、チェックボックスを選択します。
購入して起動 をクリックします。OK ダイアログボックスで、OK をクリックします。
データ同期ページでタスクの進行状況を確認できます。
マッピング情報
選択中のオブジェクト エリアで、宛先 Topic 名(テーブルレベル)の上にマウスポインターを置きます。
宛先 Topic 名の横にある 編集 をクリックします。
表示される テーブルの編集 ダイアログボックスで、マッピング情報を設定します。
説明スキーマレベルでは、ダイアログボックスは スキーマの編集 となり、設定可能なパラメーターが少なくなります。テーブルレベルでは、ダイアログボックスは テーブルの編集 となります。
同期オブジェクトの粒度がスキーマ全体ではない場合、スキーマの編集 ダイアログボックスで 対象トピックの名前 と パーティション数 を変更できません。
設定
説明
対象トピックの名前
ソーステーブルを同期するための宛先 Topic 名です。デフォルトでは、ソースデータベースとターゲットデータベースの設定 ステップで 移行先データベース にて選択された トピック です。
重要宛先データベースが Alibaba Cloud Message Queue for Apache Kafka インスタンスの場合、入力した Topic 名は宛先 Kafka インスタンスに存在している必要があります。存在しない場合、データ同期は失敗します。宛先データベースが自己管理 Kafka データベースであり、同期インスタンスにスキーマおよびテーブルタスクが含まれている場合、DTS は宛先データベースに指定した Topic を作成しようと試みます。
対象トピックの名前 を変更した場合、データは入力した Topic に書き込まれます。
フィルタリング条件
詳細については、「フィルター条件の設定」をご参照ください。
パーティション数
宛先 Topic にデータを書き込む際のパーティション数です。
パーティションキー
Kafka パーティションへのデータ転送ポリシー が 主キーのハッシュ値に基づいて、データを個別のパーティションに転送 に設定されている場合、このパラメーターを設定して、ハッシュ計算のパーティションキーとして 1 つ以上の列を指定します。DTS は、計算されたハッシュ値に基づいて、宛先 Topic のパーティションに異なる行を配信します。それ以外の場合、この配信戦略は増分書き込み時に効果を発揮しません。
説明パーティションキー は、テーブルの編集 ダイアログボックスでのみ選択できます。
OK をクリックします。
