ApsaraMQ for MNS を使用すると、ワークフロークラスターは外部イベントに自動的に反応できます。OSS、EventBridge、またはその他のソースから ApsaraMQ for MNS キューまたはトピックにメッセージが到着すると、Argo Events がそれを取得し、対応するワークフローをトリガーします。
このガイドでは、順に設定する 3 つの Argo Events コンポーネントについて説明します。
イベントバス — 名前空間内のすべてのイベント駆動型ワークフローで共有される通信バックボーン
イベントソース — SMQ キューに接続し、受信メッセージをイベントとして転送します
イベントセンサー — イベントソースからのイベントをリッスンし、条件が一致するとワークフローを起動します
前提条件
開始する前に、以下を確認してください。
イベント駆動型の機能が有効になっています。詳しくは、「イベント駆動型の機能を有効化する」を参照してください。
ApsaraMQ for MNS は、必要な権限で有効化されています。この機能は、ApsaraMQ for MNS の課金対象となります。詳しくは、「ApsaraMQ for MNS の有効化と権限の付与」および「課金概要」をご参照ください。
ワークフロークラスターを作成し、kubectl を設定しておきます。詳細については、「ワークフロークラスターの作成」および「クラスターの KubeConfig ファイルを取得し、kubectl を使用してクラスターに接続する」をご参照ください。
Alibaba Cloud Argo CLI がインストールされていること。詳細については、「Alibaba Cloud Argo CLI をインストール」をご参照ください。
ステップ 1: イベントバスを作成
イベントバスは、名前空間内のすべてのイベント駆動型ワークフローで共有されます。NATS または SMQ を使用して作成できます。
| オプション | 説明 | 使用するタイミング |
|---|---|---|
| NATS | クラスター内で Pod として実行 | 軽量なテスト、または SMQ 統合が不要な場合 |
| SMQ | Alibaba Cloud ApsaraMQ for MNS をトランスポートレイヤーとして使用 | 信頼性の高いクロスサービスメッセージングと Alibaba Cloud エコシステムとの統合 |
イベントバスをすでに持っている場合は、「ステップ 2: イベントソースを作成」にスキップしてください。
NATS を使用して作成
event-bus.yamlファイルを作成します。apiVersion: argoproj.io/v1alpha1 kind: EventBus metadata: name: default spec: nats: native: replicas: 3 auth: tokenファイルを適用してイベントバスを作成します。
このコマンドの実行後、デフォルトの名前空間にイベントバス Pod が作成されます。以降のすべてのステップは、同じ名前空間で実行する必要があります。
kubectl apply -f event-bus.yamlイベントバス Pod が起動したことを確認します。
kubectl get pod
SMQ を使用して作成
SMQ コンソールにログインします。SMQ コンソール の [トピック一覧] ページで、
argoeventbusという名前のトピックを作成します。[トピックの詳細] ページで、[エンドポイント] セクションからエンドポイントをコピーします。管理者権限を持つ RAM ユーザーとしてRAM コンソールにログインします。RAM ユーザーを作成し、
AliyunMNSFullAccess権限を付与し、新しいユーザーの AccessKey ID と AccessKey Secret を取得します。詳細については、「RAM ユーザーを作成」、「RAM ユーザーに権限を付与」、「AccessKey ペアを作成」、および「RAM ユーザーの AccessKey 情報の表示」をご参照ください。認証情報を保存するための Kubernetes Secret を作成します。
kubectl create secret generic mns-secret \ --from-literal=accesskey=<your-access-key-id> \ --from-literal=secretkey=<your-access-key-secret>event-bus-mns.yamlファイルを作成します。ステップ 1 でコピーしたtopicとendpointの値を置き換えます。apiVersion: argoproj.io/v1alpha1 kind: EventBus metadata: name: default spec: alimns: accessKey: key: accesskey name: mns-secret secretKey: key: secretkey name: mns-secret topic: argoeventbus # SMQ トピック名 endpoint: http://165***368.mns.<region>.aliyuncs.com # SMQ エンドポイントファイルを適用してイベントバスを作成します。
kubectl apply -f event-bus-mns.yaml
ステップ 2: イベントソースを作成
イベントソースは SMQ キューに接続し、各メッセージをセンサーが処理できるイベントとして転送します。
SMQコンソールにログインします。[キューリスト] ページで、
test-event-queueという名前のキューを作成します。[キューの詳細] ページで、[エンドポイント] セクションからエンドポイントをコピーします。SMQ を使用してイベントバスをすでに作成している場合 (したがって、
mns-secretをすでに持っている場合) は、ステップ 3 と 4 をスキップして直接ステップ 5 に進みます。管理者権限を持つ RAM ユーザーとしてRAM コンソールにログインします。RAM ユーザーを作成し、
AliyunMNSFullAccess権限を付与し、AccessKey ID と AccessKey Secret を取得します。Kubernetes Secret を作成します。
kubectl create secret generic mns-secret \ --from-literal=accesskey=<your-access-key-id> \ --from-literal=secretkey=<your-access-key-secret>event-source.yamlファイルを作成します。queueとendpointを実際の値に置き換えます。apiVersion: argoproj.io/v1alpha1 kind: EventSource metadata: name: ali-mns spec: mns: example: jsonBody: true accessKey: key: accesskey name: mns-secret secretKey: key: secretkey name: mns-secret queue: test-event-queue # SMQ キュー名 waitTimeSeconds: 20 endpoint: http://165***368.mns.<region>.aliyuncs.com # SMQ エンドポイントファイルを適用します。
kubectl apply -f event-source.yamlイベントソース Pod が起動したことを確認します。
kubectl get pod
ステップ 3: イベントセンサーを作成
センサーは ali-mns からのイベントをリッスンし、イベントが到着するたびにワークフローを起動します。イベントの data.body フィールドをワークフローの message パラメーターに直接マッピングします。SMQ キューに送信するコンテンツがワークフロー入力となります。
センサー YAML の parameters ブロックがこのマッピングを制御します。
| フィールド | 値 | 説明 |
|---|---|---|
src.dependencyName | test-dep | イベントを読み取る依存関係 |
src.dataKey | body | 抽出する data 内のフィールド (Base64 エンコードされたメッセージ本文) |
dest | spec.arguments.parameters.0.value | 抽出された値を書き込むワークフローパラメーター |
以下の内容で
event-sensor.yamlファイルを作成します。ファイルを適用します。
kubectl apply -f event-sensor.yamlセンサー Pod が起動したことを確認します。
kubectl get pod
ApsaraMQ for MNS を使用してイベントバスを作成した場合、センサーの起動時に ApsaraMQ for MNS キューが自動的に作成されます。これは ackone-argowf-<namespace>-<sensor-name>-<sensor-uid> という命名パターンに従います。ステップ 4: ワークフローがトリガーされたことを確認
「SMQ コンソール」にログインします。「[キュー一覧]」ページで、
test-event-queueを見つけ、「[送信および受信メッセージ]」を「[操作]」列でクリックします。「メッセージの送受信クイックスタート」ページで、
test trigger argo workflowをメッセージ本文として入力し、[メッセージを送信] をクリックします。ワークフローがトリガーされたことを確認します。
argo list出力には実行中のワークフローが表示されます。
NAME STATUS AGE DURATION PRIORITY ali-mns-workflow-5prz7 Running 6s 6s 0ワークフローログを取得してメッセージ本文を確認します。
argo logs ali-mns-workflow-5prz7重要ali-mns-workflow-5prz7を前のステップで返されたワークフロー名に置き換えます。ログ内のメッセージ本文は Base64 エンコードされています。
ステップ 5: イベント関連リソースを削除
イベントセンサー、イベントソース、およびイベントバスを削除します。
kubectl delete sensor ali-mns kubectl delete eventsource ali-mns kubectl delete eventbus defaultすべての Pod が削除されたことを確認します。
kubectl get pod