ApsaraDB for OceanBase データソースを使用すると、ApsaraDB for OceanBase との間でデータの読み書きができます。このデータソースを使用して、DataWorks でデータ同期タスクを設定します。このトピックでは、ApsaraDB for OceanBase とのデータ同期機能について説明します。
サポート対象バージョン
ApsaraDB for OceanBase Reader と ApsaraDB for OceanBase Writer は、バッチ読み取りおよび書き込み操作において、以下の OceanBase バージョンをサポートしています。
OceanBase 2.x
OceanBase 3.x
OceanBase 4.x
制限事項
バッチ読み取り
ApsaraDB for OceanBase は Oracle および MySQL テナントモードをサポートしています。データフィルタリングのために where 句を設定する場合、または column パラメーターで関数列を設定する場合、その構文が対応するテナントモードの SQL 制約に準拠していることを確認してください。準拠していない場合、SQL ステートメントが失敗する可能性があります。
ビューからデータを読み取ることができます。
バッチ読み取り中は、同期中のデータを変更しないでください。これにより、データの重複や損失などのデータ品質の問題を防ぐことができます。
データソースで Read by Partition を設定する場合、データソースへのアクセスに使用するアカウントには system 権限が必要です。
バッチ書き込み
同期タスクには、少なくとも insert into... 権限が必要です。preSql および postSql パラメーターで指定するステートメントによっては、他の権限が必要になる場合があります。
データの書き込みには batch メソッドの使用を推奨します。このメソッドは、蓄積された行数が事前定義されたしきい値に達した場合にのみ書き込みリクエストを送信します。
ApsaraDB for OceanBase は Oracle および MySQL テナントモードをサポートしています。preSql および postSql パラメーターを設定する際は、その構文が対応するテナントモードの SQL 制約に準拠していることを確認してください。準拠していない場合、SQL ステートメントが失敗する可能性があります。
リアルタイム読み取り
この機能は、OceanBase の MySQL テナントモードのみをサポートしています。
リアルタイムデータを同期するには、binlog 機能を有効にする必要があります。詳細については、「Binlog 関連の操作 (Alibaba Cloud インスタンス)」、「」、または「Binlog 関連の操作 (OB Cloud インスタンス)」をご参照ください。
データベース全体のリアルタイム同期タスクは、接続文字列モードのデータソースをサポートしていません。
データベース全体のリアルタイム同期タスクでは、データベースのバージョンが V3.0 以降である必要があります。
ApsaraDB for OceanBase は、複数の物理的に分散したデータベースからのデータを単一の論理データベースに統合できる分散リレーショナルデータベースです。ただし、ApsaraDB for OceanBase から AnalyticDB for MySQL へリアルタイムでデータを同期する場合、単一の物理データベースからのみデータを同期できます。論理データベースからのデータ同期はサポートされていません。
前提条件
DataWorks を使用してデータを同期する前に、ご利用の ApsaraDB for OceanBase 環境を準備してください。これにより、データ同期タスクを正しく設定し、実行することができます。
ホワイトリストの設定
ご利用の サーバーレスリソースグループまたはデータ統合専用リソースグループが配置されている VPC の CIDR ブロックを、ご利用の OceanBase インスタンスのホワイトリストに追加します。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。
アカウントの作成と権限の付与
OceanBase データベースで必要な操作権限を持つデータベースアカウントを作成します。詳細については、「アカウントの作成」をご参照ください。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際、各パラメーターの意味については、DataWorks コンソールのパラメーター説明をご参照ください。
データ同期タスクの開発
同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。
単一テーブルのバッチ同期
サポートされるデータソース:データ統合がサポートするすべてのデータソースタイプ。
設定ガイド:「バッチ同期タスクの設定」
単一テーブルのリアルタイム同期
サポートされるデータソース:Kafka
設定ガイド:「データベース全体のリアルタイム同期タスクの設定」
データベース全体のリアルタイム同期
サポートされるデータソース:MySQL
設定ガイド:「データベース全体のリアルタイム同期タスクの設定」
付録:スクリプトとパラメーター
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。
Reader スクリプトの例
{
"type": "job",
"steps": [
{
"stepType": "apsaradb_for_OceanBase", // プラグイン名。
"parameter": {
"datasource": "", // データソース名。
"where": "",
"column": [ // 列。
"id",
"name"
],
"splitPk": ""
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {
"print": false,
"fieldDelimiter": ","
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0" // 許容されるダーティデータのレコード数。
},
"speed": {
"throttle": true, // false に設定すると、mbps パラメーターは有効にならず、速度制限は適用されません。true に設定すると、速度制限が有効になります。
"concurrent": 1, // 同時実行数。
"mbps":"12"// 速度制限のレート。1 mbps = 1 MB/s。
}
}
}Reader スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | DataWorks での ApsaraDB for OceanBase データソース名。 接続の設定には、jdbcUrl パラメーターまたは username パラメーターを使用します。 | はい | なし |
jdbcUrl | ターゲットデータベースの JDBC 接続文字列。JSON 配列でデータベースの複数のエンドポイントを指定できます。 複数のアドレスが設定されている場合、ApsaraDB for OceanBase Reader は有効なアドレスが見つかるまで IP アドレスを順番にプローブします。 すべての接続が失敗した場合、ApsaraDB for OceanBase Reader はエラーを報告します。 説明 jdbcUrl パラメーターは connection 設定ブロックに含める必要があります。 jdbcUrl のフォーマットは、公式の ApsaraDB for OceanBase の仕様に準拠する必要があり、接続制御情報を含めることができます。例: | いいえ | なし |
username | データソースのユーザー名。 | いいえ | なし |
password | 指定されたユーザー名のパスワード。 | いいえ | なし |
table | データを同期するテーブル。JSON 配列で複数のテーブルを指定できます。 複数のテーブルを設定する場合、それらのスキーマが同一であることを確認してください。ApsaraDB for OceanBase Reader は、テーブル間のスキーマの一貫性を検証しません。 説明 table パラメーターは connection 設定ブロックに含める必要があります。 | はい | なし |
column | 同期する列のセット。JSON 配列で列を指定します。デフォルトでは、すべての列が使用されます。例:
| はい | なし |
splitPk | ApsaraDB for OceanBase Reader がデータを抽出する際に splitPk パラメーターを指定すると、システムは splitPk で表される列を使用してデータシャーディングを実行します。このプロセスは、データ同期のための同時実行タスクを開始し、その効率を向上させます。
| いいえ | 空 |
where | ApsaraDB for OceanBase Reader は、指定された column、table、および where 条件に基づいて SQL クエリを構築し、データを抽出します。 たとえば、テスト中に where 条件を limit 10 に設定できます。一般的なビジネスシナリオでは、where 条件を
| いいえ | なし |
querySql | 一部のビジネスシナリオでは、where パラメーターだけではフィルタリング条件を記述するのに不十分な場合があります。このパラメーターを使用して、カスタムのフィルタリング SQL ステートメントを定義できます。このパラメーターが設定されている場合、データ同期システムは table、column、および splitPk パラメーターを無視し、このパラメーターの内容を使用してデータをフィルタリングします。 querySql を設定すると、ApsaraDB for OceanBase Reader は table、column、where、および splitPk パラメーターを無視します。 | いいえ | なし |
fetchSize | データベースサーバーから各バッチでフェッチするデータレコードの数。この値は、データ統合とサーバー間のネットワーク対話の回数を決定し、データ抽出のパフォーマンスを大幅に向上させることができます。 説明 大きな fetchSize 値 (>2048) は、データ同期プロセスで Out of Memory (OOM) エラーを引き起こす可能性があります。 | いいえ | 1,024 |
Writer スクリプトの例
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"apsaradb_for_OceanBase",// プラグイン名。
"parameter":{
"datasource": "データソース名",
"column": [// 列。
"id",
"name"
],
"table": "apsaradb_for_OceanBase_table",// テーブル名。
"preSql": [ // データ同期タスクの実行前に実行される SQL ステートメント。
"delete from @table where db_id = -1"
],
"postSql": [// データ同期タスクの実行後に実行される SQL ステートメント。
"update @table set db_modify_time = now() where db_id = 1"
],
"obWriteMode": "insert",
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// 許容されるダーティデータのレコード数。
},
"speed":{
"throttle":true,// false に設定すると、mbps パラメーターは有効にならず、速度制限は適用されません。true に設定すると、速度制限が有効になります。
"concurrent":1, // 同時実行数。
"mbps":"12"// 速度制限のレート。1 mbps = 1 MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Writer スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | DataWorks での ApsaraDB for OceanBase データソース名。 接続の設定には、jdbcUrl パラメーターまたは username パラメーターを使用します。 | いいえ | なし |
jdbcUrl | ターゲットデータベースの JDBC 接続文字列。jdbcUrl パラメーターは connection 設定ブロックに含める必要があります。
| はい | なし |
username | データソースのユーザー名。 | はい | なし |
password | 指定されたユーザー名のパスワード。 | はい | なし |
table | 送信先テーブルの名前。JSON 配列で名前を指定します。 説明 table パラメーターは connection 設定ブロックに含める必要があります。 | はい | なし |
column | データを書き込む送信先テーブルの列。列名はカンマで区切ります。例: 説明 column パラメーターは必須であり、空にすることはできません。 | はい | なし |
obWriteMode | 送信先テーブルの書き込みモード。このパラメーターはオプションです。
| いいえ | insert |
onClauseColumns | 説明 このパラメーターは Oracle テナントモードで使用され、 このパラメーターには、プライマリキー列または一意制約列を設定します。複数の列はカンマで区切ります。例: | いいえ | なし |
obUpdateColumns | 説明 このパラメーターは、 データ書き込みの競合が発生したときに更新する列。複数の列はカンマで区切ります。例: | いいえ | すべての列 |
preSql | 書き込み操作の前に実行する SQL ステートメント。SQL ステートメントがテーブルを操作する必要がある場合は、プレースホルダーとして | いいえ | なし |
postSql | 書き込み操作が完了した後に実行する SQL ステートメント。 | いいえ | なし |
batchSize | 各バッチでコミットするレコードの数。この値は、データ同期システムとサーバー間のネットワーク対話を大幅に削減し、全体のスループットを向上させることができます。 説明 batchSize の値が 2048 を超えると、データ同期プロセスで OOM エラーが発生する可能性があります。 | いいえ | 1,024 |