すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:動的 Flink CEP eコマースリアルタイムアラートシステム

最終更新日:Nov 10, 2025

Flink 複雑なイベント処理 (CEP) は、複雑なイベントストリームを動的に処理して、特定のイベントパターンをリアルタイムで検出し、アラートをトリガーする機能です。eコマースマーケティングでは、Flink CEP はユーザーの行動とトランザクションデータをリアルタイムで監視し、異常または重大なイベントを特定して、タイムリーなアラートを送信できます。

背景情報

eコマース業界の急速な成長により、ユーザーの行動やトランザクションデータの量が指数関数的に増加しています。従来のバッチ処理方法では、異常な行動、システムの脅威、ユーザーの解約をタイムリーに特定し、対応するという需要を満たすことができなくなりました。対照的に、動的な複雑なイベント処理 (CEP) エンジンは、多段階のユーザー行動をモデル化し、分析することができます。これにより、複雑なイベントパターンを自動的に特定し、脅威の初期段階でアラートをトリガーします。これが、リアルタイムのビジネス運用における動的 CEP の中核的な利点です。これには、次の 3 つの主要な特徴があります。

  • 高いリアルタイム性能: ミリ秒レベルの応答を提供します。これにより、事後分析ではなくイベント中のアラートが可能になり、より迅速な意思決定に役立ちます。

  • 柔軟で設定可能なルール: ルールポリシーの動的な更新をサポートします。これにより、サービスを再起動することなく、ビジネスの変化に迅速に対応できます。

  • 強力な複雑なイベント認識: 複数イベントシーケンス、タイムウィンドウ、複合条件などの高度な論理マッチングをサポートします。これにより、複雑なビジネスシナリオを正確に捉えることができます。

eコマース業界における動的 CEP の一般的なシナリオには、以下が含まれますが、これらに限定されません。

シナリオ

説明

クロスセルおよびアップセルの機会

製品を閲覧する際、ユーザーはしばしば異なるカテゴリにわたって関心を示します。たとえば、ユーザーが携帯電話を見た後、ヘッドフォンやパワーバンクを見るかもしれません。この行動は、クロスセルおよびアップセルの機会を提供します。電話ケースやヘッドフォンなどの補完的な製品を正確に推奨したり、「電話 + ヘッドフォンパッケージ割引」などのバンドル取引を提供したりすることで、プラットフォームは追加アイテムの購入率を高め、平均注文額を上げることができます。これはまた、ユーザーエクスペリエンスを向上させ、ユーザーのロイヤルティを高め、ビジネスの成長を促進します。

高価値ショッピングカートの回復

ユーザーは、価格への敏感さやためらいから、高価値の商品をショッピングカートに追加しても購入を完了しない場合があります。これにより、売上の潜在的な損失が発生します。リアルタイムで放棄されたショッピングカートを特定し、期間限定の割引、在庫僅少アラート、または送料無料のオファーなどの介入をトリガーすることで、プラットフォームは高価値商品の損失を効果的に削減し、注文変換率を高め、潜在的な収益を回復できます。これにより、ユーザー価値とプラットフォーム収益の両方にとってウィンウィンの状況が生まれます。

高い購入意欲を持つユーザーの特定

短期間に同じ商品を何度も閲覧するユーザーは、高い購入意欲を示しています。この行動を特定し、限定クーポンや在庫リマインダーなどのパーソナライズされたマーケティングをトリガーすることで、プラットフォームはユーザーの意思決定プロセスを加速させ、コンバージョン率を高め、ユーザーエクスペリエンスを向上させ、売上を伸ばすことができます。

価格に敏感なユーザー操作

価格に敏感なユーザーは、商品を繰り返し閲覧し、価格が下がったときにのみショッピングカートに追加することがよくあります。この行動を分析することで、プラットフォームは価格が変更されたときに「フォローしている商品がセール中です!」などの通知やターゲットを絞ったオファーを送信できます。これにより、コンバージョン率が向上し、ユーザー操作の効率が向上します。

解約リスクアラート

頻繁に商品を閲覧するものの、長期間注文しないユーザーは、解約のリスクがある可能性があります。この行動を特定し、限定クーポンの送信や人気商品の推奨などの回復措置を講じることで、プラットフォームは解約率を効果的に削減し、ユーザーのライフサイクルを延長し、ユーザーの定着率とプラットフォームの収益を向上させることができます。

ソリューションアーキテクチャ

Flink CEP は、複雑なイベントパターンを処理するための Apache Flink ライブラリです。Flink CEP を使用すると、複雑なイベントパターンを定義し、イベントストリームをリアルタイムで監視し、それらのパターンに一致するイベントシーケンスを特定できます。その後、ライブラリは一致する結果を生成します。ソリューションアーキテクチャは次のとおりです。

2

  1. イベントストリーム

    イベントストリームは、CEP 処理の入力ソースです。これは通常、時系列に並んだ一連のイベントを含む連続的なデータストリームです。各イベントは、パターンマッチングに使用される複数のプロパティを持つことができます。

  2. パターンとルールの定義

    検出したいイベントシーケンスや組み合わせを記述するイベントパターンとルールを定義できます。パターンには、イベントの順序、時間制約、および条件フィルターを含めることができます。たとえば、イベント A の後に 10 秒以内にイベント B が続くパターンを定義できます。

  3. CEP エンジン分析

    CEP エンジンはイベントストリームを受け入れ、定義されたパターンとルールに基づいて分析します。エンジンはイベントストリームを継続的に監視し、入力イベントを定義されたパターンと照合しようとします。照合プロセス中、エンジンはイベントの時間順序、プロパティ条件、タイムウィンドウなどの制約を考慮します。

  4. CEP マッチング出力

    イベントストリーム内のイベントシーケンスが定義されたパターンに正常に一致すると、CEP エンジンは出力を生成します。この出力は、一致したイベントシーケンス、ルールによってトリガーされた操作、または別のユーザー定義の出力フォーマットにすることができます。一致結果は、アラート、意思決定、データストレージなどの後続処理に使用できます。

前提条件

ステップ 1: 準備

ApsaraDB RDS for MySQL インスタンスを作成し、データソースを準備する

  1. ApsaraDB RDS for MySQL データベースを作成します。詳細については、「データベースを作成する」をご参照ください。

    宛先インスタンスに、ecommerce という名前のデータベースを作成します。

  2. MySQL Change Data Capture (CDC) データソースを準備します。

    1. 宛先インスタンスの詳細ページで、ページの上部にある [データベースにログオン] をクリックします。

    2. DMS ログオンダイアログボックスで、作成したデータベースアカウントのユーザー名とパスワードを入力し、[ログオン] をクリックします。

    3. ログイン後、左側の ecommerce データベースをダブルクリックして切り替えます。

    4. SQL コンソールで、次のデータ定義言語 (DDL) 文を入力してテーブルを作成し、データを挿入します。

      -- ルールテーブル 1 を作成
      CREATE TABLE rds_demo1 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- ルールテーブル 2 を作成
      CREATE TABLE rds_demo2 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- ルールテーブル 3 を作成
      CREATE TABLE rds_demo3 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      
      -- ルールテーブル 4 を作成
      CREATE TABLE rds_demo4 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- ルールテーブル 5 を作成
      CREATE TABLE rds_demo5 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      
      -- ソーステーブルを作成
      CREATE TABLE `click_stream1` (
        id bigint not null primary key auto_increment,  -- 自動インクリメントプライマリキー
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream2` (
        id bigint not null primary key auto_increment,  -- 自動インクリメントプライマリキー
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream3` (
        id bigint not null primary key auto_increment,  -- 自動インクリメントプライマリキー
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream4` (
        id bigint not null primary key auto_increment,  -- 自動インクリメントプライマリキー
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      
      CREATE TABLE `click_stream5` (
        id bigint not null primary key auto_increment,  -- 自動インクリメントプライマリキー
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
    5. [実行] をクリックし、次に [直接実行] をクリックします。

Kafka Topic とグループリソースの作成

次の Kafka リソースを作成します。詳細については、「リソースを作成する」をご参照ください。

  • グループ: clickstream.consumer.

  • Topic: click_stream1、click_stream2、click_stream3、click_stream4、および click_stream5。

    Topic を作成するときは、パーティションの数を 1 に設定します。そうしないと、一部のシナリオでサンプルデータが結果と一致しない場合があります。

    image

ステップ 2: MySQL から Kafka へリアルタイムでデータを同期する

ユーザーのクリックストリームイベントを MySQL から Kafka に同期することで、複数のジョブが MySQL データベースにかける負荷を軽減します。

  1. MySQL カタログを作成します。詳細については、「MySQL カタログを作成する」をご参照ください。

    この例では、カタログ名は mysql-catalog で、デフォルトのデータベースは ecommerce です。

  2. Kafka カタログを作成します。詳細については、「Kafka JSON カタログを管理する」をご参照ください。

    この例では、カタログ名は kafka-catalog です。

  3. [データ開発] > [ETL] ページで、SQL ストリームジョブを作成し、次のコードを SQL エディターにコピーします。

    CREATE TEMPORARY TABLE `clickstream1` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- プライマリキーを定義します。
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- ウォーターマークを定義します。
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream1',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    CREATE TEMPORARY TABLE `clickstream2` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- プライマリキーを定義します。
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- ウォーターマークを定義します。
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream2',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    
    CREATE TEMPORARY TABLE `clickstream3` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- プライマリキーを定義します。
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- ウォーターマークを定義します。
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream3',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    
    CREATE TEMPORARY TABLE `clickstream4` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- プライマリキーを定義します。
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- ウォーターマークを定義します。
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream4',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    CREATE TEMPORARY TABLE `clickstream5` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- プライマリキーを定義します。
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- ウォーターマークを定義します。
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream5',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    BEGIN STATEMENT SET; 
    INSERT INTO `clickstream1`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream1`;
    
    
    INSERT INTO `clickstream2`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream2`;
    
    
    INSERT INTO `clickstream3`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream3`;
    
    
    INSERT INTO `clickstream4`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream4`;
    
    
    INSERT INTO `clickstream5`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream5`;
    END;      -- 複数のシンクに書き込む場合に必要です。
  4. 右上隅にある [デプロイ] をクリックしてジョブをデプロイします。

  5. 左側のナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。対象のジョブの [操作] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。

ステップ 3: CEP ジョブの開発、デプロイ、および開始

このセクションでは、cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar ジョブをデプロイする方法について説明します。このジョブは、Kafka からユーザーのクリックストリームイベントを消費し、それらを処理し、アラート情報を Realtime Compute for Apache Flink 開発コンソールに出力します。ビジネスアーキテクチャに基づいてコードを調整し、さまざまなデータ出力シナリオに適したダウンストリームコネクタを選択できます。サポートされているコネクタの詳細については、「サポートされているコネクタ」をご参照ください。

1. コード開発

このセクションでは、コアコードのみを示し、その機能を説明します。

メインクラス

public class CepDemo {

    public static void checkArg(String argName, MultipleParameterTool params) {
        if (!params.has(argName)) {
            throw new IllegalArgumentException(argName + " must be set!");
        }
    }

  // ルールテーブルを解析します。
    private static Match_results parseOutput(String output) {
        String rule = "\\(id, version\\): \\((\\d+), (\\d+)\\).*?Event\\((\\d+), (\\w+), (\\d+), (\\d+)";
        Pattern pattern = Pattern.compile(rule);
        Matcher matcher = pattern.matcher(output);
        if (matcher.find()) {
            return new Match_results(Integer.parseInt(matcher.group(1)), Integer.parseInt(matcher.group(2)), Integer.parseInt(matcher.group(3)), matcher.group(4), Integer.parseInt(matcher.group(6)));
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        // 引数を処理します
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // Kafka ブローカーのエンドポイント。
        checkArg(KAFKA_BROKERS_ARG, params);
        // 入力 Topic。
        checkArg(INPUT_TOPIC_ARG, params);
        // グループ
        checkArg(INPUT_TOPIC_GROUP_ARG, params);

        // MySQL JDBC URL。
        checkArg(JDBC_URL_ARG, params);
        // MySQL テーブル名。
        checkArg(TABLE_NAME_ARG, params);
        // データベースのポーリング間隔。
        checkArg(JDBC_INTERVAL_MILLIS_ARG, params);
        // イベント時間処理を使用するかどうかを指定します (true/false)。
        checkArg(USING_EVENT_TIME, params);

        // ストリーミング実行環境を設定します
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // FLIP-27 に基づく新しい Source API で Kafka ソースを構築します   Kafka ソース設定。
        KafkaSource<Event> kafkaSource =
                KafkaSource.<Event>builder()
                        .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                        .setTopics(params.get(INPUT_TOPIC_ARG))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                        .setDeserializer(new EventDeSerializationSchema())
                        .build();



        // DataStream ソース   ウォーターマーク戦略。
        DataStreamSource<Event> source =
                env.fromSource(
                        kafkaSource,
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka Source");


        // 後続のパターンマッチングのために、ストリームを UserId でグループ化します。
        KeyedStream<Event, String> keyedStream =
                source.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
                ).keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event value) throws Exception {
                        return value.getUserId(); // キーとして UserId のみを使用します。
                    }
                });

        // 動的 CEP パターン。動的にロードされたパターンプロセッサファクトリクラス JDBCPeriodicPatternProcessorDiscovererFactory を使用してパターンを取得し、パターンマッチングを実行します (MySQL データベースのルールテーブルを読み取ります)。
        SingleOutputStreamOperator<String> output =
                CEP.dynamicPatterns(
                        keyedStream,// ソースデータストリーム。
                        new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                                params.get(JDBC_URL_ARG),
                                JDBC_DRIVE,
                                params.get(TABLE_NAME_ARG),
                                null,
                                Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))),
                        Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ? TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime,
                        TypeInformation.of(new TypeHint<String>() {})
                );

        // 出力をクライアントに出力します。
        output.print();
  
        env.execute("CEP Demo");
    }
}

シナリオ 1: ユーザーが 5 分以内に同じセッションで異なるカテゴリの製品を閲覧した場合を検出する

データ順序: パターンは `view` イベントで始まり、その後に別の `view` イベントが続きます。

public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}

シナリオ 2: ユーザーがショッピングカートに高価値の商品を追加したが、10 分以内に購入しなかった場合を検出する

データ順序: パターンは `cart` イベント (カートに追加) で始まり、価格が 200 を超え、その後に `purchase` イベントが続きます。

public class CartAddCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("cart") && event.getPrice() > 200;
    }
}
public class PurchaseCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("purchase");
    }
}

シナリオ 3: ユーザーが 15 分以内に同じ商品を複数回閲覧した場合を検出する

データ順序: パターンは `view` イベントで始まり、その後に 3 回繰り返される別の `view` イベントが続きます。

-- 条件クラスはテスト 1 と同じです。
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}

シナリオ 4: ユーザーが商品を閲覧し、価格が下がった後にのみショッピングカートに追加した場合を検出する

データ順序: パターンは `view` イベントで始まり、その後に最初の製品価格よりも低い製品価格の別の `view` イベントが続き、最後に `cart` イベントが続きます。

-- 条件クラスはテスト 1 と同じです。
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
public class InitialCondition extends IterativeCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event, Context<ClickEvent> ctx) throws Exception {
        ClickEvent initialView = ctx.getEventsForPattern("initial_view").iterator().next();
        return event.getEventType().equals("view") && event.getProductId().equals(initialView.getProductId()) && event.getPrice() < initialView.getPrice();
    }

}
public class CartCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("cart");
    }
}

シナリオ 5: ユーザーが 1 週間以内に商品を複数回閲覧したが、注文しなかった場合を検出する

データ順序: パターンは `view` イベントで始まり、後続のタイムウィンドウ内で `purchase` イベントは発生しません。

-- 条件クラスはテスト 1 と同じです。
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
-- 条件クラスはテスト 2 と同じです。
public class PurchaseCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("purchase");
    }
}

2. ジョブのデプロイ

[オペレーションセンター] > [ジョブ O&M] ページで、[ジョブのデプロイ] > [JAR ジョブ] をクリックして、5 つのストリームジョブを個別にデプロイします。

image

次の表にパラメーターを示します。

パラメーター

説明

デプロイモード

ストリーム処理

ストリーミングモード

デプロイメント名

対応する JAR ジョブの名前を入力します。

  • シナリオ 1 ジョブ名: EcommerceCEPRunner1

  • シナリオ 2 ジョブ名: EcommerceCEPRunner2

  • シナリオ 3 ジョブ名: EcommerceCEPRunner3

  • シナリオ 4 ジョブ名: EcommerceCEPRunner4

  • シナリオ 5 ジョブ名: EcommerceCEPRunner5

エンジンバージョン

現在のジョブで使用される Flink エンジンバージョン。

このトピックのコードの SDK は JDK 11 を使用します。jdk11 を含むバージョンを選択してください。最新の Ververica Runtime (VVR) バージョンを使用することをお勧めします。

vvr-8.0.11-jdk11-flink-1.17

JAR URI

右側の 上传 アイコンをクリックして、cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar ファイルを手動でアップロードします。

oss://xxx/artifacts/namespaces/xxx/cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

エントリポイントクラス

プログラムのエントリポイントクラス。

com.alibaba.ververica.cep.demo.CepDemo

エントリポイントメイン引数

ここでパラメーターを渡し、main メソッドで呼び出すことができます。

このトピックには、次のパラメーターを設定します:

  • bootstrap.servers: Kafka クラスターのエンドポイント。

  • clickstream_topic: 消費されるクリックストリームの Kafka Topic。

  • group: 使用者グループ ID。

  • jdbcUrl: MySQL エンドポイント。

  • database: データベース名。

  • user: ユーザー名。

  • password: ユーザーパスワード。

  • tableName: MySQL のルールテーブルの名前。

  • シナリオ 1 の構成: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream1 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo1 --jdbcIntervalMs 3000 --usingEventTime false

  • シナリオ 2 の構成: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream2 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo2 --jdbcIntervalMs 3000 --usingEventTime false

  • シナリオ 3 の構成: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream3 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo3 --jdbcIntervalMs 3000 --usingEventTime false

  • シナリオ 4 の構成: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream4 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo4 --jdbcIntervalMs 3000 --usingEventTime false

  • シナリオ 5 の構成: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream5 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo5 --jdbcIntervalMs 3000 --usingEventTime false

デプロイの詳細については、「JAR ジョブをデプロイする」をご参照ください。

3. ジョブの開始

[ジョブ O&M] ページで、対象のジョブの [操作] 列で [開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。シナリオ用の 5 つのジョブ、EcommerceCEPRunner1、EcommerceCEPRunner2、EcommerceCEPRunner3、EcommerceCEPRunner4、EcommerceCEPRunner5 を順番に開始します。

開始構成の詳細については、「ジョブを開始する」をご参照ください。

ステップ 4: アラートのクエリ

シナリオ 1: ユーザーが 5 分以内に同じセッションで異なるカテゴリの製品を閲覧した場合を検出する

  1. ルールデータを MySQL に挿入します。

    -- クロスセルおよびアップセルの機会シナリオのテストデータ。
    INSERT INTO rds_demo1 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"second_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"second_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"first_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"first_view","target":"second_view","type":"STRICT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":5}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. ユーザー行動テストデータを MySQL に挿入します。

    -- クロスセルおよびアップセルの機会シナリオのテストデータ。
    INSERT INTO `click_stream1` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 00:01:00.0', 'view', 1005073, 2232732093077520756, 'construction.tools.light', 'samsung', 1130.02, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:03.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes', 'xiaomi', 29.95, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:07.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes.step_ins', 'intel', 167.20, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:08.0', 'view', 1005205, 2232732093077520756, 'appliances.personal.massager', 'samsung', 576.33, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0');
  3. Realtime Compute for Apache Flink 開発コンソールのログで結果を表示します。

    • JobManager ログで、キーワード `JDBCPeriodicPatternProcessorDiscoverer` を検索して最新のルールを表示します。

      image

    • TaskManager で、Stdout ログファイルを表示します。キーワード A match for Pattern of (id, version): (1, 1) を検索して、ログに表示される一致を表示します。

      image

シナリオ 2: ユーザーがショッピングカートに高価値の商品を追加したが、10 分以内に購入しなかった場合を検出する

  1. ルールデータを MySQL に挿入します。

    -- 高価値ショッピングカート回復シナリオのテストデータ。
    INSERT INTO rds_demo2 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.PurchaseCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"cart_add","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.CartAddCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"cart_add","target":"purchase","type":"SKIP_TILL_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":30}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. ユーザー行動テストデータを MySQL に挿入します。

    -- 高価値ショッピングカート回復シナリオのテストデータ。
    INSERT INTO `click_stream2` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 01:01:01.0','cart',1002923,2053013555631882655,'electronics.smartphone','huawei',249.30,517014550,'b666b914-8abf-4ebe-b674-aa31a1d0f7ce'),
    ('2020-01-01 01:11:02.0','cart',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded'),
    ('2020-01-01 01:11:03.0','purchase',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded');
  3. Realtime Compute for Apache Flink 開発コンソールのログで結果を表示します。

    • JobManager ログで、キーワード `JDBCPeriodicPatternProcessorDiscoverer` を検索して最新のルールを表示します。

      image

    • TaskManager で、Stdout ログファイルを表示します。キーワード A match for Pattern of (id, version): (1, 1) を検索して、ログに表示される一致を表示します。

      image

シナリオ 3: ユーザーが 15 分以内に同じ商品を複数回閲覧した場合を検出する

  1. ルールデータを MySQL に挿入します。

    -- 高い購入意欲を持つユーザー特定シナリオのテストデータ。
    INSERT INTO rds_demo3 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"repeat_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["TIMES"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":null,"nodes":[{"name":"repeat_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["TIMES"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"initial_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"initial_view","target":"repeat_view","type":"SKIP_TILL_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":15}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. ユーザー行動テストデータを MySQL に挿入します。

    -- 高い購入意欲を持つユーザー特定シナリオのテストデータ。
    INSERT INTO `click_stream3` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 02:01:01.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:02.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:03.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:04.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039');
  3. Realtime Compute for Apache Flink 開発コンソールのログで結果を表示します。

    • JobManager ログで、キーワード `JDBCPeriodicPatternProcessorDiscoverer` を検索して最新のルールを表示します。

      image

    • TaskManager で、Stdout ログファイルを表示します。キーワード A match for Pattern of (id, version): (1, 1) を検索して、ログに表示される一致を表示します。

      image

シナリオ 4: ユーザーが商品を閲覧し、価格が下がった後にのみショッピングカートに追加した場合を検出する

  1. ルールデータを MySQL に挿入します。

    -- 価格に敏感なユーザー操作シナリオのテストデータ。
    INSERT INTO rds_demo4 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"cart_after_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"cart_after_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.CartCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"view_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.InitialCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"initial_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"view_price_drop","target":"cart_after_price_drop","type":"STRICT"},{"source":"initial_view","target":"view_price_drop","type":"STRICT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":10}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. ユーザー行動テストデータを MySQL に挿入します。

    -- 価格に敏感なユーザー操作シナリオのテストデータ。
    INSERT INTO `click_stream4` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 03:01:01.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',38.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
    ('2020-01-01 03:01:02.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
    ('2020-01-01 03:01:03.0','cart',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a');
  3. Realtime Compute for Apache Flink 開発コンソールのログで結果を表示します。

    • JobManager ログで、キーワード `JDBCPeriodicPatternProcessorDiscoverer` を検索して最新のルールを表示します。

      image

    • TaskManager で、Stdout ログファイルを表示します。キーワード A match for Pattern of (id, version): (1, 1) を検索して、ログに表示される一致を表示します。

      image

シナリオ 5: ユーザーが 1 週間以内に商品を複数回閲覧したが、注文しなかった場合を検出する

  1. ルールデータを MySQL に挿入します。

    -- 解約リスクシナリオのテストデータ。 
    INSERT INTO rds_demo5 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.PurchaseCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"first_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":10,"to":10,"windowTime":null},"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"first_view","target":"purchase","type":"NOT_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"DAYS","size":7}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. ユーザー行動テストデータを MySQL に挿入します。

    -- 解約リスクシナリオのテストデータ。 
    INSERT INTO `click_stream5` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-12-10 00:01:01.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:02:02.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:03:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:04:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:05:04.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:06:05.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:07:06.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:07.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:09:09.0', 'cart', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09');
  3. Realtime Compute for Apache Flink 開発コンソールのログで結果を表示します。

    • JobManager ログで、キーワード `JDBCPeriodicPatternProcessorDiscoverer` を検索して最新のルールを表示します。

      image

    • TaskManager で、Stdout ログファイルを表示します。キーワード A match for Pattern of (id, version): (1, 1) を検索して、ログに表示される一致を表示します。

      image

関連ドキュメント