Incremental metadata logs record access details for DLF metadata, such as the request type, the specific operation, and the request time. These logs are written in real time as a stream to the request_logs system table in the system database. You can use a compute engine such as Flink to directly query the request_logs table, or synchronize the data from the table to Kafka for flexible consumption by downstream systems. This topic describes how to query and consume incremental logs by using the request_logs table.
Details of the request_logs table
Table schema
The request_logs table is used to record access logs for DLF metadata. The table structure is defined as follows.
CREATE TABLE `request_logs` (
  `version` STRING COMMENT 'Version of the catalog event',
  `event` STRING COMMENT 'Event type as an enumeration value',
  `detail` STRING COMMENT 'Detailed data in JSON format',
  `identifier` STRING COMMENT 'Identifier for this event',
  `requested_by` STRING COMMENT 'Requesting user',
  `requested_at` BIGINT COMMENT 'Request time in milliseconds',
  `dt` STRING COMMENT 'Date partition, for example, 20250505'
) PARTITIONED BY (dt) WITH (
  'file.format' = 'avro',
  'partition.expiration-time' = '30 d',
  'partition.timestamp-formatter' = 'yyyyMMdd'
);
Event enumeration
The event field represents the event type and is an enumeration value. The structures of the identifier and detail fields vary by event type and are described in the following table. An example query follows the table.
Event | Identifier structure | Detail structure |
GetDatabase | database | null |
ListDatabases | null | {"maxResults":"","pageToken":"","databaseNamePattern":""} (If parameters are passed) |
CreateDatabase | database | CreateDatabaseRequest (Note: the xxxRequest structures in this column are defined in the org.apache.paimon.rest.requests package.) |
DropDatabase | database | null |
AlterDatabase | database | AlterDatabaseRequest |
GetTable | database.object | null |
ListTables | database | {"maxResults":"","pageToken":"","tableNamePattern":""} (If parameters are passed) |
ListTableDetails | database | {"maxResults":"","pageToken":"","tableNamePattern":""} (If parameters are passed) |
CreateTable | database.object | CreateTableRequest |
AlterTable | database.object | AlterTableRequest |
RenameTable | database.object | RenameTableRequest |
DropTable | database.object | null |
CommitTable | database.object | CommitTableRequest |
LoadTableSnapshot | database.object | null |
ListTableSnapshots | database.object | {"maxResults":"","pageToken":""} (If parameters are passed) |
RollbackTable | database.object | RollbackTableRequest |
ListBranches | database.object | {"maxResults":"","pageToken":""} (If parameters are passed) |
CreateBranch | database.object | CreateBranchRequest |
DropBranch | database.object | null |
ForwardBranch | database.object | ForwardBranchRequest |
ListPartitions | database.object | {"maxResults":"","pageToken":"","partitionNamePattern":""} (If parameters are passed) |
MarkDonePartitions | database.object | MarkDonePartitionsRequest |
GetView | database.object | null |
ListViews | database | {"maxResults":"","pageToken":"","viewNamePattern":""} (If parameters are passed) |
ListViewDetails | database | {"maxResults":"","pageToken":"","viewNamePattern":""} (If parameters are passed) |
CreateView | database.object | CreateViewRequest |
AlterView | database.object | AlterViewRequest |
RenameView | database.object | RenameTableRequest |
DropView | database.object | null |
GrantPermission | null | Example for granting the SELECT permission on a table: { "principal": "", "access": "SELECT", "catalog": "", "database": "", "table": "", "resourceType": "TABLE" } |
RevokePermission | null | Same as above |
BatchGrantPermissions | null | Example for granting the SELECT permission on a table. The permissions array can contain multiple grants: { "permissions": [ { "principal": "", "access": "SELECT", "catalog": "", "database": "", "table": "", "resourceType": "TABLE" } ] } |
BatchRevokePermissions | null | Same as above |
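For example, the following batch query (a sketch; the catalog name and date are placeholders) returns the CreateTable events for one day so that you can inspect the identifier and detail values for a single event type:
SELECT event, identifier, requested_by, requested_at, detail
FROM `<Your_Flink_Catalog_Name>`.`system`.`request_logs`
WHERE dt = '20250601' AND event = 'CreateTable';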
Procedure
Batch query
Log on to the Realtime Compute for Apache Flink management console.
In the text editor on the Data Exploration page, enter and run the SQL query.
SELECT * FROM `<Your_Flink_Catalog_Name>`.`system`.`request_logs` WHERE dt = '20250601';
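For a quick audit view, you can also aggregate the logs by event type. The following query is a sketch that assumes the same placeholder catalog name; TO_TIMESTAMP_LTZ is a built-in Flink SQL function that converts the millisecond request time to a timestamp:
SELECT event,
       COUNT(*) AS request_count,
       MAX(TO_TIMESTAMP_LTZ(requested_at, 3)) AS last_request_time
FROM `<Your_Flink_Catalog_Name>`.`system`.`request_logs`
WHERE dt = '20250601'
GROUP BY event;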
Stream consumption
Synchronize incremental logs to Kafka
You can use a compute engine such as Flink to directly query the system.request_logs system table. Alternatively, you can synchronize the logs to Kafka and develop a custom program to meet your business requirements.
Log on to the ApsaraMQ for Kafka console, and then create and deploy a Kafka instance.
Log on to the Realtime Compute for Apache Flink console and create a Paimon DLF catalog.
On the ETL page, create a Flink stream job, and then deploy and start the job.
CREATE TEMPORARY TABLE `kafka_sink` (
  version STRING,
  event STRING,
  detail STRING,
  identifier STRING,
  requested_by STRING,
  requested_at BIGINT,
  dt STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'properties.enable.idempotence' = 'false',
  'format' = 'json'
);

-- The 'latest' scan mode streams only newly written log records instead of reading the full history.
INSERT INTO `kafka_sink`
SELECT * FROM `flink_test`.`system`.`request_logs` /*+ OPTIONS('scan.mode' = 'latest') */;
The following table describes the parameters.
Parameter | Description |
connector | Specifies that Kafka is used as the sink connector. |
topic | The name of the destination topic in the Kafka instance to which the data is written. You can obtain the topic name on the Topic Management page in the ApsaraMQ for Kafka console. |
properties.bootstrap.servers | The endpoint of the Kafka cluster. You can obtain the endpoint from the Endpoint Information section on the instance details page in the ApsaraMQ for Kafka console. |
properties.enable.idempotence | Topics of the cloud storage class in ApsaraMQ for Kafka instances do not support idempotence by default, so you must disable idempotent writes. For more information, see Comparison of storage engines. |
format | The data serialization format. Set this parameter to json to match the data exchange format in Kafka. |
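To confirm that log records arrive in the topic, one option is to read the topic back with a temporary Kafka source table in Flink SQL. The following is a minimal sketch: the topic name, endpoint, and consumer group ID are placeholders, and the options are standard Flink Kafka connector options.
CREATE TEMPORARY TABLE `kafka_check` (
  version STRING,
  event STRING,
  detail STRING,
  identifier STRING,
  requested_by STRING,
  requested_at BIGINT,
  dt STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'properties.group.id' = 'request-logs-check',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

SELECT event, identifier, requested_by FROM `kafka_check`;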
Kafka consumption: You can develop a custom program to consume data from Kafka as required.
Parse log data
Define the RequestLog class based on the request_logs schema described previously.
import com.fasterxml.jackson.annotation.JsonProperty;

public class RequestLog {

    @JsonProperty("version")
    private String version;

    @JsonProperty("event")
    private String event;

    @JsonProperty("detail")
    private String detail;

    @JsonProperty("identifier")
    private String identifier;

    @JsonProperty("requested_by")
    private String requestedBy;

    @JsonProperty("requested_at")
    private long requestedAt;

    @JsonProperty("dt")
    private String dt;

    public RequestLog() {}

    public String getVersion() { return version; }
    public void setVersion(String version) { this.version = version; }

    public String getEvent() { return event; }
    public void setEvent(String event) { this.event = event; }

    public String getDetail() { return detail; }
    public void setDetail(String detail) { this.detail = detail; }

    public String getIdentifier() { return identifier; }
    public void setIdentifier(String identifier) { this.identifier = identifier; }

    public String getRequestedBy() { return requestedBy; }
    public void setRequestedBy(String requestedBy) { this.requestedBy = requestedBy; }

    public long getRequestedAt() { return requestedAt; }
    public void setRequestedAt(long requestedAt) { this.requestedAt = requestedAt; }

    public String getDt() { return dt; }
    public void setDt(String dt) { this.dt = dt; }
}
Dynamically parse the detail field
Dynamically transform the detail field into a specific request object based on the event type. For information about how to configure the Maven dependency, see API guide.
package com.aliyun.morax.flink;

import org.apache.paimon.rest.RESTApi;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;

import java.util.HashMap;
import java.util.Map;

public class RequestLogParser {

    // Map event types to their request classes. Events whose detail is null,
    // such as DropTable and DropDatabase, do not need an entry here.
    private static final Map<String, Class<?>> EVENT_CLASS_MAP = new HashMap<>();

    static {
        EVENT_CLASS_MAP.put("CreateTable", CreateTableRequest.class);
        EVENT_CLASS_MAP.put("RenameTable", RenameTableRequest.class);
        EVENT_CLASS_MAP.put("CreateDatabase", CreateDatabaseRequest.class);
        EVENT_CLASS_MAP.put("AlterTable", AlterTableRequest.class);
        // Add more mappings as needed.
    }

    public Object parseDetail(RequestLog log) {
        Class<?> clazz = EVENT_CLASS_MAP.get(log.getEvent());
        if (clazz == null) {
            throw new UnsupportedOperationException("Unsupported event: " + log.getEvent());
        }
        return fromJson(log.getDetail(), clazz);
    }

    private <T> T fromJson(String json, Class<T> clazz) {
        try {
            return RESTApi.fromJson(json, clazz);
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse JSON for class: " + clazz.getSimpleName(), e);
        }
    }
}
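The following sketch shows one way to act on the parsed results. The RequestLogHandler class is hypothetical and only illustrates how you might dispatch on the event type; replace the print statements with your own business logic.
package com.aliyun.morax.flink;

import org.apache.paimon.rest.requests.CreateTableRequest;

// Hypothetical example class: dispatches a parsed RequestLog by event type.
public class RequestLogHandler {

    private final RequestLogParser parser = new RequestLogParser();

    public void handle(RequestLog log) {
        switch (log.getEvent()) {
            case "CreateTable":
                // For this event type, the detail field carries a CreateTableRequest.
                CreateTableRequest request = (CreateTableRequest) parser.parseDetail(log);
                System.out.println("Table created by " + log.getRequestedBy() + ": " + log.getIdentifier());
                break;
            case "DropTable":
                // DropTable has no detail payload; the identifier (database.object) is sufficient.
                System.out.println("Table dropped: " + log.getIdentifier());
                break;
            default:
                // Ignore events that are not relevant to the downstream system.
                break;
        }
    }
}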
Example of a Kafka consumer program
To create a custom Kafka consumer program, see Use instance endpoints to send and receive messages. The following code provides an example.
package com.aliyun.morax.flink;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerWithRequestLogParser {

    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final RequestLogParser parser = new RequestLogParser();

    public static void main(String[] args) {
        // For detailed code and parameter descriptions, see the official ApsaraMQ for Kafka developer documentation. This is an example.
        // JavaKafkaConfigurer comes from the ApsaraMQ for Kafka demo project referenced above.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        // Set the other required properties, such as the consumer group ID and the key and value deserializers.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        List<String> topics = new ArrayList<>();
        String topicStr = kafkaProperties.getProperty("topic");
        for (String topic : topicStr.split(",")) {
            topics.add(topic.trim());
        }
        consumer.subscribe(topics);

        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Parse the record value into a RequestLog.
                        RequestLog log = objectMapper.readValue(record.value(), RequestLog.class);
                        // Parse the detail field into a typed request object.
                        Object detail = parser.parseDetail(log);
                        // Process the detail according to your business requirements.
                    } catch (Exception e) {
                        System.err.println("Error processing record: " + record.offset());
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                System.err.println("Consumer error, retrying...");
                e.printStackTrace();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignore) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}