OceanBase データソースは、OceanBase からのデータの読み取りと OceanBase へのデータの書き込みをサポートします。OceanBase データソースを使用して、データ同期タスクを設定できます。このトピックでは、DataWorks が OceanBase に対して提供するデータ同期機能について説明します。
サポート対象バージョン
OceanBase Reader と OceanBase Writer は、オフラインデータの読み取りと書き込みにおいて、以下の OceanBase バージョンをサポートしています。
OceanBase 2.x
OceanBase 3.x
OceanBase 4.x
制限事項
オフライン読み取り
OceanBase は、Oracle および MySQL のテナントモードをサポートしています。[where] フィルター条件と [column] パラメーター内の関数カラムを構成する際は、対応するテナントモードの SQL 構文に準拠する必要があります。そうしないと、SQL 文の実行に失敗する可能性があります。
ビューからデータを読み取ることができます。
OceanBase からのオフライン読み取り中に同期中のデータを変更しないでください。変更すると、データ重複やデータ損失などのデータ品質の問題が発生する可能性があります。
データソースが パーティションによる読み取り で設定されている場合、データソースへのアクセスに使用するアカウントにはシステム権限が必要です。
オフライン書き込み
タスクに使用するアカウントには、少なくとも insert into... 文を実行する権限が必要です。preSql および postSql パラメーターで指定する文によっては、他の権限が必要になる場合があります。
データはバッチモードで書き込まれます。行数が指定されたしきい値に達した後に書き込みリクエストが開始されます。
OceanBase は Oracle と MySQL のテナントモードをサポートしています。preSql および postSql パラメーターで指定する文は、対応するテナントモードの SQL 構文に準拠する必要があります。準拠していない場合、SQL 文の実行に失敗する可能性があります。
リアルタイム読み取り
OceanBase は、物理的に分散した複数のデータベースのデータを単一の論理データベースに統合する分散リレーショナルデータベースです。ただし、OceanBase から AnalyticDB for MySQL へリアルタイムでデータを同期する場合、単一の物理データベースからのみデータを同期できます。論理データベースからデータを同期することはできません。
接続文字列を使用して追加されたデータソースは、データベース全体を同期するリアルタイムタスクではサポートされていません。
リアルタイム同期タスクの場合、データベースのバージョンは 3.0 以降である必要があります。
データ同期の事前準備
DataWorks でデータを同期する前に、OceanBase 環境を準備する必要があります。これにより、OceanBase データ同期タスクが期待どおりに設定および実行されることが保証されます。以下のセクションでは、必要な準備について説明します。
ホワイトリストの設定
ご利用の OceanBase インスタンスのホワイトリストに、サーバーレスリソースグループまたはデータ統合専用リソースグループの VPC CIDR ブロックを追加します。詳細については、「ホワイトリストの追加」をご参照ください。
アカウントの作成と権限付与
データベースログインアカウントを作成します。このアカウントには、OceanBase に対する必要な操作権限が必要です。詳細については、「アカウントの作成」をご参照ください。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターのヒントを表示して、各パラメーターの意味を理解できます。
データ同期タスクの開発
同期タスクを設定するためのエントリポイントと手順については、以下の設定ガイドをご参照ください。
単一テーブルのバッチ同期タスクの設定
手順の詳細については、「コードレス UI でのタスク設定」および「コードエディタでのタスク設定」をご参照ください。
コードエディタのすべてのパラメーターとスクリプトデモについては、「付録:スクリプトデモとパラメーターの説明」をご参照ください。
データベース全体のリアルタイム同期タスクの設定
手順の詳細については、「DataStudio でのリアルタイム同期タスクの設定」をご参照ください。
単一テーブルまたはデータベース全体の完全および増分 (リアルタイム) 読み取り同期タスクの設定
手順の詳細については、「データベース全体のリアルタイム同期タスクの設定」をご参照ください。
付録:スクリプトデモとパラメーターの説明
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプトフォーマットの要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスク設定」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。
Reader スクリプトデモ
{
"type": "job",
"steps": [
{
"stepType": "apsaradb_for_OceanBase", // プラグイン名
"parameter": {
"datasource": "", // データソース名
"where": "",
"column": [ // フィールド
"id",
"name"
],
"splitPk": ""
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {
"print": false,
"fieldDelimiter": ","
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0" // エラーレコード数
},
"speed": {
"throttle": true, // スロットリングを有効にするかどうかを指定します。true に設定するとスロットリングが有効になり、false に設定するとスロットリングが無効になり、mbps パラメーターは効果がありません。
"concurrent": 1, // 同時実行タスク数
"mbps":"12"// スロットリングレート。1 mbps = 1 MB/s。
}
}
}Reader スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | 追加した ApsaraDB For OceanBase データソースの名前です。ご利用の DataWorks バージョンが ApsaraDB For OceanBase データソースの追加をサポートしている場合、このパラメーターを使用できます。 jdbcUrl パラメーターまたは username パラメーターを使用して接続を設定できます。 | はい | なし |
jdbcUrl | ピアデータベースの JDBC 接続情報です。JSON 配列を使用して、データベースの複数の接続アドレスを指定します。 複数のアドレスを設定した場合、ApsaraDB For OceanBase Reader は有効な IP アドレスが見つかるまで、IP アドレスの接続性を順番にプローブします。 すべての接続に失敗した場合、ApsaraDB For OceanBase Reader はエラーを報告します。 説明 jdbcUrl パラメーターは connection 設定ユニットに含める必要があります。 jdbcUrl のフォーマットは、ApsaraDB For OceanBase の公式仕様に準拠する必要があります。接続の添付ファイル制御情報を指定することもできます。例: | いいえ | なし |
username | データソースのユーザー名です。 | いいえ | なし |
password | 指定されたユーザー名のパスワードです。 | いいえ | なし |
table | 同期するテーブルです。JSON 配列を使用して複数のテーブルを指定します。 複数のテーブルを設定する場合は、テーブルが同じスキーマ構造を持っていることを確認してください。ApsaraDB For OceanBase Reader は、テーブルが統一された論理構造を持っているかどうかをチェックしません。 説明 table パラメーターは connection 設定ユニットに含める必要があります。 | はい | なし |
column | 設定されたテーブルから同期する列のセットです。JSON 配列を使用してフィールドを記述します。デフォルトでは、すべての列が使用されます。例:[ * ]。
| はい | なし |
splitPk | ApsaraDB for OceanBase Reader がデータを抽出する際に splitPk パラメーターを指定すると、splitPk で指定されたフィールドがデータシャーディングに使用されます。その後、Data Integration は並列スレッドを実行してデータを読み取ります。これにより、データをより効率的に同期できます。
| いいえ | 空 |
where | ApsaraDB for OceanBase Reader は、指定された column、table、および where 条件に基づいて SQL 文を連結し、その SQL 文に基づいてデータを抽出します。 たとえば、テスト目的で where 条件を limit 10 に設定できます。実際のビジネスシナリオでは、通常、当日のデータを同期し、where 条件を
| いいえ | なし |
querySql | 一部のビジネスシナリオでは、where パラメーターだけではフィルター条件を記述するのに不十分な場合があります。このパラメーターを使用して、フィルター SQL 文をカスタマイズできます。このパラメーターを設定すると、データ同期システムは tables、columns、および splitPk パラメーターを無視し、このパラメーターの内容を使用してデータをフィルター処理します。 querySql を設定すると、ApsaraDB For OceanBase Reader は table、column、where、および splitPk パラメーターを無視します。 | いいえ | なし |
fetchSize | このパラメーターは、各バッチで取得するデータレコードの数を指定します。この値は、Data Integration とサーバー間のネットワーク対話の数を決定し、データ抽出パフォーマンスを大幅に向上させることができます。 説明 2048 を超える fetchSize 値は、データ同期プロセスで Out-of-Memory (OOM) エラーを引き起こす可能性があります。 | いいえ | 1,024 |
Writer スクリプトデモ
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"apsaradb_for_OceanBase",// プラグイン名。
"parameter":{
"datasource": "データソース名",
"column": [// フィールド。
"id",
"name"
],
"table": "apsaradb_for_OceanBase_table",// テーブル名。
"preSql": [ // データ同期タスクの実行前に実行する SQL 文。
"delete from @table where db_id = -1"
],
"postSql": [// データ同期タスクの実行後に実行する SQL 文。
"update @table set db_modify_time = now() where db_id = 1"
],
"obWriteMode": "insert",
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// エラーレコード数。
},
"speed":{
"throttle":true,// スロットリングを有効にするかどうかを指定します。true に設定するとスロットリングが有効になり、false に設定するとスロットリングが無効になり、mbps パラメーターは効果がありません。
"concurrent":1, // 同時実行タスク数。
"mbps":"12"// スロットリングレート。1 mbps = 1 MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Writer スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | 追加した ApsaraDB For OceanBase データソースの名前です。ご利用の DataWorks バージョンが ApsaraDB For OceanBase データソースの追加をサポートしている場合、このパラメーターを使用できます。 jdbcUrl パラメーターまたは username パラメーターを使用して接続を設定できます。 | いいえ | なし |
jdbcUrl | ピアデータベースの JDBC 接続情報です。jdbcUrl パラメーターは connection 設定ユニットに含まれています。
| はい | なし |
username | データソースのユーザー名です。 | はい | なし |
password | 指定されたユーザー名のパスワードです。 | はい | なし |
table | データを書き込むテーブルの名前です。JSON 配列を使用してテーブル名を指定します。 説明 table パラメーターは connection 設定ユニットに含める必要があります。 | はい | なし |
column | データを書き込む宛先テーブルのフィールドです。フィールドはカンマ (,) で区切ります。例: 説明 column パラメーターを指定する必要があります。空にすることはできません。 | はい | なし |
obWriteMode | 宛先テーブルにデータを書き込むために使用されるモードを制御します。このパラメーターはオプションです。
| いいえ | insert |
onClauseColumns | 説明 このパラメーターは Oracle テナントモードで使用されます。 このパラメーターをプライマリキーフィールドまたは一意制約フィールドに設定します。複数のフィールドを指定するには、カンマ (,) で区切ります。例: | いいえ | なし |
obUpdateColumns | 説明 このパラメーターは、 データ書き込みの競合が発生したときに更新するフィールドです。複数のフィールドを指定するには、カンマ (,) で区切ります。例: | いいえ | すべてのフィールド |
preSql | 宛先テーブルにデータが書き込まれる前に実行する標準の文です。SQL 文がテーブルを操作する必要がある場合は、 | いいえ | なし |
postSql | 宛先テーブルにデータが書き込まれた後に実行する標準の文です。 | いいえ | なし |
batchSize | バッチコミット内のレコード数です。この値は、データ同期システムとサーバー間のネットワーク対話の数を大幅に削減し、全体的なスループットを向上させることができます。 説明 2048 を超える大きな fetchSize 値は、データ同期中に Out-of-Memory (OOM) エラーを引き起こす可能性があります。 | いいえ | 1,024 |