Realtime Compute for Apache Flink: Configure parameters to export logs of a deployment

Last Updated: Apr 15, 2024

You can view the logs of a deployment on the Diagnostics tab in the Realtime Compute for Apache Flink console. You can also configure parameters to export the logs of a deployment to external storage, such as Object Storage Service (OSS), Simple Log Service, or Kafka. This way, you can view the logs in the console of the corresponding storage service. This topic describes how to configure parameters to export the logs of a deployment to OSS, Simple Log Service, or Kafka.

Precautions

  • If you do not turn off Allow Log Archives after you configure parameters to export the logs of a deployment to external storage, the same logs are stored both in the external storage and in the OSS bucket that you selected when you purchased the workspace. After you turn off Allow Log Archives for a deployment, you can no longer view the logs of the deployment in the development console of Realtime Compute for Apache Flink.

  • After you configure parameters to export the logs of a deployment to OSS, Simple Log Service, or Kafka, you must restart the deployment.

  • In the log configuration, you can specify a key in the ${secret_values.xxxx} format to reference a variable that you configured by using the key management feature. For more information, see Manage keys.
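
    For example, if you created a key named accessKeySecret by using the key management feature (a sample name for illustration; use the name of the key that you actually created), you can reference it in an appender property as follows:

    <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>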

Configure parameters to export the logs of a deployment

  1. Go to the Configuration tab.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Deployments. On the Deployments page, click the name of the desired deployment.

    4. In the upper-right corner of the Logging section on the Configuration tab, click Edit.

    5. Set Logging Profile to Custom Template.

  2. Configure parameters to export the logs of the deployment.

    Copy the configuration template for the storage service to which you want to export logs, paste it into the code editor, and then replace the values of the relevant parameters.

    Export logs to OSS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender> 
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="20 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="4"/> 
        </Appender>  
        <Appender name="OSS" type="OSS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/${jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
          <Property name="endpoint">https://YOUR-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
          <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
          <Property name="flushIntervalSeconds">10</Property>  
          <Property name="flushIntervalEventCount">100</Property>  
          <Property name="rollingBytes">10485760</Property>  
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="OSS"/> 
        </Root>
      </Loggers> 
    </Configuration>

    The following list describes the parameters.

    • YOUR-BUCKET-NAME: Replace the value with the name of your OSS bucket.

    • YOUR-ENDPOINT: Replace the value with the endpoint of OSS. Use the Endpoint value in the row in which VPC Access from ECS (Internal Network) is located. For more information, see Regions and endpoints.

    • YOUR-OSS-ACCESSKEYID and YOUR-OSS-ACCESSKEYSECRET: Replace the values with the AccessKey ID and AccessKey secret of the account that you use to access OSS. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.

      To avoid the security risks caused by plaintext AccessKey pairs, we recommend that you specify the AccessKey pair by using the key management method. For more information, see Manage keys.

      Note: You must configure these parameters only if OSS belongs to an account that is different from the account of Realtime Compute for Apache Flink. If OSS belongs to the same account, you can delete these parameters.

    • flushIntervalSeconds: The interval at which logs are written to the storage. Unit: seconds.

    • flushIntervalEventCount: The number of accumulated log entries that triggers a flush. Each time the number of accumulated log entries reaches this value, the logs are written to the storage.

      Note: If this parameter and the flushIntervalSeconds parameter are both configured, logs are written to the storage when either condition is met.

    • rollingBytes: The maximum size of a log file in OSS. When a log file reaches this size, subsequent data is written to a new log file.
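
    The following snippet shows what the OSS appender properties might look like after you replace the placeholders. The bucket name and endpoint are sample values for illustration only; use your own bucket, endpoint, and key names:

    <Appender name="OSS" type="OSS">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>
      <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
      <Property name="baseUri">oss://my-flink-log-bucket/</Property> <!-- sample bucket name -->
      <Property name="endpoint">https://oss-cn-hangzhou-internal.aliyuncs.com</Property> <!-- sample internal endpoint -->
      <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
      <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
      <Property name="flushIntervalSeconds">10</Property> <!-- flush at least every 10 seconds -->
      <Property name="flushIntervalEventCount">100</Property> <!-- or after every 100 log entries -->
      <Property name="rollingBytes">10485760</Property> <!-- roll to a new file every 10 * 1024 * 1024 bytes (10 MiB) -->
    </Appender>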

    Export logs to Simple Log Service

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender>  
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="5 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="1"/> 
        </Appender>  
        <Appender name="SLS" type="SLS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
    
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/${jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="project">YOUR-SLS-PROJECT</Property>  
          <Property name="logStore">YOUR-SLS-LOGSTORE</Property> 
          <Property name="endpoint">YOUR-SLS-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property> 
          <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> 
          <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
          <Property name="deploymentName">{{ deploymentName }}</Property>
          <Property name="flushIntervalSeconds">10</Property>
          <Property name="flushIntervalEventCount">100</Property>
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="SLS"/> 
        </Root>
      </Loggers> 
    </Configuration>
    Note

    The namespace, deploymentId, jobId, and deploymentName variables in the code are Twig template variables. Do not modify these variables. If you modify any of them, an error is reported when you start the deployment.
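
    At deployment startup, these variables are replaced automatically. For example, the topic property renders to a value of the form namespace:deploymentId:jobId, such as the following sample value (the components are illustrative placeholders):

    my-namespace:my-deployment-id:my-job-id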

    The following list describes the parameters.

    • YOUR-SLS-PROJECT: Replace the value with the name of your Simple Log Service project.

    • YOUR-SLS-LOGSTORE: Replace the value with the name of your Simple Log Service Logstore.

    • YOUR-SLS-ENDPOINT: Replace the value with the internal endpoint of the region in which Simple Log Service resides. For more information, see Endpoints.

    • YOUR-SLS-ACCESSKEYID and YOUR-SLS-ACCESSKEYSECRET: Replace the values with the AccessKey ID and AccessKey secret of the account that you use to access Simple Log Service. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.

      To avoid the security risks caused by plaintext AccessKey pairs, we recommend that you specify the AccessKey pair by using the key management method. For more information, see Manage keys.

      Note: If Simple Log Service belongs to an account that is different from the account of Realtime Compute for Apache Flink, you must grant the Realtime Compute for Apache Flink account the permissions to write data to Simple Log Service. For more information, see Create a custom policy. The following sample code shows the policy documents.

      • Access Simple Log Service without limiting the access scope:

        {
            "Version": "1",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "log:Get*",
                        "log:PostLogStoreLogs"
                    ],
                    "Resource": "*"
                }
            ]
        }
      • Access Simple Log Service with the access scope limited to a specific project and Logstore:

        {
            "Version": "1",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "log:PostLogStoreLogs",
                        "log:GetLogStore"
                    ],
                    "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
                }
            ]
        }

    • flushIntervalSeconds: The interval at which logs are written to the storage. Unit: seconds.

    • flushIntervalEventCount: The number of accumulated log entries that triggers a flush. Each time the number of accumulated log entries reaches this value, the logs are written to the storage.

      Note: If this parameter and the flushIntervalSeconds parameter are both configured, logs are written to the storage when either condition is met.
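
    The following snippet shows what the SLS appender properties might look like after you replace the placeholders. The project, Logstore, and endpoint are sample values for illustration only; use your own values:

    <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
    <Property name="project">my-flink-log-project</Property> <!-- sample project name -->
    <Property name="logStore">flink-deployment-logs</Property> <!-- sample Logstore name -->
    <Property name="endpoint">cn-hangzhou-intranet.log.aliyuncs.com</Property> <!-- sample internal endpoint -->
    <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
    <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property>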

    Export logs to Kafka

    Note

    Kafka clusters for which Kerberos authentication is enabled are not supported.

    • Prerequisites

      The KafkaAppender logging plug-in provided by Realtime Compute for Apache Flink is loaded by the plug-in class loader of Realtime Compute for Apache Flink. Before you use the KafkaAppender plug-in, you must add the package of the plug-in to the parent-first class loading patterns in the deployment configuration so that Realtime Compute for Apache Flink can load the plug-in. To export logs to Kafka by using the KafkaAppender plug-in, perform one of the following operations:

      • Configure parameters to export the logs of all deployments in the current namespace to Kafka

        On the Deployment Defaults tab of the Configurations page in the development console of Realtime Compute for Apache Flink, add the following code to the Other Configuration field:

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
      • Configure parameters to export the logs of the current deployment to Kafka

        On the Deployments page, click the name of the desired deployment. On the Configuration tab, click Edit in the upper-right corner of the Parameters section and add the following code to the Other Configuration field:

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • Export logs to Kafka

      <?xml version="1.0" encoding="UTF-8"?>
      <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
      strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
        <Appenders> 
          <Appender name="StdOut" type="Console"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> 
          </Appender>  
          <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>  
            <Policies> 
              <SizeBasedTriggeringPolicy size="20 MB"/> 
            </Policies>  
            <DefaultRolloverStrategy max="4"/> 
          </Appender>  
          <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME">
              <Layout type="PatternLayout" pattern="%date %message"/>
              <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property>
               <Property name="acks">YOUR-ACKS-VALUE</Property>
               <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property>
                <Property name="retries">YOUR-RETRIES-NUMBER</Property>
               <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property>
          </Appender>
          <Appender type="Async" name="AsyncAppender">
              <AppenderRef ref="KafkaVVPAppender"/>
          </Appender>
        </Appenders>
        <Loggers> 
          <Logger level="INFO" name="org.apache.hadoop"/>  
          <Logger level="INFO" name="org.apache.kafka"/>  
          <Logger level="INFO" name="org.apache.zookeeper"/>  
          <Logger level="INFO" name="akka"/>  
          <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
          <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
          {%- for name, level in userConfiguredLoggers -%} 
            <Logger level="{{ level }}" name="{{ name }}"/> 
          {%- endfor -%}
          <Root level="{{ rootLoggerLogLevel }}"> 
            <AppenderRef ref="StdOut"/>
            <AppenderRef ref="RollingFile"/>  
            <AppenderRef ref="AsyncAppender"/> 
          </Root>
        </Loggers>
      </Configuration>

      The following list describes the parameters.

      • YOUR-TOPIC-NAME: Replace the value with the name of the Kafka topic to which you want to write logs.

      • YOUR-KAFKA-BOOTSTRAP-SERVERS: Replace the value with the IP addresses or endpoints, and the port numbers, of the Kafka brokers to which you want to write logs.

      • YOUR-ACKS-VALUE: Replace the value with the number of partition replicas that must receive a message before the producer considers the message successfully written. For more information, see acks.

      • YOUR-BUFFER-MEMORY-SIZE: Replace the value with the size of the producer buffer. Unit: bytes.

      • YOUR-RETRIES-NUMBER: Replace the value with the maximum number of retries after a message fails to be sent.

      • YOUR-COMPRESSION-TYPE: Replace the value with the compression type that the producer uses to compress data. Valid values: none, gzip, snappy, lz4, and zstd.

      Note: You can also configure other parameters that are supported by the Apache Kafka client. For more information, see Apache Kafka.
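
      The following snippet shows what the KafkaVVP appender might look like after you replace the placeholders. The topic name and broker addresses are sample values, and the producer settings (acks=all, a 16 MB buffer, 3 retries, and lz4 compression) are illustrative choices rather than recommendations:

      <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="flink-deployment-logs">
        <Layout type="PatternLayout" pattern="%date %message"/>
        <Property name="bootstrap.servers">192.168.1.10:9092,192.168.1.11:9092</Property> <!-- sample broker addresses -->
        <Property name="acks">all</Property> <!-- wait for all in-sync replicas -->
        <Property name="buffer.memory">16777216</Property> <!-- 16 MB producer buffer -->
        <Property name="retries">3</Property>
        <Property name="compression.type">lz4</Property>
      </Appender>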

  3. Click Save.

  4. In the upper-right corner of the Deployments page, click Start.

Configure parameters to export the logs of all deployments in a workspace

You can set Logging Profile to Custom Template on the Deployment Defaults tab to automatically export the logs of all deployments in a workspace to OSS, Simple Log Service, or Kafka.

Important

After you set Logging Profile to Custom Template, the logs of all deployments that are created in the workspace are stored in OSS, Simple Log Service, or Kafka.

  1. Go to the Deployment Defaults tab.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Configurations.

    4. On the Deployment Defaults tab, select a deployment type.

    5. In the Logging section, set Logging Profile to Custom Template.

    6. Configure parameters to export logs of all deployments in the namespace. For more information, see Configure parameters to export the logs of a deployment.

  2. Click Save Changes.
