In addition to viewing job logs directly on the Flink console’s Job Exploration page, you can export job logs to external storage such as Object Storage Service (OSS), Simple Log Service (SLS), or Kafka. You can also configure the log level for each output destination. This topic describes how to configure job log output. After you complete the configuration, you can view the job logs in the specified storage.
Precautions
After you configure log output to OSS, SLS, or Kafka, you must restart the job.
If you configure logs to be sent to external storage and do not disable the log archiving feature, the OSS or fully managed storage configured when you purchased the workspace continues to save logs. If you disable the feature, you can no longer view job logs on the Flink console page.

You can use project variables in the ${secret_values.xxxx} format in the log configuration. For more information, see Project variables.
Configure log output for a single job
Use the UI
Go to the log output configuration page for a single job.
Log on to the Realtime Compute for Apache Flink console.
Find the target workspace and click Console in the Actions column.
In the navigation pane on the left, click , and then click the target job name.
On the Deployment Details tab, click Edit in the upper-right corner of the Logging section.
For Log Template, select Custom Template.
Click .
Click Add Appender and select the target storage system.
Configure the log output information for the target storage.
To send logs of different levels to different storage systems, configure level filtering rules for the appenders as described in Configure separate outputs for different log levels.
Configure for SLS

Parameter
Description
name
The custom name of the appender.
type
The type of the output channel. The value is fixed to SLS. You do not need to change this value.
pattern
The output format of the logs.
The default value is %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n, which generates log content such as 2024-10-01 14:23:45,678{GMT+8} INFO com.example.MyClass - This is a test log message.
flushIntervalSeconds
The interval at which log data is written to the storage. Unit: seconds.
flushIntervalEventCount
The number of log entries to accumulate before they are written to the storage in one batch.
Note: When this parameter is configured together with flushIntervalSeconds, a write is triggered as soon as either threshold is reached.
authenticationMode
AccessKey
SLS Token
Note: If you select this mode, logs can be delivered only to a Logstore that is in the same region as the Flink workspace. This mode is supported only in Ververica Runtime (VVR) 11.5 and later.
project
The project name.
logStore
The Logstore name.
endpoint
The private endpoint of your SLS project in the region. For more information, see Endpoints.
accessKeyId
accessKeySecret
The AccessKey ID and AccessKey secret of the service account for SLS. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.
To avoid security risks caused by plaintext AccessKey pairs, use variables to specify the AccessKey pair. For more information, see Project variables.
Note: If the SLS project and the Flink service are in different accounts, you must grant the Flink account the permissions to write data to SLS. For more information, see Create a custom policy. For the policy content, see the SLS parameter descriptions in the Use XML section.
Configure for OSS

Parameter
Description
name
The custom name of the appender.
type
The type of the output channel. The value is fixed to OSS. You do not need to change this value.
pattern
The output format of the logs.
The default value is %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.
baseUri
Enter only the OSS Bucket name.
endpoint
The endpoint of the OSS service in the region. For more information, see Regions and endpoints.
Use the Endpoint (Region Node) value in the ECS VPC Network Access (Private Network) row.
accessKeyId
secretAccessKey
The AccessKey ID and AccessKey secret of the service account for OSS. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.
To avoid security risks caused by plaintext AccessKey pairs, use variables to specify the AccessKey pair. For more information, see Project variables.
Note: These parameters are required only when you configure log output to an OSS bucket that is owned by a different Alibaba Cloud account. If the bucket is in the same account, delete these parameters.
flushIntervalSeconds
The interval at which log data is written to the storage. Unit: seconds.
flushIntervalEventCount
The number of log entries to accumulate before they are written to the storage in one batch.
Note: When this parameter is configured together with flushIntervalSeconds, a write is triggered as soon as either threshold is reached.
rollingBytes
The size of a single log file in OSS. After the maximum size is reached, subsequent data is written to a new log file.
Configure for Kafka
Note: Kafka clusters with Kerberos authentication enabled are not supported.
Prerequisites
The KafkaAppender log plugin provided by Realtime Compute for Apache Flink is loaded by the Flink plugin class loader. Before you use the plugin, you must explicitly specify the package path of the KafkaAppender log plugin so that the Flink application can load it. To configure the plugin for a single job—which applies only to the current job—perform the following steps:
On the Job O&M page, click the name of the target job. On the Deployment Details tab, in the Running Parameter Settings section, add the following code to Other Configurations.
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
Configuration interface

Parameter
Description
name
The custom name of the appender.
type
The type of the output channel. The value is fixed to KafkaVVP. You do not need to change this value.
pattern
The output format of the logs.
The default value is %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.
bootstrap.servers
The address of the Kafka broker to which logs are written.
acks
The number of partition replicas that must receive the message before the producer considers the write successful. For more information, see acks.
buffer.memory
The size of the producer buffer. Unit: bytes.
retries
The number of retries after a send failure.
compression.type
The compression type that the producer can use when it generates data. Valid values include none, gzip, snappy, lz4, and zstd.
Click Save.
Click Start at the top of the page.
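The interaction between flushIntervalSeconds and flushIntervalEventCount described in the parameter tables above can be illustrated with a small sketch. This is not the appender's actual implementation; the class BufferedLogWriter, its sink callback, and the injectable clock are hypothetical names used only to show the "whichever threshold is reached first" rule.

```python
import time
from typing import Callable, List


class BufferedLogWriter:
    """Hypothetical buffer that flushes when either threshold is reached."""

    def __init__(
        self,
        flush_interval_seconds: float,
        flush_interval_event_count: int,
        sink: Callable[[List[str]], None],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.flush_interval_seconds = flush_interval_seconds
        self.flush_interval_event_count = flush_interval_event_count
        self.sink = sink      # receives one batch of log lines per write
        self.clock = clock    # injectable so the time rule can be tested
        self.buffer: List[str] = []
        self.last_flush = clock()

    def append(self, line: str) -> None:
        """Buffer one log line, then check both thresholds."""
        self.buffer.append(line)
        self.maybe_flush()

    def maybe_flush(self) -> bool:
        """Write the buffer if either threshold is reached; return True on a write."""
        if not self.buffer:
            return False
        count_reached = len(self.buffer) >= self.flush_interval_event_count
        time_reached = self.clock() - self.last_flush >= self.flush_interval_seconds
        if count_reached or time_reached:
            self.sink(list(self.buffer))
            self.buffer.clear()
            self.last_flush = self.clock()
            return True
        return False
```

With flushIntervalSeconds set to 10 and flushIntervalEventCount set to 100, as in the XML templates in this topic, a busy job would write every 100 entries, while a quiet job would still write at most 10 seconds after the previous write.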
Use XML
Go to the log output configuration page for a single job.
Log on to the Realtime Compute for Apache Flink console.
Find the target workspace and click Console in the Actions column.
In the navigation pane on the left, click , and then click the name of the target job.
On the Configuration tab, in the Logging section, click Edit in the upper-right corner.
Set Logging Profile to Custom Template.
Configure the log output information.
Based on the target storage, copy the corresponding configuration and paste it into the input box. Then update the parameter values to match your storage configuration. To send logs of different levels to different storage systems, configure level filtering rules for the appenders as described in Configure separate outputs for different log levels.
Configure for OSS
<?xml version="1.0" encoding="UTF-8"?>
<Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" packages="com.ververica.platform.logging.appender" status="WARN">
  <Appenders>
    <Appender name="StdOut" type="Console">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
    </Appender>
    <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="20 MB"/>
      </Policies>
      <DefaultRolloverStrategy max="4"/>
    </Appender>
    <Appender name="OSS" type="OSS">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
      <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
      <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
      <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
      <Property name="endpoint">https://YOUR-ENDPOINT</Property>
      <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
      <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
      <Property name="flushIntervalSeconds">10</Property>
      <Property name="flushIntervalEventCount">100</Property>
      <Property name="rollingBytes">10485760</Property>
    </Appender>
    <Appender name="StdOutErrConsoleAppender" type="Console">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
    </Appender>
    <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="1 GB"/>
      </Policies>
      <DefaultRolloverStrategy max="2"/>
    </Appender>
    <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="1 GB"/>
      </Policies>
      <DefaultRolloverStrategy max="2"/>
    </Appender>
  </Appenders>
  <Loggers>
    <Logger level="INFO" name="org.apache.hadoop"/>
    <Logger level="INFO" name="org.apache.kafka"/>
    <Logger level="INFO" name="org.apache.zookeeper"/>
    <Logger level="INFO" name="akka"/>
    <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>
    <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/>
    <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
    <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
      <AppenderRef ref="StdOutFileAppender"/>
      <AppenderRef ref="StdOutErrConsoleAppender"/>
    </Logger>
    <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
      <AppenderRef ref="StdErrFileAppender"/>
      <AppenderRef ref="StdOutErrConsoleAppender"/>
    </Logger>
    {%- for name, level in userConfiguredLoggers -%}
    <Logger level="{{ level }}" name="{{ name }}"/>
    {%- endfor -%}
    <Root level="{{ rootLoggerLogLevel }}">
      <AppenderRef ref="StdOut"/>
      <AppenderRef ref="RollingFile"/>
      <AppenderRef ref="OSS"/>
    </Root>
  </Loggers>
</Configuration>
Parameter
Description
YOUR-BUCKET-NAME
Replace with your OSS Bucket name.
YOUR-ENDPOINT
Replace with the endpoint of your OSS. For more information, see Regions and endpoints.
The endpoint is the value in the Endpoint (Region) column of the VPC Access From ECS (internal Network) row.
YOUR-OSS-ACCESSKEYID
YOUR-OSS-ACCESSKEYSECRET
Replace with the AccessKey ID and AccessKey secret of the service account for OSS. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.
To avoid security risks caused by plaintext AccessKey pairs, this example uses variables to specify the AccessKey pair. For more information, see Project variables.
Note: These parameters are required only when you configure log output to an OSS bucket that is owned by a different Alibaba Cloud account. If the bucket is in the same account, delete these parameters.
flushIntervalSeconds
The interval at which log data is written to the storage. Unit: seconds.
flushIntervalEventCount
The number of log entries to accumulate before they are written to the storage in one batch.
Note: When this parameter is configured together with flushIntervalSeconds, a write is triggered as soon as either threshold is reached.
rollingBytes
The size of a single log file in OSS. After the maximum size is reached, subsequent data is written to a new log file.
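To locate the exported files in the bucket, it can help to compute the directory that the comment in the OSS template describes (${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/). The following is a minimal sketch under that assumption only; the function name oss_log_prefix and the sample identifiers are hypothetical, and the handling of the trailing slash on baseUri is a guess rather than documented behavior.

```python
def oss_log_prefix(base_uri: str, namespace: str, deployment_id: str, job_id: str) -> str:
    """Build the log directory described in the OSS template comment."""
    # Assumption: a trailing slash on baseUri is not duplicated in the final path.
    base = base_uri.rstrip("/")
    return f"{base}/logs/{namespace}/{deployment_id}/{job_id}/"


# Placeholder values; replace them with the identifiers of your own job.
print(oss_log_prefix("oss://YOUR-BUCKET-NAME/", "my-namespace", "my-deployment-id", "my-job-id"))
```

Files under this directory roll over once they reach the size set in rollingBytes (10485760 bytes, or 10 MiB, in the template above).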
Configure for SLS
<?xml version="1.0" encoding="UTF-8"?>
<Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" packages="com.ververica.platform.logging.appender" status="WARN">
  <Appenders>
    <Appender name="StdOut" type="Console">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
    </Appender>
    <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="5 MB"/>
      </Policies>
      <DefaultRolloverStrategy max="1"/>
    </Appender>
    <Appender name="SLS" type="SLS">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
      <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
      <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
      <Property name="project">YOUR-SLS-PROJECT</Property>
      <Property name="logStore">YOUR-SLS-LOGSTORE</Property>
      <Property name="endpoint">YOUR-SLS-ENDPOINT</Property>
      <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
      <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property>
      <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
      <Property name="deploymentName">{{ deploymentName }}</Property>
      <Property name="flushIntervalSeconds">10</Property>
      <Property name="flushIntervalEventCount">100</Property>
    </Appender>
    <Appender name="StdOutErrConsoleAppender" type="Console">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
    </Appender>
    <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="1 GB"/>
      </Policies>
      <DefaultRolloverStrategy max="2"/>
    </Appender>
    <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="1 GB"/>
      </Policies>
      <DefaultRolloverStrategy max="2"/>
    </Appender>
  </Appenders>
  <Loggers>
    <Logger level="INFO" name="org.apache.hadoop"/>
    <Logger level="INFO" name="org.apache.kafka"/>
    <Logger level="INFO" name="org.apache.zookeeper"/>
    <Logger level="INFO" name="akka"/>
    <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>
    <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/>
    <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
    <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
      <AppenderRef ref="StdOutFileAppender"/>
      <AppenderRef ref="StdOutErrConsoleAppender"/>
    </Logger>
    <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
      <AppenderRef ref="StdErrFileAppender"/>
      <AppenderRef ref="StdOutErrConsoleAppender"/>
    </Logger>
    {%- for name, level in userConfiguredLoggers -%}
    <Logger level="{{ level }}" name="{{ name }}"/>
    {%- endfor -%}
    <Root level="{{ rootLoggerLogLevel }}">
      <AppenderRef ref="StdOut"/>
      <AppenderRef ref="RollingFile"/>
      <AppenderRef ref="SLS"/>
    </Root>
  </Loggers>
</Configuration>
Note: The namespace, deploymentId, jobId, and deploymentName in the code are Twig variables. Do not modify them; otherwise, the job fails to start.
Parameter
Description
YOUR-SLS-PROJECT
Replace with your SLS project name.
YOUR-SLS-LOGSTORE
Replace with your SLS Logstore name.
YOUR-SLS-ENDPOINT
Replace with the private endpoint of your SLS project in the region. For more information, see Endpoints.
YOUR-SLS-ACCESSKEYID
YOUR-SLS-ACCESSKEYSECRET
Replace with the AccessKey ID and AccessKey secret of the service account for SLS. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.
To avoid security risks caused by plaintext AccessKey pairs, this example uses variables to specify the AccessKey pair. For more information, see Project variables.
Note: If the SLS project and the Flink service are in different accounts, you must grant the Flink account the permissions to write data to SLS. For more information, see Create a custom policy. The following code shows the policy content:
No restrictions on the SLS scope
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:Get*",
        "log:PostLogStoreLogs"
      ],
      "Resource": "*"
    }
  ]
}
Specify the SLS resource scope. The following code provides an example.
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:PostLogStoreLogs",
        "log:GetLogStore"
      ],
      "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
    }
  ]
}
flushIntervalSeconds
The interval at which log data is written to the storage. Unit: seconds.
flushIntervalEventCount
The number of log entries to accumulate before they are written to the storage in one batch.
Note: When this parameter is configured together with flushIntervalSeconds, a write is triggered as soon as either threshold is reached.
Configure for Kafka
Note: Kafka clusters with Kerberos authentication enabled are not supported.
Prerequisites
The KafkaAppender log plugin provided by Realtime Compute for Apache Flink is loaded by the Flink plugin class loader. Before you use the plugin, you must explicitly specify the package path of the KafkaAppender log plugin so that the Flink application can load it. You can use one of the following methods:
Configure a job template (effective for all jobs in the project)
On the Configuration Management page of the Flink development console, in the Other Configurations section, add the following code.
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
Configure a single job (effective only for the current job)
On the Job O&M page, click the target job name. In the Additional Settings section of the Runtime Parameter Settings area on the Deployment Details tab, add the following code.
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
Log configuration
<?xml version="1.0" encoding="UTF-8"?>
<Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" packages="com.ververica.platform.logging.appender" status="WARN">
  <Appenders>
    <Appender name="StdOut" type="Console">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>
    </Appender>
    <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="20 MB"/>
      </Policies>
      <DefaultRolloverStrategy max="4"/>
    </Appender>
    <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME">
      <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n"/>
      <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property>
      <Property name="acks">YOUR-ACKS-VALUE</Property>
      <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property>
      <Property name="retries">YOUR-RETRIES-NUMBER</Property>
      <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property>
    </Appender>
    <Appender type="Async" name="AsyncAppender">
      <AppenderRef ref="KafkaVVPAppender"/>
    </Appender>
  </Appenders>
  <Loggers>
    <Logger level="INFO" name="org.apache.hadoop"/>
    <Logger level="INFO" name="org.apache.kafka"/>
    <Logger level="INFO" name="org.apache.zookeeper"/>
    <Logger level="INFO" name="akka"/>
    <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>
    <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/>
    {%- for name, level in userConfiguredLoggers -%}
    <Logger level="{{ level }}" name="{{ name }}"/>
    {%- endfor -%}
    <Root level="{{ rootLoggerLogLevel }}">
      <AppenderRef ref="StdOut"/>
      <AppenderRef ref="RollingFile"/>
      <AppenderRef ref="AsyncAppender"/>
    </Root>
  </Loggers>
</Configuration>
Parameter
Description
YOUR-TOPIC-NAME
The name of the Kafka topic to which logs are written.
YOUR-KAFKA-BOOTSTRAP-SERVERS
The address of the Kafka broker to which logs are written.
YOUR-ACKS-VALUE
The number of partition replicas that must receive the message before the producer considers the write successful. For more information, see acks.
YOUR-BUFFER-MEMORY-SIZE
The size of the producer buffer. Unit: bytes.
YOUR-RETRIES-NUMBER
The number of retries after a send failure.
YOUR-COMPRESSION-TYPE
The compression type that the producer can use when it generates data. Valid values include none, gzip, snappy, lz4, and zstd.
Note: You can also set all configuration parameters supported by the Apache Kafka client. For more information, see Apache Kafka.
Click Save.
Click Start at the top of the page.
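When you adapt one of the XML templates above, an easy mistake is to rename or remove an appender while a logger still references it. The following sketch, which is not part of the product, uses only the Python standard library to list every AppenderRef whose ref has no matching Appender name. The function name undefined_appender_refs and the shortened SAMPLE template are hypothetical; the Twig tags are left in place because they sit in text and attribute positions and do not prevent XML parsing.

```python
import xml.etree.ElementTree as ET
from typing import List

# Namespace declared by the templates in this topic.
LOG4J_NS = "{http://logging.apache.org/log4j/2.0/config}"


def undefined_appender_refs(config_xml: str) -> List[str]:
    """Return the sorted AppenderRef targets that no Appender defines."""
    # Parse from bytes so an XML declaration with an encoding is accepted.
    root = ET.fromstring(config_xml.encode("utf-8"))
    defined = {a.get("name") for a in root.iter(f"{LOG4J_NS}Appender")}
    referenced = [r.get("ref") for r in root.iter(f"{LOG4J_NS}AppenderRef")]
    return sorted({ref for ref in referenced if ref not in defined})


# Shortened, hypothetical template: the Async appender was removed by mistake.
SAMPLE = """<Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true">
  <Appenders>
    <Appender name="StdOut" type="Console"/>
    <Appender name="KafkaVVPAppender" type="KafkaVVP" topic="YOUR-TOPIC-NAME"/>
  </Appenders>
  <Loggers>
    {%- for name, level in userConfiguredLoggers -%}
    <Logger level="{{ level }}" name="{{ name }}"/>
    {%- endfor -%}
    <Root level="{{ rootLoggerLogLevel }}">
      <AppenderRef ref="StdOut"/>
      <AppenderRef ref="AsyncAppender"/>
    </Root>
  </Loggers>
</Configuration>"""

print(undefined_appender_refs(SAMPLE))
```

An empty list means that every reference resolves. The check covers only appender names; it does not validate property values or connectivity to the storage.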
Configure log output channels for all jobs in a project
You can configure a template to set the default log output to OSS, SLS, or Kafka for all jobs in a project.
After you apply this configuration, the logs of all jobs subsequently created in the project are stored in OSS, SLS, or Kafka.
The KafkaAppender logging plugin provided by Realtime Compute for Apache Flink is loaded by the Flink plugin class loader. Before you use the plugin, you must explicitly specify the package path of the KafkaAppender logging plugin so that the Flink application can load it. To do so, add the following code to Other Configuration on the Configuration Management page of the Realtime Compute for Apache Flink development console.
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
Go to the job log template configuration page.
Log on to the Realtime Compute for Apache Flink console.
Find the target workspace and click Console in the Actions column. Then select the target project at the top of the development console page.
In the navigation pane on the left, click .
On the Deployment Defaults tab, select the job type.
In the Logging section, set Log Template to Custom Template.
Configure the log output channels for all jobs in the project.
For more information about the code, see Configure log output for a single job (Use XML).
Click Save Changes.
Configure separate outputs for different log levels
You can use the ThresholdFilter of log4j2 to configure different log level filtering rules for different appenders. The benefits of this configuration are:
Flexibility: You can set different log levels for different external storage systems as needed.
Efficiency: It reduces unnecessary log processing and transmission and improves system performance.
Clarity: Separate configurations make the log flow clearer and level management more convenient.
Perform the following steps to configure the settings:
In the Logging section, set Logging Profile to Custom Template.
Configure the log output information.
This topic uses an example in which logs of the INFO level and higher are output to the Flink development console, and only logs of the ERROR level and higher are output to SLS. The following code provides a configuration example.
<?xml version="1.0" encoding="UTF-8"?>
<Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" status="WARN">
  <Appenders>
    <!-- Console Appender configured to output logs of the INFO level and above -->
    <Appender name="StdOut" type="Console">
      <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
      <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
    </Appender>
    <!-- RollingFile Appender (no filter; receives INFO and above because the Root level is INFO) -->
    <Appender name="RollingFile" type="RollingFile">
      <!-- Configuration remains unchanged -->
      <!-- ... -->
    </Appender>
    <!-- SLS Appender configured to output only ERROR level and above logs -->
    <Appender name="SLS" type="SLS">
      <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
      <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
      <!-- SLS specific properties -->
      <Property name="namespace">YOUR_NAMESPACE</Property>
      <Property name="project">YOUR_SLS_PROJECT</Property>
      <Property name="logStore">YOUR_SLS_LOGSTORE</Property>
      <Property name="endpoint">YOUR_SLS_ENDPOINT</Property>
      <!-- Access credentials and other properties -->
      <!-- ... -->
    </Appender>
    <!-- Other Appenders definitions remain unchanged -->
    <!-- ... -->
  </Appenders>
  <Loggers>
    <!-- Directly configure loggers for StdOut and SLS with specific levels -->
    <Logger name="StdOutLogger" level="INFO" additivity="false">
      <AppenderRef ref="StdOut"/>
    </Logger>
    <Logger name="SLSLogger" level="ERROR" additivity="false">
      <AppenderRef ref="SLS"/>
    </Logger>
    <!-- Other Loggers definitions with their specific configurations -->
    <!-- ... -->
    <!-- Root Logger without an AppenderRef for SLS, to avoid duplicate logging -->
    <Root level="INFO">
      <AppenderRef ref="StdOut"/>
      <AppenderRef ref="RollingFile"/>
      <!-- Exclude SLS from Root to prevent duplicate logging in case of other loggers -->
    </Root>
  </Loggers>
</Configuration>
In this configuration:
Console Appender: A ThresholdFilter ensures that logs of the INFO level and higher are output to the Flink development console.
SLS Appender: A ThresholdFilter ensures that only logs of the ERROR level and higher are sent to SLS. For more information about the properties of the SLS Appender, see Configure for SLS. Replace YOUR_NAMESPACE, YOUR_SLS_PROJECT, and other placeholders with your actual SLS project information.
Note: If the SLS Appender is a custom appender and its type is not SLS, make sure that the correct type is used and the appender class has the logic required to connect to SLS.
StdOutLogger and SLSLogger: They send logs only to the StdOut Appender and SLS Appender, respectively, and each has different log level restrictions.
Root Logger: The StdOut Appender and RollingFile Appender are configured, but the SLS Appender is not included. This avoids sending duplicate logs to SLS when a specific logger is already configured.
For more information about operations and log4j configuration parameters, see Apache Log4j.
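The accept-or-deny decision that a ThresholdFilter makes with onMatch="ACCEPT" and onMismatch="DENY" can be modeled in a few lines. The sketch below is only an illustration of that decision for the two thresholds used in the example above. It does not model logger routing or additivity, and the LEVELS ranking, threshold_accepts, and destinations are hypothetical names rather than log4j2 APIs.

```python
from typing import Dict, List

# Standard log4j2 levels, ranked here from least to most severe (illustrative ranking).
LEVELS: Dict[str, int] = {"TRACE": 0, "DEBUG": 1, "INFO": 2, "WARN": 3, "ERROR": 4, "FATAL": 5}

# Thresholds taken from the example configuration above.
APPENDER_THRESHOLDS: Dict[str, str] = {"StdOut": "INFO", "SLS": "ERROR"}


def threshold_accepts(threshold: str, event_level: str) -> bool:
    """ACCEPT when the event is at or above the threshold, DENY otherwise."""
    return LEVELS[event_level] >= LEVELS[threshold]


def destinations(event_level: str) -> List[str]:
    """Appenders whose filter would accept an event of the given level."""
    return [
        name
        for name, threshold in APPENDER_THRESHOLDS.items()
        if threshold_accepts(threshold, event_level)
    ]


for level in ("DEBUG", "INFO", "WARN", "ERROR"):
    print(level, destinations(level))
```

In this model a WARN event passes the console filter but is denied by the SLS filter, which matches the intent of sending only ERROR and higher to SLS.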
References
To view job logs, see View startup and operational logs.
If a job fails to start or an exception occurs during runtime, you can view the exception logs. For more information, see View exception logs.
If logs of the INFO level cannot meet your troubleshooting requirements, you can change the log level to DEBUG. For more information, see Dynamically change the log level of a running job.
You can use ActionTrail to view Flink audit events. For more information, see View Flink audit events.