This topic describes how to use a connector to connect Flink to Hologres so that you can write data to Hologres from a Flink sink, read data from Hologres as a Flink source, and join data against Hologres tables by using Flink lookup tables.
Prerequisites
- A Hologres instance is purchased. A development tool is connected to the instance. For more information, see Use Hologres from the PostgreSQL client.
- A Flink cluster is available for you to commit tasks.
- To use Flink sinks, sources, or lookup tables with Hologres in real time, you must reference the connector JAR package in Flink. You can submit a ticket or join the Hologres DingTalk group 32314975 to obtain the JAR package.
Write data from a Flink sink to Hologres
CREATE TABLE mysink(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector.type'='hologres',
'connector.database'='Name of the Hologres database to be connected',
'connector.table'='Name of the Hologres table used to receive data',
'connector.username'='AccessKey ID of the account used to connect to Hologres',
'connector.password'='AccessKey secret of the account used to connect to Hologres',
'connector.endpoint'='<ip>:<port>' -- The endpoint and port number used to connect to Hologres.
);
The following table describes the parameters in the statement.

Parameter | Description | Required |
---|---|---|
type | The type of the sink. Set the value to hologres. | Yes |
database | The name of the Hologres database to be connected. | Yes |
table | The name of the Hologres table that is used to receive data. | Yes |
username | The AccessKey ID of the account that is used to connect to Hologres. You can obtain the AccessKey ID in the User Management console. | Yes |
password | The AccessKey secret of the account that is used to connect to Hologres. You can obtain the AccessKey secret in the User Management console. | Yes |
endpoint | The endpoint that is used to connect to Hologres. You can view the endpoint of the Hologres instance on the Configurations tab of the instance details page in the Hologres console. | Yes |
bulkload | Specifies whether to write multiple data entries to Hologres at a time. Default value: false. | No |
field_delimiter | The delimiter that is used to separate data entries written to Hologres. The delimiter must not occur in the data itself. This parameter takes effect only when the bulkload parameter is set to true. Default value: \u0002. | No |
upsert_type | The semantics that is used to write data to Hologres in a continuous manner. Default value: insertorignore. | No |
partition_router | Specifies whether to route data written to a parent partitioned table to the corresponding child partitioned tables. Default value: false. | No |
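For reference, the following sketch shows how the sink defined above is used in a write pipeline. It assumes a hypothetical upstream Flink table named user_events with the same schema:
-- Write all rows of the hypothetical upstream table to the Hologres sink defined above.
INSERT INTO mysink SELECT name, age, birthday FROM user_events;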
- Streaming sinking semantics
If you want to write data from a Flink sink to Hologres continuously and query the written data in real time, use the streaming sinking semantics. We recommend that you use the streaming sinking semantics to write streaming data from a Flink sink to Hologres in real time.
You can use the exactly-once or at-least-once semantics based on the configuration of the sink and the attributes of the Hologres table.
- Exactly-once: The system processes data or events only once, regardless of whether a fault occurs.
- At-least-once: If data or an event is lost before a process is complete, the system transfers all data and events anew. In this case, data or events may be processed multiple times. If the first retry succeeds, no further retries are performed.
When you use the streaming sinking semantics in Flink sinks, take note of the following points:
- If the destination Hologres table has a primary key, the sink can use primary key conflicts to ensure idempotence and implement the exactly-once semantics. When data entries with the same primary key value are written to Hologres, set the upsert_type parameter to one of the following values to control how each data entry is written (a sketch follows at the end of this section):
- insertorignore: retains the data entry that is written first and ignores all subsequent data entries with the same primary key value. This is the default value.
- insertorreplace: replaces the existing data entry as a whole.
- insertorupdate: updates the specified fields of the existing data entry.
- If the destination Hologres table has no primary key, only the at-least-once semantics can be implemented.
By default, you can import data from a Flink sink only to a non-partitioned table or a child partitioned table in Hologres. If you import data to a parent partitioned table, no error message is returned, but you cannot query the written data in Hologres.
To route data written to a parent partitioned table to the corresponding child partitioned tables, set the partitionRouter parameter to true. For example, you can use the following statement to create a Flink sink that writes data to a parent partitioned table and routes the data to the corresponding child partitioned tables:
CREATE TABLE mysink(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector.type'='hologres',
'connector.database'='Name of the Hologres database to be connected',
'connector.table'='Name of the parent partitioned table in Hologres used to receive data',
'connector.username'='AccessKey ID of the account used to connect to Hologres',
'connector.password'='AccessKey secret of the account used to connect to Hologres',
'connector.endpoint'='<ip>:<port>', -- The endpoint and port number used to connect to Hologres.
'connector.partitionRouter'='true'
);
Note
- Set the table parameter to the name of the parent partitioned table.
- Hologres does not automatically create child partitioned tables when it receives data from Flink. You must create the required child partitioned tables in advance.
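The following sketch illustrates the insertorupdate semantics. It assumes that the upsert_type parameter takes the connector. prefix in the same way as the other parameters above, and that the destination Hologres table has id as its primary key; all names are placeholders.
CREATE TABLE holo_upsert_sink(
id BIGINT,
name varchar,
age BIGINT
) WITH (
'connector.type'='hologres',
'connector.database'='mydb', -- placeholder database name
'connector.table'='users', -- assumed to have id as its primary key
'connector.username'='<AccessKey ID>',
'connector.password'='<AccessKey secret>',
'connector.endpoint'='<ip>:<port>',
'connector.upsert_type'='insertorupdate' -- update the specified fields on primary key conflicts
);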
- Batch sinking semantics
If you want to write a large number of data entries from a Flink sink to Hologres at a time, use the batch sinking semantics. You can query the data only after all of the data is written to Hologres. The writing process runs as a transaction: if the transaction succeeds, all of the data is written to Hologres exactly once.
The batch sinking semantics provides high throughput. We recommend that you use the batch sinking semantics when you process many data entries in Flink at a time, for example, during data migration or data backfilling.
Note To use the batch sinking semantics to write data from a Flink sink to a partitioned table in Hologres, you can write the data only to a child partitioned table.
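The following sketch shows a sink configured for batch sinking. It assumes that the bulkload and field_delimiter parameters take the connector. prefix like the other parameters above; all names are placeholders, and the destination is a child partitioned table as the note requires.
CREATE TABLE holo_batch_sink(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector.type'='hologres',
'connector.database'='mydb', -- placeholder database name
'connector.table'='user_events_20240101', -- placeholder child partitioned table
'connector.username'='<AccessKey ID>',
'connector.password'='<AccessKey secret>',
'connector.endpoint'='<ip>:<port>',
'connector.bulkload'='true', -- write multiple data entries at a time
'connector.field_delimiter'='\u0002' -- must not occur in the data itself
);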
Read data from Hologres by using a Flink source
You can use the batch reading semantics to read data from Hologres into a Flink source.
When you use the batch reading semantics, all the snapshot data of the source is read. The reading process runs as a transaction. If the transaction fails, the system reads the data anew from the snapshot at the latest point in time.
We recommend that you use the batch reading semantics when you read a large number of data entries from a Flink source at a time.
- Flink sources support projection pushdown, which means that you can read data from only the specified columns of a Flink source (see the query sketch after the parameter table).
- Each parallel instance of Flink can read one or more partitions of the source. We recommend that the number of parallel instances not exceed the number of partitions.
CREATE TABLE mysource(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector.type'='hologres',
'connector.database'='Name of the Hologres database to be connected',
'connector.table'='Name of the Hologres table from which data is read',
'connector.username'='AccessKey ID of the account used to connect to Hologres',
'connector.password'='AccessKey secret of the account used to connect to Hologres',
'connector.endpoint'='<ip>:<port>' -- The endpoint and port number used to connect to Hologres.
);
The following table describes the parameters in the statement.

Parameter | Description | Required |
---|---|---|
type | The type of the source. Set the value to hologres. | Yes |
database | The name of the Hologres database to be connected. | Yes |
table | The name of the Hologres table from which data is read. | Yes |
username | The AccessKey ID of the account that is used to connect to Hologres. You can obtain the AccessKey ID in the User Management console. | Yes |
password | The AccessKey secret of the account that is used to connect to Hologres. You can obtain the AccessKey secret in the User Management console. | Yes |
endpoint | The endpoint that is used to connect to Hologres. You can view the endpoint of the Hologres instance on the Configurations tab of the instance details page in the Hologres console. | Yes |
field_delimiter | The delimiter that is used to separate data entries. The delimiter must not occur in the data itself. Default value: \u0002. | No |
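Because Flink sources support projection pushdown, a query that references only some columns reads only those columns from Hologres. A minimal sketch against the mysource table defined above:
-- Only the name and age columns are read from Hologres.
SELECT name, age FROM mysource;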
Use a Hologres table as a Flink lookup table
When you use a Hologres table as a Flink lookup table, take note of the following points:
- Lookup tables in Flink do not support the async mode.
- Lookup tables in Flink do not support caching.
- If you need help, submit a ticket or contact the technical support of Flink or Hologres.
CREATE TABLE hologreslookup(
...
) WITH (
'connector.type'='hologres',
'connector.database'='Name of the Hologres database to be connected',
'connector.table'='Name of the Hologres table that is used as the lookup table',
'connector.username'='AccessKey ID of the account used to connect to Hologres',
'connector.password'='AccessKey secret of the account used to connect to Hologres',
'connector.endpoint'='<ip>:<port>' -- The endpoint and port number used to connect to Hologres.
);
Examples
-- Register hologreslookup as a UDF.
SELECT * FROM source, LATERAL TABLE(hologreslookup(a, b))
-- Register hologreslookup as a lookup table source.
SELECT x, a, b, c FROM src
JOIN hologreslookup FOR SYSTEM_TIME AS OF src.proc AS h ON src.x = h.a AND src.y = h.b
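The temporal join above requires the probe-side table src to expose a processing-time attribute, referenced as src.proc. The following sketch shows how such a table could be declared; the columns and connector options are hypothetical.
CREATE TABLE src(
x BIGINT,
y BIGINT,
proc AS PROCTIME() -- processing-time attribute used in FOR SYSTEM_TIME AS OF
) WITH (
... -- connector options of your upstream source
);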
Appendix: Data types in Flink and Hologres
The following table lists the mappings between data types in Flink and Hologres.
Flink | Hologres |
---|---|
INT | INT |
ARRAY<INT> | INT[] |
BIGINT | BIGINT |
ARRAY<BIGINT> | BIGINT[] |
FLOAT | REAL |
ARRAY<FLOAT> | REAL[] |
DOUBLE | DOUBLE PRECISION |
ARRAY<DOUBLE> | DOUBLE PRECISION[] |
BOOLEAN | BOOLEAN |
ARRAY<BOOLEAN> | BOOLEAN[] |
VARCHAR | TEXT |
ARRAY<VARCHAR> | TEXT[] |
DECIMAL | NUMERIC |
DATE | DATE |
TIMESTAMP | TIMESTAMP WITH TIME ZONE |
For more information about the data types supported by Hologres, see Data types.
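As an illustration of these mappings, the following hypothetical pair of DDL statements declares matching schemas (schema only; a real Flink table also needs a WITH clause):
-- Flink DDL
CREATE TABLE example_flink(
id BIGINT,
score DOUBLE,
tags ARRAY<VARCHAR>,
created TIMESTAMP
);
-- Corresponding Hologres DDL
CREATE TABLE example_holo(
id BIGINT,
score DOUBLE PRECISION,
tags TEXT[],
created TIMESTAMP WITH TIME ZONE
);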