全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konfigurasikan output log pekerjaan

更新时间:Feb 07, 2026

Selain melihat log pekerjaan secara langsung di halaman Job Exploration pada Konsol Flink, Anda dapat mengekspor log pekerjaan ke penyimpanan eksternal seperti Object Storage Service (OSS), Simple Log Service (SLS), atau Kafka. Anda juga dapat mengonfigurasi tingkat log untuk setiap tujuan output. Topik ini menjelaskan cara mengonfigurasi output log pekerjaan. Setelah konfigurasi selesai, Anda dapat melihat log pekerjaan di penyimpanan yang ditentukan.

Perhatian

  • Setelah Anda mengonfigurasi output log ke OSS, SLS, atau Kafka, Anda harus melakukan restart pekerjaan.

  • Jika Anda mengonfigurasi pengiriman log ke penyimpanan eksternal dan tidak menonaktifkan fitur pengarsipan log, OSS atau penyimpanan fully managed yang dikonfigurasi saat Anda membeli ruang kerja akan terus menyimpan log. Jika Anda menonaktifkan fitur tersebut, Anda tidak lagi dapat melihat log pekerjaan di halaman Konsol Flink.

    image

  • Anda dapat menggunakan variabel proyek dalam format ${secret_values.xxxx} pada konfigurasi log. Untuk informasi selengkapnya, lihat Variabel proyek.

Konfigurasikan output log untuk satu pekerjaan

Gunakan UI

  1. Buka halaman konfigurasi output log untuk satu pekerjaan.

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Klik Actions di kolom Console ruang kerja target.

    3. Di panel navigasi sebelah kiri, klik Operation Center > Job O&M, lalu klik nama pekerjaan target.

    4. Pada tab Deployment Details, klik Edit di pojok kanan atas bagian Logging.

    5. Untuk Log Template, pilih Custom Template.

  2. Klik Copy And Edit From System Template > default.

  3. Klik Add Appender dan pilih sistem penyimpanan target.

  4. Konfigurasikan informasi output log untuk penyimpanan target.

    Untuk mengirimkan log dengan tingkat berbeda ke sistem penyimpanan yang berbeda, lihat Konfigurasikan output terpisah untuk tingkat log berbeda guna mengatur aturan pemfilteran tingkat log yang berbeda untuk appenders.

    Konfigurasi untuk SLS

    image

    Parameter

    Deskripsi

    name

    Nama kustom appender.

    type

    Jenis channel output. Nilainya tetap SLS. Anda tidak perlu mengubah nilai ini.

    pattern

    Format output log.

    Nilai default adalah %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n, yang menghasilkan konten log seperti 2024-10-01 14:23:45,678{GMT+8} INFO com.example.MyClass - This is a test log message.

    flushIntervalSeconds

    Interval sinkronisasi log ke penyimpanan. Ini adalah interval penulisan data log. Satuan: detik.

    flushIntervalEventCount

    Jumlah entri log yang dikumpulkan sebelum ditulis ke penyimpanan sekaligus.

    Catatan

    Jika dikonfigurasi bersama flushIntervalSeconds, kondisi pertama yang mencapai nilai yang ditetapkan akan memicu operasi penulisan.

    authenticationMode

    • AccessKey

    • SLS Token

      Catatan

      Jika Anda memilih mode ini, log hanya dapat dikirimkan ke Logstore yang berada di wilayah yang sama dengan ruang kerja Flink. Mode ini hanya didukung di Ververica Runtime (VVR) 11.5 dan versi lebih baru.

    project

    Nama proyek.

    logStore

    Nama Logstore.

    endpoint

    Titik akhir pribadi proyek SLS Anda di wilayah tersebut. Untuk informasi selengkapnya, lihat Endpoints.

    accessKeyId

    Ganti placeholder dengan ID AccessKey dan Rahasia AccessKey akun layanan untuk SLS. Untuk informasi tentang cara mendapatkan Pasangan Kunci Akses, lihat Dapatkan Pasangan Kunci Akses.

    Untuk menghindari risiko keamanan akibat Pasangan Kunci Akses dalam teks biasa, contoh ini menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Variabel proyek.

    Catatan

    Jika proyek SLS dan layanan Flink berada di akun yang berbeda, Anda harus memberikan izin kepada akun Flink untuk menulis data ke SLS. Untuk informasi selengkapnya, lihat Buat kebijakan kustom. Kode berikut menunjukkan isi kebijakan:

    Tidak ada batasan pada cakupan SLS

    {
    
        "Version": "1",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "log:Get*",
                    "log:PostLogStoreLogs"
                ],
                "Resource": "*"
            }
        ]
    }

    Tentukan cakupan resource SLS

    {
        "Version": "1",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "log:PostLogStoreLogs",
                    "log:GetLogStore"
                ],
                "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
            }
        ]
    }

    accessKeySecret

    Konfigurasi untuk OSS

    image

    Parameter

    Deskripsi

    name

    Nama kustom appender.

    type

    Jenis channel output. Nilainya tetap OSS. Anda tidak perlu mengubah nilai ini.

    pattern

    Format output log.

    Nilai default adalah %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.

    baseUri

    Masukkan hanya nama Bucket OSS.

    endpoint

    Titik akhir layanan OSS di wilayah tersebut. Untuk informasi selengkapnya, lihat Wilayah dan titik akhir.

    Endpoint adalah informasi Endpoint (Region Node) pada baris yang sesuai dengan ECS VPC Network Access (Private Network).

    accessKeyId

    ID AccessKey dan Rahasia AccessKey akun layanan untuk OSS. Untuk informasi tentang cara mendapatkan Pasangan Kunci Akses, lihat Dapatkan Pasangan Kunci Akses.

    Untuk menghindari risiko keamanan akibat Pasangan Kunci Akses dalam teks biasa, contoh ini menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Variabel proyek.

    Catatan

    Parameter ini hanya diperlukan jika Anda mengonfigurasi output log ke bucket OSS yang dimiliki oleh Akun Alibaba Cloud yang berbeda. Jika bucket berada di akun yang sama, Anda tidak perlu menentukan parameter ini. Hapus parameter ini.

    secretAccessKey

    flushIntervalSeconds

    Interval sinkronisasi log ke penyimpanan. Ini adalah interval penulisan data log. Satuan: detik.

    flushIntervalEventCount

    Jumlah entri log yang dikumpulkan sebelum ditulis ke penyimpanan sekaligus.

    Catatan

    Jika dikonfigurasi bersamaan dengan `flushIntervalSeconds`, operasi penulisan akan dilakukan segera setelah salah satu kondisi mencapai nilai yang ditetapkan.

    rollingBytes

    Ukuran file log tunggal di OSS. Setelah ukuran maksimum tercapai, data selanjutnya akan ditulis ke file log baru.

    Konfigurasi untuk Kafka

    Catatan

    Kluster Kafka yang mengaktifkan otentikasi Kerberos tidak didukung.

    • Prasyarat

      Plugin log KafkaAppender yang disediakan oleh Realtime Compute for Apache Flink dimuat oleh class loader plugin Flink. Sebelum menggunakan plugin ini, Anda harus secara eksplisit menentukan path paket plugin log KafkaAppender agar aplikasi Flink dapat memuatnya. Untuk mengonfigurasi plugin untuk satu pekerjaan—yang hanya berlaku untuk pekerjaan saat ini—lakukan langkah-langkah berikut:

      Pada halaman Job O&M, klik nama pekerjaan target. Pada tab Deployment Details, di bagian Running Parameter Settings, tambahkan kode berikut ke Other Configurations.

      plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • Antarmuka konfigurasi

      image

      Parameter

      Deskripsi

      name

      Nama kustom appender.

      type

      Jenis channel output. Nilainya tetap KafkaVVP. Anda tidak perlu mengubah nilai ini.

      pattern

      Format output log.

      Nilai default adalah %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.

      bootstrap.servers

      Alamat broker Kafka tempat log ditulis.

      acks

      Jumlah replika partisi yang harus menerima pesan sebelum produsen menganggap penulisan berhasil. Untuk informasi selengkapnya, lihat acks.

      buffer.memory

      Ukuran buffer produsen. Satuan: byte.

      retries

      Jumlah percobaan ulang setelah kegagalan pengiriman.

      compression.type

      Jenis kompresi yang dapat digunakan produsen saat menghasilkan data. Nilai yang valid meliputi none, gzip, snappy, lz4, dan zstd.

  5. Klik Save.

  6. Klik Start di bagian atas halaman.

Gunakan XML

  1. Buka halaman konfigurasi output log untuk satu pekerjaan.

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target dan klik Console di kolom Actions.

    3. Di panel navigasi sebelah kiri, klik Operation Center > Job O&M, lalu klik nama pekerjaan target.

    4. Pada tab Configuration, di bagian Logging, klik Edit di pojok kanan atas.

    5. Atur Logging Profile menjadi Custom Template.

  2. Konfigurasikan informasi output log.

    Berdasarkan penyimpanan target, salin konfigurasi yang sesuai dan tempelkan ke kotak input. Kemudian perbarui nilai parameter agar sesuai dengan konfigurasi penyimpanan Anda. Untuk mengirimkan log dengan tingkat berbeda ke sistem penyimpanan yang berbeda, lihat Konfigurasikan output terpisah untuk tingkat log berbeda guna mengatur aturan pemfilteran tingkat log yang berbeda untuk appenders.

    Konfigurasi untuk OSS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender> 
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="20 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="4"/> 
        </Appender>  
        <Appender name="OSS" type="OSS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
          <Property name="endpoint">https://YOUR-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
          <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
          <Property name="flushIntervalSeconds">10</Property>  
          <Property name="flushIntervalEventCount">100</Property>  
          <Property name="rollingBytes">10485760</Property>  
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="OSS"/> 
        </Root>
      </Loggers> 
    </Configuration>

    Parameter

    Deskripsi

    YOUR-BUCKET-NAME

    Ganti dengan nama Bucket OSS Anda.

    YOUR-ENDPOINT

    Ganti dengan endpoint OSS Anda. Untuk informasi selengkapnya, lihat Wilayah dan titik akhir.

    Endpoint adalah nilai pada kolom Endpoint (Region) pada baris VPC Access From ECS (internal Network).

    YOUR-OSS-ACCESSKEYID

    Ganti dengan ID AccessKey dan Rahasia AccessKey akun layanan untuk OSS. Untuk informasi tentang cara mendapatkan Pasangan Kunci Akses, lihat Dapatkan Pasangan Kunci Akses.

    Untuk menghindari risiko keamanan akibat Pasangan Kunci Akses dalam teks biasa, contoh ini menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Variabel proyek.

    Catatan

    Parameter ini hanya diperlukan jika Anda mengonfigurasi output log ke bucket OSS yang dimiliki oleh Akun Alibaba Cloud yang berbeda. Jika bucket berada di akun yang sama, Anda tidak perlu menentukan parameter ini. Hapus parameter ini.

    YOUR-OSS-ACCESSKEYSECRET

    flushIntervalSeconds

    Interval sinkronisasi log ke penyimpanan. Ini adalah interval penulisan data log. Satuan: detik.

    flushIntervalEventCount

    Jumlah entri log yang dikumpulkan sebelum ditulis ke penyimpanan sekaligus.

    Catatan

    Jika dikonfigurasi bersama flushIntervalSeconds, pengaturan mana pun yang mencapai nilai yang dikonfigurasi terlebih dahulu akan memicu penulisan.

    rollingBytes

    Ukuran file log tunggal di OSS. Setelah ukuran maksimum tercapai, data selanjutnya akan ditulis ke file log baru.

    Konfigurasi untuk SLS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender>  
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="5 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="1"/> 
        </Appender>  
        <Appender name="SLS" type="SLS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
    
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="project">YOUR-SLS-PROJECT</Property>  
          <Property name="logStore">YOUR-SLS-LOGSTORE</Property> 
          <Property name="endpoint">YOUR-SLS-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property> 
          <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> 
          <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
          <Property name="deploymentName">{{ deploymentName }}</Property>
          <Property name="flushIntervalSeconds">10</Property>
          <Property name="flushIntervalEventCount">100</Property>
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="SLS"/> 
        </Root>
      </Loggers> 
    </Configuration>
    Catatan

    Namespace, deploymentId, jobId, dan deploymentName dalam kode tersebut adalah variabel Twig. Jangan mengubahnya. Jika tidak, pekerjaan gagal dimulai.

    Parameter

    Deskripsi

    YOUR-SLS-PROJECT

    Ganti dengan nama proyek SLS Anda.

    YOUR-SLS-LOGSTORE

    Ganti dengan nama Logstore SLS Anda.

    YOUR-SLS-ENDPOINT

    Ganti dengan titik akhir pribadi proyek SLS Anda di wilayah tersebut. Untuk informasi selengkapnya, lihat Endpoints.

    YOUR-SLS-ACCESSKEYID

    Ganti dengan ID AccessKey dan Rahasia AccessKey akun layanan untuk SLS. Untuk informasi tentang cara mendapatkan Pasangan Kunci Akses, lihat Dapatkan Pasangan Kunci Akses.

    Untuk menghindari risiko keamanan akibat Pasangan Kunci Akses dalam teks biasa, contoh ini menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Variabel proyek.

    Catatan

    Jika proyek SLS dan layanan Flink berada di akun yang berbeda, Anda harus memberikan izin kepada akun Flink untuk menulis data ke SLS. Untuk informasi selengkapnya, lihat Buat kebijakan kustom. Kode berikut menunjukkan isi kebijakan:

    • Tidak ada batasan pada cakupan SLS

      {
      
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }
    • Tentukan cakupan resource SLS. Kode berikut memberikan contoh.

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }

    YOUR-SLS-ACCESSKEYSECRET

    flushIntervalSeconds

    Interval sinkronisasi log ke penyimpanan. Ini adalah interval penulisan data log. Satuan: detik.

    flushIntervalEventCount

    Jumlah entri log yang dikumpulkan sebelum ditulis ke penyimpanan sekaligus.

    Catatan

    Jika digunakan bersama flushIntervalSeconds, pengaturan yang mencapai nilai yang dikonfigurasi terlebih dahulu akan memicu penulisan data tunggal.

    Konfigurasi untuk Kafka

    Catatan

    Kluster Kafka yang mengaktifkan otentikasi Kerberos tidak didukung.

    • Prasyarat

      Plugin log KafkaAppender yang disediakan oleh Realtime Compute for Apache Flink dimuat oleh class loader plugin Flink. Sebelum menggunakan plugin ini, Anda harus secara eksplisit menentukan path paket plugin log KafkaAppender agar aplikasi Flink dapat memuatnya. Anda dapat menggunakan salah satu metode berikut:

      • Konfigurasikan templat pekerjaan (berlaku untuk semua pekerjaan dalam proyek)

        Pada halaman Configuration Management di Konsol pengembangan Flink, di bagian Other Configurations, tambahkan kode berikut.

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
      • Konfigurasikan satu pekerjaan (hanya berlaku untuk pekerjaan saat ini)

        Pada halaman Job O&M, klik nama pekerjaan target. Di bagian Additional Settings area Runtime Parameter Settings pada tab Deployment Details, tambahkan kode berikut.

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • Konfigurasi log

      <?xml version="1.0" encoding="UTF-8"?>
      <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
      strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
        <Appenders> 
          <Appender name="StdOut" type="Console"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> 
          </Appender>  
          <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>  
            <Policies> 
              <SizeBasedTriggeringPolicy size="20 MB"/> 
            </Policies>  
            <DefaultRolloverStrategy max="4"/> 
          </Appender>  
          <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME">
              <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n"/>
              <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property>
               <Property name="acks">YOUR-ACKS-VALUE</Property>
               <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property>
                <Property name="retries">YOUR-RETRIES-NUMBER</Property>
               <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property>
          </Appender>
          <Appender type="Async" name="AsyncAppender">
              <AppenderRef ref="KafkaVVPAppender"/>
          </Appender>
        </Appenders>
        <Loggers> 
          <Logger level="INFO" name="org.apache.hadoop"/>  
          <Logger level="INFO" name="org.apache.kafka"/>  
          <Logger level="INFO" name="org.apache.zookeeper"/>  
          <Logger level="INFO" name="akka"/>  
          <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
          <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
          {%- for name, level in userConfiguredLoggers -%} 
            <Logger level="{{ level }}" name="{{ name }}"/> 
          {%- endfor -%}
          <Root level="{{ rootLoggerLogLevel }}"> 
            <AppenderRef ref="StdOut"/>
            <AppenderRef ref="RollingFile"/>  
            <AppenderRef ref="AsyncAppender"/> 
          </Root>
        </Loggers>
      </Configuration>

      Parameter

      Deskripsi

      YOUR-TOPIC-NAME

      Nama topik Kafka tempat log ditulis.

      YOUR-KAFKA-BOOTSTRAP-SERVERS

      Alamat broker Kafka tempat log ditulis.

      YOUR-ACKS-VALUE

      Jumlah replika partisi yang harus menerima pesan sebelum produsen menganggap penulisan berhasil. Untuk informasi selengkapnya, lihat acks.

      YOUR-BUFFER-MEMORY-SIZE

      Ukuran buffer produsen. Satuan: byte.

      YOUR-RETRIES-NUMBER

      Jumlah percobaan ulang setelah kegagalan pengiriman.

      YOUR-COMPRESSION-TYPE

      Jenis kompresi yang dapat digunakan produsen saat menghasilkan data. Nilai yang valid meliputi none, gzip, snappy, lz4, dan zstd.

      Catatan

      Anda juga dapat mengatur semua parameter konfigurasi yang didukung oleh klien Apache Kafka. Untuk informasi selengkapnya, lihat Apache Kafka.

  3. Klik Save.

  4. Klik Start di bagian atas halaman.

Konfigurasikan channel output log untuk semua pekerjaan dalam proyek

Anda dapat mengonfigurasi templat untuk mengatur output log default ke OSS, SLS, atau Kafka untuk semua pekerjaan dalam proyek.

Penting
  • Setelah menerapkan konfigurasi ini, log semua pekerjaan yang dibuat selanjutnya dalam proyek akan disimpan di OSS, SLS, atau Kafka.

  • Plugin logging KafkaAppender yang disediakan oleh Realtime Compute for Apache Flink dimuat menggunakan class loader plugin Flink. Sebelum menggunakan plugin ini, Anda harus secara eksplisit menentukan path paket tempat plugin logging KafkaAppender berada agar aplikasi Flink Anda dapat memuat plugin tersebut dengan sukses. Oleh karena itu, Anda perlu menambahkan kode berikut di halaman Configuration Management pada Konsol pengembangan Realtime Compute for Apache Flink di bawah Other Configuration.

    plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
  1. Buka halaman konfigurasi templat log pekerjaan.

    1. Masuk ke Konsol manajemen Realtime Compute for Apache Flink.

    2. Klik Actions di kolom Console ruang kerja target, lalu pilih Project target dari bagian atas halaman Real-time Computing Development Console.

    3. Di panel navigasi sebelah kiri, klik Operation Center > Configuration Management.

    4. Pada tab Deployment Defaults, pilih jenis pekerjaan.

    5. Di bagian Logging, pilih Log Template sebagai Custom Template.

  2. Konfigurasikan channel output log untuk semua pekerjaan dalam proyek.

    Untuk informasi selengkapnya tentang kode, lihat Konfigurasikan output log untuk satu pekerjaan (Gunakan XML).

  3. Klik Save Changes.

Konfigurasikan output terpisah untuk tingkat log berbeda

Anda dapat menggunakan ThresholdFilter log4j2 untuk mengonfigurasi aturan pemfilteran tingkat log yang berbeda untuk appenders yang berbeda. Manfaat konfigurasi ini adalah:

  • Fleksibilitas: Anda dapat mengatur tingkat log yang berbeda untuk sistem penyimpanan eksternal yang berbeda sesuai kebutuhan.

  • Efisiensi: Mengurangi pemrosesan dan transmisi log yang tidak perlu serta meningkatkan kinerja sistem.

  • Kejelasan: Konfigurasi terpisah membuat alur log lebih jelas dan manajemen tingkat lebih mudah.

Lakukan langkah-langkah berikut untuk mengonfigurasi pengaturan:

  1. Di bagian Logging, atur Logging Profile menjadi Custom Template.

  2. Konfigurasikan informasi output log.

    Topik ini menggunakan contoh di mana log tingkat INFO dan lebih tinggi dioutput ke Konsol pengembangan Flink, dan hanya log tingkat ERROR dan lebih tinggi yang dioutput ke SLS. Kode berikut memberikan contoh konfigurasi.

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" status="WARN">
      <Appenders>
        <!-- Console Appender configured to output only INFO level logs -->
        <Appender name="StdOut" type="Console">
          <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
        </Appender>
        
        <!-- RollingFile Appender (no filter shown, logs all levels due to Root level being INFO) -->
        <Appender name="RollingFile" type="RollingFile">
          <!-- Configuration remains unchanged -->
          <!-- ... -->
        </Appender>
        
        <!-- SLS Appender configured to output only ERROR level and above logs -->
        <Appender name="SLS" type="SLS">
          <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
          <!-- SLS specific properties -->
          <Property name="namespace">YOUR_NAMESPACE</Property>
          <Property name="project">YOUR_SLS_PROJECT</Property>
          <Property name="logStore">YOUR_SLS_LOGSTORE</Property>
          <Property name="endpoint">YOUR_SLS_ENDPOINT</Property>
          <!-- Access credentials and other properties -->
          <!-- ... -->
        </Appender>
    
        <!-- Other Appenders definitions remain unchanged -->
        <!-- ... -->
      </Appenders>
      
      <Loggers>
        <!-- Directly configure loggers for StdOut and SLS with specific levels -->
        <Logger name="StdOutLogger" level="INFO" additivity="false">
          <AppenderRef ref="StdOut"/>
        </Logger>
        
        <Logger name="SLSLogger" level="ERROR" additivity="false">
          <AppenderRef ref="SLS"/>
        </Logger>
    
        <!-- Other Loggers definitions with their specific configurations -->
        <!-- ... -->
    
        <!-- Root Logger without specific AppenderRef for SLS and StdOut, to avoid duplicate logging -->
        <Root level="INFO">
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>
          <!-- Exclude SLS from Root to prevent duplicate logging in case of other loggers -->
        </Root>
      </Loggers>
    </Configuration>

    Dalam konfigurasi ini:

    • Console Appender: ThresholdFilter memastikan bahwa log tingkat INFO dan lebih tinggi dioutput ke Konsol pengembangan Flink.

    • SLS Appender: ThresholdFilter memastikan bahwa hanya log tingkat ERROR dan lebih tinggi yang dikirimkan ke SLS. Untuk informasi selengkapnya tentang properti SLS Appender, lihat Konfigurasi untuk SLS. Ganti YOUR_NAMESPACE, YOUR_SLS_PROJECT, dan placeholder lainnya dengan informasi proyek SLS aktual Anda.

      Catatan

      Jika SLS Appender adalah appender kustom dan jenisnya bukan SLS, pastikan jenis yang benar digunakan dan kelas appender memiliki logika yang diperlukan untuk terhubung ke SLS.

    • StdOutLogger dan SLSLogger: Masing-masing hanya mengirimkan log ke StdOut Appender dan SLS Appender, dengan pembatasan tingkat log yang berbeda.

    • Root Logger: StdOut Appender dan RollingFile Appender dikonfigurasi, tetapi SLS Appender tidak disertakan. Hal ini menghindari pengiriman log duplikat ke SLS ketika logger spesifik sudah dikonfigurasi.

    Untuk informasi selengkapnya tentang operasi dan parameter konfigurasi log4j, lihat Apache Log4j.

Referensi