Trino で Kudu コネクタを使用すると、Apache Kudu テーブル内のデータのクエリ、挿入、削除ができます。
前提条件
開始する前に、以下が準備できていることを確認してください。
Kudu サービス (バージョン 1.10 以降) を備えた Hadoop クラスター
Hadoop クラスターへのネットワーク接続を持つ Trino クラスター
これらのクラスターを作成するには、「クラスターの作成」をご参照ください。
制限事項
Kudu コネクタには、Kudu 1.10 以降が必要です。
Kudu のテーブル名と列名には、小文字のみ使用できます。
Trino クラスターと Hadoop クラスターの間にネットワーク接続が存在する必要があります。
Kudu コネクタの設定
E-MapReduce (EMR) コンソールで、Trino サービスページの [設定] タブに移動し、[kudu.properties] をクリックします。要件に応じて設定項目を修正または追加します。
コネクタ設定の詳細なリファレンスについては、「コネクタの設定」をご参照ください。
以下は、サポートされているすべての設定項目を含む完全な kudu.properties テンプレートです。
connector.name=kudu
## 必須: Kudu マスターアドレス。複数のアドレスはカンマで区切ります。
## サポートされる形式: example.com, example.com:7051, 192.0.2.1, 192.0.2.1:7051,
## [2001:db8::1], [2001:db8::1]:7051, 2001:db8::1
## localhost を Kudu マスターノードの IP アドレスまたはホスト名 (例: master-1-1) に変更します。
kudu.client.master-addresses=localhost
## スキーマエミュレーションにより、Trino は命名規則を使用して Kudu テーブルをスキーマにマッピングできます。
## デフォルトでは、すべてのテーブルは "default" スキーマに表示されます。
#kudu.schema-emulation.enabled=false
## スキーマエミュレーションが有効な場合に使用されるプレフィックス。標準のプレフィックスは "presto::" です。空のプレフィックスも有効です。
## kudu.schema-emulation.enabled=true の場合にのみ必須です。
#kudu.schema-emulation.prefix=
## Kudu Java クライアントの高度な設定
## 管理操作 (例: CREATE TABLE、DROP TABLE) のタイムアウト。デフォルト: 30s。
#kudu.client.default-admin-operation-timeout=30s
## ユーザー操作のタイムアウト。デフォルト: 30s。
#kudu.client.default-operation-timeout=30s
## ソケットからのデータ待機タイムアウト。デフォルト: 10s。
#kudu.client.default-socket-read-timeout=10s
## Kudu クライアントの統計収集を無効にするかどうか。デフォルト: false。
#kudu.client.disable-statistics=falseデフォルトで kudu.properties にない設定項目を追加するには、[kudu.properties] タブで [設定項目を追加] をクリックします。詳細については、「設定項目の追加」をご参照ください。
データのクエリ
Apache Kudu はネイティブではスキーマをサポートしていませんが、Kudu コネクタは命名規則によってスキーマをエミュレートできます。
スキーマエミュレーションが無効な場合 (デフォルト)
スキーマエミュレーションが無効な場合、すべての Kudu テーブルは default スキーマに表示されます。
orders テーブルを完全修飾名でクエリします。
SELECT * FROM kudu.default.orders;kudu をカタログとして、default をスキーマとして設定した場合、クエリは次のように簡略化されます。
SELECT * FROM orders;テーブル名に特殊文字が含まれる場合は、二重引用符で囲みます。
SELECT * FROM kudu.default."special.table!";クイック例:テーブルの作成とクエリ
defaultスキーマにusersという名前のテーブルを作成します。CREATE TABLE kudu.default.users ( user_id int WITH (primary_key = true), first_name varchar, last_name varchar ) WITH ( partition_by_hash_columns = ARRAY['user_id'], partition_by_hash_buckets = 2 );テーブルを作成する際は、プライマリキー、列のエンコーディングまたは圧縮形式、およびパーティション情報 (ハッシュまたはレンジ) を指定します。
テーブルスキーマを確認します。
DESCRIBE kudu.default.users;期待される出力:
Column | Type | Extra | Comment ------------+---------+-------------------------------------------------+--------- user_id | integer | primary_key, encoding=auto, compression=default | first_name | varchar | nullable, encoding=auto, compression=default | last_name | varchar | nullable, encoding=auto, compression=default | (3 rows)行を挿入します。
INSERT INTO kudu.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse');データをクエリします。
SELECT * FROM kudu.default.users;
スキーマエミュレーションが有効な場合
etc/catalog/kudu.properties で kudu.schema-emulation.enabled=true を設定して、スキーマエミュレーションを有効にします。これにより、Kudu テーブルはその名前に基づいて Trino スキーマにマッピングされます。
空のプレフィックスでのマッピング (kudu.schema-emulation.prefix=)
| Kudu テーブル名 | Trino テーブル名 |
|---|---|
orders | kudu.default.orders |
part1.part2 | kudu.part1.part2 |
x.y.z | kudu.x."y.z" |
Kudu はスキーマをサポートしていません。Trino はスキーマを管理するために $schemas という名前の特別なテーブルを作成します。標準プレフィックスでのマッピング (kudu.schema-emulation.prefix=presto::)
| Kudu テーブル名 | Trino テーブル名 |
|---|---|
orders | kudu.default.orders |
part1.part2 | kudu.default."part1.part2" |
x.y.z | kudu.default."x.y.z" |
presto::part1.part2 | kudu.part1.part2 |
presto::x.y.z | kudu.x."y.z" |
標準プレフィックスを使用する場合、Trino はスキーマを管理するために presto::$schemas という名前の特別なテーブルを作成します。データ型のマッピング
Trino から Kudu へ
Trino から Kudu にデータを書き込む場合、以下のデータ型マッピングが適用されます。
| Trino 型 | Kudu 型 | 注 |
|---|---|---|
| BOOLEAN | BOOL | |
| TINYINT | INT8 | |
| SMALLINT | INT16 | |
| INTEGER | INT32 | |
| BIGINT | INT64 | |
| REAL | FLOAT | |
| DOUBLE | DOUBLE | |
| VARCHAR | STRING | CREATE TABLE ... AS ... を使用すると、最大長の情報は失われます。 |
| VARBINARY | BINARY | |
| TIMESTAMP | UNIXTIME_MICROS | Kudu はマイクロ秒の精度で保存しますが、ミリ秒の解像度に変換されます。 |
| DECIMAL | DECIMAL | Kudu サーバー 1.7.0 以降が必要です。 |
| DATE | N/A | 対応する Kudu 型がありません。CREATE TABLE ... AS ... を使用すると STRING に変換されます。 |
| CHAR | N/A | 対応する Kudu 型がありません。 |
以下の Trino 型はサポートされていません:TIME、JSON、TIME WITH TIMEZONE、TIMESTAMP WITH TIME ZONE、INTERVAL YEAR TO MONTH、INTERVAL DAY TO SECOND、ARRAY、MAP、IPADDRESS。
サポートされる SQL ステートメント
このコネクタは、Kudu データへの読み取りおよび書き込みアクセスをサポートします。以下の SQL ステートメントがサポートされています。
SELECTINSERT INTO ... VALUESINSERT INTO ... SELECT ...DELETECREATE TABLE— 「テーブルの作成」をご参照くださいCREATE TABLE ... ASDROP TABLEALTER TABLE ... RENAME TO ...ALTER TABLE ... ADD COLUMN ...— 「列の追加」をご参照くださいALTER TABLE ... RENAME COLUMN ...— プライマリキー以外の列でのみサポートされますALTER TABLE ... DROP COLUMN ...— プライマリキー以外の列でのみサポートされますCREATE SCHEMA— スキーマエミュレーションが有効な場合にのみサポートされますDROP SCHEMA— スキーマエミュレーションが有効な場合にのみサポートされますSHOW SCHEMASSHOW TABLESSHOW CREATE TABLESHOW COLUMNS FROMDESCRIBE—SHOW COLUMNS FROMと同等ですCALL kudu.system.add_range_partition— 「レンジパーティションの管理」をご参照くださいCALL kudu.system.drop_range_partition— 「レンジパーティションの管理」をご参照ください
ALTER SCHEMA ... RENAME TO ... はサポートされていません。テーブルの作成
すべての Kudu テーブルには、データ型を持つ列、プライマリキー、およびパーティション情報が必要です。列のエンコーディングと圧縮はオプションです。
CREATE TABLE user_events (
user_id int WITH (primary_key = true),
event_name varchar WITH (primary_key = true),
message varchar,
details varchar WITH (nullable = true, encoding = 'plain')
) WITH (
partition_by_hash_columns = ARRAY['user_id'],
partition_by_hash_buckets = 5,
number_of_replicas = 3
);この例では:
user_idとevent_nameはプライマリキー列です。テーブルは
user_idでハッシュパーティション分割され、5 つのバケットに分けられます。number_of_replicasは 3 に設定されており、タブレットレプリカの数を制御します。
テーブル作成時の主なルール:
プライマリキー列は、他のすべての列より前に定義する必要があります。
パーティションキー列として使用できるのは、プライマリキー列のみです。
number_of_replicasはオプションで、奇数である必要があります。省略した場合、Kudu マスターのデフォルトのレプリケーション係数が適用されます。テーブルには、少なくとも 1 つのハッシュパーティションまたはレンジパーティションが必要です。テーブルは複数のハッシュパーティションを持つことができますが、レンジパーティションは 1 つしか持てません。
列のプロパティ
列のプロパティは WITH 句で指定します。
| 列のプロパティ | データ型 | 説明 |
|---|---|---|
primary_key | BOOLEAN | 列をプライマリキーとしてマークします。Kudu はプライマリキーの一意性を強制します。重複するプライマリキーを持つ行を挿入すると、既存の行が更新されます。詳細については、「Primary Key Design」をご参照ください。 |
nullable | BOOLEAN | 列に null 値を含めることを許可します。プライマリキー列は null 可能にできません。 |
encoding | VARCHAR | 列のエンコード形式。デフォルトは Kudu の型ベースのエンコーディングです。有効な値:auto、plain、bitshuffle、runlength、prefix、dictionary、group_varint。詳細については、「Column Encoding」をご参照ください。 |
compression | VARCHAR | 列の圧縮形式。デフォルトは Kudu のデフォルト圧縮です。有効な値:default、no、lz4、snappy、zlib。詳細については、「Column compression」をご参照ください。 |
エンコーディングと圧縮を明示的に指定した例:
CREATE TABLE mytable (
name varchar WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
index bigint WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
comment varchar WITH (nullable = true, encoding = 'plain', compression = 'default'),
...
) WITH (...);パーティション設計
Kudu はハッシュパーティションとレンジパーティションをサポートしています。テーブルには、どちらかのタイプのパーティションが少なくとも 1 つ必要です。
ハッシュパーティションの定義
partition_by_hash_columns (パーティションキー列) と partition_by_hash_buckets (バケット数) を指定します。パーティションキー列は、プライマリキー列のサブセットである必要があります。
1 つのハッシュパーティショングループ:
CREATE TABLE mytable (
col1 varchar WITH (primary_key=true),
col2 varchar WITH (primary_key=true),
...
) WITH (
partition_by_hash_columns = ARRAY['col1', 'col2'],
partition_by_hash_buckets = 4
);col1 と col2 がハッシュパーティションキー列です。行は 4 つのバケットに分散されます。
2 つの独立したハッシュパーティショングループ:
CREATE TABLE mytable (
col1 varchar WITH (primary_key=true),
col2 varchar WITH (primary_key=true),
...
) WITH (
partition_by_hash_columns = ARRAY['col1'],
partition_by_hash_buckets = 2,
partition_by_second_hash_columns = ARRAY['col2'],
partition_by_second_hash_buckets = 3
);最初のグループは col1 によって行を 2 つのバケットに分散し、2 番目のグループは col2 によって 3 つのバケットに分散します。パーティションの総数は 6 (2 x 3) です。
レンジパーティションの定義
partition_by_range_columns を使用してレンジパーティション列を指定し、range_partitions を使用して初期のパーティション境界を定義します。
CREATE TABLE events (
rack varchar WITH (primary_key=true),
machine varchar WITH (primary_key=true),
event_time timestamp WITH (primary_key=true),
...
) WITH (
partition_by_hash_columns = ARRAY['rack'],
partition_by_hash_buckets = 2,
partition_by_second_hash_columns = ARRAY['machine'],
partition_by_second_hash_buckets = 3,
partition_by_range_columns = ARRAY['event_time'],
range_partitions = '[{"lower": null, "upper": "2018-01-01T00:00:00"},
{"lower": "2018-01-01T00:00:00", "upper": null}]'
);このテーブルには 2 つのハッシュパーティショングループと、event_time に対する 1 つのレンジパーティションがあり、2018-01-01T00:00:00 で分割されます。
レンジパーティションの管理
ストアドプロシージャを使用して、既存のテーブルのレンジパーティションを追加または削除します。
-- レンジパーティションの追加
CALL kudu.system.add_range_partition(<schema_name>, <table_name>, <range_partition_as_json_string>)
-- レンジパーティションの削除
CALL kudu.system.drop_range_partition(<schema_name>, <table_name>, <range_partition_as_json_string>)パラメーター:
| パラメーター | 説明 |
|---|---|
<schema_name> | テーブルを含むスキーマ。 |
<table_name> | テーブルの名前。 |
<range_partition_as_json_string> | JSON 形式のパーティション境界:'{"lower": <value>, "upper": <value>}'。複数列のレンジキーの場合は、配列を使用します:'{"lower": [<col1_value>, ...], "upper": [<col1_value>, ...]}'。無制限のパーティションにするには、どちらかの境界を null に設定します。 |
データ型ごとの JSON 値の形式:
| データ型 | 例 |
|---|---|
| BIGINT | '{"lower": 0, "upper": 1000000}' |
| SMALLINT | '{"lower": 10, "upper": null}' |
| VARCHAR | '{"lower": "A", "upper": "M"}' |
| TIMESTAMP | '{"lower": "2018-02-01T00:00:00.000", "upper": "2018-02-01T12:00:00.000"}' |
| BOOLEAN | '{"lower": false, "upper": true}' |
| VARBINARY | Base64 エンコードされた文字列 |
例: myschema スキーマの events テーブルに、2018-01-01 から 2018-06-01 までのレコードをカバーするレンジパーティションを追加します。
CALL kudu.system.add_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}')下限の "2018-01-01" は 2018-01-01T00:00:00.000 として解釈されます。
テーブル上の既存のすべてのレンジパーティションを表示するには、SHOW CREATE TABLE を実行します。出力の range_partitions プロパティに、現在のパーティション境界がリストされます。
列の追加
ALTER TABLE ... ADD COLUMN ... を使用して、既存のテーブルに列を追加します。nullable や encoding などの列プロパティがサポートされています。
ALTER TABLE mytable ADD COLUMN extraInfo varchar WITH (nullable = true, encoding = 'plain')利用可能な列プロパティについては、「列のプロパティ」をご参照ください。