Simple Log Service menyediakan Flink Log Connector untuk terhubung dengan Flink. Open source Flink dan Realtime Compute for Apache Flink didukung. Topik ini menjelaskan cara menghubungkan Flink ke Simple Log Service untuk mengonsumsi data log.
Prasyarat
Sebuah Proyek dan penyimpanan log telah dibuat. Untuk informasi lebih lanjut, lihat Buat sebuah proyek dan Buat sebuah penyimpanan log.
Pengguna Resource Access Management (RAM) telah dibuat, dan izin yang diperlukan telah diberikan kepada pengguna RAM. Untuk informasi lebih lanjut, lihat Buat pengguna RAM dan berikan izin kepada pengguna RAM.
Variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan variabel lingkungan di Linux, macOS, dan Windows.
PentingPasangan AccessKey dari akun Alibaba Cloud memiliki izin untuk semua operasi API. Kami menyarankan Anda menggunakan pasangan AccessKey dari pengguna RAM untuk memanggil operasi API atau melakukan pemeliharaan rutin O&M.
Kami menyarankan Anda untuk tidak menyimpan ID AccessKey atau rahasia AccessKey dalam kode proyek Anda. Jika tidak, pasangan AccessKey mungkin bocor, dan keamanan semua sumber daya dalam akun Anda mungkin terganggu.
Informasi latar belakang
Flink Log Connector terdiri dari Flink Log Consumer dan Flink Log Producer. Berikut adalah perbedaan antara keduanya:
Flink Log Consumer membaca data dari Simple Log Service. Flink Log Consumer mendukung semantik exactly-once dan load balancing di antara shard.
Flink Log Producer menulis data ke Simple Log Service.
Sebelum menggunakan Flink Log Connector, Anda harus menambahkan dependensi Maven ke proyek Anda. Contoh kode:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>flink-log-connector</artifactId>
<version>0.1.38</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>Anda dapat merujuk pada kode sumber di GitHub untuk menulis kode dalam bahasa pemrograman lainnya. Untuk informasi lebih lanjut, kunjungi aliyun-log-flink-connector.
Flink Log Consumer
Flink Log Consumer dapat mengonsumsi data log dari penyimpanan log. Semantik exactly-once diterapkan selama konsumsi log. Flink Log Consumer mendeteksi perubahan jumlah shard dalam penyimpanan log.
Setiap subtask Flink mengonsumsi data dari beberapa shard dalam penyimpanan log. Jika shard dalam penyimpanan log dibagi atau digabungkan, shard yang dikonsumsi oleh subtask juga berubah.
Saat menggunakan Flink Log Consumer untuk mengonsumsi data dari Simple Log Service, Anda dapat memanggil operasi API berikut:
GetCursorOrData
Anda dapat memanggil operasi ini untuk menarik data log dari shard. Jika Anda sering memanggil operasi ini, lalu lintas data mungkin melebihi kemampuan shard. Anda dapat menggunakan parameter ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS untuk mengontrol interval panggilan API. Anda dapat menggunakan parameter ConfigConstants.LOG_MAX_NUMBER_PER_FETCH untuk mengontrol jumlah log yang ditarik oleh setiap panggilan API. Untuk informasi lebih lanjut tentang kemampuan shard, lihat Shard.
Contoh:
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100"); configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");ListShards
Anda dapat memanggil operasi ini untuk melihat semua shard dalam penyimpanan log dan status setiap shard. Jika shard sering dibagi dan digabungkan, Anda dapat menyesuaikan interval panggilan untuk mendeteksi perubahan jumlah shard secara tepat waktu. Contoh:
// Panggil operasi ListShards sekali setiap 30 detik. configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");CreateConsumerGroup
Anda dapat memanggil operasi ini untuk membuat grup konsumen untuk menyinkronkan checkpoint.
UpdateCheckPoint
Anda dapat memanggil operasi ini untuk menyinkronkan snapshot Flink ke grup konsumen.
Konfigurasikan parameter startup.
Kode berikut memberikan contoh cara mengonsumsi data. Kelas java.util.Properties digunakan sebagai alat konfigurasi, dan konfigurasi Flink Log Consumer disertakan dalam kelas ConfigConstants.
Properties configProps = new Properties(); // Tentukan titik akhir Simple Log Service. configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com"); // Dalam contoh ini, ID AccessKey dan rahasia AccessKey diperoleh dari variabel lingkungan. String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); configProps.put(ConfigConstants.LOG_ACCESSKEYID,accessKeyId); configProps.put(ConfigConstants.LOG_ACCESSKEY,accessKeySecret); // Tentukan proyek. String project = "your-project"; // Tentukan penyimpanan log. String logstore = "your-logstore"; // Tentukan posisi awal konsumsi log. configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR); // Tentukan metode yang ingin Anda gunakan untuk deserialisasi data. FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<FastLogGroupList> dataStream = env.addSource( new FlinkLogConsumer<FastLogGroupList>(project, logstore, deserializer, configProps) ); dataStream.addSink(new SinkFunction<FastLogGroupList>() { @Override public void invoke(FastLogGroupList logGroupList, Context context) throws Exception { for (FastLogGroup logGroup : logGroupList.getLogGroups()) { int logsCount = logGroup.getLogsCount(); String topic = logGroup.getTopic(); String source = logGroup.getSource(); for (int i = 0; i < logsCount; ++i) { FastLog row = logGroup.getLogs(i); for (int j = 0; j < row.getContentsCount(); ++j) { FastLogContent column = row.getContents(j); // Proses log. System.out.println(column.getKey()); System.out.println(column.getValue()); } } } } }); // Anda juga dapat menggunakan RawLogGroupListDeserializer. RawLogGroupListDeserializer rawLogGroupListDeserializer = new RawLogGroupListDeserializer(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<RawLogGroupList> rawLogGroupListDataStream = env.addSource( new FlinkLogConsumer<RawLogGroupList>(project, logstore, rawLogGroupListDeserializer, configProps) ); rawLogGroupListDataStream.addSink(new SinkFunction<RawLogGroupList>() { @Override public void invoke(RawLogGroupList logGroupList, Context context) throws Exception { for (RawLogGroup logGroup : logGroupList.getRawLogGroups()) { String topic = logGroup.getTopic(); String source = logGroup.getSource(); for (RawLog row : logGroup.getLogs()) { // Proses log. } } } });CatatanJumlah subtask Flink tidak bergantung pada jumlah shard dalam penyimpanan log. Jika jumlah shard lebih besar dari jumlah subtask, setiap subtask mengonsumsi data dari satu atau lebih shard. Jika jumlah shard lebih kecil dari jumlah subtask, beberapa subtask akan idle hingga shard baru dihasilkan. Data setiap shard hanya dapat dikonsumsi oleh satu subtask.
Tentukan posisi awal konsumsi log.
Saat menggunakan Flink Log Consumer untuk mengonsumsi data dari penyimpanan log, Anda dapat menggunakan parameter ConfigConstants.LOG_CONSUMER_BEGIN_POSITION untuk menentukan posisi awal konsumsi log. Anda dapat mulai mengonsumsi data dari log paling awal, log terbaru, atau titik waktu tertentu. Selain itu, Flink Log Consumer memungkinkan Anda melanjutkan konsumsi dari grup konsumen tertentu. Anda dapat mengatur parameter ke salah satu nilai berikut:
Consts.LOG_BEGIN_CURSOR: mulai mengonsumsi data dari log paling awal.
Consts.LOG_END_CURSOR: mulai mengonsumsi data dari log terbaru.
Consts.LOG_FROM_CHECKPOINT: mulai mengonsumsi data dari checkpoint yang disimpan dalam grup konsumen tertentu. Anda dapat menggunakan parameter ConfigConstants.LOG_CONSUMERGROUP untuk menentukan grup konsumen.
UnixTimestamp: mulai mengonsumsi data dari titik waktu tertentu. Nilai tersebut adalah timestamp UNIX yang mewakili jumlah detik yang telah berlalu sejak 1 Januari 1970, 00:00:00 UTC. Anda harus menentukan string tipe data INTEGER.
Contoh:
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000"); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);CatatanJika Anda telah mengonfigurasi pemulihan konsumsi dari backend status Flink saat memulai tugas Flink, Flink Log Connector menggunakan checkpoint yang disimpan dalam backend status.
Opsional:Konfigurasikan pemantauan kemajuan konsumsi.
Flink Log Consumer memungkinkan Anda memantau kemajuan konsumsi. Anda dapat memperoleh posisi konsumsi real-time dari setiap shard. Posisi konsumsi ditunjukkan oleh timestamp. Untuk informasi lebih lanjut, lihat Langkah 3: Lihat status grup konsumen.
Contoh:
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "nama grup konsumen Anda");CatatanLangkah ini bersifat opsional. Jika tidak ada grup konsumen yang ada setelah Anda mengonfigurasi pemantauan kemajuan konsumsi, Flink Log Consumer membuat grup konsumen. Jika grup konsumen sudah ada, Anda tidak perlu melakukan operasi apa pun, dan snapshot di Flink Log Consumer secara otomatis disinkronkan ke grup konsumen. Anda dapat melihat kemajuan konsumsi Flink Log Consumer di konsol Simple Log Service.
Konfigurasikan pemulihan konsumsi dan semantik exactly-once.
Jika fitur checkpointing Flink diaktifkan, Flink Log Consumer secara berkala menyimpan kemajuan konsumsi setiap shard. Jika subtask gagal, Flink memulihkan subtask dan mulai mengonsumsi data dari checkpoint terbaru.
Interval penyimpanan checkpoint menentukan jumlah maksimum data yang akan dikonsumsi jika subtask gagal. Anda dapat menggunakan kode berikut untuk mengonfigurasi interval:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Konfigurasikan semantik exactly-once. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // Simpan checkpoint setiap 5 detik. env.enableCheckpointing(5000);Untuk informasi lebih lanjut tentang checkpoint Flink, lihat Checkpoint.
Flink Log Producer
Flink Log Producer menulis data ke Simple Log Service.
Flink Log Producer hanya mendukung semantik Flink at-least-once. Jika tugas gagal, beberapa data yang ditulis ke Simple Log Service mungkin diduplikasi. Namun, tidak ada data yang hilang.
Saat menggunakan Flink Log Producer untuk menulis data ke Simple Log Service, Anda dapat memanggil operasi API berikut:
PutLogs
ListShards
Inisialisasi Flink Log Producer.
Inisialisasi parameter Properties.
Flink Log Producer diinisialisasi dengan cara yang sama seperti Flink Log Consumer. Contoh berikut menunjukkan cara mengonfigurasi parameter inisialisasi Flink Log Producer. Anda dapat menggunakan nilai default parameter atau menentukan nilai kustom untuk memenuhi kebutuhan bisnis Anda. Contoh:
// Jumlah thread I/O yang digunakan untuk mengirim data. Nilai default adalah jumlah core. ConfigConstants.IO_THREAD_NUM // Waktu maksimum selama log dapat disimpan sebelum dikirim. Nilai default: 2000. Unit: milidetik. ConfigConstants.FLUSH_INTERVAL_MS // Ukuran total memori yang dapat digunakan oleh tugas. Nilai default: 100. Unit: MB. ConfigConstants.TOTAL_SIZE_IN_BYTES // Waktu maksimum pemblokiran untuk mengirim log ketika penggunaan memori mencapai batas atas. Nilai default: 60000. Unit: milidetik. ConfigConstants.MAX_BLOCK_TIME_MS // Jumlah maksimum percobaan ulang. Nilai default: 10. ConfigConstants.MAX_RETRIESMuat ulang LogSerializationSchema dan tentukan metode yang digunakan untuk serialisasi data menjadi grup log mentah.
Grup log mentah adalah kumpulan log. Untuk informasi lebih lanjut tentang field log, lihat Log.
Jika Anda ingin menulis data ke shard tertentu, Anda dapat menggunakan parameter LogPartitioner untuk menghasilkan kunci hash untuk data log. Parameter ini bersifat opsional. Jika Anda tidak mengonfigurasi parameter ini, data ditulis ke shard acak.
Contoh:
FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps); logProducer.setCustomPartitioner(new LogPartitioner<String>() { // Hasilkan kunci hash 32-bit. public String getHashKey(String element) { try { MessageDigest md = MessageDigest.getInstance("MD5"); md.update(element.getBytes()); String hash = new BigInteger(1, md.digest()).toString(16); while(hash.length() < 32) hash = "0" + hash; return hash; } catch (NoSuchAlgorithmException e) { } return "0000000000000000000000000000000000000000000000000000000000000000"; } });
Tulis hasil simulasi ke Simple Log Service dalam format string. Contoh:
// Serialisasi data menjadi grup log mentah. class SimpleLogSerializer implements LogSerializationSchema<String> { public RawLogGroup serialize(String element) { RawLogGroup rlg = new RawLogGroup(); RawLog rl = new RawLog(); rl.setTime((int)(System.currentTimeMillis() / 1000)); rl.addContent("message", element); rlg.addLog(rl); return rlg; } } public class ProducerSample { public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com"; // Dalam contoh ini, ID AccessKey dan rahasia AccessKey diperoleh dari variabel lingkungan. public static String sAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); public static String sAccessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static String sProject = "ali-cn-hangzhou-sls-admin"; public static String sLogstore = "test-flink-producer"; private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class); public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(3); DataStream<String> simpleStringStream = env.addSource(new EventsGenerator()); Properties configProps = new Properties(); // Tentukan titik akhir Simple Log Service. configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint); // Tentukan ID AccessKey dan rahasia AccessKey akun Anda. configProps.put(ConfigConstants.LOG_ACCESSKEYID, sAccessKeyId); configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey); // Tentukan proyek tempat Anda ingin menulis log. configProps.put(ConfigConstants.LOG_PROJECT, sProject); // Tentukan penyimpanan log tempat Anda ingin menulis log. configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore); FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps); simpleStringStream.addSink(logProducer); env.execute("flink log producer"); } // Simulasikan pembuatan log. public static class EventsGenerator implements SourceFunction<String> { private boolean running = true; @Override public void run(SourceContext<String> ctx) throws Exception { long seq = 0; while (running) { Thread.sleep(10); ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12)); } } @Override public void cancel() { running = false; } } }
Contoh konsumsi
Dalam contoh ini, Flink Log Consumer menyimpan data yang dibaca ke aliran data dalam format FastLogGroupList. Kemudian, Flink Log Consumer menggunakan fungsi flatMap untuk mengonversi data dari format FastLogGroupList ke format string JSON dan menampilkan output di CLI atau menulis output ke file teks.
package com.aliyun.openservices.log.flink.sample;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.flink.ConfigConstants;
import com.aliyun.openservices.log.flink.FlinkLogConsumer;
import com.aliyun.openservices.log.flink.data.FastLogGroupDeserializer;
import com.aliyun.openservices.log.flink.data.FastLogGroupList;
import com.aliyun.openservices.log.flink.model.CheckpointMode;
import com.aliyun.openservices.log.flink.util.Consts;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class FlinkConsumerSample {
private static final String SLS_ENDPOINT = "your-endpoint";
// Dalam contoh ini, ID AccessKey dan rahasia AccessKey diperoleh dari variabel lingkungan.
private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
private static final String SLS_PROJECT = "your-project";
private static final String SLS_LOGSTORE = "your-logstore";
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
Configuration conf = new Configuration();
// Direktori checkpoint seperti "file:///tmp/flink"
conf.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "your-checkpoint-dir");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(1);
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("file:///tmp/flinkstate"));
Properties configProps = new Properties();
configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
configProps.put(ConfigConstants.LOG_ACCESSKEYID, ACCESS_KEY_ID);
configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET);
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your-consumer-group");
configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000");
FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
DataStream<FastLogGroupList> stream = env.addSource(
new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));
stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
for (FastLogGroup logGroup : value.getLogGroups()) {
int logCount = logGroup.getLogsCount();
for (int i = 0; i < logCount; i++) {
FastLog log = logGroup.getLogs(i);
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", logGroup.getTopic());
jsonObject.put("source", logGroup.getSource());
for (int j = 0; j < log.getContentsCount(); j++) {
jsonObject.put(log.getContents(j).getKey(), log.getContents(j).getValue());
}
out.collect(jsonObject.toJSONString());
}
}
}).returns(String.class);
stream.writeAsText("log-" + System.nanoTime());
env.execute("Flink consumer");
}
}