This topic describes the data types and parameters that are supported by AnalyticDB for PostgreSQL Reader and how to configure AnalyticDB for PostgreSQL Reader by using the codeless user interface (UI) and code editor.

AnalyticDB for PostgreSQL Reader reads data from AnalyticDB for PostgreSQL.

AnalyticDB for PostgreSQL Reader connects to a remote AnalyticDB for PostgreSQL database by using Java Database Connectivity (JDBC), generates an SQL statement based on your configurations, and then sends the statement to the database. The system executes the statement on the database and returns data. Then, AnalyticDB for PostgreSQL Reader assembles the returned data into abstract datasets of the data types supported by Data Integration and sends the datasets to a writer.
  • AnalyticDB for PostgreSQL Reader generates the SQL statement based on the settings of the table, column, and where parameters and sends the generated statement to the remote database.
  • If you specify the querySql parameter, AnalyticDB for PostgreSQL Reader sends the value of this parameter to the AnalyticDB for PostgreSQL database.
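For example, a configuration such as the following (the table and column names are hypothetical) causes AnalyticDB for PostgreSQL Reader to generate and send a simple SELECT statement:

```json
{
    "table": "public.person",// A hypothetical source table. 
    "column": ["id", "name"],// The columns to read. 
    "where": "age > 18"// The filter condition. 
    // Generated statement: SELECT id, name FROM public.person WHERE age > 18
}
```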

Data types

AnalyticDB for PostgreSQL Reader supports most AnalyticDB for PostgreSQL data types. Make sure that the data types of your source columns are supported.

The following table lists the data types that are supported by AnalyticDB for PostgreSQL Reader.
Category AnalyticDB for PostgreSQL data type
Integer BIGINT, BIGSERIAL, INTEGER, SMALLINT, and SERIAL
Floating point DOUBLE PRECISION, MONEY, NUMERIC, and REAL
String VARCHAR, CHAR, TEXT, BIT, and INET
Date and time DATE, TIME, and TIMESTAMP
Boolean BOOLEAN
Binary BYTEA

Parameters

Parameter Description Required Default value
datasource The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. Yes No default value
table The name of the table from which you want to read data. Yes No default value
column The names of the columns from which you want to read data. Specify the names in a JSON array. The default value is [*], which indicates all the columns in the source table.
  • You can select specific columns to read.
  • The column order can be changed. This means that you can specify columns in an order different from the order defined by the schema of the source table.
  • Constants are supported. The column names must be arranged in compliance with the SQL syntax supported by AnalyticDB for PostgreSQL, such as ["id", "table", "1", "'mingya.wmy'", "'null'", "to_char(a+1)", "2.3", "true"].
    • id: a column name.
    • table: the name of a column that contains reserved keywords.
    • 1: an integer constant.
    • 'mingya.wmy': a string constant, which is enclosed in single quotation marks (').
    • 'null': the string null.
    • to_char(a+1): a function expression that converts the result of a+1 to a string.
    • 2.3: a floating-point constant.
    • true: a Boolean value.
  • The column parameter must explicitly specify all the columns from which you want to read data. The parameter cannot be left empty.
Yes No default value
splitPk The field that is used for data sharding when AnalyticDB for PostgreSQL Reader reads data. If you specify this parameter, the source table is sharded based on the value of this parameter. Data Integration then runs parallel threads to read data. This improves data synchronization efficiency.
  • We recommend that you set the splitPk parameter to the name of the primary key column of the table. Data can be evenly distributed to different shards based on the primary key column, instead of being intensively distributed only to specific shards.
  • The splitPk parameter supports sharding only for data of integer data types. If you set this parameter to a field of an unsupported data type, such as a string, floating point, or date data type, AnalyticDB for PostgreSQL Reader ignores the setting of the splitPk parameter and uses a single thread to read data.
  • If the splitPk parameter is not provided or is left empty, AnalyticDB for PostgreSQL Reader uses a single thread to read data.
No No default value
where The WHERE clause. AnalyticDB for PostgreSQL Reader generates an SQL statement based on the settings of the table, column, and where parameters and uses the generated statement to read data. For example, you can set the where parameter to id > 2 and sex = 1 to read only the rows that meet this condition. In actual business scenarios, you can use this parameter to read only the incremental data that is generated on the current day.
  • You can use the WHERE clause to read incremental data.
  • If the where parameter is not provided or is left empty, AnalyticDB for PostgreSQL Reader reads full data.
No No default value
querySql (advanced parameter, which is available only in the code editor) The SQL statement that is used for refined data filtering. If you specify this parameter, AnalyticDB for PostgreSQL Reader filters data based only on the value of this parameter. For example, if you want to join multiple tables for data synchronization, set this parameter to select a,b from table_a join table_b on table_a.id = table_b.id.

If you specify this parameter, AnalyticDB for PostgreSQL Reader ignores the settings of the column, table, and where parameters.

No No default value
fetchSize The number of data records to read at a time. This parameter determines the number of interactions between Data Integration and the database and affects read efficiency.
Note If you set this parameter to a value greater than 2048, an out of memory (OOM) error may occur during data synchronization.
No 512
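The following snippet sketches how these parameters fit together in the parameter object of a reader step. The data source, table, and column names are examples, and the scheduling parameter ${bizdate} is assumed to be configured for the synchronization node:

```json
{
    "datasource": "adb_pg_source",// Example data source name. 
    "table": "public.orders",// Example table name. 
    "column": ["id", "'fixed_tag'", "amount"],// Column names and a string constant. 
    "splitPk": "id",// Shard on the integer primary key to enable parallel reads. 
    "where": "gmt_create > '${bizdate}'",// Read only incremental data. 
    "fetchSize": 512// Keep this value at or below 2048 to avoid OOM errors. 
}
```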

Configure AnalyticDB for PostgreSQL Reader by using the codeless UI

  1. Configure data sources.
    Configure Source and Target for the synchronization node.
    Parameter Description
    Connection The name of the data source from which you want to read data. This parameter is equivalent to the datasource parameter that is described in the preceding section.
    Table The name of the table from which you want to read data. This parameter is equivalent to the table parameter that is described in the preceding section.
    Filter The condition that is used to filter the data you want to read. Filtering based on the LIMIT keyword is not supported. The SQL syntax is determined by the selected data source.
    Shard Key The shard key. You can use a column in the source table as the shard key. We recommend that you use the primary key column or an indexed column. Only integer columns are supported. If you specify this parameter, data sharding is performed based on the value of this parameter, and parallel threads can be used to read data. This improves data synchronization efficiency.
    Note The Shard Key parameter is displayed only after you select the data source for the synchronization node.
  2. Configure field mappings. This operation is equivalent to setting the column parameter that is described in the preceding section.
    Fields in the source on the left have a one-to-one mapping with fields in the destination on the right. You can click Add to add a field. To remove an added field, move the pointer over the field and click the Remove icon.
    Operation Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish mappings between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish mappings between fields in the same row. The data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove the mappings that are established.
    Auto Layout Click Auto Layout. Then, the system automatically sorts the fields based on specific rules.
    Change Fields Click the Change Fields icon. In the Change Fields dialog box, you can manually edit the fields in the source table. Each field occupies a row. The first and the last blank rows are included, whereas other blank rows are ignored.
    Add

    Click Add to add a field. Take note of the following rules when you add a field:

    • You can enter constants. Each constant must be enclosed in single quotation marks ('), such as 'abc' and '123'.
    • You can use scheduling parameters, such as ${bizdate}.
    • You can enter functions that are supported by relational databases, such as now() and count(1).
    • If the field that you entered cannot be parsed, the value of Type for the field is Unidentified.
  3. Configure channel control policies.
    Parameter Description
    Expected Maximum Concurrency The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.
    Distributed Execution

    The distributed execution mode that allows you to split your node into pieces and distribute them to multiple Elastic Compute Service (ECS) instances for parallel execution. This speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, you must evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information about exclusive resource groups for Data Integration, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.

Configure AnalyticDB for PostgreSQL Reader by using the code editor

{
    "type": "job",
    "steps": [
        {
            "parameter": {
                "datasource": "test_004",// The name of the data source. 
                "column": [// The names of the columns from which you want to read data. 
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "where": "id=1001",// The WHERE clause. 
                "splitPk": "id",// The shard key. 
                "table": "public.person"// The name of the table from which you want to read data. 
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",// The version number.
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {// The maximum number of dirty data records allowed. 
            "record": ""
        },
        "speed": {
            "concurrent": 6,// The maximum number of parallel threads. 
            "throttle": true,// Specifies whether to enable bandwidth throttling. The value false indicates that bandwidth throttling is disabled, and the value true indicates that bandwidth throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "mbps": "12"// The maximum transmission rate. 
        }
    }
}
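
If you use the querySql parameter instead, the parameter object of the reader step can be simplified as shown in the following snippet. The table names are examples. The column, table, and where parameters are omitted because AnalyticDB for PostgreSQL Reader would ignore them:

```json
{
    "datasource": "test_004",// The name of the data source. 
    "querySql": "select a, b from table_a join table_b on table_a.id = table_b.id"// The custom SQL statement used for data filtering. 
}
```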