
Container Service for Kubernetes: Use Argo Workflows to implement dynamic DAG Fan-in and Fan-out

Last Updated: Dec 17, 2025

To accelerate tasks during workflow orchestration, you can use Fan-in and Fan-out to split a task into multiple sub-tasks, run the sub-tasks in parallel, and then aggregate the results. Kubernetes clusters for distributed Argo workflows (workflow clusters) support dynamic directed acyclic graph (DAG) Fan-in and Fan-out. This allows you to use auto scaling to schedule tens of thousands of vCPUs to accelerate tasks and to reclaim the resources after the tasks are completed, which reduces costs. This topic describes how to use Argo Workflows in workflow clusters to implement dynamic Fan-in and Fan-out.

Background Information

Fan-in and Fan-out

[Figure: Fan-out and Fan-in implemented with a static DAG and with a dynamic DAG]

Fan-in and Fan-out are commonly used to efficiently process concurrent tasks. By splitting a task into sub-tasks (Fan-out) and aggregating their results (Fan-in), you can make full use of multiple vCPUs and computing instances to process large amounts of data.

As shown in the preceding figure, you can use a DAG to implement Fan-in and Fan-out during workflow orchestration. A task can be split into sub-tasks either statically (static DAG) or dynamically (dynamic DAG).

  • Static DAG: The categories of sub-tasks are fixed. For example, in data collection scenarios, you need to collect data from Database 1 and Database 2 and aggregate the results.

  • Dynamic DAG: A task is dynamically split into sub-tasks, and how it is split depends on the output of the parent task.

    In the preceding figure, Task A scans the dataset to be processed, and sub-tasks (Bn) are launched to process the sub-datasets, such as subdirectories. Each sub-task corresponds to one sub-dataset. After all Bn sub-tasks are completed, Task C aggregates the results. The number of sub-tasks depends on the output of Task A, and you define the splitting rules in Task A. The sketch after this list illustrates the pattern.
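
Conceptually, dynamic Fan-in and Fan-out is a map-reduce pattern: a parent task decides at run time how many sub-tasks to launch, the sub-tasks run in parallel, and a final task aggregates their results. The following minimal Python sketch shows this control flow on a single machine; the splitting rule and the per-chunk work are illustrative placeholders, and in a workflow cluster each function would instead run as a separate pod in a DAG.

    from concurrent.futures import ProcessPoolExecutor

    # Task A: dynamically decide how to split the work. The "splitting rule" here
    # simply chunks a list; in practice it could scan a dataset and emit one entry
    # per subdirectory.
    def split(data, num_parts):
        size = (len(data) + num_parts - 1) // num_parts
        return [data[i:i + size] for i in range(0, len(data), size)]

    # Task Bn: process one sub-dataset (Fan-out).
    def process(chunk):
        return sum(chunk)  # placeholder for the real per-chunk work

    # Task C: aggregate the sub-task results (Fan-in).
    def reduce_results(partials):
        return sum(partials)

    if __name__ == "__main__":
        data = list(range(100))
        chunks = split(data, num_parts=5)  # the number of sub-tasks is decided at run time
        with ProcessPoolExecutor() as executor:
            partials = list(executor.map(process, chunks))  # sub-tasks run in parallel
        print(reduce_results(partials))  # 4950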

Kubernetes clusters for distributed Argo workflows

To greatly accelerate large tasks, you may need to split a large task into thousands of sub-tasks. To ensure that these sub-tasks can run concurrently, you need to schedule tens of thousands of vCPUs. However, concurrent sub-tasks will compete for resources. On-premises computing resources in data centers cannot fulfill the resource demand.

For example, after you optimize the algorithm of an autonomous driving simulation task, you need to run regression tests that simulate all driving scenarios. Each driving scenario is a sub-task. To accelerate iteration, you want to run the tests for all driving scenarios concurrently.

To resolve the preceding issue, you can use Kubernetes clusters for distributed Argo workflows to orchestrate workflows. Workflow clusters host Argo Workflows, provide comprehensive technical support, and implement dynamic DAG Fan-in and Fan-out. They can schedule cloud computing resources by using auto scaling to concurrently run large numbers of sub-tasks on tens of thousands of vCPUs, and reclaim the resources after the sub-tasks are completed to reduce costs. This solution applies to data processing, machine learning, simulation computing, and CI/CD scenarios.

Argo Workflows is a graduated project of Cloud Native Computing Foundation (CNCF). It targets workflow orchestration in the cloud native sector, uses Kubernetes CustomResourceDefinitions (CRDs) to orchestrate scheduled tasks and DAG workflows, and runs them in Kubernetes pods. For more information, see Argo Workflows.

Use Argo Workflows to implement Fan-in and Fan-out

Background information

The following example shows how to use Argo Workflows to implement Fan-in and Fan-out.

In this example, a dynamic DAG Fan-in and Fan-out workflow splits a large log file in an Object Storage Service (OSS) bucket into multiple sub-files, launches a sub-task to count the number of keywords in each sub-file, and then merges the results.

Procedure

  1. Create a Kubernetes cluster for distributed Argo workflows.

  2. Mount an OSS volume to allow workflows to read objects in the OSS volume as local files.

    For more information, see Use volumes.

  3. Create a workflow based on the following YAML content. The workflow runs the split.py, count.py, and merge.py scripts that are packaged in the container image; illustrative sketches of these scripts are provided after this procedure.

    For more information, see Create a workflow.

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: dynamic-dag-map-reduce-
    spec:
      entrypoint: main
      # Claim an OSS PVC. The workflow can read and write files in OSS through the PVC.
      volumes:
        - name: workdir
          persistentVolumeClaim:
            claimName: pvc-oss
      # Number of parts to split the log file into. The default is 5.
      arguments:
        parameters:
          - name: numParts
            value: "5"
      templates:
        - name: main
          # DAG definition.
          dag:
            tasks:
              # Split the log file into several small files based on numParts.
              - name: split
                template: split
                arguments:
                  parameters:
                    - name: numParts
                      value: "{{workflow.parameters.numParts}}"
              # Launch multiple map tasks to count keywords in each small file.
              - name: map
                template: map
                arguments:
                  parameters:
                    - name: partId
                      value: '{{item}}'
                depends: "split"
                # Run as a loop. Each partId comes from the JSON output of the split task.
                withParam: '{{tasks.split.outputs.result}}'
              - name: reduce
                template: reduce
                arguments:
                  parameters:
                    - name: numParts
                      value: "{{workflow.parameters.numParts}}"
                depends: "map"
        # The `split` task splits the big log file into several small files. Each file has a unique ID (partId).
        # Finally, it writes the list of partId values to stdout as the output parameter.
        - name: split
          inputs:
            parameters:
              - name: numParts
          container:
            image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
            command: [python]
            args: ["split.py"]
            env:
            - name: NUM_PARTS
              value: "{{inputs.parameters.numParts}}"
            volumeMounts:
            - name: workdir
              mountPath: /mnt/vol
        # One `map` task is started per partId. Each task finds its own part file and processes it.
        - name: map
          inputs:
            parameters:
              - name: partId
          container:
            image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
            command: [python]
            args: ["count.py"]
            env:
            - name: PART_ID
              value: "{{inputs.parameters.partId}}"
            volumeMounts:
            - name: workdir
              mountPath: /mnt/vol
        # The `reduce` task reads the results directory and aggregates the per-part counts into a single result.
        - name: reduce
          inputs:
            parameters:
              - name: numParts
          container:
            image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
            command: [python]
            args: ["merge.py"]
            env:
            - name: NUM_PARTS
              value: "{{inputs.parameters.numParts}}"
            volumeMounts:
            - name: workdir
              mountPath: /mnt/vol
          outputs:
            artifacts:
              - name: result
                path: /mnt/vol/result.json
  4. The workflow implements dynamic DAG Fan-in and Fan-out as follows:

    1. The split task splits the large file and then writes a JSON string to its standard output. The string contains the partId of each sub-task. Example:

      ["0", "1", "2", "3", "4"]
    2. The map task uses withParam to reference the output of the split task. Argo Workflows parses the JSON string and launches one map task for each element, passing the element to the partId parameter as {{item}}.

                - name: map
                  template: map
                  arguments:
                    parameters:
                      - name: partId
                        value: '{{item}}'
                  depends: "split"
                  withParam: '{{tasks.split.outputs.result}}'

    For more information, see Argo Workflows.

  5. After the workflow runs, you can log on to the ACK One console to view the DAG process and results.

    [Figure: The DAG of the workflow as displayed in the ACK One console]

  6. In the OSS object list, log-count-data.txt is the input log file, split-output and cout-output are directories that store the intermediate results, and result.json is the output file.
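
The split.py, count.py, and merge.py scripts that the workflow runs are packaged in the python-log-count image and are not shown in this topic. The following single-file Python sketch only illustrates what they might look like, based on the workflow definition above: the NUM_PARTS and PART_ID environment variables, the /mnt/vol mount path, the log-count-data.txt input file, and the result.json output come from this topic, while the keyword, the intermediate directory names, and the file formats are assumptions made for illustration.

    import json
    import os
    import sys

    WORKDIR = "/mnt/vol"   # mount path of the OSS volume in the workflow YAML
    KEYWORD = "error"      # assumed keyword to count; the real keyword is not specified in this topic

    # split.py: split the input log file into NUM_PARTS sub-files and print the list
    # of partId values as JSON to stdout, which becomes tasks.split.outputs.result.
    def split():
        num_parts = int(os.environ["NUM_PARTS"])
        with open(os.path.join(WORKDIR, "log-count-data.txt")) as f:
            lines = f.readlines()
        out_dir = os.path.join(WORKDIR, "split-output")
        os.makedirs(out_dir, exist_ok=True)
        size = (len(lines) + num_parts - 1) // num_parts
        part_ids = [str(i) for i in range(num_parts)]
        for i, part_id in enumerate(part_ids):
            with open(os.path.join(out_dir, part_id), "w") as f:
                f.writelines(lines[i * size:(i + 1) * size])
        print(json.dumps(part_ids))

    # count.py: count keyword occurrences in the sub-file identified by PART_ID.
    def count():
        part_id = os.environ["PART_ID"]
        with open(os.path.join(WORKDIR, "split-output", part_id)) as f:
            n = sum(line.count(KEYWORD) for line in f)
        out_dir = os.path.join(WORKDIR, "count-output")  # assumed intermediate directory name
        os.makedirs(out_dir, exist_ok=True)
        with open(os.path.join(out_dir, part_id), "w") as f:
            f.write(str(n))

    # merge.py: aggregate the per-part counts into /mnt/vol/result.json.
    def merge():
        num_parts = int(os.environ["NUM_PARTS"])
        total = 0
        for i in range(num_parts):
            with open(os.path.join(WORKDIR, "count-output", str(i))) as f:
                total += int(f.read())
        with open(os.path.join(WORKDIR, "result.json"), "w") as f:
            json.dump({"keyword": KEYWORD, "total": total}, f)

    if __name__ == "__main__":
        # In the image these are three separate scripts; a dispatcher is used here
        # only so that the sketch stays in one file.
        {"split": split, "count": count, "merge": merge}[sys.argv[1]]()

If you use the Argo CLI, you can also submit the workflow and override the number of parts, for example, argo submit dynamic-dag-map-reduce.yaml -p numParts=10 --watch, assuming that you saved the preceding YAML content as dynamic-dag-map-reduce.yaml.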

Source code

argo-workflow-examples

References

  • For more information about Kubernetes clusters for distributed Argo workflows, see ACK One overview.

  • For more information about Argo Workflows, see Argo Workflows.

  • If you have any questions about ACK One, join the DingTalk group 35688562.