数据管理DMS支持将操作日志导出至阿里云日志服务(SLS),便于对操作日志进行加工和分析。
前提条件
已开通日志服务。更多信息,请参见开通日志服务。
已创建日志服务Project和Logstore。具体操作,请参见管理Project和创建基础Logstore。
已在DMS的实例管理中将创建的Project录入至DMS。录入实例的操作,请参见云数据库录入。
导出的目标Logstore必须为空,且全文索引和字段索引必须至少启用一种。全文索引和字段索引的详细信息,请参见创建索引。
背景信息
操作日志指用户通过DMS系统发起所有操作的流水账式日志,详情请参见功能说明。
费用说明
操作步骤
- 登录数据管理DMS 5.0。
单击控制台左上角的
图标,选择。说明若您使用的是非极简模式的控制台,在顶部菜单栏中,选择。
选择导出日志页签,单击右上角新建任务。
在新建导出任务对话框中,配置如下信息。
配置项
是否必填
说明
任务名称
是
导出任务名称,便于后续查找。
目标SLS
是
日志服务的资源管理单元Project。
SLS Logstore
是
将日志导出至创建的Logstore。该Logstore需要创建索引并不含任何日志条目。
说明您可单击同步字典,再单击确认,DMS可自动采集Logstore中的元数据信息。
功能模块
是
选择需要导出DMS哪些功能模块的日志(与操作日志中的模块对应),包含不限、权限、数据Owner、数据查询、查询结果导出、跨库查询结果导出等功能,默认选择不限。
调度方式
是
选择本次任务的调度方式。
单次:指成功创建导出任务后,仅导出一次。
周期:可选择按日、周或月循环多次导出日志至Logstore。周期调度任务第一次会导出从日志开始时间到第一次调度开始时间范围内,您在DMS生成的所有操作日志,后续仅导出增量的日志。具体配置,请参见周期调度。
日志时间范围
否
说明调度方式选择单次时会出现此配置项。
导出某时间范围内的日志。不填写该配置项则默认导出三年内的日志。
日志开始时间
否
说明调度方式选择周期时会出现此配置项。
周期任务没有截止时间。
DMS日志记录的开始时间,不填写则默认为创建导出任务时间对应三年前的时间。
单击确认,会创建一个导出日志任务,同时,系统还会在您的Logstore中创建一些用于后续查询分析数据的索引字段,例如dbId、dbName、dbUser等。
对于单次任务,仅导出一次日志,当任务的状态为运行成功时,表示日志导出成功。
说明因Logstore索引延迟生效,所以单次调度任务会在创建成功后的90秒左右开始执行。
对于周期任务,会多次导出日志,且导出前和导出后的任务状态均为待调度。您可通过查看任务日志,判断某次任务是否执行成功。
您还可以在目标任务行操作列下进行如下操作。
查询:单击查询,系统自动跳转至SQL Console页面,单击查询,在页面下方的执行结果区域可查看导出至Logstore的日志。
任务日志:单击任务日志,查看任务开始、结束时间、投递日志数量、任务状态等信息。
暂停:单击暂停,在弹出的提示对话框中,单击确认,周期任务会被暂停执行。
重启:单击重启,在弹出的提示对话框中,单击确认,可重新启动已被暂停执行的周期任务。
周期调度
配置项 | 说明 |
调度周期 | 选择调度任务的周期:
|
指定时间 |
|
具体时间 | 设置执行任务流的具体时间。 例如配置02:55,系统将在指定天的02时55分执行任务。 |
cron表达式 | 不需要手动配置,系统会根据您配置的周期、具体时间自动展现。 |
代码实现
也可以通过Java代码的方式实现日志的导出。
对应的账号需具备DMS的
GetOpLog权限,以及 SLS 的PutLogs和CreateIndex权限。为提升安全性,建议在工程代码中采用无 AK 的凭据配置方式,具体配置方法请参见:管理访问凭据。
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>dms_enterprise20181101</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.100</version>
</dependency>package org.example;
import com.aliyun.dms_enterprise20181101.models.GetOpLogResponse;
import com.aliyun.dms_enterprise20181101.models.GetOpLogResponseBody;
import com.aliyun.openservices.log.common.LogItem;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
public class ExportDmsOperLogExample {
private static com.aliyun.dms_enterprise20181101.Client dmsClient = null;
private static com.aliyun.openservices.log.Client slsClient = null;
public static com.aliyun.credentials.Client getCredentialClient() {
com.aliyun.credentials.models.Config credentialConfig = new com.aliyun.credentials.models.Config();
credentialConfig.setType("access_key");
// 必填参数,此处以从环境变量中获取AccessKey ID为例
credentialConfig.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// 必填参数,此处以从环境变量中获取AccessKey Secret为例
credentialConfig.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
return new com.aliyun.credentials.Client(credentialConfig);
}
// 创建DMS OpenAPI Client,用于调用GetOpLog
public static synchronized com.aliyun.dms_enterprise20181101.Client createDmsClient() throws Exception {
if (dmsClient != null) {
return dmsClient;
}
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
.setCredential(getCredentialClient());
// Endpoint 请参考 https://api.aliyun.com/product/dms-enterprise
config.endpoint = "dms-enterprise.cn-hangzhou.aliyuncs.com";
dmsClient = new com.aliyun.dms_enterprise20181101.Client(config);
return dmsClient;
}
// 创建SLS OpenAPI Client,用于调用PutLogs
public static synchronized com.aliyun.openservices.log.Client createSlsClient() throws Exception {
if (slsClient != null) {
return slsClient;
}
com.aliyun.credentials.Client credentialClient = getCredentialClient();
// Endpoint 请参考 https://api.aliyun.com/product/Sls
String endpoint = "cn-hangzhou.log.aliyuncs.com";
slsClient = new com.aliyun.openservices.log.Client(endpoint, credentialClient.getAccessKeyId(), credentialClient.getAccessKeySecret());
return slsClient;
}
// 调用DMS OpenAPI GetOpLog 获取操作日志
public static List<GetOpLogResponseBody.GetOpLogResponseBodyOpLogDetailsOpLogDetail> getLogs(com.aliyun.dms_enterprise20181101.Client client, String startTime, String endTime, Integer pageNumber, Integer pageSize) {
com.aliyun.dms_enterprise20181101.models.GetOpLogRequest getOpLogRequest = new com.aliyun.dms_enterprise20181101.models.GetOpLogRequest()
.setStartTime(startTime)
.setEndTime(endTime)
.setPageSize(pageSize)
.setPageNumber(pageNumber);
com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
try {
GetOpLogResponse response = client.getOpLogWithOptions(getOpLogRequest, runtime);
return Optional.ofNullable(response.getBody())
.map(GetOpLogResponseBody::getOpLogDetails)
.map(GetOpLogResponseBody.GetOpLogResponseBodyOpLogDetails::getOpLogDetail)
.orElse(new ArrayList<>());
} catch (Exception e) {
System.out.println(e.getMessage());
throw new RuntimeException(e.getMessage());
}
}
// 调用PutLogs将日志导入SLS LogStore
public static void putLogs(com.aliyun.openservices.log.Client client, String project, String logStore, List<GetOpLogResponseBody.GetOpLogResponseBodyOpLogDetailsOpLogDetail> logDetailList) {
List<LogItem> logItemList = new ArrayList<>();
for (GetOpLogResponseBody.GetOpLogResponseBodyOpLogDetailsOpLogDetail logDetail : logDetailList) {
LogItem logItem = new LogItem((int) (new Date().getTime() / 1000));
logItem.PushBack("module", logDetail.getModule());
logItem.PushBack("database", logDetail.getDatabase());
logItem.PushBack("userId", logDetail.getUserId());
logItem.PushBack("opUserId", String.valueOf(logDetail.getOpUserId()));
logItem.PushBack("userNick", logDetail.getUserNick());
logItem.PushBack("opTime", logDetail.getOpTime());
logItem.PushBack("opContent", logDetail.getOpContent());
logItem.PushBack("orderId", String.valueOf(logDetail.getOrderId()));
logItemList.add(logItem);
}
try {
client.PutLogs(project, logStore, "", logItemList, "");
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.getMessage());
throw new RuntimeException(e.getMessage());
}
}
// 创建索引,一个logStore初始化一次即可
public static void createIndex(com.aliyun.openservices.log.Client client, String project, String logStore) {
try {
GetIndexResponse getIndexResponse = client.GetIndex(project, logStore);
Index index = getIndexResponse.GetIndex();
IndexLine indexLine = index.GetLine();
List<String> indexToken = null;
if (indexLine != null) {
indexToken = indexLine.GetToken();
}
IndexKeys keys = new IndexKeys();
List<String> logStoreKeys = List.of("module", "database", "userId", "opUserId",
"userNick", "opTime", "opContent", "orderId");
for (String logStoreKey : logStoreKeys) {
IndexKey key = new IndexKey();
key.SetType("text");
key.SetChn(true);
key.SetDocValue(false);
key.SetToken(indexToken);
keys.AddKey(logStoreKey, key);
index.SetKeys(keys);
}
client.UpdateIndex(project, logStore, index);
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.getMessage());
throw new RuntimeException(e.getMessage());
}
}
public static void main(String[] args) throws Exception {
// DMS 操作日志开始时间(精确到秒)
String startStr = "2025-01-10 14:30:00";
// DMS 操作日志结束时间(精确到秒)
String endStr = "2025-01-13 09:15:00";
// SLS project名
String project = "project";
// SLS logStore名
String logStore = "logStore";
// 创建sls logStore索引,一个logStore创建一次即可
createIndex(createSlsClient(), project, logStore);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime start = LocalDateTime.parse(startStr, formatter);
LocalDateTime end = LocalDateTime.parse(endStr, formatter);
if (start.isAfter(end)) {
throw new IllegalArgumentException("开始时间不能晚于结束时间");
}
LocalDateTime current = start;
// DMS日志量较大,为避免OpenAPI请求超时,每次请求1天的日志
while (!current.isAfter(end)) {
// 计算当天结束时间:取 (当天23:59:59) 和 (总结束时间) 的较小值
LocalDateTime dayEnd = current.toLocalDate().atTime(23, 59, 59);
if (dayEnd.isAfter(end)) {
dayEnd = end;
}
String segmentStartStr = current.format(formatter);
String segmentEndStr = dayEnd.format(formatter);
// 业务处理逻辑:先获取DMS日志,再导入至SLS
process(segmentStartStr, segmentEndStr, project, logStore);
// 移动到下一天的 00:00:00
current = dayEnd.toLocalDate().plusDays(1).atStartOfDay();
}
}
public static void process(String startTime, String endTime, String project, String logStore) throws Exception {
int pageNumber = 1;
int pageSize = 100;
com.aliyun.dms_enterprise20181101.Client dmsClient = createDmsClient();
com.aliyun.openservices.log.Client slsClient = createSlsClient();
List<GetOpLogResponseBody.GetOpLogResponseBodyOpLogDetailsOpLogDetail> logs = getLogs(dmsClient, startTime, endTime, pageNumber, pageSize);
while(logs.size() >= 100) {
putLogs(slsClient, project, logStore, logs);
pageNumber++;
logs = getLogs(dmsClient, startTime, endTime, pageNumber, pageSize);
}
}
}
相关文档
将DMS的操作日志导出到SLS后,您可能需要进行查询、分析日志及后续操作。具体操作,请参见在日志服务SLS中查询、分析DMS的操作日志。