This topic describes the data types and parameters that PostgreSQL Reader supports and how to configure it by using the codeless user interface (UI) or the code editor.
Background information
PostgreSQL Reader connects to a remote PostgreSQL database and executes a SELECT statement to select and read data from the database. ApsaraDB RDS provides the PostgreSQL storage engine.
- PostgreSQL Reader generates the SELECT statement based on the table, column, and where parameters that you specify, and sends the generated SELECT statement to the PostgreSQL database.
- If you specify the querySql parameter, PostgreSQL Reader directly sends the value of this parameter to the PostgreSQL database.
"parameter": {
"datasource": "abc",
"column": [
"id",
"\"pgCreateTime\"", // Add escape characters to a column name.
"\"pgCreateTimeDay\""
],
"where": "",
"splitPk": "id",
"table": "public.wpw_test"
},
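The interaction between the table, column, where, and querySql parameters described above can be sketched in Python. This is an illustration only, not the reader's actual implementation. The function name build_select is hypothetical. The sketch assumes that each entry of column is placed in the statement verbatim, which is why a case-sensitive name such as pgCreateTime has to carry its own escaped double quotation marks.

```python
def build_select(parameter: dict) -> str:
    """Return the SQL text that would be sent to PostgreSQL (illustrative only)."""
    # If querySql is set, it is sent as-is and the other parameters are ignored.
    query_sql = parameter.get("querySql")
    if query_sql:
        return query_sql

    # Otherwise the statement is assembled from column, table, and where.
    columns = ", ".join(parameter["column"])
    sql = f"SELECT {columns} FROM {parameter['table']}"
    where = parameter.get("where", "").strip()
    if where:
        sql += f" WHERE {where}"
    return sql


# Same values as the sample configuration above.
example = {
    "column": ["id", "\"pgCreateTime\"", "\"pgCreateTimeDay\""],
    "where": "",
    "table": "public.wpw_test",
}
print(build_select(example))
```

With the sample values, the column list keeps the double quotation marks around the two mixed-case names, so PostgreSQL does not fold them to lowercase.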
Data types
PostgreSQL Reader supports most PostgreSQL data types. Before you configure a sync node, check that the data types in your source table appear in the following table.
Category | PostgreSQL data type |
---|---|
Integer | BIGINT, BIGSERIAL, INTEGER, SMALLINT, and SERIAL |
Floating point | DOUBLE PRECISION, MONEY, NUMERIC, and REAL |
String | VARCHAR, CHAR, TEXT, BIT, and INET |
Date and time | DATE, TIME, and TIMESTAMP |
Boolean | BOOL |
Binary | BYTEA |
- PostgreSQL Reader supports only the data types that are described in the preceding table.
- You can convert the MONEY, INET, and BIT types by using syntax such as a_inet::varchar.
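As a rough illustration of the cast syntax, the following Python sketch turns a mapping of column names to PostgreSQL types into column expressions, appending ::varchar to MONEY, INET, and BIT columns. The helper name column_expressions and the column names are hypothetical. Whether such an expression belongs in the column parameter or in querySql is an assumption to verify against your environment.

```python
from typing import Dict, List

# Types that the note above suggests converting with a ::varchar cast.
CAST_TO_VARCHAR = {"money", "inet", "bit"}


def column_expressions(column_types: Dict[str, str]) -> List[str]:
    """Return one SQL expression per column, casting the types listed above."""
    expressions = []
    for name, pg_type in column_types.items():
        if pg_type.lower() in CAST_TO_VARCHAR:
            expressions.append(f"{name}::varchar")
        else:
            expressions.append(name)
    return expressions


print(column_expressions({"id": "bigint", "a_inet": "inet", "price": "money"}))
```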
Parameters
Parameter | Description | Required | Default value |
---|---|---|---|
datasource | The connection name. It must be the same as the name of the created connection. You can create connections in the code editor. | Yes | N/A |
table | The name of the source table. | Yes | N/A |
column | The columns to be synchronized from the source table. The columns are described in a JSON array. The default value is [*], which indicates all columns in the source table. | Yes | N/A |
splitPk | The field that is used for data sharding when PostgreSQL Reader reads data. If you specify the splitPk parameter, the table is sharded based on the shard key that is specified by this parameter. Data Integration then runs concurrent threads to synchronize data. This way, data can be synchronized more efficiently. | No | N/A |
where | The WHERE clause. PostgreSQL Reader generates a SELECT statement based on the table, column, and where parameters that you specify, and uses the generated SELECT statement to select and read data. For example, set this parameter to id>2 and sex=1. | No | N/A |
querySql (available only in the code editor) | The SELECT statement that is used for refined data filtering. If you specify this parameter, Data Integration filters data based on this parameter. For example, if you want to join multiple tables for data synchronization, set this parameter to select a,b from table_a join table_b on table_a.id = table_b.id. If you specify the querySql parameter, PostgreSQL Reader ignores the table, column, where, and splitPk parameters that you specify. | No | N/A |
fetchSize | The number of data records to read at a time. This parameter determines the number of interactions between Data Integration and the database and affects reading efficiency. Note: A value greater than 2048 may lead to out of memory (OOM) during the data synchronization process. | No | 512 |
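To make the effect of fetchSize concrete, the following Python sketch estimates how many fetches are needed to read a table. It is a back-of-the-envelope calculation under the assumption that every fetch returns exactly fetchSize records except the last one. The function name estimated_fetches is hypothetical, and real behavior depends on the JDBC driver.

```python
def estimated_fetches(total_rows: int, fetch_size: int = 512) -> int:
    """Estimate the number of fetches needed to read total_rows records."""
    if total_rows < 0 or fetch_size < 1:
        raise ValueError("total_rows must be >= 0 and fetch_size must be >= 1")
    # Ceiling division: the last fetch may return fewer than fetch_size records.
    return (total_rows + fetch_size - 1) // fetch_size


print(estimated_fetches(1_000_000))        # default fetchSize of 512
print(estimated_fetches(1_000_000, 2048))  # largest value before the OOM warning applies
```

A larger fetchSize reduces the number of interactions, but each fetch holds more records in memory, which is the trade-off behind the OOM note in the table.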
Configure PostgreSQL Reader by using the codeless UI
- Configure the connections.
Configure the connections to the source and destination data stores for the sync node.
Parameter | Description |
---|---|
Connection | The datasource parameter in the preceding parameter description. Select a connection type and select the name of a connection that you have configured in DataWorks. |
Table | The table parameter in the preceding parameter description. |
Filter | The condition for filtering the data to be synchronized. PostgreSQL Reader cannot filter data based on the limit keyword. The SQL syntax is determined by the selected connection. |
Shard Key | The shard key. You can specify a column in the source table as the shard key. We recommend that you use the primary key or an indexed column as the shard key. Only integer fields are supported. If data sharding is performed based on the configured shard key, data can be read concurrently. This way, data can be synchronized more efficiently. Note: The Shard Key parameter is displayed only after you select the connection to the source data store for the sync node. |
- Configure field mapping. It is equivalent to setting the column parameter in the preceding parameter description.
Fields in the source table on the left have a one-to-one mapping with fields in the destination table on the right. You can click Add to add a field. To remove a field, move the pointer over the field and click the Remove icon.
GUI element | Description |
---|---|
Map Fields with the Same Name | Click Map Fields with the Same Name to establish a mapping between fields with the same name. The data types of the fields must match. |
Map Fields in the Same Line | Click Map Fields in the Same Line to establish a mapping between fields in the same row. The data types of the fields must match. |
Delete All Mappings | Click Delete All Mappings to remove mappings that have been established. |
Auto Layout | Click Auto Layout to sort the fields based on specified rules. |
Change Fields | Click the Change Fields icon. In the Change Fields dialog box, you can manually edit the fields in the source table. Each field occupies a row. The first and the last blank rows are included, whereas other blank rows are ignored. |
Add | Click Add to add a field. Take note of the following rules when you add a field: |
- You can enter constants. Each constant must be enclosed in single quotation marks (' '), for example, 'abc' and '123'.
- You can use scheduling parameters such as ${bizdate}.
- You can enter functions that are supported by relational databases, for example, now() and count(1).
- Fields that cannot be parsed are indicated by Unidentified.
- Configure channel control policies.
Parameter | Description |
---|---|
Expected Maximum Concurrency | The maximum number of concurrent threads that the sync node uses to read data from or write data to data stores. You can configure the concurrency for the node on the codeless UI. |
Bandwidth Throttling | Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and set a maximum transmission rate to avoid heavy read workload of the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to a proper value. |
Dirty Data Records Allowed | The maximum number of dirty data records allowed. |
Configure PostgreSQL Reader by using the code editor
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"postgresql", // The reader type.
"parameter":{
"datasource":"",// The connection name.
"column":[// The columns to be synchronized from the source table.
"col1",
"col2"
],
"where":"",// The WHERE clause.
"splitPk":"", // The shard key based on which the table is sharded. Data Integration runs concurrent threads to synchronize data based on the shard key.
"table":"" // The name of the source table.
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":false, // Specifies whether to enable bandwidth throttling. A value of false indicates that the bandwidth is not throttled. A value of true indicates that the bandwidth is throttled. The maximum transmission rate takes effect only if you set this parameter to true.
"concurrent":1 // The maximum number of concurrent threads.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Usage notes
- Data synchronization between primary and secondary databases
A secondary PostgreSQL database can be deployed for disaster recovery. The secondary database continuously replicates data from the primary database based on write-ahead logs (WAL). Some replication latency between the primary and secondary databases is unavoidable, especially when network conditions are poor, so data read from the secondary database may be inconsistent with the data in the primary database.
- Concurrency control
PostgreSQL is a relational database management system (RDBMS), which supports strong consistency for data queries. A database snapshot is created before a sync node starts. PostgreSQL Reader reads data from the database snapshot. Therefore, if new data is written to the database during data synchronization, the reader cannot obtain the new data.
Data consistency cannot be ensured when you enable PostgreSQL Reader to run concurrent threads in a single sync node.
PostgreSQL Reader shards the table based on the splitPk parameter and runs multiple concurrent threads to synchronize data. These concurrent threads belong to different transactions. They read data at different time points. This means that the concurrent threads observe different snapshots.
Theoretically, the data inconsistency issue is unavoidable if a single sync node includes multiple threads. However, two workarounds can be used:
- Do not enable concurrent threads in a single sync node. That is, do not specify the splitPk parameter. This way, data consistency is ensured, but data is synchronized less efficiently.
- Disable writers to make sure that the data remains unchanged during data synchronization. For example, lock the table and disable data synchronization between primary and secondary databases. This way, data is efficiently synchronized but your ongoing services may be interrupted.
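The sharding described above can be pictured with the following Python sketch, which splits an integer key range into contiguous WHERE conditions, one per concurrent thread. It is a simplified illustration and not the reader's actual splitting algorithm. The function name split_ranges is hypothetical, the minimum and maximum key values are assumed to be known in advance, and rows whose shard key is NULL are not covered.

```python
from typing import List


def split_ranges(split_pk: str, min_pk: int, max_pk: int, shards: int) -> List[str]:
    """Split the closed range [min_pk, max_pk] into contiguous WHERE conditions."""
    if shards < 1 or min_pk > max_pk:
        raise ValueError("shards must be >= 1 and min_pk must be <= max_pk")
    total = max_pk - min_pk + 1
    shards = min(shards, total)          # never create empty shards
    step, extra = divmod(total, shards)  # the first `extra` shards get one more key
    conditions = []
    start = min_pk
    for i in range(shards):
        size = step + (1 if i < extra else 0)
        end = start + size - 1
        conditions.append(f"{split_pk} >= {start} AND {split_pk} <= {end}")
        start = end + 1
    return conditions


for condition in split_ranges("id", 1, 10, 3):
    print(condition)
```

Each condition would be read by a separate thread in its own transaction, which is why the threads can observe different snapshots when the table is being written to.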
- Character encoding
A PostgreSQL database supports only the EUC_CN and UTF-8 encoding formats for simplified Chinese characters. PostgreSQL Reader uses JDBC, which can automatically convert the encoding of characters. Therefore, you do not need to specify the encoding format.
If you specify an encoding format for a PostgreSQL database but data is written to the PostgreSQL database in a different encoding format, PostgreSQL Reader cannot recognize this inconsistency. As a result, garbled characters may be exported.
- Incremental data synchronization
PostgreSQL Reader connects to a database by using JDBC and uses a SELECT statement with a WHERE clause to read incremental data.
- For data in batches, incremental add, update, and delete operations are distinguished by timestamps. The delete operations include logical delete operations. Specify the WHERE clause based on the timestamp. The timestamp must be later than the latest timestamp in the last synchronization.
- For streaming data, specify the WHERE clause based on the data record ID. The data record ID must be greater than the maximum ID in the last synchronization.
If incremental data cannot be distinguished, PostgreSQL Reader can perform only full synchronization but not incremental synchronization.
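The two incremental strategies above both reduce to a "greater than the last synchronized value" filter. The following Python sketch builds such a value for the where parameter. It is a minimal illustration: the function name incremental_where and the column names gmt_modified and id are hypothetical, and storing the last synchronized value between runs is left to the caller.

```python
from datetime import datetime
from typing import Union


def incremental_where(column: str, last_value: Union[int, datetime]) -> str:
    """Build a WHERE condition that selects rows newer than the last sync."""
    if isinstance(last_value, datetime):
        # Batch data: filter on a timestamp column.
        return f"{column} > '{last_value:%Y-%m-%d %H:%M:%S}'"
    if isinstance(last_value, int) and not isinstance(last_value, bool):
        # Streaming data: filter on an increasing record ID.
        return f"{column} > {last_value}"
    raise TypeError("last_value must be an int or a datetime")


print(incremental_where("gmt_modified", datetime(2024, 1, 1, 0, 0, 0)))
print(incremental_where("id", 1000))
```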
- Syntax validation
PostgreSQL Reader allows you to specify custom SELECT statements by using the querySql parameter but does not verify the syntax of the custom SELECT statements.