All Products
Search
Document Center

CloudFlow:Perform callbacks on asynchronous tasks

Last Updated:Oct 30, 2023

This topic describes the callback feature of Serverless workflow. Compared with polling, a callback effectively reduces the delay and unnecessary pressure on the server caused by polling. In addition, callback can be used with queues to orchestrate non-Function Compute tasks. In this way, Serverless workflow allows you to orchestrate any type of computing resources.

Overview

Long-running tasks are asynchronously submitted and a task ID is returned. You can use either polling or callback to check whether an asynchronous task ends. The Poll for task status topic describes how to use polling to check whether a task ends. The callback feature of Serverless workflow has the following benefits:

  • Eliminate unnecessary delay caused by long polling.
  • Eliminate unnecessary pressure on and waste of server resources caused by highly concurrent polling in large-traffic scenarios.
  • Orchestrate tasks that do not involve functions in Function Compute, such as processes running in an on-premises data center or an Elastic Compute Service (ECS) instance.
  • Automate steps that require manual intervention, such as notifying that a task has been approved.

The following figure shows how to use Message Service (MNS) queues with the callback API to orchestrate user-created resources in Serverless workflow.

fnf-doc-service-integration-mns-queue

Callback usage

In the task step, specify pattern: waitForCallback. As shown in the following figure, after the task, such as Function Compute call, specified in resourceArn is submitted, this step stores taskToken to the context object of the step and is suspended until Serverless workflow receives that the callback or the specified task times out. When taskToken is passed to the ReportTaskSucceed or ReportTaskFailed operation for callback, this step continues.

fnf-doc-callback-state-machine
  - type: task
      name: mytask
      resourceArn: acs:fc:::services/{fc-service}/functions/{fc-function}
      pattern: waitForCallback  # Enables the task step to wait for callback after the task is submitted.
      inputMappings:
        - target: taskToken
          source: $context.task.token  # Uses taskToken in the context object as an input for the function that is specified in resourceArn.
      outputMappings:
        - target: k
          source: $local.key  # Maps output {"key": "value"} in ReportTaskSucceeded to {"k": "value"} and uses the mapped data as the output of this step.         

Example

This example consists of the following three steps:

  1. Prepare a task function.
  2. Start a flow.
  3. Perform callback.

Step 1: Prepare a task function

Create a simple function. The function directly returns the input.
  • Service: fnf-demo.
  • Function: echo.
  • Runtime environment: Python 2.7.
  • Entry point: index.handler.
#! /usr/bin/env python
import json

def handler(event, context):
    return event          

Step 2: Start a flow

In the Serverless workflow console, create the following flow and execute it.
  • Flow name: fnf-demo-callback.
  • Flow role: a role with the Function Compute Invocation permission.
version: v1
type: flow
steps:
  - type: task
    name: mytask
    resourceArn: acs:fc:::services/fnf-demo/functions/echo
    pattern: waitForCallback
    inputMappings:
      - target: taskToken
        source: $context.task.token
    outputMappings:
      - target: s
        source: $local.status         

After the flow starts, the mytask step is suspended in the TaskSubmitted event and waits for the callback. The event output contains taskToken that identifies the callback task.

Screen Shot 2019-08-15 at 11.00.09 AM

Step 3: Perform the callback

Use Serverless workflow Python SDK to run the callback.py script locally or in any environment where Python can run. Replace {task-token} with the value of the TaskSubmitted event.
cd /tmp
mkdir fnf-demo-callback
cd fnf-demo-callback

# Install Serverless Workflow Python SDK in a virtual environment.
virtualenv env
source env/bin/activate
pip install -t . aliyun-python-sdk-core
pip install -t . aliyun-python-sdk-fnf

# Run the worker process.
export ACCOUNT_ID={your-account-id}; export AK_ID={your-ak-id}; export AK_SECRET={your-ak-secret}
python worker.py {task-token-from-TaskSubmitted}
                        
# The worker.py code:

import os
import sys
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest

def main():
  account_id = os.environ['ACCOUNT_ID']
  akid = os.environ['AK_ID']
  ak_secret = os.environ['AK_SECRET']

  fnf_client = AcsClient(akid, ak_secret, "cn-hangzhou")

  task_token = sys.argv[1]
  print "task token " + task_token
  try:
    request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
    request.set_Output("{\"status\": \"ok\"}")
    request.set_TaskToken(task_token)
    resp = fnf_client.do_action_with_exception(request)
    print "Report task succeeded finished"
  except ServerException as e:
    print(e)

if __name__ == '__main__':
    main()                      
After the callback by using the preceding script is successful, the mytask step continues, and the "{"status": "ok"}" output specified in ReportTaskSucceeded is mapped by outputMappings to "{"s": "ok"}".