使用DTS提供的SDK订阅到的数据为自定义的格式,本文介绍各类SQL语句解析的示例代码。
DDL解析
如果源库执行了DDL操作,记录(Record)的操作类型即为DDL,且DDL语句存储在第一列的值中。获取DDL语句的代码示例如下:
String ddl_string;
Record.Type type=record.getOpt();
if(type.equals(Record.Type.DDL)){
List<DataMessage.Record.Field> fields = record.getFieldList();
ddl_string = fields.get(0).getValue().toString();
}
INSERT解析
如果源库执行了INSERT操作,记录的操作类型即为INSERT,获取INSERT完整语句的代码示例如下:
StringBuilder insert_string=new StringBuilder();
Record.Type type=record.getOpt();
DataMessage.Record.Field field;
StringBuilder FieldName=new StringBuilder();
StringBuilder FieldValue = new StringBuilder();
if(type.equals(Record.Type.INSERT)){
int i=0;
List<DataMessage.Record.Field> fields = record.getFieldList();
for (; i < fields.size(); i++) {
field = fields.get(i); FieldName.append('`'+field.getFieldname().toLowerCase()+'`');
FieldValue.append("'"+field.getValue()+"'");
if (i != fields.size() - 1) {
FieldName.append(',');
FieldValue.append(',');
}
}
insert_string.append("insert "+ record.getTablename()+"("+FieldName.toString()+") values("+FieldValue.toString()+");");
}
UPDATE解析
如果源库执行了UPDATE操作,记录的操作类型即为UPDATE。数据更新前的字段存储在Record.getFieldList()
中索引为偶数的Field,更新后的字段值存储在索引为奇数的Field。
当执行UPDATE的表具备主键时,获取UPDATE完整语句的代码示例如下:
StringBuilder update_string=new StringBuilder();
Record.Type type=record.getOpt();
DataMessage.Record.Field field;
StringBuilder SetValue = new StringBuilder();
StringBuilder WhereCondition = new StringBuilder();
String ConditionStr;
boolean hasPk=false;
boolean pkMode=false;
boolean hasSet=false;
if(type.equals(Record.Type.UPDATE)){
int i=0;
DataMessage.Record.Field OldField = null;
DataMessage.Record.Field NewField = null;
List<DataMessage.Record.Field> fields = record.getFieldList();
for (; i <fields.size() ; i++) {
if (i % 2 == 0) {
OldField = fields.get(i);
continue;
}
NewField = fields.get(i);
field = NewField;
if (field.isPrimary()) {
if (hasPk) {
WhereCondition.append(" and ");
}
//where old value
ConditionStr = getFieldValue(OldField);
if(ConditionStr==null){ WhereCondition.append("`"+field.getFieldname().toLowerCase()+"`" + " " + "is null");
}else{
WhereCondition.append("`"+field.getFieldname().toLowerCase()+"`"+" = "+ "'"+OldField.getValue()+"'");
}
hasPk = true;
}
if (hasSet) {
SetValue.append(",");
}
SetValue.append("`"+field.getFieldname().toLowerCase()+"`" + " = " + "'"+field.getValue()+"'");
String setStr = getFieldValue(field);
hasSet = true;
}
update_string.append("Update "+record.getTablename() +" Set " + SetValue + " Where "+WhereCondition +";");
}
protected String getFieldValue(Field field) throws Exception {
ByteString byteString = field.getValue();
if (byteString == null) {
return null;
}
else {
String value;
if (field.getType() == com.aliyun.drc.client.message.DataMessage.Record.Field.Type.STRING && field.getEncoding() != null && field.getEncoding() != "ASCII") {
value = field.getValue().toString(field.getEncoding());
}
else {
value = byteString.toString();
}
return value;
}
}
DELETE解析
如果源库执行了DELETE语句,该记录的操作类型为DELETE。当执行DELETE的表具备主键时,获取DELETE完整语句的代码示例如下:
StringBuilder delete_string=new StringBuilder();
Record.Type type=record.getOpt();
DataMessage.Record.Field field;
StringBuilder FieldName=new StringBuilder();
StringBuilder FieldValue = new StringBuilder();
StringBuilder DeleteCondition = new StringBuilder();
boolean hasPk=false;
boolean pkMode=false;
if(type.equals(Record.Type.DELETE)){
int i=0;
List<DataMessage.Record.Field> fields = record.getFieldList();
delete_string.append("Delete From" + record.getTablename() + "where");
// 表是否有主键?
if (record.getPrimaryKeys() != null) {
pkMode = record.getPrimaryKeys().length() > 0 ? true : false;
}
for (; i < fields.size(); i++) {
if ((pkMode && !field.isPrimary())) {
continue;
}
if (hasPk) {
delete_string.append(" and ");
}
delete_string.append(field.getFieldname() + "=" + field.getValue());
hasPk = true;
}
delete_string.append(";");
}
REPLACE解析
如果源库执行了REPLACE操作,该记录的操作类型即为UPDATE或INSERT。
- 当REPLACE设置的值不存在时,该记录的操作类型为INSERT。
- 当REPLACE设置的值存在时,该记录的操作类型为UPDATE。
BEGIN解析
如果源库执行了BEGIN操作,该记录的操作类型即为BEGIN。由于BEGIN语句没有实际的内容,只需要判断操作类型,无需对Field进行处理,代码示例如下:
StringBuilder sql_string = new StringBuilder();
Record.Type type = record.getOpt();
if(type.equals(Record.Type.BEGIN)){
sql_string.append("Begin");
}
COMMIT解析
如果源库执行了COMMIT操作,该记录的操作类型即为COMMIT。由于COMMIT语句没有实际的内容,只需要判断操作类型,无需对Field进行处理,代码示例如下:
StringBuilder sql_string = new StringBuilder();
Record.Type type = record.getOpt();
if(type.equals(Record.Type.COMMIT)){
sql_string.append("commit");
}