This topic describes the data types and parameters that AnalyticDB for PostgreSQL Reader supports and how to configure it by using the codeless user interface (UI) and code editor.

AnalyticDB for PostgreSQL Reader allows you to read data from AnalyticDB for PostgreSQL. It connects to a remote AnalyticDB for PostgreSQL database and executes a SELECT statement to select and read data from the database.

Specifically, AnalyticDB for PostgreSQL Reader connects to a remote AnalyticDB for PostgreSQL database by using Java Database Connectivity (JDBC), generates a SELECT statement based on your configurations, and sends the statement to the database. The database executes the statement and returns the result. AnalyticDB for PostgreSQL Reader then assembles the returned data into abstract datasets of the custom data types that are supported by Data Integration, and passes the datasets to a writer.
  • AnalyticDB for PostgreSQL Reader generates the SELECT statement based on the table, column, and where parameters that you have configured, and sends the generated SELECT statement to the AnalyticDB for PostgreSQL database.
  • If you specify the querySql parameter, AnalyticDB for PostgreSQL Reader directly sends the value of this parameter to the AnalyticDB for PostgreSQL database.
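For example, the following reader configuration is a minimal sketch in which the connection name, table, and columns are hypothetical. The trailing comment shows the SELECT statement that such a configuration would produce:

{
    "datasource": "adb_pg_demo",// A hypothetical connection name.
    "table": "public.person",// A hypothetical source table.
    "column": ["id", "name"],
    "where": "age > 30"
}
// Corresponding statement: SELECT id, name FROM public.person WHERE age > 30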

Data types

AnalyticDB for PostgreSQL Reader supports most AnalyticDB for PostgreSQL data types. Make sure that your data types are supported.

The following table describes the data types that AnalyticDB for PostgreSQL Reader supports.
Category | AnalyticDB for PostgreSQL data types
Integer | BIGINT, BIGSERIAL, INTEGER, SMALLINT, and SERIAL
Floating point | DOUBLE PRECISION, MONEY, NUMERIC, and REAL
String | VARCHAR, CHAR, TEXT, BIT, and INET
Date and time | DATE, TIME, and TIMESTAMP
Boolean | BOOL
Binary | BYTEA

Parameters

datasource: The name of the connection. It must be identical to the name of a connection that you have added. You can add connections in the code editor. Required: Yes. Default value: none.
table: The name of the source table. Required: Yes. Default value: none.
column: The columns to be synchronized from the source table. The columns are described in a JSON array. The default value is [*], which indicates all columns.
  • Column pruning is supported. You can select specific columns to export.
  • The column order can be changed. You can configure AnalyticDB for PostgreSQL Reader to export the specified columns in an order different from that specified in the schema of the table.
  • Constants are supported. The column names must be arranged in compliance with the SQL syntax that AnalyticDB for PostgreSQL supports, for example, ["id", "table","1","'mingya.wmy'","'null'", "to_char(a+1)","2.3","true"].
    • id: a column name.
    • table: a column whose name is a reserved keyword.
    • 1: an integer constant.
    • 'mingya.wmy': a string constant, which is enclosed in single quotation marks (' ').
    • 'null': the string null.
    • to_char(a+1): a function expression.
    • 2.3: a floating-point constant.
    • true: a Boolean value.
  • The column parameter must explicitly specify the set of columns to be synchronized, and cannot be left empty.
Required: Yes. Default value: none.
splitPk: The field that is used for data sharding when AnalyticDB for PostgreSQL Reader reads data. If you specify this parameter, the table is sharded based on the specified shard key, and Data Integration runs concurrent threads to synchronize the data, which improves synchronization efficiency.
  • We recommend that you set the splitPk parameter to the primary key of the table. Primary key values are usually evenly distributed, so the resulting shards are less likely to contain hot spots.
  • The splitPk parameter supports data sharding only for integer fields. If you set the splitPk parameter to a field of another data type, such as a string, floating-point, or date field, AnalyticDB for PostgreSQL Reader ignores the parameter and synchronizes data by using a single thread.
  • If you do not specify the splitPk parameter or leave it empty, AnalyticDB for PostgreSQL Reader synchronizes data by using a single thread.
Required: No. Default value: none.
where: The WHERE clause. AnalyticDB for PostgreSQL Reader generates a SELECT statement based on the table, column, and where parameters that you configure, and uses the statement to select and read data. For example, you can set this parameter to id>2 and sex=1.
  • You can use the WHERE clause to synchronize incremental data.
  • If you do not specify the where parameter or leave it empty, all data is synchronized.
Required: No. Default value: none.
querySql (available only in the code editor): The SELECT statement that is used for fine-grained data filtering. If you specify this parameter, Data Integration filters data based on the statement. For example, if you want to join multiple tables for data synchronization, set this parameter to select a,b from table_a join table_b on table_a.id = table_b.id.
If you specify the querySql parameter, AnalyticDB for PostgreSQL Reader ignores the table, column, and where parameters that you have configured.
Required: No. Default value: none.
fetchSize: The number of data records to read at a time. This parameter determines the number of interactions between Data Integration and the database, and affects read efficiency.
Note: A value greater than 2048 may cause an out of memory (OOM) error during data synchronization.
Required: No. Default value: 512.
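
For example, the following reader parameter block is a minimal sketch that combines the querySql and fetchSize parameters. The connection name and the SQL statement are hypothetical, and the table, column, and where parameters are omitted because querySql takes precedence over them:

{
    "datasource": "adb_pg_demo",// A hypothetical connection name.
    "querySql": "select a.id, b.salary from table_a a join table_b b on a.id = b.id",// The SELECT statement used for fine-grained filtering.
    "fetchSize": 512// The number of records to read at a time. Values greater than 2048 may cause OOM errors.
}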

Configure AnalyticDB for PostgreSQL Reader by using the codeless UI

  1. Configure the connections.
    Configure the connections to the source and destination data stores for the sync node.
    Connection: The datasource parameter in the preceding parameter description. Select a connection type, and then select the name of a connection that you have configured in DataWorks.
    Table: The table parameter in the preceding parameter description.
    Filter: The condition for filtering the data to be synchronized. AnalyticDB for PostgreSQL Reader does not support data filtering based on the LIMIT keyword. The SQL syntax is determined by the selected connection.
    Shard Key: The shard key, which corresponds to the splitPk parameter in the preceding parameter description. You can specify a column in the source table as the shard key. We recommend that you use the primary key or an indexed column. Only integer fields are supported. If data sharding is performed based on the configured shard key, data can be read concurrently, which improves synchronization efficiency.
    Note: The Shard Key parameter is displayed only after you select the connection to the source data store for the sync node.
  2. Configure field mapping. This is equivalent to setting the column parameter in the preceding parameter description.
    Fields in the source table on the left have a one-to-one mapping with fields in the destination table on the right. You can click Add to add a field. To delete a field, move the pointer over the field and click the Delete icon.
    Map Fields with the Same Name: Click this button to establish mappings between fields with the same name. Note that the data types of the fields must match.
    Map Fields in the Same Line: Click this button to establish mappings between fields in the same row. Note that the data types of the fields must match.
    Delete All Mappings: Click this button to remove all mappings that have been established.
    Auto Layout: Click this button to sort the fields based on specified rules.
    Change Fields: Click the Change Fields icon. In the Change Fields dialog box, you can manually edit the fields in the source table. Each field occupies one row. The first and last blank rows are included, whereas other blank rows are ignored.
    Add: Click Add to add a field. Take note of the following rules when you add a field (see the column sketch after this procedure):
    • You can enter constants. Each constant must be enclosed in single quotation marks (' '), for example, 'abc' and '123'.
    • You can use scheduling parameters such as ${bizdate}.
    • You can enter functions that are supported by relational databases, for example, now() and count(1).
    • Fields that cannot be parsed are indicated by Unidentified.
  3. Configure channel control policies.
    Expected Maximum Concurrency: The maximum number of concurrent threads that the sync node uses to read data from the source data store and write data to the destination data store. You can configure the concurrency for the node on the codeless UI.
    Bandwidth Throttling: Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value.
    Dirty Data Records Allowed: The maximum number of dirty data records allowed.
    Resource Group: The resource group that is used to run the sync node. If a large number of nodes, including this sync node, are deployed on the default resource group, the sync node may need to wait for resources. We recommend that you purchase an exclusive resource group for Data Integration or add a custom resource group. For more information, see DataWorks exclusive resources and Add a custom resource group.
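
The rules for added fields in step 2 correspond to entries in the column parameter in the code editor. The following value is a minimal sketch in which the field names are hypothetical:

"column": [
    "id",// A column name.
    "'abc'",// A string constant enclosed in single quotation marks.
    "to_char(id+1)",// A function expression supported by the database.
    "true"// A Boolean constant.
]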

Configure AnalyticDB for PostgreSQL Reader by using the code editor

{
    "type": "job",
    "steps": [
        {
            "parameter": {
                "datasource": "test_004",// The connection name.
                "column": [// The columns to be synchronized from the source table.
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "where": "id=1001",// The WHERE clause.
                "splitPk": "id",// The shard key.
                "table": "public.person"// The name of the source table.
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",// The version number.
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {// The maximum number of dirty data records allowed.
            "record": ""
        },
        "speed": {
            "concurrent": 6,// The number of concurrent threads.
            "throttle": false,// Specifies whether to enable bandwidth throttling.
        }
    }
}
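
If you set the throttle field to true, you must also specify a maximum transmission rate. The following setting block is a minimal sketch; the mbps key and its value are assumptions based on common DataWorks speed settings and may differ in your environment:

"setting": {
    "errorLimit": {
        "record": "0"// Allow no dirty data records.
    },
    "speed": {
        "concurrent": 6,// The number of concurrent threads.
        "throttle": true,// Enable bandwidth throttling.
        "mbps": 1// The maximum transmission rate. Assumed key; verify it against your DataWorks version.
    }
}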