本文介绍如何将RDS MySQL数据实时迁移到RDS PostgreSQL,使RDS PostgreSQL成为RDS MySQL的实时分析库。

背景信息

当需要使用RDS PostgreSQL数据库特有的功能来分析MySQL中的数据,使用GIS处理时空数据或进行用户画像分析时,可以使用DTS功能将RDS MySQL中的数据实时迁移到RDS PostgreSQL数据库中,使PostgreSQL数据库作为MySQL的实时分析库。

前提条件

  • 已创建源RDS MySQL实例,创建方法,请参见创建RDS MySQL实例
  • 已创建目标RDS PostgreSQL实例,创建方法,请参见创建RDS PostgreSQL实例
  • 目标RDS PostgreSQL实例存储空间须大于源RDS MySQL实例占用的存储空间。

配置步骤

  1. 准备源数据库测试数据
  2. 创建迁移任务
  3. MySQL数据实时迁移至PostgreSQL

准备源数据库测试数据

  1. 连接源数据库RDS MySQL。
    mysql -h <连接地址> -u <用户名> -P <端口> -p
  2. 创建测试数据库db1。
    CREATE DATABASE db1;
  3. 进入db1数据库。
    USE db1;
  4. 创建测试数据表test_mm和test_innodb。
    CREATE TABLE `test_mm` (
    `id` INT (11) NOT NULL AUTO_INCREMENT,
    `user_id` VARCHAR (20) NOT NULL,
    `group_id` INT (11) NOT NULL,
    `create_time` datetime NOT NULL,
    PRIMARY KEY (`id`),KEY `index_user_id` (`user_id`) USING HASH
    ) ENGINE = innodb AUTO_INCREMENT = 1 
    DEFAULT CHARSET = utf8;
    CREATE TABLE `test_innodb` (
    `id` INT (11) NOT NULL AUTO_INCREMENT,
    `user_id` VARCHAR (20) NOT NULL,
    `group_id` INT (11) NOT NULL,
    `create_time` datetime NOT NULL,
    PRIMARY KEY (`id`),
    KEY `index_user_id` (`user_id`) USING HASH
    ) ENGINE = innodb AUTO_INCREMENT = 1 
    DEFAULT CHARSET = utf8;
  5. 创建随机字符串函数。
    delimiter $$
    CREATE FUNCTION rand_string(n int) RETURNS varchar(255)
    begin
    declare chars_str varchar(100)
    default "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    declare return_str varchar(255) default "";
    declare i int default 0;
    while i < n do
    set return_str=concat(return_str,substring(chars_str,floor(1+rand()*62),1));
    set i= i+1;
    end while;
    return return_str;
    end $$
    delimiter ;
  6. 创建插入测试数据的存储过程。
    delimiter $$
    CREATE PROCEDURE `insert_data`(IN n int)
    BEGIN
    DECLARE i INT DEFAULT 1;
    WHILE (i <= n ) DO
    INSERT into test_mm (user_id,group_id,create_time ) VALUEs
    (rand_string(20),FLOOR(RAND() * 100) ,now() );
    set i=i+1;
    END WHILE;
    END $$
    delimiter ;
  7. 调用存储过程。
    CALL insert_data(1000000);
    INSERT INTO test_innodb SELECT * FROM test_mm;

创建迁移任务

  1. 创建迁移任务前,需要在RDS PostgreSQL控制台创建数据库,用于从RDS MySQL接收数据。创建方法,请参见创建数据库
    说明 本示例中创建的数据库名为db2。
  2. 登录数据传输控制台
  3. 在左侧导航栏,单击数据迁移
  4. 在页面顶部的迁移任务列表中选择目标实例所属地域。
    创建迁移任务
  5. 在页面右上角,单击创建迁移任务
  6. 配置源库及目标库信息。
    源库及目标库信息
    类别 配置 说明
    任务名称

    DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

    源库信息 数据库类型 选择RDS实例
    实例地区 选择源RDS MySQL实例所属地域。
    实例ID 择源RDS MySQL实例ID。
    数据库账号 填入源RDS MySQL实例的数据库账号。账号需要具备以下权限:
    • 库表结构迁移:SELECT权限。
    • 全量迁移:SELECT权限。
    • 增量迁移:REPLICATION CLIENT、REPLICATION SLAVE、SHOW VIEW和SELECT权限。
    数据库密码 填入该数据库账号对应的密码。
    连接方式 根据需求选择非加密连接SSL安全连接。如果设置为SSL安全连接,您需要提前开启RDS MySQL实例的SSL加密功能,详情请参见设置SSL加密
    目标库信息 数据库类型 选择RDS实例
    实例地区 选择目标RDS PostgreSQL实例所属地域。
    RDS实例ID 选择目标RDS PostgreSQL实例ID。
    数据库账号 填入目标RDS PostgreSQL实例的数据库账号,需具备以下权限:
    • LOGIN权限。
    • 目标库的CONNECT、CREATE权限。
    • 目标Schema的CREATE权限。
    数据库密码 填入该数据库账号对应的密码。
  7. 配置完成后,需要分别单击源库和目标库数据库密码参数后的测试连接,测试成功后,再进行下一步。
  8. 单击页面右下角授权白名单并进入下一步
  9. 配置迁移类型及列表。
    迁移类型及列表
    配置 说明
    迁移类型 选择结构迁移、全量数据迁移和增量数据迁移。
    迁移对象 选择待迁移的表。本示例为test_innodb和test_mm。
    已选择对象 此处展示已选择待迁移的表。
    映射名称更改 默认为不进行库表名称批量修改,如果需要对库表名称进行批量修改,可在选择要进行库表名称批量更改后,单击页面右下角高级设置,批量修改数据库表名称。
    源库、目标库无法连接后的重试时间 默认为720分钟,无需修改。
  10. 上述配置完成后, 单击页面下方的预检查并启动
    说明
    • 在迁移作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动迁移作业。
    • 如果预检查失败,单击具体检查项后的提示,查看失败详情。
      • 您可以根据提示修复后重新进行预检查。
      • 如无需修复告警检测项,您也可以选择确认屏蔽忽略告警项并重新进行预检查,跳过告警检测项重新进行预检查。
  11. 当预检查页面中显示为预检查通过100%时,单击下一步
    与检查成功
  12. 购买配置确认页面,选择数据迁移实例的链路规格。
    说明 DTS为您提供了不同性能的迁移规格,迁移链路规格的不同会影响迁移速率,您可以根据业务场景进行选择,详情请参见 数据迁移链路规格说明
  13. 勾选《数据传输(按量付费)服务条款》
  14. 单击购买并启动,迁移任务正式开始,您可在任务列表查看具体任务进度。

MySQL数据实时迁移至PostgreSQL

  1. 查看当前全量迁移状态。
    1. 连接RDS PostgreSQL数据库。
      psql -h <数据库连接地址> -U <用户名> -p <端口号> -d db2
      说明 由于在 创建迁移任务步骤中创建了数据库db2来接收MySQL迁移的数据,因此,连接PostgreSQL数据库时, 数据库名直接配置成db2。
    2. 执行\dn命令,查看是否将MySQL数据库db1映射为PostgreSQL中的Schema。
      db2=> \dn
         List of schemas
        Name  |   Owner
      --------+------------
       db1    | test1
       public | pg*******
      (2 rows)
      说明 MySQL中的数据库名称,在迁移到PostgreSQL数据库后,将会映射到PostgreSQL中的Schema。
    3. 执行\dt+ db1.*命令,查看db1中的表状态。
      db2=> \dt+ db1.*
                                  List of relations
       Schema |    Name     | Type  | Owner | Persistence | Size  | Description
      --------+-------------+-------+-------+-------------+-------+-------------
       db1    | test_innodb | table | test1 | permanent   | 65 MB |
       db1    | test_mm     | table | test1 | permanent   | 65 MB |
      (2 rows)
                                      
    4. 执行如下命令,分别查询test_innodb和test_mm数据表记录数。
      # 查询test_innodb记录数。
      SELECT COUNT(*) FROM db1.test_innodb;
      
      #查询test_mm记录数。
      SELECT COUNT(*) FROM db1.test_mm;
      执行结果:
      db2=> SELECT COUNT(*) FROM db1.test_innodb;
        count
      ---------
       1000000
      (1 row)
                
      db2=> SELECT COUNT(*) FROM db1.test_mm;
        count
      ---------
       1000000
      (1 row)                        
      说明

      由于MySQL数据库在PostgreSQL中被映射为Schema,在PostgreSQL中查询db1中数据时,需要指定Schema。

      如果您不想每次查询db1数据时都指定Schema,可以设置 search_path参数。
      db2=> show search_path;
         search_path
      -----------------
       "$user", public
      (1 row)
      
      db2=> set search_path = db1, "$user", public;
      SET
      db2=> show search_path;
           search_path
      ----------------------
       db1, "$user", public
      (1 row)
                                              
  2. 新增数据实时迁移测试。
    1. RDS MySQL中新增数据。
      INSERT INTO test_innodb (user_id, group_id, `create_time`) VALUES ('testuser', 1, '2021-07-29 12:00:00');
      执行结果
      mysql> INSERT INTO test_innodb (user_id, group_id, `create_time`) VALUES ('testuser', 1, '2021-07-29 12:00:00');
      Query OK, 1 row affected (0.04 sec)
      
      mysql> SELECT * FROM test_Innodb WHERE user_id = 'testuser';
      +---------+----------+----------+---------------------+
      | id      | user_id  | group_id | create_time         |
      +---------+----------+----------+---------------------+
      | 1000001 | testuser |        1 | 2021-07-29 12:00:00 |
      +---------+----------+----------+---------------------+
      1 row in set (0.03 sec)
    2. RDS PostgreSQL中查看新增的记录是否迁移。
      db2=> SELECT * FROM test_Innodb WHERE user_id = 'testuser';
         id    | user_id  | group_id |     create_time
      ---------+----------+----------+---------------------
       1000001 | testuser |        1 | 2021-07-29 12:00:00
      (1 row)
  3. 更新数据实时迁移测试。
    1. RDS MySQL中更新数据。
      UPDATE test_innodb set group_id = 2 WHERE user_id = 'testuser';
      执行结果:
      mysql> UPDATE test_innodb set group_id = 2 WHERE user_id = 'testuser';
      Query OK, 1 row affected (0.03 sec)
      Rows matched: 1  Changed: 1  Warnings: 0
      
      mysql> SELECT * FROM test_innodb WHERE user_id = 'testuser';
      +---------+----------+----------+---------------------+
      | id      | user_id  | group_id | create_time         |
      +---------+----------+----------+---------------------+
      | 1000001 | testuser |        2 | 2021-07-29 12:00:00 |
      +---------+----------+----------+---------------------+
      1 row in set (0.03 sec)
    2. RDS PostgreSQL中查看更新的记录是否迁移。
      db2=> SELECT * FROM test_innodb WHERE user_id = 'testuser';
         id    | user_id  | group_id |     create_time
      ---------+----------+----------+---------------------
       1000001 | testuser |        2 | 2021-07-29 12:00:00
      (1 row)
  4. 删除数据实时迁移测试。
    1. RDS MySQL中删除数据。
      DELETE FROM test_innodb WHERE user_id = 'testuser';
      执行结果:
      mysql> DELETE FROM test_innodb WHERE user_id = 'testuser';
      Query OK, 1 row affected (0.03 sec)
      
      mysql> SELECT * FROM test_innodb WHERE user_id = 'testuser';
      Empty set (0.03 sec)
      
      mysql> SELECT MAX(id) FROM test_innodb;
      +---------+
      | MAX(id) |
      +---------+
      | 1000000 |
      +---------+
      1 row in set (0.03 sec)
      说明 未新增记录时,id最大为1000000,新增数据后,id自增到1000001,删除后,id又减少至1000000。
    2. RDS PostgreSQL中查看删除记录后数据是否迁移。
      db2=> SELECT * FROM test_innodb WHERE user_id = 'testuser';
       id | user_id | group_id | create_time
      ----+---------+----------+-------------
      (0 rows)
      
      db2=> SELECT MAX(id) FROM test_innodb;
         max
      ---------
       1000000
      (1 row)