Selain melihat log pekerjaan secara langsung di halaman Job Exploration pada Konsol Flink, Anda dapat mengekspor log pekerjaan ke penyimpanan eksternal seperti Object Storage Service (OSS), Simple Log Service (SLS), atau Kafka. Anda juga dapat mengonfigurasi tingkat log untuk setiap tujuan output. Topik ini menjelaskan cara mengonfigurasi output log pekerjaan. Setelah konfigurasi selesai, Anda dapat melihat log pekerjaan di penyimpanan yang ditentukan.
Perhatian
Setelah Anda mengonfigurasi output log ke OSS, SLS, atau Kafka, Anda harus melakukan restart pekerjaan.
Jika Anda mengonfigurasi pengiriman log ke penyimpanan eksternal dan tidak menonaktifkan fitur pengarsipan log, OSS atau penyimpanan fully managed yang dikonfigurasi saat Anda membeli ruang kerja akan terus menyimpan log. Jika Anda menonaktifkan fitur tersebut, Anda tidak lagi dapat melihat log pekerjaan di halaman Konsol Flink.

Anda dapat menggunakan variabel proyek dalam format
${secret_values.xxxx}pada konfigurasi log. Untuk informasi selengkapnya, lihat Variabel proyek.
Konfigurasikan output log untuk satu pekerjaan
Gunakan UI
Buka halaman konfigurasi output log untuk satu pekerjaan.
Masuk ke Konsol Realtime Compute for Apache Flink.
Klik Actions di kolom Console ruang kerja target.
Di panel navigasi sebelah kiri, klik , lalu klik nama pekerjaan target.
Pada tab Deployment Details, klik Edit di pojok kanan atas bagian Logging.
Untuk Log Template, pilih Custom Template.
Klik .
Klik Add Appender dan pilih sistem penyimpanan target.
Konfigurasikan informasi output log untuk penyimpanan target.
Untuk mengirimkan log dengan tingkat berbeda ke sistem penyimpanan yang berbeda, lihat Konfigurasikan output terpisah untuk tingkat log berbeda guna mengatur aturan pemfilteran tingkat log yang berbeda untuk appenders.
Konfigurasi untuk SLS

Parameter
Deskripsi
name
Nama kustom appender.
type
Jenis channel output. Nilainya tetap SLS. Anda tidak perlu mengubah nilai ini.
pattern
Format output log.
Nilai default adalah
%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n, yang menghasilkan konten log seperti2024-10-01 14:23:45,678{GMT+8} INFO com.example.MyClass - This is a test log message.flushIntervalSeconds
Interval sinkronisasi log ke penyimpanan. Ini adalah interval penulisan data log. Satuan: detik.
flushIntervalEventCount
Jumlah entri log yang dikumpulkan sebelum ditulis ke penyimpanan sekaligus.
CatatanJika dikonfigurasi bersama flushIntervalSeconds, kondisi pertama yang mencapai nilai yang ditetapkan akan memicu operasi penulisan.
authenticationMode
AccessKey
SLS Token
CatatanJika Anda memilih mode ini, log hanya dapat dikirimkan ke Logstore yang berada di wilayah yang sama dengan ruang kerja Flink. Mode ini hanya didukung di Ververica Runtime (VVR) 11.5 dan versi lebih baru.
project
Nama proyek.
logStore
Nama Logstore.
endpoint
Titik akhir pribadi proyek SLS Anda di wilayah tersebut. Untuk informasi selengkapnya, lihat Endpoints.
accessKeyId
Ganti placeholder dengan ID AccessKey dan Rahasia AccessKey akun layanan untuk SLS. Untuk informasi tentang cara mendapatkan Pasangan Kunci Akses, lihat Dapatkan Pasangan Kunci Akses.
Untuk menghindari risiko keamanan akibat Pasangan Kunci Akses dalam teks biasa, contoh ini menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Variabel proyek.
CatatanJika proyek SLS dan layanan Flink berada di akun yang berbeda, Anda harus memberikan izin kepada akun Flink untuk menulis data ke SLS. Untuk informasi selengkapnya, lihat Buat kebijakan kustom. Kode berikut menunjukkan isi kebijakan:
accessKeySecret
Konfigurasi untuk OSS

Parameter
Deskripsi
name
Nama kustom appender.
type
Jenis channel output. Nilainya tetap OSS. Anda tidak perlu mengubah nilai ini.
pattern
Format output log.
Nilai default adalah
%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.baseUri
Masukkan hanya nama Bucket OSS.
endpoint
Titik akhir layanan OSS di wilayah tersebut. Untuk informasi selengkapnya, lihat Wilayah dan titik akhir.
Endpoint adalah informasi Endpoint (Region Node) pada baris yang sesuai dengan ECS VPC Network Access (Private Network).
accessKeyId
ID AccessKey dan Rahasia AccessKey akun layanan untuk OSS. Untuk informasi tentang cara mendapatkan Pasangan Kunci Akses, lihat Dapatkan Pasangan Kunci Akses.
Untuk menghindari risiko keamanan akibat Pasangan Kunci Akses dalam teks biasa, contoh ini menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Variabel proyek.
CatatanParameter ini hanya diperlukan jika Anda mengonfigurasi output log ke bucket OSS yang dimiliki oleh Akun Alibaba Cloud yang berbeda. Jika bucket berada di akun yang sama, Anda tidak perlu menentukan parameter ini. Hapus parameter ini.
secretAccessKey
flushIntervalSeconds
Interval sinkronisasi log ke penyimpanan. Ini adalah interval penulisan data log. Satuan: detik.
flushIntervalEventCount
Jumlah entri log yang dikumpulkan sebelum ditulis ke penyimpanan sekaligus.
CatatanJika dikonfigurasi bersamaan dengan `flushIntervalSeconds`, operasi penulisan akan dilakukan segera setelah salah satu kondisi mencapai nilai yang ditetapkan.
rollingBytes
Ukuran file log tunggal di OSS. Setelah ukuran maksimum tercapai, data selanjutnya akan ditulis ke file log baru.
Konfigurasi untuk Kafka
CatatanKluster Kafka yang mengaktifkan otentikasi Kerberos tidak didukung.
Prasyarat
Plugin log KafkaAppender yang disediakan oleh Realtime Compute for Apache Flink dimuat oleh class loader plugin Flink. Sebelum menggunakan plugin ini, Anda harus secara eksplisit menentukan path paket plugin log KafkaAppender agar aplikasi Flink dapat memuatnya. Untuk mengonfigurasi plugin untuk satu pekerjaan—yang hanya berlaku untuk pekerjaan saat ini—lakukan langkah-langkah berikut:
Pada halaman Job O&M, klik nama pekerjaan target. Pada tab Deployment Details, di bagian Running Parameter Settings, tambahkan kode berikut ke Other Configurations.
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appenderAntarmuka konfigurasi

Parameter
Deskripsi
name
Nama kustom appender.
type
Jenis channel output. Nilainya tetap KafkaVVP. Anda tidak perlu mengubah nilai ini.
pattern
Format output log.
Nilai default adalah
%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n.bootstrap.servers
Alamat broker Kafka tempat log ditulis.
acks
Jumlah replika partisi yang harus menerima pesan sebelum produsen menganggap penulisan berhasil. Untuk informasi selengkapnya, lihat acks.
buffer.memory
Ukuran buffer produsen. Satuan: byte.
retries
Jumlah percobaan ulang setelah kegagalan pengiriman.
compression.type
Jenis kompresi yang dapat digunakan produsen saat menghasilkan data. Nilai yang valid meliputi none, gzip, snappy, lz4, dan zstd.
Klik Save.
Klik Start di bagian atas halaman.
Gunakan XML
Buka halaman konfigurasi output log untuk satu pekerjaan.
Masuk ke Konsol Realtime Compute for Apache Flink.
Temukan ruang kerja target dan klik Console di kolom Actions.
Di panel navigasi sebelah kiri, klik , lalu klik nama pekerjaan target.
Pada tab Configuration, di bagian Logging, klik Edit di pojok kanan atas.
Atur Logging Profile menjadi Custom Template.
Konfigurasikan informasi output log.
Berdasarkan penyimpanan target, salin konfigurasi yang sesuai dan tempelkan ke kotak input. Kemudian perbarui nilai parameter agar sesuai dengan konfigurasi penyimpanan Anda. Untuk mengirimkan log dengan tingkat berbeda ke sistem penyimpanan yang berbeda, lihat Konfigurasikan output terpisah untuk tingkat log berbeda guna mengatur aturan pemfilteran tingkat log yang berbeda untuk appenders.
Konfigurasi untuk OSS
<?xml version="1.0" encoding="UTF-8"?> <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" packages="com.ververica.platform.logging.appender" status="WARN"> <Appenders> <Appender name="StdOut" type="Console"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> </Appender> <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="20 MB"/> </Policies> <DefaultRolloverStrategy max="4"/> </Appender> <Appender name="OSS" type="OSS"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ --> <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line --> <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property> <Property name="endpoint">https://YOUR-ENDPOINT</Property> <Property name="accessKeyId">${secret_values.accessKeyId}</Property> <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property> <Property name="flushIntervalSeconds">10</Property> <Property name="flushIntervalEventCount">100</Property> <Property name="rollingBytes">10485760</Property> </Appender> <Appender name="StdOutErrConsoleAppender" type="Console"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> </Appender> <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="1 GB"/> </Policies> <DefaultRolloverStrategy max="2"/> </Appender> <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="1 GB"/> </Policies> <DefaultRolloverStrategy max="2"/> </Appender> </Appenders> <Loggers> <Logger level="INFO" name="org.apache.hadoop"/> <Logger level="INFO" name="org.apache.kafka"/> <Logger level="INFO" name="org.apache.zookeeper"/> <Logger level="INFO" name="akka"/> <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/> <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/> <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false"> <AppenderRef ref="StdOutFileAppender"/> <AppenderRef ref="StdOutErrConsoleAppender"/> </Logger> <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false"> <AppenderRef ref="StdErrFileAppender"/> <AppenderRef ref="StdOutErrConsoleAppender"/> </Logger> {%- for name, level in userConfiguredLoggers -%} <Logger level="{{ level }}" name="{{ name }}"/> {%- endfor -%} <Root level="{{ rootLoggerLogLevel }}"> <AppenderRef ref="StdOut"/> <AppenderRef ref="RollingFile"/> <AppenderRef ref="OSS"/> </Root> </Loggers> </Configuration>Parameter
Deskripsi
YOUR-BUCKET-NAME
Ganti dengan nama Bucket OSS Anda.
YOUR-ENDPOINT
Ganti dengan endpoint OSS Anda. Untuk informasi selengkapnya, lihat Wilayah dan titik akhir.
Endpoint adalah nilai pada kolom Endpoint (Region) pada baris VPC Access From ECS (internal Network).
YOUR-OSS-ACCESSKEYID
Ganti dengan ID AccessKey dan Rahasia AccessKey akun layanan untuk OSS. Untuk informasi tentang cara mendapatkan Pasangan Kunci Akses, lihat Dapatkan Pasangan Kunci Akses.
Untuk menghindari risiko keamanan akibat Pasangan Kunci Akses dalam teks biasa, contoh ini menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Variabel proyek.
CatatanParameter ini hanya diperlukan jika Anda mengonfigurasi output log ke bucket OSS yang dimiliki oleh Akun Alibaba Cloud yang berbeda. Jika bucket berada di akun yang sama, Anda tidak perlu menentukan parameter ini. Hapus parameter ini.
YOUR-OSS-ACCESSKEYSECRET
flushIntervalSeconds
Interval sinkronisasi log ke penyimpanan. Ini adalah interval penulisan data log. Satuan: detik.
flushIntervalEventCount
Jumlah entri log yang dikumpulkan sebelum ditulis ke penyimpanan sekaligus.
CatatanJika dikonfigurasi bersama flushIntervalSeconds, pengaturan mana pun yang mencapai nilai yang dikonfigurasi terlebih dahulu akan memicu penulisan.
rollingBytes
Ukuran file log tunggal di OSS. Setelah ukuran maksimum tercapai, data selanjutnya akan ditulis ke file log baru.
Konfigurasi untuk SLS
<?xml version="1.0" encoding="UTF-8"?> <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" packages="com.ververica.platform.logging.appender" status="WARN"> <Appenders> <Appender name="StdOut" type="Console"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> </Appender> <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="5 MB"/> </Policies> <DefaultRolloverStrategy max="1"/> </Appender> <Appender name="SLS" type="SLS"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ --> <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line --> <Property name="project">YOUR-SLS-PROJECT</Property> <Property name="logStore">YOUR-SLS-LOGSTORE</Property> <Property name="endpoint">YOUR-SLS-ENDPOINT</Property> <Property name="accessKeyId">${secret_values.accessKeyId}</Property> <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property> <Property name="deploymentName">{{ deploymentName }}</Property> <Property name="flushIntervalSeconds">10</Property> <Property name="flushIntervalEventCount">100</Property> </Appender> <Appender name="StdOutErrConsoleAppender" type="Console"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> </Appender> <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="1 GB"/> </Policies> <DefaultRolloverStrategy max="2"/> </Appender> <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="1 GB"/> </Policies> <DefaultRolloverStrategy max="2"/> </Appender> </Appenders> <Loggers> <Logger level="INFO" name="org.apache.hadoop"/> <Logger level="INFO" name="org.apache.kafka"/> <Logger level="INFO" name="org.apache.zookeeper"/> <Logger level="INFO" name="akka"/> <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/> <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/> <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false"> <AppenderRef ref="StdOutFileAppender"/> <AppenderRef ref="StdOutErrConsoleAppender"/> </Logger> <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false"> <AppenderRef ref="StdErrFileAppender"/> <AppenderRef ref="StdOutErrConsoleAppender"/> </Logger> {%- for name, level in userConfiguredLoggers -%} <Logger level="{{ level }}" name="{{ name }}"/> {%- endfor -%} <Root level="{{ rootLoggerLogLevel }}"> <AppenderRef ref="StdOut"/> <AppenderRef ref="RollingFile"/> <AppenderRef ref="SLS"/> </Root> </Loggers> </Configuration>CatatanNamespace, deploymentId, jobId, dan deploymentName dalam kode tersebut adalah variabel Twig. Jangan mengubahnya. Jika tidak, pekerjaan gagal dimulai.
Parameter
Deskripsi
YOUR-SLS-PROJECT
Ganti dengan nama proyek SLS Anda.
YOUR-SLS-LOGSTORE
Ganti dengan nama Logstore SLS Anda.
YOUR-SLS-ENDPOINT
Ganti dengan titik akhir pribadi proyek SLS Anda di wilayah tersebut. Untuk informasi selengkapnya, lihat Endpoints.
YOUR-SLS-ACCESSKEYID
Ganti dengan ID AccessKey dan Rahasia AccessKey akun layanan untuk SLS. Untuk informasi tentang cara mendapatkan Pasangan Kunci Akses, lihat Dapatkan Pasangan Kunci Akses.
Untuk menghindari risiko keamanan akibat Pasangan Kunci Akses dalam teks biasa, contoh ini menggunakan variabel untuk menentukan Pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Variabel proyek.
CatatanJika proyek SLS dan layanan Flink berada di akun yang berbeda, Anda harus memberikan izin kepada akun Flink untuk menulis data ke SLS. Untuk informasi selengkapnya, lihat Buat kebijakan kustom. Kode berikut menunjukkan isi kebijakan:
Tidak ada batasan pada cakupan SLS
{ "Version": "1", "Statement": [ { "Effect": "Allow", "Action": [ "log:Get*", "log:PostLogStoreLogs" ], "Resource": "*" } ] }Tentukan cakupan resource SLS. Kode berikut memberikan contoh.
{ "Version": "1", "Statement": [ { "Effect": "Allow", "Action": [ "log:PostLogStoreLogs", "log:GetLogStore" ], "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest" } ] }
YOUR-SLS-ACCESSKEYSECRET
flushIntervalSeconds
Interval sinkronisasi log ke penyimpanan. Ini adalah interval penulisan data log. Satuan: detik.
flushIntervalEventCount
Jumlah entri log yang dikumpulkan sebelum ditulis ke penyimpanan sekaligus.
CatatanJika digunakan bersama flushIntervalSeconds, pengaturan yang mencapai nilai yang dikonfigurasi terlebih dahulu akan memicu penulisan data tunggal.
Konfigurasi untuk Kafka
CatatanKluster Kafka yang mengaktifkan otentikasi Kerberos tidak didukung.
Prasyarat
Plugin log KafkaAppender yang disediakan oleh Realtime Compute for Apache Flink dimuat oleh class loader plugin Flink. Sebelum menggunakan plugin ini, Anda harus secara eksplisit menentukan path paket plugin log KafkaAppender agar aplikasi Flink dapat memuatnya. Anda dapat menggunakan salah satu metode berikut:
Konfigurasikan templat pekerjaan (berlaku untuk semua pekerjaan dalam proyek)
Pada halaman Configuration Management di Konsol pengembangan Flink, di bagian Other Configurations, tambahkan kode berikut.
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appenderKonfigurasikan satu pekerjaan (hanya berlaku untuk pekerjaan saat ini)
Pada halaman Job O&M, klik nama pekerjaan target. Di bagian Additional Settings area Runtime Parameter Settings pada tab Deployment Details, tambahkan kode berikut.
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
Konfigurasi log
<?xml version="1.0" encoding="UTF-8"?> <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" packages="com.ververica.platform.logging.appender" status="WARN"> <Appenders> <Appender name="StdOut" type="Console"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> </Appender> <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> <Policies> <SizeBasedTriggeringPolicy size="20 MB"/> </Policies> <DefaultRolloverStrategy max="4"/> </Appender> <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME"> <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n"/> <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property> <Property name="acks">YOUR-ACKS-VALUE</Property> <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property> <Property name="retries">YOUR-RETRIES-NUMBER</Property> <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property> </Appender> <Appender type="Async" name="AsyncAppender"> <AppenderRef ref="KafkaVVPAppender"/> </Appender> </Appenders> <Loggers> <Logger level="INFO" name="org.apache.hadoop"/> <Logger level="INFO" name="org.apache.kafka"/> <Logger level="INFO" name="org.apache.zookeeper"/> <Logger level="INFO" name="akka"/> <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/> <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> {%- for name, level in userConfiguredLoggers -%} <Logger level="{{ level }}" name="{{ name }}"/> {%- endfor -%} <Root level="{{ rootLoggerLogLevel }}"> <AppenderRef ref="StdOut"/> <AppenderRef ref="RollingFile"/> <AppenderRef ref="AsyncAppender"/> </Root> </Loggers> </Configuration>Parameter
Deskripsi
YOUR-TOPIC-NAME
Nama topik Kafka tempat log ditulis.
YOUR-KAFKA-BOOTSTRAP-SERVERS
Alamat broker Kafka tempat log ditulis.
YOUR-ACKS-VALUE
Jumlah replika partisi yang harus menerima pesan sebelum produsen menganggap penulisan berhasil. Untuk informasi selengkapnya, lihat acks.
YOUR-BUFFER-MEMORY-SIZE
Ukuran buffer produsen. Satuan: byte.
YOUR-RETRIES-NUMBER
Jumlah percobaan ulang setelah kegagalan pengiriman.
YOUR-COMPRESSION-TYPE
Jenis kompresi yang dapat digunakan produsen saat menghasilkan data. Nilai yang valid meliputi none, gzip, snappy, lz4, dan zstd.
CatatanAnda juga dapat mengatur semua parameter konfigurasi yang didukung oleh klien Apache Kafka. Untuk informasi selengkapnya, lihat Apache Kafka.
Klik Save.
Klik Start di bagian atas halaman.
Konfigurasikan channel output log untuk semua pekerjaan dalam proyek
Anda dapat mengonfigurasi templat untuk mengatur output log default ke OSS, SLS, atau Kafka untuk semua pekerjaan dalam proyek.
Setelah menerapkan konfigurasi ini, log semua pekerjaan yang dibuat selanjutnya dalam proyek akan disimpan di OSS, SLS, atau Kafka.
Plugin logging KafkaAppender yang disediakan oleh Realtime Compute for Apache Flink dimuat menggunakan class loader plugin Flink. Sebelum menggunakan plugin ini, Anda harus secara eksplisit menentukan path paket tempat plugin logging KafkaAppender berada agar aplikasi Flink Anda dapat memuat plugin tersebut dengan sukses. Oleh karena itu, Anda perlu menambahkan kode berikut di halaman Configuration Management pada Konsol pengembangan Realtime Compute for Apache Flink di bawah Other Configuration.
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
Buka halaman konfigurasi templat log pekerjaan.
Masuk ke Konsol manajemen Realtime Compute for Apache Flink.
Klik Actions di kolom Console ruang kerja target, lalu pilih Project target dari bagian atas halaman Real-time Computing Development Console.
Di panel navigasi sebelah kiri, klik .
Pada tab Deployment Defaults, pilih jenis pekerjaan.
Di bagian Logging, pilih Log Template sebagai Custom Template.
Konfigurasikan channel output log untuk semua pekerjaan dalam proyek.
Untuk informasi selengkapnya tentang kode, lihat Konfigurasikan output log untuk satu pekerjaan (Gunakan XML).
Klik Save Changes.
Konfigurasikan output terpisah untuk tingkat log berbeda
Anda dapat menggunakan ThresholdFilter log4j2 untuk mengonfigurasi aturan pemfilteran tingkat log yang berbeda untuk appenders yang berbeda. Manfaat konfigurasi ini adalah:
Fleksibilitas: Anda dapat mengatur tingkat log yang berbeda untuk sistem penyimpanan eksternal yang berbeda sesuai kebutuhan.
Efisiensi: Mengurangi pemrosesan dan transmisi log yang tidak perlu serta meningkatkan kinerja sistem.
Kejelasan: Konfigurasi terpisah membuat alur log lebih jelas dan manajemen tingkat lebih mudah.
Lakukan langkah-langkah berikut untuk mengonfigurasi pengaturan:
Di bagian Logging, atur Logging Profile menjadi Custom Template.
Konfigurasikan informasi output log.
Topik ini menggunakan contoh di mana log tingkat INFO dan lebih tinggi dioutput ke Konsol pengembangan Flink, dan hanya log tingkat ERROR dan lebih tinggi yang dioutput ke SLS. Kode berikut memberikan contoh konfigurasi.
<?xml version="1.0" encoding="UTF-8"?> <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" status="WARN"> <Appenders> <!-- Console Appender configured to output only INFO level logs --> <Appender name="StdOut" type="Console"> <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/> <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/> </Appender> <!-- RollingFile Appender (no filter shown, logs all levels due to Root level being INFO) --> <Appender name="RollingFile" type="RollingFile"> <!-- Configuration remains unchanged --> <!-- ... --> </Appender> <!-- SLS Appender configured to output only ERROR level and above logs --> <Appender name="SLS" type="SLS"> <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/> <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/> <!-- SLS specific properties --> <Property name="namespace">YOUR_NAMESPACE</Property> <Property name="project">YOUR_SLS_PROJECT</Property> <Property name="logStore">YOUR_SLS_LOGSTORE</Property> <Property name="endpoint">YOUR_SLS_ENDPOINT</Property> <!-- Access credentials and other properties --> <!-- ... --> </Appender> <!-- Other Appenders definitions remain unchanged --> <!-- ... --> </Appenders> <Loggers> <!-- Directly configure loggers for StdOut and SLS with specific levels --> <Logger name="StdOutLogger" level="INFO" additivity="false"> <AppenderRef ref="StdOut"/> </Logger> <Logger name="SLSLogger" level="ERROR" additivity="false"> <AppenderRef ref="SLS"/> </Logger> <!-- Other Loggers definitions with their specific configurations --> <!-- ... --> <!-- Root Logger without specific AppenderRef for SLS and StdOut, to avoid duplicate logging --> <Root level="INFO"> <AppenderRef ref="StdOut"/> <AppenderRef ref="RollingFile"/> <!-- Exclude SLS from Root to prevent duplicate logging in case of other loggers --> </Root> </Loggers> </Configuration>Dalam konfigurasi ini:
Console Appender: ThresholdFilter memastikan bahwa log tingkat INFO dan lebih tinggi dioutput ke Konsol pengembangan Flink.
SLS Appender: ThresholdFilter memastikan bahwa hanya log tingkat ERROR dan lebih tinggi yang dikirimkan ke SLS. Untuk informasi selengkapnya tentang properti SLS Appender, lihat Konfigurasi untuk SLS. Ganti
YOUR_NAMESPACE,YOUR_SLS_PROJECT, dan placeholder lainnya dengan informasi proyek SLS aktual Anda.CatatanJika SLS Appender adalah appender kustom dan jenisnya bukan SLS, pastikan jenis yang benar digunakan dan kelas appender memiliki logika yang diperlukan untuk terhubung ke SLS.
StdOutLogger dan SLSLogger: Masing-masing hanya mengirimkan log ke StdOut Appender dan SLS Appender, dengan pembatasan tingkat log yang berbeda.
Root Logger: StdOut Appender dan RollingFile Appender dikonfigurasi, tetapi SLS Appender tidak disertakan. Hal ini menghindari pengiriman log duplikat ke SLS ketika logger spesifik sudah dikonfigurasi.
Untuk informasi selengkapnya tentang operasi dan parameter konfigurasi log4j, lihat Apache Log4j.
Referensi
Pekerjaan gagal dimulai setelah Anda mengonfigurasi output log ke SLS
Untuk melihat log pekerjaan, lihat Lihat log startup dan operasional.
Jika pekerjaan gagal dimulai atau terjadi pengecualian selama runtime, Anda dapat melihat log pengecualian. Untuk informasi selengkapnya, lihat Lihat log pengecualian.
Jika log tingkat INFO tidak memenuhi kebutuhan troubleshooting Anda, Anda dapat mengubah tingkat log menjadi DEBUG. Untuk informasi selengkapnya, lihat Ubah dinamis tingkat log pekerjaan yang sedang berjalan.
Anda dapat menggunakan ActionTrail untuk melihat event audit Flink. Untuk informasi selengkapnya, lihat Lihat event audit Flink.