您可以通过调用DTS的相关接口,为Redis企业版实例配置单向或双向数据同步。本文介绍相关注意事项及示例代码。

本文展示的示例代码基于Java语言,您需要将示例代码中的<accessKeyId><accessSecret>分别替换为您的AccessKeyId和AccessKeySecret。您也可以在OpenAPI Explorer中直接运行相关接口,免去您计算签名的困扰。运行成功后,OpenAPI Explorer可以自动生成SDK代码示例。

前提条件

源和目标实例为阿里云Redis企业版实例(5.0版本)。
说明
  • 单向同步时,不支持源实例为容量存储型;双向同步时,不支持源和目标实例为容量存储型。
  • 源云数据库Redis为企业版(持久内存型)时,需要用户开启appendonly参数。
  • 架构类型不限制,即支持集群版标准版读写分离版
  • 源Redis实例Slave和Master之间的复制超时时间参数repl-timeout,默认为60秒, 建议使用config set repl-timeout 600命令设置为600秒。若源数据库数据量比较大,可以适当增大repl-timeout参数的值。

注意事项

  • 双向数据同步时,正向数据同步作业会执行 全量数据初始化增量数据同步 ,反向数据同步作业仅执行增量数据同步。
    警告 为保障数据一致性,双向数据同步作业运行期间,请勿在两端数据库同时对同一个key执行修改或写入操作。
  • DTS在执行全量数据初始化时将占用源库和目标库一定的资源,可能会导致数据库服务器负载上升。如果数据库业务量较大或服务器规格较低,可能会加重数据库压力,甚至导致数据库服务不可用。建议您在执行数据迁移前谨慎评估,在业务低峰期执行数据同步。
  • 请勿在源实例中执行FLUSHDBFLUSHALL命令,否则将导致源和目标的数据不一致。
  • 若目标数据库内存不足,触发数据逐出时,由于云数据库Redis版的默认数据逐出策略(maxmemory-policy)为volatile-lru,会导致目标库与源库数据不一致的情况,但不会影响任务的正常运行。

    为避免该情况发生,建议将目标库的数据逐出策略设置为noeviction,当目标库内存不足时,数据会写入失败,同时任务也会失败,但目标库不会因为数据逐出而丢失数据。

    说明 关于数据逐出策略详情,请参见Redis数据逐出策略介绍
  • 如果源库中的某些Key使用了过期(expire)策略,由于可能存在Key已过期但未被及时删除的情况,所以在目标库中查看到的Key数量(例如通过info命令查看)会比源库的Key数量少。
    说明 源和目标库中,未设置过期策略或未过期的Key数量是一致的。
  • 如果目标Redis实例没有开通直连访问,DTS将采用代理转发模式将数据写入目标实例。
    说明 关于直连访问的开通方法,请参见开通直连访问
  • 同步期间,如源Redis实例发生扩缩容(如增加或者减少分片)、规格变配(如扩大内存),则您需重新配置任务。且为保障数据一致性,在重新配置任务前,建议先清空已同步至目标Redis的数据。
  • 同步期间,如源Redis实例的连接地址变化(如迁移可用区、经典网络切换专有网络),则您需要重新配置任务。
  • 如双向同步任务的源实例或目标实例位于海外地域,则仅支持同地域的双向同步,不支持跨地域的双向同步。例如,支持日本地域间的双向同步,不支持日本地域与法兰克福地域间的双向同步。
  • 源库单机版Redis同步到目标库集群版Redis的操作限制:由于集群cluster只允许单个命令操作单个slot,若在源库执行多Key操作时,Key不在同一个slot或涉及多个slot,则会出现以下报错:
    CROSSSLOT Keys in request don't hash to the same slot
    建议在DTS同步过程中仅执行单Key操作,以免导致链路中断。
  • 若目标实例的架构类型为集群版且某一个分片达到了内存上限,或目标实例的存储空间不足时,DTS任务会因内存溢出(Out of Memory)而失败。
  • 若源或目标实例已开启透明数据加密TDE功能,则暂不支持通过DTS同步数据。

费用说明

同步类型链路配置费用
库表结构同步和全量数据同步不收费。
增量数据同步收费,详情请参见计费概述

功能限制

不支持级联单向同步,相关介绍请参见数据同步拓扑介绍

支持的同步命令

  • APPEND
  • BITOP、BLPOP、BRPOP、BRPOPLPUSH
  • DECR、DECRBY、DEL
  • EVAL、EVALSHA、EXEC、EXPIRE、EXPIREAT
  • GEOADD、GETSET
  • HDEL、HINCRBY、HINCRBYFLOAT、HMSET、HSET、HSETNX
  • INCR、INCRBY、INCRBYFLOAT
  • LINSERT、LPOP、LPUSH、LPUSHX、LREM、LSET、LTRIM
  • MOVE、MSET、MSETNX、MULTI
  • PERSIST、PEXPIRE、PEXPIREAT、PFADD、PFMERGE、PSETEX
  • RENAME、RENAMENX、RPOP、RPOPLPUSH、RPUSH、RPUSHX
  • SADD、SDIFFSTORE、SELECT、SET、SETBIT、SETEX、SETNX、SETRANGE、SINTERSTORE、SMOVE、SPOP、SREM、SUNIONSTORE
  • UNLINK、ZADD、ZINCRBY、ZINTERSTORE、ZREM、ZREMRANGEBYLEX、ZUNIONSTORE、ZREMRANGEBYRANK、ZREMRANGEBYSCORE
  • SWAPDB(当源或目标实例的架构为集群版时不支持)
重要
  • 不支持同步PUBLISH命令。
  • 对于通过EVAL或者EVALSHA调用Lua脚本,在增量数据同步时,由于目标端在执行脚本时不会明确返回执行结果,DTS无法确保该类型脚本能够执行成功。
  • 对于List,由于DTS在调用sync或psync进行重传时,不会对目标端已有的数据进行清空,可能导致出现重复数据。

准备工作

  1. 创建AccessKey
    说明 为避免主账号泄露AccessKey带来的安全风险,建议您创建RAM用户,授予RAM用户访问DTS资源的权限,再使用RAM用户的AccessKey来调用SDK。详情请参见账号访问控制
  2. 下载SDK。相关安装方法及介绍,请参见DTS Java SDK使用说明

创建数据同步作业

本示例中,请求参数和返回参数的详细说明请参见购买同步实例

说明 Topology参数传入oneway,则创建单向数据同步作业;传入bidirectional则创建双向数据同步作业。

请求示例:

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import java.util.*;
import com.aliyuncs.dts.model.v20180801.*;

public class CreateSynchronizationJob {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        CreateSynchronizationJobRequest request = new CreateSynchronizationJobRequest();
        request.setRegionId("cn-hangzhou");
        request.setSourceRegion("cn-beijing");
        request.setDestRegion("cn-beijing");
        request.setSynchronizationJobClass("small");
        request.setPayType("Postpaid");
        request.setTopology("oneway");
        request.setSourceEndpointInstanceType("Redis");
        request.setDestinationEndpointInstanceType("Redis");

        try {
            CreateSynchronizationJobResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }

    }
}

返回示例:

{
    "SynchronizationJobId": "dts********",
    "RequestId": "158347*****-rqyjQ",
    "Success": true
}

配置数据同步作业(单向)

本示例中,源实例的数据库账号需具备读权限,目标实例的数据库账号需具备读写权限。
说明 配置单向数据同步时,您也可以将自建Redis数据库或阿里云Redis社区版实例作为源库或目标库,配置方法与本案例类似(需调整对应的请求参数,例如MigrationReserved)。

关于请求参数和返回参数的详细说明请参见配置同步实例

请求示例:

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import java.util.*;
import com.aliyuncs.dts.model.v20180801.*;

public class ConfigureSynchronizationJob {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        ConfigureSynchronizationJobRequest request = new ConfigureSynchronizationJobRequest();
        request.setRegionId("cn-hangzhou");
        request.setSynchronizationJobId("dts*******");
        request.setStructureInitialization(true);
        request.setDataInitialization(true);
        request.setSynchronizationObjects("[{\"DBName\": \"0\",\"NewDBName\": \"0\"}]");
        request.setSynchronizationJobName("apitest");
        request.setSourceEndpointInstanceId("r-2ze********");
        request.setSourceEndpointInstanceType("redis");
        request.setSourceEndpointPassword("Test******");
        request.setDestinationEndpointInstanceId("r-2ze********");
        request.setDestinationEndpointInstanceType("redis");
        request.setDestinationEndpointPassword("Test******");
        request.setMigrationReserved("{\"srcRedisType\":\"enterprise\",\"destRedisType\":\"enterprise\"}");

        try {
            ConfigureSynchronizationJobResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }

    }
}

返回示例:

{
    "RequestId": "1583********-BcO5F",
    "Success": true
}

配置数据同步作业(双向)

本示例中,用于数据同步的数据库账号需具备读写权限。

关于请求参数和返回参数的详细说明请参见配置同步实例

请求示例:

  1. 配置正向数据同步作业。
    import com.aliyuncs.DefaultAcsClient;
    import com.aliyuncs.IAcsClient;
    import com.aliyuncs.exceptions.ClientException;
    import com.aliyuncs.exceptions.ServerException;
    import com.aliyuncs.profile.DefaultProfile;
    import com.google.gson.Gson;
    import java.util.*;
    import com.aliyuncs.dts.model.v20180801.*;
    
    public class ConfigureSynchronizationJob {
    
        public static void main(String[] args) {
            DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
            IAcsClient client = new DefaultAcsClient(profile);
    
            ConfigureSynchronizationJobRequest request = new ConfigureSynchronizationJobRequest();
            request.setRegionId("cn-hangzhou");
            request.setSynchronizationJobId("dts*******");
            request.setStructureInitialization(true);
            request.setDataInitialization(true);
            request.setSynchronizationObjects("[{\"DBName\": \"0\",\"NewDBName\": \"0\"}]");
            request.setSynchronizationJobName("apitest");
            request.setSynchronizationDirection("Forward");
            request.setSourceEndpointInstanceId("r-2ze********");
            request.setSourceEndpointInstanceType("redis");
            request.setSourceEndpointPassword("Test******");
            request.setDestinationEndpointInstanceId("r-2ze********");
            request.setDestinationEndpointInstanceType("redis");
            request.setDestinationEndpointPassword("Test******");
            request.setMigrationReserved("{\"srcRedisType\":\"enterprise\",\"destRedisType\":\"enterprise\"}");
    
            try {
                ConfigureSynchronizationJobResponse response = client.getAcsResponse(request);
                System.out.println(new Gson().toJson(response));
            } catch (ServerException e) {
                e.printStackTrace();
            } catch (ClientException e) {
                System.out.println("ErrCode:" + e.getErrCode());
                System.out.println("ErrMsg:" + e.getErrMsg());
                System.out.println("RequestId:" + e.getRequestId());
            }
    
        }
    }
  2. 等待正向数据同步作业初始化完成(即转变为同步中状态)。
    说明 您可以通过控制台或调用查询一个数据同步实例状态接口查看数据同步作业的状态信息。
  3. 配置反向数据同步作业。
    import com.aliyuncs.DefaultAcsClient;
    import com.aliyuncs.IAcsClient;
    import com.aliyuncs.exceptions.ClientException;
    import com.aliyuncs.exceptions.ServerException;
    import com.aliyuncs.profile.DefaultProfile;
    import com.google.gson.Gson;
    import java.util.*;
    import com.aliyuncs.dts.model.v20180801.*;
    
    public class ConfigureSynchronizationJob {
    
        public static void main(String[] args) {
            DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
            IAcsClient client = new DefaultAcsClient(profile);
    
            ConfigureSynchronizationJobRequest request = new ConfigureSynchronizationJobRequest();
            request.setRegionId("cn-hangzhou");
            request.setSynchronizationJobId("dts*******");
            request.setStructureInitialization(true);
            request.setDataInitialization(true);
            request.setSynchronizationObjects("[{\"DBName\": \"0\",\"NewDBName\": \"0\"}]");
            request.setSynchronizationJobName("apitest");
            request.setSynchronizationDirection("Reverse");
            request.setSourceEndpointInstanceId("r-2ze********");
            request.setSourceEndpointInstanceType("redis");
            request.setSourceEndpointPassword("Test******");
            request.setDestinationEndpointInstanceId("r-2ze********");
            request.setDestinationEndpointInstanceType("redis");
            request.setDestinationEndpointPassword("Test******");
            request.setMigrationReserved("{\"srcRedisType\":\"enterprise\",\"destRedisType\":\"enterprise\"}");
    
            try {
                ConfigureSynchronizationJobResponse response = client.getAcsResponse(request);
                System.out.println(new Gson().toJson(response));
            } catch (ServerException e) {
                e.printStackTrace();
            } catch (ClientException e) {
                System.out.println("ErrCode:" + e.getErrCode());
                System.out.println("ErrMsg:" + e.getErrMsg());
                System.out.println("RequestId:" + e.getRequestId());
            }
    
        }
    }

返回示例:

{
    "RequestId": "1583********-BcO5F",
    "Success": true
}

相关文档