すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Doris

最終更新日:Apr 21, 2026

Doris データソースを使用すると、Doris データベースからデータを読み取ったり、Doris データベースにデータを書き込んだりして、大規模なデータ処理を行うことができます。このトピックでは、DataWorks を使用して Doris とデータを同期する方法について説明します。

サポートされるデータの型

Doris はバージョンによって、サポートされるデータ型と集約モデルが異なります。各 Doris バージョンでサポートされているすべてのデータ型の詳細については、「Doris の公式ドキュメント」をご参照ください。次の表では、主にサポートされているデータ型について説明します。

データの型

サポートされるモデル

Doris バージョン

SMALLINT

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

INT

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

BIGINT

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

LARGEINT

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

FLOAT

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

DOUBLE

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

DECIMAL

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

DECIMALV3

Aggregate、Unique、Duplicate

1.2.1 以降のバージョン、2.x

DATE

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

DATETIME

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

DATEV2

Aggregate、Unique、Duplicate

1.2.x、2.x

DATATIMEV2

Aggregate、Unique、Duplicate

1.2.x、2.x

CHAR

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

VARCHAR

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

STRING

Aggregate、Unique、Duplicate

0.x.x、1.1.x、1.2.x、2.x

VARCHAR

Aggregate、Unique、Duplicate

1.1.x、1.2.x、2.x

ARRAY

Duplicate

1.2.x、2.x

JSONB

Aggregate、Unique、Duplicate

1.2.x、2.x

HLL

Aggregate

0.x.x、1.1.x、1.2.x、2.x

BITMAP

Aggregate

0.x.x、1.1.x、1.2.x、2.x

QUANTILE_STATE

Aggregate

1.2.x、2.x

データ同期の前に ApsaraDB for OceanBase 環境を準備する

DataWorks を使用して Doris データソースにデータを同期する前に、Doris 環境を準備する必要があります。これにより、データ同期タスクを期待どおりに設定し、Doris データソースにデータを同期できるようになります。以下では、Doris データソースへのデータ同期のために Doris 環境を準備する方法について説明します。

アカウントの作成と権限の付与

後続の操作のために Doris データベースにログインするためのアカウントを作成する必要があります。Doris データベースへの後続の接続のために、アカウントのパスワードを指定する必要があります。Doris のデフォルトの root ユーザーを使用して Doris データベースにログインする場合は、root ユーザーのパスワードを指定する必要があります。デフォルトでは、root ユーザーにはパスワードがありません。Doris で SQL ステートメントを実行してパスワードを指定できます:

SET PASSWORD FOR 'root' = PASSWORD('Password')

Doris のネットワーク接続の設定

StreamLoad メソッドを使用してデータを書き込むには、FE ノードのプライベート IP アドレスにアクセスする必要があります。FE ノードのパブリック IP アドレスにアクセスすると、BE ノードのプライベート IP アドレスにリダイレクトされます。リダイレクトの詳細については、「データ操作の問題」をご参照ください。この場合、リソースグループが内部ネットワーク経由でデータソースにアクセスできるようにするには、データソースと サーバーレスリソースグループ またはデータ統合専用リソースグループとの間にネットワーク接続を確立する必要があります。Doris データベースとリソースグループの間にネットワーク接続を確立する方法の詳細については、「ネットワーク接続ソリューション」をご参照ください。

データソースの追加

DataWorks で同期タスクを開発する前に、データソース管理の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの説明を表示して、各パラメーターの意味を理解できます

Doris データソースの以下の設定項目に関する設定要件に注意してください:

  • JdbcUrl:JDBC 接続文字列を入力します。これには、IP アドレス、ポート番号、データベース、および接続パラメーターが含まれます。パブリック IP アドレスとプライベート IP アドレスの両方がサポートされています。パブリック IP アドレスを使用する場合は、Data Integration リソースグループが Doris インスタンスが配置されているホストにアクセスできることを確認してください。

  • FE endpoint:FE ノードの IP アドレスとポートを入力します。クラスターに複数の FE ノードがある場合は、ip1:port1,ip2:port2 のように、カンマで区切って複数のエンドポイントを入力できます。接続をテストする際、DataWorks は指定されたすべての FE エンドポイントへの接続性をテストします。

  • Username:Doris データベースへのアクセスに使用するユーザー名を入力します。

  • Password:ユーザー名に対応するパスワードを入力します。

データ同期タスクの開発

同期タスクを設定するためのエントリーポイントと手順については、以下の設定ガイドをご参照ください。

付録:コードとパラメーター

コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプトフォーマットの要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、コードエディタの使用をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。

Reader スクリプトのデモ

{
  "type": "job",
  "version": "2.0",// バージョン番号。
  "steps": [
    {
      "stepType": "doris",// プラグイン名。
      "parameter": {
        "column": [// 列名。
          "id"
        ],
        "connection": [
          {
            "querySql": [
              "select a,b from join1 c join join2 d on c.id = d.id;"
            ],
            "datasource": ""// データソース名。
          }
        ],
        "where": "",// WHERE 句。
        "splitPk": "",// シャードキー。
        "encoding": "UTF-8"// エンコード形式。
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"// 許容されるダーティデータレコードの最大数。
    },
    "speed": {
      "throttle": true,// 速度制限を有効にするかどうかを指定します。false は速度制限が無効であることを示し、true は速度制限が有効であることを示します。mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
      "concurrent": 1,// 最大並列スレッド数。
      "mbps": "12"// 最大転送レート。単位:MB/s。
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Reader スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。追加したデータソースの名前と同じである必要があります。コードエディタを使用してデータソースを追加できます。

はい

デフォルト値なし

table

データを読み取るテーブルの名前。各同期タスクは、1 つのテーブルからのみデータを同期するために使用できます。

シャードテーブルの場合、table パラメーターを使用して、データを読み取るパーティションを指定できます。例:

  • 範囲を設定して、シャードデータベースとテーブルからデータを読み取ることができます。たとえば、'table_[0-99]' は、'table_0''table_1''table_2' から 'table_99' までのデータを読み取ることを指定します。

  • テーブルに 'table_000''table_001''table_002' から 'table_999' までの同じ長さの数字の接尾辞がある場合は、パラメーターを '"table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]' のように設定できます。

説明

Doris Reader は、table パラメーターで指定されたパーティション内の column パラメーターで指定された列からデータを読み取ります。指定されたパーティションまたは列が存在しない場合、同期タスクは失敗します。

はい

デフォルト値なし

column

同期する列。列は JSON 配列で記述します。デフォルトでは、すべての列が同期されます。たとえば、["*"] を使用します。

  • 列のプルーニング:エクスポートする列のサブセットを選択できます。

  • 列の順序は変更できます。これは、ソーステーブルのスキーマで指定された順序とは異なる順序で列を指定できることを意味します。

  • 定数設定:Doris SQL 構文に従う必要があります。例:["id","`table`","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"]

    • id:列名。

    • table:予約キーワードを含む列の名前。

    • 1:整数定数。

    • 'mingya.wmy':文字列定数。シングルクォーテーション (') で囲みます。

    • null

      • " " は空文字列を示します。

      • null は NULL 値を示します。

      • 'null' は文字列 "null" を示します。

    • to_char(a+1):文字列の長さを計算するために使用される関数式。

    • 2.3:浮動小数点定数。

    • true:ブール値。

  • column パラメーターには、データを読み取るすべての列を明示的に指定する必要があります。このパラメーターを空にすることはできません。

はい

デフォルト値なし

splitPk

読み取りパフォーマンスを向上させるために、splitPk パラメーターを使用してシャードキーを指定できます。Data Integration はこのキーを使用してデータをパーティション分割し、同時実行タスクを実行します。

  • splitPk パラメーターをテーブルのプライマリキー列の名前に設定することを推奨します。データは、特定のシャードに集中的に分散されるのではなく、プライマリキー列に基づいて異なるシャードに均等に分散できます。

  • splitPk パラメーターは、整数データ型のデータのみのシャーディングをサポートします。splitPk パラメーターを文字列、浮動小数点、日付データ型などのサポートされていないデータ型のフィールドに設定した場合、このパラメーターの設定は無視され、単一のスレッドを使用してデータが読み取られます。

  • splitPk パラメーターが指定されていないか、空のままの場合、単一のスレッドを使用してデータが読み取られます。

いいえ

デフォルト値なし

where

フィルター条件。多くのビジネスシナリオでは、現在の日付のデータのみを同期したい場合があります。これを行うには、where 条件を gmt_create > $bizdate のように指定します。

  • WHERE 句を使用して増分データを読み取ることができます。where パラメーターが指定されていないか、空のままの場合、Doris Reader はすべてのデータを読み取ります。

  • where パラメーターを limit 10 に設定しないでください。この値は、SQL WHERE 句に関する Doris の制約に準拠していません。

いいえ

デフォルト値なし

querySql (高度なパラメーター、コードエディタでのみ利用可能)

一部のビジネスシナリオでは、where パラメーターだけでは目的のフィルター条件を記述するのに不十分な場合があります。このパラメーターを使用して、フィルタリング用のカスタム SQL クエリを指定できます。このパラメーターを設定すると、Data Integration は table、column、where、および splitPk パラメーターを無視し、カスタムクエリを使用してデータを取得します。たとえば、同期前に複数のテーブルを結合するには、select a,b from table_a join table_b on table_a.id = table_b.id のようなクエリを使用できます。querySql パラメーターは、tablecolumnwhere、および splitPk パラメーターよりも優先度が高くなります。システムがユーザー名やパスワードなどの接続詳細を取得するには、datasource パラメーターが依然として必要です。

説明

querySql パラメーターの名前は、大文字と小文字を区別します。たとえば、querysql は有効になりません。

いいえ

デフォルト値なし

Writer スクリプトのデモ

{
  "stepType": "doris",// プラグイン名。
  "parameter":
  {
    "postSql":// 同期タスクの実行後に実行する SQL ステートメント。
    [],
    "preSql":
    [],// 同期タスクの実行前に実行する SQL ステートメント。
    "datasource":"doris_datasource",// データソース名。
    "table": "doris_table_name",// テーブル名。
    "column":
    [
      "id",
      "table_id",
      "table_no",
      "table_name",
      "table_status"
    ],
    "loadProps":{
      "column_separator": "\\x01",// CSV 形式のデータの列区切り文字。
      "line_delimiter": "\\x02"// CSV 形式のデータの行区切り文字。
    }
  },
  "name": "Writer",
  "category": "writer"
}

Writer スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。追加したデータソースの名前と同じである必要があります。コードエディタを使用してデータソースを追加できます。

はい

デフォルト値なし

table

データを読み取るテーブルの名前。

はい

デフォルト値なし

column

データを書き込む宛先列。列名を配列で指定します。例:"column":["id","name","age"]。すべての列に順番にデータを書き込むには、配列でアスタリスク (*) を使用します。例:"column":["*"]

はい

デフォルト値なし

preSql

データ同期タスクが開始される前に実行する SQL ステートメント。コードレス UI では、1 つの SQL ステートメントしか実行できません。コードエディタでは、複数の SQL ステートメントを実行できます。たとえば、これらのステートメントを使用して、テーブルから既存のデータをクリアできます。

いいえ

デフォルト値なし

postSql

同期タスクの実行後に実行する SQL ステートメント。たとえば、このパラメーターをタイムスタンプを追加するために使用される SQL ステートメントに設定できます。コードレス UI では 1 つの SQL ステートメントしか実行できず、コードエディタでは複数の SQL ステートメントを実行できます。

いいえ

デフォルト値なし

maxBatchRows

一度に宛先テーブルに書き込むことができる最大行数。このパラメーターと batchSize パラメーターの両方で、一度に宛先テーブルに書き込むことができるデータレコードの数が決まります。キャッシュされたデータがいずれかのパラメーターの値に達するたびに、ライターは宛先テーブルへのデータの書き込みを開始します。

いいえ

500000

batchSize

一度に宛先テーブルに書き込むことができるデータの最大量。このパラメーターと maxBatchRows パラメーターの両方で、一度に宛先テーブルに書き込むことができるデータレコードの数が決まります。キャッシュされたデータがいずれかのパラメーターの値に達するたびに、ライターは宛先テーブルへのデータの書き込みを開始します。

いいえ

104857600

maxRetries

一度に複数のデータレコードを宛先テーブルに書き込むのに失敗した後に許可される最大再試行回数。

いいえ

3

labelPrefix

アップロードされる各ファイルバッチのラベルプレフィックス。最終的なラベルは labelPrefix + UUID の組み合わせであり、各インポートがべき等であることを保証し、データの重複を防ぎます。

いいえ

datax_doris_writer_

loadProps

StreamLoad のリクエストパラメーターで、主にインポートデータ形式を設定するために使用されます。デフォルトでは、データは CSV 形式でインポートされます。loadProps パラメーターが設定されていない場合、デフォルトの CSV 形式が使用され、列区切り文字として \t、行区切り文字として \n が使用されます。デフォルトの設定は次のとおりです:

"loadProps": {
    "format":"csv",
    "column_separator": "\t",
    "line_delimiter": "\n"
}

JSON 形式でデータを書き込む場合は、次の設定を使用します:

"loadProps": {
    "format": "json"
}

いいえ

デフォルト値なし

集約型のデータの書き込み

Doris Writer を使用して、特定の集約型の列にデータを書き込むことができます。Doris Writer を使用して特定の集約型の列にデータを書き込む場合は、追加のパラメーターを設定する必要があります。

たとえば、次の Doris テーブルでは、uuidbitmap 型 (集約型) であり、sexHLL 型 (集約型) です。

CREATE TABLE `example_table_1` (
  `user_id` int(11) NULL,
  `date` varchar(10) NULL DEFAULT "10.5",
  `city` varchar(10) NULL,
  `uuid` bitmap BITMAP_UNION NULL, -- 集約型
  `sex` HLL HLL_UNION  -- 集約型
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`,`city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32

テーブルに生データを挿入します:

user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'

Doris Writer を使用して集約型の列にデータを書き込む場合、writer.parameter.column で列を指定し、writer.parameter.loadProps.columns で集計関数を設定する必要があります。たとえば、uuid 列には集計関数 bitmap_hash を使用し、sex 列には集計関数 hll_hash を使用する必要があります。

サンプルコード:

{
    "stepType": "doris",// プラグイン名。
    "writer":
    {
        "parameter":
        {
            "column":
            [
                "user_id",
                "date",
                "city",
                "uuid",// 集約型は bitmap です。
                "sex"// 集約型は HLL です。
            ],
            "loadProps":
            {
                "format": "csv",
                "column_separator": "\\x01",
                "line_delimiter": "\\x02",
                "columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"// 集計関数を指定する必要があります。
            },
            "postSql":
            [
                "select count(1) from example_tbl_3"
            ],
            "preSql":
            [],
            "datasource":"doris_datasource",// データソース名。
                    "table": "doris_table_name",// テーブル名。
        }
          "name": "Writer",
              "category": "writer"
    }
}