Knative Eventing は、Broker-Trigger モデルを使用して、サービス間のイベントのルーティングとフィルタリングを行います。このチュートリアルでは、イベントを受信する Knative Service、イベントをルーティングするブローカー、両者を接続するトリガー、そして最初のイベントをエンドツーエンドで送信する curl コマンドを含む、完全なイベント駆動型システムを ACK 上にデプロイします。
仕組み
Knative Eventing は、外部システムからイベントを取り込み、CloudEvents 標準を使用して内部で転送します。その中心にあるのが Broker-Trigger モデルです。ブローカーは任意のソースからイベントを受信し、各トリガーはそのブローカーをサブスクライブし、オプションのフィルターを適用して、一致するイベントを下流のサービスにディスパッチします。
このアーキテクチャは、6 つのコンポーネントで構成されています。
イベントソース — データベースの更新やクラウドメッセージングサービスなど、イベントの発生元。
Ingress — 外部イベントを Knative クラスター内に受信します。
チャンネル — Broker-Trigger モデル内でイベントを転送します。サポートされているチャンネルには、ApsaraMQ for Kafka、NATS Streaming、InMemoryChannel があります。デフォルトは InMemoryChannel です。
ブローカー — 設定されたトリガーに従って、さまざまなソースからのイベントをルーティングします。NATS や ApsaraMQ for Kafka などのバックエンドをサポートします。
トリガー — 特定のサービスにイベントをルーティングする方法を定義します。各トリガーにはイベントフィルターとサービスのターゲットがあり、一致するイベントのみが転送されます。
サービス — トリガーからイベントを受信した後にビジネスロジックを実行する最終的なプロセッサ。
前提条件
開始する前に、以下が完了していることを確認してください。
Knative Serving がインストールされていること。詳細については、「Knative のデプロイ」をご参照ください。
Knative Eventing がインストールされていること。詳細については、「Knative Eventing のデプロイ」をご参照ください。
ステップ 1:Knative Service のデプロイ
イベントを受信し、その内容をログに出力する Knative Service である event-display をデプロイします。このステップの後、イベントを消費する準備ができた実行中のサービスが作成されます。
event-display.yamlという名前のファイルに、次の内容を記述して作成します。apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: default spec: template: spec: containers: - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/event-display:v1028サービスをデプロイします。
kubectl apply -f event-display.yamlサービスが準備完了であることを確認します。
kubectl get ksvc次のような出力が期待されます。
NAME URL LATESTCREATED LATESTREADY READY REASON event-display http://event-display.default.example.com event-display-00001 event-display-00001 TrueREADYの値がTrueであれば、サービスが実行中で、イベントを受信する準備ができていることを示します。サービスがReady状態にならない場合は、次のコマンドを実行して診断してください。kubectl describe ksvc event-display
ステップ 2:ブローカーとトリガーの作成
default という名前のブローカーと my-service-trigger という名前のトリガーを作成します。このステップの後、ブローカーに送信されたすべてのイベントは、自動的に event-display サービスにルーティングされます。
ブローカーの作成
broker.yamlという名前のファイルに、次の内容を記述して作成します。apiVersion: eventing.knative.dev/v1 kind: Broker metadata: name: default namespace: defaultブローカーをデプロイします。
kubectl apply -f broker.yamlブローカーが準備完了であることを確認します。
kubectl get broker次のような出力が期待されます。
NAME URL AGE READY REASON default http://broker-ingress.knative-eventing.svc.cluster.local/default/default 9s TrueURLフィールドは、このブローカーにイベントを送信する場所を示します。READYの値がTrueであれば、ブローカーが正しく設定されていることを示します。ブローカーがReady状態にならない場合は、次のコマンドを実行して診断してください。kubectl describe broker default
トリガーの作成
trigger.yamlという名前のファイルに、次の内容を記述して作成します。apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: my-service-trigger spec: broker: default subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-displayこのトリガーには
filterフィールドがないため、defaultブローカーに送信されたすべてのイベントをevent-displayに転送します。フィルターを省略することは、テストやデバッグに役立ちます。本番環境では、特定のイベントタイプのみをルーティングするためにフィルターを追加してください。トリガーをデプロイします。
kubectl apply -f trigger.yamlトリガーが準備完了であることを確認します。
kubectl get trigger次のような出力が期待されます。
NAME BROKER SUBSCRIBER_URI AGE READY REASON my-service-trigger default http://event-display.default.svc.cluster.local 22s TrueREADYの値がTrueであれば、トリガーがブローカーに正しくサブスクライブされ、イベントをevent-displayに転送することを示します。トリガーがReady状態にならない場合は、次のコマンドを実行して診断してください。kubectl describe trigger my-service-trigger
ステップ 3:イベントの送信と配信の確認
テストイベントをブローカーに送信し、event-display がそれを受信してログに記録することを確認します。このステップでは、kubectl port-forward を使用してブローカーの内部エンドポイントをローカルに公開し、CloudEvents 形式の HTTP POST リクエストを送信します。
ブローカーの Ingress ポートをローカルマシンに転送します。
kubectl port-forward svc/broker-ingress -n knative-eventing 8080:80別のターミナルで、ブローカーに POST リクエストを送信します。
curl -v "http://localhost:8080/default/default" \ -X POST \ -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \ -H "Ce-Specversion: 1.0" \ -H "Ce-Type: dev.knative.samples.helloworld" \ -H "Ce-Source: dev.knative.samples/helloworldsource" \ -H "Content-Type: application/json" \ -d '{"msg":"Hello World from the curl pod."}'期待される応答は、以下で終わります。
< HTTP/1.1 202 Accepted202 Accepted応答は、ブローカーがリクエストを受信し、イベントの処理を受け付けたことを示します。event-displayPod の名前を取得します。kubectl get pods --namespace=default | grep event-display次のような出力が期待されます。
event-display-00001-deployment-766f7b9fd6-gfcz5 2/2 Running 0 3m43sPod のログを表示して、イベントが配信されたことを確認します。
kubectl logs event-display-00001-deployment-766f7b9fd6-gfcz5次のような出力が期待されます。
Defaulted container "user-container" out of: user-container, queue-proxy ☁️ cloudevents.Event Context Attributes, specversion: 1.0 type: dev.knative.samples.helloworld source: dev.knative.samples/helloworldsource id: 536808d3-88be-4077-9d7a-a3f162705f79 datacontenttype: application/json Extensions, knativearrivaltime: 2024-10-28T11:48:56.929517041Z Data, { "msg": "Hello World from the curl pod1." }このログは、
event-displayがブローカーから CloudEvent を受信し、CloudEvents ヘッダーと JSON ペイロードを含むその全内容を出力したことを示しています。
次のステップ
トリガーにフィルターを追加して、特定のイベントタイプのみをルーティングします。詳細については、「トリガーのフィルタリング」をご参照ください。
本番環境レベルのイベント永続化のために、InMemoryChannel を ApsaraMQ for Kafka または NATS Streaming に置き換えます。
追加のトリガーをデプロイして、イベントを複数のサービスにファンアウトします。