Realtime Compute for Apache Flink: Configure log export

Last Updated: Nov 05, 2025

You can export job logs at specific levels to external storage, such as Object Storage Service (OSS), Simple Log Service (SLS), and Kafka. This topic describes how to configure log export settings.

Usage notes

  • After you configure log export settings, restart the job for the changes to take effect.

  • After you configure log export settings, if Allow Log Archives remains on, job logs continue to be stored in the OSS bucket associated with the workspace. If you turn off Allow Log Archives, you can no longer view the logs in the console.

  • You can use namespace variables in the ${secret_values.xxxx} format in the log configuration.
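
    For example, assuming that you have already created secret variables named accessKeyId and accessKeySecret in the namespace, an appender property can reference them as follows instead of plaintext credentials:

      <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
      <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property>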

Configure log export for a single job

Via console

  1. Go to the log settings page.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Find the target workspace and click Console in the Actions column.

    3. In the left-side navigation pane, choose O&M > Deployments, and select the target deployment.

    4. On the Configuration tab of the deployment details page, in the Logging section, click Edit in the upper-right corner.

    5. For Logging Profile, select Custom Template.

  2. Click Apply with System Profiles > default.

  3. Click Add Appender and select the target storage system.

  4. Configure the log output settings.

    To export logs at different levels to different storage systems, configure log filter rules for the appenders. For more information, see the Configure different log filter rules based on log levels section of this topic.

    Export logs to SLS

    Item

    Description

    name

    Enter your custom Appender name.

    type

    The output channel type, fixed to SLS.

    pattern

    The log export format.

    The default value is %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n, where {GMT+8} renders the timestamp in the GMT+8 time zone. This pattern generates log entries such as 2024-10-01 14:23:45,678 INFO  com.example.MyClass - This is a test log message.

    project

    Enter your SLS project name.

    logStore

    The Logstore name.

    endpoint

    Enter the private endpoint of your SLS project. For more information, see Endpoints.

    accessKeyId and accessKeySecret

    The AccessKey ID and secret used to access an SLS project. For more information, see Obtain an AccessKey pair.

    Important

    To enhance security, use namespace variables instead of entering your AccessKey pair in plaintext.

    Note

    To export job logs to an SLS project across accounts, create a custom policy by using the Alibaba Cloud account that owns the SLS project, and attach the policy to the RAM role assumed by the account that manages Realtime Compute for Apache Flink resources. The following JSON shows the custom policy:

    • Access all SLS resources:

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }
    • Access specific SLS resources:

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:ap-southeast-1:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }

    flushIntervalSeconds

    Enter the interval at which job logs are flushed to SLS. Unit: seconds.

    flushIntervalEventCount

    Enter the number of log entries to collect and send to SLS in a single batch.

    Note

    When both flushIntervalEventCount and flushIntervalSeconds are configured, logs are sent to SLS as soon as either condition is met.
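
    For example, the following appender properties, which are the defaults used in the XML templates later in this topic, flush a batch every 10 seconds or as soon as 100 entries accumulate, whichever comes first:

      <Property name="flushIntervalSeconds">10</Property>
      <Property name="flushIntervalEventCount">100</Property>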

    Export logs to OSS

    Item

    Description

    name

    Enter your custom Appender name.

    type

    The output channel type, fixed to OSS.

    pattern

    The log output format.

    The default value is %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.

    baseUri

    Enter the base URI of your OSS bucket, in the oss://YOUR-BUCKET-NAME/ format.

    endpoint

    Enter the region-specific internal endpoint of OSS. For more information, see OSS regions and endpoints.

    You can also find the endpoint information in the OSS console:

    1. Click your bucket name.

    2. On the bucket details page, click the Overview tab.

    3. In the Port section, find the Access from ECS over the VPC (internal network) row and copy the value in the Endpoint column.

    accessKeyId and secretAccessKey

    Enter the AccessKey ID and secret used to access your OSS bucket across accounts. For more information, see Obtain an AccessKey pair.

    Important

    To enhance security, use namespace variables instead of entering your AccessKey pair in plaintext.

    Note

    If you access an OSS bucket within the same account, skip these configurations.

    flushIntervalSeconds

    Enter the time interval at which logs are written to OSS. Unit: seconds.

    flushIntervalEventCount

    Enter the number of log entries to collect and send to OSS in a single batch.

    Note

    When both flushIntervalEventCount and flushIntervalSeconds are configured, logs are sent to OSS as soon as either condition is met.

    rollingBytes

    Enter the maximum size of a single log file in OSS. Unit: bytes. When the limit is reached, subsequent data is written to a new log file.
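
    For example, the OSS XML template later in this topic sets rollingBytes to 10485760 bytes (10 MB):

      <Property name="rollingBytes">10485760</Property>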

    Export logs to Kafka

    Note

    Exporting logs to a Kerberized Kafka cluster is not supported.

    • Prerequisites

      Add the following configuration by performing these steps:

      1. On the deployment details page, click the Configuration tab.

      2. In the Parameters section, click Edit.

      3. Copy and paste the following code to the Other Configuration field.

      plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender

      This configuration explicitly specifies the path of the KafkaAppender package, a logging plug-in provided by Realtime Compute for Apache Flink, and ensures that Realtime Compute for Apache Flink can load the plug-in.

    • Configure log export

      Item

      Description

      name

      Enter your custom Appender name.

      type

      The output channel type, fixed to KafkaVVP.

      pattern

      The log output format.

      The default value is %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.

      bootstrap.servers

      Enter the endpoints of the Kafka brokers.

      acks

      Specify the number of partition replicas that must receive a message before the producer considers the message successfully written to the topic. For more information, see acks.

      buffer.memory

      Enter the producer buffer size. Unit: bytes.

      retries

      Enter the maximum number of retries allowed after a message fails to be sent.

      compression.type

      Enter the compression type for data generated by the producer. Valid values: none, gzip, snappy, lz4, and zstd.
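
      For reference, a filled-in set of producer properties might look like the following. The broker addresses are hypothetical, and the values are illustrative rather than recommendations:

        <Property name="bootstrap.servers">broker1:9092,broker2:9092</Property>
        <Property name="acks">all</Property>
        <Property name="buffer.memory">33554432</Property>
        <Property name="retries">3</Property>
        <Property name="compression.type">lz4</Property>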

  5. Click Save.

  6. Start the deployment.

Via XML

  1. Go to the configuration page.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Find the target workspace and click Console in the Actions column.

    3. In the left-side navigation pane, choose O&M > Deployments, and then click the name of the target deployment.

    4. On the Configuration tab, in the Logging section, click Edit in the upper-right corner.

    5. Set Logging Profile to Custom Template.

    6. Click Edit XML.

  2. Configure log export settings.

    Based on your log storage system, copy the corresponding code into the XML editor and replace the placeholder values with actual ones. To export logs at different levels to different storage systems, configure different log filter rules.

    Export logs to OSS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender> 
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="20 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="4"/> 
        </Appender>  
        <Appender name="OSS" type="OSS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          
      <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/${jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
          <Property name="endpoint">https://YOUR-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
          <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
          <Property name="flushIntervalSeconds">10</Property>  
          <Property name="flushIntervalEventCount">100</Property>  
          <Property name="rollingBytes">10485760</Property>  
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="OSS"/> 
        </Root>
      </Loggers> 
    </Configuration>

    Placeholder

    Description

    YOUR-BUCKET-NAME

    Replace it with the name of your OSS bucket.

    YOUR-ENDPOINT

    Replace it with the region-specific internal endpoint of OSS. For more information, see OSS regions and endpoints.

    You can also find the endpoint information in the OSS console:

    1. Click your bucket name.

    2. On the bucket details page, click the Overview tab.

    3. In the Port section, find the Access from ECS over the VPC (internal network) row and copy the value in the Endpoint column.

    YOUR-OSS-ACCESSKEYID and YOUR-OSS-ACCESSKEYSECRET

    Replace them with your AccessKey ID and secret used to access OSS across accounts. For information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.

    Important

    To enhance security, use namespace variables instead of entering your AccessKey pair in plaintext.

    Note

    If you access an OSS bucket within the same account, skip these configurations.

    flushIntervalSeconds

    The time interval at which logs are written to OSS. Unit: seconds.

    flushIntervalEventCount

    Enter the number of log entries to collect and send to OSS in a single batch.

    Note

    When both flushIntervalEventCount and flushIntervalSeconds are configured, logs are sent to OSS as soon as either condition is met.

    rollingBytes

    Enter the maximum size of a single log file in OSS. Unit: bytes. When the limit is reached, subsequent data is written to a new log file.

    Export logs to SLS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender>  
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="5 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="1"/> 
        </Appender>  
        <Appender name="SLS" type="SLS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
    
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="project">YOUR-SLS-PROJECT</Property>  
          <Property name="logStore">YOUR-SLS-LOGSTORE</Property> 
          <Property name="endpoint">YOUR-SLS-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property> 
          <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> 
          <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
          <Property name="deploymentName">{{ deploymentName }}</Property>
          <Property name="flushIntervalSeconds">10</Property>
          <Property name="flushIntervalEventCount">100</Property>
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="SLS"/> 
        </Root>
      </Loggers> 
    </Configuration>
    Note

    To prevent startup errors, don't modify the Twig variables in the code, including namespace, deploymentId, jobId, and deploymentName.

    Placeholder

    Description

    YOUR-SLS-PROJECT

    Replace it with your SLS project name.

    YOUR-SLS-LOGSTORE

    Replace it with the Logstore name of Simple Log Service.

    YOUR-SLS-ENDPOINT

    Replace it with the region-specific private endpoint of SLS. For more information, see Endpoints.

    YOUR-SLS-ACCESSKEYID and YOUR-SLS-ACCESSKEYSECRET

    Replace them with the AccessKey ID and secret used to access the SLS project. For more information, see Obtain an AccessKey pair.

    Important

    To enhance security, use namespace variables instead of entering your AccessKey pair in plaintext.

    Note

    To export job logs to an SLS project across accounts, create a custom policy by using the Alibaba Cloud account that owns the SLS project, and attach the policy to the RAM role assumed by the account that manages Realtime Compute for Apache Flink resources. The following JSON shows the custom policy:

    • Access all SLS resources:

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }
    • Access specific SLS resources:

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:ap-southeast-1:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }

    flushIntervalSeconds

    Enter the interval at which job logs are flushed to SLS. Unit: seconds.

    flushIntervalEventCount

    Enter the number of log entries to collect and send to SLS in a single batch.

    Note

    When both flushIntervalEventCount and flushIntervalSeconds are configured, logs are sent to SLS as soon as either condition is met.

    Export logs to Kafka

    Note

    A Kafka cluster with Kerberos authentication enabled is not supported.

    • Prerequisites

      Explicitly specify the path of the KafkaAppender package, a logging plug-in provided by Realtime Compute for Apache Flink, so that Realtime Compute for Apache Flink can load the plug-in. Do one of the following:

      • Apply the setting to all jobs in the namespace:

        1. Go to O&M > Configurations.

        2. In the Other Configuration section, copy and paste the following code, and save changes.

      plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
      • Apply the setting to a single job:

        1. On the deployment details page, click the Configuration tab.

        2. In the Parameters section, click Edit.

        3. Add the following code to the Other Configuration field, and save changes.

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • Configure log export

      <?xml version="1.0" encoding="UTF-8"?>
      <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
      strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
        <Appenders> 
          <Appender name="StdOut" type="Console"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> 
          </Appender>  
          <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>  
            <Policies> 
              <SizeBasedTriggeringPolicy size="20 MB"/> 
            </Policies>  
            <DefaultRolloverStrategy max="4"/> 
          </Appender>  
          <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME">
              <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n"/>
              <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property>
               <Property name="acks">YOUR-ACKS-VALUE</Property>
               <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property>
                <Property name="retries">YOUR-RETRIES-NUMBER</Property>
               <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property>
          </Appender>
          <Appender type="Async" name="AsyncAppender">
              <AppenderRef ref="KafkaVVPAppender"/>
          </Appender>
        </Appenders>
        <Loggers> 
          <Logger level="INFO" name="org.apache.hadoop"/>  
          <Logger level="INFO" name="org.apache.kafka"/>  
          <Logger level="INFO" name="org.apache.zookeeper"/>  
          <Logger level="INFO" name="akka"/>  
          <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
          <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
          {%- for name, level in userConfiguredLoggers -%} 
            <Logger level="{{ level }}" name="{{ name }}"/> 
          {%- endfor -%}
          <Root level="{{ rootLoggerLogLevel }}"> 
            <AppenderRef ref="StdOut"/>
            <AppenderRef ref="RollingFile"/>  
            <AppenderRef ref="AsyncAppender"/> 
          </Root>
        </Loggers>
      </Configuration>

      Placeholder

      Description

      YOUR-TOPIC-NAME

      Replace it with the name of the target Kafka topic.

      YOUR-KAFKA-BOOTSTRAP-SERVERS

      Replace it with the endpoints of Kafka brokers.

      YOUR-ACKS-VALUE

      Replace it with the number of partition replicas that must receive a message before the producer determines the message has been written to the topic. For more information, see acks.

      YOUR-BUFFER-MEMORY-SIZE

      Replace it with the size of the producer buffer. Unit: bytes.

      YOUR-RETRIES-NUMBER

      Replace it with the maximum number of retries allowed after a message fails to be sent.

      YOUR-COMPRESSION-TYPE

      Replace it with the compression type for data generated by the producer. Valid values: none, gzip, snappy, lz4, and zstd.

      Note

      You can also configure parameters supported by the Apache Kafka client. For more information, see Apache Kafka.
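
      For example, to let the producer batch records for up to 100 milliseconds before sending, you could add the standard Kafka linger.ms setting as an additional property (shown as an illustration):

        <Property name="linger.ms">100</Property>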

  3. Click Save.

  4. Start the deployment.

Configure log export for all jobs

This section describes how to configure log export settings applicable to all jobs in a namespace.

Important
  • After you configure this setting, the logs of all jobs within the namespace are sent to OSS, SLS, or Kafka.

  • To ensure that your Flink deployments can load the KafkaAppender logging plug-in, explicitly specify its path:

    1. Go to O&M > Configurations.

    2. In the Other Configuration field, copy and paste the following code snippet.

    plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
  1. Go to the namespace configurations page.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Click Console in the Actions column of the target workspace.

    3. In the top navigation bar of the development console, select the target namespace.

    4. In the left-side navigation pane, click O&M > Configurations.

    5. On the Deployment Defaults tab, select Defaults for Stream or Defaults for Batch.

    6. In the Logging section, set Logging Profile to Custom Template.

  2. Configure the log export settings.

    For detailed code, see the Via XML section of Configure log export for a single job.

  3. Click Save Changes.

Configure different log filter rules based on log levels

You can configure different log filter rules for appenders based on log levels by using the Log4j 2 ThresholdFilter. Log filter rules provide the following benefits:

  • Flexibility: Sends logs at different levels to different storage systems.

  • Efficiency: Reduces unnecessary log parsing and transmission, improving system performance.

  • Easy management: Facilitates tiered log management.

Procedure

  1. In the Logging section, set Logging Profile to Custom Template.

  2. Configure log export settings.

    The following code snippet prints logs at the INFO level or higher to the development console of Realtime Compute for Apache Flink, and exports logs at the ERROR level or higher to SLS:

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" status="WARN">
      <Appenders>
        <!-- Console Appender configured to output logs at the INFO level and above -->
        <Appender name="StdOut" type="Console">
          <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
        </Appender>
        
        <!-- RollingFile Appender (no filter; receives events at the INFO level and above because the Root logger level is INFO) -->
        <Appender name="RollingFile" type="RollingFile">
          <!-- Configuration remains unchanged -->
          <!-- ... -->
        </Appender>
        
        <!-- SLS Appender configured to output only ERROR level and above logs -->
        <Appender name="SLS" type="SLS">
          <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
          <!-- SLS specific properties -->
          <Property name="namespace">YOUR_NAMESPACE</Property>
          <Property name="project">YOUR_SLS_PROJECT</Property>
          <Property name="logStore">YOUR_SLS_LOGSTORE</Property>
          <Property name="endpoint">YOUR_SLS_ENDPOINT</Property>
          <!-- Access credentials and other properties -->
          <!-- ... -->
        </Appender>
    
        <!-- Other Appenders definitions remain unchanged -->
        <!-- ... -->
      </Appenders>
      
      <Loggers>
        <!-- Directly configure loggers for StdOut and SLS with specific levels -->
        <Logger name="StdOutLogger" level="INFO" additivity="false">
          <AppenderRef ref="StdOut"/>
        </Logger>
        
        <Logger name="SLSLogger" level="ERROR" additivity="false">
          <AppenderRef ref="SLS"/>
        </Logger>
    
        <!-- Other Loggers definitions with their specific configurations -->
        <!-- ... -->
    
        <!-- Root Logger references StdOut and RollingFile but not SLS, to avoid duplicate logging -->
        <Root level="INFO">
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>
          <!-- Exclude SLS from Root to prevent duplicate logging in case of other loggers -->
        </Root>
      </Loggers>
    </Configuration>

    Parameters in the preceding sample code:

    • Console Appender: A ThresholdFilter is used to ensure that logs at the INFO level or higher are output to the development console.

    • SLS Appender: A ThresholdFilter is used to ensure that logs at the ERROR level or higher are exported to SLS. For information about the specific properties of SLS Appender, see Export logs to SLS. Replace the placeholders such as YOUR_NAMESPACE and YOUR_SLS_PROJECT with the actual values of your SLS project.

      Note

      If you use a custom appender in place of the SLS Appender and its type is not SLS, make sure that the appender type is supported by Realtime Compute for Apache Flink and that the appender implements the logic required to connect to Simple Log Service.

    • StdOutLogger and SLSLogger: They send logs to StdOut Appender and SLS Appender respectively, based on the log level you specify.

    • Root Logger: StdOut Appender and RollingFile Appender are configured for the root logger, but SLS Appender is not included. This avoids sending repeated logs to SLS if a specific logger is configured for the logs.
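
    Note that ThresholdFilter accepts the specified level and all more severe levels. If you need an appender to receive exactly one level, a minimal sketch is to chain two ThresholdFilter instances inside a Filters element, which is a standard Log4j 2 pattern:

      <!-- Accept only INFO: deny WARN and above, then accept INFO (everything else is denied) -->
      <Filters>
        <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL"/>
        <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
      </Filters>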

    For more information about related operations and Log4j parameters, see Apache Log4j.
