All Products
Search
Document Center

DataWorks:AnalyticDB for PostgreSQL data source

Last Updated:Nov 15, 2023

DataWorks provides AnalyticDB for PostgreSQL Reader and AnalyticDB for PostgreSQL Writer for you to read data from and write data to AnalyticDB for PostgreSQL data sources. This topic describes the capabilities of synchronizing data from or to AnalyticDB for PostgreSQL data sources.

Limits

Data of views can be read during batch synchronization.

Supported AnalyticDB for PostgreSQL versions

AnalyticDB for PostgreSQL of V7.0 and earlier versions is supported.

Data type mappings

Batch data read

AnalyticDB for PostgreSQL Reader supports most AnalyticDB for PostgreSQL data types. Make sure that the data types of your database are supported.

The following table lists the data type mappings based on which AnalyticDB for PostgreSQL Reader converts data types.

Category

AnalyticDB for PostgreSQL data type

Integer

BIGINT, BIGSERIAL, INTEGER, SMALLINT, and SERIAL

Floating point

DOUBLE, PRECISION, MONEY, NUMERIC, and REAL

String

VARCHAR, CHAR, TEXT, BIT, and INET

Date and time

DATE, TIME, and TIMESTAMP

Boolean

BOOL

Binary

BYTEA

Batch data write

AnalyticDB for PostgreSQL Writer supports most AnalyticDB for PostgreSQL data types. Make sure that the data types of your database are supported.

The following table lists the data type mappings based on which AnalyticDB for PostgreSQL Writer converts data types.

Category

AnalyticDB for PostgreSQL data type

LONG

BIGINT, BIGSERIAL, INTEGER, SMALLINT, and SERIAL

DOUBLE

DOUBLE, PRECISION, MONEY, NUMERIC, and REAL

STRING

VARCHAR, CHAR, TEXT, BIT, and INET

DATE

DATE, TIME, and TIMESTAMP

BOOLEAN

BOOL

BYTES

BYTEA

Note

The syntax such as a_inet::varchar is required when AnalyticDB for PostgreSQL Writer converts data to the MONEY, INET, or BIT data type.

Develop a data synchronization task

For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.

Add a data source

Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.

Configure a batch synchronization task to synchronize data of a single table

Configure synchronization settings to implement batch synchronization of all data in a database

For more information about the configuration procedure, see Configure a synchronization task in Data Integration.

Appendix: Code and parameters

Appendix: Configure a batch synchronization task by using the code editor

If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.

Code for AnalyticDB for PostgreSQL Reader

{
    "type": "job",
    "steps": [
        {
            "parameter": {
                "datasource": "test_004",// The name of the data source. 
                "column": [// The names of the columns. 
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "where": "id=1001",// The WHERE clause. 
                "splitPk": "id",// The shard key. 
                "table": "public.person"// The name of the table. 
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",// The version number.
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {// The maximum number of dirty data records allowed. 
            "record": ""
        },
        "speed": {
            "concurrent": 6,// The maximum number of parallel threads. 
            "throttle": true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
           "mbps":"12"// The maximum transmission rate. Unit: MB/s. 
        }
    }
}

Parameters in code for AnalyticDB for PostgreSQL Reader

Parameter

Description

Required

Default value

datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

table

The name of the table from which you want to read data.

Yes

No default value

column

The names of the columns from which you want to read data. Specify the names in a JSON array. The default value is [*], which indicates all the columns in the source table.

  • You can select specific columns to read.

  • The column order can be changed. This indicates that you can specify columns in an order different from the order specified by the schema of the source table.

  • Constants are supported. The column names must be arranged in compliance with the SQL syntax supported by AnalyticDB for PostgreSQL, such as ["id", "table","1","'mingya.wmy'","'null'", "to_char(a+1)","2.3","true"].

    • id: a column name.

    • table: the name of a column that contains reserved keywords.

    • 1: an integer constant.

    • 'mingya.wmy': a string constant, which is enclosed in single quotation marks (').

    • 'null': the string null.

    • to_char(a+1): a function expression that is used to calculate the length of a string.

    • 2.3: a floating-point constant.

    • true: a Boolean value.

  • The column parameter must explicitly specify all the columns from which you want to read data. This parameter cannot be left empty.

Yes

No default value

splitPk

The field that is used for data sharding when AnalyticDB for PostgreSQL Reader reads data. If you specify this parameter, the source table is sharded based on the value of this parameter. Data Integration then runs parallel threads to read data. This improves data synchronization efficiency.

  • We recommend that you set the splitPk parameter to the name of a primary key column of the table. Data can be evenly distributed to different shards based on the primary key column, instead of being intensively distributed only to specific shards.

  • The splitPk parameter supports sharding for data only of integer data types. If you set the splitPk parameter to a field of an unsupported data type, such as a string, floating point, or date data type, the setting of this parameter is ignored, and a single thread is used to read data.

  • If the splitPk parameter is not provided or is left empty, a single thread is used to read data.

No

No default value

where

The WHERE clause. AnalyticDB for PostgreSQL Reader generates an SQL statement based on the settings of the table, column, and where parameters and uses the generated statement to read data. For example, when you perform a test, you can set the where parameter to id>2 and sex=1 to read the data that is generated on the current day.

  • You can use the WHERE clause to read incremental data.

  • If the where parameter is not provided or is left empty, AnalyticDB for PostgreSQL Reader reads full data.

No

No default value

querySql (advanced parameter, which is available only in the code editor)

The SQL statement that is used for refined data filtering. If you specify this parameter, AnalyticDB for PostgreSQL Reader filters data based only on the value of this parameter. For example, if you want to join multiple tables for data synchronization, set this parameter to select a,b from table_a join table_b on table_a.id = table_b.id.

If you specify this parameter, AnalyticDB for PostgreSQL Reader ignores the settings of the column, table, and where parameters.

No

No default value

fetchSize

The number of data records to read at a time. This parameter determines the number of interactions between Data Integration and the source database and affects read efficiency.

Note

If you set this parameter to a value greater than 2048, an out of memory (OOM) error may occur during data synchronization.

No

512

Code for AnalyticDB for PostgreSQL Writer

{
    "type": "job",
    "steps": [
        {
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {
                "postSql": [],// The SQL statement that you want to execute after the synchronization task is run. 
                "datasource": "test_004",// The name of the data source. 
                "column": [// The names of the columns. 
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "table": "public.person",// The name of the table. 
                "preSql": []// The SQL statement that you want to execute before the synchronization task is run. 
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",// The version number. 
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {// The maximum number of dirty data records allowed. 
            "record": ""
        },
        "speed": {
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":6, // The maximum number of parallel threads. 
            "mbps":"12"// The maximum transmission rate.
        }
    }
}

Parameters in code for AnalyticDB for PostgreSQL Writer

Parameter

Description

Required

Default value

datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

table

The name of the table to which you want to write data.

Yes

No default value

writeMode

The write mode. Valid values:

Note

You can configure the conflictMode parameter to select a policy to handle a primary key conflict or unique index conflict that occurs when data is written.

  • insert: AnalyticDB for PostgreSQL Writer executes the INSERT INTO...VALUES... statement to write data to the AnalyticDB for PostgreSQL database. We recommend that you select this mode in most cases.

  • copy: AnalyticDB for PostgreSQL provides the copy command to copy data between tables and the standard input or standard output file. Data Integration supports the COPY FROM statement, which allows you to copy data from a file to a table. We recommend that you use this mode if a performance issue occurs.

    Note

    If a conflict occurs when data is written in this mode, DataWorks uses the upsert policy specified by the conflictMode parameter to handle the conflict by default.

No

insert

conflictMode

The policy to handle a primary key conflict or unique index conflict that occurs when data is written. Valid values:

  • report: Data cannot be written to the conflicting rows, and the data that fails to be written to these rows is regarded as dirty data.

  • upsert: Existing data is overwritten.

Note

You can configure this parameter only when you configure a synchronization task by using the code editor.

No

report

column

The names of the columns to which you want to write data. Separate the names with commas (,), such as "column":["id","name","age"]. If you want to write data to all the columns in the destination table, set this parameter to an asterisk (*), such as "column":["*"].

Yes

No default value

preSql

The SQL statement that you want to execute before the synchronization task is run. For example, you can set this parameter to the SQL statement that is used to delete outdated data. You can execute only one SQL statement on the codeless UI and multiple SQL statements in the code editor.

No

No default value

postSql

The SQL statement that you want to execute after the synchronization task is run. For example, you can set this parameter to the SQL statement that is used to add a timestamp. You can execute only one SQL statement on the codeless UI and multiple SQL statements in the code editor.

No

No default value

batchSize

The number of data records to write at a time. Set this parameter to an appropriate value based on your business requirements. This greatly reduces the interactions between Data Integration and AnalyticDB for PostgreSQL and increases throughput. If you set this parameter to an excessively large value, an OOM error may occur during data synchronization.

No

1,024