Hologres Reader reads data from Hologres, converts the data to a format that Data Integration can read, and then sends the data to a writer. The writer writes the data to the destination.

Background information

Notice
Hologres Reader reads data from Hologres tables by using PostgreSQL statements. The number of parallel threads that are used to read data is based on the number of shards in the Hologres table from which you want to read data. One SELECT statement is executed for each shard.
  • When you execute the CREATE TABLE statement to create a table in Hologres, you can use the CALL set_table_property('table_name', 'shard_count', 'xx') command to configure the number of shards for the table.

    By default, the shard_count field is set to the default number of table shards for your Hologres database. The configurations of your Hologres instance determine the default number of table shards for your Hologres database.

  • Each SELECT statement queries a single shard, which it selects by using the built-in field hg_shard_id of the source Hologres table.
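The one-statement-per-shard pattern described above can be sketched in Python as follows. This is an illustration only: this topic does not show the exact SQL text that Hologres Reader issues, so the statement shape, the helper name build_shard_queries, and the assumption that shard IDs run from 0 to shard_count - 1 are all hypothetical.

```python
def build_shard_queries(table, columns, shard_count):
    """Return one SELECT statement per shard (illustrative only).

    Assumption: shard IDs are numbered 0 .. shard_count - 1 and can be
    filtered with the built-in field hg_shard_id.
    """
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    column_list = ", ".join(columns)
    return [
        f"SELECT {column_list} FROM {table} WHERE hg_shard_id = {shard_id}"
        for shard_id in range(shard_count)
    ]


# A table created with shard_count = 3 yields three statements,
# which matches the three parallel read threads described above.
queries = build_shard_queries("holo_reader_basic_src", ["tag", "id", "title"], 3)
for query in queries:
    print(query)
```

Because one statement is generated per shard, the number of statements, and therefore the read parallelism, follows directly from the shard_count table property.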

Parameters

endpoint
  Description: The endpoint of the source Hologres instance. Specify the value in the format of instance-id-region-endpoint.hologres.aliyuncs.com:Port number. You can view the endpoint of a Hologres instance on the configuration page of the instance in the Hologres console.
  The endpoint of a Hologres instance varies based on the network type, including the classic network, Internet, and virtual private cloud (VPC). Set this parameter based on the type of the network where your exclusive resource group for Data Integration and the Hologres instance reside. If an invalid endpoint is specified, the connection between the exclusive resource group for Data Integration and the Hologres instance may fail, or data synchronization performance may be poor. The endpoints for the three network types are in the following formats:
    • Classic network endpoint: instance-id-region-endpoint-internal.hologres.aliyuncs.com:Port number
    • Public endpoint: instance-id-region-endpoint.hologres.aliyuncs.com:Port number
    • VPC endpoint: instance-id-region-endpoint-vpc.hologres.aliyuncs.com:Port number
  We recommend that you deploy the exclusive resource group for Data Integration and the Hologres instance in the same zone of the same region. This helps ensure a successful network connection and the optimal data synchronization performance.
  Required: Yes
  Default value: No default value

accessId
  Description: The AccessKey ID of the account that you use to connect to Hologres.
  Required: Yes
  Default value: No default value

accessKey
  Description: The AccessKey secret of the account that you use to connect to Hologres. Make sure that the account is authorized to read data from the source table.
  Required: Yes
  Default value: No default value

database
  Description: The name of the source database in the Hologres instance.
  Required: Yes
  Default value: No default value

table
  Description: The name of the table from which you want to read data. If the table is a partitioned table, specify the name of the table instead of the name of the partition from which you want to read data.
  Note: Hologres Reader cannot read data from views.
  Required: Yes
  Default value: No default value

column
  Description: The names of the columns from which you want to read data. The names of the primary key columns in the source table must be included. If you want to read data from all columns in the source table, set this parameter to ["*"].
  Required: Yes
  Default value: No default value

partition
  Description: The partition key column and the related value of the source table, in the format of column=value. This parameter is valid only for partitioned tables.
  Notice:
    • Hologres supports only list partitioning, and you can specify only one column as the partition key column. The data type of the partition key column must be INT4 or TEXT.
    • The value of this parameter must match the partition expression in the data definition language (DDL) statements that are used to create the source table.
    • You must specify a partition that exists and contains data.
  Required: No
  Default value: Left empty, indicating that the source table is a non-partitioned table

fetchSize
  Description: The maximum number of data records to read from the source table at a time by using the SELECT statement.
  Required: No
  Default value: 1,000
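As a rough illustration of how these parameters fit together, the following Python sketch formats an endpoint for each network type and assembles the reader parameter object with a few basic checks. The helper names build_endpoint and build_reader_parameter, the port 80, and the credential placeholders are assumptions made for this example and are not part of Data Integration. The partition check is deliberately simple and rejects values that contain spaces or an additional equal sign.

```python
import re

# Suffix placed before ".hologres.aliyuncs.com" for each network type,
# taken from the three endpoint formats listed above.
NETWORK_SUFFIX = {"classic": "-internal", "public": "", "vpc": "-vpc"}


def build_endpoint(instance_prefix, network_type, port):
    """Format an endpoint string for the given network type."""
    suffix = NETWORK_SUFFIX[network_type]
    return f"{instance_prefix}{suffix}.hologres.aliyuncs.com:{port}"


def build_reader_parameter(endpoint, access_id, access_key, database, table,
                           column, partition=None, fetch_size=1000):
    """Assemble the reader "parameter" object (hypothetical helper)."""
    if not column:
        raise ValueError('column must list column names or be ["*"]')
    parameter = {
        "endpoint": endpoint,
        "accessId": access_id,
        "accessKey": access_key,
        "database": database,
        "table": table,
        "column": list(column),
        # Written as a string to mirror the sample job scripts in this topic.
        "fetchSize": str(fetch_size),
    }
    if partition is not None:
        # The partition parameter must use the column=value format.
        if not re.fullmatch(r"[^=\s]+=[^=\s]+", partition):
            raise ValueError("partition must be in the format column=value")
        parameter["partition"] = partition
    return parameter


# Port 80 and the credential strings are placeholders for illustration.
vpc_endpoint = build_endpoint("instance-id-region-endpoint", "vpc", 80)
reader_parameter = build_reader_parameter(
    endpoint=vpc_endpoint,
    access_id="<your-access-key-id>",
    access_key="<your-access-key-secret>",
    database="postgres",
    table="holo_reader_basic_part_src",
    column=["*"],
    partition="tag=foo",
)
print(reader_parameter)
```

Leaving partition unset omits the key entirely, which corresponds to the documented default for non-partitioned tables.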

Configure Hologres Reader by using the codeless UI

  1. Configure data sources.
    Configure Source and Target for the synchronization node.
    Parameter Description
    Connection The name of the data source from which you want to read data.
    Table The name of the table from which you want to read data. This parameter is equivalent to the table parameter that is specified in the preceding section.
    Filter The condition that is used to filter the data you want to read. The LIMIT keyword is not supported in the filter condition. The SQL syntax is determined by the selected data source.
  2. Configure field mappings. This operation is equivalent to setting the column parameter that is described in the preceding section.
    Fields in the source on the left have a one-to-one mapping with fields in the destination on the right. You can click Add to add a field. To remove an added field, move the pointer over the field and click the Remove icon.
    Operation Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish mappings between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish mappings between fields in the same row. The data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove the mappings that are established.
    Auto Layout Click Auto Layout. Then, the system sorts the fields based on specific rules.
    Change Fields Click the Change Fields icon. In the Change Fields dialog box, you can edit the fields in the source table. Each field occupies a row. The first and the last blank rows are included, whereas other blank rows are ignored.
    Add Click Add to add a field. Take note of the following rules when you add a field:
    • You can enter constants. Each constant must be enclosed in single quotation marks ('), such as 'abc' and '123'.
    • You can use scheduling parameters, such as ${bizdate}.
    • You can enter functions that are supported by relational databases, such as now() and count(1).
    • If the field that you entered cannot be parsed, the value of Type for the field is Unidentified.
  3. Configure channel control policies.
    Parameter Description
    Expected Maximum Concurrency The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to avoid heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.
    Distributed Execution Specifies whether to enable the distributed execution mode. In this mode, your node is split into pieces that are distributed to multiple Elastic Compute Service (ECS) instances for parallel execution, which speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, you must evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information about exclusive resource groups for Data Integration, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.
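The channel control options correspond to the setting block of a job script in the code editor. The following Python sketch shows one way to assemble that block. The key names and value types are taken from the sample scripts in this topic. The helper name build_setting is hypothetical, and distributed execution is not covered because this topic does not show a script key for it.

```python
def build_setting(concurrent=1, throttle=False, mbps=None, dirty_record_limit=0):
    """Assemble the "setting" block of a job script (hypothetical helper)."""
    if concurrent < 1:
        raise ValueError("concurrent must be at least 1")
    speed = {"throttle": throttle, "concurrent": concurrent}
    if throttle:
        # mbps takes effect only when throttle is true, so it is
        # required in that case and omitted otherwise.
        if mbps is None:
            raise ValueError("mbps is required when throttle is enabled")
        speed["mbps"] = str(mbps)
    return {
        "errorLimit": {"record": str(dirty_record_limit)},
        "speed": speed,
    }


# Throttled to 12 MB/s with one thread and no dirty data records allowed.
setting = build_setting(concurrent=1, throttle=True, mbps=12)
print(setting)
```

Omitting mbps when throttling is disabled keeps the script free of a value that would have no effect.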

Configure Hologres Reader by using the code editor

  • Read data from a non-partitioned Hologres table
    • In the following code, a synchronization node is configured to read data from a non-partitioned Hologres table:
      {
          "type":"job",
          "version":"2.0",// The version number. 
          "steps":[
              {
                  "stepType":"holo",// The reader type. 
                  "parameter":{
                      "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
                      "accessId": "***************", // The AccessKey ID of the account that you use to connect to Hologres. 
                      "accessKey": "*******************", // The AccessKey secret of the account that you use to connect to Hologres. 
                      "database": "postgres",
                      "table": "holo_reader_****",
                      "column" : [ // The names of the columns from which you want to read data. 
                          "tag",
                          "id",
                          "title"
                      ]
                  },
                  "name":"Reader",
                  "category":"reader"
              },
              {
                  "stepType":"stream",
                  "parameter":{},
                  "name":"Writer",
                  "category":"writer"
              }
          ],
          "setting":{
              "errorLimit":{
                  "record":"0"// The maximum number of dirty data records allowed. 
              },
              "speed":{
                  "throttle":true,// Specifies whether to enable bandwidth throttling. The value false indicates that bandwidth throttling is disabled, and the value true indicates that bandwidth throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
                  "concurrent":1,// The maximum number of parallel threads.  
                  "mbps":"12"// The maximum transmission rate. 
              }
          },
          "order":{
              "hops":[
                  {
                      "from":"Reader",
                      "to":"Writer"
                  }
              ]
          }
      }
    • The following sample DDL statements are used to create a non-partitioned Hologres table:
      begin;
      drop table if exists holo_reader_basic_src;
      create table holo_reader_basic_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id));
      call set_table_property('holo_reader_basic_src', 'orientation', 'column');
      call set_table_property('holo_reader_basic_src', 'shard_count', '3');
      commit;
  • Read data from a partition in a partitioned Hologres table
    • In the following code, a synchronization node is configured to read data from a partition in a partitioned Hologres table in SDK mode.
      Note Exercise caution when you set the partition parameter.
      {
          "type":"job",
          "version":"2.0",// The version number. 
          "steps":[
              {
                  "stepType":"holo",// The reader type. 
                  "parameter":{
                      "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
                      "accessId": "***************", // The AccessKey ID of the account that you use to connect to Hologres. 
                      "accessKey": "*******************", // The AccessKey secret of the account that you use to connect to Hologres. 
                      "database": "postgres",
                      "table": "holo_reader_basic_****",
                      "partition": "tag=foo",
                      "column" : [
                          "*"
                      ],
                      "fetchSize": "100"
                  },
                  "name":"Reader",
                  "category":"reader"
              },
              {
                  "stepType":"stream",
                  "parameter":{},
                  "name":"Writer",
                  "category":"writer"
              }
          ],
          "setting":{
              "errorLimit":{
                  "record":"0"// The maximum number of dirty data records allowed. 
              },
              "speed":{
                  "throttle":true,// Specifies whether to enable bandwidth throttling. The value false indicates that bandwidth throttling is disabled, and the value true indicates that bandwidth throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
                  "concurrent":1,// The maximum number of parallel threads. 
                  "mbps":"12"// The maximum transmission rate. 
              }
          },
          "order":{
              "hops":[
                  {
                      "from":"Reader",
                      "to":"Writer"
                  }
              ]
          }
      }
    • The following sample DDL statements are used to create a partitioned Hologres table:
      begin;
      drop table if exists holo_reader_basic_part_src;
      create table holo_reader_basic_part_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id))
        partition by list( tag );
      call set_table_property('holo_reader_basic_part_src', 'orientation', 'column');
      call set_table_property('holo_reader_basic_part_src', 'shard_count', '3');
      commit;
      
      create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo');
      
      -- Make sure that the partition from which you want to read data is created and data is inserted into the partition. 
      postgres=# \d+ holo_reader_basic_part_src
                               Table "public.holo_reader_basic_part_src"
       Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
      --------+---------+-----------+----------+---------+----------+--------------+-------------
       tag    | text    |           | not null |         | extended |              | 
       id     | integer |           | not null |         | plain    |              | 
       title  | text    |           | not null |         | extended |              | 
       body   | text    |           |          |         | extended |              | 
      Partition key: LIST (tag)
      Indexes:
          "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id)
      Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')
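The job scripts above can also be generated programmatically. The following Python sketch assembles a script for the partitioned table created by the preceding DDL statements and checks that every hop refers to an existing step. The helper names build_job and check_hops and the credential placeholders are assumptions for this example. The output is plain JSON without the explanatory comments used in the samples.

```python
import json


def build_job(reader_parameter, setting):
    """Assemble a job script with a Hologres reader and a stream writer."""
    return {
        "type": "job",
        "version": "2.0",
        "steps": [
            {"stepType": "holo", "parameter": reader_parameter,
             "name": "Reader", "category": "reader"},
            {"stepType": "stream", "parameter": {},
             "name": "Writer", "category": "writer"},
        ],
        "setting": setting,
        "order": {"hops": [{"from": "Reader", "to": "Writer"}]},
    }


def check_hops(job):
    """Raise ValueError if a hop names a step that is not defined."""
    names = {step["name"] for step in job["steps"]}
    for hop in job["order"]["hops"]:
        if hop["from"] not in names or hop["to"] not in names:
            raise ValueError(f"hop refers to an unknown step: {hop}")


job = build_job(
    reader_parameter={
        "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
        "accessId": "<your-access-key-id>",          # placeholder
        "accessKey": "<your-access-key-secret>",     # placeholder
        "database": "postgres",
        # Parent table name, not the partition name.
        "table": "holo_reader_basic_part_src",
        # Matches "partition by list( tag )" and "for values in ('foo')".
        "partition": "tag=foo",
        "column": ["*"],
        "fetchSize": "100",
    },
    setting={
        "errorLimit": {"record": "0"},
        "speed": {"throttle": True, "concurrent": 1, "mbps": "12"},
    },
)
check_hops(job)
print(json.dumps(job, indent=4))
```

Note that the table value is the parent table, and the partition value pairs the partition key column tag with the value foo from the partition definition.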