DTS data subscription uses the DTS SDK to subscribe to and consume change data. Before consuming data with the SDK, create a subscription instance on the DTS Console and subscribe it to an RDS instance.
When the subscription instance is ready, use the SDK for real-time subscription of change data in the subscription instance. Note the following:
- DTS provides the SDK for Java only.
- One subscription channel can only be consumed by one SDK. If multiple SDKs are connected to one subscription instance, only one SDK process can consume the change data.
- In the case that multiple SDKs need to subscribe to one RDS instance, it is advised to create a corresponding subscription instance for each downstream SDK.
The SDK defines several classes. The following sections introduce the interface definitions of these classes.
- Set security credentials. The parameters correspond to the AccessKey of the Alibaba Cloud account of the subscription instance.
- Set security credentials. The parameters are the AccessKeySecret of the Alibaba Cloud account. It can be created and obtained from the AccessKeys page.
- Set whether the server running the SDK subscribes to data over the Internet. If yes, set usePublicIp to true. Otherwise, set it to false. DTS only supports data subscription over the Internet.
void addConcurrentListener(ClusterListener arg0)
Add a downstream listener to a ClusterClient. The listener is allowed to subscribe to the change data of the subscription channel.
ClusterListener arg0 is an object of the ClusterListener class.
void askForGUID(String arg0)
Request change data from a subscription instance.
String arg0 is the subscription instance ID and is obtained from the DTS Console, as shown in the following figure:
List<ClusterListener> getConcurrentListeners()
- Obtain the listener list of the current ClusterClient. The return type is List<ClusterListener>.
- Start the SDK client and begin subscribing to the change data.
Shut down the SDK client, and stop subscribing to the change data.
In the SDK, data pulling and the notify callback run in the same thread. If the consumption code in notify contains a function that blocks or ignores interruption signals, the stop function may be unable to shut down the client cleanly.
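The effect of the shared thread can be illustrated with a small simulation. This is a sketch only and does not use the DTS SDK: the worker thread stands in for the SDK's pull thread, InterruptAwareHandler is a hypothetical name, and it assumes that stopping the client signals that thread through Java thread interruption, which the text implies but does not state outright.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler; it stands in for the body of a notify callback.
final class InterruptAwareHandler {
    final AtomicInteger processed = new AtomicInteger();

    /** Processes a batch but returns early once the thread has been interrupted. */
    void handle(List<String> batch) {
        for (String item : batch) {
            if (Thread.currentThread().isInterrupted()) {
                return; // leave the flag set so the calling loop can see it
            }
            try {
                Thread.sleep(5); // stands in for real work on one item
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
                return;
            }
            processed.incrementAndGet();
        }
    }

    /** Runs pull-and-handle on one thread, interrupts it, and reports whether it exited in time. */
    static boolean stopsPromptly() {
        InterruptAwareHandler handler = new InterruptAwareHandler();
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                handler.handle(List.of("a", "b", "c")); // simulated pulled batch
            }
        });
        worker.start();
        try {
            Thread.sleep(50);
            worker.interrupt(); // assumed equivalent of asking the client to stop
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped promptly: " + stopsPromptly());
    }
}
```

The key point is that the handler never catches InterruptedException without restoring the interrupt flag, so the loop that owns the thread can still observe the stop request.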
void notify(List<ClusterMessage> arg0)
Define how the change data is consumed. After receiving data, the SDK calls notify to inform the ClusterListener to consume the data. For example, the demo consumes the subscription data by printing it on screen.
The input parameter type of this function is List<ClusterMessage>, where ClusterMessage is the object that stores the subscription data. For detailed definitions, refer to the interface definition of ClusterMessage in the following section.
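The relationship between a listener and the notify callback can be sketched without the SDK on the classpath. In the sketch below, every type is a local stand-in: ClusterListener only mirrors the notify signature shown above, ClusterMessage carries a hypothetical payload field, and FakeClusterClient with its deliver helper is invented to imitate the SDK handing a pulled batch to registered listeners.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the SDK's ClusterMessage; the payload field is hypothetical.
final class ClusterMessage {
    final String payload;
    ClusterMessage(String payload) { this.payload = payload; }
}

// Stand-in that mirrors the documented notify signature.
interface ClusterListener {
    void notify(List<ClusterMessage> messages);
}

// Fake client: it only imitates listener registration and callback delivery.
final class FakeClusterClient {
    private final List<ClusterListener> listeners = new ArrayList<>();

    void addConcurrentListener(ClusterListener listener) { listeners.add(listener); }

    List<ClusterListener> getConcurrentListeners() { return listeners; }

    // Hypothetical helper: simulates the SDK passing a pulled batch to every listener.
    void deliver(List<ClusterMessage> batch) {
        for (ClusterListener listener : listeners) {
            listener.notify(batch);
        }
    }
}

final class ListenerDemo {
    /** Registers a collecting listener, delivers one batch, and returns what the listener saw. */
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        FakeClusterClient client = new FakeClusterClient();
        client.addConcurrentListener(messages -> {
            for (ClusterMessage m : messages) {
                seen.add(m.payload); // the documented demo prints instead of collecting
            }
        });
        client.deliver(List.of(new ClusterMessage("insert"), new ClusterMessage("commit")));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```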
Each ClusterMessage stores the data record of one transaction in RDS, and each record is stored as a Record object. This section introduces the main interface functions of ClusterMessage.
- Obtain a change record from ClusterMessage. The change record includes every record in the RDS binlog file, such as begin, commit, update, and insert.
- To simplify the downstream SDK disaster recovery process, the data subscription service endpoint supports consumption point storage. After an abnormal downtime and restart, the SDK automatically subscribes to and consumes data from the last consumption timestamp before the downtime.
Note: After a message has been consumed, call this interface to report an ACK, which informs the DTS service endpoint to update the SDK consumption timestamp. This ensures the integrity of the consumed data after an abnormal SDK restart.
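Why the ACK should be reported only after processing can be shown with a small simulation. This sketch does not call the DTS SDK: AckDemo, its ack method, and the long timestamps are hypothetical stand-ins for the ACK interface and the consumption point stored on the service side.

```java
import java.util.ArrayList;
import java.util.List;

final class AckDemo {
    // Stand-in for the consumption timestamp kept by the service endpoint.
    private long ackedTimestamp = 0;

    /** Hypothetical ACK call: moves the stored consumption point forward. */
    void ack(long timestamp) {
        ackedTimestamp = Math.max(ackedTimestamp, timestamp);
    }

    /** Consumes messages in order and acknowledges each one only after it is processed. */
    void consume(long[] stream, long crashAt) {
        for (long ts : stream) {
            if (ts == crashAt) {
                return; // simulated crash: this message is never acknowledged
            }
            // ... process the message here ...
            ack(ts);
        }
    }

    /** Messages the service would hand out again after a restart. */
    List<Long> pending(long[] stream) {
        List<Long> result = new ArrayList<>();
        for (long ts : stream) {
            if (ts > ackedTimestamp) {
                result.add(ts);
            }
        }
        return result;
    }

    /** Runs one consume-crash-restart cycle on fresh state. */
    static List<Long> replayAfterCrash(long[] stream, long crashAt) {
        AckDemo demo = new AckDemo();
        demo.consume(stream, crashAt);
        return demo.pending(stream);
    }

    public static void main(String[] args) {
        System.out.println(replayAfterCrash(new long[] {1, 2, 3, 4}, 3));
    }
}
```

Because the message that was being handled at the time of the crash was never acknowledged, it is delivered again after the restart rather than lost. Acknowledging before processing would reverse that guarantee.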
Record represents every record in the RDS binlog of the subscription instance, such as begin, commit, and update.
String getAttribute(String key)
- Obtain the main property values in Record. The input parameter is the property name, and the return value is the property value.
The following list shows all properties that can be obtained by calling this function.
|record_id||Record ID. The ID is not guaranteed to increase during the subscription process.|
|instance||The database instance connection address of this Record. The format is
|source_type||The database instance engine type of this Record. The current value is
|source_category||The type of this Record. The current value is
|timestamp||The time at which this Record was written to the binlog. This is also the time at which the SQL statement ran in RDS.|
|checkpoint||The corresponding binlog file point of this Record. The format is
|record_type||The corresponding operation type of this Record. The main values include: insert, update, delete, replace, ddl, begin, commit, and heartbeat.|
|db||The name of the database that contains the table updated by this Record.|
|table_name||The name of the table updated by this Record.|
|record_recording||The encoding of this Record.|
|primary||The primary key column name of the table updated by this Record.|
|fields_enc||The encoding of each field of this Record. The entries are separated by commas. The entry for a non-character field is empty.|
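Because fields_enc leaves the entries for non-character fields empty, a naive split can silently drop trailing entries. The following sketch shows one way to keep the positions aligned with the field list. FieldsEnc is a hypothetical helper, and the sample value "utf8,,utf8," is invented for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for a fields_enc property value.
final class FieldsEnc {
    /**
     * Splits a fields_enc value into one entry per field.
     * The limit of -1 keeps trailing empty strings, which String.split would otherwise discard.
     */
    static List<String> parse(String fieldsEnc) {
        return Arrays.asList(fieldsEnc.split(",", -1));
    }

    public static void main(String[] args) {
        // Invented sample: four fields, the second and fourth are non-character types.
        List<String> encodings = parse("utf8,,utf8,");
        for (int i = 0; i < encodings.size(); i++) {
            String enc = encodings.get(i);
            System.out.println("field " + i + ": " + (enc.isEmpty() ? "(non-character)" : enc));
        }
    }
}
```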
Obtain the change type of this record. The types include insert, delete, update, replace, ddl, begin, commit, and heartbeat.
The heartbeat is a record type defined by DTS to reflect the health status of the subscription instance. In theory, DTS generates one heartbeat record per second.
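Since heartbeat records are expected roughly once per second, a consumer can use them as a liveness signal. The sketch below is one possible approach, not part of the SDK: HeartbeatMonitor and its methods are hypothetical, the record type is passed as a plain string, and the tolerance is an arbitrary value chosen by the caller.

```java
// Hypothetical helper that tracks how long ago the last heartbeat record arrived.
final class HeartbeatMonitor {
    private final long toleranceMillis;
    private long lastHeartbeatMillis;

    HeartbeatMonitor(long startMillis, long toleranceMillis) {
        this.lastHeartbeatMillis = startMillis;
        this.toleranceMillis = toleranceMillis;
    }

    /** Call for every record; only heartbeat records refresh the timer. */
    void onRecord(String recordType, long nowMillis) {
        if ("heartbeat".equalsIgnoreCase(recordType)) {
            lastHeartbeatMillis = nowMillis;
        }
    }

    /** True while the gap since the last heartbeat stays within the tolerance. */
    boolean isHealthy(long nowMillis) {
        return nowMillis - lastHeartbeatMillis <= toleranceMillis;
    }

    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(0, 5000);
        monitor.onRecord("heartbeat", 1000);
        monitor.onRecord("update", 6500); // data records do not reset the timer
        System.out.println("healthy at 5500 ms: " + monitor.isHealthy(5500));
        System.out.println("healthy at 7000 ms: " + monitor.isHealthy(7000));
    }
}
```

A tolerance of several seconds avoids false alarms from short delays, since the one-per-second rate is described as theoretical.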
Obtain the checkpoint of this change record in the binlog. The returned checkpoint format is
binlog_offset refers to the offset of the change record in the binlog file. binlog_fid refers to the numeric suffix of the binlog file. For example, if the binlog file name is mysql-bin.0008, the binlog_fid is 8.
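The relationship between a binlog file name and its binlog_fid can be expressed in a few lines. BinlogFile is a hypothetical helper written for illustration. It only derives the numeric suffix from a file name and does not parse the checkpoint string itself.

```java
// Hypothetical helper: derives binlog_fid from a binlog file name.
final class BinlogFile {
    /** Returns the numeric suffix after the last dot, e.g. 8 for "mysql-bin.0008". */
    static int fid(String fileName) {
        int dot = fileName.lastIndexOf('.');
        if (dot < 0 || dot == fileName.length() - 1) {
            throw new IllegalArgumentException("no numeric suffix in: " + fileName);
        }
        // Integer.parseInt ignores the leading zeros of the suffix.
        return Integer.parseInt(fileName.substring(dot + 1));
    }

    public static void main(String[] args) {
        System.out.println(fid("mysql-bin.0008"));
    }
}
```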
- Obtain the timestamp of this change record in the binlog.
- Obtain the name of the database that contains the table modified in the change record.
- Obtain the corresponding table name of the change record.
- Obtain the primary key column name of the change record. For a composite primary key, the column names are separated by commas.
- Obtain the database type of the subscription instance. DTS only supports RDS MySQL, so the value is MySQL.
- Obtain the IP:PORT of the RDS MySQL instance process that generated the change record.
- Obtain the number of Fields in the change record.
- The return type of this function is List<Field>. The list contains the field definitions of the change record and the field values before and after the change. For the definition of the Field object, refer to the Field interface definition in the following section.
- Checks whether this Record is the first transaction log in the database batch change. If yes, returns True. Otherwise, returns False.
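Two of the values described in the list above are plain strings with a simple structure: the comma-separated primary key column names and the IP:PORT address. The following sketch shows one way to take them apart. RecordValues is a hypothetical helper, and the sample inputs are invented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for string values returned by a change record.
final class RecordValues {
    /** Splits a comma-separated primary key string into individual column names. */
    static List<String> primaryKeyColumns(String primaryKeys) {
        List<String> columns = new ArrayList<>();
        if (primaryKeys == null || primaryKeys.isEmpty()) {
            return columns;
        }
        for (String name : primaryKeys.split(",")) {
            columns.add(name.trim());
        }
        return columns;
    }

    /** Returns the host part of an IP:PORT string. */
    static String host(String address) {
        return address.substring(0, separator(address));
    }

    /** Returns the port part of an IP:PORT string. */
    static int port(String address) {
        return Integer.parseInt(address.substring(separator(address) + 1));
    }

    private static int separator(String address) {
        int colon = address.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("expected IP:PORT but got: " + address);
        }
        return colon;
    }

    public static void main(String[] args) {
        System.out.println(primaryKeyColumns("id,order_id")); // invented column names
        System.out.println(host("10.0.0.1:3306") + " / " + port("10.0.0.1:3306"));
    }
}
```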
The Field class defines the properties of each field, including its encoding, type, name, value, and whether it is a primary key. This section introduces the interface definition of the Field class.
- Obtain the encoding format of this field value.
- Obtain the name of this field.
- Obtain the data type of this field. Refer to the following field type definition for definition of the type.
- Obtain the field value. The return type is ByteString. If the value is empty, NULL is returned.
- Check whether the field is a primary key column of the table. If yes, returns True. Otherwise, returns False.
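A field value arrives as raw bytes together with an encoding name, so turning it into text means combining the two. The sketch below illustrates the idea with a plain byte array in place of the SDK's ByteString. FieldDecoder is a hypothetical helper, and falling back to UTF-8 when no encoding is given is an assumption made for this example, not documented SDK behavior.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes a field's raw bytes using its encoding name.
final class FieldDecoder {
    /**
     * Returns the decoded text, or null when the value is null (models a NULL field value).
     * Charset.forName throws if the JVM does not know the name, so encoding names
     * that have no Java equivalent would need their own mapping.
     */
    static String decode(byte[] value, String encoding) {
        if (value == null) {
            return null;
        }
        Charset charset = (encoding == null || encoding.isEmpty())
                ? StandardCharsets.UTF_8 // assumed fallback for this sketch
                : Charset.forName(encoding);
        return new String(value, charset);
    }

    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(raw, "utf8"));
        System.out.println(decode(null, "utf8"));
    }
}
```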