Data Lake Formation: Incremental logs

Last Updated: Dec 05, 2025

Incremental metadata logs record access details for DLF metadata, such as the request type, specific operation, and request time. These logs are written in real time as a stream to the request_logs system table in the system database. You can use compute engines such as Flink to directly query the request_logs table, or synchronize the data from the table to Kafka for flexible consumption by downstream systems. This topic describes how to use the request_logs table to record and consume incremental logs.

Details of the request_logs table

Table schema

The request_logs table is used to record access logs for DLF metadata. The table structure is defined as follows.

CREATE TABLE `request_logs` (
  `version` STRING COMMENT 'Version of the catalog event',
  `event` STRING COMMENT 'Enumeration value',
  `detail` STRING COMMENT 'Detailed data in JSON format',
  `identifier` STRING COMMENT 'Identifier for this event',
  `requested_by` STRING COMMENT 'Requesting user',
  `requested_at` BIGINT COMMENT 'Request time in milliseconds',
  `dt` STRING COMMENT 'Date partition, for example, 20250505'
) PARTITIONED BY (dt) WITH (
   'file.format' = 'avro',
   'partition.expiration-time' = '30 d',
   'partition.timestamp-formatter' = 'yyyyMMdd'
);
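Because the table is partitioned by dt and partition.expiration-time is set to 30 days, only roughly the most recent 30 days of logs are retained. The following query is a minimal sketch (the catalog name is a placeholder, as in the examples later in this topic) that lists the date partitions currently available and the number of logged requests in each:

SELECT dt, COUNT(*) AS events
FROM `<Your_Flink_Catalog_Name>`.`system`.`request_logs`
GROUP BY dt;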

Event enumeration

The event field represents the event type and is an enumeration value. The structures of the identifier and detail fields vary for different event types and are described as follows:

Note: The xxxRequest definitions in the Detail structure column are from the org.apache.paimon.rest.requests package.

Event | Identifier structure | Detail structure
--- | --- | ---
GetDatabase | database | null
ListDatabases | null | {"maxResults":"","pageToken":"","databaseNamePattern":""} (if parameters are passed)
CreateDatabase | database | CreateDatabaseRequest
DropDatabase | database | null
AlterDatabase | database | AlterDatabaseRequest
GetTable | database.object | null
ListTables | database | {"maxResults":"","pageToken":"","tableNamePattern":""} (if parameters are passed)
ListTableDetails | database | {"maxResults":"","pageToken":"","tableNamePattern":""} (if parameters are passed)
CreateTable | database.object | CreateTableRequest
AlterTable | database.object | AlterTableRequest
RenameTable | database.object | RenameTableRequest
DropTable | database.object | null
CommitTable | database.object | CommitTableRequest
LoadTableSnapshot | database.object | null
ListTableSnapshots | database.object | {"maxResults":"","pageToken":""} (if parameters are passed)
RollbackTable | database.object | RollbackTableRequest
ListBranches | database.object | {"maxResults":"","pageToken":""} (if parameters are passed)
CreateBranch | database.object | CreateBranchRequest
DropBranch | database.object | null
ForwardBranch | database.object | ForwardBranchRequest
ListPartitions | database.object | {"maxResults":"","pageToken":"","partitionNamePattern":""} (if parameters are passed)
MarkDonePartitions | database.object | MarkDonePartitionsRequest
GetView | database.object | null
ListViews | database | {"maxResults":"","pageToken":"","viewNamePattern":""} (if parameters are passed)
ListViewDetails | database | {"maxResults":"","pageToken":"","viewNamePattern":""} (if parameters are passed)
CreateView | database.object | CreateViewRequest
AlterView | database.object | AlterViewRequest
RenameView | database.object | RenameTableRequest
DropView | database.object | null
GrantPermission | null | Example for granting the SELECT permission on a table: {"principal":"","access":"SELECT","catalog":"","database":"","table":"","resourceType":"TABLE"}
RevokePermission | null | Same as GrantPermission
BatchGrantPermissions | null | Example for granting the SELECT permission on a table; the permissions array can contain multiple grants: {"permissions":[{"principal":"","access":"SELECT","catalog":"","database":"","table":"","resourceType":"TABLE"}]}
BatchRevokePermissions | null | Same as BatchGrantPermissions
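For reference, a request_logs record serialized as JSON (as it would appear after the Kafka synchronization described later in this topic) might look like the following. All values are illustrative; the detail payload is the ListTables structure from the table above, stored as an escaped JSON string.

{
  "version": "1",
  "event": "ListTables",
  "detail": "{\"maxResults\":\"100\",\"pageToken\":\"\",\"tableNamePattern\":\"\"}",
  "identifier": "my_db",
  "requested_by": "test_user",
  "requested_at": 1748736000000,
  "dt": "20250601"
}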

Procedure

Batch query

  1. Log on to the Realtime Compute for Apache Flink management console.

  2. Create a Paimon DLF catalog.

  3. In the text editor on the Data Exploration page, enter and run the following SQL query.

    SELECT * FROM `<Your_Flink_Catalog_Name>`.`system`.`request_logs` WHERE dt = '20250601';
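    You can filter or aggregate on any column in the same way. For example, the following query (a sketch; the catalog name and date are placeholders) counts requests per user and event type for one day:

    SELECT requested_by, event, COUNT(*) AS cnt
    FROM `<Your_Flink_Catalog_Name>`.`system`.`request_logs`
    WHERE dt = '20250601'
    GROUP BY requested_by, event;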

Stream consumption

Synchronize incremental logs to Kafka

You can use a compute engine such as Flink to directly query the system.request_logs system table. Alternatively, you can synchronize the logs to Kafka and develop a custom program to meet your business requirements.

  1. Log on to the ApsaraMQ for Kafka console, and then create and deploy a Kafka instance.

  2. Log on to the Realtime Compute for Apache Flink console and create a Paimon DLF catalog.

  3. On the ETL page, create a Flink stream job, and then deploy and start the job.

    CREATE TEMPORARY TABLE `kafka_sink` (
      version STRING,
      event STRING,
      detail STRING,
      identifier STRING,
      requested_by STRING,
      requested_at BIGINT,
      dt STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'xxx',
      'properties.bootstrap.servers' = 'xxx',
      'properties.enable.idempotence'='false',
      'format' = 'json'
    );
    
    INSERT INTO `kafka_sink`
    SELECT *
    FROM `flink_test`.`system`.`request_logs`/*+ OPTIONS('scan.mode' = 'latest') */;

    The following table describes the parameters.

    Parameter | Description
    --- | ---
    connector | Specifies that Kafka is used as the sink connector.
    topic | The name of the destination topic in the Kafka instance to which the data is written. You can obtain the topic name on the Topic Management page in the ApsaraMQ for Kafka console.
    properties.bootstrap.servers | The endpoint of the Kafka cluster. You can obtain the endpoint from the Endpoint Information section on the instance details page in the ApsaraMQ for Kafka console.
    properties.enable.idempotence | Topics of the cloud storage class in ApsaraMQ for Kafka instances do not support idempotence by default, so idempotent writes must be disabled. For more information, see Comparison of storage engines.
    format | The data serialization format. Set this parameter to json to match the JSON data exchange format used in Kafka.

  4. Kafka consumption: You can develop a custom program to consume data from Kafka as required.

Parse log data

  1. Define the RequestLog class based on the request_logs schema described previously.

    import com.fasterxml.jackson.annotation.JsonProperty;
    
    public class RequestLog {
    
        @JsonProperty("version")
        private String version;
    
        @JsonProperty("event")
        private String event;
    
        @JsonProperty("detail")
        private String detail;
    
        @JsonProperty("identifier")
        private String identifier;
    
        @JsonProperty("requested_by")
        private String requestedBy;
    
        @JsonProperty("requested_at")
        private long requestedAt;
    
        @JsonProperty("dt")
        private String dt;
    
        public void setVersion(String version) {
            this.version = version;
        }
    
        public void setEvent(String event) {
            this.event = event;
        }
    
        public void setDetail(String detail) {
            this.detail = detail;
        }
    
        public void setRequestedBy(String requestedBy) {
            this.requestedBy = requestedBy;
        }
    
        public void setRequestedAt(long requestedAt) {
            this.requestedAt = requestedAt;
        }
    
        public void setDt(String dt) {
            this.dt = dt;
        }
    
        public RequestLog() {}
    
        public String getVersion() {
            return version;
        }
    
        public String getEvent() {
            return event;
        }
    
        public String getDetail() {
            return detail;
        }
    
        public String getRequestedBy() {
            return requestedBy;
        }
    
        public long getRequestedAt() {
            return requestedAt;
        }
    
        public String getDt() {
            return dt;
        }
    
        public String getIdentifier() {
            return identifier;
        }
    
        public void setIdentifier(String identifier) {
            this.identifier = identifier;
        }    
    }
    
  2. Dynamically parse the detail field.

    Dynamically transform the detail field into a specific request object based on the event type. For information about how to configure the Maven dependency, see API guide.

    package com.aliyun.morax.flink;
    
    import org.apache.paimon.rest.RESTApi;
    import org.apache.paimon.rest.requests.AlterDatabaseRequest;
    import org.apache.paimon.rest.requests.AlterTableRequest;
    import org.apache.paimon.rest.requests.CreateDatabaseRequest;
    import org.apache.paimon.rest.requests.CreateTableRequest;
    import org.apache.paimon.rest.requests.RenameTableRequest;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class RequestLogParser {
    
        // Map event types to the request classes in org.apache.paimon.rest.requests.
        private static final Map<String, Class<?>> EVENT_CLASS_MAP = new HashMap<>();
    
        static {
            EVENT_CLASS_MAP.put("CreateTable", CreateTableRequest.class);
            EVENT_CLASS_MAP.put("RenameTable", RenameTableRequest.class);
            EVENT_CLASS_MAP.put("CreateDatabase", CreateDatabaseRequest.class);
            EVENT_CLASS_MAP.put("AlterDatabase", AlterDatabaseRequest.class);
            EVENT_CLASS_MAP.put("AlterTable", AlterTableRequest.class);
            // Add more mappings as needed. Events whose detail structure is null,
            // such as DropTable and DropDatabase, carry no request body to parse.
        }
    
        public Object parseDetail(RequestLog log) {
            // Events with a null detail structure have nothing to parse.
            if (log.getDetail() == null || log.getDetail().isEmpty()) {
                return null;
            }
            Class<?> clazz = EVENT_CLASS_MAP.get(log.getEvent());
            if (clazz == null) {
                throw new UnsupportedOperationException("Unsupported event: " + log.getEvent());
            }
            return fromJson(log.getDetail(), clazz);
        }
    
        private <T> T fromJson(String json, Class<T> clazz) {
            try {
                return RESTApi.fromJson(json, clazz);
            } catch (Exception e) {
                throw new RuntimeException("Failed to parse JSON for class: " + clazz.getSimpleName(), e);
            }
        }
    }
    

Example of a Kafka consumer program

To create a custom Kafka consumer program, see Use instance endpoints to send and receive messages. The following code provides an example.

package com.aliyun.morax.flink;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerWithRequestLogParser {

    // Use the same Jackson package as the @JsonProperty annotations in RequestLog,
    // so that requested_by and requested_at map correctly to the Java fields.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    private static final RequestLogParser parser = new RequestLogParser();

    public static void main(String[] args) {
        // For detailed code and parameter descriptions, see the official ApsaraMQ for Kafka developer documentation. This is an example.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
        // The Flink job writes the logs as JSON strings, so use String deserializers.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Other Properties configurations, such as security settings, as required.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        List<String> topics = new ArrayList<>();
        String topicStr = kafkaProperties.getProperty("topic");
        for (String topic : topicStr.split(",")) {
            topics.add(topic.trim());
        }
        consumer.subscribe(topics);

        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Parse RequestLog.
                        RequestLog log = objectMapper.readValue(record.value(), RequestLog.class);
                        // Parse the Detail field.
                        Object detail = parser.parseDetail(log);

                        // Process the detail.
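                        // Illustrative only: route on the event type before acting on the
                        // parsed request object (detail is null for events such as DropTable).
                        if ("CreateTable".equals(log.getEvent())) {
                            System.out.println("Table created: " + log.getIdentifier() + ", detail: " + detail);
                        }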
                    } catch (Exception e) {
                        System.err.println("Error processing record: " + record.offset());
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignore) {
                    Thread.currentThread().interrupt();
                }
                System.err.println("Consumer error, retrying...");
                e.printStackTrace();
            }
        }
    }

}