すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Kudu コネクタ

最終更新日:Mar 26, 2026

Trino で Kudu コネクタを使用すると、Apache Kudu テーブル内のデータのクエリ、挿入、削除ができます。

前提条件

開始する前に、以下が準備できていることを確認してください。

  • Kudu サービス (バージョン 1.10 以降) を備えた Hadoop クラスター

  • Hadoop クラスターへのネットワーク接続を持つ Trino クラスター

これらのクラスターを作成するには、「クラスターの作成」をご参照ください。

制限事項

  • Kudu コネクタには、Kudu 1.10 以降が必要です。

  • Kudu のテーブル名と列名には、小文字のみ使用できます。

  • Trino クラスターと Hadoop クラスターの間にネットワーク接続が存在する必要があります。

Kudu コネクタの設定

E-MapReduce (EMR) コンソールで、Trino サービスページの [設定] タブに移動し、[kudu.properties] をクリックします。要件に応じて設定項目を修正または追加します。

コネクタ設定の詳細なリファレンスについては、「コネクタの設定」をご参照ください。

以下は、サポートされているすべての設定項目を含む完全な kudu.properties テンプレートです。

connector.name=kudu

## 必須: Kudu マスターアドレス。複数のアドレスはカンマで区切ります。
## サポートされる形式: example.com, example.com:7051, 192.0.2.1, 192.0.2.1:7051,
##                    [2001:db8::1], [2001:db8::1]:7051, 2001:db8::1
## localhost を Kudu マスターノードの IP アドレスまたはホスト名 (例: master-1-1) に変更します。
kudu.client.master-addresses=localhost

## スキーマエミュレーションにより、Trino は命名規則を使用して Kudu テーブルをスキーマにマッピングできます。
## デフォルトでは、すべてのテーブルは "default" スキーマに表示されます。
#kudu.schema-emulation.enabled=false

## スキーマエミュレーションが有効な場合に使用されるプレフィックス。標準のプレフィックスは "presto::" です。空のプレフィックスも有効です。
## kudu.schema-emulation.enabled=true の場合にのみ必須です。
#kudu.schema-emulation.prefix=

## Kudu Java クライアントの高度な設定

## 管理操作 (例: CREATE TABLE、DROP TABLE) のタイムアウト。デフォルト: 30s。
#kudu.client.default-admin-operation-timeout=30s

## ユーザー操作のタイムアウト。デフォルト: 30s。
#kudu.client.default-operation-timeout=30s

## ソケットからのデータ待機タイムアウト。デフォルト: 10s。
#kudu.client.default-socket-read-timeout=10s

## Kudu クライアントの統計収集を無効にするかどうか。デフォルト: false。
#kudu.client.disable-statistics=false
重要

デフォルトで kudu.properties にない設定項目を追加するには、[kudu.properties] タブで [設定項目を追加] をクリックします。詳細については、「設定項目の追加」をご参照ください。

データのクエリ

Apache Kudu はネイティブではスキーマをサポートしていませんが、Kudu コネクタは命名規則によってスキーマをエミュレートできます。

スキーマエミュレーションが無効な場合 (デフォルト)

スキーマエミュレーションが無効な場合、すべての Kudu テーブルは default スキーマに表示されます。

orders テーブルを完全修飾名でクエリします。

SELECT * FROM kudu.default.orders;

kudu をカタログとして、default をスキーマとして設定した場合、クエリは次のように簡略化されます。

SELECT * FROM orders;

テーブル名に特殊文字が含まれる場合は、二重引用符で囲みます。

SELECT * FROM kudu.default."special.table!";

クイック例:テーブルの作成とクエリ

  1. default スキーマに users という名前のテーブルを作成します。

    CREATE TABLE kudu.default.users (
      user_id int WITH (primary_key = true),
      first_name varchar,
      last_name varchar
    ) WITH (
      partition_by_hash_columns = ARRAY['user_id'],
      partition_by_hash_buckets = 2
    );
    テーブルを作成する際は、プライマリキー、列のエンコーディングまたは圧縮形式、およびパーティション情報 (ハッシュまたはレンジ) を指定します。
  2. テーブルスキーマを確認します。

    DESCRIBE kudu.default.users;

    期待される出力:

       Column   |  Type   |                      Extra                      | Comment
    ------------+---------+-------------------------------------------------+---------
     user_id    | integer | primary_key, encoding=auto, compression=default |
     first_name | varchar | nullable, encoding=auto, compression=default    |
     last_name  | varchar | nullable, encoding=auto, compression=default    |
    (3 rows)
  3. 行を挿入します。

    INSERT INTO kudu.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse');
  4. データをクエリします。

    SELECT * FROM kudu.default.users;

スキーマエミュレーションが有効な場合

etc/catalog/kudu.propertieskudu.schema-emulation.enabled=true を設定して、スキーマエミュレーションを有効にします。これにより、Kudu テーブルはその名前に基づいて Trino スキーマにマッピングされます。

空のプレフィックスでのマッピング (kudu.schema-emulation.prefix=)

Kudu テーブル名Trino テーブル名
orderskudu.default.orders
part1.part2kudu.part1.part2
x.y.zkudu.x."y.z"
Kudu はスキーマをサポートしていません。Trino はスキーマを管理するために $schemas という名前の特別なテーブルを作成します。

標準プレフィックスでのマッピング (kudu.schema-emulation.prefix=presto::)

Kudu テーブル名Trino テーブル名
orderskudu.default.orders
part1.part2kudu.default."part1.part2"
x.y.zkudu.default."x.y.z"
presto::part1.part2kudu.part1.part2
presto::x.y.zkudu.x."y.z"
標準プレフィックスを使用する場合、Trino はスキーマを管理するために presto::$schemas という名前の特別なテーブルを作成します。

データ型のマッピング

Trino から Kudu へ

Trino から Kudu にデータを書き込む場合、以下のデータ型マッピングが適用されます。

Trino 型Kudu 型
BOOLEANBOOL
TINYINTINT8
SMALLINTINT16
INTEGERINT32
BIGINTINT64
REALFLOAT
DOUBLEDOUBLE
VARCHARSTRINGCREATE TABLE ... AS ... を使用すると、最大長の情報は失われます。
VARBINARYBINARY
TIMESTAMPUNIXTIME_MICROSKudu はマイクロ秒の精度で保存しますが、ミリ秒の解像度に変換されます。
DECIMALDECIMALKudu サーバー 1.7.0 以降が必要です。
DATEN/A対応する Kudu 型がありません。CREATE TABLE ... AS ... を使用すると STRING に変換されます。
CHARN/A対応する Kudu 型がありません。

以下の Trino 型はサポートされていません:TIME、JSON、TIME WITH TIMEZONE、TIMESTAMP WITH TIME ZONE、INTERVAL YEAR TO MONTH、INTERVAL DAY TO SECOND、ARRAY、MAP、IPADDRESS。

サポートされる SQL ステートメント

このコネクタは、Kudu データへの読み取りおよび書き込みアクセスをサポートします。以下の SQL ステートメントがサポートされています。

  • SELECT

  • INSERT INTO ... VALUES

  • INSERT INTO ... SELECT ...

  • DELETE

  • CREATE TABLE — 「テーブルの作成」をご参照ください

  • CREATE TABLE ... AS

  • DROP TABLE

  • ALTER TABLE ... RENAME TO ...

  • ALTER TABLE ... ADD COLUMN ... — 「列の追加」をご参照ください

  • ALTER TABLE ... RENAME COLUMN ... — プライマリキー以外の列でのみサポートされます

  • ALTER TABLE ... DROP COLUMN ... — プライマリキー以外の列でのみサポートされます

  • CREATE SCHEMA — スキーマエミュレーションが有効な場合にのみサポートされます

  • DROP SCHEMA — スキーマエミュレーションが有効な場合にのみサポートされます

  • SHOW SCHEMAS

  • SHOW TABLES

  • SHOW CREATE TABLE

  • SHOW COLUMNS FROM

  • DESCRIBESHOW COLUMNS FROM と同等です

  • CALL kudu.system.add_range_partition — 「レンジパーティションの管理」をご参照ください

  • CALL kudu.system.drop_range_partition — 「レンジパーティションの管理」をご参照ください

ALTER SCHEMA ... RENAME TO ... はサポートされていません。

テーブルの作成

すべての Kudu テーブルには、データ型を持つ列、プライマリキー、およびパーティション情報が必要です。列のエンコーディングと圧縮はオプションです。

CREATE TABLE user_events (
  user_id int WITH (primary_key = true),
  event_name varchar WITH (primary_key = true),
  message varchar,
  details varchar WITH (nullable = true, encoding = 'plain')
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 5,
  number_of_replicas = 3
);

この例では:

  • user_idevent_name はプライマリキー列です。

  • テーブルは user_id でハッシュパーティション分割され、5 つのバケットに分けられます。

  • number_of_replicas は 3 に設定されており、タブレットレプリカの数を制御します。

テーブル作成時の主なルール:

  • プライマリキー列は、他のすべての列より前に定義する必要があります。

  • パーティションキー列として使用できるのは、プライマリキー列のみです。

  • number_of_replicas はオプションで、奇数である必要があります。省略した場合、Kudu マスターのデフォルトのレプリケーション係数が適用されます。

  • テーブルには、少なくとも 1 つのハッシュパーティションまたはレンジパーティションが必要です。テーブルは複数のハッシュパーティションを持つことができますが、レンジパーティションは 1 つしか持てません。

列のプロパティ

列のプロパティは WITH 句で指定します。

列のプロパティデータ型説明
primary_keyBOOLEAN列をプライマリキーとしてマークします。Kudu はプライマリキーの一意性を強制します。重複するプライマリキーを持つ行を挿入すると、既存の行が更新されます。詳細については、「Primary Key Design」をご参照ください。
nullableBOOLEAN列に null 値を含めることを許可します。プライマリキー列は null 可能にできません。
encodingVARCHAR列のエンコード形式。デフォルトは Kudu の型ベースのエンコーディングです。有効な値:autoplainbitshufflerunlengthprefixdictionarygroup_varint。詳細については、「Column Encoding」をご参照ください。
compressionVARCHAR列の圧縮形式。デフォルトは Kudu のデフォルト圧縮です。有効な値:defaultnolz4snappyzlib。詳細については、「Column compression」をご参照ください。

エンコーディングと圧縮を明示的に指定した例:

CREATE TABLE mytable (
  name varchar WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
  index bigint WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
  comment varchar WITH (nullable = true, encoding = 'plain', compression = 'default'),
  ...
) WITH (...);

パーティション設計

Kudu はハッシュパーティションとレンジパーティションをサポートしています。テーブルには、どちらかのタイプのパーティションが少なくとも 1 つ必要です。

ハッシュパーティションの定義

partition_by_hash_columns (パーティションキー列) と partition_by_hash_buckets (バケット数) を指定します。パーティションキー列は、プライマリキー列のサブセットである必要があります。

1 つのハッシュパーティショングループ:

CREATE TABLE mytable (
  col1 varchar WITH (primary_key=true),
  col2 varchar WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['col1', 'col2'],
  partition_by_hash_buckets = 4
);

col1col2 がハッシュパーティションキー列です。行は 4 つのバケットに分散されます。

2 つの独立したハッシュパーティショングループ:

CREATE TABLE mytable (
  col1 varchar WITH (primary_key=true),
  col2 varchar WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['col1'],
  partition_by_hash_buckets = 2,
  partition_by_second_hash_columns = ARRAY['col2'],
  partition_by_second_hash_buckets = 3
);

最初のグループは col1 によって行を 2 つのバケットに分散し、2 番目のグループは col2 によって 3 つのバケットに分散します。パーティションの総数は 6 (2 x 3) です。

レンジパーティションの定義

partition_by_range_columns を使用してレンジパーティション列を指定し、range_partitions を使用して初期のパーティション境界を定義します。

CREATE TABLE events (
  rack varchar WITH (primary_key=true),
  machine varchar WITH (primary_key=true),
  event_time timestamp WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['rack'],
  partition_by_hash_buckets = 2,
  partition_by_second_hash_columns = ARRAY['machine'],
  partition_by_second_hash_buckets = 3,
  partition_by_range_columns = ARRAY['event_time'],
  range_partitions = '[{"lower": null, "upper": "2018-01-01T00:00:00"},
                       {"lower": "2018-01-01T00:00:00", "upper": null}]'
);

このテーブルには 2 つのハッシュパーティショングループと、event_time に対する 1 つのレンジパーティションがあり、2018-01-01T00:00:00 で分割されます。

レンジパーティションの管理

ストアドプロシージャを使用して、既存のテーブルのレンジパーティションを追加または削除します。

-- レンジパーティションの追加
CALL kudu.system.add_range_partition(<schema_name>, <table_name>, <range_partition_as_json_string>)

-- レンジパーティションの削除
CALL kudu.system.drop_range_partition(<schema_name>, <table_name>, <range_partition_as_json_string>)

パラメーター:

パラメーター説明
<schema_name>テーブルを含むスキーマ。
<table_name>テーブルの名前。
<range_partition_as_json_string>JSON 形式のパーティション境界:'{"lower": <value>, "upper": <value>}'。複数列のレンジキーの場合は、配列を使用します:'{"lower": [<col1_value>, ...], "upper": [<col1_value>, ...]}'。無制限のパーティションにするには、どちらかの境界を null に設定します。

データ型ごとの JSON 値の形式:

データ型
BIGINT'{"lower": 0, "upper": 1000000}'
SMALLINT'{"lower": 10, "upper": null}'
VARCHAR'{"lower": "A", "upper": "M"}'
TIMESTAMP'{"lower": "2018-02-01T00:00:00.000", "upper": "2018-02-01T12:00:00.000"}'
BOOLEAN'{"lower": false, "upper": true}'
VARBINARYBase64 エンコードされた文字列

例: myschema スキーマの events テーブルに、2018-01-01 から 2018-06-01 までのレコードをカバーするレンジパーティションを追加します。

CALL kudu.system.add_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}')

下限の "2018-01-01"2018-01-01T00:00:00.000 として解釈されます。

テーブル上の既存のすべてのレンジパーティションを表示するには、SHOW CREATE TABLE を実行します。出力の range_partitions プロパティに、現在のパーティション境界がリストされます。

列の追加

ALTER TABLE ... ADD COLUMN ... を使用して、既存のテーブルに列を追加します。nullableencoding などの列プロパティがサポートされています。

ALTER TABLE mytable ADD COLUMN extraInfo varchar WITH (nullable = true, encoding = 'plain')

利用可能な列プロパティについては、「列のプロパティ」をご参照ください。