Flink 複雑なイベント処理 (CEP) は、複雑なイベントストリームを動的に処理して、特定のイベントパターンをリアルタイムで検出し、アラートをトリガーする機能です。eコマースマーケティングでは、Flink CEP はユーザーの行動とトランザクションデータをリアルタイムで監視し、異常または重大なイベントを特定して、タイムリーなアラートを送信できます。
背景情報
eコマース業界の急速な成長により、ユーザーの行動やトランザクションデータの量が指数関数的に増加しています。従来のバッチ処理方法では、異常な行動、システムの脅威、ユーザーの解約をタイムリーに特定し、対応するという需要を満たすことができなくなりました。対照的に、動的な複雑なイベント処理 (CEP) エンジンは、多段階のユーザー行動をモデル化し、分析することができます。これにより、複雑なイベントパターンを自動的に特定し、脅威の初期段階でアラートをトリガーします。これが、リアルタイムのビジネス運用における動的 CEP の中核的な利点です。これには、次の 3 つの主要な特徴があります。
高いリアルタイム性能: ミリ秒レベルの応答を提供します。これにより、事後分析ではなくイベント中のアラートが可能になり、より迅速な意思決定に役立ちます。
柔軟で設定可能なルール: ルールポリシーの動的な更新をサポートします。これにより、サービスを再起動することなく、ビジネスの変化に迅速に対応できます。
強力な複雑なイベント認識: 複数イベントシーケンス、タイムウィンドウ、複合条件などの高度な論理マッチングをサポートします。これにより、複雑なビジネスシナリオを正確に捉えることができます。
eコマース業界における動的 CEP の一般的なシナリオには、以下が含まれますが、これらに限定されません。
シナリオ | 説明 |
クロスセルおよびアップセルの機会 | 製品を閲覧する際、ユーザーはしばしば異なるカテゴリにわたって関心を示します。たとえば、ユーザーが携帯電話を見た後、ヘッドフォンやパワーバンクを見るかもしれません。この行動は、クロスセルおよびアップセルの機会を提供します。電話ケースやヘッドフォンなどの補完的な製品を正確に推奨したり、「電話 + ヘッドフォンパッケージ割引」などのバンドル取引を提供したりすることで、プラットフォームは追加アイテムの購入率を高め、平均注文額を上げることができます。これはまた、ユーザーエクスペリエンスを向上させ、ユーザーのロイヤルティを高め、ビジネスの成長を促進します。 |
高価値ショッピングカートの回復 | ユーザーは、価格への敏感さやためらいから、高価値の商品をショッピングカートに追加しても購入を完了しない場合があります。これにより、売上の潜在的な損失が発生します。リアルタイムで放棄されたショッピングカートを特定し、期間限定の割引、在庫僅少アラート、または送料無料のオファーなどの介入をトリガーすることで、プラットフォームは高価値商品の損失を効果的に削減し、注文変換率を高め、潜在的な収益を回復できます。これにより、ユーザー価値とプラットフォーム収益の両方にとってウィンウィンの状況が生まれます。 |
高い購入意欲を持つユーザーの特定 | 短期間に同じ商品を何度も閲覧するユーザーは、高い購入意欲を示しています。この行動を特定し、限定クーポンや在庫リマインダーなどのパーソナライズされたマーケティングをトリガーすることで、プラットフォームはユーザーの意思決定プロセスを加速させ、コンバージョン率を高め、ユーザーエクスペリエンスを向上させ、売上を伸ばすことができます。 |
価格に敏感なユーザー操作 | 価格に敏感なユーザーは、商品を繰り返し閲覧し、価格が下がったときにのみショッピングカートに追加することがよくあります。この行動を分析することで、プラットフォームは価格が変更されたときに「フォローしている商品がセール中です!」などの通知やターゲットを絞ったオファーを送信できます。これにより、コンバージョン率が向上し、ユーザー操作の効率が向上します。 |
解約リスクアラート | 頻繁に商品を閲覧するものの、長期間注文しないユーザーは、解約のリスクがある可能性があります。この行動を特定し、限定クーポンの送信や人気商品の推奨などの回復措置を講じることで、プラットフォームは解約率を効果的に削減し、ユーザーのライフサイクルを延長し、ユーザーの定着率とプラットフォームの収益を向上させることができます。 |
ソリューションアーキテクチャ
Flink CEP は、複雑なイベントパターンを処理するための Apache Flink ライブラリです。Flink CEP を使用すると、複雑なイベントパターンを定義し、イベントストリームをリアルタイムで監視し、それらのパターンに一致するイベントシーケンスを特定できます。その後、ライブラリは一致する結果を生成します。ソリューションアーキテクチャは次のとおりです。

イベントストリーム
イベントストリームは、CEP 処理の入力ソースです。これは通常、時系列に並んだ一連のイベントを含む連続的なデータストリームです。各イベントは、パターンマッチングに使用される複数のプロパティを持つことができます。
パターンとルールの定義
検出したいイベントシーケンスや組み合わせを記述するイベントパターンとルールを定義できます。パターンには、イベントの順序、時間制約、および条件フィルターを含めることができます。たとえば、イベント A の後に 10 秒以内にイベント B が続くパターンを定義できます。
CEP エンジン分析
CEP エンジンはイベントストリームを受け入れ、定義されたパターンとルールに基づいて分析します。エンジンはイベントストリームを継続的に監視し、入力イベントを定義されたパターンと照合しようとします。照合プロセス中、エンジンはイベントの時間順序、プロパティ条件、タイムウィンドウなどの制約を考慮します。
CEP マッチング出力
イベントストリーム内のイベントシーケンスが定義されたパターンに正常に一致すると、CEP エンジンは出力を生成します。この出力は、一致したイベントシーケンス、ルールによってトリガーされた操作、または別のユーザー定義の出力フォーマットにすることができます。一致結果は、アラート、意思決定、データストレージなどの後続処理に使用できます。
前提条件
Realtime Compute for Apache Flink をアクティブ化していること。詳細については、「Realtime Compute for Apache Flink をアクティブ化する」をご参照ください。
Message Queue for Apache Kafka をアクティブ化していること。詳細については、「Message Queue for Apache Kafka インスタンスをデプロイする」をご参照ください。
RDS for MySQL をアクティブ化していること。詳細については、「RDS for MySQL インスタンスを作成する」をご参照ください。
Realtime Compute for Apache Flink、ApsaraDB RDS for MySQL、および Message Queue for Apache Kafka が同じ VPC にあることを確認してください。同じ VPC にない場合は、VPC 間のネットワーク接続を確立するか、インターネット経由でサービスにアクセスする必要があります。詳細については、「VPC をまたいで他のサービスにアクセスするにはどうすればよいですか?」および「インターネットにアクセスするにはどうすればよいですか?」をご参照ください。
必要な権限を持つ Resource Access Management (RAM) ユーザーまたは RAM ロールがあること。
ステップ 1: 準備
ApsaraDB RDS for MySQL インスタンスを作成し、データソースを準備する
ApsaraDB RDS for MySQL データベースを作成します。詳細については、「データベースを作成する」をご参照ください。
宛先インスタンスに、
ecommerceという名前のデータベースを作成します。MySQL Change Data Capture (CDC) データソースを準備します。
宛先インスタンスの詳細ページで、ページの上部にある [データベースにログオン] をクリックします。
DMS ログオンダイアログボックスで、作成したデータベースアカウントのユーザー名とパスワードを入力し、[ログオン] をクリックします。
ログイン後、左側の
ecommerceデータベースをダブルクリックして切り替えます。SQL コンソールで、次のデータ定義言語 (DDL) 文を入力してテーブルを作成し、データを挿入します。
-- ルールテーブル 1 を作成 CREATE TABLE rds_demo1 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- ルールテーブル 2 を作成 CREATE TABLE rds_demo2 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- ルールテーブル 3 を作成 CREATE TABLE rds_demo3 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- ルールテーブル 4 を作成 CREATE TABLE rds_demo4 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- ルールテーブル 5 を作成 CREATE TABLE rds_demo5 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- ソーステーブルを作成 CREATE TABLE `click_stream1` ( id bigint not null primary key auto_increment, -- 自動インクリメントプライマリキー eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream2` ( id bigint not null primary key auto_increment, -- 自動インクリメントプライマリキー eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream3` ( id bigint not null primary key auto_increment, -- 自動インクリメントプライマリキー eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream4` ( id bigint not null primary key auto_increment, -- 自動インクリメントプライマリキー eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream5` ( id bigint not null primary key auto_increment, -- 自動インクリメントプライマリキー eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) );[実行] をクリックし、次に [直接実行] をクリックします。
Kafka Topic とグループリソースの作成
次の Kafka リソースを作成します。詳細については、「リソースを作成する」をご参照ください。
グループ: clickstream.consumer.
Topic: click_stream1、click_stream2、click_stream3、click_stream4、および click_stream5。
Topic を作成するときは、パーティションの数を 1 に設定します。そうしないと、一部のシナリオでサンプルデータが結果と一致しない場合があります。

ステップ 2: MySQL から Kafka へリアルタイムでデータを同期する
ユーザーのクリックストリームイベントを MySQL から Kafka に同期することで、複数のジョブが MySQL データベースにかける負荷を軽減します。
MySQL カタログを作成します。詳細については、「MySQL カタログを作成する」をご参照ください。
この例では、カタログ名は
mysql-catalogで、デフォルトのデータベースはecommerceです。Kafka カタログを作成します。詳細については、「Kafka JSON カタログを管理する」をご参照ください。
この例では、カタログ名は
kafka-catalogです。ページで、SQL ストリームジョブを作成し、次のコードを SQL エディターにコピーします。
CREATE TEMPORARY TABLE `clickstream1` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- プライマリキーを定義します。 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- ウォーターマークを定義します。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream1', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream2` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- プライマリキーを定義します。 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- ウォーターマークを定義します。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream2', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream3` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- プライマリキーを定義します。 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- ウォーターマークを定義します。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream3', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream4` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- プライマリキーを定義します。 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- ウォーターマークを定義します。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream4', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream5` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- プライマリキーを定義します。 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- ウォーターマークを定義します。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream5', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); BEGIN STATEMENT SET; INSERT INTO `clickstream1` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream1`; INSERT INTO `clickstream2` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream2`; INSERT INTO `clickstream3` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream3`; INSERT INTO `clickstream4` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream4`; INSERT INTO `clickstream5` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream5`; END; -- 複数のシンクに書き込む場合に必要です。右上隅にある [デプロイ] をクリックしてジョブをデプロイします。
左側のナビゲーションウィンドウで、 を選択します。対象のジョブの [操作] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。
ステップ 3: CEP ジョブの開発、デプロイ、および開始
このセクションでは、cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar ジョブをデプロイする方法について説明します。このジョブは、Kafka からユーザーのクリックストリームイベントを消費し、それらを処理し、アラート情報を Realtime Compute for Apache Flink 開発コンソールに出力します。ビジネスアーキテクチャに基づいてコードを調整し、さまざまなデータ出力シナリオに適したダウンストリームコネクタを選択できます。サポートされているコネクタの詳細については、「サポートされているコネクタ」をご参照ください。
1. コード開発
このセクションでは、コアコードのみを示し、その機能を説明します。
2. ジョブのデプロイ
ページで、 をクリックして、5 つのストリームジョブを個別にデプロイします。

次の表にパラメーターを示します。
パラメーター | 説明 | 例 |
デプロイモード | ストリーム処理 | ストリーミングモード |
デプロイメント名 | 対応する JAR ジョブの名前を入力します。 |
|
エンジンバージョン | 現在のジョブで使用される Flink エンジンバージョン。 このトピックのコードの SDK は JDK 11 を使用します。 | vvr-8.0.11-jdk11-flink-1.17 |
JAR URI | 右側の | oss://xxx/artifacts/namespaces/xxx/cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar |
エントリポイントクラス | プログラムのエントリポイントクラス。 | com.alibaba.ververica.cep.demo.CepDemo |
エントリポイントメイン引数 | ここでパラメーターを渡し、main メソッドで呼び出すことができます。 このトピックには、次のパラメーターを設定します:
|
|
デプロイの詳細については、「JAR ジョブをデプロイする」をご参照ください。
3. ジョブの開始
[ジョブ O&M] ページで、対象のジョブの [操作] 列で [開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。シナリオ用の 5 つのジョブ、EcommerceCEPRunner1、EcommerceCEPRunner2、EcommerceCEPRunner3、EcommerceCEPRunner4、EcommerceCEPRunner5 を順番に開始します。
開始構成の詳細については、「ジョブを開始する」をご参照ください。
アイコンをクリックして、








