This topic describes the data types and parameters supported by PostgreSQL Reader and how to configure it by using the codeless user interface (UI) and code editor.

PostgreSQL Reader connects to a remote PostgreSQL database and runs a SELECT statement to select and read data from the database. ApsaraDB for Relational Database Service (RDS) supports the PostgreSQL engine.

Specifically, PostgreSQL Reader connects to a remote PostgreSQL database through Java Database Connectivity (JDBC), generates a SELECT statement based on your configurations, and then sends the statement to the database. The PostgreSQL database runs the statement and returns the result. Then, PostgreSQL Reader assembles the returned data into abstract datasets of the custom data types supported by Data Integration, and sends the datasets to a writer.
  • PostgreSQL Reader generates the SELECT statement based on the table, column, and where parameters that you have configured, and sends the generated SELECT statement to the PostgreSQL database.
  • If you specify the querySql parameter, PostgreSQL Reader directly sends the value of this parameter to the PostgreSQL database.
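The statement generation described above can be sketched as follows. This is a simplified illustration rather than the actual Data Integration implementation; the function name is hypothetical.

```python
def build_select(table, columns, where=None, query_sql=None):
    """Mimic how PostgreSQL Reader derives its SELECT statement.

    If query_sql is set, it is sent to the database verbatim; otherwise
    the statement is assembled from the table, column, and where settings.
    """
    if query_sql:
        return query_sql
    sql = "SELECT {} FROM {}".format(", ".join(columns), table)
    if where:
        sql += " WHERE {}".format(where)
    return sql

# Example: build_select("public.users", ["id", "name"], where="id>2 and sex=1")
# assembles: SELECT id, name FROM public.users WHERE id>2 and sex=1
```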

Data types

PostgreSQL Reader supports most PostgreSQL data types. Make sure that your data types are supported.

The following table lists the data types supported by PostgreSQL Reader.

Category PostgreSQL data type
Integer bigint, bigserial, integer, smallint, and serial
Floating point double precision, money, numeric, and real
String varchar, char, text, bit, and inet
Date and time date, time, and timestamp
Boolean boolean
Binary bytea
Note
  • Data types that are not listed in the preceding table are not supported.
  • You can convert the money, inet, and bit types by using syntax such as a_inet::varchar.
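Because such cast expressions are passed through unchanged into the generated SELECT statement, you can place them directly in the column configuration. The column names below (`a_inet`, `a_money`) and the table name are hypothetical:

```python
# A hypothetical column list that casts types inline so the reader
# receives plain varchar/numeric values instead of inet/money values.
columns = ["id", "a_inet::varchar", "a_money::numeric"]

# The cast expressions flow through into the generated SELECT as-is.
select = "SELECT {} FROM demo_table".format(", ".join(columns))
```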

Parameters

Parameter Description Required Default value
datasource The connection name. It must be identical to the name of the added connection. You can add connections in the code editor. Yes None
table The name of the table to be synchronized. Yes None
column The columns to be synchronized from the source table. The columns are described in a JSON array. The default value is [ * ], which indicates all columns.
  • Column pruning is supported. You can select and export specific columns.
  • Change of the column order is supported. You can export the columns in an order different from that specified in the schema of the table.
  • Constants are supported. The column names must comply with the SQL syntax supported by PostgreSQL, for example, ["id", "table","1", "'mingya.wmy'", "'null'", "to_char(a+1)", "2.3", "true"] .
    • id: a column name.
    • table: the name of a column that contains reserved keywords.
    • 1: an integer constant.
    • 'mingya.wmy': a string constant, which is enclosed in single quotation marks (' ').
    • 'null': a string.
    • to_char(a+1): a function expression.
    • 2.3: a floating-point constant.
    • true: a Boolean value.
  • The column parameter must explicitly specify a set of columns to be synchronized. The parameter cannot be left empty.
Yes None
splitPk The field used for data sharding when PostgreSQL Reader extracts data. If you specify the splitPk parameter, the table is sharded based on the shard key specified by this parameter. Data Integration then initiates concurrent sync threads, which improves efficiency.
  • We recommend that you set the splitPk parameter to the primary key of the table. Based on the primary key, data can be well distributed to different shards, but not intensively distributed to certain shards.
  • Currently, the splitPk parameter supports data sharding only for integer columns. If you set this parameter to a column of an unsupported type, such as a string, floating-point, or date column, PostgreSQL Reader ignores the splitPk parameter and synchronizes data through a single thread.
  • If you do not specify the splitPk parameter or leave it empty, Data Integration synchronizes data through a single thread.
No None
where The WHERE clause. PostgreSQL Reader generates a SELECT statement based on the table, column, and where parameters that you have configured, and uses the generated SELECT statement to select and read data. For example, set this parameter to id>2 and sex=1.
  • You can use the WHERE clause to synchronize incremental data.
  • If you do not specify the where parameter or leave it empty, all data is synchronized.
No None
querySql (only available in the code editor) The SELECT statement used for refined data filtering. If you specify this parameter, Data Integration directly filters data based on this parameter. For example, if you want to join multiple tables for data synchronization, set this parameter to select a,b from table_a join table_b on table_a.id = table_b.id. If you specify the querySql parameter, PostgreSQL Reader ignores the table, column, where, and splitPk parameters that you have configured. No None
fetchSize The number of data records to read at a time. This parameter determines the number of interactions between Data Integration and the database and affects reading efficiency.
Note A value greater than 2048 may cause out-of-memory (OOM) errors during data synchronization.
No 512
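The splitPk behavior can be illustrated with a sketch that splits an integer key range into one WHERE clause per concurrent thread. This is a simplified model; the actual shard boundaries are computed internally by Data Integration (for example, from the MIN and MAX of the key), and the function name is hypothetical.

```python
def shard_ranges(min_id, max_id, num_shards, split_pk):
    """Split [min_id, max_id] into WHERE clauses, one per concurrent thread.

    Each clause bounds the shard key so the threads read disjoint slices
    of the table concurrently.
    """
    # Ceiling division so every row falls into exactly one shard.
    step = (max_id - min_id + 1 + num_shards - 1) // num_shards
    clauses = []
    lo = min_id
    while lo <= max_id:
        hi = min(lo + step - 1, max_id)
        clauses.append(
            "{pk} >= {lo} AND {pk} <= {hi}".format(pk=split_pk, lo=lo, hi=hi)
        )
        lo = hi + 1
    return clauses

# Example: shard_ranges(1, 100, 4, "id") yields four disjoint clauses,
# starting with "id >= 1 AND id <= 25".
```

This also shows why an evenly distributed primary key is recommended: if most rows cluster in one range, one thread does most of the work and concurrency gains little.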

Configure PostgreSQL Reader by using the codeless UI

  1. Configure the connections.
    Configure the source and destination connections for the sync node.
    Parameter Description
    Connection The datasource parameter in the preceding parameter description. Select a connection type, and enter the name of a connection that has been configured in DataWorks.
    Table The table parameter in the preceding parameter description.
    Filter The filter condition for the data to be synchronized. Currently, filtering based on the limit keyword is not supported. The SQL syntax is determined by the selected connection.
    Shard Key The shard key. You can use a column in the source table as the shard key. We recommend that you use the primary key or an indexed column. Only integer fields are supported.
    If data sharding is performed based on the configured shard key, data can be read concurrently to improve data synchronization efficiency.
    Note The Shard Key parameter appears only when you configure the source connection for a sync node.
  2. Configure field mapping, that is, the column parameter in the preceding parameter description.
    Fields in the source table on the left have a one-to-one mapping with fields in the destination table on the right. You can click Add to add a field, or move the pointer over a field and click the Delete icon to delete the field.
    Button or icon Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish a mapping between fields with the same name. Note that the data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish a mapping for fields in the same row. Note that the data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove mappings that have been established.
    Auto Layout Click Auto Layout. The fields are automatically sorted based on specified rules.
    Change Fields Click the Change Fields icon. In the Change Fields dialog box that appears, you can manually edit fields in the source table. Each field occupies a row. The first and the last blank rows are included, whereas other blank rows are ignored.
    Add
    • Click Add to add a field. You can enter constants. Each constant must be enclosed in single quotation marks (' '), such as 'abc' and '123'.
    • You can use scheduling parameters, such as ${bizdate}.
    • You can enter functions supported by relational databases, such as now() and count(1).
    • Fields that cannot be parsed are indicated by Unidentified.
  3. Configure channel control policies.
    Parameter Description
    Expected Maximum Concurrency The maximum number of concurrent threads to read data from or write data to data storage within the sync node. You can configure the concurrency for a node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and set a maximum transmission rate to avoid heavy read workload of the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to a proper value.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.
    Resource Group The resource group used for running the sync node. If a large number of nodes including this sync node are deployed on the default resource group, the sync node may need to wait for resources. We recommend that you purchase an exclusive resource group for data integration or add a custom resource group. For more information, see DataWorks exclusive resources and Add a custom resource group.

Configure PostgreSQL Reader by using the code editor

In the following code, a node is configured to read data from a PostgreSQL database.
{
    "type":"job",
    "version":"2.0",// The version number.
    "steps":[
        {
            "stepType":"postgresql",// The reader type.
            "parameter":{
                "datasource":"",// The connection name.
                "column":[// The columns to be synchronized.
                    "col1",
                    "col2"
                ],
                "where":"",// The WHERE clause.
                "splitPk":"",// The shard key based on which the table is sharded. Data Integration initiates concurrent threads to synchronize data.
                "table":""// The name of the table to be synchronized.
            },
            "name":"Reader",
            "category":"reader"
        },
        {// The following template is used to configure Stream Writer. For more information, see the corresponding topic.
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed.
        },
        "speed":{
            "throttle":false,// Specifies whether to enable bandwidth throttling. A value of false indicates that the bandwidth is not throttled. A value of true indicates that the bandwidth is throttled. The maximum transmission rate takes effect only if you set this parameter to true.
            "concurrent":1// The maximum number of concurrent threads.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
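The fetchSize parameter described earlier controls how many records each round trip to the database returns. Its effect can be sketched with a simple batching generator (an illustration of the JDBC fetch-size behavior, not the actual reader code):

```python
def fetch_in_batches(rows, fetch_size=512):
    """Yield rows in batches of fetch_size.

    Each yielded batch models one interaction between Data Integration
    and the database: a larger fetch_size means fewer round trips but
    more memory held per batch.
    """
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == fetch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch

# Example: list(fetch_in_batches(range(10), 4)) produces three batches,
# the last containing the remaining two rows.
```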

Additional instructions

  • Data synchronization between primary and secondary databases

    A secondary PostgreSQL database can be deployed for disaster recovery. The secondary database continuously replicates data from the primary database based on write-ahead logs (WAL). Data latency between the primary and secondary databases is unavoidable, especially when network conditions are unfavorable, and can lead to data inconsistency.

  • Concurrency control

    PostgreSQL is a relational database management system (RDBMS), which supports strong consistency for data queries. A database snapshot is created before a sync node starts. PostgreSQL Reader reads data from the database snapshot. Therefore, if new data is written to the database during data synchronization, the reader cannot obtain the new data.

    Data consistency cannot be guaranteed when you enable PostgreSQL Reader to run concurrent threads on a single sync node.

    PostgreSQL Reader shards the table based on the splitPk parameter and runs multiple concurrent threads to synchronize data. These threads belong to different transactions and read data at different points in time, so they observe different snapshots.

    Theoretically, the data inconsistency issue is unavoidable if a single sync node includes multiple threads. However, two workarounds are available:

    • Do not enable concurrent threads on a single sync node; that is, do not specify the splitPk parameter. This guarantees data consistency, but synchronization is less efficient.
    • Disable writers to make sure that the data is unchanged during data synchronization. For example, lock the table and disable data synchronization between primary and secondary databases. In this way, data is synchronized efficiently but your ongoing services may be interrupted.
  • Character encoding

    A PostgreSQL database supports only EUC_CN and UTF-8 encoding formats for simplified Chinese characters. PostgreSQL Reader uses JDBC, which can automatically convert the encoding of characters. Therefore, you do not need to specify the encoding format.

    If you specify the encoding format for a PostgreSQL database but data is written to the PostgreSQL database in a different encoding format, PostgreSQL Reader cannot recognize this inconsistency and may export garbled characters.

  • Incremental data synchronization
    PostgreSQL Reader connects to a database through JDBC and uses a SELECT statement with a WHERE clause to read incremental data in the following ways:
    • For batch data, incremental add, update, and delete operations (including logical deletes) are distinguished by timestamps. Specify the WHERE clause based on a timestamp that is later than the latest timestamp of the previous synchronization.
    • For streaming data, specify the WHERE clause based on the data record ID. The data record ID must be larger than the maximum ID involved in the last synchronization.

    If incremental data cannot be distinguished, PostgreSQL Reader cannot perform incremental synchronization but can perform full synchronization only.
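The two incremental strategies above can be sketched as follows. The helper and the column names (`gmt_modified` for the timestamp column, `id` for the record ID) are hypothetical placeholders for your own schema:

```python
def incremental_where(mode, last_value, column):
    """Build the WHERE clause for the next incremental run.

    mode="timestamp": read rows modified after the last sync time.
    mode="id":        read rows whose ID exceeds the last synced maximum.
    """
    if mode == "timestamp":
        return "{} > '{}'".format(column, last_value)
    if mode == "id":
        return "{} > {}".format(column, last_value)
    raise ValueError("unsupported mode: " + mode)

# Example: after a run that last saw gmt_modified = 2024-01-01 00:00:00,
# the next run's where parameter would be
# incremental_where("timestamp", "2024-01-01 00:00:00", "gmt_modified")
```

In practice you would persist the last timestamp or maximum ID after each run (for example, with a scheduling parameter such as ${bizdate}) and substitute it into the where parameter of the next run.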

  • Syntax validation

    PostgreSQL Reader allows you to specify custom SELECT statements by using the querySql parameter but does not verify the syntax of the custom SELECT statements.