すべてのプロダクト
Search
ドキュメントセンター

DataWorks:ApsaraDB for OceanBase

最終更新日:Apr 21, 2026

ApsaraDB for OceanBase データソースを使用すると、ApsaraDB for OceanBase との間でデータの読み書きができます。このデータソースを使用して、DataWorks でデータ同期タスクを設定します。このトピックでは、ApsaraDB for OceanBase とのデータ同期機能について説明します。

サポート対象バージョン

ApsaraDB for OceanBase Reader と ApsaraDB for OceanBase Writer は、バッチ読み取りおよび書き込み操作において、以下の OceanBase バージョンをサポートしています。

  • OceanBase 2.x

  • OceanBase 3.x

  • OceanBase 4.x

制限事項

バッチ読み取り

  • ApsaraDB for OceanBase は Oracle および MySQL テナントモードをサポートしています。データフィルタリングのために where 句を設定する場合、または column パラメーターで関数列を設定する場合、その構文が対応するテナントモードの SQL 制約に準拠していることを確認してください。準拠していない場合、SQL ステートメントが失敗する可能性があります。

  • ビューからデータを読み取ることができます。

  • バッチ読み取り中は、同期中のデータを変更しないでください。これにより、データの重複や損失などのデータ品質の問題を防ぐことができます。

  • データソースで Read by Partition を設定する場合、データソースへのアクセスに使用するアカウントには system 権限が必要です。

バッチ書き込み

説明

同期タスクには、少なくとも insert into... 権限が必要です。preSql および postSql パラメーターで指定するステートメントによっては、他の権限が必要になる場合があります。

  • データの書き込みには batch メソッドの使用を推奨します。このメソッドは、蓄積された行数が事前定義されたしきい値に達した場合にのみ書き込みリクエストを送信します。

  • ApsaraDB for OceanBase は Oracle および MySQL テナントモードをサポートしています。preSql および postSql パラメーターを設定する際は、その構文が対応するテナントモードの SQL 制約に準拠していることを確認してください。準拠していない場合、SQL ステートメントが失敗する可能性があります。

リアルタイム読み取り

  • この機能は、OceanBase の MySQL テナントモードのみをサポートしています。

  • リアルタイムデータを同期するには、binlog 機能を有効にする必要があります。詳細については、「Binlog 関連の操作 (Alibaba Cloud インスタンス)」、「」、または「Binlog 関連の操作 (OB Cloud インスタンス)」をご参照ください。

  • データベース全体のリアルタイム同期タスクは、接続文字列モードのデータソースをサポートしていません。

  • データベース全体のリアルタイム同期タスクでは、データベースのバージョンが V3.0 以降である必要があります。

  • ApsaraDB for OceanBase は、複数の物理的に分散したデータベースからのデータを単一の論理データベースに統合できる分散リレーショナルデータベースです。ただし、ApsaraDB for OceanBase から AnalyticDB for MySQL へリアルタイムでデータを同期する場合、単一の物理データベースからのみデータを同期できます。論理データベースからのデータ同期はサポートされていません。

前提条件

DataWorks を使用してデータを同期する前に、ご利用の ApsaraDB for OceanBase 環境を準備してください。これにより、データ同期タスクを正しく設定し、実行することができます。

ホワイトリストの設定

ご利用の サーバーレスリソースグループまたはデータ統合専用リソースグループが配置されている VPC の CIDR ブロックを、ご利用の OceanBase インスタンスのホワイトリストに追加します。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。

アカウントの作成と権限の付与

OceanBase データベースで必要な操作権限を持つデータベースアカウントを作成します。詳細については、「アカウントの作成」をご参照ください。

データソースの追加

DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際、各パラメーターの意味については、DataWorks コンソールのパラメーター説明をご参照ください

データ同期タスクの開発

同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。

単一テーブルのバッチ同期

  • サポートされるデータソース:データ統合がサポートするすべてのデータソースタイプ。

  • 設定ガイド:「バッチ同期タスクの設定

単一テーブルのリアルタイム同期

データベース全体のリアルタイム同期

付録:スクリプトとパラメーター

コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。

Reader スクリプトの例

{
    "type": "job",
    "steps": [
        {
            "stepType": "apsaradb_for_OceanBase", // プラグイン名。
            "parameter": {
                "datasource": "", // データソース名。
                "where": "",
                "column": [ // 列。
                    "id",
                    "name"
                ],
                "splitPk": ""
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0" // 許容されるダーティデータのレコード数。
        },
        "speed": {
            "throttle": true, // false に設定すると、mbps パラメーターは有効にならず、速度制限は適用されません。true に設定すると、速度制限が有効になります。
            "concurrent": 1, // 同時実行数。
            "mbps":"12"// 速度制限のレート。1 mbps = 1 MB/s。
        }
    }
}

Reader スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

DataWorks での ApsaraDB for OceanBase データソース名。

接続の設定には、jdbcUrl パラメーターまたは username パラメーターを使用します。

はい

なし

jdbcUrl

ターゲットデータベースの JDBC 接続文字列。JSON 配列でデータベースの複数のエンドポイントを指定できます。

複数のアドレスが設定されている場合、ApsaraDB for OceanBase Reader は有効なアドレスが見つかるまで IP アドレスを順番にプローブします。

すべての接続が失敗した場合、ApsaraDB for OceanBase Reader はエラーを報告します。

説明

jdbcUrl パラメーターは connection 設定ブロックに含める必要があります。

jdbcUrl のフォーマットは、公式の ApsaraDB for OceanBase の仕様に準拠する必要があり、接続制御情報を含めることができます。例:jdbc:oceanbase://127.0.0.1:3306/database。このパラメーターまたは username パラメーターのいずれかを指定する必要があります。

いいえ

なし

username

データソースのユーザー名。

いいえ

なし

password

指定されたユーザー名のパスワード。

いいえ

なし

table

データを同期するテーブル。JSON 配列で複数のテーブルを指定できます。

複数のテーブルを設定する場合、それらのスキーマが同一であることを確認してください。ApsaraDB for OceanBase Reader は、テーブル間のスキーマの一貫性を検証しません。

説明

table パラメーターは connection 設定ブロックに含める必要があります。

はい

なし

column

同期する列のセット。JSON 配列で列を指定します。デフォルトでは、すべての列が使用されます。例:["*"]

  • 列の射影:列のサブセットをエクスポートできます。

  • 列の並べ替え:テーブルスキーマとは異なる順序で列をエクスポートできます。

  • 定数の設定:定数を使用できます。例:'123'

  • 関数列:関数列を使用できます。例:date('now')

  • column パラメーターは、同期する列のセットを明示的に指定する必要があり、空にすることはできません。

はい

なし

splitPk

ApsaraDB for OceanBase Reader がデータを抽出する際に splitPk パラメーターを指定すると、システムは splitPk で表される列を使用してデータシャーディングを実行します。このプロセスは、データ同期のための同時実行タスクを開始し、その効率を向上させます。

  • splitPk にはテーブルのプライマリキーを使用することを推奨します。プライマリキーは通常、均等に分散されているため、シャード内のデータホットスポットを防ぐのに役立ちます。

  • 現在、splitPk は整数データ型のシャーディングのみをサポートしています。文字列、浮動小数点、日付などの他の型はサポートされていません。サポートされていない型を指定すると、ApsaraDB for OceanBase Reader はエラーを報告します。

  • splitPk を空のままにした場合、システムはテーブルのシャーディングが許可されていないとみなし、単一のチャネルを使用してデータを抽出します。

いいえ

where

ApsaraDB for OceanBase Reader は、指定された columntable、および where 条件に基づいて SQL クエリを構築し、データを抽出します。

たとえば、テスト中に where 条件を limit 10 に設定できます。一般的なビジネスシナリオでは、where 条件を gmt_create>$bizdate に設定して、当日のデータを同期する場合があります。

  • where 条件を使用して、効果的な増分同期を行うことができます。

  • where 条件が設定されていないか、空のままの場合、システムはテーブル全体の同期を実行します。

いいえ

なし

querySql

一部のビジネスシナリオでは、where パラメーターだけではフィルタリング条件を記述するのに不十分な場合があります。このパラメーターを使用して、カスタムのフィルタリング SQL ステートメントを定義できます。このパラメーターが設定されている場合、データ同期システムは tablecolumn、および splitPk パラメーターを無視し、このパラメーターの内容を使用してデータをフィルタリングします。

querySql を設定すると、ApsaraDB for OceanBase Reader は tablecolumnwhere、および splitPk パラメーターを無視します。

いいえ

なし

fetchSize

データベースサーバーから各バッチでフェッチするデータレコードの数。この値は、データ統合とサーバー間のネットワーク対話の回数を決定し、データ抽出のパフォーマンスを大幅に向上させることができます。

説明

大きな fetchSize 値 (>2048) は、データ同期プロセスで Out of Memory (OOM) エラーを引き起こす可能性があります。

いいえ

1,024

Writer スクリプトの例

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"apsaradb_for_OceanBase",// プラグイン名。
            "parameter":{
                "datasource": "データソース名",
                "column": [// 列。
                    "id",
                    "name"
                ],
                "table": "apsaradb_for_OceanBase_table",// テーブル名。
                "preSql": [ // データ同期タスクの実行前に実行される SQL ステートメント。
                    "delete from @table where db_id = -1"
                ],
                "postSql": [// データ同期タスクの実行後に実行される SQL ステートメント。
                    "update @table set db_modify_time = now() where db_id = 1"
                ],
                "obWriteMode": "insert",
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// 許容されるダーティデータのレコード数。
        },
        "speed":{
            "throttle":true,// false に設定すると、mbps パラメーターは有効にならず、速度制限は適用されません。true に設定すると、速度制限が有効になります。
            "concurrent":1, // 同時実行数。
            "mbps":"12"// 速度制限のレート。1 mbps = 1 MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Writer スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

DataWorks での ApsaraDB for OceanBase データソース名。

接続の設定には、jdbcUrl パラメーターまたは username パラメーターを使用します。

いいえ

なし

jdbcUrl

ターゲットデータベースの JDBC 接続文字列。jdbcUrl パラメーターは connection 設定ブロックに含める必要があります。

  • データベースには 1 つの値しか設定できません。複数のプライマリノードを持つデータベースへのデータ書き込み (デュアルプライマリインポートシナリオ) はサポートされていません。

  • jdbcUrl のフォーマットは、公式の ApsaraDB for OceanBase の仕様に準拠する必要があり、追加の接続パラメーターを含めることができます。例:jdbc:oceanbase://127.0.0.1:3306/database

はい

なし

username

データソースのユーザー名。

はい

なし

password

指定されたユーザー名のパスワード。

はい

なし

table

送信先テーブルの名前。JSON 配列で名前を指定します。

説明

table パラメーターは connection 設定ブロックに含める必要があります。

はい

なし

column

データを書き込む送信先テーブルの列。列名はカンマで区切ります。例:"column": ["id", "name", "age"]

説明

column パラメーターは必須であり、空にすることはできません。

はい

なし

obWriteMode

送信先テーブルの書き込みモード。このパラメーターはオプションです。

  • insertINSERT INTO ... ステートメントを使用します。プライマリキーまたは一意なインデックスの競合が発生した場合、競合する行は書き込まれません。

  • update... ON DUPLICATE KEY UPDATE ... ステートメントを使用します。このモードは MySQL テナントモードで使用されます。競合が発生した場合、競合する行が更新されます。

  • mergeMERGE INTO ... WHEN MATCHED THEN UPDATE ... ステートメントを使用します。このモードは Oracle テナントモードで使用されます。競合が発生した場合、競合する行が更新されます。

いいえ

insert

onClauseColumns

説明

このパラメーターは Oracle テナントモードで使用され、obWriteModemerge に設定されている場合に必須です。設定されていない場合、データは insert モードで書き込まれます。

このパラメーターには、プライマリキー列または一意制約列を設定します。複数の列はカンマで区切ります。例:ID,C1

いいえ

なし

obUpdateColumns

説明

このパラメーターは、obWriteModemerge または update に設定されている場合に有効になります。

データ書き込みの競合が発生したときに更新する列。複数の列はカンマで区切ります。例:c2,c3

いいえ

すべての列

preSql

書き込み操作の前に実行する SQL ステートメント。SQL ステートメントがテーブルを操作する必要がある場合は、プレースホルダーとして @table を使用します。これは実行時に実際のテーブル名に置き換えられます。

いいえ

なし

postSql

書き込み操作が完了した後に実行する SQL ステートメント。

いいえ

なし

batchSize

各バッチでコミットするレコードの数。この値は、データ同期システムとサーバー間のネットワーク対話を大幅に削減し、全体のスループットを向上させることができます。

説明

batchSize の値が 2048 を超えると、データ同期プロセスで OOM エラーが発生する可能性があります。

いいえ

1,024