You can use the Kudu connector to query data in, insert data into, and delete data from Kudu.

Background information

Prerequisites

A Presto cluster and a Hadoop cluster are created, and the Kudu service is selected for the Hadoop cluster. For more information, see Create a cluster.

Limits

  • You can use the Kudu connector to connect only to Kudu 1.10 or later.
  • You must establish a network connection between your Presto cluster and the Hadoop cluster that you want to access.
  • The names of Kudu tables and columns can contain only lowercase letters.

Modify the configurations of the Kudu connector

Modify the configurations of the Kudu connector. For more information, see Configure a connector.

Go to the Presto service page in the EMR console, click the Configure tab, and then click the kudu.properties tab. Modify the configuration items that are described in the following table based on your business requirements.
Configuration item Description
kudu.client.master-addresses The Kudu master address. If you want to configure multiple Kudu master addresses, separate the addresses with commas (,).

The following address formats are supported: example.com, example.com:7051, 192.0.2.1, 192.0.2.1:7051, [2001:db8::1], [2001:db8::1]:7051, and 2001:db8::1.

Default value: localhost.

kudu.schema-emulation.enabled Specifies whether to enable the schema emulation feature. Valid values:
  • false: The schema emulation feature is disabled. This is the default value.
  • true: The schema emulation feature is enabled.
Important You can click Add Configuration Item in the upper-left corner of the kudu.properties tab to add this configuration item. For more information about how to add a configuration item, see Add parameters.
kudu.schema-emulation.prefix The prefix for the schema emulation feature.
Important You need to set this configuration item only if you set the kudu.schema emulation.enabled configuration item to true.

The standard prefix is 'presto::`. You can leave this configuration item empty.

kudu.client.default-admin-operation-timeout The default timeout period for administrative operations, such as the operations to create or delete tables.

Default value: 30. Unit: seconds.

kudu.client.default-operation-timeout The default timeout period for user operations.

Default value: 30. Unit: seconds.

kudu.client.default-socket-read-timeout The default timeout period for waiting for data from a socket.

Default value: 10. Unit: seconds.

kudu.client.disable-statistics Specifies whether to enable the Kudu client to collect statistical information. Valid values:
  • false: The collection feature is disabled. This is the default value.
  • true: The collection feature is enabled.

Query data

Apache Kudu does not support schemas. However, you can configure the Kudu connector to emulate schemas.

Schema emulation disabled (default)

By default, the schema emulation feature is disabled, and all tables in Kudu belong to the default schema.

For example, you can execute the SELECT * FROM kudu.default.orders statement to query data in the orders table. If you set the Catalog parameter to kudu and the Schema parameter to default, you can simply execute the SELECT * FROM orders statement to query data in the orders table.

The name of a Kudu table can contain all characters. If a table name contains a special character, you must enclose the name in a pair of double quotation marks ("). For example, to query data in the special.table! table, execute the SELECT * FROM kudu.default."special.table!" statement.

Examples:
  1. Create a table named users in the default schema.
    CREATE TABLE kudu.default.users (
      user_id int WITH (primary_key = true),
      first_name varchar,
      last_name varchar
    ) WITH (
      partition_by_hash_columns = ARRAY['user_id'],
      partition_by_hash_buckets = 2
    );
    Note When you create a table, you must specify the required table information, such as the encoding format or compression format of primary keys and columns, and the hash partitions or range partitions.
  2. View information about the table.
    DESCRIBE kudu.default.users;
    Information similar to the following output is returned:
       Column   |  Type   |                      Extra                      | Comment
    ------------+---------+-------------------------------------------------+---------
     user_id    | integer | primary_key, encoding=auto, compression=default |
     first_name | varchar | nullable, encoding=auto, compression=default    |
     last_name  | varchar | nullable, encoding=auto, compression=default    |
    (3 rows)
  3. Inserts data into the table.
    INSERT INTO kudu.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse');
  4. Query data of the table.
    SELECT * FROM kudu.default.users;

Schema emulation enabled

If you enable the schema emulation feature in the kudu.properties configuration file for the Kudu connector in the etc/catalog/ directory, Kudu tables are mapped to schemas based on the naming conventions.
  • If kudu.schema-emulation.enabled=true and kudu.schema-emulation.prefix= are configured, the mappings listed in the following table prevail.
    Kudu table name Table name used in Presto
    orders kudu.default.orders
    part1.part2 kudu.part1.part2
    x.y.z kudu.x."y.z"
    Note Kudu does not support schemas. Presto creates a special table named $schemas to manage schemas.
  • If kudu.schema-emulation.enabled=true and kudu.schema-emulation.prefix=presto:: are configured, the mappings listed in the following table prevail.
    Kudu table name Table name used in Presto
    orders kudu.default.orders
    part1.part2 kudu.default."part1.part2"
    x.y.z kudu.default."x.y.z"
    presto::part1.part2 kudu.part1.part2
    presto::x.y.z kudu.x."y.z"
    Note Kudu does not support schemas. Presto creates a special table named presto::$schemas to manage schemas.

Data type mappings

The following table describes the mappings between Presto data types and Kudu data types.
Presto data type Kudu data type Remarks
BOOLEAN BOOL None.
TINYINT INT8
SMALLINT INT16
INTEGER INT32
BIGINT INT64
REAL FLOAT
DOUBLE DOUBLE
VARCHAR STRING When you execute the CREATE TABLE ... AS ... statement to create a Kudu table from an existing Presto table, the optional maximum length for VARCHAR is lost.
VARBINARY BINARY
TIMESTAMP UNIXTIME_MICROS The precision of a Kudu column of this data type is reduced from µs to ms.
DECIMAL DECIMAL This data type is supported only by Kudu servers of 1.7.0 or a later version.
DATE N/A Kudu does not have a data type that matches DATE.

When you execute the CREATE TABLE ... AS ... statement to create a Kudu table from an existing Presto table, the DATE type for a column is converted to the STRING type.

CHAR N/A Kudu does not have data types that match these data types.
TIME
JSON
TIME WITH TIMEZONE
TIMESTAMP WITH TIME ZONE
INTERVAL YEAR TO MO NTH
INTERVAL DAY TO SEC OND
ARRAY
MAP
IPADDRESS

Supported Presto SQL statements

Note The ALTER SCHEMA ... RENAME TO ... statement is not supported.
Statement Remarks
SELECT None.
INSERT INTO ... VALUES None.
INSERT INTO ... SELECT ... None.
DELETE None.
DROP SCHEMA This statement is supported only if schema emulation is enabled.
CREATE SCHEMA This statement is supported only if schema emulation is enabled.
CREATE TABLE For more information about how to create a table, see Create a table.
CREATE TABLE ... AS None.
DROP TABLE None.
ALTER TABLE ... RENAME TO ... None.
ALTER TABLE ... ADD COLUMN ... For more information about how to add a column, see Add a column.
ALTER TABLE ... RENAME COLUMN ... These statements are supported only if the column that you want to rename or drop is not a primary key column.
ALTER TABLE ... DROP COLUMN ...
SHOW SCHEMAS None.
SHOW TABLES None.
SHOW CREATE TABLE None.
SHOW COLUMNS FROM None.
DESCRIBE This statement is equivalent to SHOW COLUMNS FROM.
CALL kudu.system.add_range_partition This statement is used to add a range partition. For more information, see Range partitions.
CALL kudu.system.drop_range_partition This statement is used to remove a range partition. For more information, see Range partitions.

Create a table

When you create a table, you must specify columns, data types, and partition information. You can also specify the column encoding format or compression format based on your requirements. Example:
CREATE TABLE user_events (
  user_id int WITH (primary_key = true),
  event_name varchar WITH (primary_key = true),
  message varchar,
  details varchar WITH (nullable = true, encoding = 'plain')
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 5,
  number_of_replicas = 3
);

In this example, user_id and event_name are primary key columns. The table is divided into five partitions based on the hash values in the user_id column. The value of number_of_replicas is 3.

Take note of the following points when you configure parameters for the CREATE TABLE statement:
  • Primary key columns must be specified before other columns, and only a primary key column can be configured as a partition key column.
  • The number_of_replicas parameter is optional. This parameter specifies the number of tablet replicas and must be set to an odd number. If you do not configure this parameter, the default replication factor from the Kudu master configuration is used.
  • Kudu supports hash partitions and range partitions. A hash partition distributes rows by hash value to one of many buckets. A range partition distributes rows by using an ordered range partition key. The concrete range partitions must be explicitly created. Kudu supports multi-level partitioning. A table must contain at least one hash or range partition. A table can contain only one range partition but multiple hash partitions.

Column properties

In addition to column names and data types, you can also specify other column properties.
Column property Data type Description
primary_key BOOLEAN If this parameter is set to true, the column is used as a primary key column.

A Kudu primary key enforces the uniqueness constraint. If you insert a row that has the same primary key value as an existing row, the existing row is updated. For more information, see Primary Key Design.

nullable BOOLEAN If you set this parameter to true, the column can contain null values.
Important A primary key column cannot contain null values.
encoding VARCHAR Specifies the column encoding format to save storage space and improve query performance.

If you do not specify this parameter, Kudu encodes data in the column based on the column data type. Valid values: auto, plain, bitshuffle, runlength, prefix, dictionary, and group_varint. For more information, see Column Encoding.

compression VARCHAR Specifies the column compression format.

If you do not specify this parameter, Kudu uses the default compression format. Valid values: default, no, lz4, snappy, and zlib. For more information, see Column compression.

Example:
CREATE TABLE mytable (
  name varchar WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
  index bigint WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
  comment varchar WITH (nullable = true, encoding = 'plain', compression = 'default'),
   ...
) WITH (...);

Partition design

A table must contain at least one hash or range partition. A table can contain only one range partition but multiple hash partitions. The following section describes the partition information:
  • Define hash partitions
    • Define one partition group
      You can use the table property partition_by_hash_columns to specify partition key columns and use the table property partition_by_hash_buckets to specify the number of partitions. The partition key columns must be a subset of primary key columns. Example:
      CREATE TABLE mytable (
        col1 varchar WITH (primary_key=true),
        col2 varchar WITH (primary_key=true),
        ...
      ) WITH (
        partition_by_hash_columns = ARRAY['col1', 'col2'],
        partition_by_hash_buckets = 4
      )
      Note In this example, col1 and col2 columns are defined as hash partition key columns, and data is distributed to four partitions.
    • Define two partition groups
      If you want to define two independent hash partition groups, in addition to the table properties specified in the preceding example, you must specify the table properties partition_by_second_hash_columns and partition_by_second_hash_buckets. Example:
      CREATE TABLE mytable (
        col1 varchar WITH (primary_key=true),
        col2 varchar WITH (primary_key=true),
        ...
      ) WITH (
        partition_by_hash_columns = ARRAY['col1'],
        partition_by_hash_buckets = 2,
        partition_by_second_hash_columns = ARRAY['col2'],
        partition_by_second_hash_buckets = 3
      )
      Note In this example, two hash partition groups are defined. In the first hash partition group, rows are distributed to two partitions based on the col1 column. In the second hash partition group, rows are distributed to three partitions based on the col2 column. In this case, the total number of partitions in the table is 6 (2 × 3).
  • Define range partitions
    A Kudu table can contain only one range partition, which can be defined by using the table property partition_by_range_columns. When you create a table, you can use the table property range_partitions to define the range of the partition. You can use the table properties kudu.system.add_range_partition and kudu.system.drop_range_partition to manage the range partitions of existing tables. Example:
    CREATE TABLE events (
      rack varchar WITH (primary_key=true),
      machine varchar WITH (primary_key=true),
      event_time timestamp WITH (primary_key=true),
      ...
    ) WITH (
      partition_by_hash_columns = ARRAY['rack'],
      partition_by_hash_buckets = 2,
      partition_by_second_hash_columns = ARRAY['machine'],
      partition_by_second_hash_buckets = 3,
      partition_by_range_columns = ARRAY['event_time'],
      range_partitions = '[{"lower": null, "upper": "2018-01-01T00:00:00"},
                           {"lower": "2018-01-01T00:00:00", "upper": null}]'
    )
    Note In this example, two hash partition groups and one range partition are defined. The table is range partitioned on the event_time column, and data is split based on 2018-01-01T00:00:00.
  • Manage range partitions

    You can use a stored procedure to add a range partition to or remove a range partition from an existing table.

    Examples:
    • Add a range partition:
      CALL kudu.system.add_range_partition(<your_schema_name>, <your_table_name>, <range_partition_as_json_string>)
    • Remove a range partition:
      CALL kudu.system.drop_range_partition(<your_schema_name>, <your_table_name>, <range_partition_as_json_string>)
    Parameter Description
    <your_schema_name> The schema to which the table belongs.
    <your_table_name> The name of the table.
    <range_partition_as_json_string> The upper and lower bounds of the range partition. You must set this parameter in the '{"lower": <value>, "upper": <value>}' JSON format. If the partition has multiple columns, you must set this parameter in the '{"lower": [<value_col1>,...], "upper": [<value_col1>,...]}' format. The specific value formats of the upper and lower bounds depend on the data types of columns. Mappings between data types and JSON string formats:
    • BIGINT: '{"lower": 0, "upper": 1000000}'
    • SMALLINT: '{"lower": 10, "upper": null}'
    • VARCHAR: '{"lower": "A", "upper": "M"}'
    • TIMESTAMP: '{"lower": "2018-02-01T00:00:00.000", "upper": "2018-02-01T12:00:00.000"}'
    • BOOLEAN: '{"lower": false, "upper": true}'
    • VARBINARY: Base64-encoded strings
    Note If you set this parameter to null, the partition is unbounded.
    Example:
    CALL kudu.system.add_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}')
    Note In this example, a range partition is added to the events table in the myschema schema. The lower bound of the partition is 2018-01-01, whose accurate value is 2018-01-01T00:00:00.000. The upper bound of the partition is 2018-06-01.
    You can execute the SHOW CREATE TABLE statement to query the existing range partition of the table. In the return results, the table property range_partitions indicates the partition information of the table.

Add a column

You can execute the ALTER TABLE ... ADD COLUMN ... statement to add a column to an existing table. You can also use column properties to add columns. For more information about the column properties, see Create a table.
ALTER TABLE mytable ADD COLUMN extraInfo varchar WITH (nullable = true, encoding = 'plain')