すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Doris data source

最終更新日:Jan 23, 2026

Doris データソースは、Doris データベースからの読み取りと書き込みのための双方向チャネルを提供するデータハブです。これにより、大量のデータを迅速に処理できます。このトピックでは、DataWorks における Doris のデータ同期機能について説明します。

サポートされているフィールドタイプ

Doris のバージョンによって、サポートされるデータの型と集計モデルは異なります。各 Doris バージョンのフィールドタイプの完全なリストについては、Doris 公式ドキュメントをご参照ください。次の表に、主要な Doris フィールドタイプのサポート状況を示します。

タイプ

サポートされているモデル

Doris バージョン

SMALLINT

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

INT

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

BIGINT

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

LARGEINT

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

FLOAT

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

DOUBLE

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

DECIMAL

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

DECIMALV3

Aggregate,Unique,Duplicate

1.2.1+, 2.x

DATE

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

DATETIME

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

DATEV2

Aggregate,Unique,Duplicate

1.2.x, 2.x

DATATIMEV2

Aggregate,Unique,Duplicate

1.2.x, 2.x

CHAR

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

VARCHAR

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

STRING

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

VARCHAR

Aggregate,Unique,Duplicate

1.1.x, 1.2.x, 2.x

ARRAY

Duplicate

1.2.x, 2.x

JSONB

Aggregate,Unique,Duplicate

1.2.x, 2.x

HLL

Aggregate

0.x.x, 1.1.x, 1.2.x, 2.x

BITMAP

Aggregate

0.x.x, 1.1.x, 1.2.x, 2.x

QUANTILE_STATE

Aggregate

1.2.x, 2.x

前提条件

DataWorks でデータを同期する前に、このトピックで説明されているように Doris 環境を準備する必要があります。これにより、DataWorks で Doris データ同期タスクを正しく設定および実行できます。以下のセクションでは、必要な準備について説明します。

アカウントの作成と権限の設定

データウェアハウスに接続するには、ログインアカウントを作成し、パスワードを設定する必要があります。デフォルトの root ユーザーでログインする場合、このユーザーには初期パスワードが設定されていないため、パスワードを設定する必要があります。Doris で次の SQL 文を実行してパスワードを設定できます:

SET PASSWORD FOR 'root' = PASSWORD('your_password')

Doris のネットワーク接続の設定

StreamLoad を使用してデータをインポートするには、フロントエンド (FE) ノードのプライベートエンドポイントにアクセスする必要があります。FE ノードのパブリックエンドポイントにアクセスすると、バックエンド (BE) ノードのプライベート IP アドレスにリダイレクトされます (データ操作に関するよくある質問)。したがって、Doris と、使用する サーバーレスリソースグループまたは Data Integration 専用リソースグループとの間にネットワーク接続を確立する必要があります。これにより、プライベートエンドポイント経由でのアクセスが可能になります。ネットワーク接続の確立方法の詳細については、「ネットワーク接続ソリューション」をご参照ください。

データソースの追加

DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの説明を表示して、各パラメーターの意味を理解できます

以下に、Doris データソースの設定パラメーターを説明します:

  • [JdbcUrl]:IP アドレス、ポート番号、データベース、接続パラメーターを含む JDBC 接続文字列を入力します。パブリック IP アドレスとプライベート IP アドレスの両方がサポートされています。パブリック IP アドレスを使用する場合は、Data Integration リソースグループが Doris が存在するホストにアクセスできることを確認してください。

  • [FE endpoint]:FE ノードの IP アドレスとポートを入力します。クラスターに複数の FE ノードがある場合は、各ノードの IP アドレスとポートを入力できます。コンマ (,) で区切ります。例:ip1:port1,ip2:port2。接続性をテストすると、指定されたすべての FE endpoint がテストされます。

  • [Username]:Doris データベースのユーザー名を入力します。

  • [Password]:ユーザー名に対応するパスワードを入力します。

データ同期タスクの開発

同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。

付録:スクリプトのデモとパラメーター

コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスクの設定」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。

Reader スクリプトのデモ

{
  "type": "job",
  "version": "2.0",// バージョン番号。
  "steps": [
    {
      "stepType": "doris",// プラグイン名。
      "parameter": {
        "column": [// 列名。
          "id"
        ],
        "connection": [
          {
            "querySql": [
              "select a,b from join1 c join join2 d on c.id = d.id;"
            ],
            "datasource": ""// データソース名。
          }
        ],
        "where": "",// フィルター条件。
        "splitPk": "",// シャードキー。
        "encoding": "UTF-8"// エンコード形式。
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"// エラーレコード数。
    },
    "speed": {
      "throttle": true,// throttle を false に設定すると、mbps パラメーターは有効にならず、レート制限がないことを示します。throttle を true に設定すると、レート制限が適用されます。
      "concurrent": 1,// 同時実行ジョブ数。
      "mbps": "12"// レート制限。1 mbps は 1 MB/s に相当します。
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Reader スクリプトのパラメーター

スクリプトパラメーター名

説明

必須

デフォルト値

datasource

データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。

はい

なし

table

データを同期するソーステーブルの名前。データ統合タスクは、1 つのテーブルからのみデータを読み取ることができます。

次の例は、table パラメーターを使用して範囲を設定する高度な使用法を示しています:

  • 範囲を設定して、シャーディングされたデータベースとテーブルからデータを読み取ることができます。たとえば、'table_[0-99]' は、'table_0''table_1''table_2' から 'table_99' までのデータを読み取ることを指定します。

  • テーブル名の数値サフィックスの長さが同じ場合 (例:'table_000''table_001''table_002' から 'table_999')、パラメーターを '"table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]' に設定できます。

説明

タスクは、一致するすべてのテーブルからデータを読み取ります。具体的には、column パラメーターで指定された列からデータを読み取ります。テーブルまたは指定された列が存在しない場合、タスクは失敗します。

はい

なし

column

データを同期するソーステーブルの列。JSON 配列を使用してフィールド情報を記述します。デフォルトでは、すべての列が使用されます。例:[ * ]

  • 列の選択をサポート:特定のエクスポートする列を選択できます。

  • 列の並べ替えをサポート:テーブルスキーマとは異なる順序で列をエクスポートできます。

  • 定数設定をサポート:Doris SQL 構文に従う必要があります。例:["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"]

    • id は通常の列名です。

    • table は予約語でもある列名です。

    • 1 は整数定数です。

    • 'mingya.wmy' は文字列定数です。シングルクォーテーションで囲む必要があることに注意してください。

    • null について:

      • " " は空文字列を示します。

      • null は null 値を示します。

      • 'null' は文字列 "null" を示します。

    • to_char(a+1) は文字列の長さを計算する関数です。

    • 2.3 は浮動小数点数です。

    • true はブール値です。

  • column パラメーターで同期する列を明示的に指定する必要があります。パラメーターを空にすることはできません。

はい

なし

splitPk

Doris Reader がデータを抽出する際に、splitPk パラメーターを指定すると、指定されたフィールドに基づいてデータがシャーディングされます。これにより、同時データ同期タスクが可能になり、効率が向上します。

  • splitPk にはテーブルのプライマリキーを使用することを推奨します。これは、プライマリキー列の値は通常均等に分散されており、シャードでのデータホットスポットを防ぐためです。

  • 現在、splitPk は整数データのシャーディングのみをサポートしています。文字列、浮動小数点数、日付、その他のデータ型はサポートしていません。サポートされていないデータ型を指定した場合、splitPk 機能は無視され、データは単一チャネルで同期されます。

  • splitPk パラメーターを指定しない場合、または値が空の場合、データは単一チャネルで同期されます。

いいえ

なし

where

フィルター条件。ビジネスシナリオでは、当日のデータのみを同期したい場合があります。この場合、where 条件を gmt_create>$bizdate に設定できます。

  • where 条件により、効率的な増分データ同期が可能になります。where 句を指定しない場合 (キーまたは値を省略した場合を含む)、すべてのデータが同期されます。

  • where 条件を limit 10 に設定することはできません。これは Doris SQL の WHERE 句の制約に準拠していません。

いいえ

なし

querySql (このパラメーターはコードエディタでのみ使用可能です。)

一部のビジネスシナリオでは、where パラメーターだけではフィルター条件を記述するのに不十分な場合があります。このパラメーターを使用して、フィルター SQL 文をカスタマイズできます。このパラメーターを設定すると、データ同期システムはテーブル、列、および splitPk パラメーターを無視し、このパラメーターで指定された SQL 文を使用してデータをフィルターします。たとえば、データを同期する前に複数のテーブルを結合するには、select a,b from table_a join table_b on table_a.id = table_b.id を使用します。querySql パラメーターを設定すると、Doris Reader は table、column、where、および splitPk パラメーターを無視します。querySql パラメーターは、tablecolumnwhere、および splitPk パラメーターよりも優先度が高くなります。datasource パラメーターは、ユーザー名やパスワードなどの情報を解析するために使用されます。

説明

querySql パラメーターは大文字と小文字を区別します。たとえば、querysql と入力した場合、パラメーターは有効になりません。

いいえ

なし

Writer スクリプトのデモ

{
  "stepType": "doris",// プラグイン名。
  "parameter":
  {
    "postSql":// データ同期タスクの実行後に実行される SQL 文。
    [],
    "preSql":
    [],// データ同期タスクの実行前に実行される SQL 文。
    "datasource":"doris_datasource",// データソース名。
    "table": "doris_table_name",// テーブル名。
    "column":
    [
      "id",
      "table_id",
      "table_no",
      "table_name",
      "table_status"
    ],
    "loadProps":{
      "column_separator": "\\x01",// CSV 形式の列区切り文字。
      "line_delimiter": "\\x02"// CSV 形式の行区切り文字。
    }
  },
  "name": "Writer",
  "category": "writer"
}

Writer スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。

はい

なし

table

データを同期する宛先テーブルの名前。

はい

なし

column

データを書き込む宛先テーブルの列。列名をコンマ (,) で区切ります。例:"column":["id","name","age"]。すべての列に順番にデータを書き込むには、(*) を使用します。例:"column":["*"]

はい

なし

preSql

データ同期タスクの実行前に実行される SQL 文。コードレス UI では 1 つの SQL 文のみ実行できます。コードエディタでは複数の SQL 文を実行できます。たとえば、タスク実行前にテーブルから古いデータをクリアできます。

いいえ

なし

postSql

データ同期タスクの実行後に実行される SQL 文。コードレス UI では 1 つの SQL 文のみ実行できます。コードエディタでは複数の SQL 文を実行できます。たとえば、タスク実行後にタイムスタンプを追加できます。

いいえ

なし

maxBatchRows

インポートされるデータの各バッチの最大行数。このパラメーターと batchSize は、各バッチのインポートサイズを制御します。バッチ内のデータがいずれかのしきい値に達すると、そのバッチのインポートが開始されます。

いいえ

500000

batchSize

インポートされるデータの各バッチの最大データ量。このパラメーターと maxBatchRows は、各バッチのインポートサイズを制御します。バッチ内のデータがいずれかのしきい値に達すると、そのバッチのインポートが開始されます。

いいえ

104857600

maxRetries

バッチインポートが失敗した後の再試行回数。

いいえ

3

labelPrefix

アップロードされる各ファイルのラベルプレフィックス。最終的なラベルは、labelPrefix + UUID で構成されるグローバルに一意なラベルです。これにより、データが繰り返しインポートされないことが保証されます。

いいえ

datax_doris_writer_

loadProps

StreamLoad のリクエストパラメーター。このパラメーターは主に、インポートされるデータのフォーマットを設定するために使用されます。デフォルトでは、データは CSV 形式でインポートされます。loadProps パラメーターを設定しない場合、デフォルトの CSV 形式が使用されます。列区切り文字は \t で、行区切り文字は \n です。設定は次のとおりです。

"loadProps": {
    "format":"csv",
    "column_separator": "\t",
    "line_delimiter": "\n"
}

JSON 形式でデータをインポートする場合は、次の設定を使用します。

"loadProps": {
    "format": "json"
}

いいえ

なし

集計タイプのスクリプト (Doris Writer による集計タイプへの書き込み)

Doris Writer は、集計タイプのデータの書き込みをサポートしています。ただし、コードエディタで追加の設定を行う必要があります。次の例は、必要な設定を示しています。

たとえば、次の Doris テーブルを考えます。uuid 列は bitmap タイプ (集計タイプ) で、sex 列は HLL タイプ (集計タイプ) です。

CREATE TABLE `example_table_1` (
  `user_id` int(11) NULL,
  `date` varchar(10) NULL DEFAULT "10.5",
  `city` varchar(10) NULL,
  `uuid` bitmap BITMAP_UNION NULL, -- 集計タイプ
  `sex` HLL HLL_UNION  -- 集計タイプ
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`,`city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32

テーブルに次の生データを挿入します:

user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'

Doris Writer を使用して集計タイプのデータを書き込む場合、writer.parameter.column で列を指定し、writer.parameter.loadProps.columns で集計関数を設定する必要があります。たとえば、uuid 列には bitmap_hash 集計関数を、sex 列には hll_hash 集計関数を使用できます。

次のコードはスクリプトの例です:

{
    "stepType": "doris",// プラグイン名。
    "writer":
    {
        "parameter":
        {
            "column":
            [
                "user_id",
                "date",
                "city",
                "uuid",// bitmap 集計タイプ
                "sex"// HLL 集計タイプ
            ],
            "loadProps":
            {
                "format": "csv",
                "column_separator": "\\x01",
                "line_delimiter": "\\x02",
                "columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"// 集計関数を指定する必要があります。
            },
            "postSql":
            [
                "select count(1) from example_tbl_3"
            ],
            "preSql":
            [],
            "datasource":"doris_datasource",// データソース名。
                    "table": "doris_table_name",// テーブル名。
        }
          "name": "Writer",
              "category": "writer"
    }
}