すべてのプロダクト
Search
ドキュメントセンター

DataWorks:ApsaraDB for OceanBase データソース

最終更新日:Mar 18, 2025

DataWorks は、ApsaraDB for OceanBase データソースからデータを読み取り、ApsaraDB for OceanBase データソースにデータを書き込むための ApsaraDB for OceanBase Reader と ApsaraDB for OceanBase Writer を提供しています。ApsaraDB for OceanBase データソースのデータ同期タスクを構成できます。このトピックでは、ApsaraDB for OceanBase データソースからのデータ同期、または ApsaraDB for OceanBase データソースへのデータ同期の機能について説明します。

サポートされている ApsaraDB for OceanBase のバージョン

バッチデータの読み取りと書き込み

ApsaraDB for OceanBase Reader および ApsaraDB for OceanBase Writer でサポートされている ApsaraDB for OceanBase のバージョン:

  • OceanBase 2.x

  • OceanBase 3.x

  • OceanBase 4.x

制限事項

バッチデータの読み取り

  • ApsaraDB for OceanBase は、Oracle および MySQL テナントモードをサポートしています。[WHERE] 句と、[column] パラメーターで指定する列が、Oracle または MySQL でサポートされている SQL 構文の制約に準拠していることを確認してください。そうでない場合、SQL 文が実行されない可能性があります。

  • ビューのデータを読み取ることができます。

  • バッチ同期タスクを使用して ApsaraDB for OceanBase データソースからデータを読み取る場合は、同期中のデータを変更しないでください。変更すると、データの重複やデータの損失などのデータ品質の問題が発生する可能性があります。

バッチデータの書き込み

説明

ApsaraDB for OceanBase Writer を使用する同期タスクには、少なくとも INSERT INTO を実行するための権限が必要です。その他の権限が必要かどうかは、タスクを構成するときに preSql パラメーターと postSql パラメーターで指定する SQL 文によって異なります。

  • 宛先テーブルにデータをバッチで書き込むことをお勧めします。ApsaraDB for OceanBase Writer は、行数が特定のしきい値に達すると書き込みリクエストを送信します。

  • ApsaraDB for OceanBase は、Oracle および MySQL テナントモードをサポートしています。preSql パラメーターと postSql パラメーターで指定する SQL 文が、関連する SQL 構文に準拠していることを確認してください。そうでない場合、SQL 文が実行されない可能性があります。

リアルタイムデータの読み取り

ApsaraDB for OceanBase は、複数の物理データベースに分散されたデータを統一された論理データベースに統合できる分散リレーショナルデータベースです。ただし、リアルタイムで 1 つの物理 AnalyticDB for MySQL のデータのみを AnalyticDB for MySQL クラスタに同期できます。

説明
  • データベースのデータを同期するために使用されるリアルタイム同期タスクでは、接続文字列モードで追加されたデータソースを選択することはできません。

  • ApsaraDB for OceanBase データソースのデータを同期するために使用されるリアルタイム同期タスクでは、ApsaraDB for OceanBase データベースのバージョンが V3.0 以降である必要があります。

データ同期前に ApsaraDB for OceanBase 環境を準備する

DataWorks を使用して ApsaraDB for OceanBase データソースとの間でデータを同期する前に、ApsaraDB for OceanBase 環境を準備する必要があります。これにより、データ同期タスクを構成し、ApsaraDB for OceanBase データソースとの間でデータを想定どおりに同期できるようになります。次の情報は、ApsaraDB for OceanBase データソースとのデータ同期のために ApsaraDB for OceanBase 環境を準備する方法について説明しています。

準備 1: IP アドレスのホワイトリストを構成する

データ統合のサーバーレス リソースグループまたはデータ統合専用リソースグループの Elastic IP Address(EIP)と、サーバーレス リソースグループまたはデータ統合専用リソースグループが関連付けられている vSwitch の CIDR ブロックを、ApsaraDB for OceanBase データソースのホワイトリストに追加します。詳細については、「IP アドレス ホワイトリストを構成する」をご参照ください。

準備 2: 必要な権限を持つアカウントを準備する

ApsaraDB for OceanBase クラスタのデータベースにログオンするためのアカウントを作成する必要があります。アカウントに必要な権限を付与する必要があります。詳細については、「アカウントを作成する」をご参照ください。

データソースを追加する

DataWorks で同期タスクを開発する前に、「データソースを追加および管理する」の手順に従って、必要なデータソースを DataWorks に追加する必要があります。 DataWorks コンソールでパラメーターのヒントを表示して、データソースを追加するときのパラメーターの意味を理解できます

データ同期タスクを開発する

同期タスクのエントリポイントと構成手順については、次の構成ガイドを参照してください。

単一テーブルのデータを同期するためのバッチ同期タスクを構成する

データベースのデータを同期するためのリアルタイム同期タスクを構成する

構成手順の詳細については、「DataStudio でリアルタイム同期タスクを構成する」をご参照ください。

単一テーブルまたはデータベースでフルデータと増分データの (リアルタイム) 同期を実装するための同期設定を構成する

構成手順の詳細については、「Data Integration で同期タスクを構成する」をご参照ください。

付録: コードとパラメーター

コードエディタを使用してバッチ同期タスクを構成する

コードエディタを使用してバッチ同期タスクを構成する場合は、統一されたスクリプト形式の要件に基づいて、スクリプトに関連パラメーターを構成する必要があります。詳細については、「コードエディタを使用してバッチ同期タスクを構成する」をご参照ください。次の情報は、コードエディタを使用してバッチ同期タスクを構成するときにデータソースに構成する必要があるパラメーターについて説明しています。

ApsaraDB for OceanBase Reader のコード

{
    "type": "job",
    "steps": [
        {
            "stepType": "apsaradb_for_OceanBase", // プラグイン名。
            "parameter": {
                "datasource": "", // データソースの名前。
                "where": "",
                "column": [ // 列の名前。
                    "id",
                    "name"
                ],
                "splitPk": ""
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0" // ダーティデータレコードの最大許容数。
        },
        "speed": {
            "throttle": true, // スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
            "concurrent": 1, // 並列スレッドの最大数。
            "mbps":"12"// 最大伝送速度。単位: MB/s。
        }
    }
}

ApsaraDB for OceanBase Reader のコードのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。追加されたデータソースの名前と同じである必要があります。

jdbcUrl パラメーターまたは username パラメーターの設定に基づいて、ApsaraDB for OceanBase データベースに接続できます。

はい

デフォルト値なし

jdbcUrl

ApsaraDB for OceanBase データベースの JDBC URL。データベースに JSON 配列で複数の JDBC URL を指定できます。

複数の JDBC URL を指定すると、ApsaraDB for OceanBase Reader は URL の接続性を順番に検証して、有効な URL を見つけます。

有効な URL がない場合、ApsaraDB for OceanBase Reader はエラーを返します。

説明

jdbcUrl パラメーターは、connection パラメーターに含める必要があります。

jdbcUrl パラメーターの値は、ApsaraDB for OceanBase でサポートされている標準フォーマットに準拠している必要があります。添付機能の情報を指定することもできます。JDBC URL の例は、jdbc:OceanBase://127.0.0.1:3306/database です。jdbcUrl または username のいずれかを指定する必要があります。

いいえ

デフォルト値なし

username

データベースへの接続に使用できるユーザー名。

いいえ

デフォルト値なし

password

データベースへの接続に使用できるパスワード。

いいえ

デフォルト値なし

table

データを読み取るテーブルの名前。ApsaraDB for OceanBase Reader は、複数のテーブルからデータを読み取ることができます。テーブルは JSON 配列で指定されます。

複数のテーブルを指定する場合は、テーブルのスキーマが同じであることを確認してください。ApsaraDB for OceanBase Reader は、テーブルのスキーマが同じかどうかを確認しません。

説明

table パラメーターは、connection パラメーターに含める必要があります。

はい

デフォルト値なし

column

データを読み取る列の名前。JSON 配列で名前を指定します。デフォルト値は [ * ] で、ソーステーブルのすべての列を示します。

  • 読み取る特定の列を選択できます。

  • 列の順序を変更できます。これは、ソーステーブルのスキーマで指定された順序とは異なる順序で列を指定できることを示します。

  • 定数がサポートされています。例: 「123」

  • 関数がサポートされています。例: date('now')

  • column パラメーターでは、同期するすべての列を明示的に指定する必要があります。このパラメーターを空にすることはできません。

はい

デフォルト値なし

splitPk

ApsaraDB for OceanBase Reader がデータを読み取るときにデータシャーディングに使用されるフィールド。このパラメーターを指定すると、ソーステーブルはこのパラメーターの値に基づいてシャーディングされます。次に、Data Integration は並列スレッドを実行してデータを読み取ります。これにより、データをより効率的に同期できます。

  • splitPk パラメーターをテーブルのプライマリキー列の名前に設定することをお勧めします。プライマリキー列に基づいてデータを異なるシャードに均等に分散できるため、特定のシャードにのみ集中的に分散されることを防ぎます。

  • splitPk パラメーターは、整数データ型のデータのシャーディングのみをサポートします。このパラメーターを、文字列、浮動小数点、日付データ型などのサポートされていないデータ型のフィールドに設定すると、ApsaraDB for OceanBase Reader はエラーを返します。

  • splitPk パラメーターを空のままにすると、ApsaraDB for OceanBase Reader は単一スレッドを使用してデータを読み取ります。

いいえ

デフォルト値なし

where

WHERE 句。ApsaraDB for OceanBase Reader は、構成した columntablewhere パラメーターに基づいて SQL 文を生成し、生成された文を使用してデータを読み取ります。

たとえば、テストを実行するときに、where パラメーターを limit 10 に設定できます。現在の日付に生成されたデータを読み取るには、where パラメーターを gmt_create > $bizdate に設定できます。

  • WHERE 句を使用して増分データを読み取ることができます。

  • where パラメーターが指定されていないか空のままになっている場合は、すべてのデータが読み取られます。

いいえ

デフォルト値なし

querySql

絞り込んだデータフィルタリングに使用される SQL 文。このパラメーターを指定すると、このパラメーターの値のみに基づいてデータがフィルタリングされます。

このパラメーターを指定すると、ApsaraDB for OceanBase Reader は tablecolumnwheresplitPk パラメーターの設定を無視します。

いいえ

デフォルト値なし

fetchSize

一度に読み取るデータレコードの数。このパラメーターは、Data Integration とデータベース間のインタラクションの数を決定し、読み取り効率に影響します。

説明

このパラメーターを 2048 より大きい値に設定すると、データ同期中にメモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

1,024

ApsaraDB for OceanBase Writer のコード

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"apsaradb_for_OceanBase",// プラグイン名。
            "parameter":{
                "datasource": "データソースの名前",
                "column": [// 列の名前。
                    "id",
                    "name"
                ],
                "table": "apsaradb_for_OceanBase_table",// テーブルの名前。
                "preSql": [ // 同期タスクの実行前に実行する SQL 文。
                    "delete from @table where db_id = -1"
                ],
                "postSql": [// 同期タスクの実行後に実行する SQL 文。
                    "update @table set db_modify_time = now() where db_id = 1"
                ],
                "obWriteMode": "insert",
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// ダーティデータレコードの最大許容数。
        },
        "speed":{
            "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
            "concurrent":1, // 並列スレッドの最大数。
            "mbps":"12"// 最大伝送速度。単位: MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

ApsaraDB for OceanBase Writer のコードのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。追加されたデータソースの名前と同じである必要があります。

jdbcUrl パラメーターまたは username パラメーターの設定に基づいて、ApsaraDB for OceanBase データベースに接続できます。

いいえ

デフォルト値なし

jdbcUrl

ApsaraDB for OceanBase データベースの JDBC URL。jdbcUrl パラメーターは、connection パラメーターに含める必要があります。

  • データベースには 1 つの JDBC URL のみ構成できます。ApsaraDB for OceanBase Writer は、複数のプライマリデータベースを持つデータベースにデータを書き込むことはできません。

  • jdbcUrl パラメーターの値のフォーマットは、ApsaraDB for OceanBase の公式仕様に準拠している必要があります。また、このパラメーターの値に追加の JDBC 接続プロパティを指定することもできます。例: jdbc:oceanbase://127.0.0.1:3306/database

はい

デフォルト値なし

username

データベースへの接続に使用できるユーザー名。

はい

デフォルト値なし

password

データベースへの接続に使用できるパスワード。

はい

デフォルト値なし

table

データを書き込むテーブルの名前。JSON 配列で名前を指定します。

説明

table パラメーターは、connection パラメーターに含める必要があります。

はい

デフォルト値なし

column

データを書き込む列の名前。カンマ (,) で区切って名前を指定します (例: "column": ["id", "name", "age"])。

説明

column パラメーターを空にすることはできません。

はい

デフォルト値なし

obWriteMode

オプション。書き込みモード。有効な値:

  • insert: insert into ... 文に対応します。プライマリキーの競合または一意なインデックスの競合が発生した場合、競合する行にデータを書き込むことはできません。

  • update: ... on duplicate key update ... 文に対応します。この書き込みモードは MySQL テナントモードで使用されます。競合が発生すると、競合する行のデータが更新されます。

  • merge: merge into ... matched then update ... 文に対応します。この書き込みモードは Oracle テナントモードで使用されます。競合が発生すると、競合する行のデータが更新されます。

いいえ

insert

onClauseColumns

説明

このパラメーターは Oracle テナントモードで使用され、obWriteMode パラメーターが merge に設定されている場合は必須です。このパラメーターを空のままにすると、insert 書き込みモードが使用されます。

このパラメーターをプライマリキーフィールドまたは一意な制約フィールドに設定します。複数のフィールドを指定する場合は、フィールドをカンマ (,) で区切ります。例: ID,C1

いいえ

デフォルト値なし

obUpdateColumns

説明

このパラメーターは、obWriteMode パラメーターが merge または update に設定されている場合に有効になります。

データ書き込みの競合が発生した場合に更新が必要なフィールド。複数のフィールドを指定する場合は、フィールドをカンマ (,) で区切ります。例: c2,c3

いいえ

すべてのフィールド

preSql

同期タスクの実行前に実行する SQL 文。SQL 文で宛先テーブルの名前を示すには、@table を使用します。この SQL 文を実行すると、@table は宛先テーブルの名前で置き換えられます。

いいえ

デフォルト値なし

postSql

同期タスクの実行後に実行する SQL 文。

いいえ

デフォルト値なし

batchSize

一度に書き込むデータレコードの数。ビジネス要件に基づいてこのパラメーターを適切な値に設定します。これにより、Data Integration とデータベース間のインタラクションが大幅に削減され、スループットが向上します。

説明

このパラメーターを 2048 より大きい値に設定すると、データ同期中に OOM エラーが発生する可能性があります。

いいえ

1,024