すべてのプロダクト
Search
ドキュメントセンター

:Lindorm ストリーミングエンジンにオープンソースの Apache Kafka クライアントを使用してデータを書き込む

最終更新日:Jan 14, 2025

Lindorm ストリーミングエンジンの API は、オープンソースの Apache Kafka の API と完全に互換性があります。Apache Kafka API を使用すると、プログラムで Lindorm ストリーミングエンジンにデータを書き込むことができます。また、Fluentd や Debezium などのオープンソースのサードパーティツールを使用して、データを収集し、Lindorm ストリーミングエンジンにデータを書き込むこともできます。このトピックでは、オープンソースの Apache Kafka クライアントを使用して Lindorm ストリーミングエンジンに接続する方法と、Lindorm ストリーミングエンジンにデータを書き込む方法について説明します。また、いくつかのサンプルコードも提供します。

前提条件

  • Java Development Kit(JDK)1.7 以降を使用して Java 環境がインストールされていること。

  • クライアントの IP アドレスが Lindorm インスタンスの許可リストに追加されていること。詳細については、「ホワイトリストを構成する」をご参照ください。

  • Lindorm Stream Kafka エンドポイントの値が取得されていること。詳細については、「エンドポイントを表示する」をご参照ください。

    説明

    Lindorm Stream Kafka エンドポイントは、Lindorm ストリーミングエンジンの仮想プライベートクラウド(VPC)エンドポイントを指定します。アプリケーションと Lindorm インスタンスが同じ VPC にデプロイされていることを確認してください。

手順

  1. オープンソースの Apache Kafka クライアントをダウンロードします。pom.xml ファイルに Maven 依存関係を追加します。次のサンプルコードが提供されています。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.2</version>
    </dependency>
  2. Lindorm ストリーミングエンジンに接続し、エンジンにデータを書き込みます。完全なサンプルコードが提供されています。

    説明
    • JSON、Avro、または CSV 形式のデータを Lindorm ストリーミングエンジンに書き込むことができます。

    • サンプルコードの Lindorm Stream Kafka エンドポイントの値は VPC エンドポイントです。エンドポイントの取得方法については、「エンドポイントを表示する」をご参照ください。

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.codehaus.jettison.json.JSONObject;
    
    import java.util.Properties;
    import java.util.concurrent.Future;
    
    public class KafkaToLindormStreamDemo {
    
        public static void main(String[] args) {
            Properties props = new Properties();
    
            // Lindorm Stream Kafka エンドポイントを構成します。Lindorm Stream Kafka エンドポイントの値は VPC エンドポイントです。アプリケーションと Lindorm インスタンスが同じ VPC にデプロイされていることを確認してください。
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Lindorm Stream Kafka Endpoint");
           // ストリーミングデータテーブルの物理データを格納するトピックを指定します。
            String topic = "log_topic";
    
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            try {
                JSONObject json = new JSONObject();
                // ストリーミングエンジンにデータを書き込みます。
                json.put("timestamp", System.currentTimeMillis());
                json.put("loglevel", "ERROR");
                json.put("thread", "[ReportFinishedTask7-thread-4]");
                json.put("class", "engine.ImporterTaskManager(318)");
                json.put("detail", "Remove tasks fail: job name=e35318e5-52ea-48ab-ad2a-0144ffc6955e , task name=prepare_e35318e5-52ea-48ab-ad2a-0144ffc6955e , runningTasks=0");
                Future<RecordMetadata> future = producer.send(
              new ProducerRecord<String, String>(topic, json.getString("thread") + json.getLong("timestamp"),
                  json.toString()));
                producer.flush();
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    System.out.println("Produce exception " + t.getMessage());
                    t.printStackTrace();
                }
            } catch (Exception e) {
                System.out.println("Produce exception " + e.getMessage());
                e.printStackTrace();
            }
        }
    }