リアルタイム統合タスクを作成することで、MySQL、Oracle、PostgreSQL から Hive にデータをリアルタイムで同期できます。このトピックでは、リアルタイム統合タスクを作成する方法について説明します。
前提条件
使用するデータソースが準備されていること。リアルタイム統合タスクを構成する前に、必要なデータソースを構成する必要があります。そうすることで、リアルタイム統合タスクを構成するときに、対応するソースデータとデスティネーションデータを選択できます。詳細については、「リアルタイム統合でサポートされているデータソース」をご参照ください。
ステップ 1:リアルタイム統合タスクを作成する
Dataphin ホームページの上部ナビゲーションバーで、[開発] > [Data Integration] をクリックします。
上部ナビゲーションバーで、プロジェクトを選択します(開発 - 本番モードでは、環境を選択します)。
左側のナビゲーションウィンドウで、[統合] > [ストリームパイプライン] を選択します。
リアルタイム統合リストの
アイコンをクリックし、[リアルタイム統合タスク] を選択して、[リアルタイム統合タスクの作成] ダイアログボックスを開きます。
[リアルタイム統合タスクの作成] ダイアログボックスで、次のパラメーターを構成します。
パラメーター
説明
タスク名
リアルタイムタスクの名前を入力します。
名前は英字で始まり、小文字、数字、アンダースコア(_)のみを含めることができます。名前は 4 ~ 63 文字の長さでなければなりません。
本番/開発環境キューリソース
リアルタイムタスク用に構成されているすべてのリソースグループを選択できます。
説明この設定項目は、プロジェクトで使用されるコンピューティングエンジンが Kubernetes モードでデプロイされた Flink コンピューティングエンジンの場合にのみサポートされます。
説明
タスクの簡単な説明を入力します。説明は 1,000 文字を超えることはできません。
ディレクトリの選択
リアルタイムタスクを保存するディレクトリを選択します。
ディレクトリが存在しない場合は、次の手順を実行してフォルダを作成できます。
ページ左側のリアルタイムタスクリストの上にある
アイコンをクリックして、[フォルダの作成] ダイアログボックスを開きます。
[フォルダの作成] ダイアログボックスで、フォルダの [名前] を入力し、必要に応じて [ディレクトリの選択] の場所を選択します。
[OK] をクリックします。
構成が完了したら、[OK] をクリックします。
新しく作成されたリアルタイム統合タスクで、[ソースデータ] と [デスティネーションデータ] を構成します。
デスティネーションデータソースのタイプとして Hive を選択した場合、ソースデータソースのタイプは [MySQL]、[Oracle]、[PostgreSQL]、および [Kafka] のみをサポートします。ソースデータソースが異なると、構成が必要なパラメーターも異なります。
ソースデータの構成
ソースデータソースが MySQL、Oracle、または PostgreSQL の場合
セクション
パラメーター
説明
データソースの構成
データソースの種類
データソースの種類として、[MySQL]、[Oracle]、または [PostgreSQL] を選択します。
データソース
データソースを選択します。
システムは、データソースを作成するためのエントリを提供します。 [作成] をクリックして、[データソース] ページでデータソースを作成できます。 詳細については、「MySQL データソースを作成する」、「Oracle データソースを作成する」、および「PostgreSQL データソースを作成する」をご参照ください。
重要データソースでログ記録を有効にし、構成済みのアカウントにログを読み取る権限があることを確認する必要があります。 そうしないと、システムはデータソースからデータをリアルタイムで同期できません。
同期ルールの構成
同期モード
[リアルタイム増分] または [リアルタイム増分 + フル] を選択できます。 デフォルト値は [リアルタイム増分] です。
[リアルタイム増分]: ソースデータベースの増分変更は、発生順に収集され、ダウンストリームのターゲットデータベースに書き込まれます。
[リアルタイム増分 + フル]: ソースデータベースのすべてのデータが一度にインポートされ、その後、増分変更が順番に収集され、ダウンストリームのターゲットデータベースに書き込まれます。
説明ソースデータソースとして [MySQL] を選択し、ターゲットデータソースとして [Hive] を選択し、データレイクテーブル形式として [Hudi] を選択した場合、同期モードは [リアルタイム増分 + フル] をサポートします。
選択方法
データベース全体、テーブルの選択、テーブルの除外の 3 つの方法がサポートされています。
[データベース全体]: 現在のデータベース全体のデータを同期します。
[ソースデータソースの種類] として [MySQL] を選択すると、選択したデータソース下のすべてのデータベースのすべてのテーブルが同期されます。
[テーブルの選択]/[テーブルの除外]: 現在のデータベースの一部のテーブルのデータをリアルタイムで同期します。
[一括選択]/[一括除外]: [一括選択] を選択すると、現在のデータベースで選択した複数のテーブルのデータがリアルタイムで同期されます。 [一括除外] を選択すると、現在のデータベースで選択した複数のテーブルのデータはリアルタイムで同期されません。
ソースデータソースタイプとして MySQL を選択すると、選択したデータソース下のすべてのデータベースのすべてのテーブルを選択できます。テーブルはリストに
DBname.Tablename
形式で表示されます。[ソースデータソースの種類] として [PostgreSQL] を選択した場合、[テーブルの除外] 方法はサポートされていません。
[正規表現]:[正規表現] フィールドにテーブル名の正規表現を入力できます。Java 正規表現が適用可能です(例:
schemaA.*|schemaB.*
)。[ソースデータソースの種類] として [MySQL] を選択すると、正規表現を使用して、選択したデータソース下のすべてのデータベースのすべてのテーブルを照合できます。 データベース名 (DBname) とテーブル名 (Tablename) を正規表現の照合に使用できます。
[ソースデータソースの種類] として [PostgreSQL]、[Microsoft SQL Server]、または [IBM DB2] を選択した場合、正規表現の照合はサポートされていません。
ソースデータソースが Kafka の場合
セクション
パラメーター
説明
データソースの構成
データソースの種類
データソースの種類として [Kafka] を選択します。
データソース
データソースを選択します。
システムは、データソースを作成するためのエントリを提供します。 [作成] をクリックして、[データソース] ページでデータソースを作成できます。 詳細については、「Kafka データソースを作成する」をご参照ください。
重要データソースでログ記録を有効にし、構成済みのアカウントにログを読み取る権限があることを確認する必要があります。 そうしないと、システムはデータソースからデータをリアルタイムで同期できません。
ソース トピック
ソースデータの Topic を選択します。あいまい検索のために Topic 名のキーワードを入力できます。
データ形式
現在、[Canal JSON] 形式のみがサポートされています。 Canal JSON は Canal の互換形式であり、そのデータストレージ形式は Canal JSON です。
キータイプ
Kafka のキーのタイプ。KafkaConsumer を初期化するときの key.deserializer 構成を決定します。 現在、[STRING] のみがサポートされています。
値の型
Kafka の値のタイプ。KafkaConsumer を初期化するときの value.deserializer 構成を決定します。 現在、[STRING] のみがサポートされています。
コンシューマーグループ ID (オプション)
コンシューマーグループの ID を入力します。コンシューマーグループ ID は、ステータスオフセットを報告するために使用されます。
同期ルールの構成
テーブルリスト
同期するテーブルの名前を入力します。 複数のテーブル名は改行で区切ります。 テーブル名は合計で 1,024 文字を超えることはできません。
テーブル名は、次の 3 つの形式をサポートしています。
tablename
、db.tablename
、およびschema.tablename
。宛先データの構成
セクション
パラメーター
説明
データソース構成
データソースの種類
データソースの種類として [Hive] を選択します。
データソース
宛先データソースを選択します。
システムは、データソースを作成するためのエントリを提供します。 [作成] をクリックして、[データソース] ページでデータソースを作成できます。詳細については、「Hive データソースを作成する」をご参照ください。
新しいテーブルの構成
データレイクテーブル形式
[なし]、[Hudi]、または [Iceberg] を選択できます。
[なし]:標準の Hive テーブルを作成して書き込みます。
Hudi:Hudi 形式のテーブルを作成して書き込みます。このオプションは、Hive データソースのバージョンが CDP7.x Hive 3.1.3 の場合にのみサポートされます。
Iceberg:Iceberg 形式のテーブルを作成して書き込みます。このオプションは、Hive データソースのバージョンが EMR5.x Hive 3.1.x の場合にのみサポートされます。
説明この項目は、選択した [Hive データソース] で [データレイクテーブル形式の構成] が有効になっている場合にのみ構成できます。
[テーブル作成エンジン]
[Hive] または [Spark] を選択できます。データレイクテーブル形式を選択すると、デフォルトで Spark が選択されます。
Hive:テーブル作成に Hive エンジンを使用します。テーブル作成構文は Hive 構文です。
Spark:テーブル作成に Spark エンジンを使用します。テーブル作成構文は Spark 構文です。このオプションは、Hive データソースで Spark が有効になっている場合にのみサポートされます。
Hudi テーブルの種類
[MOR(Merge On Read)] または [COW(Copy On Write)] を選択できます。
テーブル名の変換
[テーブル名の変換の構成] をクリックして、[テーブル名変換ルールの構成] ダイアログボックスで変換ルールを構成します。
説明[ルールの作成] をクリックして行を追加します。最大 5 行まで表示できます。
ルールは上から下に照合され、置き換えられます。
置換文字とテーブル名のプレフィックスおよびサフィックスの英字は自動的に小文字に変換されます。
宛先テーブル名のプレフィックスとサフィックスは空にできず、最大 32 文字の英字、数字、およびアンダースコアを含めることができます。
パーティション設定
形式
現在、[マルチパーティション] のみがサポートされています。
パーティション間隔
デフォルト値は [時間] です。 [日] を選択することもできます。
[時間]:YYYY、MM、DD、HH の 4 つのパーティションレベルが表示されます。
[日]:YYYY、MM、DD の 3 つのパーティションレベルが表示されます。
宛先テーブルの構成
説明同期ルールが完了していない場合、宛先テーブルは空になり、[テーブルとマッピング関係のリフレッシュ] ボタンはグレー表示されます。
領域
説明
①追加フィールドの表示ボタン
リアルタイムの増分同期の間に、データ使用のために自動的に作成されたテーブルに追加フィールドが自動的に追加されます。 [追加フィールドの表示] をクリックして、フィールドを表示します。 [追加フィールド] ダイアログボックスで、現在の追加フィールドに関する情報を表示できます。
重要既存のテーブルを宛先テーブルとして選択し、そのテーブルに追加フィールドがない場合は、既存の宛先テーブルに追加フィールドを手動で追加することをお勧めします。 そうしないと、データ使用に影響が出ます。
データレイクテーブル形式を選択すると、追加フィールドは含まれなくなります。
[フィールド追加用の DDL の表示] をクリックすると、追加フィールドを追加するための DDL 文が表示されます。
説明ソースデータソースのタイプとして Kafka を選択した場合、追加フィールドを表示することはできません。
②検索とフィルター領域
[ソーステーブル] と [宛先テーブル名] で検索できます。 宛先テーブルをすばやくフィルタリングするには、上部の
アイコンをクリックします。 [マッピングステータス] と [テーブル作成方法] でフィルタリングできます。
③ [グローバルフィールドの追加] と [テーブルとマッピング関係のリフレッシュ]
グローバルフィールドの追加
[グローバルフィールドの追加] をクリックすると、[グローバルフィールドの追加] ダイアログボックスでグローバルフィールドを追加できます。
名前: グローバルフィールドの名前。
タイプ: [文字列]、[Long]、[Double]、[日付]、[ブール値] の 5 つのデータ型がサポートされています。
値: グローバルフィールドの値。
説明: フィールドの説明。
説明グローバルと単一テーブルの両方でフィールドが追加されている場合、単一テーブルに追加されたフィールドのみが有効になります。
現在、定数のみを追加できます。
グローバルフィールドは、作成方法が [テーブルの自動作成] である宛先テーブルに対してのみ有効です。
ソースデータソースのタイプとして Kafka を選択した場合、グローバルフィールドを追加することはできません。
テーブルとマッピング関係のリフレッシュ
宛先テーブル構成リストをリフレッシュするには、[テーブルとマッピング関係のリフレッシュ] をクリックします。
重要宛先テーブルの構成に既にコンテンツがある場合、データソースのタイプとデータソースを再選択すると、宛先テーブルリストとマッピング関係がリセットされます。 注意して進めてください。
リフレッシュプロセス中に再度クリックしてリフレッシュできます。 [テーブルとマッピング関係のリフレッシュ] をクリックするたびに、構成済みのグローバルフィールドのみが保存されます。 宛先テーブルの作成方法、宛先テーブル名、削除されたレコードなどのその他の情報は保存されません。
ソースデータソースのタイプとして [Kafka] を選択し、[テーブルとマッピング関係のリフレッシュ] をクリックすると、システムは [同期ルール - テーブル構成] のテーブルリストに従ってマッピングします。 テーブルが存在しない場合は、エラーが報告されます。
④ [宛先データベースリスト]
宛先データベースリストには、[シリアル番号]、[ソーステーブル]、[マッピングステータス]、[宛先テーブル作成方法]、[宛先テーブル名] が含まれます。 また、[フィールドの追加]、[フィールドの表示]、[リフレッシュ]、[削除] などの操作を宛先テーブルで実行することもできます。
[宛先テーブル作成方法] は、次の 3 つのタイプに分かれています。
ソーステーブルと同じ名前のテーブルが宛先データベースに存在する場合、宛先テーブルの作成方法は [既存のテーブルを使用] であり、このテーブルはデフォルトで宛先テーブルとして使用されます。 テーブルの自動作成に変更する場合は、テーブル名の変換ルールまたはプレフィックスとサフィックスを追加してから、再マッピングする必要があります。
同じ名前のテーブルが宛先データベースに見つからない場合、宛先テーブルの作成方法はデフォルトで [テーブルの自動作成] になります。 また、方法を [既存のテーブルを使用] に変更し、同期する既存のテーブルを選択することもできます。
自動的に作成されたテーブルのみが、フィールドの追加またはカスタム DDL テーブル作成をサポートします。 グローバルフィールドも、自動的に作成されたテーブルに対してのみ有効です。
説明テーブルの自動作成が使用され、データレイクテーブル形式がなしの場合、標準の Hive テーブルが自動的に作成されます。それ以外の場合、選択したテーブル形式のテーブルが作成されます。現在、Hudi と Iceberg がサポートされています。
カスタムテーブル作成が使用され、データレイクテーブル形式がなしの場合、標準の Hive テーブルの DDL が使用されます。それ以外の場合、選択したテーブル形式の DDL が使用されます。現在、Hudi と Iceberg がサポートされています。
ソースデータソースのタイプとして [Kafka] を選択した場合、宛先テーブルの作成方法は [既存のテーブルを使用] のみをサポートします。
[マッピングステータス]: マッピングステータスが異なると、表示される操作項目も異なります。 マッピングステータスは次のとおりです。
完了: マッピングは正常に完了しました。
未完了: ステータスは変更されましたが、マッピングはリフレッシュされていません。
マッピング中: マッピングを待機中またはマッピング処理中です。
例外: データソースエラーまたは内部システムエラーが存在します。
失敗: 宛先パーティションテーブルのパーティションが、リアルタイムタスクに設定されたパーティションセットと一致しません。
警告: ソーステーブルと宛先テーブルの間に互換性のないデータ型が存在する可能性があります。
フィールドの追加: [フィールドの追加] または [DDL] を使用して、テーブル作成をカスタマイズできます。 カスタムテーブル作成が有効になると、グローバルフィールドは有効ではなくなります。
説明フィールドが追加されると、テーブルの自動作成の操作列にのみ表示されます。
既存の宛先テーブル (つまり、作成方法が [既存のテーブルを使用] である宛先テーブル) の変更はサポートされていません。
フィールドの表示: ソーステーブルと宛先テーブルのフィールドとタイプを表示できます。
リフレッシュ: ソーステーブルと宛先テーブルを再マッピングします。
削除: ソーステーブルを削除した後、削除を元に戻すことはできません。
説明宛先テーブル名では、英字、数字、アンダースコアのみがサポートされています。 ソーステーブル名に他の文字が含まれている場合は、テーブル名の変換ルールを構成します。
⑤ バッチ操作
宛先テーブルでバッチ [削除] 操作を実行できます。
DDL 処理ポリシー
[テーブルの作成]、[列の追加など]:通常の処理(テーブルの作成、列の追加、列の削除、列の名前変更、列タイプの変更を含む)。DDL 情報は引き続きデスティネーションデータソースに送信されて処理されます。デスティネーションデータソースが異なると、処理ポリシーも異なります。
[無視]: DDL 情報を破棄し、宛先データソースに送信しません。
[エラー]: リアルタイム同期タスクをエラー状態で直接終了します。
説明Hive テーブルの既存のパーティションに追加された新しい列は、データと同期できません。つまり、既存のパーティションの新しい列のデータはすべて NULL です。次に新しく作成されたパーティションは正常に有効になります。
データソースタイプとして PostgreSQL を選択した場合、DDL 処理ポリシーはサポートされません。
データレイクテーブル形式として Hudi を選択した場合、すべての DDL 処理ポリシーは [無視] のみをサポートします。
ソースデータソースタイプとして Kafka を選択した場合、すべての DDL 処理ポリシーは [無視] のみをサポートします。
構成が完了したら、[保存] をクリックします。
ステップ 2: リアルタイム統合タスクのプロパティを設定する
現在のリアルタイム統合タスクタブの上部メニューバーにある [リソース構成] をクリックするか、右側のサイドバーにある [プロパティ] をクリックして、[プロパティ] パネルを開きます。
現在のリアルタイム統合タスクの [基本情報] と [リソース構成] を構成します。
基本情報: 現在のリアルタイム統合タスクの [開発オーナー] と [運用オーナー] を選択し、現在のタスクの [説明] を入力します。説明は 1,000 文字を超えることはできません。
リソース構成: 詳細については、「リアルタイム統合のリソース構成」をご参照ください。
ステップ 3: リアルタイム統合タスクを送信する
[送信] をクリックして、現在のリアルタイム統合タスクを送信します。
[送信] ダイアログボックスで、[送信の備考] を入力し、[OKして送信] をクリックします。
送信が完了すると、[送信] ダイアログボックスで送信の詳細を表示できます。
プロジェクトモードが Dev-Prod の場合、リアルタイム統合タスクを本番環境に公開する必要があります。詳細については、「公開タスクの管理」をご参照ください。
次の手順
オペレーションセンターでリアルタイム統合タスクを表示および維持して、タスクが適切に実行されていることを確認できます。詳細については、「リアルタイム タスクの表示と管理」をご参照ください。