Data Transmission Service (DTS) は、自己管理 Oracle データベースの変更データをリアルタイムで ApsaraMQ for Kafka インスタンスにストリーミングします。これにより、ダウンストリームのコンシューマーがデータレイクへの取り込み、リアルタイムデータウェアハウス構築、サービスデカップリングなどのユースケースにおいて、信頼性の高い変更ストリームを Oracle から受信できます。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
自己管理 Oracle データベースおよび送信先の ApsaraMQ for Kafka インスタンス。対応バージョンについては、「データ同期シナリオの概要」をご参照ください。
Oracle データベースが ARCHIVELOG モードで実行されており、アーカイブログファイルにアクセス可能であること、およびログの保持期間が設定されていること。詳細については、「アーカイブ・リドゥ・ログファイルの管理」をご参照ください。
Oracle データベースで補足ログが有効化されており、SUPPLEMENTAL_LOG_DATA_PK および SUPPLEMENTAL_LOG_DATA_UI の値が Yes に設定されていること。詳細については、「補足ログ」をご参照ください。
ApsaraMQ for Kafka インスタンスに、Oracle データベースからのすべてのデータを格納できる十分な空きストレージ容量があること。
ApsaraMQ for Kafka インスタンス内に、同期データを受信するトピックが作成済みであること。「ステップ 1:トピックの作成」をご参照ください。
Oracle 同期における DTS の機能および制限について理解している必要があります。詳細については、「Oracle データベースの準備」および「概要」をご参照ください。
課金
| 同期タイプ | 料金 |
|---|---|
| スキーマ同期および完全データ同期 | 無料 |
| 増分データ同期 | 課金対象です。「課金概要 |
サポートされる同期トポロジ
単方向 1 対 1 同期
単方向 1 対多同期
単方向 多対 1 同期
単方向 カスケード同期
詳細については、「同期トポロジ」をご参照ください。
同期可能な SQL 操作
| 操作タイプ | 文 |
|---|---|
| DML | INSERT、UPDATE、DELETE |
| DDL | CREATE TABLE、ALTER TABLE、DROP TABLE、RENAME TABLE、TRUNCATE TABLE;CREATE/ALTER/DROP VIEW;CREATE/ALTER/DROP PROCEDURE;CREATE/DROP FUNCTION;CREATE/DROP TRIGGER;CREATE/DROP INDEX |
必要な権限
| データベース | 必要な権限 | 参考情報 |
|---|---|---|
| 自己管理 Oracle データベース | 詳細な権限 | データベースアカウントの準備、CREATE USER、GRANT |
Oracle から増分データを同期するには、アーカイブ ログと補足ログを有効にしてください。 詳細については、「Oracle データベースを構成する」をご参照ください。
制限事項
DTS は外部キーを同期しません。ソースデータベースでのカスケードおよび削除操作は、送信先に伝播されません。
ソースデータベースの制限事項
同期対象オブジェクト:
テーブルには、すべてのフィールドが一意となる PRIMARY KEY または一意制約(UNIQUE constraint)が必要です。これらの制約がないテーブルでは、送信先に重複レコードが生成される可能性があります。
Oracle 12c 以降では、テーブル名は 30 バイトを超えてはなりません。
同期対象としてテーブルを選択し、送信先で編集(例:テーブル名またはカラム名の変更)を予定している場合、1 つのタスクで最大 1,000 個のテーブルをサポートします。1,000 個を超えるテーブルを含むタスクは、リクエストエラーで失敗します。テーブルを複数のタスクに分割するか、代わりにデータベース全体を同期してください。
Oracle RAC:
Express Connect 経由で接続されたソースが Oracle RAC データベースの場合、タスク設定時に仮想 IP アドレス(VIP)を指定します。
Oracle RAC データベースでは、Single Client Access Name(SCAN)IP アドレスではなく VIP を使用します。VIP を指定した後は、Oracle RAC データベースのノードフェイルオーバーはサポートされません。
ログ保持:
| 同期範囲 | 最低ログ保持期間 |
|---|---|
| 増分データ同期のみ | 24 時間以上 |
| 完全データ同期+増分データ同期 | 最低 7 日間 |
DTS がリドゥログまたはアーカイブログを読み取れない場合、タスクは失敗するだけでなく、データの不整合やデータ損失が発生する可能性があります。完全データ同期が完了した後は、保持期間を 24 時間以上に短縮できます。この最低保持期間を下回る設定では、DTS のサービスレベル合意(SLA)は適用されません。
その他のソース制限事項:
タスク実行中にソースデータベースでプライマリ/セカンダリ スイッチオーバーが発生すると、タスクは失敗します。
Oracle では VARCHAR2 型の空文字列は NULL 値として扱われます。送信先の対応カラムに非 NULL 制約(NOT NULL constraint)が設定されている場合、タスクは失敗します。
同期中に LONGTEXT フィールドを更新しないでください。更新するとタスクが失敗します。
スキーマ同期または完全データ同期中に DDL 文を実行しないでください。実行するとタスクが失敗します。
単一レコードサイズ制限
DTS が Kafka に書き込む単一レコードの最大サイズは 10 MB です。ソースの行がこの制限を超えると、タスクは中断されます。
これを回避するには、大規模フィールドを含むテーブル全体を同期しないでください。代わりに特定のカラムのみを同期します。タスク設定時に、大規模な値を持つカラムを除外します。すでにタスクに大規模フィールドを含むテーブルが含まれている場合は、そのテーブルを削除し、再度追加してから、大規模カラムを除外するフィルター条件を適用します。
その他の制限事項
名前変更されたテーブル: ソースデータベースでテーブルの名前を変更し、その新しい名前が選択済みのオブジェクトに含まれていない場合、DTS はその名前変更されたテーブルのデータを Kafka に同期しません。同期を再開するには、オブジェクトを再選択してください。詳細については、「データ同期タスクにオブジェクトを追加する」をご参照ください。
Oracle Data Pump:増分データ同期中に、Oracle Data Pump を使用してソースデータベースにデータを書き込まないでください。データ損失が発生する可能性があります。
パフォーマンスへの影響:完全データ同期中、DTS はソースおよび送信先データベースの読み取り/書き込みリソースを使用するため、サーバー負荷が増加する可能性があります。影響を最小限に抑えるため、ピーク時間帯を避けて同期タスクを実行してください。
表領域サイズ:完全データ同期中の並列 INSERT 操作により、テーブルの断片化が発生します。完全データ同期完了後、送信先の表領域はソースよりも大きくなります。
同期遅延:同期遅延は、送信先で最新に同期されたレコードのタイムスタンプとソースの現在時刻に基づいて計算されます。ソースで長期間 DML 操作が実行されない場合、表示される遅延は不正確になる可能性があります。遅延を更新するには、ソースで DML 操作を実行してください。データベース全体を同期する場合は、1 秒ごとに更新されるハートビートテーブルを作成して、遅延の精度を維持します。
排他的書き込み:送信先へのデータ書き込みは DTS を通じてのみ行ってください。他のツールを通じた書き込み(例:DMS を使用したオンライン DDL 操作)は、データの不整合やデータ損失を引き起こす可能性があります。
Kafka のスケーリング:送信先 Kafka クラスターをスケールアップまたはスケールダウンした場合、Kafka クラスターを再起動して同期を再開します。
データ同期タスクの作成
ステップ 1:データ同期タスクページへ移動
Data Management (DMS) コンソール にログインします。
上部のナビゲーションバーで、Data + AI をクリックします。
左側のナビゲーションウィンドウで、DTS (DTS) > データ同期 を選択します。
手順は、DMS コンソールのモードおよびレイアウトによって異なる場合があります。「シンプルモード」および「DMS コンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。また、新しい DTS コンソールのデータ同期タスクページに直接アクセスすることもできます。
ステップ 2:リージョンの選択
データ同期タスク の右側で、データ同期インスタンスが配置されているリージョンを選択します。
新規 DTS コンソールでは、上部のナビゲーションバーからリージョンを選択します。
ステップ 3:ソースおよび送信先データベースの設定
タスクの作成 をクリックします。データ同期タスクの作成 ウィザードで、以下のパラメーターを設定します。
ソースデータベース
| パラメーター | 説明 |
|---|---|
| タスク名 | DTS タスクの名前です。DTS がデフォルト名を自動生成しますが、識別しやすいように意味のある名前を付けることを推奨します。名前は一意である必要はありません。 |
| データベースタイプ | Oracle を選択します。 |
| アクセス方法 | ソースデータベースへのアクセス方法です。本例では ECS 上の自己管理データベース を使用します。ソースが自己管理データベースの場合、事前にネットワーク環境を構築する必要があります。「事前準備の概要」をご参照ください。 |
| インスタンスリージョン | 自己管理 Oracle データベースが配置されているリージョンです。 |
| ECS インスタンス ID | Oracle データベースをホストする Elastic Compute Service (ECS) インスタンスの ID です。 |
| ポート番号 | Oracle データベースのサービスポートです。デフォルト値:1521。 |
| Oracle タイプ | Oracle のアーキテクチャです。非 RAC インスタンス を選択して SID を設定するか、RAC または PDB インスタンス を選択して サービス名 を設定します。 説明 RAC インスタンスはサポートされていません。本例では 非 RAC インスタンス を使用します。 |
| データベースアカウント | Oracle アカウントです。必要な権限については、「必要な権限」をご参照ください。 |
| データベースパスワード | Oracle アカウントのパスワードです。 |
宛先データベース
| パラメーター | 説明 |
|---|---|
| データベースタイプ | Kafka を選択します。 |
| アクセス方法 | Express Connect、VPN Gateway、または Smart Access Gateway を選択します。DTS は ApsaraMQ for Kafka を専用の接続方法としてリスト表示しません。自己管理 Kafka クラスターとして接続します。 |
| インスタンスリージョン | ApsaraMQ for Kafka インスタンスが配置されているリージョンです。 |
| 接続済み VPC | ApsaraMQ for Kafka インスタンスが属する仮想プライベートクラウド(VPC)の ID です。VPC ID を取得するには、ApsaraMQ for Kafka コンソールを開き、インスタンスの詳細ページを表示して、インスタンス情報 タブの 設定情報 で VPC ID を確認します。 |
| IP アドレスまたはドメイン名 | ApsaraMQ for Kafka インスタンスの デフォルトエンドポイント パラメーターから取得した IP アドレスです。IP アドレスを取得するには、インスタンスの詳細ページを表示し、インスタンス情報 タブの エンドポイント情報 を確認します。 |
| ポート番号 | ApsaraMQ for Kafka インスタンスのサービスポートです。デフォルト値:9092。 |
| データベースアカウント | ApsaraMQ for Kafka インスタンスのアカウントです。VPC タイプのインスタンス の場合は不要です。 |
| データベースパスワード | アカウントのパスワードです。 |
| Kafka バージョン | ApsaraMQ for Kafka インスタンスのバージョンです。 |
| 暗号化 | セキュリティ要件に応じて、非暗号化 または SCRAM-SHA-256 を選択します。 |
| トピック | 同期データを受信するトピックです。ドロップダウンリストから選択します。 |
| DDL 情報を保存するトピック | DDL 情報を保存するトピックです。未設定の場合、DDL 情報は トピック で指定されたトピックに保存されます。 |
| Kafka Schema Registry の使用 | Kafka Schema Registry を使用するかどうかです。No または Yes を選択します。Yes を選択した場合、Avro スキーマ用に Kafka Schema Registry に登録された URL または IP アドレスを入力します。 |
ステップ 4:接続性のテスト
接続性のテストと続行 をクリックします。
DTS は、Alibaba Cloud データベースインスタンス (ApsaraDB RDS for MySQL や ApsaraDB for MongoDB など) のホワイトリストに、その CIDR ブロックを自動的に追加します。ECS 上のデータベースの場合、DTS はその CIDR ブロックを ECS セキュリティグループルールに自動的に追加します。データベースが複数の ECS インスタンスにデプロイされている場合は、各 ECS インスタンスのセキュリティグループルールに DTS の CIDR ブロックを手動で追加する必要があります。オンプレミスデータベースまたはサードパーティプロバイダーのデータベースの場合、DTS の CIDR ブロックをデータベースのホワイトリストに手動で追加する必要があります。詳細については、「DTS サーバーの CIDR ブロック」をご参照ください。
DTS の CIDR ブロックをホワイトリストまたはセキュリティグループルールに追加すると、セキュリティリスクが発生する可能性があります。実行前に、認証情報の強化、公開ポートの制限、API 呼び出しの監査、ホワイトリストルールの定期的な見直し、不正な CIDR ブロックの遮断などの予防措置を講じてください。あるいは、Express Connect、VPN Gateway、または Smart Access Gateway を使用してデータベースを DTS に接続することもできます。
ステップ 5:オブジェクトおよび同期設定の構成
以下のパラメーターを設定します。
| パラメーター | 説明 |
|---|---|
| [同期タイプ] | デフォルトでは、[増分データ同期] が選択されています。[スキーマ同期] と [完全データ同期] も選択してください。事前チェック後、DTS は増分同期のベースラインとして、ソースから送信先へ既存データを同期します。 |
| [競合するテーブルの処理モード] | [事前チェックしてエラーを報告]:DTS は、ソースと送信先に同じ名前のテーブルがあるかどうかをチェックします。競合が見つかった場合、タスクは開始できません。送信先テーブルを削除または名前変更せずに名前の競合を解決するには、オブジェクト名マッピング機能を使用します。詳細については、「オブジェクト名のマッピング」をご参照ください。[エラーを無視して続行]:名前の競合チェックをスキップします。ソースと送信先が同じスキーマを共有している場合、DTS は完全データ同期中に既存の送信先レコードを保持し、増分同期中にそれらを上書きします。スキーマが異なる場合、初期化が部分的に失敗するか、タスクが完全に失敗する可能性があります。注意してご使用ください。 |
| [Kafka でのデータフォーマット] | Kafka に書き込まれるデータのフォーマットです。[DTS Avro]:データは DTS Avro スキーマに従います。GitHub の スキーマ定義をご参照ください。[SharePlex JSON]:データは SharePlex JSON フォーマットを使用します。詳細については、「Shareplex Json」をご参照ください。 |
| [Kafka パーティションへのデータ転送ポリシー] | 同期されるデータのパーティション分割ポリシーです。詳細については、「Kafka パーティションへのデータ移行ポリシーの指定」をご参照ください。 |
| [宛先インスタンスでのオブジェクト名の大文字/小文字の区別] | 送信先のデータベース、テーブル、およびカラム名の大文字/小文字の区別に関するポリシーです。デフォルト:[DTS のデフォルトポリシー]。詳細については、「宛先インスタンスでのオブジェクト名の大文字/小文字の区別の指定」をご参照ください。 |
| [ソースオブジェクト] | [ソースオブジェクト] セクションからカラム、テーブル、またはデータベースを選択し、 |
| [選択したオブジェクト] | 単一のオブジェクトの名前を変更するには、そのオブジェクトを右クリックします。詳細については、「単一オブジェクト名のマッピング」をご参照ください。複数のオブジェクトの名前を一度に変更するには、[バッチ編集] をクリックします。詳細については、「複数オブジェクト名の一括マッピング」をご参照ください。テーブルの特定の SQL 操作をフィルターするには、テーブルを右クリックして操作を選択します。詳細については、「同期可能な SQL 操作」をご参照ください。行をフィルターするには、テーブルを右クリックしてフィルター条件を設定します。詳細については、「フィルター条件の設定」をご参照ください。オブジェクトの名前を変更すると、依存オブジェクトの同期が失敗する可能性があります。 |
ステップ 6:高度な設定の構成
次へ:高度な設定 をクリックし、以下のパラメーターを設定します。
| パラメーター | 説明 |
|---|---|
| タスクスケジューリング用専用クラスター | DTS はデフォルトで共有クラスターを使用します。安定性を向上させるには、専用クラスターを購入してください。詳細については、「DTS 専用クラスターとは」をご参照ください。 |
| 接続失敗時の再試行時間 | 接続失敗時の再試行ウィンドウです。有効値:10~1440 分。デフォルト値:720 分。30 分より大きい値を設定してください。DTS がこのウィンドウ内で再接続できた場合、タスクは再開されます。それ以外の場合はタスクが失敗します。複数のタスクが同じソースまたは送信先を共有する場合、最も短い再試行ウィンドウが適用されます。再試行中は、DTS インスタンスの課金が発生します。 |
| その他の問題発生時の再試行時間 | DDL または DML 操作失敗時の再試行ウィンドウです。有効値:1~1440 分。デフォルト値:10 分。10 分より大きい値を設定してください。この値は 接続失敗時の再試行時間 よりも小さくする必要があります。 |
| 完全データ移行のスロットル機能の有効化 | 完全データ同期中の送信先データベースサーバーへの負荷を制限します。ソースデータベースへのクエリ数(QPS)、完全データ移行の RPS、完全移行のデータ移行速度(MB/s) を設定します。このパラメーターは、完全データ同期 が選択されている場合にのみ利用可能です。 |
| 増分データ同期のスロットル機能の有効化 | 増分同期中の負荷を制限します。増分データ同期の RPS および 増分同期のデータ同期速度(MB/s) を設定します。 |
| 環境タグ | DTS インスタンスを識別するためのタグです。要件に応じて選択してください。本例ではタグは使用しません。 |
| 実際の書き込みコード | 送信先に書き込まれるデータのエンコード形式です。 |
| ETL の設定 | ETL(抽出・変換・書き出し)を有効にするかどうか。データ処理ステートメントを設定するには、[はい] を選択します。詳細については、「ETL をデータ移行またはデータ同期タスクで設定する」をご参照ください。ETL をスキップするには、[いいえ] を選択します。 |
| モニタリングとアラート | タスクの失敗または高い同期遅延に対してアラートを設定するかどうかを指定します。[いいえ] または [はい] を選択します。[はい] を選択した場合は、アラートのしきい値と通知設定を設定します。「DTS タスクの作成時にモニタリングとアラートを設定する」をご参照ください。 |
ステップ 7:設定の保存と事前チェックの実行
保存前に OpenAPI パラメーターをプレビューするには、次へ:タスク設定の保存と事前チェック の上にマウスを合わせ、OpenAPI パラメーターのプレビュー をクリックします。
次へ:タスク設定の保存と事前チェック をクリックします。
DTS は、タスク開始前に事前チェックを実行します。事前チェックが失敗した場合:
各失敗項目の横にある 詳細の表示 をクリックし、問題を解決してから再度事前チェックを実行します。
無視可能な項目でアラートがトリガーされた場合、アラートの詳細の確認 をクリックし、ダイアログボックスで 無視 をクリックして OK をクリックし、その後 再チェック をクリックします。アラートを無視すると、データの不整合が発生する可能性があります。
ステップ 8:同期インスタンスの購入
成功率 が 100% に達するまで待ち、次へ:インスタンスの購入 をクリックします。
購入ページで、以下のパラメーターを設定します。
| パラメーター | 説明 |
|---|---|
| 課金方法 | サブスクリプション:固定期間の前払い方式です。長期利用に適しています。従量課金:時間単位で課金されます。短期利用に適しています。不要になったらインスタンスを解放して課金を停止できます。 |
| リソースグループ設定 | インスタンスのリソースグループです。デフォルト値:デフォルトリソースグループResource Management とは |
| インスタンスクラス | データ同期インスタンスのクラスインスタンスクラスで、同期速度が決まります。「」をご参照ください。 |
| サブスクリプション期間 | サブスクリプション 課金方法でのみ利用可能です。有効値:1~9 か月、または 1、2、3、5 年です。 |
ステップ 9:タスクの開始
Data Transmission Service(従量課金)利用規約 を読み、チェックボックスをオンにして、購入して開始 をクリックします。ダイアログボックスで OK をクリックします。
タスクはタスクリストに表示されます。そこから進行状況を監視できます。