
Realtime Compute for Apache Flink: Configure job log output

Last Updated: Feb 06, 2026

In addition to viewing job logs directly on the Flink console’s Job Exploration page, you can export job logs to external storage such as Object Storage Service (OSS), Simple Log Service (SLS), or Kafka. You can also configure the log level for each output destination. This topic describes how to configure job log output. After you complete the configuration, you can view the job logs in the specified storage.

Precautions

  • After you configure log output to OSS, SLS, or Kafka, you must restart the job.

  • If you configure logs to be sent to external storage and do not disable the log archiving feature, the OSS bucket or fully managed storage that you configured when you purchased the workspace continues to store logs. If you disable log archiving, you can no longer view job logs in the Flink console.

  • You can use project variables in the ${secret_values.xxxx} format in the log configuration, as shown in the snippet below. For more information, see Project variables.
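
    For example, a minimal sketch of referencing a project variable in an appender property. The variable name accessKeyId is hypothetical and assumes a secret with that name has already been defined for the project:

    <!-- Hypothetical: assumes a project secret variable named accessKeyId exists -->
    <Property name="accessKeyId">${secret_values.accessKeyId}</Property>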

Configure log output for a single job

Use the UI

  1. Go to the log output configuration page for a single job.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Find the target workspace and click Console in the Actions column.

    3. In the navigation pane on the left, click Operation Center > Job O&M, and then click the target job name.

    4. On the Deployment Details tab, click Edit in the upper-right corner of the Logging section.

    5. For Log Template, select Custom Template.

  2. Click Copy And Edit From System Template > default.

  3. Click Add Appender and select the target storage system.

  4. Configure the log output information for the target storage.

    To output logs of different levels to different storage systems, configure level filtering rules for the appenders as described in Configure separate outputs for different log levels.

    Configure for SLS

    • name: The custom name of the appender.

    • type: The type of the output channel. The value is fixed to SLS. Do not change it.

    • pattern: The output format of the logs. The default value is %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n, which generates log content such as 2024-10-01 14:23:45,678 INFO  com.example.MyClass - This is a test log message.

    • flushIntervalSeconds: The interval at which buffered logs are written to the storage. Unit: seconds.

    • flushIntervalEventCount: The number of log entries to buffer before they are written to the storage.

      Note: If you also configure flushIntervalSeconds, a write is triggered as soon as either threshold is reached.

    • authenticationMode: The authentication mode. Valid values:

      • AccessKey

      • SLS Token

        Note: If you select SLS Token, logs can be delivered only to a Logstore that is in the same region as the Flink workspace. This mode is supported only in Ververica Runtime (VVR) 11.5 and later.

    • project: The SLS project name.

    • logStore: The Logstore name.

    • endpoint: The private endpoint of your SLS project in the region. For more information, see Endpoints.

    • accessKeyId and accessKeySecret: The AccessKey ID and AccessKey secret of the account that is used to write logs to SLS. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.

      To avoid security risks caused by plaintext AccessKey pairs, this example uses variables to specify the AccessKey pair. For more information, see Project variables.

      Note: If the SLS project and the Flink service belong to different accounts, you must grant the Flink account the permissions to write data to SLS. For more information, see Create a custom policy. The following policies are examples:

      No restrictions on the SLS scope:

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }

      Specify the SLS resource scope:

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }
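
    In the generated configuration, each log entry carries a topic of the form namespace:deploymentId:jobId (see the SLS XML template later in this topic), which you can use to filter logs per deployment in SLS.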

    Configure for OSS

    • name: The custom name of the appender.

    • type: The type of the output channel. The value is fixed to OSS. Do not change it.

    • pattern: The output format of the logs. The default value is %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.

    • baseUri: The OSS bucket name. Enter only the bucket name.

    • endpoint: The endpoint of the OSS service in the region. For more information, see Regions and endpoints. Use the value in the Endpoint (Region) column of the VPC Access From ECS (Internal Network) row.

    • accessKeyId and secretAccessKey: The AccessKey ID and AccessKey secret of the account that is used to write logs to OSS. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.

      To avoid security risks caused by plaintext AccessKey pairs, this example uses variables to specify the AccessKey pair. For more information, see Project variables.

      Note: These parameters are required only when the destination OSS bucket is owned by a different Alibaba Cloud account. If the bucket is in the same account, delete these parameters.

    • flushIntervalSeconds: The interval at which buffered logs are written to the storage. Unit: seconds.

    • flushIntervalEventCount: The number of log entries to buffer before they are written to the storage.

      Note: If you also configure flushIntervalSeconds, a write is triggered as soon as either threshold is reached.

    • rollingBytes: The maximum size of a single log file in OSS. After the maximum size is reached, subsequent data is written to a new log file. For example, the value 10485760 in the XML template corresponds to 10 MB.
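
    With these settings, log files land in the bucket under logs/<namespace>/<deploymentId>/<jobId>/, per the path comment in the OSS XML template shown later in this topic.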

    Configure for Kafka

    Note: Kafka clusters with Kerberos authentication enabled are not supported.

    • Prerequisites

      The KafkaAppender log plugin provided by Realtime Compute for Apache Flink is loaded by the Flink plugin class loader. Before you use the plugin, you must explicitly specify the package path of the KafkaAppender log plugin so that the Flink application can load it. To configure the plugin for a single job (effective only for the current job), perform the following steps:

      On the Job O&M page, click the name of the target job. On the Deployment Details tab, in the Running Parameter Settings section, add the following code to Other Configurations.

      plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • Configuration parameters

      • name: The custom name of the appender.

      • type: The type of the output channel. The value is fixed to KafkaVVP. Do not change it.

      • pattern: The output format of the logs. The default value is %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.

      • bootstrap.servers: The address of the Kafka broker to which logs are written.

      • acks: The number of partition replicas that must receive the message before the producer considers the write successful. For more information, see acks.

      • buffer.memory: The size of the producer buffer. Unit: bytes.

      • retries: The number of retries after a send failure.

      • compression.type: The compression type that the producer uses for the data it sends. Valid values include none, gzip, snappy, lz4, and zstd.

  5. Click Save.

  6. Click Start at the top of the page.

Use XML

  1. Go to the log output configuration page for a single job.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Find the target workspace and click Console in the Actions column.

    3. In the navigation pane on the left, click Operation Center > Job O&M, and then click the name of the target job.

    4. On the Deployment Details tab, click Edit in the upper-right corner of the Logging section.

    5. For Log Template, select Custom Template.

  2. Configure the log output information.

    Copy the configuration for the target storage and paste it into the input box. Then, update the parameter values to match your storage. To output logs of different levels to different storage systems, configure level filtering rules for the appenders as described in Configure separate outputs for different log levels.

    Configure for OSS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender> 
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="20 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="4"/> 
        </Appender>  
        <Appender name="OSS" type="OSS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/${jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
          <Property name="endpoint">https://YOUR-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
          <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
          <Property name="flushIntervalSeconds">10</Property>  
          <Property name="flushIntervalEventCount">100</Property>  
          <Property name="rollingBytes">10485760</Property>  
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="OSS"/> 
        </Root>
      </Loggers> 
    </Configuration>

    • YOUR-BUCKET-NAME: Replace with your OSS bucket name.

    • YOUR-ENDPOINT: Replace with the endpoint of your OSS service. For more information, see Regions and endpoints. Use the value in the Endpoint (Region) column of the VPC Access From ECS (Internal Network) row.

    • YOUR-OSS-ACCESSKEYID and YOUR-OSS-ACCESSKEYSECRET: Replace with the AccessKey ID and AccessKey secret of the account that is used to write logs to OSS. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.

      To avoid security risks caused by plaintext AccessKey pairs, this example uses variables to specify the AccessKey pair. For more information, see Project variables.

      Note: These parameters are required only when the destination OSS bucket is owned by a different Alibaba Cloud account. If the bucket is in the same account, delete these parameters.

    • flushIntervalSeconds: The interval at which buffered logs are written to the storage. Unit: seconds.

    • flushIntervalEventCount: The number of log entries to buffer before they are written to the storage.

      Note: If you also configure flushIntervalSeconds, a write is triggered as soon as either threshold is reached.

    • rollingBytes: The maximum size of a single log file in OSS. After the maximum size is reached, subsequent data is written to a new log file. For example, the value 10485760 in the template corresponds to 10 MB.

    Configure for SLS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender>  
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="5 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="1"/> 
        </Appender>  
        <Appender name="SLS" type="SLS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
    
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/${jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="project">YOUR-SLS-PROJECT</Property>  
          <Property name="logStore">YOUR-SLS-LOGSTORE</Property> 
          <Property name="endpoint">YOUR-SLS-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property> 
          <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> 
          <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
          <Property name="deploymentName">{{ deploymentName }}</Property>
          <Property name="flushIntervalSeconds">10</Property>
          <Property name="flushIntervalEventCount">100</Property>
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="SLS"/> 
        </Root>
      </Loggers> 
    </Configuration>

    Note: The namespace, deploymentId, jobId, and deploymentName in the code are Twig variables. Do not modify them. Otherwise, the job fails to start.

    • YOUR-SLS-PROJECT: Replace with your SLS project name.

    • YOUR-SLS-LOGSTORE: Replace with your SLS Logstore name.

    • YOUR-SLS-ENDPOINT: Replace with the private endpoint of your SLS project in the region. For more information, see Endpoints.

    • YOUR-SLS-ACCESSKEYID and YOUR-SLS-ACCESSKEYSECRET: Replace with the AccessKey ID and AccessKey secret of the account that is used to write logs to SLS. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.

      To avoid security risks caused by plaintext AccessKey pairs, this example uses variables to specify the AccessKey pair. For more information, see Project variables.

      Note: If the SLS project and the Flink service belong to different accounts, you must grant the Flink account the permissions to write data to SLS. For more information, see Create a custom policy. The following policies are examples:

      • No restrictions on the SLS scope

        {
            "Version": "1",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "log:Get*",
                        "log:PostLogStoreLogs"
                    ],
                    "Resource": "*"
                }
            ]
        }
      • Specify the SLS resource scope

        {
            "Version": "1",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "log:PostLogStoreLogs",
                        "log:GetLogStore"
                    ],
                    "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
                }
            ]
        }

    • flushIntervalSeconds: The interval at which buffered logs are written to the storage. Unit: seconds.

    • flushIntervalEventCount: The number of log entries to buffer before they are written to the storage.

      Note: If you also configure flushIntervalSeconds, a write is triggered as soon as either threshold is reached.

    Configure for Kafka

    Note

    Kafka clusters with Kerberos authentication enabled are not supported.

    • Prerequisites

      The KafkaAppender log plugin provided by Realtime Compute for Apache Flink is loaded by the Flink plugin class loader. Before you use the plugin, you must explicitly specify the package path of the KafkaAppender log plugin so that the Flink application can load it. You can use one of the following methods:

      • Configure a job template (effective for all jobs in the project)

        On the Configuration Management page of the Flink development console, add the following code to the Other Configurations section.

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
      • Configure a single job (effective only for the current job)

        On the Job O&M page, click the name of the target job. On the Deployment Details tab, in the Running Parameter Settings section, add the following code to Other Configurations.

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • Log configuration

      <?xml version="1.0" encoding="UTF-8"?>
      <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
      strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
        <Appenders> 
          <Appender name="StdOut" type="Console"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> 
          </Appender>  
          <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>  
            <Policies> 
              <SizeBasedTriggeringPolicy size="20 MB"/> 
            </Policies>  
            <DefaultRolloverStrategy max="4"/> 
          </Appender>  
          <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME">
              <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n"/>
              <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property>
               <Property name="acks">YOUR-ACKS-VALUE</Property>
               <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property>
                <Property name="retries">YOUR-RETRIES-NUMBER</Property>
               <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property>
          </Appender>
          <Appender type="Async" name="AsyncAppender">
              <AppenderRef ref="KafkaVVPAppender"/>
          </Appender>
        </Appenders>
        <Loggers> 
          <Logger level="INFO" name="org.apache.hadoop"/>  
          <Logger level="INFO" name="org.apache.kafka"/>  
          <Logger level="INFO" name="org.apache.zookeeper"/>  
          <Logger level="INFO" name="akka"/>  
          <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
          <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
          {%- for name, level in userConfiguredLoggers -%} 
            <Logger level="{{ level }}" name="{{ name }}"/> 
          {%- endfor -%}
          <Root level="{{ rootLoggerLogLevel }}"> 
            <AppenderRef ref="StdOut"/>
            <AppenderRef ref="RollingFile"/>  
            <AppenderRef ref="AsyncAppender"/> 
          </Root>
        </Loggers>
      </Configuration>
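
      The KafkaVVPAppender is referenced through an Async appender rather than directly from the Root logger; log4j2's Async appender hands events to a separate thread, so a slow or temporarily unreachable Kafka broker does not block the job's own threads.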

      • YOUR-TOPIC-NAME: The name of the Kafka topic to which logs are written.

      • YOUR-KAFKA-BOOTSTRAP-SERVERS: The address of the Kafka broker to which logs are written.

      • YOUR-ACKS-VALUE: The number of partition replicas that must receive the message before the producer considers the write successful. For more information, see acks.

      • YOUR-BUFFER-MEMORY-SIZE: The size of the producer buffer. Unit: bytes.

      • YOUR-RETRIES-NUMBER: The number of retries after a send failure.

      • YOUR-COMPRESSION-TYPE: The compression type that the producer uses for the data it sends. Valid values include none, gzip, snappy, lz4, and zstd.

      Note: You can also set any other configuration parameter supported by the Apache Kafka client, as in the sketch below. For more information, see Apache Kafka.
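
      For example, a minimal sketch of passing one more producer option through the appender. linger.ms is a standard Apache Kafka producer setting; the value 100 here is purely illustrative:

      <!-- Illustrative: any Kafka producer option can be passed as a Property element -->
      <Property name="linger.ms">100</Property>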

  3. Click Save.

  4. Click Start at the top of the page.

Configure log output channels for all jobs in a project

You can configure a template to set the default log output to OSS, SLS, or Kafka for all jobs in a project.

Important
  • After you apply this configuration, the logs of all jobs subsequently created in the project are stored in OSS, SLS, or Kafka.

  • The KafkaAppender log plugin provided by Realtime Compute for Apache Flink is loaded by the Flink plugin class loader. Before you use the plugin, you must explicitly specify the package path of the KafkaAppender log plugin so that the Flink application can load it. To do so, add the following code to the Other Configurations section on the Configuration Management page of the Realtime Compute for Apache Flink development console.

    plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
  1. Go to the job log template configuration page.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Find the target workspace and click Console in the Actions column. Then, select the target project at the top of the development console page.

    3. In the navigation pane on the left, click Operation Center > Configuration Management.

    4. On the Deployment Defaults tab, select the job type.

    5. In the Logging section, for Log Template, select Custom Template.

  2. Configure the log output channels for all jobs in the project.

    For more information about the code, see Configure log output for a single job (Use XML).

  3. Click Save Changes.

Configure separate outputs for different log levels

You can use the ThresholdFilter of log4j2 to configure different log level filtering rules for different appenders. The benefits of this configuration are:

  • Flexibility: You can set different log levels for different external storage systems as needed.

  • Efficiency: It reduces unnecessary log processing and transmission and improves system performance.

  • Clarity: Separate configurations make the log flow clearer and level management more convenient.

Perform the following steps to configure the settings:

  1. In the Logging section, for Log Template, select Custom Template.

  2. Configure the log output information.

    This topic uses an example in which logs of the INFO level and higher are output to the Flink development console, and only logs of the ERROR level and higher are output to SLS. The following code provides a configuration example.

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" status="WARN">
      <Appenders>
        <!-- Console appender: the ThresholdFilter passes logs of the INFO level and higher -->
        <Appender name="StdOut" type="Console">
          <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
        </Appender>
        
        <!-- RollingFile appender (no filter; receives INFO-level and higher events because the Root logger level is INFO) -->
        <Appender name="RollingFile" type="RollingFile">
          <!-- Configuration remains unchanged -->
          <!-- ... -->
        </Appender>
        
        <!-- SLS Appender configured to output only ERROR level and above logs -->
        <Appender name="SLS" type="SLS">
          <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
          <!-- SLS specific properties -->
          <Property name="namespace">YOUR_NAMESPACE</Property>
          <Property name="project">YOUR_SLS_PROJECT</Property>
          <Property name="logStore">YOUR_SLS_LOGSTORE</Property>
          <Property name="endpoint">YOUR_SLS_ENDPOINT</Property>
          <!-- Access credentials and other properties -->
          <!-- ... -->
        </Appender>
    
        <!-- Other Appenders definitions remain unchanged -->
        <!-- ... -->
      </Appenders>
      
      <Loggers>
        <!-- Directly configure loggers for StdOut and SLS with specific levels -->
        <Logger name="StdOutLogger" level="INFO" additivity="false">
          <AppenderRef ref="StdOut"/>
        </Logger>
        
        <Logger name="SLSLogger" level="ERROR" additivity="false">
          <AppenderRef ref="SLS"/>
        </Logger>
    
        <!-- Other Loggers definitions with their specific configurations -->
        <!-- ... -->
    
        <!-- Root logger: references StdOut and RollingFile but not SLS, to avoid duplicate logging -->
        <Root level="INFO">
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>
          <!-- Exclude SLS from Root to prevent duplicate logging in case of other loggers -->
        </Root>
      </Loggers>
    </Configuration>

    In this configuration:

    • Console Appender: A ThresholdFilter ensures that logs of the INFO level and higher are output to the Flink development console.

    • SLS Appender: A ThresholdFilter ensures that only logs of the ERROR level and higher are sent to SLS. For more information about the properties of the SLS Appender, see Configure for SLS. Replace YOUR_NAMESPACE, YOUR_SLS_PROJECT, and other placeholders with your actual SLS project information.

      Note

      If the SLS Appender is a custom appender and its type is not SLS, make sure that the correct type is used and the appender class has the logic required to connect to SLS.

    • StdOutLogger and SLSLogger: They send logs only to the StdOut Appender and SLS Appender, respectively, and each has different log level restrictions.

    • Root Logger: The StdOut Appender and RollingFile Appender are configured, but the SLS Appender is not included. This avoids sending duplicate logs to SLS when a specific logger is already configured.

    For more information about operations and log4j configuration parameters, see Apache Log4j.
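
    If you want an appender to receive exactly one level rather than a threshold, log4j2 also provides LevelRangeFilter. A minimal sketch, assuming you want only WARN events routed to an appender (in log4j2, minLevel is the more severe bound of the range):

    <!-- Accepts only WARN events; everything else is denied -->
    <LevelRangeFilter minLevel="WARN" maxLevel="WARN" onMatch="ACCEPT" onMismatch="DENY"/>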
