System connector

Last Updated: May 04, 2018


SQL can be used to query the basic information and measurements of the Presto cluster through the connector.


All information can be obtained through a catalog known as system without configuration.


  1. --- List all supported data entries
  2. SHOW SCHEMAS FROM system;
  3. --- List all data entries in the project during runtime
  4. SHOW TABLES FROM system.runtime;
  5. --- Obtain node status
  6. SELECT * FROM system.runtime.nodes;
  7. node_id | http_uri | node_version | coordinator | state
  8. -------------+--------------------------+--------------+-------------+--------
  9. 3d7e8095-...|| 0.188 | false | active
  10. 7868d742-...|| 0.188 | false | active
  11. 7c51b0c1-...|| 0.188 | true | active
  12. --- Force cancel a query
  13. CALL system.runtime.kill_query('20151207_215727_00146_tx3nr');

Data tables

The connector provides the following data tables:

catalogs metadata This table contains the list of all catalogs supported in the connector
schema_properties metadata This table contains the list of available properties that can be set when creating a Schema.
table_properties metadata This table contains the list of available properties that can be set when creating a table.
nodes runtime This table contains the list of all visible nodes and statuses thereof in the Presto cluster
queries runtime This table contains information queries currently and recently initiated in the Presto cluster, including the original query texts (SQL), identities of the users who initiate the queries and information on query performances, for example, query queue and analysis time, etc.
tasks runtime This table contains information on the task involved in the queries in the Presto, including the locations and numbers of lines and bytes processed in each task.
transactions runtime This table contains the list of currently opened transactions and related metadata. The data includes information like creation time, idle time, initiation parameters, and access catalogs.

Stored procedures

The connector supports the following stored procedures:

  • runtime.kill_query(id) Cancel query from specified ID

JMX connector


JMX information for all nodes in the Presto cluster can be queried through the JMX connector. The connector is generally used for system monitoring and debugging. Regular dump of JMX information can be implemented through modifying the configuration of the connector.


Create a file etc/catalog/jmx.properties, add the following content, and enable JMX connector.

  1. connector.name=jmx

If regular dump of JMX data is expected, the following content can be added in the configuration file:

  1. connector.name=jmx
  2. jmx.dump-tables=java.lang:type=Runtime,com.facebook.presto.execution.scheduler:name=NodeScheduler
  3. jmx.dump-period=10s
  4. jmx.max-entries=86400


  • dump-tables is a list of MBeans (Managed Beans) separated with commas. This configuration specifies which MBeans is sampled and stored in the memory for each sample period;
  • dump-period is used for setting the sample period, which is 10s by default;
  • max-entries is used for setting the max length of the history, which is 86400 by default.

If the name of a metric contains a comma, it must be escaped using ‘\,’ as follows:

  1. connector.name=jmx
  2. jmx.dump-tables=com.facebook.presto.memory:type=memorypool\\,name=general,\
  3. com.facebook.presto.memory:type=memorypool\\,name=system,\
  4. com.facebook.presto.memory:type=memorypool\\,name=reserved

Data tables

JMX connector provides 2 schemas, current and history, respectively. Where:

current contains the current MBean in each node, the name of which is the table name in current (if the bean name contains non-standard characters, the table name must be in quotation marks for the query), which can be obtained through the following statement:

  1. SHOW TABLES FROM jmx.current;


  1. --- Obtain jvm information for each node
  2. SELECT node, vmname, vmversion
  3. FROM jmx.current."java.lang:type=runtime";
  4. node | vmname | vmversion
  5. --------------+-----------------------------------+-----------
  6. ddc4df17-xxx | Java HotSpot(TM) 64-Bit Server VM | 24.60-b09
  7. (1 row)
  1. --- Obtain the metrics of max and min file descriptor counts for each node
  2. SELECT openfiledescriptorcount, maxfiledescriptorcount
  3. FROM jmx.current."java.lang:type=operatingsystem";
  4. openfiledescriptorcount | maxfiledescriptorcount
  5. -------------------------+------------------------
  6. 329 | 10240
  7. (1 row)

history contains the data table corresponding to the metrics to be dumped in the configuration file. The following statement can be used to query:

  1. SELECT "timestamp", "uptime" FROM jmx.history."java.lang:type=runtime";
  2. timestamp | uptime
  3. -------------------------+--------
  4. 2016-01-28 10:18:50.000 | 11420
  5. 2016-01-28 10:19:00.000 | 21422
  6. 2016-01-28 10:19:10.000 | 31412
  7. (3 rows)

Kafka connector


The connector maps topic in Kafka to tables in Presto. Each record in Kafka is mapped to a message in Presto tables.

Note: Since data in Kafka is dynamic, when Presto is used for multiple queries, something strange may sometimes occur. Currently, Presto is incapable of handling such cases.


Create a file etc/catalog/kafka.properties, add the following content, and enable Kafka connector.

  1. connector.name=kafka
  2. kafka.table-names=table1,table2
  3. kafka.nodes=host1:port,host2:port

Presto can connect to multiple Kafka cluster through adding a new properties file in the configuration catalog, and the file name is mapped to the Presto catalog. For example, when a configuration file “orders.properties” is added, Presto creates a catalog named ‘orders’.

  1. ## orders.properties
  2. connector.name=kafka # It denotes the connector type, which cannot be changed
  3. kafka.table-names=tableA,tableB
  4. kafka.nodes=host1:port,host2:port

Kafka connector provides the following properties:

  • kafka.table-names

    Description: [Required], it defines the list of tables supported in the connector

    Details: The file name here can be modified using Schema name, with forms like {schema_name} and {table_name}. The file name also can be not modified using Schema name, and the table is mapped to the Schema defined in kafka.default-schema.

  • kafka.default-schema

    Description: [Optional]default Schema name, with the default value default

  • kafka.nodes

    Description [Required]the node list in the Kafka clusterDetails: the configuration form is like hostname:port[,hostname:port…]. You can configure only part of the Kafka nodes here, but Presto must be capable of connecting to all nodes in the Kafka cluster. Otherwise, a part of data may not be obtained.

  • kafka.connect-timeout

    Description: [Optional], timeout for the connector and the Kafka cluster, which is 10s by defaultDetails: If the pressure on the Kafka cluster is large, a long time may be taken for creating a connection, causing timeout when executing the query by Presto. At this time, adding the configured value is a better choice.

  • kafka.buffer-size

    Description: [Optional]read buffer size, which is 64 kb by defaultDetails: It is used to set the size of the buffer for internal data reads from Kafka. The data buffer must have a size larger than that of a message. A data buffer is distributed for each worker and data node.

  • kafka.table-description-dir

    Description: [Optional]the catalog of topic (table) description file, which is etc/kafka by defaultDetails: Data table definition files in JSON format is stored in this directory (.json has to be used as a suffix).

  • kafka.hide-internal-columns

    Description: [Optional], the list of preset columns to be hidden, the value of which is true by default_Details: In addition to data columns defined in the table description file, the connector also maintains many extra columns for each table. The property is used to control whether these columns are shown in the execution result of the statement DESCRIBE and SELECT *. Regardless of the setting in the configuration, these columns are involved in the query process.

The Kafka connector provides internal columns as shown in the following table:

Column name Type Description
_partition_id BIGINT The ID of the Kafka partition where the record is located.
_partition_offset BIGINT The offset of the Kafka partition where the record is located.
_segment_start BIGINT The lowest offset containing this data segment. This offset is for each partition.
_segment_end BIGINT The largest offset containing this data segment (which is the start offset of the next segment). This offset is for each partition.
_segment_count BIGINT The serial number of the column in this segment. For an uncompressed topic, _segment_start + _segment_count = _partition_offset.
_message_corrupt BOOLEAN This field will be set to TRUE if a decoder cannot decode the record.
_message VARCHAR A string coded with UTF-8 from the message bytes. When the type of the topic message is text, the field will be useful.
_message_length BIGINT The byte length of the message.
_key_corrupt BOOLEAN This field will be set to TRUE if a key decoder cannot decode the record.
_key VARCHAR A string coded with UTF-8 from the key bytes. When the type of the topic message is text, the field will be useful.
_key_length BIGINT The byte length of the key.

For those tables without definition files, _key_corrupt and _message_corrupt remain FALSE.

Table definition files

Kafka is a Schema-Less message system, and the formats of the messages must defined by the producers and consumers. While Presto requires that data must be capable of being mapped into tables. Therefore, the users must provide corresponding table definition files according to the actual uses of the messages. For messages in JSON format, if a definition file is not provided, JSON functions in Presto can be used for resolution in the queries. While the method is flexible, it increases the difficulty of writing SQL statements.

When JSON is used to define a table in a table definition file, the file name can be customized, while the extension must be *.json.

  1. {
  2. "tableName": ...,
  3. "schemaName": ...,
  4. "topicName": ...,
  5. "key": {
  6. "dataFormat": ...,
  7. "fields": [
  8. ...
  9. ]
  10. },
  11. "message": {
  12. "dataFormat": ...,
  13. "fields": [
  14. ...
  15. ]
  16. }
  17. }
Field Optionality Type Description
tableName required string Presto table name
schemaName optional string the name of the Schema where the table is located
topicName required string Kafka topic name
key optional JSON object rules for mapping from message keys to columns
message optional JSON object rules for mapping from messages to columns

In which, the mapping rules for keys and messages use the following fields for description.

Field Optionality Type Description
dataFormat required string A decoder for setting a group of columns
fields required JSON array Column definition list

fields here is a JSON array, and each element is a JSON object as follows:

  1. {
  2. "name": ...,
  3. "type": ...,
  4. "dataFormat": ...,
  5. "mapping": ...,
  6. "formatHint": ...,
  7. "hidden": ...,
  8. "comment": ...
  9. }
Field Optionality Type Description
name required string column name
type required string column data type
dataFormat optional string column data decoder
Mapping optional string decoder parameters
formatHint optional string hint set for the column, which can be used by the decoder
hiddent optional boolean whether hidden or not
comment optional string column description


The function of the decoder is to map Kafka messages (key + message) to the columns in the data tables. Presto uses dummy decoder in the absence of table definition files.

Kafka connector provides the following decoders:

  • raw - original bytes are directly used without conversion
  • csv - messages are processed as strings in CSV format
  • json - messages are processed as strings in JSON format
