すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:ルーチンロード

最終更新日:Jan 11, 2025

ルーチンロードを使用すると、長時間のインポートジョブを送信できます。 ルーチンロードを使用すると、指定されたデータソースから Doris に継続的にデータをインポートできます。 このトピックでは、ルーチンロードのしくみ、ルーチンロードの使用方法、およびベストプラクティスについて説明します。

制限事項

ルーチンロードでは、Kafka からのみデータをインポートできます。

  • 認証なしまたは SSL 認証を使用した Kafka クラスタへのアクセスがサポートされています。

  • メッセージは CSV または JSON 形式にすることができます。 CSV 形式では、各メッセージは 1 行として表され、行末は改行ではありません。

  • デフォルトでは、Kafka 0.10.0.0 以後がサポートされています。 0.9.0、0.8.2、0.8.1、0.8.0 などの 0.10.0.0 より前の Kafka バージョンを使用する場合は、バックエンド(BE)の構成を変更します。 kafka_broker_version_fallback パラメータを使用する以前のバージョンに設定する必要があります。 あるいは、ルーチンロードジョブを作成するときに、property.broker.version.fallback パラメータを以前のバージョンに設定することもできます。

    説明

    0.10.0.0 より前の Kafka バージョンを使用する場合、ルーチンロードの一部の機能が使用できなくなる可能性があります。 たとえば、Kafka パーティションに時間ベースのオフセットを設定することはできません。

しくみ

次の図は、クライアントがルーチンロードジョブをフロントエンド(FE)に送信する方法、および FE がジョブを BE に割り当てる方法を示しています。

+---------+
         |  Client |
         +----+----+
              |
+-----------------------------+
| FE          |               |
| +-----------v------------+  |
| |                        |  |
| |   Routine Load Job     |  |
| |                        |  |
| +---+--------+--------+--+  |
|     |        |        |     |
| +---v--+ +---v--+ +---v--+  |
| | task | | task | | task |  |
| +--+---+ +---+--+ +---+--+  |
|    |         |        |     |
+-----------------------------+
     |         |        |
     v         v        v
 +---+--+   +--+---+   ++-----+
 |  BE  |   |  BE  |   |  BE  |
 +------+   +------+   +------+
  1. FE は、JobScheduler を使用してルーチンロードジョブを複数のタスクに分割します。 各タスクは、データの特定の部分をインポートします。 各タスクは、TaskScheduler によって指定されたバックエンドに割り当てられ、実行されます。

  2. BE はタスクを通常のインポートジョブと見なし、Stream Load を使用してデータをインポートします。 インポートが完了すると、BE はインポート結果を FE に報告します。

  3. FE の JobScheduler は、インポート結果に基づいて新しいタスクの生成または失敗したタスクの再試行を続けます。

  4. FE は継続的に新しいタスクを生成して、中断のないデータインポートを実現します。

ルーチンロードを使用して Kafka からデータをインポートする

このセクションでは、ルーチンロードを使用して Kafka からデータをインポートする方法とベストプラクティスについて説明します。

ルーチンロードジョブを作成する

ルーチンロードジョブを作成するための詳細な構文については、「CREATE ROUTINE LOAD」をご参照ください。 HELP ROUTINE LOAD; コマンドを実行して、コマンドに関するヘルプ情報を表示することもできます。 次の例は、ルーチンロードジョブを作成する方法を示しています。

  • example_db データベースの example_tbl テーブルに対して、test1 という名前のルーチンロードジョブを作成します。 列区切り文字、グループ ID、およびクライアント ID を指定します。 デフォルトですべてのパーティションをサブスクライブするようにジョブを構成し、デフォルトのオフセットを OFFSET_BEGINNING に設定します。 OFFSET_BEGINNING は、データが使用可能な場所からデータ消費が開始されることを示します。

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS TERMINATED BY ",",
            COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "false"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "property.group.id" = "xxx",
                "property.client.id" = "xxx",
                "property.kafka_default_offsets" = "OFFSET_BEGINNING"
            );
  • 厳密モードで、example_db データベースの example_tbl テーブルに対して test1 という名前のルーチンロードジョブを作成します。

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
            WHERE k1 > 100 and k2 like "%doris%"
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "true"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "kafka_partitions" = "0,1,2,3",
                "kafka_offsets" = "101,0,0,200"
            );
  • JSON 形式のデータのインポート例

    ルーチンロードは、次の 2 種類の JSON 形式のデータのみをサポートしています。

    1 つのレコードのみを含む JSON オブジェクト:

    {"category":"a9jadhx","author":"test","price":895}

    複数のレコードを含めることができる JSON 配列:

    [
        {
            "category":"11",
            "author":"4avc",
            "price":895,
            "timestamp":1589191587
        },
        {
            "category":"22",
            "author":"2avc",
            "price":895,
            "timestamp":1589191487
        },
        {
            "category":"33",
            "author":"3avc",
            "price":342,
            "timestamp":1589191387
        }
    ]

    データのインポート先の Doris データテーブルを作成します。 ステートメントの例:

    CREATE TABLE `example_tbl` (
       `category` varchar(24) NULL COMMENT "",
       `author` varchar(24) NULL COMMENT "",
       `timestamp` bigint(20) NULL COMMENT "",
       `dt` int(11) NULL COMMENT "",
       `price` double REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
      PARTITION p0 VALUES [("-2147483648"), ("20200509")),
        PARTITION p20200509 VALUES [("20200509"), ("20200510")),
        PARTITION p20200510 VALUES [("20200510"), ("20200511")),
        PARTITION p20200511 VALUES [("20200511"), ("20200512"))
    )
    DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
    PROPERTIES (
        "replication_num" = "1"
    );

    各レコードが JSON オブジェクトとして格納されている JSON 形式のデータをインポートします。 ステートメントの例:

    CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
    COLUMNS(category,price,author)
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2",
        "kafka_offsets" = "0,0,0"
     );

    複数のレコードが JSON 配列に格納されている JSON 形式のデータをインポートします。 ステートメントの例:

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json",
        "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
        "strip_outer_array" = "true"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2",
        "kafka_offsets" = "0,0,0"
    );
    説明

    サンプルデータでは、Doris データテーブルのパーティションフィールド dt は使用できません。 したがって、dt フィールドの値は、ルーチンロードステートメントで dt=from_unixtime(timestamp, '%Y%m%d') を使用して生成されます。

    • 厳密モードとソースデータの関係

      • テーブルの列の型が TinyInt で、列に NULL 値のインポートが許可されている場合の厳密モードとソースデータの関係

        ソースデータ

        ソースデータの例

        文字列から整数へ

        厳密モード

        結果

        NULL

        \N

        該当なし

        true または false

        NULL

        NULL 以外

        aaa または 2000

        NULL

        true

        無効なデータ(フィルタリング済み)

        NULL 以外

        aaa

        NULL

        true

        NULL

        NULL 以外

        1

        1

        true または false

        正しいデータ

      • テーブルの列の型が Decimal(1,0) で、列に NULL 値のインポートが許可されている場合の厳密モードとソースデータの関係

        ソースデータ

        ソースデータの例

        文字列から整数へ

        厳密モード

        結果

        NULL

        \N

        該当なし

        true または false

        NULL

        NULL 以外

        aaa

        NULL

        true

        無効なデータ(フィルタリング済み)

        NULL 以外

        aaa

        NULL

        false

        NULL

        NULL 以外

        1 または 10

        1

        true または false

        正しいデータ

        説明

        値 10 は、Decimal(1,0) 型の有効な値ではありません。 ただし、値 10 は Decimal 型の要件を満たしているため、厳密モードではフィルタリングされません。 値 10 は、抽出、変換、ロード(ETL)プロセスでフィルタリングされます。

    • SSL 認証を使用して Kafka クラスタにアクセスする

      SSL 認証を使用して Kafka クラスタにアクセスするには、Kafka ブローカーの公開鍵証明書(ca.pem)を提供する必要があります。 Kafka クラスタでクライアント認証も有効になっている場合は、クライアントの公開鍵証明書(client.pem)、鍵ファイル(client.key)、およびパスワードを提供する必要があります。 CREATE FILE コマンドを実行して、ファイルを Doris にアップロードできます。 コマンドで、catalog パラメータを kafka に設定します。 HELP CREATE FILE; コマンドを実行して、CREATE FILE コマンドに関するヘルプ情報を表示できます。 次の例は、SSL 認証を使用して Kafka クラスタにアクセスする方法を示しています。

      1. ファイルをアップロードします。

        CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
        CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
        CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
      2. ルーチンロードジョブを作成します。

        CREATE ROUTINE LOAD db1.job1 on tbl1
        PROPERTIES
        (
            "desired_concurrent_number"="1"
        )
        FROM KAFKA
        (
            "kafka_broker_list"= "broker1:9091,broker2:9091",
            "kafka_topic" = "my_topic",
            "property.security.protocol" = "ssl",
            "property.ssl.ca.location" = "FILE:ca.pem",
            "property.ssl.certificate.location" = "FILE:client.pem",
            "property.ssl.key.location" = "FILE:client.key",
            "property.ssl.key.password" = "abcd***"
        );

      Doris は、Kafka の C++ API librdkafka を呼び出すことによって Kafka クラスタにアクセスします。 librdkafka でサポートされているパラメータの詳細については、librdkafka をご参照ください。

ルーチンロードジョブのステータスを表示する

SHOW ROUTINE LOAD コマンドを実行して、ルーチンロードジョブのステータスを表示できます。 コマンド構文と例の詳細については、HELP SHOW ROUTINE LOAD; コマンドを実行してください。

SHOW ROUTINE LOAD TASK コマンドを実行して、ルーチンロードジョブのタスクのステータスを表示できます。 コマンド構文と例の詳細については、HELP SHOW ROUTINE LOAD TASK; コマンドを実行してください。

説明

実行中のジョブのステータスのみを表示できます。 完了したジョブと実行されていないジョブのステータスは表示できません。

ルーチンロードジョブのプロパティを変更する

ALTER ROUTINE LOAD コマンドを実行して、作成済みのルーチンロードジョブのプロパティを変更できます。 詳細については、「ALTER ROUTINE LOAD」をご参照ください。 HELP ALTER ROUTINE LOAD; コマンドを実行して、コマンドに関するヘルプ情報を表示することもできます。

ルーチンロードジョブを停止、一時停止、または再開する

STOP、PAUSE、または RESUME コマンドを実行して、ルーチンロードジョブを停止、一時停止、または再開できます。 コマンド構文と例の詳細については、HELP STOP ROUTINE LOAD;HELP PAUSE ROUTINE LOAD;、または HELP RESUME ROUTINE LOAD; コマンドを実行してください。

追加情報

  • ルーチンロードジョブと ALTER TABLE 操作の関係

    • ルーチンロードジョブは、SCHEMA CHANGE または ROLLUP 操作をブロックしません。

      ただし、SCHEMA CHANGE 操作後にソースデータの列がデスティネーションテーブルの列と一致しない場合、誤ったデータレコードの数が増加し、最終的にジョブは一時停止されます。 この問題を防ぐために、ルーチンロードジョブで列マッピングを明示的に指定し、NULLABLE 列または DEFAULT 制約付きの列を使用することをお勧めします。

    • テーブルのパーティションを削除すると、パーティションが見つからないため、データのインポートに失敗する可能性があります。 この場合、ジョブは一時停止されます。

  • ルーチンインポートジョブと LOAD、DELETE、および INSERT 操作の関係

    • ルーチンロードジョブは、LOAD または INSERT 操作と競合しません。

    • テーブルで DELETE 操作を実行するには、対応するテーブルパーティションにデータがインポートされていないことを確認する必要があります。 したがって、DELETE 操作を実行する前に、ルーチンロードジョブを一時停止し、割り当て済みのすべてのタスクが完了するまで待つ必要があります。

  • ルーチンロードジョブと DROP DATABASE または DROP TABLE 操作の関係:ルーチンロードジョブがデータをインポートしているデータベースまたはテーブルが削除されると、ジョブはキャンセルされます。

  • ルーチンロードジョブと Kafka トピックの関係

    ジョブを作成するときにルーチンロードに指定した Kafka トピックが Kafka クラスタに存在しない場合、Kafka ブローカーは auto.create.topics.enable パラメータの設定に基づいてトピックを自動的に作成できます。

    • Kafka ブローカーの auto.create.topics.enable パラメータが true に設定されている場合、トピックは自動的に作成されます。 自動的に作成されるパーティションの数は、Kafka ブローカーの num.partitions パラメータによって決まります。 ルーチンロードジョブは、通常どおりトピックのデータを読み取り続けます。

    • Kafka ブローカーの auto.create.topics.enable パラメータが false に設定されている場合、トピックは自動的に作成されません。 この場合、ルーチンロードジョブはデータが使用可能になるまで一時停止されます。

    したがって、トピックを自動的に作成する場合は、Kafka クラスタのブローカーの auto.create.topics.enable パラメータを true に設定します。

  • ネットワーク分離環境で発生する可能性のある問題

    一部の環境では、CIDR ブロックまたはドメイン名解決の分離対策が存在します。 この場合、次の項目に注意してください。

    • Doris は、ルーチンロードジョブに指定されたブローカーリストのブローカーにアクセスできる必要があります。

    • Kafka クラスタでアドバタイズリスナーが構成されている場合、Doris はアドバタイズリスナーのアドレスにアクセスできる必要があります。

  • データ消費のパーティションとオフセットを指定する

    Doris では、データ消費のパーティションとオフセットを指定できます。 新しいバージョンでは、データ消費を開始する時点を指定することもできます。 次の 3 つのパラメータが関係しています。

    • kafka_partitions:データが消費されるパーティション。 例:"0, 1, 2, 3"。

    • kafka_offsets:各パーティションの開始オフセット。 このパラメータに指定するオフセットの数は、kafka_partitions パラメータに指定するパーティションの数と同じである必要があります。 例:"1000, 1000, 2000, 2000"。

    • property.kafka_default_offset:パーティションのデフォルトオフセット。

    ルーチンロードジョブを作成するときは、次の方法で 3 つのパラメータを組み合わせることができます。

    組み合わせ

    kafka_partitions

    kafka_offsets

    property.kafka_default_offset

    動作

    1

    いいえ

    いいえ

    いいえ

    システムは Kafka トピックのすべてのパーティションを自動的に検出し、パーティションの末尾からデータ消費を開始します。

    2

    いいえ

    いいえ

    はい

    システムは Kafka トピックのすべてのパーティションを自動的に検出し、デフォルトのオフセットからデータ消費を開始します。

    3

    はい

    いいえ

    いいえ

    システムは、指定されたパーティションの末尾からデータ消費を開始します。

    4

    はい

    はい

    いいえ

    システムは、指定されたパーティションの指定されたオフセットからデータ消費を開始します。

    5

    はい

    いいえ

    はい

    システムは、指定されたパーティションのデフォルトオフセットからデータ消費を開始します。

  • STOP と PAUSE の違い

    FE は、STOPPED 状態のルーチンロードジョブを定期的に自動的にクリアします。 PAUSED 状態のルーチンロードジョブは再開できます。

パラメータ

次の表に、ルーチンロードの使用に影響を与える可能性のあるシステムパラメータを示します。

パラメータ

FE または BE

デフォルト値

説明

max_routine_load_task_concurrent_num

FE

5

ルーチンロードジョブを分割できるタスクの最大数。 ルーチンロードジョブの実行中にこのパラメータを変更できます。 デフォルト値を使用することをお勧めします。 パラメータが大きい値に設定されている場合、同時タスク数が過剰になり、大量のクラスタリソースを占有する可能性があります。

max_routine_load_task_num_per_be

FE

5

各 BE で同時に実行できるタスクの最大数。 ルーチンロードジョブの実行中にこのパラメータを変更できます。 デフォルト値を使用することをお勧めします。 パラメータが大きい値に設定されている場合、同時タスク数が過剰になり、大量のクラスタリソースを占有する可能性があります。

max_routine_load_job_num

FE

100

NEED_SCHEDULED、RUNNING、および PAUSED 状態のジョブを含む、ルーチンロードジョブの最大数。 ルーチンロードジョブの実行中にこのパラメータを変更できます。 ルーチンロードジョブの合計数が最大値に達すると、それ以上ジョブを送信できなくなります。

max_consumer_num_per_group

BE

3

データを消費するためにタスクで生成できるコンシューマーの最大数。 Kafka データソースの場合、コンシューマーは 1 つ以上のパーティションからデータを消費する場合があります。 たとえば、ジョブが 6 つのパーティションからデータを消費する場合、タスクは 3 つのコンシューマーを生成し、各コンシューマーは 2 つのパーティションからデータを消費します。 ジョブが 2 つのパーティションからのみデータを消費する場合、タスクは 2 つのコンシューマーを生成し、各コンシューマーは 1 つのパーティションからデータを消費します。

push_write_mbytes_per_sec

BE

10 MB/秒

ルーチンロードジョブを含む、すべてのインポートジョブの共通パラメータ。 このパラメータは、ディスクにデータを書き込むことができる最大速度を指定します。 SSD などの高性能ストレージデバイスの場合、このパラメータを大きい値に設定できます。

max_tolerable_backend_down_num

FE

0

許容できる失敗した BE の最大数。 このパラメータは、Doris が PAUSED 状態のジョブの再スケジュールを許可する前に満たす必要がある条件を指定します。 一時停止されたジョブが再スケジュールされると、ジョブの状態は RUNNING になります。 値 0 は、すべての BE が Alive 状態の場合にのみ再スケジュールが許可されることを示します。

period_of_auto_resume_min

FE

5 分

Doris がジョブの再スケジュールを試行できる期間。 Doris は、デフォルトで 5 分である期間内に最大 3 回ジョブの再スケジュールを試行します。 3 回すべて試行が失敗した場合、ジョブはロックされ、それ以上のスケジュールは実行されません。 ただし、ジョブを手動で再開できます。