全部产品
Search
文档中心

Realtime Compute for Apache Flink:Sistem peringatan real-time e-commerce CEP Flink dinamis

更新时间:Nov 11, 2025

Flink Complex Event Processing (CEP) adalah fitur yang memproses aliran event kompleks secara dinamis untuk mendeteksi pola event tertentu secara real time dan memicu peringatan. Dalam pemasaran e-commerce, Flink CEP dapat memantau perilaku pengguna dan data transaksi secara real time guna mengidentifikasi event abnormal atau kritis serta mengirimkan peringatan tepat waktu.

Informasi latar belakang

Pertumbuhan pesat industri e-commerce telah menyebabkan peningkatan eksponensial pada volume data perilaku pengguna dan transaksi. Metode pemrosesan batch tradisional tidak lagi mampu memenuhi kebutuhan identifikasi dan respons cepat terhadap perilaku abnormal, ancaman sistem, dan churn pengguna. Sebaliknya, mesin Pemrosesan Peristiwa Kompleks (CEP) dinamis dapat memodelkan dan menganalisis perilaku pengguna multi-tahap, secara otomatis mengidentifikasi pola event kompleks, serta memicu peringatan pada tahap awal ancaman. Inilah keunggulan utama CEP dinamis dalam operasi bisnis real-time. Fitur ini memiliki tiga karakteristik utama berikut:

  • Kinerja real-time tinggi: Memberikan respons dalam hitungan milidetik, memungkinkan peringatan selama kejadian berlangsung—bukan analisis pasca-kejadian—sehingga membantu Anda membuat keputusan lebih cepat.

  • Aturan fleksibel dan dapat dikonfigurasi: Mendukung pembaruan dinamis kebijakan aturan, memungkinkan Anda beradaptasi cepat terhadap perubahan bisnis tanpa harus me-restart layanan.

  • Pengenalan event kompleks yang andal: Mendukung pencocokan logika lanjutan, seperti urutan multi-event, jendela waktu, dan kondisi gabungan, sehingga mampu menangkap secara akurat skenario bisnis yang kompleks.

Dalam industri e-commerce, skenario umum penggunaan CEP dinamis meliputi, namun tidak terbatas pada, hal-hal berikut:

Skenario

Deskripsi

Peluang cross-selling dan up-selling

Saat menelusuri produk, pengguna sering kali menunjukkan minat lintas kategori berbeda. Misalnya, pengguna mungkin melihat ponsel lalu mencari headphone atau power bank. Perilaku ini membuka peluang cross-selling dan up-selling. Dengan merekomendasikan produk pelengkap secara akurat, seperti casing ponsel atau headphone, atau menawarkan paket bundling seperti "diskon paket ponsel + headphone", platform dapat meningkatkan tingkat pembelian item tambahan dan menaikkan nilai pesanan rata-rata. Hal ini juga meningkatkan pengalaman pengguna dan memperkuat loyalitas pengguna, mendorong pertumbuhan bisnis.

Pemulihan keranjang belanja bernilai tinggi

Pengguna mungkin menambahkan item bernilai tinggi ke keranjang belanja mereka tetapi tidak menyelesaikan pembelian karena sensitivitas harga atau keraguan. Hal ini menyebabkan potensi kehilangan penjualan. Dengan mengidentifikasi keranjang belanja yang ditinggalkan secara real time dan memicu intervensi, seperti diskon berbatas waktu, peringatan stok rendah, atau penawaran gratis ongkos kirim, platform dapat secara efektif mengurangi kehilangan item bernilai tinggi, meningkatkan tingkat konversi pesanan, dan memulihkan pendapatan potensial. Ini menciptakan situasi win-win bagi nilai pengguna maupun pendapatan platform.

Identifikasi pengguna berminat tinggi

Pengguna yang menelusuri produk yang sama berulang kali dalam periode singkat menunjukkan minat beli yang tinggi. Dengan mengidentifikasi perilaku ini dan memicu pemasaran personalisasi, seperti kupon eksklusif atau pengingat stok, platform dapat mempercepat proses pengambilan keputusan pengguna, meningkatkan tingkat konversi, dan memperbaiki pengalaman pengguna, yang pada gilirannya mendorong penjualan.

Operasi pengguna sensitif harga

Pengguna sensitif harga sering kali menelusuri suatu produk berulang kali dan hanya menambahkannya ke keranjang belanja saat harganya turun. Dengan menganalisis perilaku ini, platform dapat mengirimkan notifikasi atau penawaran tertarget saat harga berubah, seperti "Produk yang Anda pantau sedang diskon!". Hal ini meningkatkan tingkat konversi dan meningkatkan efisiensi operasi pengguna.

Peringatan risiko churn

Pengguna yang sering menelusuri produk tetapi tidak melakukan pemesanan dalam waktu lama berisiko churn. Dengan mengidentifikasi perilaku ini dan mengambil langkah pemulihan, seperti mengirimkan kupon eksklusif atau merekomendasikan produk populer, platform dapat secara efektif mengurangi tingkat churn, memperpanjang siklus hidup pengguna, serta meningkatkan retensi pengguna dan pendapatan platform.

Arsitektur solusi

Flink CEP adalah library Apache Flink untuk memproses pola event kompleks. Dengan Flink CEP, Anda dapat mendefinisikan pola event kompleks, memantau aliran event secara real time, dan mengidentifikasi urutan event yang sesuai dengan pola tersebut. Library ini kemudian menghasilkan hasil pencocokan. Arsitektur solusi adalah sebagai berikut:

2

  1. Aliran Event

    Aliran event merupakan sumber input untuk pemrosesan CEP, biasanya berupa aliran data kontinu yang berisi rangkaian event tersusun secara kronologis. Setiap event dapat memiliki beberapa properti yang digunakan untuk pencocokan pola.

  2. Definisi Pola dan Aturan

    Anda dapat mendefinisikan pola event dan aturan yang menggambarkan urutan atau kombinasi event yang ingin dideteksi. Pola dapat mencakup urutan event, batasan waktu, dan filter kondisi. Misalnya, Anda dapat mendefinisikan pola di mana event A diikuti oleh event B dalam waktu 10 detik.

  3. Analisis Mesin CEP

    Mesin CEP menerima aliran event dan menganalisisnya berdasarkan pola dan aturan yang telah ditentukan. Mesin ini terus-menerus memantau aliran event dan mencoba mencocokkan event input dengan pola yang telah ditentukan, mempertimbangkan batasan seperti urutan waktu event, kondisi properti, dan jendela waktu selama proses pencocokan.

  4. Keluaran Pencocokan CEP

    Ketika urutan event dalam aliran event berhasil cocok dengan pola yang ditentukan, mesin CEP menghasilkan keluaran. Keluaran ini dapat berupa urutan event yang cocok, aksi yang dipicu oleh aturan, atau format keluaran lain yang ditentukan pengguna. Hasil pencocokan dapat digunakan untuk pemrosesan selanjutnya, seperti peringatan, pengambilan keputusan, atau penyimpanan data.

Prasyarat

Langkah 1: Persiapan

Buat instance ApsaraDB RDS for MySQL dan siapkan sumber data

  1. Buat database ApsaraDB RDS for MySQL. Untuk informasi lebih lanjut, lihat Buat database.

    Untuk instance tujuan, buat database bernama ecommerce.

  2. Siapkan sumber data Change Data Capture (CDC) MySQL.

    1. Pada halaman detail instance tujuan, klik Log On To Database di bagian atas halaman.

    2. Pada dialog logon DMS, masukkan nama pengguna dan kata sandi untuk akun database yang telah Anda buat, lalu klik Log On.

    3. Setelah berhasil login, klik ganda database ecommerce di sebelah kiri untuk beralih ke database tersebut.

    4. Pada Konsol SQL, masukkan pernyataan Data Definition Language (DDL) berikut untuk membuat tabel dan memasukkan data.

      -- Create rule table 1
      CREATE TABLE rds_demo1 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- Create rule table 2
      CREATE TABLE rds_demo2 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- Create rule table 3
      CREATE TABLE rds_demo3 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      
      -- Create rule table 4
      CREATE TABLE rds_demo4 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- Create rule table 5
      CREATE TABLE rds_demo5 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      
      -- Create the source table
      CREATE TABLE `click_stream1` (
        id bigint not null primary key auto_increment,  -- Auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream2` (
        id bigint not null primary key auto_increment,  -- Auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream3` (
        id bigint not null primary key auto_increment,  -- Auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream4` (
        id bigint not null primary key auto_increment,  -- Auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      
      CREATE TABLE `click_stream5` (
        id bigint not null primary key auto_increment,  -- Auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
    5. Klik Execute, lalu klik Execute Directly.

Buat sumber daya topik dan grup Kafka

Buat sumber daya Kafka berikut. Untuk informasi lebih lanjut, lihat Buat sumber daya.

  • Grup: clickstream.consumer.

  • Topik: click_stream1, click_stream2, click_stream3, click_stream4, dan click_stream5.

    Saat membuat topik, atur jumlah partisi menjadi 1. Jika tidak, data sampel mungkin tidak sesuai dengan hasil pada beberapa skenario.

    image

Langkah 2: Sinkronisasi data dari MySQL ke Kafka secara real time

Menyinkronkan event clickstream pengguna dari MySQL ke Kafka mengurangi beban yang ditempatkan oleh banyak pekerjaan pada database MySQL.

  1. Buat katalog MySQL. Untuk informasi lebih lanjut, lihat Buat katalog MySQL.

    Pada contoh ini, katalog diberi nama mysql-catalog, dan database default adalah ecommerce.

  2. Buat katalog Kafka. Untuk informasi lebih lanjut, lihat Kelola katalog Kafka JSON.

    Pada contoh ini, katalog diberi nama kafka-catalog.

  3. Pada halaman Data Development > ETL, buat pekerjaan stream SQL dan salin kode berikut ke editor SQL.

    CREATE TEMPORARY TABLE `clickstream1` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key.
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define a watermark.
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream1',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    CREATE TEMPORARY TABLE `clickstream2` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key.
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define a watermark.
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream2',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    
    CREATE TEMPORARY TABLE `clickstream3` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key.
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define a watermark.
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream3',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    
    CREATE TEMPORARY TABLE `clickstream4` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key.
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define a watermark.
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream4',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    CREATE TEMPORARY TABLE `clickstream5` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key.
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define a watermark.
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream5',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    BEGIN STATEMENT SET; 
    INSERT INTO `clickstream1`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream1`;
    
    
    INSERT INTO `clickstream2`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream2`;
    
    
    INSERT INTO `clickstream3`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream3`;
    
    
    INSERT INTO `clickstream4`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream4`;
    
    
    INSERT INTO `clickstream5`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream5`;
    END;      -- Required when writing to multiple sinks.
  4. Di pojok kanan atas, klik Deploy untuk menerapkan pekerjaan.

  5. Di panel navigasi sebelah kiri, pilih Operation Center > Job O&M. Di kolom Actions untuk pekerjaan target, klik Start. Pilih Stateless Start lalu klik Start.

Langkah 3: Mengembangkan, menerapkan, dan menjalankan pekerjaan CEP

Bagian ini menjelaskan cara menerapkan pekerjaan cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar. Pekerjaan ini mengonsumsi event clickstream pengguna dari Kafka, memprosesnya, dan mencetak informasi peringatan ke Konsol pengembangan Realtime Compute for Apache Flink. Anda dapat menyesuaikan kode berdasarkan arsitektur bisnis Anda dan memilih konektor downstream yang sesuai untuk berbagai skenario keluaran data. Untuk informasi lebih lanjut tentang konektor yang didukung, lihat Konektor yang didukung.

1. Pengembangan kode

Bagian ini hanya menampilkan kode inti dan menjelaskan fungsinya.

Kelas utama

public class CepDemo {

    public static void checkArg(String argName, MultipleParameterTool params) {
        if (!params.has(argName)) {
            throw new IllegalArgumentException(argName + " must be set!");
        }
    }

  // Parse the rule table.
    private static Match_results parseOutput(String output) {
        String rule = "\\(id, version\\): \\((\\d+), (\\d+)\\).*?Event\\((\\d+), (\\w+), (\\d+), (\\d+)";
        Pattern pattern = Pattern.compile(rule);
        Matcher matcher = pattern.matcher(output);
        if (matcher.find()) {
            return new Match_results(Integer.parseInt(matcher.group(1)), Integer.parseInt(matcher.group(2)), Integer.parseInt(matcher.group(3)), matcher.group(4), Integer.parseInt(matcher.group(6)));
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        // Process args
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // Kafka broker endpoints.
        checkArg(KAFKA_BROKERS_ARG, params);
        // Input topic.
        checkArg(INPUT_TOPIC_ARG, params);
        // group
        checkArg(INPUT_TOPIC_GROUP_ARG, params);

        // MySQL JDBC URL.
        checkArg(JDBC_URL_ARG, params);
        // MySQL table name.
        checkArg(TABLE_NAME_ARG, params);
        // Polling interval for the database.
        checkArg(JDBC_INTERVAL_MILLIS_ARG, params);
        // Specifies whether to use event time processing (true/false).
        checkArg(USING_EVENT_TIME, params);

        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build Kafka source with new Source API based on FLIP-27   Kafka Source settings.
        KafkaSource<Event> kafkaSource =
                KafkaSource.<Event>builder()
                        .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                        .setTopics(params.get(INPUT_TOPIC_ARG))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                        .setDeserializer(new EventDeSerializationSchema())
                        .build();



        // DataStream Source   Watermark strategy.
        DataStreamSource<Event> source =
                env.fromSource(
                        kafkaSource,
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka Source");


        // Group the stream by UserId for subsequent pattern matching.
        KeyedStream<Event, String> keyedStream =
                source.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
                ).keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event value) throws Exception {
                        return value.getUserId(); // Use only UserId as the key.
                    }
                });

        // Dynamic CEP patterns. Use the dynamically loaded pattern processor factory class JDBCPeriodicPatternProcessorDiscovererFactory to obtain patterns and perform pattern matching (read the rule table in the MySQL database).
        SingleOutputStreamOperator<String> output =
                CEP.dynamicPatterns(
                        keyedStream,// Source data stream.
                        new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                                params.get(JDBC_URL_ARG),
                                JDBC_DRIVE,
                                params.get(TABLE_NAME_ARG),
                                null,
                                Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))),
                        Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ? TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime,
                        TypeInformation.of(new TypeHint<String>() {})
                );

        // Print the output to the client.
        output.print();
  
        env.execute("CEP Demo");
    }
}

Skenario 1: Deteksi ketika pengguna menelusuri produk dari kategori berbeda dalam sesi yang sama dalam waktu 5 menit

Urutan data: Pola dimulai dengan event `view`, diikuti oleh event `view` lainnya.

public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}

Skenario 2: Deteksi ketika pengguna menambahkan produk bernilai tinggi ke keranjang belanja tetapi tidak melakukan pembelian dalam waktu 10 menit

Urutan data: Pola dimulai dengan event `cart` (tambah ke keranjang) dengan harga lebih dari 200, diikuti oleh event `purchase`.

public class CartAddCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("cart") && event.getPrice() > 200;
    }
}
public class PurchaseCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("purchase");
    }
}

Skenario 3: Deteksi ketika pengguna menelusuri produk yang sama berulang kali dalam waktu 15 menit

Urutan data: Pola dimulai dengan event `view`, diikuti oleh event `view` lain yang diulang tiga kali.

-- The condition class is the same as in Test 1.
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}

Skenario 4: Deteksi ketika pengguna menelusuri produk dan hanya menambahkannya ke keranjang belanja setelah harga turun

Urutan data: Pola dimulai dengan event `view`, diikuti oleh event `view` lain di mana harga produk lebih rendah daripada harga produk awal, dan akhirnya event `cart`.

-- The condition class is the same as in Test 1.
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
public class InitialCondition extends IterativeCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event, Context<ClickEvent> ctx) throws Exception {
        ClickEvent initialView = ctx.getEventsForPattern("initial_view").iterator().next();
        return event.getEventType().equals("view") && event.getProductId().equals(initialView.getProductId()) && event.getPrice() < initialView.getPrice();
    }

}
public class CartCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("cart");
    }
}

Skenario 5: Deteksi ketika pengguna menelusuri produk berulang kali dalam seminggu tetapi tidak melakukan pemesanan

Urutan data: Pola dimulai dengan event `view`, dan tidak boleh ada event `purchase` dalam jendela waktu berikutnya.

-- The condition class is the same as in Test 1.
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
-- The condition class is the same as in Test 2.
public class PurchaseCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("purchase");
    }
}

2. Terapkan pekerjaan

Pada halaman Operation Center > Job O&M, klik Deploy Job > JAR Job untuk menerapkan lima pekerjaan stream secara terpisah.

image

Tabel berikut menjelaskan parameter-parameter tersebut:

Parameter

Deskripsi

Contoh

Deployment Mode

Pemrosesan stream

Streaming Mode

Deployment Name

Masukkan nama pekerjaan JAR yang sesuai.

  • Nama pekerjaan Skenario 1: EcommerceCEPRunner1

  • Nama pekerjaan Skenario 2: EcommerceCEPRunner2

  • Nama pekerjaan Skenario 3: EcommerceCEPRunner3

  • Nama pekerjaan Skenario 4: EcommerceCEPRunner4

  • Nama pekerjaan Skenario 5: EcommerceCEPRunner5

Engine Version

Versi mesin Flink yang digunakan oleh pekerjaan saat ini.

SDK untuk kode dalam topik ini menggunakan JDK 11. Pilih versi yang menyertakan jdk11. Kami menyarankan Anda menggunakan versi Ververica Runtime (VVR) terbaru.

vvr-8.0.11-jdk11-flink-1.17

JAR URI

Klik ikon 上传 di sebelah kanan untuk mengunggah file cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar secara manual.

oss://xxx/artifacts/namespaces/xxx/cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

Entry Point Class

Kelas titik masuk program.

com.alibaba.ververica.cep.demo.CepDemo

Entry Point Main Arguments

Anda dapat meneruskan parameter di sini dan memanggilnya dalam metode utama.

Konfigurasikan parameter berikut untuk topik ini:

  • bootstrap.servers: Titik akhir kluster Kafka.

  • clickstream_topic: Topik Kafka untuk clickstream yang dikonsumsi.

  • group: ID kelompok konsumen.

  • jdbcUrl: Titik akhir MySQL.

  • database: Nama database.

  • user: Nama pengguna.

  • password: Kata sandi pengguna.

  • tableName: Nama tabel aturan di MySQL.

  • Konfigurasi Skenario 1: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream1 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo1 --jdbcIntervalMs 3000 --usingEventTime false

  • Konfigurasi Skenario 2: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream2 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo2 --jdbcIntervalMs 3000 --usingEventTime false

  • Konfigurasi Skenario 3: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream3 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo3 --jdbcIntervalMs 3000 --usingEventTime false

  • Konfigurasi Skenario 4: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream4 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo4 --jdbcIntervalMs 3000 --usingEventTime false

  • Konfigurasi Skenario 5: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream5 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo5 --jdbcIntervalMs 3000 --usingEventTime false

Untuk informasi lebih lanjut tentang penerapan, lihat Terapkan pekerjaan JAR.

3. Jalankan pekerjaan

Pada halaman Job O&M, di kolom Actions untuk pekerjaan target, klik Start. Pilih Stateless Start lalu klik Start. Jalankan lima pekerjaan untuk skenario tersebut, yaitu EcommerceCEPRunner1, EcommerceCEPRunner2, EcommerceCEPRunner3, EcommerceCEPRunner4, dan EcommerceCEPRunner5, secara berurutan.

Untuk informasi lebih lanjut tentang konfigurasi start, lihat Jalankan pekerjaan.

Langkah 4: Kueri peringatan

Skenario 1: Deteksi ketika pengguna menelusuri produk dari kategori berbeda dalam sesi yang sama dalam waktu 5 menit

  1. Masukkan data aturan ke MySQL.

    -- Test data for the cross-selling and up-selling opportunities scenario.
    INSERT INTO rds_demo1 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"second_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"second_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"first_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"first_view","target":"second_view","type":"STRICT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":5}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Masukkan data uji perilaku pengguna ke MySQL.

    -- Test data for the cross-selling and up-selling opportunities scenario.
    INSERT INTO `click_stream1` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 00:01:00.0', 'view', 1005073, 2232732093077520756, 'construction.tools.light', 'samsung', 1130.02, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:03.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes', 'xiaomi', 29.95, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:07.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes.step_ins', 'intel', 167.20, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:08.0', 'view', 1005205, 2232732093077520756, 'appliances.personal.massager', 'samsung', 576.33, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0');
  3. Lihat hasilnya di log Konsol pengembangan Realtime Compute for Apache Flink.

    • Di log JobManager, cari kata kunci `JDBCPeriodicPatternProcessorDiscoverer` untuk melihat aturan terbaru.

      image

    • Di TaskManager, lihat file log Stdout. Cari kata kunci A match for Pattern of (id, version): (1, 1) untuk melihat hasil pencocokan yang dicetak di log.

      image

Skenario 2: Deteksi ketika pengguna menambahkan produk bernilai tinggi ke keranjang belanja tetapi tidak melakukan pembelian dalam waktu 10 menit

  1. Masukkan data aturan ke MySQL.

    -- Test data for the high-value shopping cart recovery scenario.
    INSERT INTO rds_demo2 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.PurchaseCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"cart_add","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.CartAddCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"cart_add","target":"purchase","type":"SKIP_TILL_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":30}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Masukkan data uji perilaku pengguna ke MySQL.

    -- Test data for the high-value shopping cart recovery scenario.
    INSERT INTO `click_stream2` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 01:01:01.0','cart',1002923,2053013555631882655,'electronics.smartphone','huawei',249.30,517014550,'b666b914-8abf-4ebe-b674-aa31a1d0f7ce'),
    ('2020-01-01 01:11:02.0','cart',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded'),
    ('2020-01-01 01:11:03.0','purchase',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded');
  3. Lihat hasilnya di log Konsol pengembangan Realtime Compute for Apache Flink.

    • Di log JobManager, cari kata kunci `JDBCPeriodicPatternProcessorDiscoverer` untuk melihat aturan terbaru.

      image

    • Di TaskManager, lihat file log Stdout. Cari kata kunci A match for Pattern of (id, version): (1, 1) untuk melihat hasil pencocokan yang dicetak di log.

      image

Skenario 3: Deteksi ketika pengguna menelusuri produk yang sama berulang kali dalam waktu 15 menit

  1. Masukkan data aturan ke MySQL.

    -- Test data for the high-intent user identification scenario.
    INSERT INTO rds_demo3 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"repeat_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["TIMES"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":null,"nodes":[{"name":"repeat_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["TIMES"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"initial_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"initial_view","target":"repeat_view","type":"SKIP_TILL_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":15}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Masukkan data uji perilaku pengguna ke MySQL.

    -- Test data for the high-intent user identification scenario.
    INSERT INTO `click_stream3` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 02:01:01.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:02.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:03.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:04.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039');
  3. Lihat hasilnya di log Konsol pengembangan Realtime Compute for Apache Flink.

    • Di log JobManager, cari kata kunci `JDBCPeriodicPatternProcessorDiscoverer` untuk melihat aturan terbaru.

      image

    • Di TaskManager, lihat file log Stdout. Cari kata kunci A match for Pattern of (id, version): (1, 1) untuk melihat hasil pencocokan yang dicetak di log.

      image

Skenario 4: Deteksi ketika pengguna menelusuri produk dan hanya menambahkannya ke keranjang belanja setelah harga turun

  1. Masukkan data aturan ke MySQL.

    -- Test data for the price-sensitive user operations scenario.
    INSERT INTO rds_demo4 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"cart_after_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"cart_after_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.CartCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"view_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.InitialCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"initial_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"view_price_drop","target":"cart_after_price_drop","type":"STRICT"},{"source":"initial_view","target":"view_price_drop","type":"STRICT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":10}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Masukkan data uji perilaku pengguna ke MySQL.

    -- Test data for the price-sensitive user operations scenario.
    INSERT INTO `click_stream4` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 03:01:01.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',38.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
    ('2020-01-01 03:01:02.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
    ('2020-01-01 03:01:03.0','cart',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a');
  3. Lihat hasilnya di log Konsol pengembangan Realtime Compute for Apache Flink.

    • Di log JobManager, cari kata kunci `JDBCPeriodicPatternProcessorDiscoverer` untuk melihat aturan terbaru.

      image

    • Di TaskManager, lihat file log Stdout. Cari kata kunci A match for Pattern of (id, version): (1, 1) untuk melihat hasil pencocokan yang dicetak di log.

      image

Skenario 5: Deteksi ketika pengguna menelusuri produk berulang kali dalam seminggu tetapi tidak melakukan pemesanan

  1. Masukkan data aturan ke MySQL.

    -- Test data for the churn risk scenario. 
    INSERT INTO rds_demo5 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.PurchaseCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"first_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":10,"to":10,"windowTime":null},"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"first_view","target":"purchase","type":"NOT_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"DAYS","size":7}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Masukkan data uji perilaku pengguna ke MySQL.

    -- Test data for the churn risk scenario. 
    INSERT INTO `click_stream5` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-12-10 00:01:01.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:02:02.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:03:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:04:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:05:04.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:06:05.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:07:06.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:07.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:09:09.0', 'cart', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09');
  3. Lihat hasilnya di log Konsol pengembangan Realtime Compute for Apache Flink.

    • Di log JobManager, cari kata kunci `JDBCPeriodicPatternProcessorDiscoverer` untuk melihat aturan terbaru.

      image

    • Di TaskManager, lihat file log Stdout. Cari kata kunci A match for Pattern of (id, version): (1, 1) untuk melihat hasil pencocokan yang dicetak di log.

      image

Referensi