All Products
Search
Document Center

Realtime Compute for Apache Flink:Konfigurasikan output log pekerjaan

Last Updated:Mar 20, 2026

Selain melihat log pekerjaan secara langsung di halaman Job O&M Konsol Flink, Anda dapat mengonfigurasi pekerjaan agar mengirimkan log ke penyimpanan eksternal seperti OSS, SLS, atau Kafka. Setelah dikonfigurasi, Anda dapat melihat log tersebut dan menyesuaikan tingkat log output. Topik ini menjelaskan cara mengonfigurasi output log pekerjaan.

Catatan

  • Mulai ulang pekerjaan setelah mengonfigurasi output log ke OSS, SLS, atau Kafka.

  • Jika Anda mengonfigurasi log ke penyimpanan lain tetapi tidak menonaktifkan fitur pengarsipan log, penyimpanan OSS atau penyimpanan terkelola penuh yang dikonfigurasi saat membeli ruang kerja akan terus menyimpan log. Setelah menonaktifkan pengarsipan log, halaman Konsol Flink tidak dapat menampilkan log pekerjaan.

    image

  • Anda dapat menggunakan ${secret_values.xxxx} dalam konfigurasi log untuk mereferensikan variabel yang dikelola di Variable Management. Untuk informasi selengkapnya, lihat Project Variables.

Konfigurasikan channel output log untuk satu pekerjaan

Metode UI

  1. Buka titik masuk untuk mengonfigurasi output log untuk satu pekerjaan.

    1. Masuk ke Konsol Manajemen Real-time Computing.

    2. Klik Console di bawah kolom Operation ruang kerja target.

    3. Di panel navigasi sebelah kiri, klik Operation Center > Job O&M, lalu klik nama pekerjaan target.

    4. Pada tab Deployment Details, klik Edit di sebelah kanan area Log Configuration.

    5. Atur Log Template menjadi Custom Template.

  2. Klik Copy and Edit from System Template > default.

  3. Klik Add Output Channel, lalu pilih channel penyimpanan target.

  4. Konfigurasikan output log untuk penyimpanan tujuan.

    Untuk mengirimkan log dengan tingkat berbeda ke lokasi penyimpanan berbeda, lihat Konfigurasikan output terpisah untuk tingkat log berbeda untuk mengatur aturan filter tingkat log berbeda untuk appender.

    Konfigurasikan output ke SLS

    image

    Parameter

    Deskripsi

    name

    Nama kustom untuk appender.

    type

    Jenis channel output. Nilainya tetap SLS. Jangan ubah nilai ini.

    pattern

    Format output untuk log.

    Nilai default adalah %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n. Contoh entri log adalah 2024-10-01 14:23:45,678{GMT+8} INFO com.example.MyClass - This is a test log message.

    flushIntervalSeconds

    Interval, dalam detik, saat log ditulis ke penyimpanan.

    flushIntervalEventCount

    Jumlah entri log yang dikumpulkan sebelum ditulis ke penyimpanan.

    Catatan

    Jika parameter `flushIntervalSeconds` dan parameter ini diatur bersamaan, log akan ditulis ketika salah satu ambang batas tercapai.

    authenticationMode

    • AccessKey

    • SLS Token

      Catatan

      Jika Anda memilih mode ini, Anda hanya dapat mengirimkan log ke Logstore yang berada di wilayah yang sama dengan ruang kerja Flink Anda. Mode ini didukung hanya pada Ververica Runtime (VVR) 11.5 dan versi lebih baru.

    project

    Nama proyek.

    logStore

    Nama Logstore.

    endpoint

    Titik akhir jaringan pribadi untuk wilayah tempat layanan SLS Anda berada. Untuk informasi selengkapnya, lihat Endpoints.

    accessKeyId

    Ganti dengan ID AccessKey dan rahasia AccessKey akun layanan SLS Anda. Untuk mendapatkan AccessKey, lihat Get an AccessKey.

    Untuk menghindari risiko keamanan penggunaan AccessKey dalam teks biasa, contoh ini menggunakan variabel untuk nilai AccessKey. Untuk informasi selengkapnya, lihat Project variables.

    Catatan

    Jika layanan SLS dan Flink Anda berada di akun berbeda, Anda harus memberikan izin kepada akun Flink untuk menulis ke SLS. Untuk informasi selengkapnya, lihat Create a custom policy. Kebijakannya sebagai berikut:

    Tidak ada batasan pada cakupan SLS

    {
    
        "Version": "1",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "log:Get*",
                    "log:PostLogStoreLogs"
                ],
                "Resource": "*"
            }
        ]
    }

    Cakupan resource SLS spesifik

    {
        "Version": "1",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "log:PostLogStoreLogs",
                    "log:GetLogStore"
                ],
                "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
            }
        ]
    }

    accessKeySecret

    Konfigurasikan output ke OSS

    image

    Parameter

    Deskripsi

    name

    Nama kustom untuk appender.

    type

    Jenis channel output. Nilainya tetap OSS. Jangan ubah nilai ini.

    pattern

    Format output untuk log.

    Nilai default adalah %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.

    baseUri

    Masukkan nama bucket OSS.

    endpoint

    Titik akhir untuk wilayah tempat layanan OSS Anda berada. Untuk informasi selengkapnya, lihat Regions and endpoints.

    Gunakan nilai Endpoint (Region) dari baris ECS VPC network access (internal network).

    accessKeyId

    ID AccessKey dan rahasia AccessKey akun layanan OSS. Untuk mendapatkan AccessKey, lihat Get an AccessKey.

    Untuk menghindari risiko keamanan penggunaan AccessKey dalam teks biasa, contoh ini menggunakan variabel untuk nilai AccessKey. Untuk informasi selengkapnya, lihat Project variables.

    Catatan

    Parameter ini hanya diperlukan jika Anda mengirimkan log ke bucket OSS yang berada di akun berbeda dengan layanan Flink Anda. Jika berada di akun yang sama, hapus parameter ini.

    secretAccessKey

    flushIntervalSeconds

    Interval, dalam detik, saat log ditulis ke penyimpanan.

    flushIntervalEventCount

    Jumlah entri log yang dikumpulkan sebelum ditulis ke penyimpanan.

    Catatan

    Jika parameter `flushIntervalSeconds` dan parameter ini diatur bersamaan, log akan ditulis ketika salah satu ambang batas tercapai.

    rollingBytes

    Ukuran maksimum file log tunggal di OSS. Saat file log mencapai ukuran ini, file log baru dibuat untuk data selanjutnya.

    Konfigurasikan output ke Kafka

    Catatan

    Kluster Kafka yang mengaktifkan otentikasi Kerberos tidak didukung.

    • Prasyarat

      Plugin logging KafkaAppender untuk komputasi real-time dimuat oleh class loader plugin Flink. Untuk memuat plugin ini secara berhasil dalam aplikasi Flink, Anda harus secara eksplisit menentukan path paketnya. Langkah-langkah berikut menjelaskan cara mengonfigurasi plugin untuk satu pekerjaan. Konfigurasi ini hanya berlaku untuk pekerjaan tersebut.

      Di halaman Job O&M, klik nama pekerjaan target. Pada tab Deployment Details, di bagian Other Configurations dalam area Runtime Parameter Configuration, tambahkan kode berikut.

      plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • Antarmuka konfigurasi

      image

      Parameter

      Deskripsi

      name

      Nama kustom untuk appender.

      type

      Jenis channel output. Nilainya tetap KafkaVVP. Jangan ubah nilai ini.

      pattern

      Format output untuk log.

      Nilai default adalah %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.

      bootstrap.servers

      Alamat broker Kafka.

      acks

      Menentukan jumlah replika partisi yang harus menerima pesan sebelum produsen menganggap penulisan berhasil. Untuk informasi selengkapnya tentang nilai-nilainya, lihat acks.

      buffer.memory

      Ukuran buffer produsen, dalam byte.

      retries

      Jumlah kali percobaan ulang mengirim pesan jika gagal.

      compression.type

      Jenis kompresi untuk data yang dihasilkan oleh produsen. Nilai yang valid adalah `none`, `gzip`, `snappy`, `lz4`, dan `zstd`.

  5. Klik Save.

  6. Klik Start di bagian atas halaman.

Metode XML

  1. Akses titik masuk untuk mengonfigurasi output log untuk satu pekerjaan.

    1. Masuk ke Konsol Manajemen Flink.

    2. Klik Operation di kolom Actions ruang kerja target, lalu klik Console.

    3. Di panel navigasi sebelah kiri, klik Operation Center > Job O&M. Lalu, klik nama pekerjaan target.

    4. Pada tab Deployment Details, klik Edit di sisi kanan bagian Log Configuration.

    5. Atur Log Template menjadi Custom Template.

  2. Konfigurasikan informasi output log.

    Berdasarkan penyimpanan target Anda, salin konfigurasi yang sesuai dan tempelkan ke kotak input. Lalu ubah nilai parameter yang ditentukan agar sesuai dengan pengaturan penyimpanan Anda. Jika Anda juga perlu mengonfigurasi log dengan tingkat berbeda agar dikirimkan ke penyimpanan berbeda secara terpisah, lihat Konfigurasikan output terpisah untuk log dengan tingkat berbeda untuk mengatur aturan filter tingkat log berbeda untuk Appender.

    Konfigurasikan ke OSS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender> 
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="20 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="4"/> 
        </Appender>  
        <Appender name="OSS" type="OSS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
          <Property name="endpoint">https://YOUR-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
          <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
          <Property name="flushIntervalSeconds">10</Property>  
          <Property name="flushIntervalEventCount">100</Property>  
          <Property name="rollingBytes">10485760</Property>  
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="OSS"/> 
        </Root>
      </Loggers> 
    </Configuration>

    Parameter

    Deskripsi

    YOUR-BUCKET-NAME

    Ganti dengan nama Bucket OSS Anda.

    YOUR-ENDPOINT

    Ganti dengan endpoint OSS Anda. Untuk informasi selengkapnya, lihat Regions and endpoints.

    Endpoint adalah informasi Endpoint (region node) pada baris untuk VPC network access (private network) for ECS.

    YOUR-OSS-ACCESSKEYID

    Ganti placeholder dengan ID AccessKey dan rahasia AccessKey untuk akun layanan OSS Anda. Untuk mendapatkan pasangan AccessKey, lihat Obtain an AccessKey.

    Untuk menghindari risiko keamanan yang ditimbulkan oleh AccessKey dalam teks biasa, contoh ini menggunakan variabel untuk memberikan nilai AccessKey. Untuk detailnya, lihat Project variables.

    Catatan

    Parameter ini hanya diperlukan jika Anda mengonfigurasi output ke akun OSS yang berbeda dari akun Flink. Jika akunnya sama, jangan isi parameter ini dan hapuslah.

    YOUR-OSS-ACCESSKEYSECRET

    flushIntervalSeconds

    Interval, dalam detik, untuk sinkronisasi log ke penyimpanan. Ini menentukan seberapa sering data log ditulis.

    flushIntervalEventCount

    Interval, dalam jumlah entri log, untuk sinkronisasi log ke penyimpanan. Ini menentukan berapa banyak entri log yang dikumpulkan sebelum ditulis.

    Catatan

    Saat digunakan bersama flushIntervalSeconds, pengoperasian penulisan dipicu oleh pengaturan mana pun yang mencapai nilai yang dikonfigurasi terlebih dahulu.

    rollingBytes

    Ukuran file log tunggal di OSS. Setelah mencapai nilai maksimum, data selanjutnya ditulis ke file log baru.

    Konfigurasikan ke SLS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender>  
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="5 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="1"/> 
        </Appender>  
        <Appender name="SLS" type="SLS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
    
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="project">YOUR-SLS-PROJECT</Property>  
          <Property name="logStore">YOUR-SLS-LOGSTORE</Property> 
          <Property name="endpoint">YOUR-SLS-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property> 
          <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> 
          <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
          <Property name="deploymentName">{{ deploymentName }}</Property>
          <Property name="flushIntervalSeconds">10</Property>
          <Property name="flushIntervalEventCount">100</Property>
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="SLS"/> 
        </Root>
      </Loggers> 
    </Configuration>
    Catatan

    Variabel Twig `namespace`, `deploymentId`, `jobId`, dan `deploymentName` dalam kode tidak boleh diubah, karena modifikasi akan menyebabkan error saat startup pekerjaan.

    Parameter

    Deskripsi

    YOUR-SLS-PROJECT

    Ganti dengan nama Proyek SLS Anda.

    YOUR-SLS-LOGSTORE

    Ganti dengan nama Logstore SLS Anda.

    YOUR-SLS-ENDPOINT

    Ganti dengan titik akhir jaringan pribadi wilayah tempat instans SLS Anda berada. Untuk informasi selengkapnya, lihat service endpoints.

    YOUR-SLS-ACCESSKEYID

    Ganti dengan ID AccessKey dan rahasia AccessKey akun layanan SLS Anda. Untuk informasi selengkapnya, lihat Obtain an AccessKey.

    Untuk menghindari risiko keamanan yang ditimbulkan oleh AccessKey dalam teks biasa, contoh ini menggunakan variabel untuk menentukan nilai AccessKey. Untuk informasi selengkapnya, lihat Project Variables.

    Catatan

    Jika Anda mengonfigurasi layanan SLS dan Flink di akun Alibaba Cloud yang berbeda, Anda harus mengonfigurasi akun layanan SLS untuk memberikan izin kepada akun Flink agar dapat menulis ke SLS. Untuk petunjuk detailnya, lihat Create a custom policy. Konten kebijakan spesifiknya sebagai berikut:

    • Tidak ada batasan cakupan SLS

      {
      
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }
    • Tentukan cakupan resource SLS, seperti pada contoh.

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }

    YOUR-SLS-ACCESSKEYSECRET

    flushIntervalSeconds

    Interval, dalam detik, untuk sinkronisasi log ke penyimpanan. Ini menentukan seberapa sering data log ditulis.

    flushIntervalEventCount

    Interval, dalam jumlah entri log, untuk sinkronisasi log ke penyimpanan. Ini menentukan berapa banyak entri log yang dikumpulkan sebelum ditulis.

    Catatan

    Saat Anda mengonfigurasi flushIntervalSeconds, operasi penulisan dipicu ketika interval waktu atau ambang batas nilai yang ditentukan tercapai terlebih dahulu.

    Konfigurasikan ke Kafka

    Catatan

    Kluster Kafka dengan otentikasi Kerberos tidak didukung.

    • Prasyarat

      Ekstensi log KafkaAppender, yang disediakan oleh komputasi real-time, dimuat melalui class loader plugin Flink. Sebelum menggunakan ekstensi ini, Anda harus secara eksplisit menentukan path paket tempat ekstensi log KafkaAppender berada untuk memastikan aplikasi Flink dapat memuatnya secara berhasil. Untuk melakukan ini, ikuti langkah-langkah berikut:

      • Konfigurasikan templat pekerjaan (berlaku untuk semua pekerjaan dalam proyek ini)

        Pada halaman Configuration Management Konsol pengembangan Flink, tambahkan kode berikut di bawah Other Configurations.

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
      • Konfigurasikan satu pekerjaan (hanya berlaku untuk pekerjaan saat ini)

        Pada halaman Job O&M, klik nama pekerjaan target. Pada tab Deployment Details, tambahkan kode berikut di bagian Runtime Parameter Configuration, di bawah Other Configurations.

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • Konfigurasi log

      <?xml version="1.0" encoding="UTF-8"?>
      <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
      strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
        <Appenders> 
          <Appender name="StdOut" type="Console"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> 
          </Appender>  
          <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>  
            <Policies> 
              <SizeBasedTriggeringPolicy size="20 MB"/> 
            </Policies>  
            <DefaultRolloverStrategy max="4"/> 
          </Appender>  
          <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME">
              <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n"/>
              <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property>
               <Property name="acks">YOUR-ACKS-VALUE</Property>
               <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property>
                <Property name="retries">YOUR-RETRIES-NUMBER</Property>
               <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property>
          </Appender>
          <Appender type="Async" name="AsyncAppender">
              <AppenderRef ref="KafkaVVPAppender"/>
          </Appender>
        </Appenders>
        <Loggers> 
          <Logger level="INFO" name="org.apache.hadoop"/>  
          <Logger level="INFO" name="org.apache.kafka"/>  
          <Logger level="INFO" name="org.apache.zookeeper"/>  
          <Logger level="INFO" name="akka"/>  
          <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
          <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
          {%- for name, level in userConfiguredLoggers -%} 
            <Logger level="{{ level }}" name="{{ name }}"/> 
          {%- endfor -%}
          <Root level="{{ rootLoggerLogLevel }}"> 
            <AppenderRef ref="StdOut"/>
            <AppenderRef ref="RollingFile"/>  
            <AppenderRef ref="AsyncAppender"/> 
          </Root>
        </Loggers>
      </Configuration>

      Parameter

      Deskripsi

      YOUR-TOPIC-NAME

      Nama Topik Kafka untuk penulisan.

      YOUR-KAFKA-BOOTSTRAP-SERVERS

      Alamat broker Kafka untuk penulisan.

      YOUR-ACKS-VALUE

      Menentukan jumlah replika partisi yang harus menerima pesan sebelum produsen menganggap operasi penulisan berhasil. Untuk nilai yang valid, lihat acks.

      YOUR-BUFFER-MEMORY-SIZE

      Ukuran buffer produsen, dalam byte.

      YOUR-RETRIES-NUMBER

      Jumlah percobaan ulang untuk pengiriman yang gagal.

      YOUR-COMPRESSION-TYPE

      Jenis kompresi yang tersedia saat produsen menghasilkan data, termasuk none, gzip, snappy, lz4, atau zstd.

      Catatan

      Anda juga dapat mengonfigurasi semua parameter yang didukung oleh klien Apache Kafka. Untuk informasi selengkapnya, lihat Apache Kafka.

  3. Klik Save.

  4. Klik Start di bagian atas halaman.

Konfigurasikan channel output log untuk semua pekerjaan dalam proyek

Konfigurasikan templat agar secara default mengirimkan log untuk semua pekerjaan dalam proyek ke OSS, SLS, atau Kafka.

Penting
  • Setelah dikonfigurasi, log untuk semua pekerjaan berikutnya yang dibuat dalam proyek ini akan disimpan di OSS, SLS, atau Kafka.

  • Plugin log KafkaAppender yang disediakan oleh Flink dimuat oleh class loader plugin Flink. Sebelum digunakan, tentukan secara eksplisit path paket plugin log KafkaAppender agar aplikasi Flink dapat memuatnya secara berhasil. Oleh karena itu, pada halaman Configuration Management Konsol pengembangan Flink, tambahkan kode berikut di bawah Other Configurations.

    plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
  1. Akses titik masuk konfigurasi templat log pekerjaan.

    1. Masuk ke Konsol Manajemen Flink.

    2. Klik Operation di kolom Actions ruang kerja target, lalu klik Console. Pada halaman Konsol pengembangan Flink, pilih project target di bagian atas.

    3. Di panel navigasi sebelah kiri, klik Operation Center > Configuration Management.

    4. Pada tab Job Default Configuration, pilih jenis pekerjaan.

    5. Di bagian Log Configuration, atur Log Template menjadi Custom Template.

  2. Konfigurasikan channel output log untuk semua pekerjaan dalam proyek.

    Untuk detail kode, lihat Konfigurasikan output log untuk satu pekerjaan (metode XML).

  3. Klik Save Changes.

Konfigurasikan tingkat log berbeda untuk output

Gunakan ThresholdFilter log4j2 untuk mengonfigurasi aturan filter tingkat log berbeda untuk Appender berbeda. Keuntungan konfigurasi ini adalah:

  • Fleksibilitas: Atur tingkat log berbeda untuk penyimpanan (eksternal) berbeda sesuai kebutuhan.

  • Efisiensi: Kurangi pemrosesan dan transmisi log yang tidak perlu, meningkatkan kinerja sistem.

  • Kejelasan: Konfigurasi terpisah membuat alur log lebih jelas dan manajemen tingkat lebih mudah.

Konfigurasikan sebagai berikut:

  1. Di bagian Log Configuration, atur Log Template menjadi Custom Template.

  2. Konfigurasikan informasi output log.

    Contoh ini menunjukkan cara mengonfigurasi Konsol pengembangan Flink (Console) agar mengeluarkan log tingkat INFO dan lebih tinggi, serta Simple Log Service (SLS) hanya mengeluarkan log tingkat ERROR dan lebih tinggi. Contoh konfigurasinya sebagai berikut:

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" status="WARN">
      <Appenders>
        <!-- Console Appender configured to output only INFO level logs -->
        <Appender name="StdOut" type="Console">
          <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
        </Appender>
        
        <!-- RollingFile Appender (no filter shown, logs all levels due to Root level being INFO) -->
        <Appender name="RollingFile" type="RollingFile">
          <!-- Configuration remains unchanged -->
          <!-- ... -->
        </Appender>
        
        <!-- SLS Appender configured to output only ERROR level and above logs -->
        <Appender name="SLS" type="SLS">
          <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
          <!-- SLS specific properties -->
          <Property name="namespace">YOUR_NAMESPACE</Property>
          <Property name="project">YOUR_SLS_PROJECT</Property>
          <Property name="logStore">YOUR_SLS_LOGSTORE</Property>
          <Property name="endpoint">YOUR_SLS_ENDPOINT</Property>
          <!-- Access credentials and other properties -->
          <!-- ... -->
        </Appender>
    
        <!-- Other Appenders definitions remain unchanged -->
        <!-- ... -->
      </Appenders>
      
      <Loggers>
        <!-- Directly configure loggers for StdOut and SLS with specific levels -->
        <Logger name="StdOutLogger" level="INFO" additivity="false">
          <AppenderRef ref="StdOut"/>
        </Logger>
        
        <Logger name="SLSLogger" level="ERROR" additivity="false">
          <AppenderRef ref="SLS"/>
        </Logger>
    
        <!-- Other Loggers definitions with their specific configurations -->
        <!-- ... -->
    
        <!-- Root Logger without specific AppenderRef for SLS and StdOut, to avoid duplicate logging -->
        <Root level="INFO">
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>
          <!-- Exclude SLS from Root to prevent duplicate logging in case of other loggers -->
        </Root>
      </Loggers>
    </Configuration>

    Dalam konfigurasi ini:

    • Console Appender: Menggunakan ThresholdFilter untuk memastikan bahwa log tingkat INFO dan lebih tinggi dikeluarkan ke Konsol pengembangan Flink.

    • SLS Appender: Menggunakan ThresholdFilter untuk memastikan bahwa hanya log tingkat ERROR dan lebih tinggi yang dikirim ke SLS. Untuk properti spesifik Appender SLS, lihat Konfigurasikan ke SLS. Ganti placeholder seperti YOUR_NAMESPACE dan YOUR_SLS_PROJECT dengan informasi proyek SLS aktual Anda.

      Catatan

      Jika jenis Appender SLS bukan SLS tetapi Appender kustom, pastikan Anda menggunakan jenis yang benar dan kelas Appender memiliki logika yang diperlukan untuk terhubung ke SLS.

    • StdOutLogger dan SLSLogger: Masing-masing hanya mengirim log ke Appender StdOut dan Appender SLS, dengan batasan tingkat log berbeda.

    • Root Logger: Mengonfigurasi Appender StdOut dan Appender RollingFile, tetapi tidak menyertakan Appender SLS. Hal ini mencegah pengiriman log ganda ke SLS ketika konfigurasi Logger spesifik sudah ada.

    Untuk informasi selengkapnya tentang operasi tambahan dan parameter konfigurasi log4j, lihat Apache Log4j.

Referensi