
E-MapReduce:Kudu connector

Last Updated:Mar 26, 2026

Use the Kudu connector with Trino to query, insert, and delete data in Apache Kudu tables.

Prerequisites

Before you begin, make sure you have:

  • A Hadoop cluster with the Kudu service (version 1.10 or later)

  • A Trino cluster with network connectivity to the Hadoop cluster

To create these clusters, see Create a cluster.

Limitations

  • The Kudu connector requires Kudu 1.10 or later.

  • Only lowercase Kudu table names and column names are supported.

  • A network connection must exist between the Trino cluster and the Hadoop cluster.

Configure the Kudu connector

In the E-MapReduce (EMR) console, go to the Configure tab of the Trino service page and click kudu.properties. Modify or add configuration items based on your requirements.

For a full reference on configuring connectors, see Configure a connector.

The following is a complete kudu.properties template with all supported configuration items:

connector.name=kudu

## Required: Kudu master address(es). Separate multiple addresses with commas.
## Supported formats: example.com, example.com:7051, 192.0.2.1, 192.0.2.1:7051,
##                    [2001:db8::1], [2001:db8::1]:7051, 2001:db8::1
## Change localhost to the IP address or hostname of your Kudu master node (e.g., master-1-1).
kudu.client.master-addresses=localhost

## Schema emulation lets Trino map Kudu tables to schemas using naming conventions.
## By default, all tables appear in the "default" schema.
#kudu.schema-emulation.enabled=false

## Prefix used when schema emulation is enabled. Standard prefix is "presto::". Empty prefix is also valid.
## Required only when kudu.schema-emulation.enabled=true.
#kudu.schema-emulation.prefix=

## Advanced Kudu Java client configuration

## Timeout for administrative operations (e.g., CREATE TABLE, DROP TABLE). Default: 30s.
#kudu.client.default-admin-operation-timeout=30s

## Timeout for user operations. Default: 30s.
#kudu.client.default-operation-timeout=30s

## Timeout for waiting on data from a socket. Default: 10s.
#kudu.client.default-socket-read-timeout=10s

## Whether to disable Kudu client statistics collection. Default: false.
#kudu.client.disable-statistics=false
Important

To add a configuration item that is not in kudu.properties by default, click Add Configuration Item on the kudu.properties tab. For details, see Add configuration items.
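You may want to check a kudu.client.master-addresses value against the formats listed in the template before you save it. The following Python sketch is a hypothetical helper: parse_master_addresses is not part of Trino or EMR. It assumes that an address without a port uses 7051, which is Kudu's default master RPC port and the port shown in the template examples.

```python
# Hypothetical helper, not part of Trino or EMR: parse a
# kudu.client.master-addresses value into (host, port) pairs.
from typing import List, Tuple

# Assumption: addresses without an explicit port use Kudu's default master RPC port.
DEFAULT_MASTER_PORT = 7051


def parse_master_addresses(value: str, default_port: int = DEFAULT_MASTER_PORT) -> List[Tuple[str, int]]:
    pairs = []
    for raw in value.split(","):  # multiple masters are comma-separated
        addr = raw.strip()
        if not addr:
            continue
        if addr.startswith("["):
            # Bracketed IPv6, with or without a port: [2001:db8::1] or [2001:db8::1]:7051
            end = addr.index("]")
            host, rest = addr[1:end], addr[end + 1:]
            if rest and not rest.startswith(":"):
                raise ValueError(f"malformed address: {addr!r}")
            port = int(rest[1:]) if rest else default_port
        elif addr.count(":") > 1:
            # Unbracketed IPv6 such as 2001:db8::1 cannot carry a port.
            host, port = addr, default_port
        elif ":" in addr:
            # Hostname or IPv4 with a port: example.com:7051, 192.0.2.1:7051
            host, port_text = addr.split(":")
            port = int(port_text)
        else:
            # Hostname or IPv4 without a port: example.com, 192.0.2.1
            host, port = addr, default_port
        pairs.append((host, port))
    return pairs


print(parse_master_addresses("master-1-1:7051,192.0.2.1,[2001:db8::1]:7052,2001:db8::1"))
# [('master-1-1', 7051), ('192.0.2.1', 7051), ('2001:db8::1', 7052), ('2001:db8::1', 7051)]
```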

Query data

Apache Kudu does not support schemas natively, but the Kudu connector can emulate them through naming conventions.

Schema emulation disabled (default)

With schema emulation disabled, all Kudu tables appear in the default schema.

Query the orders table using its fully qualified name:

SELECT * FROM kudu.default.orders;

If you set kudu as the current catalog and default as the current schema (for example, with USE kudu.default;), the query simplifies to:

SELECT * FROM orders;

If a table name contains special characters, wrap it in double quotation marks:

SELECT * FROM kudu.default."special.table!";
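When scripts generate table names, you can apply quoting only where it is needed. In the following Python sketch, quote_identifier and qualified_name are hypothetical helpers. A name is wrapped in double quotation marks when it contains anything other than lowercase letters, digits, or underscores, and any embedded double quotation mark is doubled. The sketch does not check for reserved words, so treat it as a starting point and not as a complete rule for SQL identifiers.

```python
import re

# Assumption: lowercase names made of letters, digits, and underscores that do not
# start with a digit are safe to leave unquoted. Reserved words are not checked.
_PLAIN_NAME = re.compile(r"[a-z_][a-z0-9_]*")


def quote_identifier(name: str) -> str:
    """Return name as-is if it is plain, otherwise as a double-quoted identifier."""
    if _PLAIN_NAME.fullmatch(name):
        return name
    # Inside a quoted identifier, a literal double quote is written twice.
    return '"' + name.replace('"', '""') + '"'


def qualified_name(catalog: str, schema: str, table: str) -> str:
    """Build catalog.schema.table, quoting each part only when needed."""
    return ".".join(quote_identifier(part) for part in (catalog, schema, table))


print(qualified_name("kudu", "default", "special.table!"))
# kudu.default."special.table!"
```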

Quick example: create and query a table

  1. Create a table named users in the default schema:

    CREATE TABLE kudu.default.users (
      user_id int WITH (primary_key = true),
      first_name varchar,
      last_name varchar
    ) WITH (
      partition_by_hash_columns = ARRAY['user_id'],
      partition_by_hash_buckets = 2
    );
    When creating a table, you must specify the primary key and partition information (hash or range). Column encoding and compression are optional.
  2. Inspect the table schema:

    DESCRIBE kudu.default.users;

    Expected output:

       Column   |  Type   |                      Extra                      | Comment
    ------------+---------+-------------------------------------------------+---------
     user_id    | integer | primary_key, encoding=auto, compression=default |
     first_name | varchar | nullable, encoding=auto, compression=default    |
     last_name  | varchar | nullable, encoding=auto, compression=default    |
    (3 rows)
  3. Insert rows:

    INSERT INTO kudu.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse');
  4. Query the data:

    SELECT * FROM kudu.default.users;
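If you load test rows like the ones in step 3 from a script, you can assemble the statement as a string. The following Python sketch uses hypothetical helpers (sql_literal and build_insert) that handle only NULL, booleans, integers, and strings. For untrusted input, a client library with parameter binding is the safer choice.

```python
from typing import List, Optional, Sequence, Union

Value = Optional[Union[bool, int, str]]


def sql_literal(value: Value) -> str:
    """Render a Python value as a Trino SQL literal."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        # Single quotes inside a string literal are escaped by doubling them.
        return "'" + value.replace("'", "''") + "'"
    raise TypeError(f"unsupported value type: {type(value).__name__}")


def build_insert(table: str, rows: List[Sequence[Value]]) -> str:
    """Build one multi-row INSERT INTO ... VALUES statement."""
    values = ", ".join("(" + ", ".join(sql_literal(v) for v in row) + ")" for row in rows)
    return f"INSERT INTO {table} VALUES {values}"


print(build_insert("kudu.default.users", [(1, "Donald", "Duck"), (2, "Mickey", "Mouse")]))
# INSERT INTO kudu.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse')
```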

Schema emulation enabled

Enable schema emulation by setting kudu.schema-emulation.enabled=true in kudu.properties (etc/catalog/kudu.properties). Kudu tables are then mapped to Trino schemas based on their names and the value of kudu.schema-emulation.prefix.

Mapping with an empty prefix (kudu.schema-emulation.prefix=)

Kudu table name | Trino table name
----------------+---------------------
orders          | kudu.default.orders
part1.part2     | kudu.part1.part2
x.y.z           | kudu.x."y.z"

Because Kudu has no native schemas, Trino creates a special table named $schemas to keep track of the emulated schemas.

Mapping with the standard prefix (kudu.schema-emulation.prefix=presto::)

Kudu table name     | Trino table name
--------------------+----------------------------
orders              | kudu.default.orders
part1.part2         | kudu.default."part1.part2"
x.y.z               | kudu.default."x.y.z"
presto::part1.part2 | kudu.part1.part2
presto::x.y.z       | kudu.x."y.z"

With the standard prefix, Trino creates a special table named presto::$schemas to manage schemas.
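Both mapping tables follow a simple rule. With a non-empty prefix, only prefixed names are split. The first dot separates the schema from the table, and any remaining dots stay in the table name. The following Python sketch reproduces every row of both tables. The function names are hypothetical. The fallback for names without a usable dot (for example, presto::orders) is an assumption and not documented behavior.

```python
import re
from typing import Tuple

DEFAULT_SCHEMA = "default"
_PLAIN = re.compile(r"[a-z_][a-z0-9_]*")


def to_trino_name(kudu_table: str, prefix: str) -> Tuple[str, str]:
    """Map a raw Kudu table name to (schema, table) under schema emulation."""
    if prefix and not kudu_table.startswith(prefix):
        # With a non-empty prefix, unprefixed tables stay in "default", dots included.
        return DEFAULT_SCHEMA, kudu_table
    name = kudu_table[len(prefix):]
    # The first dot separates schema from table; later dots stay in the table name.
    schema, dot, table = name.partition(".")
    if dot and schema and table:
        return schema, table
    # Assumption: names without a usable dot fall back to "default" unchanged.
    return DEFAULT_SCHEMA, kudu_table


def _quote(part: str) -> str:
    # Quote anything that is not a plain lowercase identifier, such as y.z.
    return part if _PLAIN.fullmatch(part) else '"' + part.replace('"', '""') + '"'


def to_trino_reference(catalog: str, kudu_table: str, prefix: str) -> str:
    """Render the fully qualified Trino name for a Kudu table."""
    schema, table = to_trino_name(kudu_table, prefix)
    return ".".join(_quote(p) for p in (catalog, schema, table))


print(to_trino_reference("kudu", "presto::x.y.z", "presto::"))
# kudu.x."y.z"
```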

Data type mappings

Trino to Kudu

When writing data from Trino to Kudu, the following type mappings apply:

Trino type | Kudu type       | Notes
-----------+-----------------+---------------------------------------------------------------------------
BOOLEAN    | BOOL            |
TINYINT    | INT8            |
SMALLINT   | INT16           |
INTEGER    | INT32           |
BIGINT     | INT64           |
REAL       | FLOAT           |
DOUBLE     | DOUBLE          |
VARCHAR    | STRING          | The maximum length is lost when using CREATE TABLE ... AS.
VARBINARY  | BINARY          |
TIMESTAMP  | UNIXTIME_MICROS | Kudu stores microsecond resolution; the connector reduces it to millisecond resolution.
DECIMAL    | DECIMAL         | Requires Kudu server 1.7.0 or later.
DATE       | N/A             | No matching Kudu type. Converted to STRING when using CREATE TABLE ... AS.
CHAR       | N/A             | No matching Kudu type.

The following Trino types are not supported: TIME, TIME WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSON, INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND, ARRAY, MAP, and IPADDRESS.
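To check column types before you run CREATE TABLE, you can encode the mapping as a lookup. This Python sketch transcribes the table above into a dictionary. kudu_type_for is a hypothetical helper. It strips type parameters such as varchar(20) and rejects every type that has no direct Kudu column type. The DATE to STRING conversion that CREATE TABLE ... AS performs is not modeled.

```python
import re

# Trino type -> Kudu column type, transcribed from the mapping table above.
# "int" is Trino's alias for INTEGER, as used in this page's CREATE TABLE examples.
TRINO_TO_KUDU = {
    "boolean": "BOOL",
    "tinyint": "INT8",
    "smallint": "INT16",
    "integer": "INT32",
    "int": "INT32",
    "bigint": "INT64",
    "real": "FLOAT",
    "double": "DOUBLE",
    "varchar": "STRING",
    "varbinary": "BINARY",
    "timestamp": "UNIXTIME_MICROS",
    "decimal": "DECIMAL",
}


def kudu_type_for(trino_type: str) -> str:
    """Return the Kudu column type for a Trino type, or raise ValueError."""
    # Drop trailing type parameters such as varchar(20), decimal(10, 2), or timestamp(3).
    base = re.sub(r"\(.*\)$", "", trino_type.strip().lower()).strip()
    if base not in TRINO_TO_KUDU:
        # DATE, CHAR, TIME, JSON, ARRAY, MAP, and similar types have no entry here.
        raise ValueError(f"no Kudu column type for Trino type {trino_type!r}")
    return TRINO_TO_KUDU[base]


print(kudu_type_for("decimal(10, 2)"))  # DECIMAL
```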

Supported SQL statements

The connector supports read and write access to Kudu data. The following SQL statements are supported:

  • SELECT

  • INSERT INTO ... VALUES

  • INSERT INTO ... SELECT ...

  • DELETE

  • CREATE TABLE — see Create a table

  • CREATE TABLE ... AS

  • DROP TABLE

  • ALTER TABLE ... RENAME TO ...

  • ALTER TABLE ... ADD COLUMN ... — see Add a column

  • ALTER TABLE ... RENAME COLUMN ... — supported only for non-primary-key columns

  • ALTER TABLE ... DROP COLUMN ... — supported only for non-primary-key columns

  • CREATE SCHEMA — supported only when schema emulation is enabled

  • DROP SCHEMA — supported only when schema emulation is enabled

  • SHOW SCHEMAS

  • SHOW TABLES

  • SHOW CREATE TABLE

  • SHOW COLUMNS FROM

  • DESCRIBE — equivalent to SHOW COLUMNS FROM

  • CALL kudu.system.add_range_partition — see Manage range partitions

  • CALL kudu.system.drop_range_partition — see Manage range partitions

ALTER SCHEMA ... RENAME TO ... is not supported.

Create a table

Every Kudu table requires columns with data types, a primary key, and partition information. Column encoding and compression are optional.

CREATE TABLE user_events (
  user_id int WITH (primary_key = true),
  event_name varchar WITH (primary_key = true),
  message varchar,
  details varchar WITH (nullable = true, encoding = 'plain')
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 5,
  number_of_replicas = 3
);

In this example:

  • user_id and event_name are primary key columns.

  • The table is hash-partitioned on user_id into 5 buckets.

  • number_of_replicas is set to 3, which controls the number of tablet replicas.

Key rules when creating a table:

  • Primary key columns must be listed before all other columns.

  • Only primary key columns can serve as partition key columns.

  • number_of_replicas is optional and must be an odd number. If omitted, the Kudu master's default replication factor applies.

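  • A table must have at least one hash or range partitioning. It can have multiple levels of hash partitioning (the connector exposes two, through the partition_by_hash_* and partition_by_second_hash_* properties) but at most one range partitioning.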
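You can check the key rules above before you send a CREATE TABLE statement to Trino. The following Python sketch models a table definition with hypothetical Column and TableSpec classes and reports each rule violation. The limit of two hash levels is an assumption based on the partition_by_hash_* and partition_by_second_hash_* properties. The sketch does not cover every check that Kudu performs.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Column:
    name: str
    primary_key: bool = False


@dataclass
class TableSpec:
    columns: List[Column]
    hash_levels: List[List[str]] = field(default_factory=list)  # one column list per hash level
    range_columns: List[str] = field(default_factory=list)
    number_of_replicas: Optional[int] = None


def check_table(spec: TableSpec) -> List[str]:
    """Return a list of rule violations; an empty list means the spec passes."""
    errors = []
    flags = [c.primary_key for c in spec.columns]
    keys = {c.name for c in spec.columns if c.primary_key}
    if not keys:
        errors.append("at least one primary key column is required")
    # Primary key columns must come first, so the flags must read True..., then False...
    if flags != sorted(flags, reverse=True):
        errors.append("primary key columns must be listed before all other columns")
    for col in [c for level in spec.hash_levels for c in level] + spec.range_columns:
        if col not in keys:
            errors.append(f"partition column {col!r} is not a primary key column")
    if not any(spec.hash_levels) and not spec.range_columns:
        errors.append("at least one hash or range partitioning is required")
    if len(spec.hash_levels) > 2:
        # Assumption: the connector exposes two hash levels (first and second).
        errors.append("at most two hash partitioning levels can be declared")
    if spec.number_of_replicas is not None and spec.number_of_replicas % 2 == 0:
        errors.append("number_of_replicas must be an odd number")
    return errors
```

For the user_events table in this section (user_id and event_name as primary keys, one hash level on user_id, 3 replicas), check_table returns an empty list.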

Column properties

Specify column properties in the WITH clause:

Column property | Data type | Description
----------------+-----------+------------------------------------------------------------
primary_key     | BOOLEAN   | Marks the column as a primary key. Kudu enforces uniqueness on primary keys; inserting a row with a duplicate primary key updates the existing row. See Primary Key Design.
nullable        | BOOLEAN   | Allows the column to contain null values. Primary key columns cannot be nullable.
encoding        | VARCHAR   | Column encoding format. Defaults to Kudu's type-based encoding. Valid values: auto, plain, bitshuffle, runlength, prefix, dictionary, group_varint. See Column Encoding.
compression     | VARCHAR   | Column compression format. Defaults to Kudu's default compression. Valid values: default, no, lz4, snappy, zlib. See Column compression.
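The primary_key row describes upsert-like behavior: a second row with an existing key replaces the stored row and does not add a duplicate. The following Python toy model illustrates this effect with the users table from the quick example. write_rows is hypothetical and contains no Kudu code. It replaces the whole row, which corresponds to the case where every column is written.

```python
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, object]


def write_rows(table: Dict[Tuple, Row], key_columns: Sequence[str], rows: List[Row]) -> None:
    """Toy model of primary-key uniqueness: one stored row per key, last write wins."""
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        # If the key already exists, the stored row is replaced, not duplicated.
        table[key] = dict(row)


users: Dict[Tuple, Row] = {}
write_rows(users, ["user_id"], [
    {"user_id": 1, "first_name": "Donald", "last_name": "Duck"},
    {"user_id": 2, "first_name": "Mickey", "last_name": "Mouse"},
])
write_rows(users, ["user_id"], [{"user_id": 1, "first_name": "Daisy", "last_name": "Duck"}])
print(len(users), users[(1,)]["first_name"])  # 2 Daisy
```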

Example with explicit encoding and compression:

CREATE TABLE mytable (
  name varchar WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
  index bigint WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
  comment varchar WITH (nullable = true, encoding = 'plain', compression = 'default'),
  ...
) WITH (...);

Partition design

Kudu supports hash partitions and range partitions. A table must have at least one partition of either type.

Define hash partitions

Specify partition_by_hash_columns (partition key columns) and partition_by_hash_buckets (number of buckets). Partition key columns must be a subset of primary key columns.

One hash partition group:

CREATE TABLE mytable (
  col1 varchar WITH (primary_key=true),
  col2 varchar WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['col1', 'col2'],
  partition_by_hash_buckets = 4
);

col1 and col2 are the hash partition key columns. Rows are distributed across 4 buckets.

Two independent hash partition groups:

CREATE TABLE mytable (
  col1 varchar WITH (primary_key=true),
  col2 varchar WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['col1'],
  partition_by_hash_buckets = 2,
  partition_by_second_hash_columns = ARRAY['col2'],
  partition_by_second_hash_buckets = 3
);

The first group distributes rows by col1 into 2 buckets; the second distributes by col2 into 3 buckets. The total number of partitions is 6 (2 x 3).
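This arithmetic generalizes. Each hash level multiplies the number of tablets by its bucket count, and every range partition is split into the full set of hash buckets. The following Python sketch computes the resulting tablet count. tablet_count is a hypothetical helper, and it treats a table without range partitioning as a single unbounded range.

```python
from math import prod
from typing import Sequence


def tablet_count(hash_buckets: Sequence[int], range_partitions: int = 0) -> int:
    """Tablets = product of the bucket counts of all hash levels x number of range partitions.

    A table without range partitioning behaves like one unbounded range.
    """
    if any(b < 1 for b in hash_buckets) or range_partitions < 0:
        raise ValueError("bucket counts must be positive and range_partitions non-negative")
    return prod(hash_buckets) * max(range_partitions, 1)


print(tablet_count([2, 3]))     # 6: two hash levels with 2 and 3 buckets
print(tablet_count([2, 3], 2))  # 12: the same hash levels under two range partitions
```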

Define range partitions

Use partition_by_range_columns to specify range partition columns and range_partitions to define initial partition bounds.

CREATE TABLE events (
  rack varchar WITH (primary_key=true),
  machine varchar WITH (primary_key=true),
  event_time timestamp WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['rack'],
  partition_by_hash_buckets = 2,
  partition_by_second_hash_columns = ARRAY['machine'],
  partition_by_second_hash_buckets = 3,
  partition_by_range_columns = ARRAY['event_time'],
  range_partitions = '[{"lower": null, "upper": "2018-01-01T00:00:00"},
                       {"lower": "2018-01-01T00:00:00", "upper": null}]'
);

This table has two hash partition groups and one range partition on event_time, split at 2018-01-01T00:00:00.
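Writing range_partitions by hand is error-prone when there are many split points. The following Python sketch builds a JSON value with no gaps or overlaps from a sorted list of split points. range_partitions_from_splits is a hypothetical helper. Values are compared as given, so ISO timestamp strings should all use the same format to sort correctly.

```python
import json
from typing import Any, List


def range_partitions_from_splits(split_points: List[Any]) -> str:
    """Build a range_partitions value that covers the whole key space without gaps.

    Each range runs from one split point to the next; the first range has no lower
    bound and the last has no upper bound (None is serialized as JSON null).
    """
    if split_points != sorted(split_points) or len(set(split_points)) != len(split_points):
        raise ValueError("split points must be strictly increasing")
    bounds = [None, *split_points, None]
    ranges = [{"lower": lo, "upper": hi} for lo, hi in zip(bounds, bounds[1:])]
    return json.dumps(ranges)


print(range_partitions_from_splits(["2018-01-01T00:00:00"]))
# [{"lower": null, "upper": "2018-01-01T00:00:00"}, {"lower": "2018-01-01T00:00:00", "upper": null}]
```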

Manage range partitions

Add or drop range partitions on an existing table using stored procedures:

-- Add a range partition
CALL kudu.system.add_range_partition(<schema_name>, <table_name>, <range_partition_as_json_string>)

-- Drop a range partition
CALL kudu.system.drop_range_partition(<schema_name>, <table_name>, <range_partition_as_json_string>)

Parameters:

Parameter                        | Description
---------------------------------+------------------------------------------------------------
<schema_name>                    | The schema that contains the table.
<table_name>                     | The name of the table.
<range_partition_as_json_string> | The partition bounds in JSON format: '{"lower": <value>, "upper": <value>}'. For multi-column range keys, use arrays: '{"lower": [<col1_value>, ...], "upper": [<col1_value>, ...]}'. Set either bound to null for an unbounded partition.

JSON value formats by data type:

Data type | Example
----------+----------------------------------------------------------------------------
BIGINT    | '{"lower": 0, "upper": 1000000}'
SMALLINT  | '{"lower": 10, "upper": null}'
VARCHAR   | '{"lower": "A", "upper": "M"}'
TIMESTAMP | '{"lower": "2018-02-01T00:00:00.000", "upper": "2018-02-01T12:00:00.000"}'
BOOLEAN   | '{"lower": false, "upper": true}'
VARBINARY | Base64-encoded strings

Example: Add a range partition to the events table in the myschema schema that covers records from 2018-01-01 (inclusive) up to, but not including, 2018-06-01:

CALL kudu.system.add_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}')

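The lower bound "2018-01-01" is interpreted as 2018-01-01T00:00:00.000. The new range must not overlap any existing range partition. If the table already has unbounded range partitions, such as the events table created in Define range partitions, the call fails.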

To view all existing range partitions on a table, run SHOW CREATE TABLE. The range_partitions property in the output lists the current partition bounds.
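By default, Kudu treats a range as lower-inclusive and upper-exclusive, and it rejects a new range partition that overlaps an existing one. Before you call add_range_partition, you can compare the new bounds with the ones listed by SHOW CREATE TABLE. This Python sketch uses hypothetical helpers (overlaps and conflicting). It assumes the bounds have already been parsed into datetime values, with None for unbounded ends. Parsing the SHOW CREATE TABLE output is not shown.

```python
from datetime import datetime
from typing import List, Optional, Tuple

# A range is (lower, upper); None means unbounded on that side.
Range = Tuple[Optional[datetime], Optional[datetime]]


def overlaps(a: Range, b: Range) -> bool:
    """True if half-open ranges [lower, upper) a and b share at least one value."""
    a_lo, a_hi = a
    b_lo, b_hi = b
    # Two half-open ranges overlap if and only if each one starts before the other ends.
    a_starts_before_b_ends = a_lo is None or b_hi is None or a_lo < b_hi
    b_starts_before_a_ends = b_lo is None or a_hi is None or b_lo < a_hi
    return a_starts_before_b_ends and b_starts_before_a_ends


def conflicting(new: Range, existing: List[Range]) -> List[Range]:
    """Existing range partitions that a new range partition would overlap."""
    return [r for r in existing if overlaps(new, r)]


split = datetime(2018, 1, 1)
existing = [(None, split), (split, None)]  # the two ranges of the events table in Define range partitions
new = (datetime(2018, 1, 1), datetime(2018, 6, 1))
print(conflicting(new, existing))  # only the second, upper-unbounded range conflicts
```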

Add a column

Use ALTER TABLE ... ADD COLUMN ... to add a column to an existing table. Column properties such as nullable and encoding are supported.

ALTER TABLE mytable ADD COLUMN extra_info varchar WITH (nullable = true, encoding = 'plain')

For available column properties, see Column properties.
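When you add columns from a script, you can check the values against the encoding and compression lists in Column properties first. The following Python sketch builds an ALTER TABLE ... ADD COLUMN statement of the form shown in this section and raises ValueError for values outside those lists. add_column_sql is a hypothetical helper.

```python
from typing import Optional

# Valid values from the Column properties table.
VALID_ENCODINGS = {"auto", "plain", "bitshuffle", "runlength", "prefix", "dictionary", "group_varint"}
VALID_COMPRESSIONS = {"default", "no", "lz4", "snappy", "zlib"}


def add_column_sql(table: str, column: str, trino_type: str,
                   nullable: Optional[bool] = None,
                   encoding: Optional[str] = None,
                   compression: Optional[str] = None) -> str:
    """Build ALTER TABLE ... ADD COLUMN ..., rejecting unknown encodings and compressions."""
    props = []
    if nullable is not None:
        props.append(f"nullable = {'true' if nullable else 'false'}")
    if encoding is not None:
        if encoding not in VALID_ENCODINGS:
            raise ValueError(f"unknown encoding: {encoding!r}")
        props.append(f"encoding = '{encoding}'")
    if compression is not None:
        if compression not in VALID_COMPRESSIONS:
            raise ValueError(f"unknown compression: {compression!r}")
        props.append(f"compression = '{compression}'")
    sql = f"ALTER TABLE {table} ADD COLUMN {column} {trino_type}"
    if props:
        sql += " WITH (" + ", ".join(props) + ")"
    return sql


print(add_column_sql("mytable", "extra_info", "varchar", nullable=True, encoding="plain"))
# ALTER TABLE mytable ADD COLUMN extra_info varchar WITH (nullable = true, encoding = 'plain')
```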