API untuk mesin streaming Lindorm sepenuhnya kompatibel dengan API Apache Kafka open source. Anda dapat menggunakan API Apache Kafka untuk menulis data ke mesin streaming Lindorm dari program Anda. Selain itu, alat pihak ketiga open source seperti Fluentd dan Debezium dapat digunakan untuk mengumpulkan dan menulis data ke mesin streaming Lindorm. Topik ini menjelaskan cara menggunakan klien Apache Kafka open source untuk terhubung ke mesin streaming Lindorm dan menulis data ke dalamnya, serta menyediakan beberapa contoh kode.
Prasyarat
Lingkungan Java telah diinstal menggunakan Java Development Kit (JDK) versi 1.7 atau yang lebih baru.
Alamat IP klien telah ditambahkan ke daftar izin instance Lindorm Anda. Untuk informasi lebih lanjut, lihat Konfigurasi Daftar Izin.
Nilai Lindorm Stream Kafka Endpoint telah diperoleh. Untuk informasi lebih lanjut, lihat Lihat Titik Akhir.
CatatanLindorm Stream Kafka Endpoint menentukan titik akhir virtual private cloud (VPC) dari mesin streaming Lindorm Anda. Pastikan aplikasi Anda dan instance Lindorm berada di VPC yang sama.
Prosedur
Unduh klien Apache Kafka open source. Tambahkan dependensi Maven ke file pom.xml. Contoh kode berikut disediakan:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.2</version> </dependency>Hubungkan ke mesin streaming Lindorm dan tulis data ke dalamnya. Contoh kode lengkap disediakan:
CatatanData dalam format JSON, Avro, atau CSV dapat ditulis ke mesin streaming Lindorm.
Nilai Lindorm Stream Kafka Endpoint dalam contoh kode adalah titik akhir VPC. Untuk informasi tentang cara mendapatkan titik akhir, lihat Lihat Titik Akhir.
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.codehaus.jettison.json.JSONObject; import java.util.Properties; import java.util.concurrent.Future; public class KafkaToLindormStreamDemo { public static void main(String[] args) { Properties props = new Properties(); // Konfigurasikan Lindorm Stream Kafka Endpoint. Nilai Lindorm Stream Kafka Endpoint adalah titik akhir VPC. Pastikan aplikasi Anda dan instance Lindorm Anda ditempatkan di VPC yang sama. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Lindorm Stream Kafka Endpoint"); // Tentukan topik tempat Anda ingin menyimpan data fisik tabel data streaming Anda. String topic = "log_topic"; props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); try { JSONObject json = new JSONObject(); // Tulis data ke mesin streaming. json.put("timestamp", System.currentTimeMillis()); json.put("loglevel", "ERROR"); json.put("thread", "[ReportFinishedTask7-thread-4]"); json.put("class", "engine.ImporterTaskManager(318)"); json.put("detail", "Remove tasks fail: job name=e35318e5-52ea-48ab-ad2a-0144ffc6955e , task name=prepare_e35318e5-52ea-48ab-ad2a-0144ffc6955e , runningTasks=0"); Future<RecordMetadata> future = producer.send( new ProducerRecord<String, String>(topic, json.getString("thread") + json.getLong("timestamp"), json.toString())); producer.flush(); try { RecordMetadata recordMetadata = future.get(); System.out.println("Produce ok:" + recordMetadata.toString()); } catch (Throwable t) { System.out.println("Produce exception " + t.getMessage()); t.printStackTrace(); } } catch (Exception e) { System.out.println("Produce exception " + e.getMessage()); e.printStackTrace(); } } }