全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konfigurasi ekspor log

更新时间:Nov 06, 2025

Anda dapat mengekspor log pekerjaan pada level tertentu ke penyimpanan eksternal seperti Object Storage Service (OSS), Simple Log Service (SLS), dan Kafka. Topik ini menjelaskan cara mengonfigurasi pengaturan ekspor log.

Catatan Penggunaan

  • Setelah Anda mengonfigurasi pengaturan ekspor log, mulai ulang pekerjaan tersebut.

  • Jika Allow Log Archives tetap menyala setelah konfigurasi, log pekerjaan akan terus disimpan di bucket OSS yang terkait dengan ruang kerja. Namun, jika Anda mematikan Allow Log Archives, Anda tidak akan bisa lagi melihat log di konsol.

    image

  • Anda dapat menggunakan variabel namespace dalam format ${secret_values.xxxx} di konfigurasi log.

Konfigurasikan ekspor log untuk satu pekerjaan

Melalui konsol

  1. Pergi ke halaman pengaturan log.

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target dan klik Console di kolom Actions.

    3. Di panel navigasi sisi kiri, pilih O&M > Deployments, lalu pilih penyebaran target.

    4. Pada tab Configuration halaman detail penyebaran, di bagian Logging, klik Edit di pojok kanan atas.

    5. Untuk Logging Profile, pilih Custom Template.

  2. Klik Apply with System Profiles > default.

  3. Klik Add Appender dan pilih sistem penyimpanan target.

  4. Konfigurasikan pengaturan output log.

    Untuk mengekspor log pada level berbeda ke sistem penyimpanan berbeda, konfigurasikan aturan filter log untuk appender.

    Ekspor log ke SLS

    image

    Item

    Deskripsi

    name

    Masukkan nama Appender kustom Anda.

    type

    Jenis saluran keluaran, tetap pada SLS.

    pattern

    Format ekspor log.

    Nilai default adalah %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n, yang menghasilkan konten log seperti 2024-10-01 14:23:45,678{GMT+8} INFO com.example.MyClass - Ini adalah pesan log uji.

    project

    Masukkan nama proyek SLS Anda.

    logStore

    Nama Logstore.

    endpoint

    Masukkan endpoint privat proyek SLS Anda. Untuk informasi lebih lanjut, lihat Endpoint.

    accessKeyId

    ID AccessKey dan rahasia yang digunakan untuk mengakses proyek SLS. Untuk informasi lebih lanjut, lihat Dapatkan pasangan AccessKey.

    Penting

    Untuk meningkatkan keamanan, gunakan variabel namespace daripada memasukkan pasangan AccessKey Anda dalam teks biasa.

    Catatan

    Untuk mengekspor log pekerjaan ke proyek SLS lintas akun, buat kebijakan kustom menggunakan akun Alibaba Cloud yang memiliki proyek SLS, dan berikan kebijakan kustom tersebut kepada peran RAM yang diasumsikan oleh akun yang mengelola sumber daya Realtime Compute for Apache Flink. Berikut adalah kode JSON untuk kebijakan kustom:

    • Akses semua sumber daya SLS:

      {
      
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }
    • Akses sumber daya SLS tertentu

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:ap-southeast-1:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }

    accessKeySecret

    flushIntervalSeconds

    Masukkan interval waktu untuk mengekspor log pekerjaan ke SLS. Unit: detik.

    flushIntervalEventCount

    Masukkan jumlah entri log yang dikumpulkan dan dikirim ke SLS dalam satu batch.

    Catatan

    Saat kedua flushIntervalEventCount dan flushIntervalSeconds dikonfigurasi, log dikirim ke SLS saat salah satu kondisi terpenuhi.

    Ekspor log ke OSS

    image

    Item

    Deskripsi

    name

    Masukkan nama Appender kustom Anda.

    type

    Jenis saluran keluaran, tetap pada OSS.

    pattern

    Format output log.

    Nilai default adalah %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.

    baseUri

    Masukkan nama bucket OSS Anda.

    endpoint

    Masukkan endpoint internal spesifik wilayah OSS. Untuk informasi lebih lanjut, lihat Wilayah dan endpoint.

    Anda juga dapat menemukan informasi endpoint di konsol OSS:

    1. Klik nama bucket Anda.

    2. Di halaman detail bucket, pilih Overview di panel navigasi tengah.

    3. Di bagian Port, temukan baris Access from ECS over the VPC (internal network), dan salin nilai yang sesuai di kolom Endpoint.

    accessKeyId

    Masukkan ID AccessKey dan rahasia yang digunakan untuk mengakses bucket OSS Anda lintas akun. Untuk informasi lebih lanjut, lihat Dapatkan pasangan AccessKey.

    Penting

    Untuk meningkatkan keamanan, gunakan variabel namespace daripada memasukkan pasangan AccessKey Anda dalam teks biasa.

    Catatan

    Jika Anda mengakses bucket OSS dalam akun yang sama, lewati konfigurasi ini.

    secretAccessKey

    flushIntervalSeconds

    Masukkan interval waktu saat log ditulis ke OSS. Unit: detik.

    flushIntervalEventCount

    Masukkan jumlah entri log yang dikumpulkan dan dikirim ke OSS dalam satu batch.

    Catatan

    Saat kedua flushIntervalEventCount dan flushIntervalSeconds dikonfigurasi, log dikirim ke OSS saat salah satu kondisi terpenuhi.

    rollingBytes

    Masukkan ukuran file log tunggal di OSS. Setelah batas tercapai, data ditulis ke file log baru.

    Ekspor log ke Kafka

    Catatan

    Ekspor log ke kluster Kafka dengan otentikasi Kerberos tidak didukung.

    • Prasyarat

      Tambahkan kode berikut dengan melakukan langkah-langkah berikut:

      1. Di halaman detail pekerjaan, pilih tab Configuration.

      2. Di bagian Parameters, klik Edit.

      3. Salin dan tempel kode berikut ke bidang Other Configuration.

      plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender

      Potongan kode ini secara eksplisit menentukan jalur paket KafkaAppender, yang merupakan plugin logging yang disediakan oleh Realtime Compute for Apache Flink. Ini memastikan bahwa Realtime Compute for Apache Flink dapat memuat plugin logging.

    • Konfigurasikan Ekspor Log

      image

      Item

      Deskripsi

      name

      Masukkan nama Appender kustom Anda.

      type

      Jenis saluran keluaran, tetap pada KafkaVVP.

      pattern

      Format output log.

      Nilai default adalah %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.

      bootstrap.servers

      Masukkan endpoint broker Kafka.

      acks

      Tentukan jumlah replika partisi yang telah menerima pesan sebelum produser Kafka menentukan bahwa pesan telah ditulis ke topik. Untuk informasi lebih lanjut, lihat acks.

      buffer.memory

      Masukkan ukuran buffer produser. Unit: byte.

      retries

      Masukkan jumlah maksimum percobaan ulang yang diizinkan setelah pesan gagal dikirim.

      compression.type

      Masukkan jenis kompresi yang digunakan oleh produser untuk menghasilkan data. Nilai valid: none, gzip, snappy, lz4, dan zstd.

  5. Klik Save.

  6. Mulai penyebaran.

Via XML

  1. Pergi ke halaman konfigurasi.

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target dan klik Console di kolom Actions.

    3. Di panel navigasi sisi kiri, klik O&M > Deployments, lalu klik nama penyebaran target.

    4. Pada tab Configuration, di bagian Logging, klik Edit di pojok kanan atas.

    5. Atur Logging Profile ke Custom Template.

    6. Klik Edit XML.

  2. Konfigurasikan pengaturan ekspor log.

    Sesuai dengan sistem penyimpanan log Anda, salin dan tempel kode berikut ke editor XML dan ganti nilai placeholder dengan nilai aktual. Untuk mengekspor log pada level berbeda ke sistem penyimpanan berbeda, konfigurasikan aturan filter log berbeda.

    Ekspor log ke OSS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender> 
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="20 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="4"/> 
        </Appender>  
        <Appender name="OSS" type="OSS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          
          <!-- Jalur log efektif akhir adalah: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Jangan ubah baris ini -->
          <Property name="baseUri">oss://NAMA-BUCKET-ANDA/</Property>
          <Property name="endpoint">https://ENDPOINT-ANDA</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
          <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
          <Property name="flushIntervalSeconds">10</Property>  
          <Property name="flushIntervalEventCount">100</Property>  
          <Property name="rollingBytes">10485760</Property>  
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="OSS"/> 
        </Root>
      </Loggers> 
    </Configuration>

    Placeholder

    Deskripsi

    NAMA-BUCKET-ANDA

    Ganti dengan nama bucket OSS Anda.

    ENDPOINT-ANDA

    Ganti dengan endpoint internal spesifik wilayah OSS. Untuk informasi lebih lanjut, lihat Wilayah dan endpoint.

    Anda juga dapat menemukan informasi endpoint di konsol OSS:

    1. Klik nama bucket Anda.

    2. Di halaman detail bucket, pilih Overview di panel navigasi tengah.

    3. Di bagian Port, temukan baris Access from ECS over the VPC (internal network), dan salin nilai yang sesuai di kolom Endpoint.

    ACCESSKEYID-OSS-ANDA

    Ganti dengan ID AccessKey dan rahasia Anda yang digunakan untuk mengakses OSS lintas akun. Untuk informasi tentang cara mendapatkan pasangan AccessKey, lihat Dapatkan pasangan AccessKey.

    Penting

    Untuk meningkatkan keamanan, gunakan variabel namespace daripada memasukkan pasangan AccessKey Anda dalam teks biasa.

    Catatan

    Jika Anda mengakses bucket OSS dalam akun yang sama, lewati konfigurasi ini.

    ACCESSKEYSECRET-OSS-ANDA

    flushIntervalSeconds

    Interval waktu saat log ditulis ke OSS. Unit: detik.

    flushIntervalEventCount

    Masukkan jumlah entri log yang dikumpulkan dan dikirim ke OSS dalam satu batch.

    Catatan

    Saat kedua flushIntervalEventCount dan flushIntervalSeconds dikonfigurasi, log dikirim ke OSS saat salah satu kondisi terpenuhi.

    rollingBytes

    Masukkan ukuran file log tunggal di OSS. Setelah batas tercapai, data ditulis ke file log baru.

    Ekspor log ke SLS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender>  
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="5 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="1"/> 
        </Appender>  
        <Appender name="SLS" type="SLS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
    
          <!-- Jalur log efektif akhir adalah: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Jangan ubah baris ini -->
          <Property name="project">PROYEK-SLS-ANDA</Property>  
          <Property name="logStore">LOGSTORE-SLS-ANDA</Property> 
          <Property name="endpoint">ENDPOINT-SLS-ANDA</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property> 
          <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> 
          <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
          <Property name="deploymentName">{{ deploymentName }}</Property>
          <Property name="flushIntervalSeconds">10</Property>
          <Property name="flushIntervalEventCount">100</Property>
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="SLS"/> 
        </Root>
      </Loggers> 
    </Configuration>
    Catatan

    Untuk mencegah kesalahan saat startup, jangan ubah variabel Twig di kode, termasuk namespace, deploymentId, jobId, dan deploymentName.

    Parameter placeholder

    Deskripsi

    PROYEK-SLS-ANDA

    Ganti dengan nama proyek SLS Anda.

    LOGSTORE-SLS-ANDA

    Ganti dengan nama Logstore dari Layanan Log Sederhana.

    ENDPOINT-SLS-ANDA

    Ganti dengan endpoint privat spesifik wilayah SLS. Untuk informasi lebih lanjut, lihat Endpoint.

    ACCESSKEYID-SLS-ANDA

    ID AccessKey dan rahasia yang digunakan untuk mengakses proyek SLS. Untuk informasi lebih lanjut, lihat Dapatkan pasangan AccessKey.

    Penting

    Untuk meningkatkan keamanan, gunakan variabel namespace daripada memasukkan pasangan AccessKey Anda dalam teks biasa.

    Catatan

    Untuk mengekspor log pekerjaan ke proyek SLS lintas akun, buat kebijakan kustom menggunakan akun Alibaba Cloud yang memiliki proyek SLS, dan berikan kebijakan kustom tersebut kepada peran RAM yang diasumsikan oleh akun yang mengelola sumber daya Realtime Compute for Apache Flink. Berikut adalah kode JSON untuk kebijakan kustom:

    • Akses semua sumber daya SLS:

      {
      
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }
    • Akses sumber daya SLS tertentu

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:ap-southeast-1:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }

    ACCESSKEYSECRET-SLS-ANDA

    flushIntervalSeconds

    Masukkan interval waktu ekspor log pekerjaan ke SLS. Unit: detik.

    flushIntervalEventCount

    Masukkan jumlah entri log yang dikumpulkan dan dikirim ke SLS dalam satu batch.

    Catatan

    Saat kedua flushIntervalEventCount dan flushIntervalSeconds dikonfigurasi, log dikirim ke SLS saat salah satu kondisi terpenuhi.

    Ekspor log ke Kafka

    Catatan

    Kluster Kafka dengan otentikasi Kerberos yang diaktifkan tidak didukung.

    • Prasyarat

      Tentukan secara eksplisit jalur paket KafkaAppender, yang merupakan plugin logging yang disediakan oleh Realtime Compute for Apache Flink. Ini memastikan bahwa Realtime Compute for Apache Flink dapat memuat plugin logging. Lakukan langkah-langkah berikut:

      • Terapkan pengaturan ke semua pekerjaan dalam namespace:

        1. Pergi ke O&M > Configurations.

        2. Di bagian Other Configuration, salin dan tempel kode berikut, lalu simpan perubahan.

      plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
      • Terapkan pengaturan ke pekerjaan tunggal:

        1. Di halaman detail penyebaran, pilih tab Configuration.

        2. Di bagian Parameters, klik Edit.

        3. Tambahkan kode berikut ke bidang Other Configuration, lalu simpan perubahan.

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • Konfigurasikan Ekspor Log

      <?xml version="1.0" encoding="UTF-8"?>
      <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
      strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
        <Appenders> 
          <Appender name="StdOut" type="Console"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> 
          </Appender>  
          <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>  
            <Policies> 
              <SizeBasedTriggeringPolicy size="20 MB"/> 
            </Policies>  
            <DefaultRolloverStrategy max="4"/> 
          </Appender>  
          <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="NAMA-TOPIC-ANDA">
              <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n"/>
              <Property name="bootstrap.servers">SERVER-BOOTSTRAP-KAFKA-ANDA</Property>
               <Property name="acks">NILAI-ACKS-ANDA</Property>
               <Property name="buffer.memory">UKURAN-MEMORI-BUFFER-ANDA</Property>
                <Property name="retries">JUMLAH-PERCUBAAN-ULANG-ANDA</Property>
               <Property name="compression.type">TIPE-KOMPRESI-ANDA</Property>
          </Appender>
          <Appender type="Async" name="AsyncAppender">
              <AppenderRef ref="KafkaVVPAppender"/>
          </Appender>
        </Appenders>
        <Loggers> 
          <Logger level="INFO" name="org.apache.hadoop"/>  
          <Logger level="INFO" name="org.apache.kafka"/>  
          <Logger level="INFO" name="org.apache.zookeeper"/>  
          <Logger level="INFO" name="akka"/>  
          <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
          <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
          {%- for name, level in userConfiguredLoggers -%} 
            <Logger level="{{ level }}" name="{{ name }}"/> 
          {%- endfor -%}
          <Root level="{{ rootLoggerLogLevel }}"> 
            <AppenderRef ref="StdOut"/>
            <AppenderRef ref="RollingFile"/>  
            <AppenderRef ref="AsyncAppender"/> 
          </Root>
        </Loggers>
      </Configuration>

      Placeholder

      Deskripsi

      NAMA-TOPIC-ANDA

      Ganti dengan nama topik Kafka target.

      SERVER-BOOTSTRAP-KAFKA-ANDA

      Ganti dengan endpoint broker Kafka.

      NILAI-ACKS-ANDA

      Ganti dengan jumlah replika partisi yang harus menerima pesan sebelum produser menentukan bahwa pesan telah ditulis ke topik. Untuk informasi lebih lanjut, lihat acks.

      UKURAN-MEMORI-BUFFER-ANDA

      Ganti dengan ukuran buffer produser. Unit: byte.

      JUMLAH-PERCUBAAN-ULANG-ANDA

      Ganti dengan jumlah maksimum percobaan ulang yang diizinkan setelah pesan gagal dikirim.

      TIPE-KOMPRESI-ANDA

      Ganti dengan tipe kompresi yang dapat digunakan oleh produser untuk menghasilkan data. Nilai valid: none, gzip, snappy, lz4, dan zstd.

      Catatan

      Anda juga dapat mengonfigurasi parameter yang didukung oleh klien Apache Kafka. Untuk informasi lebih lanjut, lihat Apache Kafka.

  3. Klik Save.

  4. Mulai penyebaran.

Konfigurasikan ekspor log untuk semua pekerjaan

Bagian ini menjelaskan cara mengonfigurasi pengaturan ekspor log yang berlaku untuk semua pekerjaan dalam namespace.

Penting
  • Setelah Anda mengonfigurasi pengaturan ini, log dari semua pekerjaan dalam namespace akan dikirim ke OSS, SLS, atau Kafka.

  • Untuk memastikan aplikasi Flink Anda dapat memuat plugin logging KafkaAppender, tentukan jalurnya secara eksplisit:

    1. Pergi ke O&M > Configurations.

    2. Di bidang Other Configuration, salin dan tempel potongan kode berikut.

    plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
  1. Pergi ke halaman konfigurasi namespace.

    1. Masuk ke Konsol Manajemen Realtime Compute for Apache Flink.

    2. Klik Console di kolom Actions ruang kerja target.

    3. Di bilah navigasi atas konsol pengembangan, pilih namespace target.

    4. Di panel navigasi sisi kiri, klik O&M > Configurations.

    5. Pada tab Deployment Defaults, pilih Defaults for Stream atau Defaults for Batch.

    6. Di bagian Logging, atur Logging Profile ke Custom Template.

  2. Konfigurasikan pengaturan ekspor log.

    Untuk kode lengkapnya, lihat bagian Via XML dari Konfigurasikan Ekspor Log untuk Pekerjaan Tunggal.

  3. Klik Save Changes.

Konfigurasikan aturan filter log berbeda berdasarkan level log

Anda dapat mengonfigurasi aturan filter log berbeda untuk appender berdasarkan level log menggunakan ThresholdFilter dari Log4j2. Aturan filter log memberikan manfaat sebagai berikut:

  • Fleksibilitas: Mengirim log pada level berbeda ke sistem penyimpanan yang berbeda.

  • Efisiensi: Mengurangi parsing log dan transmisi yang tidak perlu, sehingga meningkatkan kinerja sistem.

  • Pengelolaan Mudah: Memfasilitasi pengelolaan log bertingkat.

Prosedur

  1. Di bagian Logging, atur Logging Profile ke Custom Template.

  2. Konfigurasikan pengaturan ekspor log.

    Potongan kode berikut mencetak log pada level INFO atau lebih tinggi ke konsol pengembangan Realtime Compute for Apache Flink, serta mengekspor log pada level ERROR atau lebih tinggi ke SLS:

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" status="WARN">
      <Appenders>
        <!-- Appender Console dikonfigurasi untuk hanya menampilkan log level INFO -->
        <Appender name="StdOut" type="Console">
          <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
        </Appender>
        
        <!-- Appender RollingFile (tidak ada filter yang ditunjukkan, mencatat semua level karena Root level adalah INFO) -->
        <Appender name="RollingFile" type="RollingFile">
          <!-- Konfigurasi tetap tidak berubah -->
          <!-- ... -->
        </Appender>
        
        <!-- Appender SLS dikonfigurasi untuk hanya menampilkan log level ERROR atau lebih tinggi -->
        <Appender name="SLS" type="SLS">
          <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
          <!-- Properti spesifik SLS -->
          <Property name="namespace">NAMESPACE_ANDA</Property>
          <Property name="project">PROYEK_SLS_ANDA</Property>
          <Property name="logStore">LOGSTORE_SLS_ANDA</Property>
          <Property name="endpoint">ENDPOINT_SLS_ANDA</Property>
          <!-- Kredensial akses dan properti lainnya -->
          <!-- ... -->
        </Appender>
    
        <!-- Definisi Appender lainnya tetap tidak berubah -->
        <!-- ... -->
      </Appenders>
      
      <Loggers>
        <!-- Konfigurasikan logger secara langsung untuk StdOut dan SLS dengan level tertentu -->
        <Logger name="StdOutLogger" level="INFO" additivity="false">
          <AppenderRef ref="StdOut"/>
        </Logger>
        
        <Logger name="SLSLogger" level="ERROR" additivity="false">
          <AppenderRef ref="SLS"/>
        </Logger>
    
        <!-- Definisi Logger lainnya dengan konfigurasi spesifik mereka -->
        <!-- ... -->
    
        <!-- Logger Root tanpa AppenderRef spesifik untuk SLS dan StdOut, untuk menghindari pencatatan duplikat -->
        <Root level="INFO">
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>
          <!-- Hilangkan SLS dari Root untuk mencegah pencatatan duplikat jika logger spesifik dikonfigurasi untuk log tersebut -->
        </Root>
      </Loggers>
    </Configuration>

    Parameter dalam kode contoh sebelumnya:

    • Appender Console: Sebuah ThresholdFilter digunakan untuk memastikan bahwa log pada level INFO atau lebih tinggi dicetak ke konsol pengembangan.

    • Appender SLS: Sebuah ThresholdFilter digunakan untuk memastikan bahwa log pada level ERROR atau lebih tinggi diekspor ke SLS. Untuk informasi tentang properti spesifik Appender SLS, lihat Ekspor Log ke SLS. Ganti placeholder seperti NAMESPACE_ANDA dan PROYEK_SLS_ANDA dengan nilai aktual proyek SLS Anda.

      Catatan

      Jika appender kustom digunakan sebagai Appender SLS dan jenisnya bukan SLS, pastikan bahwa jenis appender kustom tersebut didukung oleh Realtime Compute for Apache Flink dan memiliki logika yang diperlukan untuk terhubung ke Layanan Log Sederhana.

    • StdOutLogger dan SLSLogger: Mereka mengirim log ke Appender StdOut dan Appender SLS masing-masing, berdasarkan level log yang Anda tentukan.

    • Logger Root: Appender StdOut dan RollingFile dikonfigurasikan untuk logger root, tetapi Appender SLS tidak termasuk. Ini menghindari pengiriman log duplikat ke SLS jika logger spesifik dikonfigurasikan untuk log tersebut.

    Untuk informasi lebih lanjut tentang operasi terkait dan parameter Log4j, lihat Apache Log4j.

Referensi