
AnalyticDB: Use DMS to schedule jobs

Last Updated: Feb 12, 2025

This topic describes how to use the job scheduling feature of Data Management (DMS) to schedule data from ApsaraDB RDS for PostgreSQL to AnalyticDB for PostgreSQL for analysis.

Overview

DMS provides a job scheduling feature. This feature allows you to load data from ApsaraDB RDS for PostgreSQL to Object Storage Service (OSS), and then to AnalyticDB for PostgreSQL in Serverless mode for analysis. DMS is used to schedule the entire extract-transform-load (ETL) job.

Benefits

  • Data is stored in an OSS bucket, which reduces storage and archiving costs and ensures data security.

  • Data is loaded in T+1 mode from ApsaraDB RDS for PostgreSQL to AnalyticDB for PostgreSQL in Serverless mode for high-performance analysis.

  • DMS provides a simple, visualized framework for automatic job scheduling.

Usage notes

  • Data in ApsaraDB RDS for PostgreSQL must support incremental archiving based on a specified condition, such as a date column that can be filtered by day (see the sketch after this list).

  • The involved ApsaraDB RDS for PostgreSQL instance, AnalyticDB for PostgreSQL instance, and OSS bucket must reside within the same region.
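
For example, the t_src table that is created later in this topic includes a date column named c, which allows the extract task to archive one day of data at a time. The following filter is a minimal sketch of such a condition; the date value is illustrative:

    SELECT * FROM t_src WHERE c >= '2025-02-12';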

Prerequisites

AnalyticDB for PostgreSQL

  • An AnalyticDB for PostgreSQL instance in Serverless mode is created.

ApsaraDB RDS for PostgreSQL

  • An ApsaraDB RDS for PostgreSQL instance is created. For more information, see Create an ApsaraDB RDS for PostgreSQL instance.

    Note

    The engine version of the ApsaraDB RDS for PostgreSQL instance must be PostgreSQL 9.4 to PostgreSQL 13.0.

  • A privileged account is created. For more information, see Create an account.

OSS

  • An OSS bucket is created. For more information, see Create buckets.

  • The name and endpoint of the bucket are obtained. To obtain them, perform the following steps:

    1. Log on to the OSS console.

    2. In the left-side navigation pane, click Buckets.

    3. On the Buckets page, click the name of the bucket.

      You can obtain the bucket name on the Buckets page.

    4. In the left-side navigation pane, click Overview.

    5. In the Port section of the Overview page, obtain the endpoint of the bucket.

      We recommend that you use the VPC (internal network) endpoint, which allows access from ECS instances over the internal network.

AccessKey ID and AccessKey secret

The AccessKey ID and AccessKey secret are obtained. For more information, see Create an AccessKey pair.

Prepare service connections and data

ApsaraDB RDS for PostgreSQL

  1. Connect to the ApsaraDB RDS for PostgreSQL instance. For more information, see Connect to an ApsaraDB RDS for PostgreSQL instance.

    In this topic, DMS is used to connect to instances.

  2. Create a test table named t_src and insert test data into the table.

    CREATE TABLE t_src (a int, b int, c date);
    INSERT INTO t_src SELECT generate_series(1, 1000), 1, now();
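
    You can optionally confirm the inserted data. Because generate_series(1, 1000) produces 1,000 rows, the following query should return 1000:

    SELECT count(*) FROM t_src;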
  3. Install the oss_fdw plug-in.

    CREATE EXTENSION IF NOT EXISTS oss_fdw;
  4. Create an OSS foreign table.

    CREATE SERVER ossserver FOREIGN DATA WRAPPER oss_fdw
        OPTIONS (host '<bucket_host>', id '<access_key>', key '<secret_key>', bucket '<bucket_name>');

    The following table describes the parameters.

    Parameter | Description
    ----------|---------------------------------
    host      | The endpoint of the OSS bucket.
    id        | The AccessKey ID.
    key       | The AccessKey secret.
    bucket    | The name of the OSS bucket.
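
    You can optionally verify the server definition before you configure the ETL job. The following minimal sketch assumes that the bucket is writable; the foreign table name oss_conn_test and the directory oss_fdw_test/ are hypothetical:

    -- Create a throwaway foreign table, write one row to OSS, read it back, and clean up.
    CREATE FOREIGN TABLE oss_conn_test (a int)
        SERVER ossserver
        OPTIONS (dir 'oss_fdw_test/', delimiter '|', format 'csv', encoding 'utf8');
    INSERT INTO oss_conn_test VALUES (1);
    SELECT * FROM oss_conn_test;
    DROP FOREIGN TABLE oss_conn_test;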

AnalyticDB for PostgreSQL

  1. Connect to the AnalyticDB for PostgreSQL instance. For more information, see Client connection.

    In this topic, DMS is used to connect to instances.

  2. Create a table named t_target that uses the same schema as the table in the ApsaraDB RDS for PostgreSQL instance.

    CREATE TABLE t_target (a int, b int, c date);
    Note

    AnalyticDB for PostgreSQL in Serverless mode does not support primary keys.
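
    If you want to control how rows are distributed across compute nodes, AnalyticDB for PostgreSQL generally supports specifying a distribution key. The following variant of the statement above is an optional sketch, not required for this topic:

    CREATE TABLE t_target (a int, b int, c date) DISTRIBUTED BY (a);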

  3. Install the oss_fdw plug-in.

    CREATE EXTENSION IF NOT EXISTS oss_fdw;
  4. Create an OSS server and a user mapping.

    CREATE SERVER oss_serv
        FOREIGN DATA WRAPPER oss_fdw
        OPTIONS (
            endpoint '<bucket_host>',
            bucket '<bucket_name>'
        );
    
    CREATE USER MAPPING FOR PUBLIC
        SERVER oss_serv
        OPTIONS (
            id '<access_key>',
            key '<secret_key>'
        );

    The following table describes the parameters.

    Parameter | Description
    ----------|---------------------------------
    endpoint  | The endpoint of the OSS bucket.
    id        | The AccessKey ID.
    key       | The AccessKey secret.
    bucket    | The name of the OSS bucket.
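
    You can optionally verify the server and user mapping before you configure the ETL job. The following minimal sketch reads from OSS; the foreign table name oss_check is hypothetical, and the directory matches the one that the extract task writes to later in this topic:

    CREATE FOREIGN TABLE oss_check (a int, b int, c date)
        SERVER oss_serv
        OPTIONS (dir 'rds/t3/', format 'csv', delimiter '|', encoding 'utf8');
    
    -- Returns rows only after the extract task has written data to OSS.
    SELECT count(*) FROM oss_check;
    
    DROP FOREIGN TABLE oss_check;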

Configure an ETL job

  1. Log on to the DMS console.

  2. In the top navigation bar, click Data Development. In the left-side navigation pane, choose Data Development > Task Orchestration.

  3. In the Task Flow section, click Create Task Flow.

  4. In the Create Task Flow dialog box, specify a value for Task Flow Name and click OK.

    In this example, set Task Flow Name to Import data from RDS to OSS.

  5. Configure a task to archive ApsaraDB RDS for PostgreSQL data by performing the following operations:

    1. On the Import data from RDS to OSS tab, choose Data Processing Nodes > Single Instance SQL in the left-side navigation pane and drag Single Instance SQL to the canvas on the right side.

    2. (Optional) Select the added task and click the Rename icon to rename the task.

      You can specify a descriptive task name to make it easy to maintain the entire ETL job. In this example, set the task name to Extract data from RDS.

    3. Select the added task and click the Edit Current Node Configuration icon.

    4. Select the ApsaraDB RDS for PostgreSQL database to be bound from the drop-down list above the code editor.

      You can switch to the SQL tab of the ApsaraDB RDS for PostgreSQL database to view the database.

    5. In the code editor, enter the following statements:

      -- Remove any foreign table left over from a previous run.
      DROP FOREIGN TABLE IF EXISTS oss_${mydate};
      
      -- Map an OSS directory that stores the day's archive.
      CREATE FOREIGN TABLE IF NOT EXISTS oss_${mydate}
          (a int,
           b int,
           c date)
           SERVER ossserver
           OPTIONS (dir 'rds/t3/${mydate}/', delimiter '|',
               format 'csv', encoding 'utf8');
      
      -- Archive the day's incremental data from t_src to OSS.
      INSERT INTO oss_${mydate} SELECT * FROM t_src WHERE c >= '${mydate}';
    6. Click the Variable Setting tab in the right-side navigation pane. Select Node Variable. Set Variable Name to mydate and Time Format to yyyyMMdd.

  6. Return to the Import data from RDS to OSS task flow. Configure the load task for AnalyticDB for PostgreSQL by performing the following operations:

    1. On the Import data from RDS to OSS tab, choose Data Processing Nodes > Single Instance SQL in the left-side navigation pane and drag Single Instance SQL to the canvas on the right side.

    2. (Optional) Select the added task and click the Rename icon to rename the task.

      You can specify a descriptive task name to make it easy to maintain the entire ETL job. In this example, set the task name to Load data to AnalyticDB for PostgreSQL.

    3. Select the added task and click the Edit Current Node Configuration icon.

    4. Select the AnalyticDB for PostgreSQL database to be bound from the drop-down list above the code editor.

      You can bind the AnalyticDB for PostgreSQL database in the same manner as in Step 5.

    5. In the code editor, enter the following statements:

      -- Map the OSS directory that the extract task writes to.
      CREATE FOREIGN TABLE IF NOT EXISTS oss_${mydate} (
           a int,
           b int,
           c date
      ) SERVER oss_serv
          OPTIONS (
              dir 'rds/t3/${mydate}/',
              format 'csv',
              delimiter '|',
              encoding 'utf8');
      
      -- Load the day's data into the target table.
      INSERT INTO t_target SELECT * FROM oss_${mydate};
    6. Click the Variable Setting tab in the right-side navigation pane. Select Node Variable. Set Variable Name to mydate and Time Format to yyyyMMdd.

  7. Configure scheduling by performing the following steps. The Extract data from RDS task must run before the Load data to AnalyticDB for PostgreSQL task.

    1. On the Import data from RDS to OSS tab, move the pointer over the Extract data from RDS task, click and hold the small circle on the right side, and then draw a line from the circle to the Load data to AnalyticDB for PostgreSQL task to generate the task flow.

    2. Click the Task Flow Information tab in the lower part of the page. In the Scheduling Settings section, turn on Enable Scheduling.

    3. Select a scheduling cycle for both the Extract data from RDS and Load data to AnalyticDB for PostgreSQL tasks.

  8. After the preceding configurations are complete, click Try Run in the upper part of the tab.

  9. If the task flow runs normally, click Publish.
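
After the task flow runs, you can optionally verify the result in the AnalyticDB for PostgreSQL database. A minimal sketch:

    -- The count should match the number of rows extracted from t_src for the scheduled day.
    SELECT count(*) FROM t_target;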
