Use the Kudu connector with Trino to query, insert, and delete data in Apache Kudu tables.
Prerequisites
Before you begin, make sure you have:
A Hadoop cluster with the Kudu service (version 1.10 or later)
A Trino cluster with network connectivity to the Hadoop cluster
To create these clusters, see Create a cluster.
Limitations
The Kudu connector requires Kudu 1.10 or later.
Kudu table names and column names must be lowercase. Names that contain uppercase letters are not supported.
A network connection must exist between the Trino cluster and the Hadoop cluster.
Configure the Kudu connector
In the E-MapReduce (EMR) console, go to the Configure tab of the Trino service page and click kudu.properties. Modify or add configuration items based on your requirements.
For a full reference on configuring connectors, see Configure a connector.
The following is a complete kudu.properties template with all supported configuration items:
connector.name=kudu
## Required: Kudu master address(es). Separate multiple addresses with commas.
## Supported formats: example.com, example.com:7051, 192.0.2.1, 192.0.2.1:7051,
## [2001:db8::1], [2001:db8::1]:7051, 2001:db8::1
## Change localhost to the IP address or hostname of your Kudu master node (e.g., master-1-1).
kudu.client.master-addresses=localhost
## Schema emulation lets Trino map Kudu tables to schemas using naming conventions.
## By default, all tables appear in the "default" schema.
#kudu.schema-emulation.enabled=false
## Prefix used when schema emulation is enabled. Standard prefix is "presto::". Empty prefix is also valid.
## Required only when kudu.schema-emulation.enabled=true.
#kudu.schema-emulation.prefix=
## Advanced Kudu Java client configuration
## Timeout for administrative operations (e.g., CREATE TABLE, DROP TABLE). Default: 30s.
#kudu.client.default-admin-operation-timeout=30s
## Timeout for user operations. Default: 30s.
#kudu.client.default-operation-timeout=30s
## Timeout for waiting on data from a socket. Default: 10s.
#kudu.client.default-socket-read-timeout=10s
## Whether to disable Kudu client statistics collection. Default: false.
#kudu.client.disable-statistics=false
To add a configuration item that is not in kudu.properties by default, click Add Configuration Item on the kudu.properties tab. For details, see Add configuration items.
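The address formats listed for kudu.client.master-addresses can be checked before you paste a value into the console. The following Python sketch is illustrative only and is not part of Trino or Kudu: the function names are hypothetical, and the fallback port 7051 is an assumption taken from the port used in the format examples above.

```python
def parse_master_address(address, default_port=7051):
    """Split one master address into (host, port).

    default_port is an assumption based on the port shown in the
    format examples; adjust it if your cluster uses another port.
    """
    address = address.strip()
    if address.startswith("["):
        # Bracketed IPv6, optionally followed by ":port"
        host, _, rest = address[1:].partition("]")
        port = int(rest[1:]) if rest.startswith(":") else default_port
        return host, port
    if address.count(":") == 1:
        # "hostname:port" or "IPv4:port"
        host, _, port = address.partition(":")
        return host, int(port)
    # Bare hostname, bare IPv4, or unbracketed IPv6 without a port
    return address, default_port


def parse_master_addresses(value):
    """Parse the comma-separated value of kudu.client.master-addresses."""
    return [parse_master_address(part) for part in value.split(",") if part.strip()]
```

For example, parse_master_addresses("master-1-1:7051, master-1-2") yields one (host, port) pair per master.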
Query data
Apache Kudu does not support schemas natively, but the Kudu connector can emulate them through naming conventions.
Schema emulation disabled (default)
With schema emulation disabled, all Kudu tables appear in the default schema.
Query the orders table using its fully qualified name:
SELECT * FROM kudu.default.orders;
If you set kudu as the catalog and default as the schema, the query simplifies to:
SELECT * FROM orders;
If a table name contains special characters, wrap it in double quotation marks:
SELECT * FROM kudu.default."special.table!";
Quick example: create and query a table
Create a table named users in the default schema:
CREATE TABLE kudu.default.users (
user_id int WITH (primary_key = true),
first_name varchar,
last_name varchar
) WITH (
partition_by_hash_columns = ARRAY['user_id'],
partition_by_hash_buckets = 2
);
When creating a table, specify the primary key, column encoding or compression format, and partition information (hash or range).
Inspect the table schema:
DESCRIBE kudu.default.users;
Expected output:
   Column   |  Type   |                      Extra                      | Comment
------------+---------+-------------------------------------------------+---------
 user_id    | integer | primary_key, encoding=auto, compression=default |
 first_name | varchar | nullable, encoding=auto, compression=default    |
 last_name  | varchar | nullable, encoding=auto, compression=default    |
(3 rows)
Insert rows:
INSERT INTO kudu.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse');
Query the data:
SELECT * FROM kudu.default.users;
Schema emulation enabled
Enable schema emulation by setting kudu.schema-emulation.enabled=true in etc/catalog/kudu.properties. Kudu tables are then mapped to Trino schemas based on their names.
Mapping with an empty prefix (kudu.schema-emulation.prefix=)
| Kudu table name | Trino table name |
|---|---|
| orders | kudu.default.orders |
| part1.part2 | kudu.part1.part2 |
| x.y.z | kudu.x."y.z" |
Kudu does not support schemas. Trino creates a special table named $schemas to manage them.
Mapping with the standard prefix (kudu.schema-emulation.prefix=presto::)
| Kudu table name | Trino table name |
|---|---|
| orders | kudu.default.orders |
| part1.part2 | kudu.default."part1.part2" |
| x.y.z | kudu.default."x.y.z" |
| presto::part1.part2 | kudu.part1.part2 |
| presto::x.y.z | kudu.x."y.z" |
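The two mapping tables can be read as a small naming rule. The Python sketch below is a hypothetical helper, not connector code: it reproduces only the rows shown in the tables, returns None for names the tables do not cover, and does not model the special $schemas table. The connector's behavior for other edge cases may differ.

```python
def kudu_to_trino_name(kudu_table, prefix=""):
    """Return (schema, table) for a Kudu table name, per the mapping tables.

    prefix is the value of kudu.schema-emulation.prefix ("" or "presto::").
    Returns None for names that the tables above do not cover.
    """
    if prefix:
        if not kudu_table.startswith(prefix):
            # Unprefixed names stay, undivided, in the default schema
            return "default", kudu_table
        remainder = kudu_table[len(prefix):]
        schema, dot, table = remainder.partition(".")
        if not dot or not schema or not table:
            return None  # not covered by the tables above
        return schema, table

    # Empty prefix: the text before the first dot becomes the schema
    schema, dot, table = kudu_table.partition(".")
    if not dot:
        return "default", kudu_table
    if not schema or not table:
        return None  # not covered by the tables above
    return schema, table
```

With the empty prefix, kudu_to_trino_name("x.y.z") gives the schema x and the table y.z, which Trino addresses as kudu.x."y.z".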
With the standard prefix, Trino creates a special table named presto::$schemas to manage schemas.
Data type mappings
Trino to Kudu
When writing data from Trino to Kudu, the following type mappings apply:
| Trino type | Kudu type | Notes |
|---|---|---|
| BOOLEAN | BOOL | |
| TINYINT | INT8 | |
| SMALLINT | INT16 | |
| INTEGER | INT32 | |
| BIGINT | INT64 | |
| REAL | FLOAT | |
| DOUBLE | DOUBLE | |
| VARCHAR | STRING | Maximum length is lost when using CREATE TABLE ... AS .... |
| VARBINARY | BINARY | |
| TIMESTAMP | UNIXTIME_MICROS | The Kudu column has microsecond resolution, but values are reduced to millisecond resolution. |
| DECIMAL | DECIMAL | Requires Kudu server 1.7.0 or later. |
| DATE | N/A | No matching Kudu type. Converted to STRING when using CREATE TABLE ... AS .... |
| CHAR | N/A | No matching Kudu type. |
The following Trino types are not supported: TIME, JSON, TIME WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND, ARRAY, MAP, IPADDRESS.
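To check a planned table against the mapping table before running CREATE TABLE, the table can be expressed as a lookup. This Python sketch is a hypothetical helper that copies the rows above; it treats int as an alias of integer (as in the DESCRIBE output earlier) and returns None for any type without a matching Kudu type.

```python
import re

# Copied from the "Trino to Kudu" table above
TRINO_TO_KUDU = {
    "BOOLEAN": "BOOL",
    "TINYINT": "INT8",
    "SMALLINT": "INT16",
    "INTEGER": "INT32",
    "BIGINT": "INT64",
    "REAL": "FLOAT",
    "DOUBLE": "DOUBLE",
    "VARCHAR": "STRING",
    "VARBINARY": "BINARY",
    "TIMESTAMP": "UNIXTIME_MICROS",
    "DECIMAL": "DECIMAL",
}


def kudu_type_for(trino_type):
    """Return the Kudu type for a Trino type, or None if there is no match."""
    # Drop type parameters such as varchar(20) or decimal(10, 2)
    base = re.sub(r"\(.*\)", "", trino_type).strip().upper()
    if base == "INT":
        base = "INTEGER"
    return TRINO_TO_KUDU.get(base)
```

For instance, kudu_type_for("varchar(20)") returns "STRING", while kudu_type_for("date") returns None because DATE has no matching Kudu type.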
Supported SQL statements
The connector supports read and write access to Kudu data. The following SQL statements are supported:
SELECT
INSERT INTO ... VALUES
INSERT INTO ... SELECT ...
DELETE
CREATE TABLE: see Create a table
CREATE TABLE ... AS
DROP TABLE
ALTER TABLE ... RENAME TO ...
ALTER TABLE ... ADD COLUMN ...: see Add a column
ALTER TABLE ... RENAME COLUMN ...: supported only for non-primary-key columns
ALTER TABLE ... DROP COLUMN ...: supported only for non-primary-key columns
CREATE SCHEMA: supported only when schema emulation is enabled
DROP SCHEMA: supported only when schema emulation is enabled
SHOW SCHEMAS
SHOW TABLES
SHOW CREATE TABLE
SHOW COLUMNS FROM
DESCRIBE: equivalent to SHOW COLUMNS FROM
CALL kudu.system.add_range_partition: see Manage range partitions
CALL kudu.system.drop_range_partition: see Manage range partitions
ALTER SCHEMA ... RENAME TO ... is not supported.
Create a table
Every Kudu table requires columns with data types, a primary key, and partition information. Column encoding and compression are optional.
CREATE TABLE user_events (
user_id int WITH (primary_key = true),
event_name varchar WITH (primary_key = true),
message varchar,
details varchar WITH (nullable = true, encoding = 'plain')
) WITH (
partition_by_hash_columns = ARRAY['user_id'],
partition_by_hash_buckets = 5,
number_of_replicas = 3
);
In this example:
user_id and event_name are primary key columns.
The table is hash-partitioned on user_id into 5 buckets.
number_of_replicas is set to 3, which controls the number of tablet replicas.
Key rules when creating a table:
Primary key columns must be listed before all other columns.
Only primary key columns can serve as partition key columns.
number_of_replicas is optional and must be an odd number. If omitted, the Kudu master's default replication factor applies.
A table must have at least one hash or range partition. A table can have multiple hash partitions but only one range partition.
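The key rules can be checked mechanically before a CREATE TABLE statement is sent. The Python sketch below is a hypothetical pre-flight check, not something the connector provides; the function name and argument shapes are assumptions made for illustration, and it covers only the rules listed above.

```python
def check_table_definition(columns, hash_columns=(), range_columns=(),
                           number_of_replicas=None):
    """Return a list of rule violations (empty if none are found).

    columns: list of (name, is_primary_key) tuples in declaration order.
    """
    problems = []

    # Rule: primary key columns must be listed before all other columns
    seen_non_key = False
    for name, is_primary_key in columns:
        if is_primary_key and seen_non_key:
            problems.append(f"primary key column '{name}' must precede non-key columns")
        if not is_primary_key:
            seen_non_key = True

    # Rule: only primary key columns can serve as partition key columns
    primary_keys = {name for name, is_primary_key in columns if is_primary_key}
    for name in list(hash_columns) + list(range_columns):
        if name not in primary_keys:
            problems.append(f"partition column '{name}' is not a primary key column")

    # Rule: at least one hash or range partition is required
    if not hash_columns and not range_columns:
        problems.append("table needs at least one hash or range partition")

    # Rule: number_of_replicas is optional but must be odd when given
    if number_of_replicas is not None and number_of_replicas % 2 == 0:
        problems.append("number_of_replicas must be an odd number")

    return problems
```

Applied to the user_events definition above (primary keys user_id and event_name, hash partitioning on user_id, three replicas), the function returns an empty list.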
Column properties
Specify column properties in the WITH clause:
| Column property | Data type | Description |
|---|---|---|
| primary_key | BOOLEAN | Marks the column as a primary key. Kudu enforces uniqueness on primary keys; inserting a row with a duplicate primary key updates the existing row. See Primary Key Design. |
| nullable | BOOLEAN | Allows the column to contain null values. Primary key columns cannot be nullable. |
| encoding | VARCHAR | Column encoding format. Defaults to Kudu's type-based encoding. Valid values: auto, plain, bitshuffle, runlength, prefix, dictionary, group_varint. See Column Encoding. |
| compression | VARCHAR | Column compression format. Defaults to Kudu's default compression. Valid values: default, no, lz4, snappy, zlib. See Column compression. |
Example with explicit encoding and compression:
CREATE TABLE mytable (
name varchar WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
index bigint WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
comment varchar WITH (nullable = true, encoding = 'plain', compression = 'default'),
...
) WITH (...);
Partition design
Kudu supports hash partitions and range partitions. A table must have at least one partition of either type.
Define hash partitions
Specify partition_by_hash_columns (partition key columns) and partition_by_hash_buckets (number of buckets). Partition key columns must be a subset of primary key columns.
One hash partition group:
CREATE TABLE mytable (
col1 varchar WITH (primary_key=true),
col2 varchar WITH (primary_key=true),
...
) WITH (
partition_by_hash_columns = ARRAY['col1', 'col2'],
partition_by_hash_buckets = 4
);
col1 and col2 are the hash partition key columns. Rows are distributed across 4 buckets.
Two independent hash partition groups:
CREATE TABLE mytable (
col1 varchar WITH (primary_key=true),
col2 varchar WITH (primary_key=true),
...
) WITH (
partition_by_hash_columns = ARRAY['col1'],
partition_by_hash_buckets = 2,
partition_by_second_hash_columns = ARRAY['col2'],
partition_by_second_hash_buckets = 3
);
The first group distributes rows by col1 into 2 buckets; the second distributes by col2 into 3 buckets. The total number of partitions is 6 (2 x 3).
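The 2 x 3 arithmetic generalizes: each hash partition group multiplies the partition count by its number of buckets. The Python sketch below is a hypothetical helper for sizing a table. The optional range_partition_count argument rests on the assumption that range partitions multiply the total in the same way, as in Kudu's multilevel partitioning.

```python
from math import prod


def total_partitions(hash_bucket_counts, range_partition_count=1):
    """Multiply the bucket counts of all hash groups, then the range partitions.

    hash_bucket_counts: one bucket count per hash partition group.
    range_partition_count: number of range partitions (1 if none are defined).
    """
    return prod(hash_bucket_counts) * range_partition_count
```

total_partitions([2, 3]) reproduces the 6 partitions of the example above, and total_partitions([4]) gives the 4 buckets of the single-group example.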
Define range partitions
Use partition_by_range_columns to specify range partition columns and range_partitions to define initial partition bounds.
CREATE TABLE events (
rack varchar WITH (primary_key=true),
machine varchar WITH (primary_key=true),
event_time timestamp WITH (primary_key=true),
...
) WITH (
partition_by_hash_columns = ARRAY['rack'],
partition_by_hash_buckets = 2,
partition_by_second_hash_columns = ARRAY['machine'],
partition_by_second_hash_buckets = 3,
partition_by_range_columns = ARRAY['event_time'],
range_partitions = '[{"lower": null, "upper": "2018-01-01T00:00:00"},
{"lower": "2018-01-01T00:00:00", "upper": null}]'
);
This table has two hash partition groups and one range partition on event_time, split at 2018-01-01T00:00:00.
Manage range partitions
Manage range partitions
Add or drop range partitions on an existing table using stored procedures:
-- Add a range partition
CALL kudu.system.add_range_partition(<schema_name>, <table_name>, <range_partition_as_json_string>)
-- Drop a range partition
CALL kudu.system.drop_range_partition(<schema_name>, <table_name>, <range_partition_as_json_string>)
Parameters:
| Parameter | Description |
|---|---|
| <schema_name> | The schema that contains the table. |
| <table_name> | The name of the table. |
| <range_partition_as_json_string> | The partition bounds in JSON format: '{"lower": <value>, "upper": <value>}'. For multi-column range keys, use arrays: '{"lower": [<col1_value>, ...], "upper": [<col1_value>, ...]}'. Set either bound to null for an unbounded partition. |
JSON value formats by data type:
| Data type | Example |
|---|---|
| BIGINT | '{"lower": 0, "upper": 1000000}' |
| SMALLINT | '{"lower": 10, "upper": null}' |
| VARCHAR | '{"lower": "A", "upper": "M"}' |
| TIMESTAMP | '{"lower": "2018-02-01T00:00:00.000", "upper": "2018-02-01T12:00:00.000"}' |
| BOOLEAN | '{"lower": false, "upper": true}' |
| VARBINARY | Base64-encoded strings |
Example: Add a range partition to the events table in the myschema schema, covering records from 2018-01-01 to 2018-06-01:
CALL kudu.system.add_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}')
The lower bound "2018-01-01" is interpreted as 2018-01-01T00:00:00.000.
To view all existing range partitions on a table, run SHOW CREATE TABLE. The range_partitions property in the output lists the current partition bounds.
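Writing the JSON bounds by hand is error-prone, especially around quoting. The Python sketch below builds the JSON string and the surrounding CALL statement with the standard json module. The helper names are hypothetical, and the sketch only produces SQL text: running it against Trino is left to whichever client you use.

```python
import json


def range_partition_json(lower, upper):
    """Build the bounds string; use None for an unbounded side (JSON null).

    Pass lists for multi-column range keys.
    """
    return json.dumps({"lower": lower, "upper": upper})


def _sql_literal(text):
    # Quote as a SQL string literal, doubling any embedded single quotes
    return "'" + text.replace("'", "''") + "'"


def range_partition_call(procedure, schema, table, lower, upper):
    """Return the CALL statement for add_range_partition or drop_range_partition."""
    if procedure not in ("add_range_partition", "drop_range_partition"):
        raise ValueError(f"unknown procedure: {procedure}")
    arguments = (schema, table, range_partition_json(lower, upper))
    rendered = ", ".join(_sql_literal(value) for value in arguments)
    return f"CALL kudu.system.{procedure}({rendered})"
```

Calling range_partition_call("add_range_partition", "myschema", "events", "2018-01-01", "2018-06-01") reproduces the statement from the example above.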
Add a column
Use ALTER TABLE ... ADD COLUMN ... to add a column to an existing table. Column properties such as nullable and encoding are supported.
ALTER TABLE mytable ADD COLUMN extraInfo varchar WITH (nullable = true, encoding = 'plain')
For available column properties, see Column properties.