You can view the logs of a deployment on the Diagnostics tab in the console of fully managed Flink. You can also export the logs of a deployment to an external storage service, such as Object Storage Service (OSS), Simple Log Service, or Kafka, and then view the logs in the console of that service. This topic describes how to configure a deployment to export its logs to OSS, Simple Log Service, or Kafka.
Precautions
After you configure log export to OSS, Simple Log Service, or Kafka, you must restart the deployment for the configuration to take effect.
You can reference a key in the log configuration by using the ${secret_values.xxxx} format. This way, the variables that are configured in the key are resolved at runtime. For more information, see Manage keys.
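For example, assuming that you created a key named accessKeyId in the key management settings (a hypothetical key name), you can reference it in an appender property instead of writing the plaintext value:

```xml
<!-- ${secret_values.accessKeyId} is resolved from the key named accessKeyId -->
<Property name="accessKeyId">${secret_values.accessKeyId}</Property>
```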
Configure parameters to export the logs of a deployment
Go to the Configuration tab.
Log on to the Realtime Compute for Apache Flink console.
On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, click Deployments. On the Deployments page, click the name of the desired deployment.
In the upper-right corner of the Logging section on the Configuration tab, click Edit.
Set Logging Profile to Custom Template.
Configure parameters to export the logs of the deployment.
Copy the template for the storage service to which you want to export logs, paste it into the code editor, and change the values of the specified parameters.
Export logs to OSS
<?xml version="1.0" encoding="UTF-8"?>
<Configuration xmlns="http://logging.apache.org/log4j/2.0/config"
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">
  <Appenders>
    <Appender name="StdOut" type="Console">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
    </Appender>
    <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="20 MB"/>
      </Policies>
      <DefaultRolloverStrategy max="4"/>
    </Appender>
    <Appender name="OSS" type="OSS">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
      <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
      <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
      <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
      <Property name="endpoint">https://YOUR-ENDPOINT</Property>
      <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
      <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
      <Property name="flushIntervalSeconds">10</Property>
      <Property name="flushIntervalEventCount">100</Property>
      <Property name="rollingBytes">10485760</Property>
    </Appender>
    <Appender name="StdOutErrConsoleAppender" type="Console">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
    </Appender>
    <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="1 GB"/>
      </Policies>
      <DefaultRolloverStrategy max="2"/>
    </Appender>
    <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="1 GB"/>
      </Policies>
      <DefaultRolloverStrategy max="2"/>
    </Appender>
  </Appenders>
  <Loggers>
    <Logger level="INFO" name="org.apache.hadoop"/>
    <Logger level="INFO" name="org.apache.kafka"/>
    <Logger level="INFO" name="org.apache.zookeeper"/>
    <Logger level="INFO" name="akka"/>
    <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>
    <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/>
    <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
    <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
      <AppenderRef ref="StdOutFileAppender"/>
      <AppenderRef ref="StdOutErrConsoleAppender"/>
    </Logger>
    <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
      <AppenderRef ref="StdErrFileAppender"/>
      <AppenderRef ref="StdOutErrConsoleAppender"/>
    </Logger>
    {%- for name, level in userConfiguredLoggers -%}
      <Logger level="{{ level }}" name="{{ name }}"/>
    {%- endfor -%}
    <Root level="{{ rootLoggerLogLevel }}">
      <AppenderRef ref="StdOut"/>
      <AppenderRef ref="RollingFile"/>
      <AppenderRef ref="OSS"/>
    </Root>
  </Loggers>
</Configuration>
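Based on the path comment in the template, the effective OSS log path is composed from the baseUri, namespace, deploymentId, and jobId values. The following sketch (with hypothetical values) shows how the pieces are joined, which is useful when you look for the log files in your bucket:

```python
# Sketch: how the effective OSS log path is composed, following the comment
# in the template: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/
# All argument values below are hypothetical examples.
def oss_log_path(base_uri: str, namespace: str, deployment_id: str, job_id: str) -> str:
    """Join the appender properties into the object prefix used for log files."""
    return f"{base_uri.rstrip('/')}/logs/{namespace}/{deployment_id}/{job_id}/"

print(oss_log_path("oss://my-bucket/", "my-namespace", "dep-123", "job-456"))
# oss://my-bucket/logs/my-namespace/dep-123/job-456/
```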
Parameter
Description
YOUR-BUCKET-NAME
Replace the value of this parameter with the name of your OSS bucket.
YOUR-ENDPOINT
Replace the value of this parameter with the endpoint of your OSS. For more information, see Regions and endpoints.
Replace the value of this parameter with Endpoint information in the row where VPC Access from ECS (Internal Network) is located.
YOUR-OSS-ACCESSKEYID
YOUR-OSS-ACCESSKEYSECRET
Replace the values of these parameters with the AccessKey ID and AccessKey secret of the account that you use to access OSS. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.
To avoid the security risks that are caused by plaintext AccessKey pairs, we recommend that you reference the AccessKey pair by using the key management method. For more information, see Manage keys.
Note: If the OSS bucket belongs to an account that is different from the account of the fully managed Flink service, you must configure these parameters. If the bucket belongs to the same account, you can delete these parameters.
flushIntervalSeconds
The time interval at which logs are written to the storage. Unit: seconds.
flushIntervalEventCount
The log count threshold that triggers log synchronization. Each time the number of accumulated logs reaches the value of this parameter, the logs are written to the storage.
Note: If both this parameter and the flushIntervalSeconds parameter are configured, logs are written to the storage when either condition is met.
rollingBytes
The maximum size of a log file in OSS, in bytes. When a log file reaches this size, subsequent data is written to a new log file.
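The interaction between flushIntervalSeconds and flushIntervalEventCount described above can be sketched as follows. This is an illustrative model of the either-condition flush trigger, not the appender's actual implementation:

```python
import time

# Sketch (not the actual appender code): a flush is triggered when EITHER the
# time interval elapses OR the buffered event count reaches the threshold.
class FlushPolicy:
    def __init__(self, flush_interval_seconds=10, flush_interval_event_count=100):
        self.interval = flush_interval_seconds
        self.threshold = flush_interval_event_count
        self.buffered = 0
        self.last_flush = time.monotonic()

    def on_event(self, now=None):
        """Record one log event; return True if the buffer should be flushed."""
        now = time.monotonic() if now is None else now
        self.buffered += 1
        if self.buffered >= self.threshold or (now - self.last_flush) >= self.interval:
            self.buffered = 0
            self.last_flush = now
            return True
        return False

policy = FlushPolicy(flush_interval_seconds=10, flush_interval_event_count=3)
t0 = policy.last_flush
print(policy.on_event(now=t0 + 1), policy.on_event(now=t0 + 2), policy.on_event(now=t0 + 3))
# False False True  (the event-count threshold of 3 is reached first)
```

With the default values in the template (10 seconds, 100 events), a slow trickle of logs is flushed every 10 seconds, while a burst is flushed as soon as 100 events accumulate.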
Export logs to Simple Log Service
<?xml version="1.0" encoding="UTF-8"?>
<Configuration xmlns="http://logging.apache.org/log4j/2.0/config"
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">
  <Appenders>
    <Appender name="StdOut" type="Console">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
    </Appender>
    <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="5 MB"/>
      </Policies>
      <DefaultRolloverStrategy max="1"/>
    </Appender>
    <Appender name="SLS" type="SLS">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
      <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
      <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
      <Property name="project">YOUR-SLS-PROJECT</Property>
      <Property name="logStore">YOUR-SLS-LOGSTORE</Property>
      <Property name="endpoint">YOUR-SLS-ENDPOINT</Property>
      <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
      <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property>
      <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
      <Property name="deploymentName">{{ deploymentName }}</Property>
      <Property name="flushIntervalSeconds">10</Property>
      <Property name="flushIntervalEventCount">100</Property>
    </Appender>
    <Appender name="StdOutErrConsoleAppender" type="Console">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
    </Appender>
    <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="1 GB"/>
      </Policies>
      <DefaultRolloverStrategy max="2"/>
    </Appender>
    <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
      <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="1 GB"/>
      </Policies>
      <DefaultRolloverStrategy max="2"/>
    </Appender>
  </Appenders>
  <Loggers>
    <Logger level="INFO" name="org.apache.hadoop"/>
    <Logger level="INFO" name="org.apache.kafka"/>
    <Logger level="INFO" name="org.apache.zookeeper"/>
    <Logger level="INFO" name="akka"/>
    <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>
    <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/>
    <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
    <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
      <AppenderRef ref="StdOutFileAppender"/>
      <AppenderRef ref="StdOutErrConsoleAppender"/>
    </Logger>
    <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
      <AppenderRef ref="StdErrFileAppender"/>
      <AppenderRef ref="StdOutErrConsoleAppender"/>
    </Logger>
    {%- for name, level in userConfiguredLoggers -%}
      <Logger level="{{ level }}" name="{{ name }}"/>
    {%- endfor -%}
    <Root level="{{ rootLoggerLogLevel }}">
      <AppenderRef ref="StdOut"/>
      <AppenderRef ref="RollingFile"/>
      <AppenderRef ref="SLS"/>
    </Root>
  </Loggers>
</Configuration>
Note: The namespace, deploymentId, jobId, and deploymentName variables in the code are Twig template variables. Do not modify them. If you modify any of these variables, an error is reported when you start the deployment.
Parameter
Description
YOUR-SLS-PROJECT
Replace the value of this parameter with the project name of Simple Log Service.
YOUR-SLS-LOGSTORE
Replace the value of this parameter with the Logstore name of Simple Log Service.
YOUR-SLS-ENDPOINT
Replace the value of this parameter with the internal endpoint of the region where Simple Log Service resides. For more information, see Endpoints.
YOUR-SLS-ACCESSKEYID
YOUR-SLS-ACCESSKEYSECRET
Replace the values of these parameters with the AccessKey ID and AccessKey secret of the account that you use to access Simple Log Service. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.
To avoid the security risks that are caused by plaintext AccessKey pairs, we recommend that you reference the AccessKey pair by using the key management method. For more information, see Manage keys.
Note: If Simple Log Service belongs to an account that is different from the account of the fully managed Flink service, you must grant the account of the fully managed Flink service the permissions to write data to Simple Log Service. For more information, see Create a custom policy. The following policy documents are examples.
Access Simple Log Service without limiting the access scope:
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:Get*",
        "log:PostLogStoreLogs"
      ],
      "Resource": "*"
    }
  ]
}
Access Simple Log Service with a limited access scope:
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:PostLogStoreLogs",
        "log:GetLogStore"
      ],
      "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
    }
  ]
}
flushIntervalSeconds
The time interval at which logs are written to the storage. Unit: seconds.
flushIntervalEventCount
The log count threshold that triggers log synchronization. Each time the number of accumulated logs reaches the value of this parameter, the logs are written to the storage.
Note: If both this parameter and the flushIntervalSeconds parameter are configured, logs are written to the storage when either condition is met.
Export logs to Kafka
Note: Kafka clusters for which Kerberos authentication is enabled are not supported.
Prerequisites
The KafkaAppender logging plug-in provided by Realtime Compute for Apache Flink is loaded by the plug-in class loader of Realtime Compute for Apache Flink. Before you use the plug-in, you must add the package path of the plug-in to the deployment configuration so that Realtime Compute for Apache Flink can load it. To export logs to Kafka by using the KafkaAppender logging plug-in, perform one of the following operations:
Configure parameters to export the logs of all deployments in the current namespace to Kafka
On the Deployment Defaults tab of the Configurations page in the console of fully managed Flink, add the following code to the Other Configuration field:
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
Configure parameters to export the logs of the current deployment to Kafka
On the Deployments page, click the name of the desired deployment. On the Configuration tab of the Deployments page, click Edit in the upper-right corner of the Parameters section and add the following code to the Other Configuration field:
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
Export logs to Kafka
<?xml version="1.0" encoding="UTF-8"?>
<Configuration xmlns="http://logging.apache.org/log4j/2.0/config"
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">
  <Appenders>
    <Appender name="StdOut" type="Console">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>
    </Appender>
    <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>
      <Policies>
        <SizeBasedTriggeringPolicy size="20 MB"/>
      </Policies>
      <DefaultRolloverStrategy max="4"/>
    </Appender>
    <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME">
      <Layout type="PatternLayout" pattern="%date %message"/>
      <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property>
      <Property name="acks">YOUR-ACKS-VALUE</Property>
      <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property>
      <Property name="retries">YOUR-RETRIES-NUMBER</Property>
      <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property>
    </Appender>
    <Appender type="Async" name="AsyncAppender">
      <AppenderRef ref="KafkaVVPAppender"/>
    </Appender>
  </Appenders>
  <Loggers>
    <Logger level="INFO" name="org.apache.hadoop"/>
    <Logger level="INFO" name="org.apache.kafka"/>
    <Logger level="INFO" name="org.apache.zookeeper"/>
    <Logger level="INFO" name="akka"/>
    <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>
    <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/>
    {%- for name, level in userConfiguredLoggers -%}
      <Logger level="{{ level }}" name="{{ name }}"/>
    {%- endfor -%}
    <Root level="{{ rootLoggerLogLevel }}">
      <AppenderRef ref="StdOut"/>
      <AppenderRef ref="RollingFile"/>
      <AppenderRef ref="AsyncAppender"/>
    </Root>
  </Loggers>
</Configuration>
Parameter
Description
YOUR-TOPIC-NAME
Replace the value of this parameter with the name of the Kafka topic to which you want to write data.
YOUR-KAFKA-BOOTSTRAP-SERVERS
Replace the value of this parameter with the IP addresses or endpoints and port numbers of Kafka brokers to which you want to write data.
YOUR-ACKS-VALUE
Replace the value of this parameter with the number of partition replicas that must acknowledge a message before the producer considers the write successful. For more information, see acks.
YOUR-BUFFER-MEMORY-SIZE
Replace the value of this parameter with the size of the producer buffer. Unit: bytes.
YOUR-RETRIES-NUMBER
Replace the value of this parameter with the maximum number of retries allowed after a message fails to be sent.
YOUR-COMPRESSION-TYPE
Replace the value of this parameter with the compression type that can be used by the producer to generate data. The compression type can be none, gzip, snappy, lz4, or zstd.
Note: You can also configure other parameters that are supported by the Apache Kafka client. For more information, see Apache Kafka.
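Before you restart the deployment, it can help to sanity-check the producer property values that you substituted into the template. The following is a minimal sketch with hypothetical broker addresses; the allowed value sets follow the parameter table above and the Apache Kafka producer configuration:

```python
# Sketch: validate the Kafka appender producer properties before deployment.
# The sample broker addresses below are hypothetical.
VALID_COMPRESSION = {"none", "gzip", "snappy", "lz4", "zstd"}
VALID_ACKS = {"0", "1", "all", "-1"}

def validate_producer_props(props: dict) -> list:
    """Return a list of problems; an empty list means the values look sane."""
    problems = []
    if not props.get("bootstrap.servers"):
        problems.append("bootstrap.servers must list at least one broker host:port")
    if props.get("acks", "1") not in VALID_ACKS:
        problems.append(f"acks must be one of {sorted(VALID_ACKS)}")
    if props.get("compression.type", "none") not in VALID_COMPRESSION:
        problems.append(f"compression.type must be one of {sorted(VALID_COMPRESSION)}")
    if int(props.get("retries", 0)) < 0:
        problems.append("retries must be >= 0")
    return problems

sample = {
    "bootstrap.servers": "broker1:9092,broker2:9092",  # hypothetical brokers
    "acks": "all",
    "buffer.memory": "33554432",
    "retries": "3",
    "compression.type": "lz4",
}
print(validate_producer_props(sample))  # []
```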
Click Save.
In the upper-right corner of the Deployments page, click Start.
Configure parameters to export the logs of all deployments in a workspace
You can set Logging Profile to Custom Template on the Deployment Defaults tab to automatically export the logs of all deployments in a workspace to OSS, Simple Log Service, or Kafka. After you do so, the logs of all deployments that are created in the workspace are stored in the specified storage service.
Go to the Deployment Defaults tab.
Log on to the Realtime Compute for Apache Flink console.
On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, click Configurations.
On the Deployment Defaults tab, select a deployment type.
In the Logging section, set Logging Profile to Custom Template.
Configure parameters to export logs of all deployments in the namespace. For more information, see Configure parameters to export the logs of a deployment.
Click Save Changes.
References
If a startup error is reported after you configure log export to Simple Log Service, see What do I do if a deployment startup error is reported after I configure parameters to export the logs of the deployment to Simple Log Service?
You can view logs of a deployment. For more information, see View startup logs and operational logs of a deployment.
If an error occurs when a deployment starts or is running, you can view the exception logs of the deployment in the console of fully managed Flink. For more information, see View the exception logs of a deployment.
If you cannot locate an issue based on the logs at the INFO level, you can change the level of the logs to DEBUG. For more information, see Change the log level for a running deployment.
You can view audit events of Realtime Compute for Apache Flink by using ActionTrail. For more information, see View audit events of Realtime Compute for Apache Flink.