Knative Eventing は、Broker-Trigger モデルを使用して、サーバーレスアプリケーションに堅牢なイベント駆動機能です。 このコンポーネントは、イベントの転送とフィルタリングを可能にし、複雑なイベントフローを調整するために使用できます。 Broker-Trigger モデルは、ユーザーの行動に基づいて自動的に通知を送信したり、さまざまなソースからのデータストリームを処理したりするなど、外部トリガーまたは内部トリガーに応答するアプリケーションに最適です。 このトピックでは、Knative Eventing を使用してシンプルなイベント駆動型アーキテクチャを構築する方法について説明します。これには、サービスのデプロイ、メッセージ配信メカニズム、および機能検証が含まれます。
仕組み
Knative Eventing は、さまざまな外部イベントシステムと連携するためのさまざまなイベントモデルを備えています。 イベントが取り込まれると、クラウドイベント標準を使用して内部的に転送され、Broker-Trigger イベント駆動モデルを使用して処理されます。
下図に示すように、Knative イベント駆動型アーキテクチャでは、Broker はメッセージ送信の仲介役として機能し、Trigger は特定の条件下でワークフローを自動的に開始します。 Broker-Trigger イベント駆動型モデルの原理は、Trigger に Brokerイベントをサブスクライブさせてフィルタリングさせ、その後、対応するサービスにイベントをディスパッチして消費させることです。
イベントソース:イベントの発生元。データベースの更新などの内部システム、またはクラウドメッセージングサービスなどの外部サービスを使用できます。
Ingress:Knative クラスターへの外部イベントを受信します。
チャネル:Knative Eventing で Broker-Trigger モデルを使用するには、イベントを転送する適切なチャネルを選択する必要があります。 サポートされているイベント転送チャネルには、ApsaraMQ for Kafka、NATS Streaming、InMemoryChannel などがあります。 デフォルトのチャネルは InMemoryChannel です。
Broker:イベントの仲介者として機能し、設定されたトリガーに従ってさまざまなソースからイベントをルーティングします。 ブローカーは複雑なイベントフローと処理ロジックを実装でき、NATS や ApsaraMQ for Kafka などの複数のプロトコルとバックエンドイベントサービスをサポートし、より柔軟かつ効率的にイベントの管理と配布を実行できます。
Trigger:イベントを特定のサービスにルーティングする方法を定義します。 各 Trigger は、通常はイベントタイプまたはコンテンツに基づくイベントフィルタ、およびサービスターゲットに関連付けられます。 フィルター条件に一致するイベントのみ、指定されたサービスに送信されます。
サービス:イベントの最終処理を行います。 トリガーによってルーティングされたイベントを受信した後、対応するビジネスロジックを実行します。
前提条件
Knative Serving と Knative Eventing がインストールされている必要があります。 詳細については、「Knative のデプロイ」「」および「Knative Eventing のデプロイ」「」をご参照ください。
以下のセクションでは、イベントの作成、送信、消費など、Knative Eventing フレームワークを使用して基本的なイベント駆動型システムを構築するプロセスについて説明します。
手順 1:Knative サービスのデプロイ
このセクションでは、Knative Serviceの例として event-display
を使用します。 event-display
の主な役割は、イベントを受け取り、その内容を出力することです。
以下の YAML テンプレートを使用して Knative サービスを作成します。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: default spec: template: spec: containers: - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/event-display:v1028
Knative サービスをデプロイします。
kubectl apply -f event-display.yaml
Knative サービスが作成されているかどうかを確認します。
kubectl get ksvc
想定される出力:
NAME URL LATESTCREATED LATESTREADY READY REASON event-display http://event-display.default.example.com event-display-00001 event-display-00001 True
手順 2:Broker と Trigger の作成
名前が default
のBroker と名前が my-service-Trigger
の Trigger を作成します。 この Trigger は、明示的に処理されないイベントが、手順 1 でデプロイされた event-display
サービスにルーティングされるようにします。
Broker を作成します。
以下の YAML テンプレートを使用して Broker を作成します。
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: name: default namespace: default
Broker をデプロイします。
kubectl apply -f broker.yaml
Broker が作成されているかどうかを確認します。
kubectl get broker
想定される出力:
NAME URL AGE READY REASON default http://broker-ingress.knative-eventing.svc.cluster.local/default/default 9s True
出力には、Broker の情報と URL が表示されています。 URL は、イベントの送信先を示します。
トリガーを作成します。
以下のサンプルコードでトリガーを作成します。
デフォルトの Broker をサブスクライブして、イベントを
event-display
サービスにルーティングします。apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: my-service-trigger spec: broker: default subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
上記の YAML テンプレートによって作成された
my-service-trigger
はdefault Broker
にリンクされ、event-display
サービスをサブスクライバーとして指定します。 つまり、default Broker
に送信されたイベントは、それらを処理するために他の特定のトリガーが指定されていない場合、event-display
サービスに転送されます。トリガーをデプロイします。
kubectl apply -f trigger.yaml
トリガーが作成されたかどうかを確認します。
kubectl get trigger
想定される出力:
NAME BROKER SUBSCRIBER_URI AGE READY REASON my-service-trigger default http://event-display.default.svc.cluster.local 22s True
出力には、Trigger の準備ができていることが示されています。つまり、Trigger が正しく設定され、イベントの受信とルーティングが可能になっています。
手順 3:イベントの送信と、サービスによるイベント受信の可否を確認
HTTP POST リクエストを外部ソースから Broker に直接送信して、イベントトリガーをシミュレートします。 curl
コマンドを使用して、特定の CloudEvents ヘッダーを含むリクエストを Broker に送信し、event-display
サービスがこのメッセージを受信して出力が想定通りに行われるかどうかを確認します。
broker-ingress.knative-eventing
を使用して、Broker からイベントリクエストアドレスを取得します。 Broker にイベントリクエストを送信し、Knative Eventing システムの Broker Service が正しく構成され、イベントが受信できることを確認します。kubectl の
port-forward
機能を使用して、クラスター内の内部サービスポートをローカルマシンに転送し、サービスへのアクセスを許可します。kubectl port-forward svc/broker-ingress -n knative-eventing 8080:80
curl
コマンドを使用して POST リクエストをローカルポート 8080 に送信し、サービスがリクエストを正しく受信して処理できることを確認します。curl -v "http://localhost:8080/default/default" \ -X POST \ -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \ -H "Ce-Specversion: 1.0" \ -H "Ce-Type: dev.knative.samples.helloworld" \ -H "Ce-Source: dev.knative.samples/helloworldsource" \ -H "Content-Type: application/json" \ -d '{"msg":"Hello World from the curl pod."}'
想定される出力:
Note: Unnecessary use of -X or --request, POST is already inferred. * Trying 127.0.0.1:8080... * Connected to localhost (127.0.0.1) port 8080 (#0) > POST /default/default HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/8.1.2 > Accept: */* > Ce-Id: 53****3-88be-4077-9d7a-a3f162******9 > Ce-Specversion: 1.0 > Ce-Type: dev.knative.samples.helloworld > Ce-Source: dev.knative.samples/helloworldsource > Content-Type: application/json > Content-Length: 40 > < HTTP/1.1 202 Accepted < Allow: POST, OPTIONS < Date: Tue, 15 Oct 2024 09:36:42 GMT < Content-Length: 0 < * Connection #0 to host localhost left intact
想定される出力では、サービスが
http://localhost:8080/default/default
に送信された POST リクエストを正しく受信し、リクエストが受け取られたことを示す202 Accepted
ステータスコードが返されたことが確認できます。
event-display
ポッドのログを調べて、イベントソースを確認します。event-display
ポッドの詳細を取得し、そのログを表示します。kubectl get pods --namespace=default | grep event-display # The output is as follows: event-display-00001-deployment-766f7b9fd6-gfcz5 2/2 Running 0 3m43s
ログを確認します。
kubectl logs event-display-00001-deployment-766f7b9fd6-gfcz5 # The log output is as follows: Defaulted container "user-container" out of: user-container, queue-proxy ☁️ cloudevents.Event Context Attributes, specversion: 1.0 type: dev.knative.samples.helloworld source: dev.knative.samples/helloworldsource id: 536808d3-88be-4077-9d7a-a3f162705f79 datacontenttype: application/json Extensions, knativearrivaltime: 2024-10-28T11:48:56.929517041Z Data, { "msg": "Hello World from the curl pod1." }
想定される出力では、Kubernetes で実行されているアプリケーションが Knative Samples Hello World ソースから CloudEvents イベントを受信し、これらのイベントの内容が出力されていることを示しています。