
Realtime Compute for Apache Flink: StarRocks connector

Last Updated: Jun 06, 2023

This topic describes how to use the StarRocks connector.

Background information

StarRocks is a new-generation Massively Parallel Processing (MPP) data warehouse that delivers fast query performance across a wide range of scenarios and is designed to provide a fast, unified analytics experience. StarRocks provides the following benefits:

  • Is compatible with the MySQL protocol. You can use a MySQL client or a common business intelligence (BI) tool to access StarRocks for data analytics.

  • Uses a distributed architecture that provides the following capabilities:

    • Horizontally splits tables and stores data in multiple replicas.

    • Scales clusters flexibly to support analytics on data at the 10 PB scale.

    • Supports the MPP architecture to accelerate data computing.

    • Supports multiple replicas to ensure fault tolerance.

When the StarRocks connector is used for a result table, it buffers data in Flink and imports the data into StarRocks in batches by using Stream Load. When it is used for a source table, it reads data from StarRocks in batches. The following table describes the capabilities of the StarRocks connector.

Item                                         Description
Table type                                   Source table and result table
Running mode                                 Streaming mode and batch mode
Data format                                  JSON and CSV
Metric                                       N/A
API type                                     DataStream API and SQL API
Data update or deletion in a result table    Yes
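A result table usually receives updates from a query that produces changing results, such as a grouped aggregation. The sketch below shows this case. It assumes a hypothetical StarRocks table db_name.user_score that uses the Primary Key model, because updates and deletions are applied by key. Test rows come from Flink's built-in datagen connector. All table names and connection values are placeholders.

```sql
-- Hypothetical test input generated by Flink's built-in datagen connector.
CREATE TEMPORARY TABLE user_events (
  name STRING,
  score BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.name.length' = '2'
);

-- Result table keyed on name. Assumption: the StarRocks table
-- db_name.user_score uses the Primary Key model, so an updated row
-- replaces the earlier row that has the same key.
CREATE TEMPORARY TABLE user_score_sink (
  name STRING,
  total_score BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'user_score',
  'username' = 'xxxx',
  'password' = 'xxxxxxx'
);

-- The grouped aggregation emits an updated total for a name
-- each time a new row for that name arrives.
INSERT INTO user_score_sink
SELECT name, SUM(score) AS total_score
FROM user_events
GROUP BY name;
```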

Features

StarRocks clusters of E-MapReduce (EMR) support the CREATE TABLE AS and CREATE DATABASE AS statements. Use CREATE TABLE AS to synchronize the schema and data of a single table. Use CREATE DATABASE AS to synchronize the data of an entire database, or the schemas and data of multiple tables in the same database. For more information, see Use the CREATE TABLE AS and CREATE DATABASE AS statements of Realtime Compute for Apache Flink to synchronize data from an ApsaraDB RDS for MySQL instance to a StarRocks cluster.
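The following is a minimal CREATE TABLE AS sketch. It assumes that a MySQL catalog named mysql_catalog is already registered and that it contains the table src_db.runoob_tbl. These names are hypothetical. The topic referenced above remains the authoritative description of the statement syntax and of how the StarRocks options are passed.

```sql
-- Sketch: synchronize the schema and data of one MySQL table to StarRocks.
-- mysql_catalog, src_db, and runoob_tbl are hypothetical names.
CREATE TABLE IF NOT EXISTS `runoob_tbl_sink`
WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxxx',
  'password' = 'xxxxxxx'
)
AS TABLE `mysql_catalog`.`src_db`.`runoob_tbl`;
```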

Prerequisites

A StarRocks cluster is created. The cluster can be a StarRocks cluster of EMR or a self-managed StarRocks cluster hosted on Elastic Compute Service (ECS) instances.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.5 or later supports the StarRocks connector.

  • The StarRocks connector supports only the at-least-once and exactly-once semantics.
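To change a result table from the default at-least-once semantics to exactly-once, set sink.semantic. The sketch below reuses the placeholders from the syntax example, and the table name USER_RESULT_EXACTLY_ONCE is hypothetical. It assumes that checkpointing is enabled for the deployment, because exactly-once writes are normally committed together with Flink checkpoints. That assumption comes from how the open-source StarRocks connector works.

```sql
-- Result table that requests exactly-once writes.
-- Assumption: checkpointing is enabled for the deployment.
CREATE TABLE USER_RESULT_EXACTLY_ONCE(
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe1_ip:query_port',
  'load-url' = 'fe1_ip:http_port',
  'database-name' = 'xxx',
  'table-name' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.semantic' = 'exactly-once'
);
```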

Syntax

CREATE TABLE USER_RESULT(
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
  'load-url' = 'fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
  'database-name' = 'xxx',
  'table-name' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx'
);
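The syntax above declares a result table. A source table is declared the same way, except that it uses scan-url instead of load-url. In the sketch below, the table name USER_SOURCE is hypothetical, and the other values are the same placeholders as above.

```sql
CREATE TABLE USER_SOURCE(
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port',
  -- HTTP endpoints of the FE nodes, separated by semicolons.
  'scan-url' = 'fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
  'database-name' = 'xxx',
  'table-name' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx'
);
```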

Parameters in the WITH clause

Common parameters

  • connector (STRING, required, no default value)
    The type of the table. Set the value to starrocks.

  • jdbc-url (STRING, required, no default value)
    The Java Database Connectivity (JDBC) URL that is used to connect to the database. Specify the IP address and JDBC port of a frontend (FE) node in the jdbc:mysql://ip:port format.

  • database-name (STRING, required, no default value)
    The name of the StarRocks database.

  • table-name (STRING, required, no default value)
    The name of the StarRocks table.

  • username (STRING, required, no default value)
    The username that is used to connect to the StarRocks database.

  • password (STRING, required, no default value)
    The password that is used to connect to the StarRocks database.
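You can look up the FE ports that jdbc-url, load-url, and scan-url expect in StarRocks itself. Run the statement below in a MySQL client connected to the StarRocks cluster, not in Flink. It assumes that the account has the privileges required to run the statement. In the output, the QueryPort column gives the port for jdbc-url, and the HttpPort column gives the port for load-url and scan-url.

```sql
-- Run in a MySQL client connected to a StarRocks FE, not in a Flink draft.
-- Lists every FE node; use the address, QueryPort, and HttpPort columns.
SHOW FRONTENDS;
```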

Parameters only for source tables

  • scan-url (STRING, optional, no default value)
    The URL that is used to scan data. Specify the IP address and HTTP port of an FE node in the fe_ip:http_port;fe_ip:http_port format.
    Note: Separate multiple pairs of IP addresses and port numbers with semicolons (;).

  • scan.connect.timeout-ms (STRING, optional, default value: 1000)
    The timeout period for the StarRocks connector of Flink to connect to the StarRocks database. If the connection is not established within this period, an error is returned. Unit: milliseconds.

  • scan.params.keep-alive-min (STRING, optional, default value: 10)
    The keep-alive period of the query task. Unit: minutes.

  • scan.params.query-timeout-s (STRING, optional, default value: 600)
    The timeout period of the query task. If no query result is returned within this period, the query task is stopped. Unit: seconds.

  • scan.params.mem-limit-byte (STRING, optional, default value: 1073741824, which is 1 GB)
    The maximum memory that a single query can use on a backend (BE) node. Unit: bytes.

  • scan.max-retries (STRING, optional, default value: 1)
    The maximum number of retries when a query fails. If the query still fails after this number of retries, an error is returned.
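The sketch below sets the source-only parameters on the source table from the sample code. The values are illustrative, not recommendations:

  • a 5-second connection timeout
  • a 30-minute query timeout (1800 seconds)
  • 2 GB of BE memory per query (2 x 1073741824 = 2147483648 bytes)
  • up to 3 retries

The table name runoob_tbl_source_tuned is hypothetical.

```sql
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source_tuned` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxxxx',
  'password' = 'xxxxxxx',
  -- Return an error if the connection is not established within 5 seconds.
  'scan.connect.timeout-ms' = '5000',
  -- Stop a query task that returns no result within 30 minutes.
  'scan.params.query-timeout-s' = '1800',
  -- Allow each query up to 2 GB of memory on a BE node.
  'scan.params.mem-limit-byte' = '2147483648',
  -- Retry a failed query up to 3 times before returning an error.
  'scan.max-retries' = '3'
);
```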

Parameters only for result tables

  • load-url (STRING, required, no default value)
    The URL that is used to import data. Specify the IP address and HTTP port of an FE node in the fe_ip:http_port;fe_ip:http_port format.
    Note: Separate multiple pairs of IP addresses and port numbers with semicolons (;).

  • sink.semantic (STRING, optional, default value: at-least-once)
    The semantics for data writing. Valid values:
      • at-least-once: The at-least-once semantics is used. This is the default value.
      • exactly-once: The exactly-once semantics is used.

  • sink.buffer-flush.max-bytes (STRING, optional, default value: 94371840, which is 90 MB)
    The maximum amount of data that can be held in the buffer. Valid values: 64 MB to 10 GB.

  • sink.buffer-flush.max-rows (STRING, optional, default value: 500000)
    The maximum number of rows that can be held in the buffer. Valid values: 64000 to 5000000.

  • sink.buffer-flush.interval-ms (STRING, optional, default value: 300000)
    The interval at which the buffer is flushed. Valid values: 1000 to 3600000. Unit: milliseconds.

  • sink.max-retries (STRING, optional, default value: 1)
    The maximum number of retries for writing data to the table. Valid values: 0 to 10.

  • sink.connect.timeout-ms (STRING, optional, default value: 1000)
    The timeout period for connecting to the StarRocks database. Valid values: 100 to 60000. Unit: milliseconds.

  • sink.properties.* (STRING, optional, no default value)
    The Stream Load properties that are used when data is written to the result table. Specify each property in the sink.properties.<property name> format.
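The sketch below overrides the buffer and format settings of a result table. The values are illustrative choices within the documented ranges, and the table name runoob_tbl_sink_json is hypothetical. Two assumptions apply here, so confirm them against your StarRocks and connector versions:

  • The property names format and strip_outer_array come from StarRocks Stream Load, and the open-source StarRocks connector passes them through sink.properties.*.
  • The buffer is flushed as soon as any one of the three thresholds is reached.

```sql
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink_json` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL,
  PRIMARY KEY (`runoob_id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxxx',
  'password' = 'xxxxxxx',
  -- Buffer thresholds (assumption: whichever is reached first triggers a flush).
  'sink.buffer-flush.max-bytes' = '134217728',  -- 128 MB
  'sink.buffer-flush.max-rows' = '1000000',
  'sink.buffer-flush.interval-ms' = '10000',    -- 10 seconds
  -- Stream Load properties passed through sink.properties.*:
  -- send rows as a JSON array instead of CSV.
  'sink.properties.format' = 'json',
  'sink.properties.strip_outer_array' = 'true'
);
```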

Data type mappings

Data type of StarRocks    Data type of Flink
NULL                      NULL
BOOLEAN                   BOOLEAN
TINYINT                   TINYINT
SMALLINT                  SMALLINT
INT                       INT
BIGINT                    BIGINT
LARGEINT                  STRING
FLOAT                     FLOAT
DOUBLE                    DOUBLE
DATE                      DATE
DATETIME                  TIMESTAMP
DECIMAL                   DECIMAL
DECIMALV2                 DECIMAL
DECIMAL32                 DECIMAL
DECIMAL64                 DECIMAL
DECIMAL128                DECIMAL
CHAR                      CHAR
VARCHAR                   STRING
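As an illustration of these mappings, the sketch below declares a Flink source table for a hypothetical StarRocks table db_name.orders. The StarRocks table has the following columns: order_id LARGEINT, customer VARCHAR(64), amount DECIMAL(10, 2), created_at DATETIME, and is_paid BOOLEAN. The table and its columns are assumptions for this example. Keeping the DECIMAL precision and scale unchanged is also an assumption.

```sql
-- Flink declaration of the hypothetical StarRocks table db_name.orders.
-- Each comment gives the StarRocks type of the column.
CREATE TEMPORARY TABLE orders_source (
  order_id STRING,          -- StarRocks LARGEINT
  customer STRING,          -- StarRocks VARCHAR(64)
  amount DECIMAL(10, 2),    -- StarRocks DECIMAL(10, 2)
  created_at TIMESTAMP,     -- StarRocks DATETIME
  is_paid BOOLEAN           -- StarRocks BOOLEAN
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'orders',
  'username' = 'xxxxx',
  'password' = 'xxxxxxx'
);
```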

Sample code

-- Source table: reads data from a StarRocks table.
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxxxx',
  'password' = 'xxxxxxx'
);

-- Result table: writes data to a StarRocks table by using Stream Load.
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL,
  PRIMARY KEY (`runoob_id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxxx',
  'password' = 'xxxxxxx',
  -- Flush the buffer every 5 seconds instead of the default 300000 ms.
  'sink.buffer-flush.interval-ms' = '5000'
);

-- Copy all rows from the source table to the result table.
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;