Overview

The Kafka connector maps topics in Kafka to tables in Presto. Each message in Kafka is mapped to a row in a Presto table.

Notice

Because data in Kafka changes dynamically, the same query may return inconsistent results when it is run multiple times by using Presto. Presto currently cannot compensate for this inconsistency.

Configuration

To enable the Kafka connector, create the file etc/catalog/kafka.properties and add the following content:

connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port
Note

You can connect Presto to multiple Kafka clusters by adding additional properties files to the etc/catalog directory. The file name (without the extension) is mapped to a Presto catalog name. For example, if a configuration file named "orders.properties" is added, Presto creates a catalog named "orders".

## orders.properties
## connector.name specifies the connector type and must not be changed.
connector.name=kafka
kafka.table-names=tableA,tableB
kafka.nodes=host1:port,host2:port
The Kafka connector provides the following properties:
| Parameter | Required | Description |
| --- | --- | --- |
| kafka.table-names | Yes | A list of all tables provided by the connector. A table name can be qualified with a schema name in the format {schema_name}.{table_name}. Table names without a schema qualifier are mapped to the schema defined in kafka.default-schema. |
| kafka.default-schema | No | The default schema name. The default value is default. |
| kafka.nodes | Yes | A list of nodes in the Kafka cluster, in the format hostname:port[,hostname:port...]. Only a subset of the Kafka nodes needs to be listed here, but Presto must be able to connect to all nodes of the cluster. Otherwise, a portion of the data may not be obtained. |
| kafka.connect-timeout | No | The timeout period for connections between the Kafka connector and the Kafka cluster. The default value is 10 seconds. If the Kafka cluster is under heavy load, creating a connection may take a long time and cause a timeout when Presto runs a query. In this case, increase the value of this parameter. |
| kafka.buffer-size | No | The size of the internal read buffer that stores data read from Kafka. The default value is 64 KB. The buffer must be larger than a single message. One buffer is allocated per worker and data node. |
| kafka.table-description-dir | No | The directory that stores the topic (table) description files. The default value is etc/kafka. The directory holds table definition files in the JSON format (file names must end with .json). |
| kafka.hide-internal-columns | No | Controls whether the predefined internal columns are hidden. The default value is true. In addition to the data columns defined in a table description file, the Kafka connector maintains many internal columns for each table. This property controls whether those columns appear in the results of DESCRIBE <table-name> and SELECT * statements. Regardless of the setting, these columns can still be used in queries. |
The Kafka connector provides the following internal columns:

| Column | Type | Description |
| --- | --- | --- |
| _partition_id | BIGINT | The ID of the Kafka partition that contains the current row. |
| _partition_offset | BIGINT | The offset of the current row within its Kafka partition. |
| _segment_start | BIGINT | The lowest offset of the data segment that contains the current row. This offset is per partition. |
| _segment_end | BIGINT | The highest offset of the data segment that contains the current row (that is, the starting offset of the next data segment). This offset is per partition. |
| _segment_count | BIGINT | The sequence number of the current row within its data segment. For an uncompacted topic, _segment_start + _segment_count = _partition_offset. |
| _message_corrupt | BOOLEAN | Set to TRUE if the decoder fails to decode the message of the current row. |
| _message | VARCHAR | The message bytes decoded as a UTF-8 string. This column is useful when the topic carries text messages. |
| _message_length | BIGINT | The length of the message in bytes. |
| _key_corrupt | BOOLEAN | Set to TRUE if the decoder fails to decode the key of the current row. |
| _key | VARCHAR | The key bytes decoded as a UTF-8 string. This column is useful when the key is textual. |
| _key_length | BIGINT | The length of the message key in bytes. |

Note: For tables without definition files, _key_corrupt and _message_corrupt are always FALSE.
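For an uncompacted topic, the relationship _segment_start + _segment_count = _partition_offset can be sketched as follows (the segment offsets below are hypothetical values, not taken from a real cluster):

```python
# Hypothetical data segment of an uncompacted topic, starting at offset 100.
segment_start = 100

rows = []
for segment_count in range(3):
    # For uncompacted topics, a row's partition offset is the segment's
    # starting offset plus the row's sequence number within the segment.
    partition_offset = segment_start + segment_count
    rows.append({
        "_segment_start": segment_start,
        "_segment_count": segment_count,
        "_partition_offset": partition_offset,
    })

for row in rows:
    assert row["_segment_start"] + row["_segment_count"] == row["_partition_offset"]

print(rows[-1])  # → {'_segment_start': 100, '_segment_count': 2, '_partition_offset': 102}
```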

Table definition files

Kafka is a schema-less messaging system, so the message format must be agreed on by the producer and the consumer. Presto, however, requires that data be mapped to tables, so you must provide table definition files that match how the messages are actually used. If no definition file is provided, messages in the JSON format can still be parsed by using the JSON functions of Presto. This method is flexible, but it makes the SQL statements more complex to write.
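For example, without a definition file every query must extract each field from the raw message itself (for example, with json_extract_scalar(_message, '$.customer') in Presto SQL). The equivalent extraction, sketched in Python with a hypothetical message:

```python
import json

# A hypothetical JSON-formatted Kafka message, as it would appear in the
# internal _message column when no table definition file exists.
message = '{"order_id": 42, "customer": "Alice"}'

# Each query has to repeat this extraction for every field it needs,
# instead of selecting a named column.
doc = json.loads(message)
print(doc["customer"])  # → Alice
```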

A table definition file is written in JSON. The file name can be customized, but it must have the .json extension.

{
    "tableName": ...,
    "schemaName": ...,
    "topicName": ...,
    "key": {
        "dataFormat": ...,
        "fields": [
            ...
        ]
    },
    "message": {
        "dataFormat": ...,
        "fields": [
            ...
        ]
    }
}
| Field | Required | Type | Description |
| --- | --- | --- | --- |
| tableName | Yes | String | The Presto table name. |
| schemaName | No | String | The name of the schema that contains the table. |
| topicName | Yes | String | The Kafka topic name. |
| key | No | JSON object | The rules for mapping the message key to table columns. |
| message | No | JSON object | The rules for mapping the message to table columns. |

The mapping rules for keys and messages use the following fields for description:

| Field | Required | Type | Description |
| --- | --- | --- | --- |
| dataFormat | Yes | String | The decoder used for this group of columns. |
| fields | Yes | JSON array | The list of column definitions. |

fields is a JSON array, and each element is a JSON object in the following format:

{
    "name": ...,
    "type": ...,
    "dataFormat": ...,
    "mapping": ...,
    "formatHint": ...,
    "hidden": ...,
    "comment": ...
}
| Field | Required | Type | Description |
| --- | --- | --- | --- |
| name | Yes | String | The column name. |
| type | Yes | String | The data type of the column. |
| dataFormat | No | String | The decoder for the column data. |
| mapping | No | String | The decoder parameters. |
| formatHint | No | String | A format hint for the column, which can be used by the decoder. |
| hidden | No | Boolean | Indicates whether the column is hidden. |
| comment | No | String | The column description. |
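Putting the pieces together, a complete table definition file for a hypothetical "orders" topic might look as follows. All names, mappings, and decoder choices here are illustrative examples, not defaults; building the dictionary in Python and serializing it with json.dumps guarantees valid JSON:

```python
import json

# Hypothetical definition: topic "orders" exposed as table "orders" in
# schema "kafka", with a raw-decoded key and a csv-decoded message.
table_def = {
    "tableName": "orders",
    "schemaName": "kafka",
    "topicName": "orders",
    "key": {
        "dataFormat": "raw",
        "fields": [
            {"name": "kafka_key", "type": "BIGINT", "dataFormat": "LONG"}
        ]
    },
    "message": {
        "dataFormat": "csv",
        "fields": [
            {"name": "order_id", "type": "BIGINT", "mapping": "0"},
            {"name": "customer", "type": "VARCHAR", "mapping": "1",
             "comment": "Customer name"}
        ]
    }
}

# Saved, for example, as orders.json in the directory configured by
# kafka.table-description-dir (etc/kafka by default).
print(json.dumps(table_def, indent=4))
```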

Decoder

A decoder maps a Kafka message (key and message) to the columns of a table. If no table definition file exists, Presto uses the dummy decoder.

The Kafka connector provides the following three decoders:

  • raw: uses raw bytes directly without conversion.
  • csv: processes messages as strings in the CSV format.
  • json: processes messages in the JSON format.
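As a rough sketch of what the csv decoder does (not Presto's actual implementation), each message is split into CSV columns, and the mapping value of each field selects a column by index. The field names and the sample message below are hypothetical:

```python
import csv
import io

# Hypothetical field definitions, as they would appear in the "fields"
# array of a table definition that uses the csv decoder.
fields = [
    {"name": "order_id", "type": "BIGINT", "mapping": "0"},
    {"name": "customer", "type": "VARCHAR", "mapping": "1"},
]

def decode_csv_message(message, fields):
    """Map one CSV-formatted Kafka message to a dict of column values."""
    columns = next(csv.reader(io.StringIO(message)))
    row = {}
    for field in fields:
        value = columns[int(field["mapping"])]
        # Greatly simplified type handling; the real decoder supports
        # more Presto types than BIGINT and VARCHAR.
        row[field["name"]] = int(value) if field["type"] == "BIGINT" else value
    return row

print(decode_csv_message("42,Alice", fields))  # → {'order_id': 42, 'customer': 'Alice'}
```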