すべてのプロダクト
Search
ドキュメントセンター

Dataphin:リアルタイム統合タスクの作成

最終更新日:Nov 20, 2025

リアルタイム統合により、複数のデータソースからデータを収集し、ターゲットデータソースに結合できます。このプロセスにより、データ同期のためのリアルタイムリンクが作成されます。このトピックでは、リアルタイム統合タスクの作成方法について説明します。

前提条件

リアルタイム統合タスクを作成する前に、必要なデータソースを設定する必要があります。これにより、設定プロセス中にソースデータとターゲットデータを選択できます。詳細については、「リアルタイム統合でサポートされているデータソース」をご参照ください。

背景情報

  • ターゲットデータソースとして Oracle または MySQL を選択した場合、Java Database Connectivity (JDBC) プロトコルが使用されます。さまざまなメッセージは、次のポリシーに基づいて処理されます。

    • 結果テーブルにプライマリキーがない場合:

      • INSERT メッセージは直接追加されます。

      • UPDATE_BEFORE メッセージは破棄されます。UPDATE_AFTER メッセージは直接追加されます。

      • DELETE メッセージは破棄されます。

    • 結果テーブルにプライマリキーがある場合

      • INSERT メッセージは UPSERT メッセージとして処理されます。

      • UPDATE_BEFORE メッセージは破棄されます。UPDATE_AFTER メッセージは UPSERT メッセージとして処理されます。

      • DELETE メッセージは DELETE メッセージとして処理されます。

  • JDBC プロトコルはデータを即時に書き込むため、ノードがフェイルオーバーし、結果テーブルにプライマリキーがない場合、重複データが存在する可能性があります。Exactly-once 配信は保証されません。

  • JDBC プロトコルは、テーブルの作成とフィールドの追加のためのデータ定義言語 (DDL) 文のみをサポートするため、他のタイプの DDL メッセージは破棄されます。

  • Oracle は基本的なデータの型のみをサポートします。INTERVAL YEAR、INTERVAL DAY、BFILE、SYS.ANY、XML、map、ROWID、および UROWID データの型はサポートされていません。

  • MySQL は基本的なデータの型のみをサポートします。map データの型はサポートされていません。

  • 順序が乱れたデータによるデータの不整合を防ぐため、単一の同時タスクのみがサポートされます。

  • Oracle データソースは、Oracle Database 11g、Oracle Database 19c、および Oracle Database 21c をサポートします。

  • MySQL データソースは、MySQL 8.0、MySQL 8.4、および MySQL 5.7 をサポートします。

ステップ 1: リアルタイム統合タスクの作成

  1. Dataphin のホームページで、トップメニューバーから [開発者] > [データ統合] を選択します。

  2. トップメニューバーで、プロジェクトを選択します。Dev-Prod モードの場合は、環境も選択する必要があります。

  3. 左側のナビゲーションウィンドウで、[統合] > [ストリームパイプライン] を選択します。

  4. リアルタイム統合リストで、image アイコンをクリックし、[リアルタイム統合タスク] を選択して [リアルタイム統合タスクの作成] ダイアログボックスを開きます。

  5. [リアルタイム統合タスクの作成] ダイアログボックスで、次のパラメーターを設定します。

    パラメーター

    説明

    タスク名

    リアルタイムタスクの名前を入力します。

    名前は文字で始まる必要があります。小文字、数字、アンダースコア (_) のみを含めることができます。名前の長さは 4〜63 文字である必要があります。

    本番/開発環境キューリソース

    リアルタイムタスク用に設定されている任意のリソースグループを選択できます。

    説明

    この設定項目は、プロジェクトが Kubernetes デプロイメントモードの Flink コンピュートソースを使用している場合にのみ使用できます。

    説明

    タスクの簡単な説明を入力します。説明の長さは最大 1,000 文字です。

    ディレクトリの選択

    リアルタイムタスクを保存するディレクトリを選択します。

    ディレクトリが存在しない場合は、次のように新しいフォルダを作成できます:

    1. 左側のリアルタイムタスクリストの上にある image アイコンをクリックして、[新しいフォルダ] ダイアログボックスを開きます。

    2. [新しいフォルダ] ダイアログボックスで、フォルダの [名前] を入力し、必要に応じて [ディレクトリの選択] で場所を選択します。

    3. [OK] をクリックします。

  6. 設定が完了したら、[OK] をクリックします。

ステップ 2: リアルタイム統合タスクの設定

サポートされているソースデータソースとターゲットデータソースは、リアルタイムコンピューティングエンジンによって異なります。詳細については、「リアルタイム統合でサポートされているデータソース」をご参照ください。

ソースデータソース

MySQL

パラメーター

説明

データソース設定

データソースタイプ

[MySQL] を選択します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「MySQL データソースの作成」をご参照ください。

重要

データソースのロギングを有効にし、設定されたアカウントにログの読み取り権限があることを確認してください。そうしないと、システムはこのデータソースからリアルタイムでデータを同期できません。

タイムゾーン

選択したデータソースに設定されているタイムゾーンを表示します。

同期ルール設定

同期ポリシー

[リアルタイム増分] または [リアルタイム増分 + 完全] を選択します。デフォルト値は [リアルタイム増分] です。

  • リアルタイム増分: ソースデータベースから増分変更を収集し、発生順に下流のターゲットデータベースに書き込みます。

  • リアルタイム増分 + 完全: ソースデータベースから完全データを一度にインポートし、その後、増分変更を収集して発生順に下流のターゲットデータベースに書き込みます。

説明

ターゲットデータソースが Hive (Hudi テーブル形式)MaxCompute、または Databricks の場合、リアルタイム増分 + 完全を選択できます。

選択方法

[データベース全体]、[テーブルの選択]、または [テーブルの除外] を選択できます。

  • データベース全体: 選択したデータソース配下のすべてのデータベースのすべてのテーブルを同期します。

  • [テーブルの選択]/[テーブルの除外]: 現在のデータベース内の一部のテーブルをリアルタイム同期用に選択します。テーブルを選択した後、[プレビュー] をクリックして、[選択]/[テーブルプレビューの除外] ダイアログボックスで一致したすべてのテーブルを表示できます。ダイアログボックスでは、キーワードでテーブルを検索し、テーブルを個別または一括で削除できます。正規表現の一致では削除はサポートされていません。

    • 一括選択/一括除外: 一括選択を使用すると、現在のデータベースで選択されたテーブルがリアルタイムで同期されます。一括除外を使用すると、選択されたテーブルは同期されません。

      選択したデータソース配下のすべてのデータベースのすべてのテーブルを選択できます。テーブルは DBname.Tablename 形式で表示されます。

    • 正規表現一致: [正規表現] ボックスにテーブル名の正規表現を入力します。schemaA.*|schemaB.* などの Java 正規表現がサポートされています。

      選択したデータソース配下のすべてのデータベースのすべてのテーブルを一括で照合できます。正規表現照合には、データベース名 (DBname) とテーブル名 (Tablename) を使用できます。

Microsoft SQL Server

パラメーター

説明

データソース設定

データソースタイプ

[Microsoft SQL Server] を選択します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「Microsoft SQL Server データソースの作成」をご参照ください。

重要

データソースのロギングを有効にし、設定されたアカウントにログの読み取り権限があることを確認してください。そうしないと、システムはこのデータソースからリアルタイムでデータを同期できません。

タイムゾーン

選択したデータソースに設定されているタイムゾーンを表示します。

同期ルール設定

同期ポリシー

[リアルタイム増分] のみがサポートされています。ソースデータベースからの増分変更は収集され、発生順にリアルタイムで下流のターゲットデータベースに書き込まれます。

選択方法

[データベース全体]、[テーブルの選択]、または [テーブルの除外] を選択できます。

  • データベース全体: 現在のデータベース全体を同期します。

  • [テーブルの選択]/[テーブルの除外]: 現在のデータベース内の一部のテーブルをリアルタイム同期用に選択します。テーブルを選択した後、[プレビュー] をクリックして、[選択]/[テーブルプレビューの除外] ダイアログボックスで一致したすべてのテーブルを表示できます。ダイアログボックスでは、キーワードでテーブルを検索し、テーブルを個別または一括で削除できます。

    一括選択/一括除外: 一括選択を使用すると、現在のデータベースで選択されたテーブルがリアルタイムで同期されます。一括除外を使用すると、選択されたテーブルは同期されません。

PostgreSQL

パラメーター

説明

データソース設定

データソースタイプ

[PostgreSQL] を選択します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「PostgreSQL データソースの作成」をご参照ください。

重要

データソースのロギングを有効にし、設定されたアカウントにログの読み取り権限があることを確認してください。そうしないと、システムはこのデータソースからリアルタイムでデータを同期できません。

タイムゾーン

選択したデータソースに設定されているタイムゾーンを表示します。

同期ルール設定

同期ポリシー

[リアルタイム増分] のみがサポートされています。ソースデータベースからの増分変更は収集され、発生順にリアルタイムで下流のターゲットデータベースに書き込まれます。

選択方法

[データベース全体] または [テーブルの選択] を選択できます。

  • データベース全体: 現在のデータベース全体を同期します。

  • [テーブルの選択]: 現在のデータベース内の一部のテーブルをリアルタイム同期用に選択します。テーブルを選択した後、[プレビュー] をクリックして、[選択] [テーブルプレビュー] ダイアログボックスで一致したすべてのテーブルを表示できます。ダイアログボックスでは、キーワードでテーブルを検索し、テーブルを個別または一括で削除できます。

    一括選択: 現在のデータベースで選択されたテーブルがリアルタイムで同期されます。

Oracle

パラメーター

説明

データソース設定

データソースタイプ

[Oracle] を選択します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「Oracle データソースの作成」をご参照ください。

重要

データソースのロギングを有効にし、設定されたアカウントにログの読み取り権限があることを確認してください。そうしないと、システムはこのデータソースからリアルタイムでデータを同期できません。

タイムゾーン

選択したデータソースに設定されているタイムゾーンを表示します。

同期ルール設定

同期ポリシー

[リアルタイム増分] のみがサポートされています。ソースデータベースからの増分変更は収集され、発生順にリアルタイムで下流のターゲットデータベースに書き込まれます。

選択方法

[データベース全体]、[テーブルの選択]、または [テーブルの除外] を選択できます。

  • データベース全体: 選択したデータソース配下のすべてのデータベースのすべてのテーブルを同期します。

  • [テーブルの選択]/[テーブルの除外]: 現在のデータベース内の一部のテーブルをリアルタイム同期用に選択します。テーブルを選択した後、[プレビュー] をクリックして、[選択]/[テーブルプレビューの除外] ダイアログボックスで一致したすべてのテーブルを表示できます。ダイアログボックスでは、キーワードでテーブルを検索し、テーブルを個別または一括で削除できます。正規表現の一致では削除はサポートされていません。

    • 一括選択/一括除外: 一括選択を使用すると、現在のデータベースで選択されたテーブルがリアルタイムで同期されます。一括除外を使用すると、選択されたテーブルは同期されません。

    • 正規表現一致: [正規表現] ボックスにテーブル名の正規表現を入力します。schemaA.*|schemaB.* などの Java 正規表現がサポートされています。

IBM DB2

パラメーター

説明

データソース設定

データソースタイプ

[IBM DB2] を選択します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「IBM DB2 データソースの作成」をご参照ください。

重要

データソースのロギングを有効にし、設定されたアカウントにログの読み取り権限があることを確認してください。そうしないと、システムはこのデータソースからリアルタイムでデータを同期できません。

同期ルール設定

同期ポリシー

[リアルタイム増分] のみがサポートされています。ソースデータベースからの増分変更は収集され、発生順にリアルタイムで下流のターゲットデータベースに書き込まれます。

選択方法

[データベース全体]、[テーブルの選択]、または [テーブルの除外] を選択できます。

  • データベース全体: 選択したデータソース配下のすべてのデータベースのすべてのテーブルを同期します。

  • [テーブルの選択]/[テーブルの除外]: 現在のデータベース内の一部のテーブルをリアルタイム同期用に選択します。テーブルを選択した後、[プレビュー] をクリックして、[選択]/[テーブルプレビューの除外] ダイアログボックスで一致したすべてのテーブルを表示できます。ダイアログボックスでは、キーワードでテーブルを検索し、テーブルを個別または一括で削除できます。

    一括選択/一括除外: 一括選択を使用すると、現在のデータベースで選択されたテーブルがリアルタイムで同期されます。一括除外を使用すると、選択されたテーブルは同期されません。

Kafka

パラメーター

説明

データソース設定

データソースタイプ

[Kafka] を選択します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「Kafka データソースの作成」をご参照ください。

重要

データソースのロギングを有効にし、設定されたアカウントにログの読み取り権限があることを確認してください。そうしないと、システムはこのデータソースからリアルタイムでデータを同期できません。

ソーストピック

ソースデータの Topic を選択します。Topic 名のキーワードを入力して、あいまい検索を実行できます。

データ形式

[Canal JSON] のみがサポートされています。Canal JSON は Canal と互換性のある形式で、そのデータストレージ形式は Canal JSON です。

キータイプ

Kafka のキータイプで、KafkaConsumer の初期化時に key.deserializer の設定を決定します。[STRING] のみがサポートされています。

値のタイプ

Kafka の値のタイプで、KafkaConsumer の初期化時に value.deserializer の設定を決定します。[STRING] のみがサポートされています。

使用者グループ ID (オプション)

使用者グループの ID を入力します。使用者グループ ID は、ステータスオフセットを報告するために使用されます。

同期ルール設定

テーブルリスト

同期するテーブルの名前を入力します。複数のテーブル名は改行で区切ります。値の長さは最大 1,024 文字です。

テーブル名は、tablenamedb.tablename、または schema.tablename の 3 つの形式のいずれかになります。

Hive (Hudi テーブル形式)

リアルタイムコンピューティングエンジンが Apache Flink で、コンピュートリソースが Flink on YARN デプロイメントである場合にのみ、ソースデータソースとして Hive (Hudi データソース) を選択できます。

パラメーター

説明

データソース設定

データソースタイプ

[Hive] を選択します。

データソース

Hudi テーブル形式の Hive データソースのみを選択できます。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「Hive データソースの作成」をご参照ください。

重要

データソースのロギングを有効にし、設定されたアカウントにログの読み取り権限があることを確認してください。そうしないと、システムはこのデータソースからリアルタイムでデータを同期できません。

同期ルール設定

同期ポリシー

[リアルタイム増分] のみがサポートされています。ソースデータベースからの増分変更は収集され、発生順にリアルタイムで下流のターゲットデータベースに書き込まれます。

テーブルの選択

リアルタイム同期用に単一のテーブルを選択します。

PolarDB (MySQL データベースタイプ)

パラメーター

説明

データソース設定

データソースタイプ

[PolarDB] を選択します。

データソース

MySQL データベースタイプの PolarDB データソースのみを選択できます。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「PolarDB データソースの作成」をご参照ください。

重要

データソースのロギングを有効にし、設定されたアカウントにログの読み取り権限があることを確認してください。そうしないと、システムはこのデータソースからリアルタイムでデータを同期できません。

タイムゾーン

選択したデータソースに設定されているタイムゾーンを表示します。

同期ルール設定

同期ポリシー

[リアルタイム増分] または [リアルタイム増分 + 完全] を選択します。デフォルト値は [リアルタイム増分] です。

  • リアルタイム増分: ソースデータベースから増分変更を収集し、発生順に下流のターゲットデータベースに書き込みます。

  • リアルタイム増分 + 完全: ソースデータベースから完全データを一度にインポートし、その後、増分変更を収集して発生順に下流のターゲットデータベースに書き込みます。

説明

ターゲットデータソースが Hive (Hudi テーブル形式)MaxCompute、または Databricks の場合、リアルタイム増分 + 完全を選択できます。

選択方法

[データベース全体]、[テーブルの選択]、または [テーブルの除外] を選択できます。

  • データベース全体: 選択したデータソース配下のすべてのデータベースのすべてのテーブルを同期します。

  • [テーブルの選択]/[テーブルの除外]: 現在のデータベース内の一部のテーブルをリアルタイム同期用に選択します。テーブルを選択した後、[プレビュー] をクリックして、[選択]/[テーブルプレビューの除外] ダイアログボックスで一致したすべてのテーブルを表示できます。ダイアログボックスでは、キーワードでテーブルを検索し、テーブルを個別または一括で削除できます。正規表現の一致では削除はサポートされていません。

    • 一括選択/一括除外: 一括選択を使用すると、現在のデータベースで選択されたテーブルがリアルタイムで同期されます。一括除外を使用すると、選択されたテーブルは同期されません。

    • 正規表現一致: [正規表現] ボックスにテーブル名の正規表現を入力します。schemaA.*|schemaB.* などの Java 正規表現がサポートされています。

ターゲットデータソース

MaxCompute

パラメーター

説明

データソース設定

データソースタイプ

[MaxCompute] を選択します。

データソース

ターゲットデータソースを選択します。MaxCompute データソースとプロジェクトを選択できます。[新規] をクリックしてデータソースページでデータソースを作成することもできます。詳細については、「MaxCompute データソースの作成」をご参照ください。

新しい結果テーブルの設定

新しいテーブルタイプ

[標準テーブル] または [Delta テーブル] を選択できます。デフォルト値は [標準テーブル] です。

[Delta テーブル] を選択し、結果テーブルの作成方法を [テーブルの自動作成] に設定すると、MaxCompute Delta テーブルが作成されます。Delta テーブルの作成時には追加フィールドは使用されません。

説明

結果テーブルを設定した後、新しいテーブルタイプを変更すると、システムは確認を求めます。ダイアログボックスで [OK] をクリックすると、結果テーブルの設定がクリアされ、再設定する必要があります。

テーブル名の変換

ターゲットテーブル名には、文字、数字、アンダースコア (_) のみを含めることができます。ソーステーブル名に他の文字が含まれている場合は、テーブル名変換ルールを設定する必要があります。

[テーブル名変換の設定] をクリックして、[テーブル名変換ルールの設定] ダイアログボックスを開きます。

  • 文字列の置換: [新しいルール] をクリックしてルールを追加します。[置換するソーステーブル文字列][ターゲットテーブルの置換文字列] を設定します。最大 5 つのルールを追加できます。

  • テーブル名のプレフィックス/サフィックス: 空にすることはできません。文字、数字、アンダースコア (_) のみを含めることができます。長さは 32 文字を超えることはできません。

説明
  • テーブル名変換を設定すると、システムはルールに基づいて上から下に文字列を自動的に照合して置換します。

  • 置換文字列とテーブル名のプレフィックスおよびサフィックスの文字は、自動的に小文字に変換されます。

パーティション形式

[新しいテーブルタイプ] を [標準テーブル] に設定した場合、[複数パーティション] のみがサポートされます。[新しいテーブルタイプ] を [Delta テーブル] に設定した場合、[パーティションなし] または [複数パーティション] を選択できます。

パーティション間隔

[パーティション形式] を [パーティションなし] に設定した場合、パーティション間隔は設定できません。[パーティション形式] を [複数パーティション] に設定した場合、[パーティション間隔] を [時間] または [日] に設定できます。

説明
  • 時間: YYYY、MM、DD、HH の 4 つのレベルのパーティションを作成します。

  • : YYYY、MM、DD の 3 つのレベルのパーティションを作成します。

MySQL

パラメーター

説明

データソース設定

データソースタイプ

[MySQL] を選択します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「MySQL データソースの作成」をご参照ください。

タイムゾーン

選択したデータソースに設定されているタイムゾーンを表示します。

新規シンクテーブル設定

テーブル名の変換

ターゲットテーブル名には、文字、数字、アンダースコア (_) のみを含めることができます。ソーステーブル名に他の文字が含まれている場合は、テーブル名変換ルールを設定する必要があります。

[テーブル名変換の設定] をクリックして、[テーブル名変換ルールの設定] ダイアログボックスを開きます。

  • 文字列の置換: [新しいルール] をクリックしてルールを追加します。[置換するソーステーブル文字列][ターゲットテーブルの置換文字列] を設定します。最大 5 つのルールを追加できます。

  • テーブル名のプレフィックス/サフィックス: 空にすることはできません。文字、数字、アンダースコア (_) のみを含めることができます。長さは 32 文字を超えることはできません。

説明
  • テーブル名変換を設定すると、システムはルールに基づいて上から下に文字列を自動的に照合して置換します。

  • 置換文字列とテーブル名のプレフィックスおよびサフィックスの文字は、自動的に小文字に変換されます。

Microsoft SQL Server

パラメーター

説明

データソース設定

データソースタイプ

[Microsoft SQL Server] を選択します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「Microsoft SQL Server データソースの作成」をご参照ください。

タイムゾーン

選択したデータソースに設定されているタイムゾーンを表示します。

新しいシンクテーブルの設定

テーブル名の変換

ターゲットテーブル名には、文字、数字、アンダースコア (_) のみを含めることができます。ソーステーブル名に他の文字が含まれている場合は、テーブル名変換ルールを設定する必要があります。

[テーブル名変換の設定] をクリックして、[テーブル名変換ルールの設定] ダイアログボックスを開きます。

  • 文字列の置換: [新しいルール] をクリックしてルールを追加します。[置換するソーステーブル文字列][ターゲットテーブルの置換文字列] を設定します。最大 5 つのルールを追加できます。

  • テーブル名のプレフィックス/サフィックス: 空にすることはできません。文字、数字、アンダースコア (_) のみを含めることができます。長さは 32 文字を超えることはできません。

説明
  • テーブル名変換を設定すると、システムはルールに基づいて上から下に文字列を自動的に照合して置換します。

  • 置換文字列とテーブル名のプレフィックスおよびサフィックスの文字は、自動的に小文字に変換されます。

Oracle

パラメーター

説明

データソース設定

データソースタイプ

[Oracle] を選択します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「Oracle データソースの作成」をご参照ください。

タイムゾーン

選択したデータソースに設定されているタイムゾーンを表示します。

新しい結果テーブルの設定

テーブル名の変換

ターゲットテーブル名には、文字、数字、アンダースコア (_) のみを含めることができます。ソーステーブル名に他の文字が含まれている場合は、テーブル名変換ルールを設定する必要があります。

[テーブル名変換の設定] をクリックして、[テーブル名変換ルールの設定] ダイアログボックスを開きます。

  • 文字列の置換: [新しいルール] をクリックしてルールを追加します。[置換するソーステーブル文字列][ターゲットテーブルの置換文字列] を設定します。最大 5 つのルールを追加できます。

  • テーブル名のプレフィックス/サフィックス: 空にすることはできません。文字、数字、アンダースコア (_) のみを含めることができます。長さは 32 文字を超えることはできません。

説明
  • テーブル名変換を設定すると、システムはルールに基づいて上から下に文字列を自動的に照合して置換します。

  • 置換文字列とテーブル名のプレフィックスおよびサフィックスの文字は、自動的に小文字に変換されます。

Kafka

パラメーター

説明

データソース設定

データソースタイプ

[Kafka] を選択します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「Kafka データソースの作成」をご参照ください。

ターゲット Topic

ターゲットデータの Topic です。[単一 Topic] または [複数 Topic] を選択できます。[単一 Topic] を選択した場合は、ターゲット Topic を選択する必要があります。Topic 名のキーワードを入力して検索できます。[複数 Topic] を選択した場合は、Topic 名の変換と Topic パラメーターを設定できます。

  • 単一 Topic: すべてのテーブルメッセージが同じ Topic に書き込まれます。

  • 複数 Topic: 各テーブルに対して同じ名前の Topic が作成されます。

データ形式

書き込まれるデータのストレージ形式を設定しますサポートされている形式には、[DTS Avro][Canal Json] があります。

  • DTS Avro: データ構造やオブジェクトを保存や送信が容易な形式に変換するデータシリアル化形式です。

  • Canal Json: Canal と互換性のある形式です。データストレージ形式は Canal Json です。

説明

[ターゲット Topic] を [複数 Topic] に設定した場合、データ形式として [Canal Json] のみを選択できます。

ターゲット Topic 設定

Topic 名の変換

[Topic 名変換の設定] をクリックします。[Topic 名変換ルールの設定] ダイアログボックスで、[Topic 名変換ルール]Topic 名のプレフィックスとサフィックスを設定できます。

  • Topic 名変換ルール: [新しいルール] をクリックしてルールを追加します。[置換するソーステーブル文字列][ターゲット Topic の置換文字列] を入力する必要があります。どちらもにすることはできません。[ターゲット Topic の置換文字列] には、文字、数字、アンダースコア (_) のみを含めることができ、長さは最大 32 文字です。

  • Topic 名のプレフィックスとサフィックス: 文字、数字、アンダースコア (_) を入力できます。長さは 32 文字を超えることはできません。

説明
  • 置換文字列Topic 名のプレフィックスとサフィックスの文字は、自動的に小文字に変換されます。

  • [ターゲット Topic] が [複数 Topic] に設定されている場合にのみ、Topic 名の変換を設定できます。

Topic パラメーター

Topic を作成するための追加パラメーター。形式は key=value です。複数のパラメーターは改行で区切ります。

説明

この項目は、[ターゲット Topic] が [複数 Topic] に設定されている場合にのみ設定できます。

DataHub

パラメーター

説明

ターゲットデータ

データソースタイプ

[DataHub] を選択します。

データソース

ターゲットデータソースを選択します。

システムは、新しいデータソースを作成するためのショートカットを提供します。[新規] をクリックして、データソースページで DataHub データソースを作成できます。詳細については、「DataHub データソースの作成」をご参照ください。

転送先トピックの作成方法

[新しい] [Topic] または [既存の Topic を使用] を選択できます。

  • 新規 Topic: 宛先 Topic を手動で入力して作成します。

  • [既存の Topic を使用]: ターゲットデータベース内の既存の Topic を使用します。Topic のスキーマが同期メッセージの形式と一致していることを確認してください。そうしないと、同期タスクは失敗します。

ターゲット Topic

  • [ターゲット Topic の作成方法][Topic の作成] に設定します。

    [宛先トピック] は手動で入力する必要があります [宛先トピック] 名は、小文字の英字で始まり、3~64 文字の数字、英字、アンダースコア (_) を含む必要があります。

    名前を入力した後、[検証] をクリックして、Topic がターゲットデータベースにすでに存在するかどうかを確認できます。

    • Topic がターゲットデータベースに存在しない場合は、自動的に作成されます。そのスキーマは同期メッセージのスキーマであり、デフォルトのライフサイクルは 7 日です。

    • Topic がターゲットデータベースにすでに存在する場合は、そのスキーマが同期メッセージのスキーマと一致していることを確認してください。そうしないと、タスクは失敗します。

  • [ターゲット Topic の作成方法][既存の Topic を使用] です。

    ドロップダウンリストをクリックして、ターゲットデータベース内の既存の Topic を選択します。Topic が多数ある場合は、Topic 名を入力して必要なものを検索できます。

Databricks

パラメーター

説明

データソース設定

データソースタイプ

[Databricks] を選択します。

データソース

ターゲットデータソースを選択します。Databricks データソースとプロジェクトを選択できます。[新規] をクリックしてデータソースページでデータソースを作成することもできます。詳細については、「Databricks データソースの作成」をご参照ください。

タイムゾーン

時間形式のデータは、現在のタイムゾーンに基づいて処理されます。デフォルト値は、選択したデータソースで設定されたタイムゾーンであり、変更できません。

説明

タイムゾーン変換は、ソースデータソースタイプが MySQL または PostgreSQL で、ターゲットデータソースタイプが Databricks の場合にのみサポートされます。

新規シンクテーブル設定

テーブル名の変換

ターゲットテーブル名には、文字、数字、アンダースコア (_) のみを含めることができます。ソーステーブル名に他の文字が含まれている場合は、テーブル名変換ルールを設定する必要があります。

[テーブル名変換の設定] をクリックして、[テーブル名変換ルールの設定] ダイアログボックスを開きます。

  • 文字列の置換: [新しいルール] をクリックしてルールを追加します。[置換するソーステーブル文字列][ターゲットテーブルの置換文字列] を設定します。最大 5 つのルールを追加できます。

  • テーブル名のプレフィックス/サフィックス: 空にすることはできません。文字、数字、アンダースコア (_) のみを含めることができます。長さは 32 文字を超えることはできません。

説明
  • テーブル名変換を設定すると、システムはルールに基づいて上から下に文字列を自動的に照合して置換します。

  • 置換文字列とテーブル名のプレフィックスおよびサフィックスの文字は、自動的に小文字に変換されます。

パーティション形式

[パーティションなし] または [複数パーティション] を選択できます。

パーティション間隔

[パーティション形式] を [パーティションなし] に設定した場合、パーティション間隔は設定できません。[パーティション形式] を [複数パーティション] に設定した場合、[パーティション間隔] を [時間] または [日] に設定できます。

説明
  • 時間: YYYY、MM、DD、HH の 4 つのレベルのパーティションを作成します。

  • : YYYY、MM、DD の 3 つのレベルのパーティションを作成します。

SelectDB

パラメーター

説明

データソース設定

データソースタイプ

[SelectDB] を選択します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「SelectDB データソースの作成」をご参照ください。

新しい結果テーブルの設定

テーブル名の変換

ターゲットテーブル名には、文字、数字、アンダースコア (_) のみを含めることができます。ソーステーブル名に他の文字が含まれている場合は、テーブル名変換ルールを設定する必要があります。

[テーブル名変換の設定] をクリックして、[テーブル名変換ルールの設定] ダイアログボックスを開きます。

  • 文字列の置換: [新しいルール] をクリックしてルールを追加します。[置換するソーステーブル文字列][ターゲットテーブルの置換文字列] を設定します。最大 5 つのルールを追加できます。

  • テーブル名のプレフィックス/サフィックス: 空にすることはできません。文字、数字、アンダースコア (_) のみを含めることができます。長さは 32 文字を超えることはできません。

説明
  • テーブル名変換を設定すると、システムはルールに基づいて上から下に文字列を自動的に照合して置換します。

  • 置換文字列とテーブル名のプレフィックスおよびサフィックスの文字は、自動的に小文字に変換されます。

Hive

パラメーター

説明

データソース設定

データソースタイプ

[データソースタイプ] を [Hive] に設定します。

データソース

データソースを選択します。[新規] をクリックして [データソース] ページでデータソースを作成することもできます。詳細については、「Hive データソースの作成」をご参照ください。

新規シンクテーブル設定

データレイクテーブル形式

[なし][Hudi][Iceberg]、または [Paimon] を選択できます。

  • なし: データは標準の Hive テーブルとして書き込まれ、テーブルが作成されます。

  • Hudi: データは Hudi 形式で書き込まれ、テーブルが作成されます。Hive データソースのバージョンが CDP7.x Hive 3.1.3 の場合にのみ Hudi を選択できます。

  • Iceberg: データは Iceberg 形式で書き込まれ、テーブルが作成されます。Hive データソースのバージョンが EMR5.x Hive 3.1.x の場合にのみ Iceberg を選択できます。

  • Paimon: データは Paimon 形式で書き込まれ、テーブルが作成されます。Hive データソースのバージョンが EMR5.x Hive 3.1.x の場合にのみ Paimon を選択できます。

説明

この項目は、選択した Hive データソースでデータレイクテーブル形式の設定有効になっている場合にのみ設定できます。

Hudi テーブルタイプ/Paimon テーブルタイプ

Hudi テーブルタイプでは、[MOR] (マージオンリード) または [COW] (コピーオンライト) を選択できます。

Paimon テーブルタイプでは、[MOR] (マージオンリード)、[COW] (コピーオンライト)、または [MOW] (マージオンライト) を選択できます。

説明

この項目は、[データレイクテーブル形式] が Hudi または Paimon に設定されている場合にのみ設定できます。

テーブル作成実行エンジン

[Hive] または [Spark] を選択できます。データレイクテーブル形式を選択すると、デフォルトで Spark が選択されます。

  • Hive: Hive エンジンを使用してテーブルを作成します。テーブル作成構文は Hive 構文です。

  • Spark: Spark エンジンを使用してテーブルを作成します。テーブル作成構文は Spark 構文です。Hive データソースで Spark が有効になっている場合にのみ Spark を選択できます。

    説明

    [データレイクテーブル形式] が Paimon に設定されている場合、Spark テーブル作成実行エンジンのみがサポートされます。

テーブル名の変換

ターゲットテーブル名には、文字、数字、アンダースコア (_) のみを含めることができます。ソーステーブル名に他の文字が含まれている場合は、テーブル名変換ルールを設定する必要があります。

[テーブル名変換の設定] をクリックして、[テーブル名変換ルールの設定] ダイアログボックスを開きます。

  • 文字列の置換: [新しいルール] をクリックしてルールを追加します。[置換するソーステーブル文字列][ターゲットテーブルの置換文字列] を設定します。最大 5 つのルールを追加できます。

  • テーブル名のプレフィックス/サフィックス: 空にすることはできません。文字、数字、アンダースコア (_) のみを含めることができます。長さは 32 文字を超えることはできません。

説明
  • テーブル名変換を設定すると、システムはルールに基づいて上から下に文字列を自動的に照合して置換します。

  • 置換文字列とテーブル名のプレフィックスおよびサフィックスの文字は、自動的に小文字に変換されます。

パーティション形式

[単一パーティション][複数パーティション]、または [固定パーティション] を選択できます。

説明

形式が [単一パーティション] または [固定パーティション] に設定されている場合、デフォルトのパーティションフィールド名は ds であり、変更できません。

パーティション間隔

デフォルト値は [時間] です。[日] を選択することもできます。パーティション間隔の横にある image アイコンをクリックして、パーティション設定の詳細を表示します。

  • 単一パーティション:

    • 時間: ds (yyyyMMddhh) という名前のハッシュパーティションを表示します。

    • : ds (yyMMdd) という名前のハッシュパーティションを表示します。

  • 複数パーティション:

    • 時間: yyyy、mm、dd、hh の 4 つのレベルのパーティションを表示します。

    • : yyyy、mm、dd の 3 つのレベルのパーティションを表示します。

説明

この設定項目は、[パーティション形式] が [単一パーティション] または [複数パーティション] に設定されている場合にのみサポートされます。

パーティション値

固定パーティション値を入力します。例: 20250101。

説明

この設定項目は、[パーティション形式] が [固定パーティション] に設定されている場合にのみサポートされます。

マッピング設定

説明

マッピング設定は、ターゲットデータソースタイプが DataHub の場合、またはターゲットデータソースが Kafka でターゲット Topic が単一 Topic の場合はサポートされていません。

ターゲットデータソースが Kafka ではない場合

image

エリア

説明

追加フィールドの表示

リアルタイム増分同期中、データ利用を容易にするためにテーブル作成時にデフォルトで追加フィールドが自動的に追加されます。[追加フィールドの表示] をクリックしてフィールドを表示します。[追加フィールド] ダイアログボックスで、現在追加されているフィールドに関する情報を表示できます。

重要
  • 既存のテーブルを結果テーブルとして選択し、そのテーブルに追加フィールドがない場合は、既存の結果テーブルに追加フィールドを追加してください。そうしないと、データの使用に影響します。

  • データレイクテーブル形式を選択した後、追加フィールドは含まれません。

[フィールド追加用 DDL の表示] をクリックして、追加フィールドを追加するための DDL 文を表示します。

説明
  • ソースデータソースタイプが Kafka の場合、追加フィールドの表示はサポートされていません。

  • 結果テーブルがプライマリキーテーブルの場合、追加フィールドを追加する必要はありません。結果テーブルがプライマリキーテーブルでない場合は、追加フィールドを追加する必要があります。

検索およびフィルター領域

[ソーステーブル][結果テーブル名] で検索できます。結果テーブルをすばやくフィルターするには、上部の 1 アイコンをクリックします。[マッピングステータス][作成方法] でフィルターできます。

グローバルフィールドの追加マッピングの更新

  • グローバルフィールドの追加

    [グローバルフィールドの追加] をクリックして、[グローバルフィールドの追加] ダイアログボックスでグローバルフィールドを追加します。

    • 名前: グローバルフィールドの名前。

    • タイプ: サポートされているデータの型は、[文字列][Long][Double][Date]、および [Boolean] です。

    • : グローバルフィールドの値。

    • 説明: フィールドの説明。

    説明
    • フィールドがグローバルと単一テーブルの両方に追加された場合、単一テーブルに追加されたフィールドのみが有効になります。

    • 現在、定数のみを追加できます。

    • グローバルフィールドは、[テーブルの自動作成] メソッドを使用して作成された結果テーブルに対してのみ有効です。

    • ソースデータソースタイプが Kafka の場合、グローバルフィールドの追加はサポートされていません。

  • マッピングの更新

    結果テーブル設定リストを更新するには、[マッピングの更新] をクリックします。

    重要
    • 結果テーブル設定にすでにコンテンツがある場合、データソースタイプとデータソースを再選択すると、結果テーブルリストとマッピングステータスがリセットされます。注意して進めてください。

    • 更新プロセス中にいつでも再度クリックして更新できます。[マッピングの更新] をクリックするたびに、設定されたグローバルフィールドのみが保存されます。結果テーブルの作成方法、結果テーブル名、削除レコードなどの他の情報は保存されません。

    • ソースデータソースタイプが Kafka の場合、[マッピングの更新] をクリックすると、[同期ルール設定] のテーブルリストに従ってテーブルがマッピングされます。テーブルが存在しない場合はエラーが報告されます。

ターゲットデータベースリスト

ターゲットデータベースリストには、[シリアル番号][ソーステーブル][マッピングステータス][結果テーブルの作成方法]、および [結果テーブル名] が含まれます。また、結果テーブルの [フィールドの追加][フィールドの表示][更新]、または [削除] もできます。

  • マッピングステータス:

    • 完了: マッピングは正常に完了しました。

    • 未完了: ステータス変更後にマッピングが更新されていません。

    • マッピング中: マッピングを待機中またはマッピング処理中です。

    • 異常: データソースまたは内部システムエラーが存在します。

    • 失敗: ターゲットパーティションテーブルがリアルタイムタスクに設定されたパーティションと一致しません。

    • 警告: ソーステーブルと結果テーブルに互換性のないデータの型がある可能性があります。

  • [結果テーブルの作成方法] には 3 つのオプションがあります:

    • ターゲットデータベースにソーステーブルと同じ名前のテーブルが存在する場合、作成方法は [既存のテーブルを使用] です。このテーブルはデフォルトで結果テーブルとして使用されます。[テーブルの自動作成] に変更するには、テーブル名変換ルールまたはプレフィックス/サフィックスを追加してから再マッピングする必要があります。

    • ターゲットデータベースに同じ名前のテーブルが見つからない場合、作成方法はデフォルトで [テーブルの自動作成] になります。この方法を [既存のテーブルを使用] に変更し、同期用に既存のテーブルを選択することもできます。

    • 自動的に作成されるテーブルに対してのみ、フィールドを追加したり、カスタム DDL を使用してテーブルを作成したりできます。グローバルフィールドも、自動的に作成されたテーブルに対してのみ有効です。

    説明
    • ターゲットデータソースタイプが Hive の場合:

      • [テーブルの自動作成] を使用し、データレイクテーブル形式が [なし] の場合、標準 Hive テーブルが作成されます。それ以外の場合は、選択した形式のテーブルが作成されます。現在、Hudi と Iceberg がサポートされています。

      • [カスタムテーブル作成] を使用し、データレイクテーブル形式が [なし] の場合、標準 Hive テーブルの DDL が使用されます。それ以外の場合は、選択したテーブル形式の DDL を使用する必要があります。現在、Hudi と Iceberg がサポートされています。

    • ソースデータソースタイプが Kafka の場合、サポートされている唯一の結果テーブル作成方法は [既存のテーブルを使用] です。

    • ターゲットデータソースタイプが SelectDB の場合、自動テーブル作成中にソーステーブルにプライマリキーがない場合、Duplicate テーブルが作成されます。ソーステーブルにプライマリキーがある場合、Unique テーブルが作成されます。

    • パーティション形式が [単一パーティション] または [固定パーティション] で、結果テーブルの作成方法が [既存のテーブルを使用] の場合、システムは結果テーブルのパーティションがパーティション設定を満たしているかどうかを自動的にチェックします。満たしていない場合はエラーが報告されます。

  • 結果テーブル名: 結果テーブル名には、文字、数字、アンダースコア (_) のみを含めることができます。ソーステーブル名に他の文字が含まれている場合は、テーブル名変換ルールを設定する必要があります。

    ターゲットデータソースタイプが MaxCompute で、結果テーブルの作成方法が [テーブルの自動作成] で、新しいテーブルタイプが [Delta テーブル] の場合、結果テーブル名の後に image アイコンが表示され、新しい Delta テーブルが作成されることを示します。結果テーブルの作成方法が [既存のテーブルを使用] の場合、ユーザーが結果テーブルリストで Delta テーブルを選択すると、結果テーブル名の後にも image アイコンが表示され、そのテーブルが Delta テーブルであることを示します。

  • 操作:

    • カスタムテーブル作成: [フィールドの追加] または [DDL] を使用してテーブルを作成できます。カスタムテーブル作成を有効にすると、グローバルフィールドは無効になります。

      説明
      • フィールドが追加された後、それは [テーブルの自動作成] の操作列にのみ表示されます。

      • [既存のテーブルを使用] メソッドを使用して作成された結果テーブルである既存の結果テーブルは変更できません。

    • フィールドの表示: ソーステーブルと結果テーブルのフィールドとタイプを表示します。

    • 更新: ソーステーブルと結果テーブルを再マッピングします。

    • 削除: ソーステーブルの削除は元に戻せません。

一括操作

結果テーブルに対して一括 [削除] 操作を実行できます。

ターゲットデータソースが Kafka (ターゲット Topic が複数 Topic) の場合

image

エリア

説明

検索およびフィルター領域

[ソーステーブル][ターゲット Topic 名] で検索できます。結果テーブルをすばやくフィルターするには、上部の 1 アイコンをクリックします。[マッピングステータス][ターゲット Topic の作成方法] でフィルターできます。

マッピングの更新

結果テーブル設定リストを更新するには、[マッピングの更新] をクリックします。

重要

ターゲット Topic 設定にすでにコンテンツがある場合、データソースタイプとデータソースを再選択すると、ターゲット Topic リストとマッピングステータスがリセットされます。注意して進めてください。

リスト

リストには、[シリアル番号][ソーステーブル][マッピングステータス][ターゲット Topic の作成方法]、および [ターゲット Topic 名] が含まれます。また、結果テーブルを削除することもできます。

  • ターゲット Topic の作成方法: ターゲット Topic がすでに存在する場合、作成方法は [既存の Topic を使用] です。ターゲット Topic が存在しない場合、作成方法は [Topic の自動作成] です。

    [Topic の自動作成] を使用すると、システムは生成されたターゲット Topic 名と Topic パラメーターに基づいて Topic を作成します。

  • マッピングステータス: ターゲット Topic が存在するかどうかのみをチェックします。

  • 削除: 対応する行を削除します。この操作は元に戻せません。

一括操作

結果テーブルに対して一括 [削除] 操作を実行できます。

DDL メッセージ処理ポリシー

説明
  • DDL メッセージ処理ポリシーは、ソースデータソースタイプが DataHub または Kafka の場合はサポートされていません。

  • DDL メッセージ処理ポリシーは、ターゲットデータソースタイプが PostgreSQL または Hive (Hudi テーブルタイプ) の場合はサポートされていません。

  • ターゲットデータソースタイプが Hive (Hudi テーブルタイプ) で、データレイクテーブル形式が Hudi の場合、[無視] ポリシーのみがサポートされます。

  • ソースデータソースタイプが Kafka の場合、[無視] ポリシーのみがサポートされます。

  • Hive または MaxCompute テーブルの既存のパーティションに追加された新しい列のデータは同期できません。既存のパーティション内のこれらの新しい列のデータは NULL になります。後続の新しいパーティションでは、データ同期は正しく機能します。

  • [テーブルの作成][列の追加]など: DDL 操作は正常に処理されます。これらの操作には、テーブルの作成、列の追加、列の削除、列の名前変更、列のタイプの変更が含まれます。この DDL 情報は、処理のためにターゲットデータソースに送信されます。処理ポリシーは、ターゲットデータソースによって異なります。

  • [無視]: DDL メッセージを破棄し、ターゲットデータソースに送信しません。

  • [エラー]: リアルタイム同期タスクを直ちに停止し、エラーステータスにします。

ステップ 3: リアルタイム統合タスクのプロパティを設定する

  1. 現在のリアルタイム統合タスクタブのトップメニューバーにある [リソース設定] をクリックするか、右側のサイドバーにある [プロパティ] をクリックして [プロパティ] パネルを開きます。

  2. 現在のリアルタイム統合タスクの [基本情報][リソース設定] を設定します。

    • 基本情報: 現在のリアルタイム統合タスクの [開発オーナー][運用オーナー] を選択し、タスクの [説明] を入力します。説明は最大 1,000 文字です。

    • リソース設定: 詳細については、「リアルタイム統合リソース設定」をご参照ください。

ステップ 4: リアルタイム統合タスクの送信

  1. 現在のリアルタイム統合タスクを送信するには、[送信] をクリックします。

  2. [送信] ダイアログボックスで、[送信備考] を入力し、[OK して送信] をクリックします。

  3. 送信が完了すると、[送信] ダイアログボックスで送信の詳細を表示できます。

    プロジェクトが Dev-Prod モードの場合、リアルタイム統合タスクを本番環境に公開する必要があります。詳細については、「リリースタスクの管理」。

次のステップ

オペレーションセンターでリアルタイム統合タスクを表示および管理して、期待どおりに実行されることを確認できます。詳細については、「リアルタイムタスクの表示と管理」。