The Lindorm streaming engine stores streaming data and performs lightweight computations on it. This topic describes how to use the streaming engine to compute real-time statistics on transaction data, namely the number of orders and the total transaction amount per hour, and to store the results in a Lindorm wide table.

Prerequisites

  • Java Development Kit (JDK) V1.7 or later is installed.
  • The IP address of your client is added to the allowlist of the Lindorm instance. For information about how to configure an allowlist, see Configure a whitelist.
  • The endpoint of the Lindorm streaming engine is obtained. For information about how to obtain the endpoint of the Lindorm streaming engine, see View endpoints.

Procedure

  1. Create a Lindorm wide table. Use Lindorm-cli to connect to LindormTable and create a table in LindormTable. For more information, see Use Lindorm-cli to connect to and use LindormTable.
    Note A Lindorm wide table is created to store the statistical results of transaction data.
    CREATE TABLE IF NOT EXISTS order_stat (
      biz VARCHAR,
      WINDOWSTART LONG,
      WINDOWEND LONG,
      total_order_price DOUBLE,
      count LONG,
      primary key (biz,WINDOWSTART)
    );
  2. Create a streaming data table. Use Lindorm-cli to connect to the Lindorm streaming engine. For more information, see Use Lindorm-cli to connect to and use the Lindorm streaming engine.
    CREATE STREAM IF NOT EXISTS orders (
      `biz` STRING,
      `order_id` STRING,
      `price` DOUBLE,
      `detail` STRING,
      `timestamp` LONG
    ) WITH (
      value_format = 'json',
      key_value = 'order_id',
      stream_topic = 'order_topic',
      TIMESTAMP = 'timestamp'
    );
  3. Create a serving table. For the Lindorm streaming engine, a serving table is an external table whose data is stored in a Lindorm wide table. Execute the following statement to associate the serving table lindorm_order_stat with the Lindorm wide table order_stat:
    CREATE EXTERNAL TABLE IF NOT EXISTS lindorm_order_stat WITH (
      table_type = 'lindorm.table',
      table_name = 'order_stat',
      endpoint = 'ld-bp17pwu1541ia****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020'
    );
    Note
    • table_type: specifies that the source data is stored in LindormTable.
    • table_name: specifies the name of the Lindorm wide table.
    • endpoint: specifies the HBase-compatible endpoint of LindormTable.
  4. Create a stream processing task to calculate the number of orders and transaction amount per hour.
    CREATE CQ lindorm_order_state
    INSERT INTO lindorm_order_stat
    SELECT biz, SUM(price) AS total_order_price, COUNT(*) AS count
    FROM orders WINDOW TUMBLING (SIZE 1 HOUR)
    GROUP BY biz;
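To make the hourly tumbling window concrete, the following Java sketch reproduces the grouping locally. It is an illustration only and not the engine's implementation. It assumes that windows are aligned to the epoch (the WINDOWSTART values in the sample output of this topic are exact multiples of one hour) and that the window end is exclusive. The class name, method names, and sample prices are hypothetical. The code uses only the JDK and compiles with JDK 1.7.

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingWindowSketch {
  // One hour in milliseconds, mirroring "size 1 hour" in the query.
  static final long WINDOW_SIZE_MS = 60L * 60L * 1000L;

  // Start of the window that contains the event time (epoch milliseconds).
  // Assumption: windows are aligned to the epoch.
  static long windowStart(long timestampMs) {
    return timestampMs - (timestampMs % WINDOW_SIZE_MS);
  }

  // End of the same window. Assumption: the end is exclusive.
  static long windowEnd(long timestampMs) {
    return windowStart(timestampMs) + WINDOW_SIZE_MS;
  }

  // Groups orders by (biz, window start) and returns {sum of price, count}
  // for each group, like SUM(price) and COUNT(*) with GROUP BY biz.
  static Map<String, double[]> aggregate(String[] biz, double[] price, long[] ts) {
    Map<String, double[]> result = new TreeMap<String, double[]>();
    for (int i = 0; i < biz.length; i++) {
      String key = biz[i] + "@" + windowStart(ts[i]);
      double[] acc = result.get(key);
      if (acc == null) {
        acc = new double[2];
        result.put(key, acc);
      }
      acc[0] += price[i]; // total_order_price
      acc[1] += 1;        // count
    }
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical orders: one in the first hour, two in the second hour.
    String[] biz = {"biz1", "biz1", "biz1"};
    double[] price = {694, 500, 670};
    long[] ts = {1645871234567L, 1645873200000L, 1645875000000L};
    for (Map.Entry<String, double[]> e : aggregate(biz, price, ts).entrySet()) {
      System.out.println(e.getKey() + " sum=" + e.getValue()[0]
          + " count=" + (long) e.getValue()[1]);
    }
  }
}
```

Each record belongs to exactly one window, so a record whose timestamp falls exactly on an hour boundary is counted in the window that starts at that boundary.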
  5. Use the Kafka API to write data to order_topic, the topic that is specified by stream_topic for the streaming data table.
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.codehaus.jettison.json.JSONObject;
    
    import java.util.Properties;
    import java.util.Random;
    import java.util.UUID;
    import java.util.concurrent.Future;
    
    public class KafkaToLindormStreamDemo {
    
      public static void main(String[] args) {
        Properties props = new Properties();
        // Configure the Lindorm Stream Kafka Endpoint parameter.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-bp17pwu1541ia****-proxy-stream-public.lindorm.rds.aliyuncs.com:30080");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
        // The topic that is specified by stream_topic in the CREATE STREAM statement.
        String topic = "order_topic";
    
        try {
          for (int i = 0; i < 100; i++) {
            JSONObject json = new JSONObject();
            json.put("biz", "biz1");
            json.put("order_id", UUID.randomUUID().toString());
            json.put("price", new Random().nextInt(1000));
            json.put("detail", "order detail");
            // Back-date each record by a further 30 minutes so that the data spans many hourly windows.
            json.put("timestamp", System.currentTimeMillis() - i * 30L * 60 * 1000);
            Future<RecordMetadata> future = producer.send(
                new ProducerRecord<String, String>(topic, json.getString("order_id"),
                    json.toString()));
            producer.flush();
            try {
              RecordMetadata recordMetadata = future.get();
              System.out.println("Produce ok:" + recordMetadata.toString());
            } catch (Throwable t) {
              System.out.println("Produce exception " + t.getMessage());
              t.printStackTrace();
            }
          }
        } catch (Exception e) {
          System.out.println("Produce exception " + e.getMessage());
          e.printStackTrace();
        } finally {
          // Close the producer to release its network resources.
          producer.close();
        }
      }
    }
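The producer above writes 100 records whose timestamps are 30 minutes apart, so most hourly windows receive two records. The following Java sketch counts how many records fall into each window for a given start time. It is a local simulation under the assumption that windows are aligned to the epoch, and the class name, method name, and the start times used in main are hypothetical. It uses only the JDK and compiles with JDK 1.7.

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class RecordsPerWindowSketch {
  static final long HOUR_MS = 60L * 60L * 1000L;
  static final long HALF_HOUR_MS = 30L * 60L * 1000L;

  // Returns window start -> number of records, for records whose timestamps
  // are nowMs, nowMs - 30 min, nowMs - 60 min, and so on.
  static SortedMap<Long, Integer> countPerWindow(long nowMs, int records) {
    SortedMap<Long, Integer> counts = new TreeMap<Long, Integer>();
    for (int i = 0; i < records; i++) {
      long ts = nowMs - i * HALF_HOUR_MS;
      long windowStart = ts - (ts % HOUR_MS); // assumes epoch-aligned windows
      Integer old = counts.get(windowStart);
      counts.put(windowStart, old == null ? 1 : old + 1);
    }
    return counts;
  }

  public static void main(String[] args) {
    // Hypothetical start time: 10 minutes past a full hour.
    long now = 1645916400000L + 10L * 60L * 1000L;
    for (Map.Entry<Long, Integer> e : countPerWindow(now, 100).entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}
```

Under these assumptions, a run that starts less than 30 minutes past the hour leaves a single record in both the oldest and the newest window, and a run that starts later in the hour fills every window with two records. This is one possible reason for a window with a count of 1 in the query result.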
  6. Use Lindorm-cli to connect to the Lindorm streaming engine and query the statistical results from the engine.
    SELECT * FROM order_stat;
    The following result is returned:
    +------+---------------+---------------+-------------------+-------+
    | biz  |  WINDOWSTART  |   WINDOWEND   | total_order_price | count |
    +------+---------------+---------------+-------------------+-------+
    | biz1 | 1645869600000 | 1645873200000 | 694.000000        | 1     |
    | biz1 | 1645873200000 | 1645876800000 | 1170.000000       | 2     |
    | biz1 | 1645876800000 | 1645880400000 | 453.000000        | 2     |
    | biz1 | 1645880400000 | 1645884000000 | 958.000000        | 2     |
    | biz1 | 1645884000000 | 1645887600000 | 365.000000        | 2     |
    | biz1 | 1645887600000 | 1645891200000 | 674.000000        | 2     |
    | biz1 | 1645891200000 | 1645894800000 | 1467.000000       | 2     |
    | biz1 | 1645894800000 | 1645898400000 | 1430.000000       | 2     |
    | biz1 | 1645898400000 | 1645902000000 | 422.000000        | 2     |
    | biz1 | 1645902000000 | 1645905600000 | 822.000000        | 2     |
    | biz1 | 1645905600000 | 1645909200000 | 1521.000000       | 2     |
    | biz1 | 1645909200000 | 1645912800000 | 930.000000        | 2     |
    | biz1 | 1645912800000 | 1645916400000 | 1501.000000       | 2     |