すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:ルーチンロードを使用したデータのインポート

最終更新日:Jan 16, 2025

ルーチンロードを使用すると、常駐インポートジョブを送信して、特定のデータソースから ApsaraDB for SelectDB インスタンスにデータを継続的に読み込んでインポートできます。このトピックでは、ルーチンロードを使用して Kafka データソースから ApsaraDB for SelectDB インスタンスにデータをインポートする方法について説明します。

前提条件

  • データソースは Kafka データソースである必要があります。ルーチンロードジョブでは、認証なしの Kafka クラスタ、または PLAIN、SSL、または Kerberos 認証をサポートする Kafka クラスタにアクセスできます。

  • メッセージは CSV または JSON 形式である必要があります。CSV 形式では、各メッセージは行末に改行なしで 1 行として表示されます。

使用上の注意

デフォルトでは、Kafka 0.10.0.0 以降がサポートされています。 0.9.0、0.8.2、0.8.1、0.8.0 などの 0.10.0.0 より前のバージョンの Kafka を使用する場合、次のいずれかの方法を使用します。

  • バックエンド(BE)の構成で kafka_broker_version_fallback パラメータの値を使用する以前のバージョンの Kafka に設定できます。

  • ルーチンロードジョブの作成時に、property.broker.version.fallback パラメータの値を以前のバージョンの Kafka に設定することもできます。

説明

0.10.0.0 より前のバージョンの Kafka を使用する場合、ルーチンロードの一部の機能が使用できない場合があります。たとえば、Kafka パーティションに時間ベースのオフセットを設定することはできません。

ルーチンロードジョブの作成

ルーチンロードを使用するには、ルーチンロードジョブを作成する必要があります。ルーチンロードジョブは、ルーチン スケジューリングに基づいてタスクを継続的にスケジュールします。各タスクは、特定の数の Kafka メッセージを消費します。

構文

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]

パラメータ

パラメータ

説明

[db.]job_name

ルーチンロードジョブの名前。データベース内で複数のジョブが同じ名前の場合、一度に実行できるジョブは 1 つだけです。

tbl_name

データをインポートする宛先テーブルの名前。

merge_type

インポートのデータマージモード。デフォルト値: APPEND。インポートが標準の追加操作であることを指定します。このパラメータは、一意キーモデルを使用するテーブルに対してのみ MERGE または DELETE に設定できます。このパラメータが MERGE に設定されている場合、DELETE ON ステートメントを使用して、削除フラグ列として機能する列を指定する必要があります。このパラメータが DELETE に設定されている場合、インポートされたすべてのデータが宛先テーブルから削除されます。

load_properties

インポートされたデータを処理するために使用されるパラメータ。詳細については、このトピックの load_properties のパラメータ セクションを参照してください。

job_properties

ルーチンロードジョブに関連するパラメータ。詳細については、このトピックの job_properties のパラメータ セクションを参照してください。

data_source_properties

データソースのタイプ。詳細については、このトピックの data_source_properties のパラメータ セクションを参照してください。

load_properties のパラメータ

[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]

パラメータ

説明

column_separator

COLUMNS TERMINATED BY ","

列の区切り文字。デフォルト値: \t

columns_mapping

(k1,k2,tmpk1,k3=tmpk1+1)

インポートされたファイルの列と宛先テーブルの列の間のマッピング、およびさまざまな列変換操作。詳細については、ソースデータの変換 を参照してください。

preceding_filter

該当なし

ソースデータをフィルタリングするための条件。詳細については、ソースデータの変換 を参照してください。

where_predicates

WHERE k1>100 and k2=1000

インポートされたデータをフィルタリングするための条件。詳細については、ソースデータの変換 を参照してください。

partitions

PARTITION(p1,p2,p3)

宛先テーブルでデータをインポートするパーティション。パーティションを指定しない場合、ソースデータは対応するパーティションに自動的にインポートされます。

DELETE ON

DELETE ON v3>100

インポートされたデータの削除フラグ列と計算関係を指定するために使用されるステートメント。

説明

このパラメータは、merge_type パラメータが MERGE に設定されている場合に必須です。このパラメータは、一意キーモデルを使用するテーブルに対してのみ有効です。

ORDER BY

該当なし

インポートされたデータのシーケンス列を指定するために使用されるステートメント。このパラメータは、インポート中に正しいデータの順序を維持するために使用されます。

説明

このパラメータは、一意キーモデルを使用するテーブルに対してのみ有効です。

ジョブプロパティのパラメーター

PROPERTIES (
    "key1" = "val1",
    "key2" = "val2"
)
説明

ルーチンロードジョブは複数のタスクに分割されます。 max_batch_interval パラメータは、タスクの最大実行時間を指定します。 max_batch_rows パラメータは、タスクが読み取ることができる最大行数を指定します。 max_batch_size パラメータは、タスクが読み取ることができる最大バイト数を指定します。 3 つのパラメータで指定されたしきい値のいずれかに達すると、タスクは終了します。

パラメータ

説明

desired_concurrent_number

"desired_concurrent_number" = "3"

同時に実行できるタスクの最大数。値は 0 より大きい整数である必要があります。デフォルト値: 3。ルーチンロードジョブは複数のタスクに分割されます。このパラメータは、ジョブに対して同時に実行できるタスクの最大数を指定します。

説明
  1. 実際に同時に実行されるタスクの数は、指定されたしきい値に達しない場合があります。実際の数は、クラスタ内のノードの数、負荷、およびデータソースによって異なります。

  2. 同時実行性を高めると、分散クラスタに基づいてルーチンロードジョブを高速化できます。ただし、同時実行性が過度に大きい場合、サイズの小さいファイルが多数書き込まれる可能性があります。このパラメータを クラスタのコア数 / 16 に設定することをお勧めします。

max_batch_interval

"max_batch_interval" = "20"

各タスクの最大実行時間。単位: 秒。デフォルト値: 10。有効な値: 5 ~ 60。

max_batch_rows

"max_batch_rows" = "300000"

各タスクが読み取ることができる最大行数。デフォルト値: 200000。値は 200000 以上である必要があります。

max_batch_size

"max_batch_size" = "209715200"

各タスクが読み取ることができる最大バイト数。単位: バイト。デフォルト値: 104857600(100 MB)。有効な値は 100 MB ~ 1 GB の範囲です。

max_error_number

"max_error_number"="3"

サンプリングウィンドウで許容されるエラー行の最大数。デフォルト値: 0。エラー行が許可されていないことを指定します。値は 0 より大きい整数である必要があります。

サンプリングウィンドウは、max_batch_rows パラメータの値の 10 倍です。サンプリングウィンドウ内のエラー行の数がしきい値を超えると、ルーチンロードジョブは一時停止され、データ品質を確認するために手動介入が必要になります。

説明

WHERE 条件によって除外された行は、エラー行とは見なされません。

strict_mode

"strict_mode"="true"

厳密モードを有効にするかどうかを指定します。デフォルト値: false。厳密モードでは、ソース列の NOT NULL 値が列型変換後に対応する宛先列の NULL 値に変換された場合、その値は除外されます。厳密モードが有効になっている場合、列型変換後のデータは、次のルールに基づいてインポートプロセス中に厳密にフィルタリングされます。

  • エラーデータは、列型変換後に除外されます。 エラーデータとは、列型変換後にソース列の NOT NULL データから宛先列に生成された NULL データを指します。

  • 厳密モードは、値が関数によって生成される宛先列には適用されません。

  • 宛先列が値を特定の範囲に制限し、ソース列の値が型変換をサポートしているが、変換された値が範囲に属さない場合、厳密モードは宛先列には適用されません。 たとえば、ソース列の値が 10 で、宛先列の型が DECIMAL(1,0) であるとします。値 10 は変換できますが、変換された値は宛先列に指定された範囲に属していません。この場合、厳密モードは宛先列には適用されません。

timezone

"timezone" = "Africa/Abidjan"

ルーチンロードジョブに使用されるタイムゾーン。デフォルトでは、セッションのタイムゾーンが使用されます。

説明

このパラメータは、ルーチンロードジョブに含まれるすべてのタイムゾーン関連関数の結果に影響します。

format

"format" = "json"

インポートされたデータの形式。デフォルト値: CSV。 JSON 形式もサポートされています。

jsonpaths

-H "jsonpaths:[\"$.k2\",\"$.k1\"]"

インポートされたデータが JSON 形式の場合、JSON データから抽出されるフィールド。

strip_outer_array

-H "strip_outer_array:true"

インポートされたデータが JSON 形式の場合、JSON データを配列として表示するかどうかを指定します。 strip_outer_array パラメータが true に設定されている場合、JSON データは配列として表示されます。 JSON データの各要素は、データの行と見なされます。デフォルト値: false

json_root

-H "json_root:$.RECORDS"

インポートされたデータが JSON 形式の場合、JSON データのルートノード。 ApsaraDB for SelectDB は、json_root パラメータで指定されたルートノードの要素を抽出して解析します。デフォルトでは、このパラメータは空のままです。

send_batch_parallelism

該当なし

バッチ処理のためにデータを送信する同時ジョブの最大数。このパラメータの値が BE 構成の max_send_batch_parallelism_per_job パラメータの値よりも大きい場合、BE は max_send_batch_parallelism_per_job パラメータの値を使用します。

load_to_single_tablet

該当なし

パーティションの 1 つのタブレットのみにデータをインポートするかどうかを指定します。デフォルト値: false。このパラメータは、重複キーモデルを使用し、ランダムパーティションを含むテーブルにデータをインポートする場合にのみ使用できます。

厳密モードとインポートされるソースデータの関係

この例では、TINYLNT 型の列をインポートします。次の表は、システムが NULL 列値のインポートを許可する場合の厳密モードとソースデータの関係を示しています。

ソースデータ

STRING から INT へ

厳密モード

結果

NULL

\N

該当なし

true または false

NULL

NOT NULL

aaa または 2000

NULL

true

無効なデータ(フィルタリング済み)

NOT NULL

aaa

NULL

false

NULL

NOT NULL

1

1

true または false

正しいデータ

この例では、DECIMAL(1,0) 型の列をインポートします。次の表は、システムが NULL 列値のインポートを許可する場合の厳密モードとソースデータの関係を示しています。

ソースデータ

STRING から INT へ

厳密モード

結果

NULL

\N

該当なし

true または false

NULL

NOT NULL

aaa

NULL

true

無効なデータ(フィルタリング済み)

NOT NULL

aaa

NULL

false

NULL

NOT NULL

1 または 10

1

true または false

正しいデータ

説明

値 10 は、DECIMAL(1,0) 型の許容範囲を超えています。ただし、値 10 は DECIMAL 型の要件を満たしているため、厳密モードでは除外されません。値 10 は最終的に ETL(抽出、変換、ロード)プロセスで除外されます。

data_source_properties のパラメーター

FROM KAFKA
(
    "key1" = "val1",
    "key2" = "val2"
)

パラメータ

説明

kafka_broker_list

Kafkaクラスター内のブローカーへの接続に使用する構成。形式: ip:host。複数の構成はカンマ (,) で区切ります。

例: "kafka_broker_list"="broker1:9092,broker2:9092"

kafka_topic

サブスクライブする Kafka トピックです。

形式: "kafka_topic"="my_topic"

kafka_topic

サブスクライブする Kafka パーティションと各パーティションの開始オフセット。特定の時点を指定した場合、データ消費はその時点以降の最新のオフセットから開始されます。

0 以上のオフセットを指定できます。または、kafka_offsets パラメーターを次のいずれかの値に設定します。

  • OFFSET_BEGINNING: データが利用可能なオフセットからパーティションをサブスクライブします。

  • OFFSET_END: 末尾のオフセットからパーティションをサブスクライブします。

  • yyyy-MM-dd HH:mm:ss 形式で時点を指定します。例:2021-05-22 11:00:00。

このパラメーターを指定しない場合、システムは end offset からトピック内のすべてのパーティションをサブスクライブします。

例:

"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"
重要

時刻形式とオフセット形式を混在させることはできません。

property

カスタム Kafka パラメーター。このパラメーターは、Kafka シェルの --property パラメーターと同等です。

このパラメーターの値がファイルの場合、値の前にキーワード FILE: を追加する必要があります。

プロパティ パラメーター

  • SSL認証方式を使用してKafkaクラスターに接続する場合は、次のパラメーターを構成する必要があります。

    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"

    property.security.protocol パラメーターと property.ssl.ca.location パラメーターは、Kafkaクラスターへの接続に使用される方式と認証局(CA)証明書の場所を指定するために必要です。

    Kafkaサーバーでクライアント認証モードが有効になっている場合は、次のパラメーターを構成する必要があります。

    "property.ssl.certificate.location"
    "property.ssl.key.location"
    "property.ssl.key.password"

    上記のパラメーターは、クライアントの公開鍵証明書の場所、クライアントの秘密鍵ファイルの場所、およびクライアントの秘密鍵にアクセスするためのパスワードを指定するために使用されます。

  • Kafkaパーティションのデフォルトの開始オフセットを指定します。

    デフォルトでは、kafka_partitions/kafka_offsets パラメーターが指定されていない場合、すべてのパーティションのデータが消費されます。この場合、kafka_default_offsets パラメーターを構成して、各パーティションの開始オフセットを指定できます。デフォルト値:OFFSET_END。これは、パーティションが終了オフセットからサブスクライブされることを示します。

    "property.kafka_default_offsets" = "OFFSET_BEGINNING" // 開始オフセットを指定

サポートされているカスタムパラメーターの詳細については、「librdkafka の公式構成ドキュメントのクライアント構成項目」をご参照ください。たとえば、次のカスタムパラメーターを使用できます。

"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"

ルーチンロードジョブの作成

  1. ApsaraDB for SelectDBインスタンスにデータを読み込むテーブルを作成します。サンプルコード:

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50),
        url varchar(500)
    )
    UNIQUE KEY(`id`, `name`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
  2. 次のサンプルコードのパラメーターを構成して、データを読み込みます。

    • example_dbデータベースのtest_tableテーブルに対して、test1という名前のRoutine Loadジョブを作成します。グループID、クライアントID、および列区切り文字を指定し、デフォルトですべてのパーティションのデータを自動的に使用するようにシステムを有効にし、データが使用可能な開始オフセットからパーティションをサブスクライブします。サンプルコード:

      CREATE ROUTINE LOAD example_db.test1 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • example_dbデータベースのtest_tableテーブルに対して、test2という名前のRoutine Loadジョブを作成します。ジョブに対して厳密モードを有効にします。サンプルコード:

      CREATE ROUTINE LOAD example_db.test2 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "true"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • 指定した時点からパーティションのデータを使用します。サンプルコード:

      CREATE ROUTINE LOAD example_db.test4 ON test_table
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "30",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200"
      ) FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offset" = "2024-01-21 10:00:00"
      );

JSON データのインポート

Routine Loadを使用して、次の2種類のJSONデータをインポートできます。

  • JSONデータには、データレコードが1つだけ含まれており、JSONオブジェクトです。

    単一テーブルインポートモードを使用する場合、JSONデータは次の形式になります。このモードでは、ON TABLE_NAMEステートメントを実行することでテーブル名が指定されます。

    {"key1":"value1","key2":"value2","key3":"value3"}

    動的インポートモードまたは複数テーブルインポートモードを使用する場合、JSONデータは次の形式になります。このモードでは、テーブル名は指定されません。

    table_name|{"key1":"value1","key2":"value2","key3":"value3"}
  • JSONデータは、複数のデータレコードを含む配列です。

    単一テーブルインポートモードを使用する場合、JSONデータは次の形式になります。このモードでは、ON TABLE_NAMEステートメントを実行することでテーブル名が指定されます。

    [
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

    動的インポートモードまたは複数テーブルインポートモードを使用する場合、JSONデータは次の形式になります。このモードでは、テーブル名は指定されません。

       table_name|[
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

JSON データをインポートします。

  1. ApsaraDB for SelectDBインスタンスにデータを読み込みたいテーブルを作成します。サンプルコード:

    CREATE TABLE `example_tbl` (
       `category` varchar(24) NULL COMMENT "",
       `author` varchar(24) NULL COMMENT "",
       `timestamp` bigint(20) NULL COMMENT "",
       `dt` int(11) NULL COMMENT "",
       `price` double REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    COMMENT "OLAP"  // OLAP
    PARTITION BY RANGE(`dt`)
    (
      PARTITION p0 VALUES [("-2147483648"), ("20230509")),
        PARTITION p20200509 VALUES [("20230509"), ("20231010")),
        PARTITION p20200510 VALUES [("20231010"), ("20231211")),
        PARTITION p20200511 VALUES [("20231211"), ("20240512"))
    )
    DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;
  2. 上記の2種類のJSONデータをトピックにインポートします。サンプルコード:

    {
        "category":"value1331",
        "author":"value1233",
        "timestamp":1700346050,
        "price":1413
    }
    [
        {
            "category":"value13z2",
            "author":"vaelue13",
            "timestamp":1705645251,
            "price":14330
        },
        {
            "category":"lvalue211",
            "author":"lvalue122",
            "timestamp":1684448450,
            "price":24440
        }
    ]
  3. JSONデータをさまざまなモードでインポートします。

    • シンプルモードでJSONデータをインポートします。サンプルコード:

      CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl
      COLUMNS(category,price,author)
      PROPERTIES
      (
          "desired_concurrent_number"="3", // 必要な同時実行数
          "max_batch_interval" = "20", // 最大バッチ間隔
          "max_batch_rows" = "300000", // 最大バッチ行数
          "max_batch_size" = "209715200", // 最大バッチサイズ
          "strict_mode" = "false", // 厳格モード
          "format" = "json" // フォーマット
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", // Kafkaブローカーリスト
          "kafka_topic" = "my_topic", // Kafkaトピック
          "kafka_partitions" = "0,1,2", // Kafkaパーティション
          "kafka_offsets" = "0,0,0" // Kafkaオフセット
       );
    • JSON データを正確にインポートします。サンプルコード:

      CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl
      COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
      PROPERTIES
      (
          "desired_concurrent_number"="3", // 必要な同時実行数
          "max_batch_interval" = "20", // 最大バッチ間隔
          "max_batch_rows" = "300000", // 最大バッチ行数
          "max_batch_size" = "209715200", // 最大バッチサイズ
          "strict_mode" = "false", // 厳格モード
          "format" = "json", // フォーマット
          "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", // JSONパス
          "strip_outer_array" = "true" // 外側の配列を削除
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", // Kafkaブローカーリスト
          "kafka_topic" = "my_topic", // Kafkaトピック
          "kafka_partitions" = "0,1,2", // Kafkaパーティション
          "kafka_offsets" = "0,0,0" // Kafkaオフセット
      );
      説明

      サンプルデータでは、テーブルのパーティションフィールド dt は使用できません。 dt フィールドの値は、CREATE ROUTINE LOAD ステートメントの dt=from_unixtime(timestamp,'%Y%m%d') 構成を使用して生成されます。

さまざまな認証方式を使用する Kafka クラスターへのアクセス

次の例は、Kafkaクラスターの認証方式に基づいてKafkaクラスターにアクセスする方法を示しています。

  1. SSL認証方式が有効になっているKafkaクラスタにアクセスします。

    SSL認証方式が有効になっているKafkaクラスタにアクセスするには、Kafkaブローカーの公開鍵を認証するために使用される証明書ファイル(ca.pem)を提供する必要があります。 Kafkaクラスタでクライアント認証モードが有効になっている場合は、クライアントの公開鍵証明書(client.pem)、秘密鍵ファイル(client.key)、および秘密鍵のパスワードも必要です。必要なファイルは、CREATE FILE ステートメントを実行して、事前にApsaraDB for SelectDBにアップロードする必要があります。カタログ名は kafka です。

    1. ファイルをアップロードします。サンプルコード:

      CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
      CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
      CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
    2. ルーチンロードジョブを作成します。サンプルコード:

      CREATE ROUTINE LOAD db1.job1 on tbl1
      PROPERTIES
      (
          "desired_concurrent_number"="1"
      )
      FROM KAFKA
      (
          "kafka_broker_list"= "broker1:9091,broker2:9091",
          "kafka_topic" = "my_topic",
          "property.security.protocol" = "ssl",
          "property.ssl.ca.location" = "FILE:ca.pem",
          "property.ssl.certificate.location" = "FILE:client.pem",
          "property.ssl.key.location" = "FILE:client.key",
          "property.ssl.key.password" = "abcdefg"
      );
      説明

      ApsaraDB for SelectDBは、Kafka C++クライアントライブラリ librdkafka を使用してKafkaクラスタにアクセスします。 librdkafkaでサポートされているパラメータの詳細については、librdkafkaConfiguration properties ドキュメントをご参照ください。

  2. PLAIN認証方式が有効になっているKafkaクラスタにアクセスします。

    PLAIN認証方式が有効になっているKafkaクラスタにアクセスするには、次の設定を追加する必要があります。

    1. property.security.protocol=SASL_PLAINTEXT: Simple Authentication and Security Layer(SASL)プレーンテキスト認証方式を使用します。

    2. property.sasl.mechanism=PLAIN: SASL認証方式をPLAINに設定します。

    3. property.sasl.username=admin: SASLのユーザー名を指定します。

    4. property.sasl.password=admin: SASLのパスワードを指定します。

    ルーチンロードジョブを作成します。サンプルコード:

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
    "desired_concurrent_number"="1",
     )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol"="SASL_PLAINTEXT",
        "property.sasl.mechanism"="PLAIN",
        "property.sasl.username"="admin",
        "property.sasl.password"="admin"
    );
    
  3. Kerberos認証方式が有効になっているKafkaクラスタにアクセスします。

    Kerberos認証方式が有効になっているKafkaクラスタにアクセスするには、次の設定を追加する必要があります。

    1. security.protocol=SASL_PLAINTEXT: SASLプレーンテキスト認証方式を使用します。

    2. sasl.kerberos.service.name=$SERVICENAME: ブローカーサービス名を指定します。

    3. sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab: ローカルの.keytabファイルのパスを指定します。

    4. sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}: ApsaraDB for SelectDBがKafkaクラスタへの接続に使用するKerberosプリンシパルを指定します。

    ルーチンロードジョブを作成します。サンプルコード:

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
    "desired_concurrent_number"="1",
     )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol" = "SASL_PLAINTEXT",
        "property.sasl.kerberos.service.name" = "kafka",
        "property.sasl.kerberos.keytab" = "/etc/krb5.keytab",
        "property.sasl.kerberos.principal" = "id@your.com"
    );
    説明
    • ApsaraDB for SelectDBがKerberos認証方式が有効になっているKafkaクラスタにアクセスできるようにするには、ApsaraDB for SelectDBクラスタのすべての実行ノードにKerberosクライアント kinit をデプロイし、krb5.confファイルを構成し、Key Distribution Center(KDC)サービス情報を指定する必要があります。

    • property.sasl.kerberos.keytab パラメータをローカルの.keytabファイルの絶対パスに設定し、ApsaraDB for SelectDBプロセスがローカルの.keytabファイルにアクセスできるようにします。

ルーチンロードジョブの変更

PAUSED 状態の既存のルーチンロードジョブを変更できます。

構文

ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]

パラメーター

パラメーター

説明

[db.]job_name

変更するジョブの名前。

tbl_name

データのインポート先となるテーブルの名前。

job_properties

変更するジョブパラメーター。変更できるパラメーターは次のとおりです。

  • desired_concurrent_number

  • max_error_number

  • max_batch_interval

  • max_batch_rows

  • max_batch_size

  • jsonpaths

  • json_root

  • strip_outer_array

  • strict_mode

  • timezone

  • num_as_string

  • fuzzy_parse

data_source

データソースのタイプ。このパラメーターを KAFKA に設定します。

data_source_properties

データソースのパラメーター。次のパラメーターのみがサポートされています。

  1. kafka_partitions

  2. kafka_offsets

  3. kafka_broker_list

  4. kafka_topic

  5. property.group.id などのカスタムプロパティ。

説明

kafka_partitions パラメーターと kafka_offsets パラメーターは、消費される Kafka パーティションのオフセットを変更するために使用されます。消費されるパーティションのみを変更できます。パーティションを追加することはできません。

  • 値を サンプルコード:必要な同時実行数 パラメーターを 1 に設定

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "1"
    );
  • 値を変更します サンプルコード:必要な同時実行数 パラメーターを 10 に設定し、パーティションオフセットとグループ ID を変更します

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "10"
    )
    FROM kafka
    (
        "kafka_partitions" = "0, 1, 2",
        "kafka_offsets" = "100, 200, 100",
        "property.group.id" = "new_group"
    );

ルーチンロードジョブの一時停止

PAUSE文を実行することでルーチンロードジョブを一時停止し、RESUME文を実行することで一時停止したジョブを再開できます。

構文

PAUSE [ALL] ROUTINE LOAD FOR <job_name>;

パラメーター

パラメーター

説明

[db.]job_name

一時停止するジョブの名前。

  • test1という名前のルーチンロードジョブを一時停止するには、次の文を実行します:

    PAUSE ROUTINE LOAD FOR test1;
  • すべてのルーチンロードジョブを一時停止するには、次の文を実行します:

    PAUSE ALL ROUTINE LOAD;

ルーチンロードジョブの再開

一時停止したルーチンロードジョブを再開できます。再開されたジョブは、最後に消費されたオフセットからデータの消費を続けます。

構文

RESUME [ALL] ROUTINE LOAD FOR <job_name>

パラメーター

パラメーター

説明

[db.]job_name

再開するルーチンロードジョブの名前。

  • test1 という名前のルーチンロードジョブを再開するには、次のステートメントを実行します。

    RESUME ROUTINE LOAD FOR test1;
  • すべてのルーチンロードジョブを再開するには、次のステートメントを実行します。

    RESUME ALL ROUTINE LOAD;

ルーチンロードジョブの停止

ルーチンロードジョブを停止できます。停止されたルーチンロードジョブは再起動できません。ルーチンロードジョブが停止された後、インポートされたデータはロールバックできません。

構文

STOP ROUTINE LOAD FOR <job_name>;

パラメーター

パラメーター

説明

[db.]job_name

停止するジョブの名前。

test1 という名前のルーチンロードジョブを停止するには、次のステートメントを実行します。

STOP ROUTINE LOAD FOR test1;

1 つ以上のルーチンロードジョブのクエリ

SHOW ROUTINE LOAD ステートメントを実行して、1 つ以上のルーチンロードジョブのステータスをクエリできます。

構文

SHOW [ALL] ROUTINE LOAD [FOR job_name];

パラメーター

パラメーター

説明

[db.]job_name

クエリするジョブの名前。

説明

インポートされたデータの形式が無効な場合、詳細なエラー情報は ErrorLogUrls パラメーターの値に記録されます。 ErrorLogUrls パラメーターの値には複数の URL が含まれています。 URL の 1 つをコピーして、ブラウザーでエラー情報をクエリできます。

  • 次のステートメントを実行して、停止およびキャンセルされたジョブを含む、test1 という名前のすべてのルーチンロードジョブをクエリします。 結果出力には、各ジョブが別々の行に表示され、ジョブの数に応じて 1 つ以上の行で構成される場合があります。

    SHOW ALL ROUTINE LOAD FOR test1;
  • 次のステートメントを実行して、test1 という名前の進行中のルーチンロードジョブをクエリします。

    SHOW ROUTINE LOAD FOR test1;
  • 次のステートメントを実行して、停止およびキャンセルされたジョブを含む、example_db データベース内のすべてのルーチンロードジョブをクエリします。 結果出力には、各ジョブが別々の行に表示され、ジョブの数に応じて 1 つ以上の行で構成される場合があります。

    use example_db;
    SHOW ALL ROUTINE LOAD;
  • 次のステートメントを実行して、example_db データベース内のすべての進行中のルーチンロードジョブをクエリします。

    use example_db;
    SHOW ROUTINE LOAD;
  • 次のステートメントを実行して、example_db データベース内の test1 という名前の進行中のルーチンロードジョブをクエリします。

    SHOW ROUTINE LOAD FOR example_db.test1;
  • 次のステートメントを実行して、停止およびキャンセルされたジョブを含む、example_db データベース内の test1 という名前のすべてのルーチンロードジョブをクエリします。 結果出力には、各ジョブが別々の行に表示され、ジョブの数に応じて 1 つ以上の行で構成される場合があります。

    SHOW ALL ROUTINE LOAD FOR example_db.test1;

関連システム構成

関連システム構成は、Routine Load の使用に影響します。

  • max_routine_load_task_concurrent_num

    フロントエンド(FE)パラメーターです。デフォルト値:5。実行時にパラメーターを変更できます。このパラメーターは、Routine Load ジョブに対して同時に実行できるタスクの最大数を指定します。デフォルト値を使用することをお勧めします。パラメーターに大きな値を設定すると、同時実行タスク数が過剰になり、クラスターリソースを大量に占有する可能性があります。

  • max_routine_load_task_num_per_be

    FE パラメーターです。デフォルト値:5。実行時にパラメーターを変更できます。このパラメーターは、各 BE ノードで同時に実行できるタスクの最大数を指定します。デフォルト値を使用することをお勧めします。パラメーターに大きな値を設定すると、同時実行タスク数が過剰になり、クラスターリソースを大量に占有する可能性があります。

  • max_routine_load_job_num

    FE パラメーターです。デフォルト値:100。実行時にパラメーターを変更できます。このパラメーターは、NEED_SCHEDULED、RUNNING、または PAUSED 状態のジョブを含む、送信できる Routine Load ジョブの最大数を指定します。送信した Routine Load ジョブの合計数が最大値に達すると、それ以上のジョブを送信できなくなります。

  • max_consumer_num_per_group

    BE パラメーターです。デフォルト値:3。このパラメーターは、タスクのデータを使用するために生成できるコンシューマーの最大数を指定します。Kafka データソースの場合、コンシューマーは 1 つ以上の Kafka パーティションのデータを使用する場合があります。タスクが 6 つの Kafka パーティションのデータを使用する場合、3 つのコンシューマーが生成されます。各コンシューマーは 2 つのパーティションのデータを使用します。パーティションが 2 つしかない場合は、2 つのコンシューマーのみが生成され、各コンシューマーは 1 つのパーティションのデータを使用します。

  • max_tolerable_backend_down_num

    FE パラメーターです。デフォルト値:0。特定の条件が満たされると、ApsaraDB for SelectDB は PAUSED 状態のジョブを再スケジュールします。その後、再スケジュールされたジョブの状態は RUNNING に変わります。値 0 は、すべての BE ノードが稼働している場合にのみジョブを再スケジュールできることを示します。

  • period_of_auto_resume_min

    FE パラメーターです。デフォルト値は 5 分です。これは、ApsaraDB for SelectDB が 5 分以内にジョブを最大 3 回再スケジュールすることを示します。ジョブが 3 回再スケジュールに失敗した場合、ジョブはロックされ、再スケジュールされなくなります。ただし、手動で介入してジョブを再開できます。

その他の説明

  • Routine LoadジョブとALTER TABLE操作の関係

    • Routine Loadジョブは、SCHEMA CHANGE操作またはROLLUP操作をブロックしません。ただし、SCHEMA CHANGE操作後にソースデータの列がデスティネーションテーブルの列と一致しない場合、エラーデータレコードの数が増加し、最終的にジョブは一時停止します。この問題を防ぐために、Routine Loadジョブで列マッピングを明示的に指定し、NULLABLE列またはDEFAULT制約付きの列を使用することをお勧めします。

    • テーブルのパーティションを削除すると、パーティションが見つからないため、データのインポートに失敗する可能性があります。この場合、ジョブは一時停止します。

  • Routine LoadジョブとLOAD、DELETE、およびINSERT操作の関係

    • Routine Loadジョブは、LOAD操作またはINSERT操作と競合しません。

    • テーブルに対してDELETE操作を実行するには、対応するテーブルパーティションにデータがインポートされていないことを確認する必要があります。そのため、DELETE操作を実行する前に、Routine Loadジョブを一時停止し、割り当て済みのすべてのタスクが完了するまで待つ必要があります。

  • Routine LoadジョブとDROP DATABASEまたはDROP TABLE操作の関係

    Routine Loadジョブがデータをインポートしているデータベースまたはテーブルが削除されると、ジョブは自動的にキャンセルされます。

  • KafkaクラスタのRoutine LoadジョブとKafkaトピックの関係

    CREATE ROUTINE LOADステートメントで宣言されているKafka topicがKafkaクラスタに存在しない場合、Kafkaブローカーは auto.create.topics.enable パラメータの設定に基づいてトピックを自動的に作成できます。

    • Kafkaブローカーのauto.create.topics.enableパラメータがtrueに設定されている場合、トピックは自動的に作成されます。自動的に作成されるパーティションの数は、Kafkaブローカーの num.partitions パラメータによって決まります。 Routine Loadジョブは、トピックのデータを読み取り続けます。

    • Kafkaブローカーのauto.create.topics.enableパラメータがfalseに設定されている場合、トピックは自動的に作成されません。この場合、データが利用可能になるまで、Routine Loadジョブは一時停止されます。

    そのため、トピックを自動的に作成する場合は、Kafkaクラスタのブローカーのauto.create.topics.enableパラメータをtrueに設定します。

  • 環境におけるCIDRブロックとドメイン名解決の分離に関する考慮事項

    • Routine Loadジョブの作成時に指定するブローカーは、ApsaraDB for SelectDBからアクセスできる必要があります。

    • Kafkaでadvertised.listenersパラメータが設定されている場合、advertised.listenersパラメータの値のアドレスは、ApsaraDB for SelectDBからアクセスできる必要があります。

  • データ消費のパーティションとオフセットを指定する

    ApsaraDB for SelectDBでは、データ消費のパーティション、オフセット、および時点を指定できます。次のセクションでは、パラメータについて説明します。

    • kafka_partitions: データが消費されるパーティション。例: "0,1,2,3"。

    • kafka_offsets: 各パーティションの開始オフセット。このパラメータに指定するオフセットの数は、kafka_partitionsパラメータに指定するパーティションの数と同じである必要があります。例: "1000,1000,2000,2000"。

    • property.kafka_default_offset: パーティションのデフォルトの開始オフセット。

    Routine Loadジョブを作成するときは、次の表に示す5つの方法のいずれかを使用して、3つのパラメータを組み合わせることができます。

    方法

    kafka_partitions

    kafka_offsets

    property.kafka_default_offset

    動作

    1

    いいえ

    いいえ

    いいえ

    システムはKafkaトピックのすべてのパーティションを自動的に検索し、パーティションの終了オフセットからデータの消費を開始します。

    2

    いいえ

    いいえ

    はい

    システムはKafkaトピックのすべてのパーティションを自動的に検索し、デフォルトのオフセットからデータの消費を開始します。

    3

    はい

    いいえ

    いいえ

    システムは、指定されたパーティションの終了オフセットからデータの消費を開始します。

    4

    はい

    はい

    いいえ

    システムは、指定されたパーティションの指定されたオフセットからデータの消費を開始します。

    5

    はい

    いいえ

    はい

    システムは、指定されたパーティションのデフォルトのオフセットからデータの消費を開始します。

  • STOPPED状態とPAUSED状態の違い

    FEは、定期的にSTOPPED状態のRoutine Loadジョブを自動的にクリアします。 PAUSED状態のRoutine Loadジョブは再開できます。