ApsaraDB for OceanBase データソースを使用すると、ApsaraDB for OceanBase からデータを読み取り、および ApsaraDB for OceanBase へデータを書き込むことができます。このデータソースを活用して、データ同期タスクの構成および実行が可能です。本トピックでは、DataWorks が ApsaraDB for OceanBase に対して提供するデータ同期機能について説明します。
サポートされるバージョン
ApsaraDB for OceanBase Reader および ApsaraDB for OceanBase Writer プラグインは、以下の ApsaraDB for OceanBase バージョンにおけるバッチデータの読み書き操作をサポートしています。
OceanBase 2.x
OceanBase 3.x
OceanBase 4.x
制限事項
バッチ読み取り
ApsaraDB for OceanBase には Oracle テナントモードと MySQL テナントモードの 2 種類があります。パラメーター where 句または column パラメーター内の関数列を構成する際は、構文がご利用のインスタンスのテナントモードと互換性を持つ必要があります。互換性がない場合、SQL 文の実行に失敗する可能性があります。
ビューからデータを読み取ることができます。
データ重複やデータ損失などのデータ品質問題を防止するため、バッチ同期タスクの実行中にデータを変更しないでください。
データソースを パーティション単位でデータを読み取る ように構成する場合、データソースへのアクセスに使用するアカウントには system 権限が必要です。
バッチ書き込み
タスク実行に使用するアカウントには、少なくとも insert into... 権限が必要です。preSql および postSql パラメーターで指定する SQL 文に応じて、追加の権限が必要になる場合があります。
データ書き込みには バッチ メソッドの使用を推奨します。この方法では、指定された行数が蓄積された後にのみ書き込みリクエストが発行されます。
ApsaraDB for OceanBase には Oracle テナントモードと MySQL テナントモードの 2 種類があります。preSql および postSql パラメーターを構成する際は、構文がご利用のインスタンスのテナントモードと互換性を持つ必要があります。互換性がない場合、SQL 文の実行に失敗する可能性があります。
リアルタイム読み取り
ApsaraDB for OceanBase の MySQL テナントモードのみがサポートされています。
リアルタイムでデータを同期するには、Binlog 機能を有効化する必要があります。詳細については、「Binlog に関する操作 (Alibaba Cloud インスタンス)」および「」「Binlog に関する操作 (OceanBase Cloud インスタンス)」をご参照ください。
データベース全体のリアルタイム同期では、接続文字列を使用したデータソースはサポートされていません。
データベース全体のリアルタイム同期では、ApsaraDB for OceanBase インスタンスのバージョンが V3.0 以降である必要があります。
ApsaraDB for OceanBase は分散リレーショナルデータベースであり、物理的に分散配置された複数のデータベースのデータを 1 つの論理データベースに統合できます。ただし、ApsaraDB for OceanBase から AnalyticDB for MySQL へのリアルタイム同期では、単一の物理データベースからのみデータを同期できます。論理データベースからの同期はできません。
事前準備
DataWorks で ApsaraDB for OceanBase のデータ同期タスクを構成および実行する前に、ApsaraDB for OceanBase インスタンスをあらかじめ構成しておく必要があります。本セクションでは、必要な事前準備について説明します。
ホワイトリストの構成
ApsaraDB for OceanBase インスタンスのホワイトリストに、サーバーレスリソースグループ またはデータ統合専用リソースグループの CIDR ブロックを追加してください。詳細については、「IP アドレスホワイトリストの構成」をご参照ください。
アカウントの作成と権限の付与
データ同期用のデータベースアカウントを作成します。このアカウントには、ApsaraDB for OceanBase インスタンスに対する必要な操作権限が必要です。詳細については、「アカウントの作成」をご参照ください。
データソースの追加
DataWorks で同期タスクを開発する前に、データソース管理 の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールで パラメーターの説明 を表示することで、各パラメーターの意味を確認できます。
データ同期タスクの構成
同期タスクの構成入口および手順については、以下の構成ガイドをご参照ください。
単一テーブルのバッチ同期
サポートされるデータソース:Data Integration がサポートするすべてのデータソースタイプ。
構成ガイド:「コードレス UI によるタスク構成」
単一テーブルのリアルタイム同期
サポートされるデータソース:Kafka
構成ガイド:「リアルタイム完全データベース同期タスクの構成」
データベース全体のリアルタイム同期
サポートされるデータソース:MySQL
構成ガイド:「リアルタイム完全データベース同期タスクの構成」
付録:スクリプト例およびパラメーター
コードエディタを使用したバッチ同期タスクの構成
コードエディタを使用してバッチ同期タスクを構成する場合は、統一されたスクリプトフォーマット要件に基づき、スクリプト内で関連パラメーターを構成する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下に、コードエディタを使用してバッチ同期タスクを構成する際に、データソース向けに設定する必要があるパラメーターについて説明します。
Reader スクリプト例
{
"type": "job",
"steps": [
{
"stepType": "apsaradb_for_OceanBase", // プラグイン名。
"parameter": {
"datasource": "", // データソース名。
"where": "",
"column": [ // 列。
"id",
"name"
],
"splitPk": ""
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {
"print": false,
"fieldDelimiter": ","
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0" // 許容されるエラー レコード数。
},
"speed": {
"throttle": true, // 速度制限を有効化するかどうかを指定します。throttle を false に設定した場合、mbps パラメーターは無効になります。throttle を true に設定した場合、速度制限が有効になります。
"concurrent": 1, // 同時実行スレッド数。
"mbps":"12"// 最大転送速度。単位:MB/s。
}
}
}Reader スクリプトパラメーター
パラメーター | 説明 | 必須 | デフォルト |
datasource | ApsaraDB for OceanBase データソースの名前。 接続は、このパラメーター、または jdbcUrl、username、および | はい | なし |
jdbcUrl | ソースデータベースの JDBC URL。JSON 配列で複数の接続 URL を指定できます。 複数の URL を指定した場合、ApsaraDB for OceanBase Reader プラグインは、有効な接続が確立されるまで順次接続を試行します。 すべての接続が失敗した場合、タスクは失敗します。 説明 jdbcUrl パラメーターは、connection パラメーター内にネストする必要があります。 jdbcUrl 接続文字列は、公式の ApsaraDB For OceanBase 仕様に従って指定する必要があります。例: | いいえ | なし |
username | データベースアカウントのユーザー名。 | いいえ | なし |
password | データベースアカウントのパスワード。 | いいえ | なし |
table | データを読み取るテーブル。JSON 配列で複数のテーブルを指定できます。 複数のテーブルを指定する場合、それらが同一のスキーマを持つことを保証する必要があります。ApsaraDB for OceanBase Reader プラグインは、テーブル間の論理整合性をチェックしません。 説明 table パラメーターは、connection パラメーター内にネストする必要があります。 | はい | なし |
column | 指定されたテーブルから読み取る列。JSON 配列で列を指定します。すべての列を読み取る場合は、このパラメーターを
| はい | なし |
splitPk | ApsaraDB for OceanBase Reader がデータを抽出する際に、splitPk パラメーターを指定すると、splitPk で表される列をデータシャーディングに使用することを意味します。これにより、データ同期が同時実行タスクで開始され、効率が向上します。
| いいえ | 空 |
where | SQL クエリに追加されるフィルター条件。ApsaraDB for OceanBase Reader プラグインは、指定された column、table、および where パラメーターに基づいて SQL クエリを構築します。 たとえば、テスト目的で where パラメーターを limit 10 に設定できます。増分同期を必要とするビジネスシナリオでは、where パラメーターを
| いいえ | なし |
querySql | 複雑なフィルター条件を記述する際に where パラメーターだけでは不十分な場合に、カスタム SQL 文を指定してデータをフィルターします。このパラメーターを構成した場合、プラグインは table、column、および splitPk パラメーターを無視します。 querySql を構成した場合、ApsaraDB For OceanBase Reader は table、column、where、および splitPk パラメーターを無視します。 | いいえ | なし |
fetchSize | データベースサーバーから 1 回のバッチでフェッチするレコード数。値を大きくすると、Data Integration とサーバー間のネットワーク通信回数が減少し、データ抽出のパフォーマンスが大幅に向上します。 説明 fetchSize の値が大きすぎると(>2048)、データ同期プロセスでメモリ不足(OOM)エラーが発生する可能性があります。 | いいえ | 1,024 |
Writer スクリプト例
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"apsaradb_for_OceanBase",// プラグイン名。
"parameter":{
"datasource": "データソース名",
"column": [// 列。
"id",
"name"
],
"table": "apsaradb_for_OceanBase_table",// テーブル名。
"preSql": [ // 同期タスク実行前に実行する SQL 文。
"delete from @table where db_id = -1"
],
"postSql": [// 同期タスク実行後に実行する SQL 文。
"update @table set db_modify_time = now() where db_id = 1"
],
"obWriteMode": "insert",
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// 許容されるエラー レコード数。
},
"speed":{
"throttle":true,// 速度制限を有効化するかどうかを指定します。throttle を false に設定した場合、mbps パラメーターは無効になります。throttle を true に設定した場合、速度制限が有効になります。
"concurrent":1, // 同時実行スレッド数。
"mbps":"12"// 最大転送速度。単位:MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Writer スクリプトパラメーター
パラメーター | 説明 | 必須 | デフォルト |
datasource | ApsaraDB for OceanBase データソースの名前。 接続は、このパラメーター、または jdbcUrl、username、および | いいえ | なし |
jdbcUrl | ターゲットデータベースの JDBC URL。jdbcUrl パラメーターは、connection パラメーター内にネストする必要があります。
| はい | なし |
username | データベースアカウントのユーザー名。 | はい | なし |
password | データベースアカウントのパスワード。 | はい | なし |
table | ターゲットテーブルの名前。1 つのテーブルのみを指定できます。 説明 table パラメーターは、connection パラメーター内にネストする必要があります。 | はい | なし |
column | データを書き込むターゲットテーブルの列。文字列の JSON 配列として列を指定します。例: 説明 column パラメーターは必ず構成してください。空欄にすることはできません。 | はい | なし |
obWriteMode | ターゲットテーブルへの書き込みモードを制御します。有効な値:
| いいえ | insert |
onClauseColumns | 説明 このパラメーターは Oracle テナントモード向けです。 プライマリキー列または一意な制約列。複数の列を指定する場合は、カンマで区切ります。例: | いいえ | なし |
obUpdateColumns | 説明
書き込み競合が発生した場合に更新する列。複数の列を指定する場合は、カンマで区切ります。例: | いいえ | すべての列 |
preSql | タスクがターゲットテーブルにデータを書き込む前に実行する SQL 文。SQL 文でテーブル名を参照する必要がある場合は、 | いいえ | なし |
postSql | タスクがターゲットテーブルにデータを書き込んだ後に実行する SQL 文。 | いいえ | なし |
batchSize | 1 回のバッチで送信するレコード数。値を大きくすると、データ同期システムとデータベースサーバー間のネットワーク通信回数が減少し、全体のスループットが大幅に向上します。 説明 batchSize の値が大きすぎると(>2048)、データ同期プロセスでメモリ不足(OOM)エラーが発生する可能性があります。 | いいえ | 1,024 |