すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ログエクスポートの設定

最終更新日:Nov 06, 2025

特定のレベルのジョブログを Object Storage Service (OSS)、Simple Log Service (SLS)、Kafka などの外部ストレージにエクスポートできます。このトピックでは、ログエクスポート設定の構成方法について説明します。

使用上の注意

  • ログエクスポート設定を構成した後、ジョブを再起動します。

  • ログエクスポート設定を構成した後、[ログアーカイブを許可] がオンのままである場合、ジョブログは引き続きワークスペースに関連付けられた OSS バケットに保存されます。[ログアーカイブを許可] をオフにすると、コンソールでログを表示できなくなります。

    image

  • ログ構成で、名前空間変数${secret_values.xxxx} フォーマットで使用できます。

単一ジョブのログエクスポートの設定

コンソール経由

  1. ログ設定ページに移動します。

    1. Realtime Compute for Apache Flink コンソールにログインします。

    2. 対象のワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで、O&M > デプロイメント を選択し、対象のデプロイメントを選択します。

    4. デプロイメント詳細ページの [設定] タブの [ロギング] セクションで、右上隅にある [編集] をクリックします。

    5. [ロギングプロファイル][カスタムテンプレート] を選択します。

  2. [システムプロファイルで適用] > [default] をクリックします。

  3. [Appender の追加] をクリックし、対象のストレージシステムを選択します。

  4. ログ出力設定を構成します。

    異なるレベルのログを異なるストレージシステムにエクスポートするには、appender のログフィルター規則を設定します

    SLS へのログのエクスポート

    image

    項目

    説明

    name

    カスタム Appender 名を入力します。

    type

    出力チャンネルタイプ。SLS に固定されています。

    pattern

    ログのエクスポート形式。

    デフォルト値は %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n で、2024-10-01 14:23:45,678{GMT+8} INFO com.example.MyClass - This is a test log message のようなログコンテンツを生成します。

    project

    SLS プロジェクト名を入力します。

    logStore

    Logstore 名。

    endpoint

    SLS プロジェクトのプライベートエンドポイントを入力します。詳細については、「エンドポイント」をご参照ください。

    accessKeyId

    SLS プロジェクトへのアクセスに使用する AccessKey ID とシークレット。詳細については、「AccessKey ペアの取得」をご参照ください。

    重要

    セキュリティを強化するため、AccessKey ペアをプレーンテキストで入力するのではなく、名前空間変数 を使用してください。

    説明

    アカウントをまたいでジョブログを SLS プロジェクトにエクスポートするには、SLS プロジェクトを所有する Alibaba Cloud アカウントを使用して カスタムポリシーを作成し、そのカスタムポリシーを Realtime Compute for Apache Flink リソースを管理するアカウントが引き受ける RAM ロールに付与します。以下は、カスタムポリシーの JSON コードです。

    • すべての SLS リソースへのアクセス:

      {
      
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }
    • 特定の SLS リソースへのアクセス

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:ap-southeast-1:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }

    accessKeySecret

    flushIntervalSeconds

    ジョブログを SLS にエクスポートする時間間隔を入力します。単位: 秒。

    flushIntervalEventCount

    1 つのバッチで収集して SLS に送信するログエントリの数を入力します。

    説明

    [flushIntervalEventCount][flushIntervalSeconds] の両方が設定されている場合、いずれかの条件が満たされるとログが SLS に送信されます。

    OSS へのログのエクスポート

    image

    項目

    説明

    name

    カスタム Appender 名を入力します。

    type

    出力チャンネルタイプ。OSS に固定されています。

    pattern

    ログ出力形式。

    デフォルト値は %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n です。

    baseUri

    OSS バケット名を入力します。

    endpoint

    OSS のリージョン固有の内部エンドポイントを入力します。詳細については、「リージョンとエンドポイント」をご参照ください。

    OSS コンソールでエンドポイント情報を見つけることもできます:

    1. バケット名をクリックします。

    2. バケット詳細ページで、中央のナビゲーションウィンドウで [概要] を選択します。

    3. [ポート] セクションで、[VPC (内部ネットワーク) 経由の ECS からのアクセス] の行を見つけ、[エンドポイント] 列の対応する値をコピーします。

    accessKeyId

    アカウントをまたいで OSS バケットにアクセスするために使用する AccessKey ID とシークレットを入力します。詳細については、「AccessKey ペアの取得」をご参照ください。

    重要

    セキュリティを強化するため、AccessKey ペアをプレーンテキストで入力するのではなく、名前空間変数 を使用してください。

    説明

    同じアカウント内の OSS バケットにアクセスする場合は、これらの設定をスキップしてください。

    secretAccessKey

    flushIntervalSeconds

    ログが OSS に書き込まれる時間間隔を入力します。単位: 秒。

    flushIntervalEventCount

    1 つのバッチで収集して OSS に送信するログエントリの数を入力します。

    説明

    [flushIntervalEventCount][flushIntervalSeconds] の両方が設定されている場合、いずれかの条件が満たされるとログが OSS に送信されます。

    rollingBytes

    OSS 内の単一ログファイルのサイズを入力します。制限に達すると、データは新しいログファイルに書き込まれます。

    Kafka へのログのエクスポート

    説明

    Kerberos 認証が有効な Kafka クラスターへのログのエクスポートはサポートされていません。

    • 前提条件

      以下の手順を実行して、下のコードを追加します:

      1. ジョブ詳細ページで、[設定] タブを選択します。

      2. [パラメーター] セクションで、[編集] をクリックします。

      3. 次のコードをコピーして [その他の設定] フィールドに貼り付けます。

      plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender

      このコードスニペットは、Realtime Compute for Apache Flink が提供するロギングプラグインである KafkaAppender パッケージのパスを明示的に指定します。これにより、Realtime Compute for Apache Flink がロギングプラグインをロードできるようになります。

    • ログエクスポートの設定

      image

      項目

      説明

      name

      カスタム Appender 名を入力します。

      type

      出力チャンネルタイプ。KafkaVVP に固定されています。

      pattern

      ログ出力形式。

      デフォルト値は %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n です。

      bootstrap.servers

      Kafka ブローカーのエンドポイントを入力します。

      acks

      Kafka プロデューサーがメッセージがトピックに書き込まれたと判断する前に、メッセージを受信したパーティションレプリカの数を指定します。詳細については、「acks」をご参照ください。

      buffer.memory

      プロデューサーのバッファーサイズを入力します。単位: バイト。

      retries

      メッセージの送信に失敗した後に許可される最大再試行回数を入力します。

      compression.type

      プロデューサーがデータを生成するために使用する圧縮タイプを入力します。有効な値: none, gzip, snappy, lz4, zstd。

  5. [保存] をクリックします。

  6. デプロイメントを開始します。

XML 経由

  1. 設定ページに移動します。

    1. Realtime Compute for Apache Flink コンソールにログインします。

    2. 対象のワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで、O&M > デプロイメント をクリックし、対象のデプロイメントの名前をクリックします。

    4. [設定] タブの [ロギング] セクションで、右上隅にある [編集] をクリックします。

    5. [ロギングプロファイル][カスタムテンプレート] に設定します。

    6. [XML の編集] をクリックします。

  2. ログエクスポート設定を構成します。

    ログストレージシステムに応じて、次のコードを XML エディターにコピーして貼り付け、プレースホルダーの値を実際の値に置き換えます。異なるレベルのログを異なるストレージシステムにエクスポートするには、異なるログフィルター規則を設定します

    OSS へのログのエクスポート

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender> 
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="20 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="4"/> 
        </Appender>  
        <Appender name="OSS" type="OSS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          
          <!-- 最終的に有効なログパスは ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ です -->
          <Property name="namespace">{{ namespace }}</Property> <!-- この行は変更しないでください -->
          <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
          <Property name="endpoint">https://YOUR-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
          <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
          <Property name="flushIntervalSeconds">10</Property>  
          <Property name="flushIntervalEventCount">100</Property>  
          <Property name="rollingBytes">10485760</Property>  
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="OSS"/> 
        </Root>
      </Loggers> 
    </Configuration>

    プレースホルダー

    説明

    YOUR-BUCKET-NAME

    OSS バケットの名前に置き換えます。

    YOUR-ENDPOINT

    OSS のリージョン固有の内部エンドポイントに置き換えます。詳細については、「リージョンとエンドポイント」をご参照ください。

    OSS コンソールでエンドポイント情報を見つけることもできます:

    1. バケット名をクリックします。

    2. バケット詳細ページで、中央のナビゲーションウィンドウで [概要] を選択します。

    3. [ポート] セクションで、[VPC (内部ネットワーク) 経由の ECS からのアクセス] の行を見つけ、[エンドポイント] 列の対応する値をコピーします。

    YOUR-OSS-ACCESSKEYID

    アカウントをまたいで OSS にアクセスするために使用する AccessKey ID とシークレットに置き換えます。AccessKey ペアの取得方法については、「AccessKey ペアの取得」をご参照ください。

    重要

    セキュリティを強化するため、AccessKey ペアをプレーンテキストで入力するのではなく、名前空間変数 を使用してください。

    説明

    同じアカウント内の OSS バケットにアクセスする場合は、これらの設定をスキップしてください。

    YOUR-OSS-ACCESSKEYSECRET

    flushIntervalSeconds

    ログが OSS に書き込まれる時間間隔。単位: 秒。

    flushIntervalEventCount

    1 つのバッチで収集して OSS に送信するログエントリの数を入力します。

    説明

    [flushIntervalEventCount][flushIntervalSeconds] の両方が設定されている場合、いずれかの条件が満たされるとログが OSS に送信されます。

    rollingBytes

    OSS 内の単一ログファイルのサイズを入力します。制限に達すると、データは新しいログファイルに書き込まれます。

    SLS へのログのエクスポート

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender>  
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="5 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="1"/> 
        </Appender>  
        <Appender name="SLS" type="SLS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
    
          <!-- 最終的に有効なログパスは ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ です -->
          <Property name="namespace">{{ namespace }}</Property> <!-- この行は変更しないでください -->
          <Property name="project">YOUR-SLS-PROJECT</Property>  
          <Property name="logStore">YOUR-SLS-LOGSTORE</Property> 
          <Property name="endpoint">YOUR-SLS-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property> 
          <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> 
          <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
          <Property name="deploymentName">{{ deploymentName }}</Property>
          <Property name="flushIntervalSeconds">10</Property>
          <Property name="flushIntervalEventCount">100</Property>
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="SLS"/> 
        </Root>
      </Loggers> 
    </Configuration>
    説明

    起動エラーを防ぐため、コード内の namespacedeploymentIdjobIddeploymentName などの Twig 変数を変更しないでください。

    プレースホルダーパラメーター

    説明

    YOUR-SLS-PROJECT

    SLS プロジェクト名に置き換えます。

    YOUR-SLS-LOGSTORE

    Simple Log Service の Logstore 名に置き換えます。

    YOUR-SLS-ENDPOINT

    SLS のリージョン固有のプライベートエンドポイントに置き換えます。詳細については、「エンドポイント」をご参照ください。

    YOUR-SLS-ACCESSKEYID

    SLS プロジェクトへのアクセスに使用する AccessKey ID とシークレット。詳細については、「AccessKey ペアの取得」をご参照ください。

    重要

    セキュリティを強化するため、AccessKey ペアをプレーンテキストで入力するのではなく、名前空間変数 を使用してください。

    説明

    アカウントをまたいでジョブログを SLS プロジェクトにエクスポートするには、SLS プロジェクトを所有する Alibaba Cloud アカウントを使用して カスタムポリシーを作成し、そのカスタムポリシーを Realtime Compute for Apache Flink リソースを管理するアカウントが引き受ける RAM ロールに付与します。以下は、カスタムポリシーの JSON コードです。

    • すべての SLS リソースへのアクセス:

      {
      
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }
    • 特定の SLS リソースへのアクセス

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:ap-southeast-1:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }

    YOUR-SLS-ACCESSKEYSECRET

    flushIntervalSeconds

    ジョブログを SLS にエクスポートする時間間隔を入力します。単位: 秒。

    flushIntervalEventCount

    1 つのバッチで収集して SLS に送信するログエントリの数を入力します。

    説明

    [flushIntervalEventCount][flushIntervalSeconds] の両方が設定されている場合、いずれかの条件が満たされるとログが SLS に送信されます。

    Kafka へのログのエクスポート

    説明

    Kerberos 認証が有効な Kafka クラスターはサポートされていません。

    • 前提条件

      Realtime Compute for Apache Flink が提供するロギングプラグインである KafkaAppender パッケージのパスを明示的に指定します。これにより、Realtime Compute for Apache Flink がロギングプラグインをロードできるようになります。次の手順を実行します:

      • 名前空間内のすべてのジョブに設定を適用します:

        1. O&M > 設定 に移動します。

        2. [その他の設定] セクションで、次のコードをコピーして貼り付け、変更を保存します。

      plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
      • 単一のジョブに設定を適用します:

        1. デプロイメント詳細ページで、[設定] タブを選択します。

        2. [パラメーター] セクションで、[編集] をクリックします。

        3. 次のコードを [その他の設定] フィールドに追加し、変更を保存します。

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • ログエクスポートの設定

      <?xml version="1.0" encoding="UTF-8"?>
      <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
      strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
        <Appenders> 
          <Appender name="StdOut" type="Console"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> 
          </Appender>  
          <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>  
            <Policies> 
              <SizeBasedTriggeringPolicy size="20 MB"/> 
            </Policies>  
            <DefaultRolloverStrategy max="4"/> 
          </Appender>  
          <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME">
              <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n"/>
              <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property>
               <Property name="acks">YOUR-ACKS-VALUE</Property>
               <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property>
                <Property name="retries">YOUR-RETRIES-NUMBER</Property>
               <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property>
          </Appender>
          <Appender type="Async" name="AsyncAppender">
              <AppenderRef ref="KafkaVVPAppender"/>
          </Appender>
        </Appenders>
        <Loggers> 
          <Logger level="INFO" name="org.apache.hadoop"/>  
          <Logger level="INFO" name="org.apache.kafka"/>  
          <Logger level="INFO" name="org.apache.zookeeper"/>  
          <Logger level="INFO" name="akka"/>  
          <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
          <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
          {%- for name, level in userConfiguredLoggers -%} 
            <Logger level="{{ level }}" name="{{ name }}"/> 
          {%- endfor -%}
          <Root level="{{ rootLoggerLogLevel }}"> 
            <AppenderRef ref="StdOut"/>
            <AppenderRef ref="RollingFile"/>  
            <AppenderRef ref="AsyncAppender"/> 
          </Root>
        </Loggers>
      </Configuration>

      プレースホルダー

      説明

      YOUR-TOPIC-NAME

      対象の Kafka トピックの名前に置き換えます。

      YOUR-KAFKA-BOOTSTRAP-SERVERS

      Kafka ブローカーのエンドポイントに置き換えます。

      YOUR-ACKS-VALUE

      プロデューサーがメッセージがトピックに書き込まれたと判断する前にメッセージを受信する必要があるパーティションレプリカの数に置き換えます。詳細については、「acks」をご参照ください。

      YOUR-BUFFER-MEMORY-SIZE

      プロデューサーバッファーのサイズに置き換えます。単位: バイト。

      YOUR-RETRIES-NUMBER

      メッセージの送信に失敗した後に許可される最大再試行回数に置き換えます。

      YOUR-COMPRESSION-TYPE

      プロデューサーがデータを生成するために使用できる圧縮タイプに置き換えます。有効な値: none, gzip, snappy, lz4, zstd

      説明

      Apache Kafka クライアントでサポートされているパラメーターを設定することもできます。詳細については、「Apache Kafka」をご参照ください。

  3. [保存] をクリックします。

  4. デプロイメントを開始します。

すべてのジョブのログエクスポートの設定

このセクションでは、名前空間内のすべてのジョブに適用されるログエクスポート設定の構成方法について説明します。

重要
  • この設定を構成すると、名前空間内のすべてのジョブのログが OSS、SLS、または Kafka に送信されます。

  • Flink アプリケーションが KafkaAppender ロギングプラグインをロードできるようにするには、そのパスを明示的に指定します:

    1. O&M > 設定 に移動します。

    2. [その他の設定] フィールドに、次のコードスニペットをコピーして貼り付けます。

    plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
  1. 名前空間設定ページに移動します。

    1. Realtime Compute for Apache Flink の管理コンソールにログインします。

    2. 対象のワークスペースの [アクション] 列にある [コンソール] をクリックします。

    3. 開発コンソール の上部ナビゲーションバーで、対象の名前空間を選択します。

    4. 左側のナビゲーションウィンドウで、O&M > 設定 をクリックします。

    5. [デプロイメントのデフォルト] タブで、[ストリームのデフォルト] または [バッチのデフォルト] を選択します。

    6. [ロギング] セクションで、[ロギングプロファイル][カスタムテンプレート] に設定します。

  2. ログエクスポート設定を構成します。

    詳細なコードについては、「単一ジョブのログエクスポートの設定」の「XML 経由」セクションをご参照ください。

  3. [変更を保存] をクリックします。

ログレベルに基づく異なるログフィルター規則の設定

Log4j2 の ThresholdFilter を使用して、ログレベルに基づいて appender の異なるログフィルター規則を設定できます。ログフィルター規則には、次の利点があります:

  • 柔軟性: 異なるレベルのログを異なるストレージシステムに送信します。

  • 効率: 不要なログの解析と転送を削減し、システムのパフォーマンスを向上させます。

  • 管理の容易さ: 階層的なログ管理を容易にします。

手順

  1. [ロギング] セクションで、[ロギングプロファイル][カスタムテンプレート] に設定します。

  2. ログエクスポート設定を構成します。

    次のコードスニペットは、INFO レベル以上のログを Realtime Compute for Apache Flink の開発コンソールに出力し、ERROR レベル以上のログを SLS にエクスポートします:

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" status="WARN">
      <Appenders>
        <!-- INFO レベルのログのみを出力するように設定されたコンソール Appender -->
        <Appender name="StdOut" type="Console">
          <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
        </Appender>
        
        <!-- RollingFile Appender (フィルターは表示されていませんが、ルートレベルが INFO であるため、すべてのレベルのログを記録します) -->
        <Appender name="RollingFile" type="RollingFile">
          <!-- 設定は変更されません -->
          <!-- ... -->
        </Appender>
        
        <!-- ERROR レベル以上のログのみを出力するように設定された SLS Appender -->
        <Appender name="SLS" type="SLS">
          <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
          <!-- SLS 固有のプロパティ -->
          <Property name="namespace">YOUR_NAMESPACE</Property>
          <Property name="project">YOUR_SLS_PROJECT</Property>
          <Property name="logStore">YOUR_SLS_LOGSTORE</Property>
          <Property name="endpoint">YOUR_SLS_ENDPOINT</Property>
          <!-- アクセス資格情報およびその他のプロパティ -->
          <!-- ... -->
        </Appender>
    
        <!-- その他の Appender 定義は変更されません -->
        <!-- ... -->
      </Appenders>
      
      <Loggers>
        <!-- StdOut および SLS のロガーを特定のレベルで直接設定します -->
        <Logger name="StdOutLogger" level="INFO" additivity="false">
          <AppenderRef ref="StdOut"/>
        </Logger>
        
        <Logger name="SLSLogger" level="ERROR" additivity="false">
          <AppenderRef ref="SLS"/>
        </Logger>
    
        <!-- その他のロガー定義とそれらの特定の設定 -->
        <!-- ... -->
    
        <!-- 重複ロギングを避けるため、SLS および StdOut の特定の AppenderRef を持たないルートロガー -->
        <Root level="INFO">
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>
          <!-- 他のロガーの場合に重複ロギングを防ぐために、ルートから SLS を除外します -->
        </Root>
      </Loggers>
    </Configuration>

    上記のサンプルコードのパラメーター:

    • コンソール Appender: ThresholdFilter を使用して、INFO レベル以上のログが開発コンソールに出力されるようにします。

    • SLS Appender: ThresholdFilter を使用して、ERROR レベル以上のログが SLS にエクスポートされるようにします。SLS Appender の特定のプロパティについては、「SLS へのログのエクスポート」をご参照ください。YOUR_NAMESPACEYOUR_SLS_PROJECT などのプレースホルダーを、SLS プロジェクトの実際の値に置き換えます。

      説明

      カスタム appender が SLS Appender として使用され、そのタイプが SLS でない場合は、カスタム appender のタイプが Realtime Compute for Apache Flink でサポートされており、カスタム appender が Simple Log Service に接続するために必要なロジックを持っていることを確認してください。

    • StdOutLogger と SLSLogger: 指定したログレベルに基づいて、それぞれ StdOut Appender と SLS Appender にログを送信します。

    • ルートロガー: ルートロガーには StdOut Appender と RollingFile Appender が設定されていますが、SLS Appender は含まれていません。これにより、ログに特定のロガーが設定されている場合に、SLS に繰り返しログが送信されるのを防ぎます。

    関連する操作と Log4j パラメーターの詳細については、「Apache Log4j」をご参照ください。

リファレンス