You can call the API operations of Data Transmission Service (DTS) in OpenAPI Explorer to achieve one-way or two-way data synchronization between ApsaraDB for Redis Enhanced Edition (Tair) instances. This topic describes the precautions and provides sample code.

The sample code provided in this topic is written in Java. You must replace <accessKeyId> and <accessSecret> in the sample code with your AccessKey ID and AccessKey secret. OpenAPI Explorer automatically calculates the signature value. For your convenience, we recommend that you call API operations in OpenAPI Explorer. After you call an operation, OpenAPI Explorer dynamically generates the sample code of the operation for different SDKs.

Prerequisites

The source and destination instances are ApsaraDB for Redis Enhanced Edition (Tair) instances (version 5.0).
Note
  • For one-way synchronization, the source instance cannot be a storage-optimized instance. For two-way synchronization, the source and destination instances cannot be storage-optimized instances.
  • The source and destination ApsaraDB for Redis instances must use local disks, not cloud disks.
  • ApsaraDB for Redis Enhanced Edition (Tair) supports the cluster, standard, and read/write splitting architectures.

Precautions

  • During two-way data synchronization, the data synchronization task in the forward direction performs full data synchronization and incremental data synchronization. The data synchronization task in the reverse direction performs only incremental data synchronization.
    Warning To ensure data consistency, do not modify or write data to the same key in the source and destination databases when the two-way data synchronization tasks are running.
  • DTS uses the resources of the source and destination databases during full data synchronization. This may increase the loads of the database servers. If you synchronize a large volume of data or the server specifications cannot meet your requirements, the database services may become unavailable. Before you synchronize data, evaluate the impact of data synchronization on the performance of the source and destination instances. We recommend that you synchronize data during off-peak hours.
  • We recommend that you do not run the FLUSHDB or FLUSHALL command in the source instance during data synchronization. Otherwise, data may become inconsistent between the source and destination instances.
  • If the data eviction policy (maxmemory-policy) of the destination database is not set to noeviction, data may become inconsistent between the source and destination databases. For more information about data eviction policies, see How does ApsaraDB for Redis evict data by default?
  • If an expiration policy is enabled for some keys in the source database, these keys may not be deleted in a timely manner after they expire. Therefore, the number of keys in the destination database may be less than that in the source database. You can run the INFO command to view the number of keys in the destination database.
    Note The number of keys that do not have an expiration policy or have not expired is the same in the source and destination databases.
  • If direct connection is disabled for the destination ApsaraDB for Redis instance, DTS uses the proxy forwarding mode to write data to the destination instance.
    Note For more information, see Enable the direct connection mode.
  • During data synchronization, if the number of shards in the source ApsaraDB for Redis instance is increased or decreased, or if the database specifications are changed (for example, the memory capacity is scaled up), you must reconfigure the task. To ensure data consistency, we recommend that you clear the data that has been synchronized to the destination Redis database before you reconfigure the task.
  • During data synchronization, if the endpoint of the source ApsaraDB for Redis instance is changed (for example, the zone of the instance is changed or the network type is changed from classic network to VPC), you must submit a ticket to update the change. Otherwise, the append-only files (AOF) of the source ApsaraDB for Redis instance may be reset. In this case, you must reconfigure the task.
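The note about key counts above can be checked with a small helper. The following is a minimal sketch, not part of DTS or its SDK: it assumes you have already captured the Keyspace section of the INFO command output from each instance as a string (lines such as db0:keys=120,expires=20,avg_ttl=3500), and the class and method names are hypothetical. It computes, for each database, the number of keys that have no expiration (keys minus expires), which is the subset that is expected to match between the source and destination.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: not part of DTS SDK for Java.
final class KeyspaceStats {

    /**
     * Parses the Keyspace section of INFO output and returns, per database,
     * the number of keys without an expiration (keys - expires).
     */
    static Map<String, Long> persistentKeysPerDb(String infoKeyspace) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String rawLine : infoKeyspace.split("\\r?\\n")) {
            String line = rawLine.trim();
            // Skip the "# Keyspace" header, blank lines, and anything that is not a dbN entry.
            if (!line.startsWith("db") || !line.contains(":")) {
                continue;
            }
            String[] parts = line.split(":", 2);
            long keys = 0;
            long expires = 0;
            for (String field : parts[1].split(",")) {
                String[] kv = field.split("=", 2);
                if (kv.length != 2) {
                    continue;
                }
                if (kv[0].equals("keys")) {
                    keys = Long.parseLong(kv[1]);
                } else if (kv[0].equals("expires")) {
                    expires = Long.parseLong(kv[1]);
                }
            }
            result.put(parts[0], keys - expires);
        }
        return result;
    }
}
```

If the maps returned for the source and destination are equal, the keys without an expiration are consistent in number. A difference in the total key count alone does not necessarily indicate a problem, because keys with an expiration can be deleted at different times.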

Limits

One-way cascade synchronization is not supported. For more information, see Synchronization topologies.

Operations that can be synchronized

  • APPEND
  • BITOP, BLPOP, BRPOP, and BRPOPLPUSH
  • DECR, DECRBY, and DEL
  • EVAL, EVALSHA, EXEC, EXPIRE, and EXPIREAT
  • GEOADD and GETSET
  • HDEL, HINCRBY, HINCRBYFLOAT, HMSET, HSET, and HSETNX
  • INCR, INCRBY, and INCRBYFLOAT
  • LINSERT, LPOP, LPUSH, LPUSHX, LREM, LSET, and LTRIM
  • MOVE, MSET, MSETNX, and MULTI
  • PERSIST, PEXPIRE, PEXPIREAT, PFADD, PFMERGE, PSETEX, and PUBLISH
  • RENAME, RENAMENX, RPOP, RPOPLPUSH, RPUSH, and RPUSHX
  • SADD, SDIFFSTORE, SELECT, SET, SETBIT, SETEX, SETNX, SETRANGE, SINTERSTORE, SMOVE, SPOP, SREM, and SUNIONSTORE
  • ZADD, ZINCRBY, ZINTERSTORE, ZREM, ZREMRANGEBYLEX, ZUNIONSTORE, ZREMRANGEBYRANK, and ZREMRANGEBYSCORE
  • SWAPDB (This operation is not supported if the source or destination Redis instance is deployed in the cluster architecture.)
Notice
  • If you run the EVAL or EVALSHA command to call Lua scripts, DTS cannot identify whether these Lua scripts are executed on the destination database. During incremental data synchronization, the destination database does not explicitly return the execution results of Lua scripts.
  • When DTS calls the SYNC or PSYNC command to transfer data of the LIST type, DTS does not clear the existing data. In this case, the destination database may contain duplicate data records.
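If you want to audit application commands against the list above before you start a task, a lookup such as the following may help. This is only an illustrative sketch: the class and method names are hypothetical, the command set is copied from the list in this topic, and the SWAPDB rule reflects the restriction on the cluster architecture stated above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper: the command set mirrors the list in this topic.
final class SyncableCommands {

    private static final Set<String> SUPPORTED = new HashSet<>(Arrays.asList(
            "APPEND",
            "BITOP", "BLPOP", "BRPOP", "BRPOPLPUSH",
            "DECR", "DECRBY", "DEL",
            "EVAL", "EVALSHA", "EXEC", "EXPIRE", "EXPIREAT",
            "GEOADD", "GETSET",
            "HDEL", "HINCRBY", "HINCRBYFLOAT", "HMSET", "HSET", "HSETNX",
            "INCR", "INCRBY", "INCRBYFLOAT",
            "LINSERT", "LPOP", "LPUSH", "LPUSHX", "LREM", "LSET", "LTRIM",
            "MOVE", "MSET", "MSETNX", "MULTI",
            "PERSIST", "PEXPIRE", "PEXPIREAT", "PFADD", "PFMERGE", "PSETEX", "PUBLISH",
            "RENAME", "RENAMENX", "RPOP", "RPOPLPUSH", "RPUSH", "RPUSHX",
            "SADD", "SDIFFSTORE", "SELECT", "SET", "SETBIT", "SETEX", "SETNX", "SETRANGE",
            "SINTERSTORE", "SMOVE", "SPOP", "SREM", "SUNIONSTORE",
            "ZADD", "ZINCRBY", "ZINTERSTORE", "ZREM", "ZREMRANGEBYLEX", "ZUNIONSTORE",
            "ZREMRANGEBYRANK", "ZREMRANGEBYSCORE"));

    /**
     * Returns true if the command appears in the list of operations that can be synchronized.
     * SWAPDB is accepted only when neither instance uses the cluster architecture.
     */
    static boolean canBeSynchronized(String command, boolean clusterArchitecture) {
        String name = command.trim().toUpperCase(Locale.ROOT);
        if (name.equals("SWAPDB")) {
            return !clusterArchitecture;
        }
        return SUPPORTED.contains(name);
    }
}
```

For example, canBeSynchronized("swapdb", true) returns false, whereas canBeSynchronized("set", true) returns true.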

Before you begin

  1. Create an AccessKey pair. For more information, see Create an AccessKey pair.
    Note To protect the AccessKey pair of your Alibaba Cloud account, we recommend that you create a RAM user, grant the RAM user the permissions to access DTS, and then use the AccessKey pair of the RAM user to call DTS SDK for Java. For more information, see Implement access control by using RAM.
  2. Download the SDK package. For more information, see Use DTS SDK for Java.
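To avoid hard-coding the AccessKey pair in source files, you can load it from the environment at startup. The following is a minimal sketch under stated assumptions: the environment variable names DTS_ACCESS_KEY_ID and DTS_ACCESS_KEY_SECRET and the AccessKeyConfig class are hypothetical and are not defined by DTS SDK for Java.

```java
import java.util.Map;

// Hypothetical holder for an AccessKey pair that is read from environment variables.
final class AccessKeyConfig {

    // Assumed variable names; choose names that fit your deployment.
    static final String ID_VARIABLE = "DTS_ACCESS_KEY_ID";
    static final String SECRET_VARIABLE = "DTS_ACCESS_KEY_SECRET";

    final String accessKeyId;
    final String accessKeySecret;

    private AccessKeyConfig(String accessKeyId, String accessKeySecret) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
    }

    /** Reads both values from the given environment map and fails fast if either is missing. */
    static AccessKeyConfig fromEnvironment(Map<String, String> env) {
        return new AccessKeyConfig(require(env, ID_VARIABLE), require(env, SECRET_VARIABLE));
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value.trim();
    }
}
```

In the samples in this topic, you could then call AccessKeyConfig.fromEnvironment(System.getenv()) and pass accessKeyId and accessKeySecret to DefaultProfile.getProfile in place of the <accessKeyId> and <accessSecret> placeholders.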

Create a data synchronization task

For more information about the request parameters and response parameters in this example, see CreateSynchronizationJob.

Note If you set the Topology parameter to oneway, a one-way data synchronization task is created. If you set the parameter to bidirectional, two-way data synchronization tasks are created.

Sample request:

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import java.util.*;
import com.aliyuncs.dts.model.v20180801.*;

public class CreateSynchronizationJob {

    public static void main(String[] args) {
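        // Replace <accessKeyId> and <accessSecret> with your AccessKey ID and AccessKey secret.
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        CreateSynchronizationJobRequest request = new CreateSynchronizationJobRequest();
        request.setRegionId("cn-hangzhou");
        // Regions of the source and destination instances.
        request.setSourceRegion("cn-beijing");
        request.setDestRegion("cn-beijing");
        // Specification of the data synchronization instance.
        request.setSynchronizationJobClass("small");
        // Billing method. Postpaid indicates pay-as-you-go.
        request.setPayType("Postpaid");
        // oneway: one-way synchronization. bidirectional: two-way synchronization.
        request.setTopology("oneway");
        request.setSourceEndpointInstanceType("Redis");
        request.setDestinationEndpointInstanceType("Redis");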

        try {
            CreateSynchronizationJobResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }

    }
}

Sample response:

{
    "SynchronizationJobId": "dts********",
    "RequestId": "158347*****-rqyjQ",
    "Success": true
}
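The SynchronizationJobId value in the response is the ID that you pass to ConfigureSynchronizationJob in the next step. The following sketch shows one way to pull it out of the printed JSON. It is an illustration only: the CreateJobResponseReader class is hypothetical, and it uses regular expressions so that it runs without extra libraries. In the sample code above, where Gson and the typed response object are already available, reading the value from the response object is the more robust choice.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that reads fields from the JSON text of a CreateSynchronizationJob response.
final class CreateJobResponseReader {

    private static final Pattern JOB_ID =
            Pattern.compile("\"SynchronizationJobId\"\\s*:\\s*\"([^\"]+)\"");
    private static final Pattern SUCCESS =
            Pattern.compile("\"Success\"\\s*:\\s*true");

    /** Returns the job ID if the response reports success and contains an ID; otherwise empty. */
    static Optional<String> jobId(String responseJson) {
        if (!SUCCESS.matcher(responseJson).find()) {
            return Optional.empty();
        }
        Matcher matcher = JOB_ID.matcher(responseJson);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }
}
```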

Configure a one-way data synchronization task

In this example, the database account of the source instance must have the read permissions on the source database. The database account of the destination instance must have the read and write permissions on the destination database.
Note When you configure a one-way data synchronization task, you can also use a self-managed Redis database or an ApsaraDB for Redis Community Edition instance as the source or destination database. The configuration method is similar to that described in this topic. However, you must specify the request parameters based on the actual scenario, for example, MigrationReserved.

For more information about the request parameters and response parameters, see ConfigureSynchronizationJob.

Sample request:

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import java.util.*;
import com.aliyuncs.dts.model.v20180801.*;

public class ConfigureSynchronizationJob {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        ConfigureSynchronizationJobRequest request = new ConfigureSynchronizationJobRequest();
        request.setRegionId("cn-hangzhou");
        request.setSynchronizationJobId("dts*******");
        request.setStructureInitialization(true);
        request.setDataInitialization(true);
        request.setSynchronizationObjects("[{\"DBName\": \"0\",\"NewDBName\": \"0\"}]");
        request.setSynchronizationJobName("apitest");
        request.setSourceEndpointInstanceId("r-2ze********");
        request.setSourceEndpointInstanceType("redis");
        request.setSourceEndpointPassword("Test******");
        request.setDestinationEndpointInstanceId("r-2ze********");
        request.setDestinationEndpointInstanceType("redis");
        request.setDestinationEndpointPassword("Test******");
        request.setMigrationReserved("{\"srcRedisType\":\"enterprise\",\"destRedisType\":\"enterprise\"}");

        try {
            ConfigureSynchronizationJobResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }

    }
}

Sample response:

{
    "RequestId": "1583********-BcO5F",
    "Success": true
}
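The SynchronizationObjects and MigrationReserved parameters in the sample request are JSON strings with hand-written escape characters, which are easy to mistype. The following sketch builds the same two strings from plain values. The SyncJobJson class and its method names are hypothetical, and only the field names shown in the sample request (DBName, NewDBName, srcRedisType, and destRedisType) are used.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that assembles the JSON strings used in the sample request.
final class SyncJobJson {

    /** Wraps a value in double quotes and escapes backslashes and double quotes. */
    private static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds the SynchronizationObjects value from a source-DB to destination-DB mapping. */
    static String synchronizationObjects(Map<String, String> dbMapping) {
        StringJoiner array = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, String> entry : dbMapping.entrySet()) {
            array.add("{\"DBName\":" + quote(entry.getKey())
                    + ",\"NewDBName\":" + quote(entry.getValue()) + "}");
        }
        return array.toString();
    }

    /** Builds the MigrationReserved value from the source and destination Redis types. */
    static String migrationReserved(String srcRedisType, String destRedisType) {
        return "{\"srcRedisType\":" + quote(srcRedisType)
                + ",\"destRedisType\":" + quote(destRedisType) + "}";
    }
}
```

For example, a mapping from DB 0 to DB 0 produces [{"DBName":"0","NewDBName":"0"}], which is equivalent to the string passed to setSynchronizationObjects in the sample request.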

Configure two-way data synchronization tasks

In this example, the database accounts used for data synchronization must have the read and write permissions.

For more information about the request parameters and response parameters, see ConfigureSynchronizationJob.

Sample request:

  1. Configure the data synchronization task in the forward direction.
    import com.aliyuncs.DefaultAcsClient;
    import com.aliyuncs.IAcsClient;
    import com.aliyuncs.exceptions.ClientException;
    import com.aliyuncs.exceptions.ServerException;
    import com.aliyuncs.profile.DefaultProfile;
    import com.google.gson.Gson;
    import java.util.*;
    import com.aliyuncs.dts.model.v20180801.*;
    
    public class ConfigureSynchronizationJob {
    
        public static void main(String[] args) {
            DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
            IAcsClient client = new DefaultAcsClient(profile);
    
            ConfigureSynchronizationJobRequest request = new ConfigureSynchronizationJobRequest();
            request.setRegionId("cn-hangzhou");
            request.setSynchronizationJobId("dts*******");
            request.setStructureInitialization(true);
            request.setDataInitialization(true);
            request.setSynchronizationObjects("[{\"DBName\": \"0\",\"NewDBName\": \"0\"}]");
            request.setSynchronizationJobName("apitest");
            // Forward: configure the task from the source instance to the destination instance.
            request.setSynchronizationDirection("Forward");
            request.setSourceEndpointInstanceId("r-2ze********");
            request.setSourceEndpointInstanceType("redis");
            request.setSourceEndpointPassword("Test******");
            request.setDestinationEndpointInstanceId("r-2ze********");
            request.setDestinationEndpointInstanceType("redis");
            request.setDestinationEndpointPassword("Test******");
            request.setMigrationReserved("{\"srcRedisType\":\"enterprise\",\"destRedisType\":\"enterprise\"}");
    
            try {
                ConfigureSynchronizationJobResponse response = client.getAcsResponse(request);
                System.out.println(new Gson().toJson(response));
            } catch (ServerException e) {
                e.printStackTrace();
            } catch (ClientException e) {
                System.out.println("ErrCode:" + e.getErrCode());
                System.out.println("ErrMsg:" + e.getErrMsg());
                System.out.println("RequestId:" + e.getRequestId());
            }
    
        }
    }
  2. Wait until the data synchronization task in the forward direction enters the Synchronizing state.
    Note You can view the status of a data synchronization task in the DTS console. You can also call the DescribeSynchronizationJobStatus operation to view the status.
  3. Configure the data synchronization task in the reverse direction.
    import com.aliyuncs.DefaultAcsClient;
    import com.aliyuncs.IAcsClient;
    import com.aliyuncs.exceptions.ClientException;
    import com.aliyuncs.exceptions.ServerException;
    import com.aliyuncs.profile.DefaultProfile;
    import com.google.gson.Gson;
    import java.util.*;
    import com.aliyuncs.dts.model.v20180801.*;
    
    public class ConfigureSynchronizationJob {
    
        public static void main(String[] args) {
            DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
            IAcsClient client = new DefaultAcsClient(profile);
    
            ConfigureSynchronizationJobRequest request = new ConfigureSynchronizationJobRequest();
            request.setRegionId("cn-hangzhou");
            request.setSynchronizationJobId("dts*******");
            request.setStructureInitialization(true);
            request.setDataInitialization(true);
            request.setSynchronizationObjects("[{\"DBName\": \"0\",\"NewDBName\": \"0\"}]");
            request.setSynchronizationJobName("apitest");
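            // Reverse: configure the task in the reverse direction. Run this only after the forward task is in the Synchronizing state.
            request.setSynchronizationDirection("Reverse");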
            request.setSourceEndpointInstanceId("r-2ze********");
            request.setSourceEndpointInstanceType("redis");
            request.setSourceEndpointPassword("Test******");
            request.setDestinationEndpointInstanceId("r-2ze********");
            request.setDestinationEndpointInstanceType("redis");
            request.setDestinationEndpointPassword("Test******");
            request.setMigrationReserved("{\"srcRedisType\":\"enterprise\",\"destRedisType\":\"enterprise\"}");
    
            try {
                ConfigureSynchronizationJobResponse response = client.getAcsResponse(request);
                System.out.println(new Gson().toJson(response));
            } catch (ServerException e) {
                e.printStackTrace();
            } catch (ClientException e) {
                System.out.println("ErrCode:" + e.getErrCode());
                System.out.println("ErrMsg:" + e.getErrMsg());
                System.out.println("RequestId:" + e.getRequestId());
            }
    
        }
    }

Sample response:

{
    "RequestId": "1583********-BcO5F",
    "Success": true
}
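Step 2 of the procedure above requires waiting for the forward task to enter the Synchronizing state before the reverse task is configured. The following is a minimal polling sketch, not the SDK's own mechanism. The SyncStatusPoller class is hypothetical, and the Supplier stands in for whatever code you use to fetch the current status, for example a call to the DescribeSynchronizationJobStatus operation. Only the Synchronizing status name is taken from this topic.

```java
import java.util.function.Supplier;

// Hypothetical helper that polls a status source until the task is synchronizing.
final class SyncStatusPoller {

    /**
     * Polls statusSource up to maxAttempts times, sleeping intervalMillis between attempts.
     * Returns true as soon as the status is "Synchronizing", or false if the attempts
     * are exhausted or the thread is interrupted.
     */
    static boolean waitForSynchronizing(Supplier<String> statusSource,
                                        int maxAttempts,
                                        long intervalMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if ("Synchronizing".equals(statusSource.get())) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

Configure the reverse task only if the method returns true. If it returns false, check the task in the DTS console before you retry, because the task may have failed rather than being slow to start.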

References