本文介绍DTS提供的SDK示例代码的运行流程,并提供示例供您参考。
SDK示例代码下载
下载DtsSubscribeDemo并解压后,您需要使用文本编辑工具打开pom.xml文件,将SDK的版本修改为最新版本。
说明 您可以在Maven网站中获取最新的数据订阅SDK版本,详情请参见数据订阅SDK的Maven页面。
初始化RegionContext
RegionContext
主要用于保存安全认证凭证及网络访问模式的设置,下述代码为您演示如何始化RegionContext
。
import java.util.List;
import com.aliyun.drc.clusterclient.RegionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
public static void main(String[] args) throws Exception {
// 创建RegionContext
RegionContext context = new RegionContext();
// 配置阿里云账号的AccessKey ID和AccessKey Secret
context.setAccessKey("<AccessKey>");
context.setSecret("<AccessKeySecret>");
// 是否通过公网连接数据订阅通道
context.setUsePublicIp(true);
// 设置传输数据格式为二进制格式
context.setUseBinary(true);
// 开启网络优化模式
context.setDrcNet(true);
…………
}
}
初始化ClusterClient
ClusterClient
主要用于连接数据订阅通道和接收增量变更数据,下述代码为您演示如何初始化ClusterClient
。
import java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
public class MainClass
{
public static void main(String[] args) throws Exception {
// 创建RegionContext
RegionContext context = new RegionContext();
context.setAccessKey("<AccessKey>");
context.setSecret("<AccessKeySecret>");
context.setUsePublicIp(true);
// 创建ClusterClient
final ClusterClient client = new DefaultClusterClient(context);
……………
}
}
初始化Listener
Listener
通过定义notify
函数来接受订阅数据并完成数据消费。下述代码为您演示简易的消费逻辑(将订阅到的增量数据输出至屏幕)。
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
public class MainClass
{
public static void main(String[] args) throws Exception {
// 初始化RegionContext
………
// 初始化ClusterClient
………
ClusterListener listener = new ClusterListener(){
@Override
public void notify(List<ClusterMessage> messages) throws Exception {
for (ClusterMessage message : messages) {
// 将订阅到的增量数据输出至屏幕
System.out.println(message.getRecord() + ":" + message.getRecord().getTablename() + ":"
+ message.getRecord().getOpt());
// 完成消费后向DTS服务器发送ACK确认信息(必须调用)
message.ackAsConsumed();
}
}
}
}
说明 由于
ackAsConsumed()
接口会将SDK消费的最新一条数据的位点及时间戳发送给DTS服务器,如果SDK运行出现故障,再次启动SDK时,会自动从DTS服务器上获取该消费时间点并继续消费,避免消费到重复的数据。
启动ClusterClient
import java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
public static void main(String[] args) throws Exception {
// 初始化RegionContext
…………
// 初始化ClusterClient
…………
// 初始化ClusterListener
…………
// 添加监听者
client.addConcurrentListener(listener);
// 设置订阅通道ID
client.askForGUID("dts_rdsr******_DSF");
// 启动后台线程(主线程不能退出)
client.start();
}
在启动ClusterClient
之前,需要将Listener
添加到ClusterClient
中,当ClusterClient
从订阅通道中获取到增量数据时,会同步回调Listener
的notify
函数来执行数据消费。