すべてのプロダクト
Search
ドキュメントセンター

DataWorks:ApsaraDB For OceanBase データソース

最終更新日:Nov 27, 2025

OceanBase データソースは、OceanBase からのデータの読み取りと OceanBase へのデータの書き込みをサポートします。OceanBase データソースを使用して、データ同期タスクを設定できます。このトピックでは、DataWorks が OceanBase に対して提供するデータ同期機能について説明します。

サポート対象バージョン

OceanBase Reader と OceanBase Writer は、オフラインデータの読み取りと書き込みにおいて、以下の OceanBase バージョンをサポートしています。

  • OceanBase 2.x

  • OceanBase 3.x

  • OceanBase 4.x

制限事項

オフライン読み取り

  • OceanBase は、Oracle および MySQL のテナントモードをサポートしています。[where] フィルター条件と [column] パラメーター内の関数カラムを構成する際は、対応するテナントモードの SQL 構文に準拠する必要があります。そうしないと、SQL 文の実行に失敗する可能性があります。

  • ビューからデータを読み取ることができます。

  • OceanBase からのオフライン読み取り中に同期中のデータを変更しないでください。変更すると、データ重複やデータ損失などのデータ品質の問題が発生する可能性があります。

  • データソースが パーティションによる読み取り で設定されている場合、データソースへのアクセスに使用するアカウントにはシステム権限が必要です。

オフライン書き込み

説明

タスクに使用するアカウントには、少なくとも insert into... 文を実行する権限が必要です。preSql および postSql パラメーターで指定する文によっては、他の権限が必要になる場合があります。

  • データはバッチモードで書き込まれます。行数が指定されたしきい値に達した後に書き込みリクエストが開始されます。

  • OceanBase は Oracle と MySQL のテナントモードをサポートしています。preSql および postSql パラメーターで指定する文は、対応するテナントモードの SQL 構文に準拠する必要があります。準拠していない場合、SQL 文の実行に失敗する可能性があります。

リアルタイム読み取り

OceanBase は、物理的に分散した複数のデータベースのデータを単一の論理データベースに統合する分散リレーショナルデータベースです。ただし、OceanBase から AnalyticDB for MySQL へリアルタイムでデータを同期する場合、単一の物理データベースからのみデータを同期できます。論理データベースからデータを同期することはできません。

説明
  • 接続文字列を使用して追加されたデータソースは、データベース全体を同期するリアルタイムタスクではサポートされていません。

  • リアルタイム同期タスクの場合、データベースのバージョンは 3.0 以降である必要があります。

データ同期の事前準備

DataWorks でデータを同期する前に、OceanBase 環境を準備する必要があります。これにより、OceanBase データ同期タスクが期待どおりに設定および実行されることが保証されます。以下のセクションでは、必要な準備について説明します。

ホワイトリストの設定

ご利用の OceanBase インスタンスのホワイトリストに、サーバーレスリソースグループまたはデータ統合専用リソースグループの VPC CIDR ブロックを追加します。詳細については、「ホワイトリストの追加」をご参照ください。

アカウントの作成と権限付与

データベースログインアカウントを作成します。このアカウントには、OceanBase に対する必要な操作権限が必要です。詳細については、「アカウントの作成」をご参照ください。

データソースの追加

DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターのヒントを表示して、各パラメーターの意味を理解できます

データ同期タスクの開発

同期タスクを設定するためのエントリポイントと手順については、以下の設定ガイドをご参照ください。

単一テーブルのバッチ同期タスクの設定

データベース全体のリアルタイム同期タスクの設定

手順の詳細については、「DataStudio でのリアルタイム同期タスクの設定」をご参照ください。

単一テーブルまたはデータベース全体の完全および増分 (リアルタイム) 読み取り同期タスクの設定

手順の詳細については、「データベース全体のリアルタイム同期タスクの設定」をご参照ください。

付録:スクリプトデモとパラメーターの説明

コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプトフォーマットの要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスク設定」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。

Reader スクリプトデモ

{
    "type": "job",
    "steps": [
        {
            "stepType": "apsaradb_for_OceanBase", // プラグイン名
            "parameter": {
                "datasource": "", // データソース名
                "where": "",
                "column": [ // フィールド
                    "id",
                    "name"
                ],
                "splitPk": ""
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0" // エラーレコード数
        },
        "speed": {
            "throttle": true, // スロットリングを有効にするかどうかを指定します。true に設定するとスロットリングが有効になり、false に設定するとスロットリングが無効になり、mbps パラメーターは効果がありません。
            "concurrent": 1, // 同時実行タスク数
            "mbps":"12"// スロットリングレート。1 mbps = 1 MB/s。
        }
    }
}

Reader スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

追加した ApsaraDB For OceanBase データソースの名前です。ご利用の DataWorks バージョンが ApsaraDB For OceanBase データソースの追加をサポートしている場合、このパラメーターを使用できます。

jdbcUrl パラメーターまたは username パラメーターを使用して接続を設定できます。

はい

なし

jdbcUrl

ピアデータベースの JDBC 接続情報です。JSON 配列を使用して、データベースの複数の接続アドレスを指定します。

複数のアドレスを設定した場合、ApsaraDB For OceanBase Reader は有効な IP アドレスが見つかるまで、IP アドレスの接続性を順番にプローブします。

すべての接続に失敗した場合、ApsaraDB For OceanBase Reader はエラーを報告します。

説明

jdbcUrl パラメーターは connection 設定ユニットに含める必要があります。

jdbcUrl のフォーマットは、ApsaraDB For OceanBase の公式仕様に準拠する必要があります。接続の添付ファイル制御情報を指定することもできます。例:jdbc:oceanbase://127.0.0.1:3306/database。このパラメーターまたは username パラメーターのいずれかを指定する必要があります。

いいえ

なし

username

データソースのユーザー名です。

いいえ

なし

password

指定されたユーザー名のパスワードです。

いいえ

なし

table

同期するテーブルです。JSON 配列を使用して複数のテーブルを指定します。

複数のテーブルを設定する場合は、テーブルが同じスキーマ構造を持っていることを確認してください。ApsaraDB For OceanBase Reader は、テーブルが統一された論理構造を持っているかどうかをチェックしません。

説明

table パラメーターは connection 設定ユニットに含める必要があります。

はい

なし

column

設定されたテーブルから同期する列のセットです。JSON 配列を使用してフィールドを記述します。デフォルトでは、すべての列が使用されます。例:[ * ]。

  • 列のサブセットをエクスポートできます。

  • 列の順序を変更できます。

  • 定数がサポートされています。例:'123'

  • 関数列がサポートされています。例:date('now')

  • 同期する列を明示的に指定する必要があります。column パラメーターを空にすることはできません。

はい

なし

splitPk

ApsaraDB for OceanBase Reader がデータを抽出する際に splitPk パラメーターを指定すると、splitPk で指定されたフィールドがデータシャーディングに使用されます。その後、Data Integration は並列スレッドを実行してデータを読み取ります。これにより、データをより効率的に同期できます。

  • splitPk をテーブルのプライマリキーに設定します。プライマリキーは通常、均等に分散されているため、シャードでのデータホットスポットを防ぎます。

  • 現在、splitPk は整数型のみのデータシャーディングをサポートしています。文字列、浮動小数点数、日付などの他の型はサポートしていません。

  • splitPk を空のままにすると、システムは単一テーブルのシャーディングを許可しないとみなし、データ抽出に単一チャネルを使用します。

いいえ

where

ApsaraDB for OceanBase Reader は、指定された columntable、および where 条件に基づいて SQL 文を連結し、その SQL 文に基づいてデータを抽出します。

たとえば、テスト目的で where 条件を limit 10 に設定できます。実際のビジネスシナリオでは、通常、当日のデータを同期し、where 条件を gmt_create>$bizdate に設定します。

  • where 条件を使用すると、増分同期を効率的に実行できます。

  • where 条件が設定されていないか、空のままの場合、テーブル全体のデータが同期されます。

いいえ

なし

querySql

一部のビジネスシナリオでは、where パラメーターだけではフィルター条件を記述するのに不十分な場合があります。このパラメーターを使用して、フィルター SQL 文をカスタマイズできます。このパラメーターを設定すると、データ同期システムは tablescolumns、および splitPk パラメーターを無視し、このパラメーターの内容を使用してデータをフィルター処理します。

querySql を設定すると、ApsaraDB For OceanBase Reader は tablecolumnwhere、および splitPk パラメーターを無視します。

いいえ

なし

fetchSize

このパラメーターは、各バッチで取得するデータレコードの数を指定します。この値は、Data Integration とサーバー間のネットワーク対話の数を決定し、データ抽出パフォーマンスを大幅に向上させることができます。

説明

2048 を超える fetchSize 値は、データ同期プロセスで Out-of-Memory (OOM) エラーを引き起こす可能性があります。

いいえ

1,024

Writer スクリプトデモ

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"apsaradb_for_OceanBase",// プラグイン名。
            "parameter":{
                "datasource": "データソース名",
                "column": [// フィールド。
                    "id",
                    "name"
                ],
                "table": "apsaradb_for_OceanBase_table",// テーブル名。
                "preSql": [ // データ同期タスクの実行前に実行する SQL 文。
                    "delete from @table where db_id = -1"
                ],
                "postSql": [// データ同期タスクの実行後に実行する SQL 文。
                    "update @table set db_modify_time = now() where db_id = 1"
                ],
                "obWriteMode": "insert",
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// エラーレコード数。
        },
        "speed":{
            "throttle":true,// スロットリングを有効にするかどうかを指定します。true に設定するとスロットリングが有効になり、false に設定するとスロットリングが無効になり、mbps パラメーターは効果がありません。
            "concurrent":1, // 同時実行タスク数。
            "mbps":"12"// スロットリングレート。1 mbps = 1 MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Writer スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

追加した ApsaraDB For OceanBase データソースの名前です。ご利用の DataWorks バージョンが ApsaraDB For OceanBase データソースの追加をサポートしている場合、このパラメーターを使用できます。

jdbcUrl パラメーターまたは username パラメーターを使用して接続を設定できます。

いいえ

なし

jdbcUrl

ピアデータベースの JDBC 接続情報です。jdbcUrl パラメーターは connection 設定ユニットに含まれています。

  • データベースには 1 つの値しか設定できません。データベースに複数のプライマリデータベースがある場合 (双方向データインポート) はサポートされていません。

  • jdbcUrl のフォーマットは、ApsaraDB For OceanBase の公式仕様に準拠する必要があります。接続の添付ファイル情報を指定することもできます。例:jdbc:oceanbase://127.0.0.1:3306/database

はい

なし

username

データソースのユーザー名です。

はい

なし

password

指定されたユーザー名のパスワードです。

はい

なし

table

データを書き込むテーブルの名前です。JSON 配列を使用してテーブル名を指定します。

説明

table パラメーターは connection 設定ユニットに含める必要があります。

はい

なし

column

データを書き込む宛先テーブルのフィールドです。フィールドはカンマ (,) で区切ります。例:"column": ["id", "name", "age"]

説明

column パラメーターを指定する必要があります。空にすることはできません。

はい

なし

obWriteMode

宛先テーブルにデータを書き込むために使用されるモードを制御します。このパラメーターはオプションです。

  • insertinsert into ... プライマリキーまたは一意なインデックスの競合が発生した場合、競合する行は書き込めません。

  • update... on duplicate key update ... このモードは MySQL テナントで使用されます。競合が発生した場合、競合する行は更新されます。

  • mergemerge into ... matched then update ... このモードは Oracle テナントで使用されます。競合が発生した場合、競合する行は更新されます。

いいえ

insert

onClauseColumns

説明

このパラメーターは Oracle テナントモードで使用されます。obWriteModemerge に設定されている場合に必須です。このパラメーターが設定されていない場合、insert モードがデータの書き込みに使用されます。

このパラメーターをプライマリキーフィールドまたは一意制約フィールドに設定します。複数のフィールドを指定するには、カンマ (,) で区切ります。例:ID,C1

いいえ

なし

obUpdateColumns

説明

このパラメーターは、obWriteModemerge または update に設定されている場合に有効になります。

データ書き込みの競合が発生したときに更新するフィールドです。複数のフィールドを指定するには、カンマ (,) で区切ります。例:c2,c3

いいえ

すべてのフィールド

preSql

宛先テーブルにデータが書き込まれる前に実行する標準の文です。SQL 文がテーブルを操作する必要がある場合は、@table を使用してテーブル名を表します。SQL 文が実行されると、`@table` 変数は実際のテーブル名に置き換えられます。

いいえ

なし

postSql

宛先テーブルにデータが書き込まれた後に実行する標準の文です。

いいえ

なし

batchSize

バッチコミット内のレコード数です。この値は、データ同期システムとサーバー間のネットワーク対話の数を大幅に削減し、全体的なスループットを向上させることができます。

説明

2048 を超える大きな fetchSize 値は、データ同期中に Out-of-Memory (OOM) エラーを引き起こす可能性があります。

いいえ

1,024