本文介绍AnalyticDB PostgreSQL版如何通过DMS的作业调度功能,实现定时调度RDS PostgreSQL数据库的数据。

功能介绍

本次作业调度使用OSS作为中间态的存储,调度任务会将数据从RDS PostgreSQL数据库加载到OSS上,再使用AnalyticDB PostgreSQL版Serverless模式对该数据进行分析。整个ETL(Extract-Transform-Load)链路调度均通过DMS实现。作业调度示意图如下:

DMS作业调度

优势

  • 数据存储在OSS上,实现低成本存储归档,且数据不会被删除。
  • 数据从RDS数据库以“T+1”的形式加载到AnalyticDB PostgreSQL版Serverless模式中进行高性能数据分析。
  • DMS作业调度支持配置自动调度框架,低代码、白屏化操作,上手容易。

注意事项

  • RDS数据库中的数据,需要可以指定条件来增量归档。例如通过表中时间列按天归档。
  • RDS PostgreSQL实例、AnalyticDB PostgreSQL版实例和OSS Bucket需在同一地域内。

准备工作

AnalyticDB PostgreSQL版

  • 创建AnalyticDB PostgreSQL版Serverless模式实例,如何创建实例,请参见创建实例
  • 创建初始账号。如何创建初始账号,请参见创建数据库账号

RDS PostgreSQL

  • 创建RDS PostgreSQL实例。如何创建实例,请参见创建RDS PostgreSQL实例
    说明 RDS PostgreSQL实例需要为PostgreSQL 9.4至PostgreSQL 13.0版本。
  • 创建高权限账号。如何创建高权限账号,请参见创建账号

OSS

  • 创建OSS Bucket。如何创建OSS Bucket,请参见创建存储空间
  • 获取OSS Bucket的Bucket名称Endpoint(地域节点)信息,获取方式如下:
    1. 登录OSS管理控制台
    2. 在左侧导航栏中,单击目标Bucket列表
    3. Bucket列表,单击目标Bucket。

      Bucket列表页面,您可以获取Bucket名称

    4. 单击左侧导航栏中的概览
    5. 概览页面的访问域名区域,您可以获取Endpoint(地域节点)

      建议使用ECS的VPC网络访问(内网)的访问域名进行访问。

获取AccessKey ID和AccessKey Secret

获取AccessKey ID和AccessKey Secret的具体操作,请参见获取AccessKey

准备服务和数据

RDS PostgreSQL

  1. 连接RDS PostgreSQL数据库。如何连接数据库,请参见连接PostgreSQL实例
    本文示例中所有操作均使用DMS连接并执行。
  2. 创建测试表t_src,并插入测试数据。语句如下:
    CREATE TABLE t_src (a int, b int, c date);
    INSERT INTO t_src SELECT generate_series(1, 1000), 1, now();
  3. 安装OSS外表插件。语句如下:
    CREATE EXTENSION IF NOT EXISTS oss_fdw;
  4. 创建一个OSS外表。语句如下:
    CREATE SERVER ossserver FOREIGN DATA WRAPPER oss_fdw OPTIONS
         (host '<bucket_host>' , id '<access_key>', key '<secret_key>',bucket '<bucket_name>');

    参数说明如下:

    参数 说明
    host 准备工作中获取的OSS的Endpoint(地域节点)
    id 准备工作中获取的AccessKey ID。
    key 准备工作中获取的AccessKey Secret。
    bucket 准备工作中获取的OSS的Bucket名称

AnalyticDB PostgreSQL版

  1. 连接AnalyticDB PostgreSQL版数据库。如何连接数据库,请参见客户端连接
    本文示例中所有操作均使用DMS连接并执行。
  2. 创建一张与RDS PostgreSQL侧表结构一致的表t_target。语句如下:
    CREATE TABLE t_target (a int, b int, c date);
    说明 AnalyticDB PostgreSQL版Serverless模式暂不支持主键。
  3. 安装OSS外表插件。语句如下:
    CREATE EXTENSION IF NOT EXISTS oss_fdw;
  4. 创建OSS Server和User Mapping。语句如下:
    CREATE SERVER oss_serv
        FOREIGN DATA WRAPPER oss_fdw
        OPTIONS (
            endpoint '<bucket_host>',
            bucket '<bucket_name>'
      );
    
    CREATE USER MAPPING FOR PUBLIC
        SERVER oss_serv
        OPTIONS (
            id '<access_key>',
            key '<secret_key>'
        );

    参数说明如下:

    参数 说明
    endpoint 准备工作中获取的OSS的Endpoint(地域节点)
    id 准备工作中获取的AccessKey ID。
    key 准备工作中获取的AccessKey Secret。
    bucket 准备工作中获取的OSS的Bucket名称

配置ETL任务

  1. 登录数据管理服务DMS控制台
  2. 单击顶部菜单栏中的集成与开发(DTS),然后在左侧导航栏中,选择数据开发 > 任务编排
  3. 任务流区域中,单击新增任务流
  4. 新建任务流对话框中,输入任务流名称后,单击确认
    本次示例中,任务流名称RDSPG数据导入OSS
  5. 配置RDS PostgreSQL数据归档任务流。具体步骤如下:
    1. RDSPG数据导入OSS页签中,将左侧数据加工分类中的单实例SQL拖动到中间画布中。
    2. 可选:单击画布中该任务的DMS作业调度-重命名图标,重命名该任务。
      重命名任务是为了方便后续维护整个ETL链路,您可以根据自身需求设置任务名。本次示例中的任务名设置为RDS数据抽取
    3. 单击画布中新建任务的DMS-编辑当前节点配置图标。
    4. 选择需要绑定的RDS PostgreSQL数据库。
      DMS作业调度-选择需要绑定的数据库

      您可以切换至RDS PostgreSQL数据库的SQL编辑页签查看数据库。

      DMS作业调度-获取数据库信息
    5. 在下方编辑框中,粘贴以下SQL:
      DROP FOREIGN TABLE IF EXISTS oss_${mydate};
      
      CREATE FOREIGN TABLE IF NOT EXISTS oss_${mydate}
          (a int,
           b int,
           c date)
           SERVER ossserver
           OPTIONS ( dir 'rds/t3/${mydate}/', DELIMITER  '|' ,
               format 'csv', encoding 'utf8');
      
      INSERT INTO oss_${mydate} SELECT * FROM t_src WHERE c >= '${mydate}';
    6. 单击右侧的变量设置,选择节点变量,将变量名设置为mydata时间格式设置为yyyyMMdd
      DMS作业调度-RDS变量设置
  6. 返回RDSPG数据导入OSS任务流,配置AnalyticDB PostgreSQL版的加载任务。具体步骤如下:
    1. RDSPG数据导入OSS页签中,将左侧数据加工分类中的单实例SQL拖动到中间画布中。
    2. 可选:单击画布中该任务的DMS作业调度-重命名图标,重命名该任务。
      重命名任务是为了方便后续维护整个ETL链路,您可以根据自身需求设置任务名。本次示例中的任务名设置为ADBPG数据加载
    3. 单击画布中新建任务的DMS-编辑当前节点配置图标。
    4. 选择需要绑定的AnalyticDB PostgreSQL版数据库。
      AnalyticDB PostgreSQL版数据库获取方式与步骤5中获取RDS PostgreSQL数据库方式一致。
    5. 在下方编辑框中,粘贴以下SQL:
      CREATE FOREIGN TABLE IF NOT EXISTS  oss_${mydate}(
           a int ,
           b int ,
           c date
      ) SERVER oss_serv
          OPTIONS (
              dir 'rds/t3/${mydate}/',
              format 'csv',
              delimiter '|',
              encoding 'utf8');
      
      INSERT INTO t_target SELECT * FROM oss_${mydate};
    6. 单击右侧的变量设置,选择节点变量,将变量名设置为mydata时间格式设置为yyyyMMdd
      DMS作业调度-RDS变量设置
  7. 配置调度任务,需要先运行RDS PostgreSQL数据抽取任务,再运行AnalyticDB PostgreSQL版数据加载任务。配置方法如下:
    1. 选中RDS数据抽取任务右侧的圆点,拖动到ADBPG数据加载任务上,完成拖动后显示效果如下:
      DMS作业调度-任务地图
    2. 单击页面下方的任务流信息,打开调度配置开启调度的开关。
    3. 选择需要的作业调度周期,每个周期调度任务都会进行RDS侧数据的抽取和AnalyticDB PostgreSQL版侧数据的加载。
      DMS作业调度-调度配置
  8. 完成配置后,单击左上方试运行
  9. 任务流测试无误后单击发布

相关文档