You can call the API operations of Data Transmission Service (DTS) in OpenAPI Explorer to configure one-way or two-way data synchronization between ApsaraDB for Redis Enhanced Edition (Tair) instances. This topic describes the usage notes and provides sample code.

The sample code in this topic is written in Java. Replace <accessKeyId> and <accessSecret> in the sample code with your AccessKey ID and AccessKey secret. We recommend that you call API operations in OpenAPI Explorer, which automatically calculates the signature value and dynamically generates sample code for different SDKs.

Prerequisites

The source and destination instances are ApsaraDB for Redis Enhanced Edition (Tair) instances that run Redis 5.0.
Note
  • For one-way synchronization, the source instance cannot be a storage-optimized instance. For two-way synchronization, the source and destination instances cannot be storage-optimized instances.
  • The appendonly parameter is set to yes if the source instance is a persistent memory-optimized instance of ApsaraDB for Redis Enhanced Edition (Tair).
  • The architecture of the instances is not restricted. The architecture can be cluster, standard, or read/write splitting.

Usage notes

  • During two-way data synchronization, the data synchronization task in the forward direction performs full data synchronization and incremental data synchronization. The data synchronization task in the reverse direction performs only incremental data synchronization.
    Warning To ensure data consistency, do not modify or write data to the same key in both the source and destination databases while the two-way data synchronization tasks are running.
  • DTS uses the resources of the source and destination databases during full data synchronization. This may increase the loads of the database servers. If you synchronize a large volume of data or the server specifications cannot meet your requirements, the database services may become unavailable. Before you synchronize data, evaluate the impact of data synchronization on the performance of the source and destination databases. We recommend that you synchronize data during off-peak hours.
  • We recommend that you do not run the FLUSHDB or FLUSHALL command in the source instance during data synchronization. Otherwise, data may become inconsistent between the source and destination instances.
  • If the data eviction policy (maxmemory-policy) of the destination instance is not set to noeviction, data inconsistency may occur between the source and destination instances. For more information about data eviction policies, see How does ApsaraDB for Redis evict data by default?
  • If an expiration policy is enabled for some keys in the source database, these keys may not be deleted in a timely manner after they expire. Therefore, the number of keys in the destination database may be less than that in the source database. You can run the info command to view the number of keys in the destination database.
    Note The number of keys that do not have an expiration policy or have not expired is the same in the source and destination databases.
  • If direct connection is disabled for the destination ApsaraDB for Redis instance, DTS uses the proxy forwarding mode to write data to the destination instance.
    Note For more information about how to enable direct connection, see Enable the direct connection mode.
  • During data synchronization, if the number of shards in the source instance is changed or if the database specifications are changed (for example, the memory capacity is scaled up), you must reconfigure the data synchronization task. To ensure data consistency, we recommend that you clear the data that has been synchronized to the destination instance before you reconfigure the task.
  • During data synchronization, if the endpoint of the source instance is changed, you must submit a ticket to have the endpoint updated. Otherwise, the append-only files (AOFs) of the source instance may be reset, and you must reconfigure the task. Instance operations that may cause endpoint changes include zone changes and network type changes from classic network to virtual private cloud (VPC).
  • If the source or destination database of a two-way data synchronization task resides in a region outside the Chinese mainland, data can be synchronized only within this region. Cross-region two-way synchronization is not supported. For example, if the source instance resides in the Japan (Tokyo) region, data can be synchronized only within the Japan (Tokyo) region and cannot be synchronized to the Germany (Frankfurt) region.
  • Limits on synchronizing data from a standalone ApsaraDB for Redis instance to an ApsaraDB for Redis cluster instance: Each command can be run only on a single slot in an ApsaraDB for Redis cluster instance. If you perform operations on multiple keys in the source database and the keys belong to different slots, the following error occurs:
    CROSSSLOT Keys in request don't hash to the same slot
    We recommend that you perform operations on only one key during data synchronization. Otherwise, the data synchronization task will be interrupted.
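
The CROSSSLOT limit above can be checked before a multi-key command is sent. The following is a minimal sketch, not part of DTS or DTS SDK, that computes the slot of each key the way open source Redis Cluster does (CRC16 of the key modulo 16384, with only the text inside a non-empty {hash tag} being hashed). The class and method names are made up for this example, and this topic does not state whether ApsaraDB for Redis cluster instances follow the same hash tag rule, so treat that part as an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Sketch: predicts whether a multi-key command would touch more than one
// Redis Cluster slot. Class and method names are illustrative only.
class SlotCheck {

    // CRC16 (XMODEM variant, polynomial 0x1021), as specified for Redis Cluster.
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    // Maps a key to one of 16384 slots. If the key contains a non-empty
    // {hash tag}, only the text inside the first pair of braces is hashed.
    static int slot(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % 16384;
    }

    // Returns true if all keys hash to the same slot.
    static boolean sameSlot(List<String> keys) {
        return keys.stream().map(SlotCheck::slot).distinct().count() <= 1;
    }

    public static void main(String[] args) {
        // Keys that share a hash tag land in one slot.
        System.out.println(sameSlot(Arrays.asList("{user1}:name", "{user1}:age")));
        // Keys without a common hash tag are hashed independently.
        System.out.println(slot("order:1") + " " + slot("order:2"));
    }
}
```

If sameSlot returns false for the keys of a command such as MSET or RENAME, that command is a candidate for the CROSSSLOT error when the destination is a cluster instance.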

Limits

One-way cascade synchronization is not supported. For more information, see Synchronization topologies.

Operations that can be synchronized

  • APPEND
  • BITOP, BLPOP, BRPOP, and BRPOPLPUSH
  • DECR, DECRBY, and DEL
  • EVAL, EVALSHA, EXEC, EXPIRE, and EXPIREAT
  • GEOADD and GETSET
  • HDEL, HINCRBY, HINCRBYFLOAT, HMSET, HSET, and HSETNX
  • INCR, INCRBY, and INCRBYFLOAT
  • LINSERT, LPOP, LPUSH, LPUSHX, LREM, LSET, and LTRIM
  • MOVE, MSET, MSETNX, and MULTI
  • PERSIST, PEXPIRE, PEXPIREAT, PFADD, PFMERGE, and PSETEX
  • RENAME, RENAMENX, RPOP, RPOPLPUSH, RPUSH, and RPUSHX
  • SADD, SDIFFSTORE, SELECT, SET, SETBIT, SETEX, SETNX, SETRANGE, SINTERSTORE, SMOVE, SPOP, SREM, and SUNIONSTORE
  • UNLINK, ZADD, ZINCRBY, ZINTERSTORE, ZREM, ZREMRANGEBYLEX, ZUNIONSTORE, ZREMRANGEBYRANK, and ZREMRANGEBYSCORE
  • SWAPDB (This operation is not supported if the source or destination ApsaraDB for Redis instance is deployed in the cluster architecture.)
Notice
  • PUBLISH operations cannot be synchronized.
  • For an EVAL or EVALSHA command that is run to call Lua scripts, DTS does not ensure that these Lua scripts are executed on the destination database. This is because the destination database does not explicitly return the execution results of Lua scripts during incremental data synchronization.
  • When DTS runs the SYNC or PSYNC command to transfer data of the LIST type, DTS does not clear the existing data. As a result, the destination instance may contain duplicate data records.
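
To audit which commands an application issues against the list above, the list can be held in a set and queried. The following is a minimal sketch that copies the command names from this topic. The class name, the method name, and the clusterInvolved flag are assumptions made for this example, and the set reflects only what this topic lists.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Sketch: looks up whether a command appears in the list of operations
// that this topic says can be synchronized. Names are illustrative only.
class SyncCommandCheck {

    private static final Set<String> SUPPORTED = new HashSet<>(Arrays.asList(
        "APPEND",
        "BITOP", "BLPOP", "BRPOP", "BRPOPLPUSH",
        "DECR", "DECRBY", "DEL",
        "EVAL", "EVALSHA", "EXEC", "EXPIRE", "EXPIREAT",
        "GEOADD", "GETSET",
        "HDEL", "HINCRBY", "HINCRBYFLOAT", "HMSET", "HSET", "HSETNX",
        "INCR", "INCRBY", "INCRBYFLOAT",
        "LINSERT", "LPOP", "LPUSH", "LPUSHX", "LREM", "LSET", "LTRIM",
        "MOVE", "MSET", "MSETNX", "MULTI",
        "PERSIST", "PEXPIRE", "PEXPIREAT", "PFADD", "PFMERGE", "PSETEX",
        "RENAME", "RENAMENX", "RPOP", "RPOPLPUSH", "RPUSH", "RPUSHX",
        "SADD", "SDIFFSTORE", "SELECT", "SET", "SETBIT", "SETEX", "SETNX",
        "SETRANGE", "SINTERSTORE", "SMOVE", "SPOP", "SREM", "SUNIONSTORE",
        "UNLINK", "ZADD", "ZINCRBY", "ZINTERSTORE", "ZREM", "ZREMRANGEBYLEX",
        "ZUNIONSTORE", "ZREMRANGEBYRANK", "ZREMRANGEBYSCORE"));

    // clusterInvolved: true if the source or destination instance uses the
    // cluster architecture, in which case SWAPDB is not supported.
    static boolean isSynchronized(String command, boolean clusterInvolved) {
        String name = command.trim().toUpperCase(Locale.ROOT);
        if (name.equals("SWAPDB")) {
            return !clusterInvolved;
        }
        return SUPPORTED.contains(name);
    }

    public static void main(String[] args) {
        System.out.println(isSynchronized("set", false));
        System.out.println(isSynchronized("PUBLISH", false));
        System.out.println(isSynchronized("SWAPDB", true));
    }
}
```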

Preparations

  1. Create an AccessKey pair.
    Note To protect the AccessKey pair of your Alibaba Cloud account, we recommend that you create a RAM user, grant the RAM user the permissions to access DTS, and then use the AccessKey pair of the RAM user to call DTS SDK for Java. For more information, see Implement access control by using RAM.
  2. Download DTS SDK for Java. For more information, see Use DTS SDK for Java.
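
The samples in this topic pass the AccessKey pair as string literals. One way to keep the pair out of source code is to read it from environment variables. The following is a minimal sketch of that approach. The variable names ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET and the AccessKeyLoader class are assumptions made for this example and are not defined by this topic.

```java
import java.util.Map;

// Sketch: loads an AccessKey pair from environment variables instead of
// hard-coding it. Variable and class names are illustrative assumptions.
class AccessKeyLoader {

    static final String ID_VAR = "ALIBABA_CLOUD_ACCESS_KEY_ID";
    static final String SECRET_VAR = "ALIBABA_CLOUD_ACCESS_KEY_SECRET";

    // Returns {accessKeyId, accessKeySecret}, or fails fast if either is missing.
    static String[] load(Map<String, String> env) {
        String id = env.get(ID_VAR);
        String secret = env.get(SECRET_VAR);
        if (id == null || id.isEmpty() || secret == null || secret.isEmpty()) {
            throw new IllegalStateException(
                "Set " + ID_VAR + " and " + SECRET_VAR + " before you run the sample");
        }
        return new String[] {id, secret};
    }

    public static void main(String[] args) {
        try {
            String[] pair = load(System.getenv());
            // pair[0] and pair[1] would replace "<accessKeyId>" and
            // "<accessSecret>" in the DefaultProfile.getProfile(...) calls.
            System.out.println("AccessKey pair loaded, ID length: " + pair[0].length());
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```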

Create a data synchronization task

For more information about the request parameters and response parameters in this example, see CreateSynchronizationJob.

Note If you set the Topology parameter to oneway, a one-way data synchronization task is created. If you set the parameter to bidirectional, two-way data synchronization tasks are created.

Sample request:

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import java.util.*;
import com.aliyuncs.dts.model.v20180801.*;

public class CreateSynchronizationJob {

    public static void main(String[] args) {
        // Replace <accessKeyId> and <accessSecret> with your AccessKey ID and AccessKey secret.
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        CreateSynchronizationJobRequest request = new CreateSynchronizationJobRequest();
        request.setRegionId("cn-hangzhou");
        // Regions of the source and destination instances.
        request.setSourceRegion("cn-beijing");
        request.setDestRegion("cn-beijing");
        request.setSynchronizationJobClass("small");
        request.setPayType("Postpaid");
        // "oneway" creates a one-way task; "bidirectional" creates two-way tasks.
        request.setTopology("oneway");
        request.setSourceEndpointInstanceType("Redis");
        request.setDestinationEndpointInstanceType("Redis");

        try {
            CreateSynchronizationJobResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }

    }
}

Sample response:

{
    "SynchronizationJobId": "dts********",
    "RequestId": "158347*****-rqyjQ",
    "Success": true
}

Configure a one-way data synchronization task

In this example, the database account of the source instance has the read permissions on the source database. The database account of the destination instance has the read and write permissions on the destination database.
Note When you configure a one-way data synchronization task, you can also use a self-managed Redis database or an ApsaraDB for Redis Community Edition instance as the source or destination database. The configuration method is similar to that described in this topic. However, you must specify the request parameters such as MigrationReserved based on your actual scenario.

For more information about the request parameters and response parameters, see ConfigureSynchronizationJob.

Sample request:

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import java.util.*;
import com.aliyuncs.dts.model.v20180801.*;

public class ConfigureSynchronizationJob {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        ConfigureSynchronizationJobRequest request = new ConfigureSynchronizationJobRequest();
        request.setRegionId("cn-hangzhou");
        // ID of the task, as returned in SynchronizationJobId by CreateSynchronizationJob.
        request.setSynchronizationJobId("dts*******");
        request.setStructureInitialization(true);
        request.setDataInitialization(true);
        request.setSynchronizationObjects("[{\"DBName\": \"0\",\"NewDBName\": \"0\"}]");
        request.setSynchronizationJobName("apitest");
        request.setSourceEndpointInstanceId("r-2ze********");
        request.setSourceEndpointInstanceType("redis");
        request.setSourceEndpointPassword("Test******");
        request.setDestinationEndpointInstanceId("r-2ze********");
        request.setDestinationEndpointInstanceType("redis");
        request.setDestinationEndpointPassword("Test******");
        request.setMigrationReserved("{\"srcRedisType\":\"enterprise\",\"destRedisType\":\"enterprise\"}");

        try {
            ConfigureSynchronizationJobResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }

    }
}

Sample response:

{
    "RequestId": "1583********-BcO5F",
    "Success": true
}
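
The SynchronizationObjects and MigrationReserved values in the sample request are JSON strings with hand-written escapes, which are easy to mistype. The following is a minimal sketch of helper methods that build strings of the same shape as the ones in the sample. The class and method names are made up for this example. Whether the API accepts more than one DB entry, and which values other than enterprise are valid for srcRedisType and destRedisType, is not stated in this topic, so check ConfigureSynchronizationJob before you rely on either.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: builds the JSON strings used by the sample request so that the
// quotes do not have to be escaped by hand. Names are illustrative only.
class SyncJobJson {

    // Maps each source DB index to the same index in the destination,
    // using the DBName/NewDBName fields shown in the sample request.
    static String synchronizationObjects(List<Integer> dbIndexes) {
        List<String> items = new ArrayList<>();
        for (int db : dbIndexes) {
            items.add("{\"DBName\":\"" + db + "\",\"NewDBName\":\"" + db + "\"}");
        }
        return "[" + String.join(",", items) + "]";
    }

    // The type values are inserted as-is, so they must not contain quotes
    // or backslashes.
    static String migrationReserved(String srcRedisType, String destRedisType) {
        return "{\"srcRedisType\":\"" + srcRedisType
            + "\",\"destRedisType\":\"" + destRedisType + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(synchronizationObjects(Arrays.asList(0)));
        System.out.println(migrationReserved("enterprise", "enterprise"));
    }
}
```

The returned strings can be passed directly to request.setSynchronizationObjects(...) and request.setMigrationReserved(...).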

Configure two-way data synchronization tasks

In this example, the database accounts used for data synchronization have the read and write permissions.

For more information about the request parameters and response parameters, see ConfigureSynchronizationJob.

Sample request:

  1. Configure the data synchronization task in the forward direction.
    import com.aliyuncs.DefaultAcsClient;
    import com.aliyuncs.IAcsClient;
    import com.aliyuncs.exceptions.ClientException;
    import com.aliyuncs.exceptions.ServerException;
    import com.aliyuncs.profile.DefaultProfile;
    import com.google.gson.Gson;
    import java.util.*;
    import com.aliyuncs.dts.model.v20180801.*;
    
    public class ConfigureSynchronizationJob {
    
        public static void main(String[] args) {
            DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
            IAcsClient client = new DefaultAcsClient(profile);
    
            ConfigureSynchronizationJobRequest request = new ConfigureSynchronizationJobRequest();
            request.setRegionId("cn-hangzhou");
            request.setSynchronizationJobId("dts*******");
            request.setStructureInitialization(true);
            request.setDataInitialization(true);
            request.setSynchronizationObjects("[{\"DBName\": \"0\",\"NewDBName\": \"0\"}]");
            request.setSynchronizationJobName("apitest");
            request.setSynchronizationDirection("Forward");
            request.setSourceEndpointInstanceId("r-2ze********");
            request.setSourceEndpointInstanceType("redis");
            request.setSourceEndpointPassword("Test******");
            request.setDestinationEndpointInstanceId("r-2ze********");
            request.setDestinationEndpointInstanceType("redis");
            request.setDestinationEndpointPassword("Test******");
            request.setMigrationReserved("{\"srcRedisType\":\"enterprise\",\"destRedisType\":\"enterprise\"}");
    
            try {
                ConfigureSynchronizationJobResponse response = client.getAcsResponse(request);
                System.out.println(new Gson().toJson(response));
            } catch (ServerException e) {
                e.printStackTrace();
            } catch (ClientException e) {
                System.out.println("ErrCode:" + e.getErrCode());
                System.out.println("ErrMsg:" + e.getErrMsg());
                System.out.println("RequestId:" + e.getRequestId());
            }
    
        }
    }
  2. Wait until the data synchronization task in the forward direction enters the Synchronizing state.
    Note You can view the status of a data synchronization task in the DTS console. You can also call the DescribeSynchronizationJobStatus operation to view the status.
  3. Configure the data synchronization task in the reverse direction.
    import com.aliyuncs.DefaultAcsClient;
    import com.aliyuncs.IAcsClient;
    import com.aliyuncs.exceptions.ClientException;
    import com.aliyuncs.exceptions.ServerException;
    import com.aliyuncs.profile.DefaultProfile;
    import com.google.gson.Gson;
    import java.util.*;
    import com.aliyuncs.dts.model.v20180801.*;
    
    public class ConfigureSynchronizationJob {
    
        public static void main(String[] args) {
            DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
            IAcsClient client = new DefaultAcsClient(profile);
    
            ConfigureSynchronizationJobRequest request = new ConfigureSynchronizationJobRequest();
            request.setRegionId("cn-hangzhou");
            request.setSynchronizationJobId("dts*******");
            request.setStructureInitialization(true);
            request.setDataInitialization(true);
            request.setSynchronizationObjects("[{\"DBName\": \"0\",\"NewDBName\": \"0\"}]");
            request.setSynchronizationJobName("apitest");
            request.setSynchronizationDirection("Reverse");
            request.setSourceEndpointInstanceId("r-2ze********");
            request.setSourceEndpointInstanceType("redis");
            request.setSourceEndpointPassword("Test******");
            request.setDestinationEndpointInstanceId("r-2ze********");
            request.setDestinationEndpointInstanceType("redis");
            request.setDestinationEndpointPassword("Test******");
            request.setMigrationReserved("{\"srcRedisType\":\"enterprise\",\"destRedisType\":\"enterprise\"}");
    
            try {
                ConfigureSynchronizationJobResponse response = client.getAcsResponse(request);
                System.out.println(new Gson().toJson(response));
            } catch (ServerException e) {
                e.printStackTrace();
            } catch (ClientException e) {
                System.out.println("ErrCode:" + e.getErrCode());
                System.out.println("ErrMsg:" + e.getErrMsg());
                System.out.println("RequestId:" + e.getRequestId());
            }
    
        }
    }

Sample response:

{
    "RequestId": "1583********-BcO5F",
    "Success": true
}
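
Step 2 of the procedure above requires waiting until the forward task enters the Synchronizing state before the reverse task is configured. The following is a minimal sketch of a polling loop for that wait. It takes the status from a Supplier so that it runs without the SDK. In real use, the supplier would wrap a call to the DescribeSynchronizationJobStatus operation, whose response fields are not shown in this topic. The class name, method name, retry count, and interval are assumptions made for this example, and "Initializing" is only a placeholder state.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Sketch: polls a status source until it reports the target state or the
// attempts run out. Names and parameters are illustrative only.
class SyncStatusWaiter {

    // Returns true if the target state was observed within maxAttempts polls.
    static boolean waitForStatus(Supplier<String> statusSupplier, String target,
                                 int maxAttempts, long intervalMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (target.equals(statusSupplier.get())) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and stop waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in for real API calls: replays a fixed sequence of states.
        Iterator<String> fake = Arrays.asList("Initializing", "Synchronizing").iterator();
        boolean ready = waitForStatus(fake::next, "Synchronizing", 5, 10);
        System.out.println(ready ? "Configure the reverse task" : "Forward task not ready");
    }
}
```

Bounding the number of attempts keeps the caller from waiting indefinitely if the forward task fails and never reaches the Synchronizing state.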

References