DataWorks: Use the DataWorks OpenAPI to query data lineage

Last Updated: Jun 23, 2026

This guide shows you how to use the DataWorks OpenAPI (2024-05-18) to programmatically query the data lineage of data tables and columns. We provide API call examples and SDK code for automated, large-scale lineage analysis.

Key concepts

Imagine reviewing a business report that shows a significant increase in quarterly sales. As a data analyst or manager, you might ask:

  • How is this "sales" metric calculated?

  • What is the source of its raw business data? Does it come from an order table or a payment transaction table?

  • What processing steps did the data go through, from its raw state to the final report, such as cleaning, transformation, and aggregation?

  • If the data for this metric is incorrect, which downstream reports or applications will be affected?

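Data lineage answers these questions by recording where data comes from, how it is processed, and where it flows next. Clear data lineage provides the following core benefits: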

  1. Data tracing and troubleshooting
    When you find a data anomaly or error, you can follow the data lineage upstream to quickly locate the calculation logic or source data that caused the problem. This significantly reduces troubleshooting time.

  2. Impact analysis
    When you need to change a data table structure, column, or calculation logic, you can analyze the downstream lineage to accurately assess which data and business reports will be affected. This helps avoid unforeseen consequences of a change.

  3. Data governance and trust
    Clear data lineage is the foundation for data asset management, data standard implementation, and data quality monitoring. It makes the data lifecycle transparent, increasing stakeholder trust in the data.

  4. Cost optimization and asset inventory
    By analyzing data lineage, you can identify data tables or compute tasks that have no downstream consumers. This allows you to optimize data warehouse costs and retire obsolete assets.

In DataWorks, the system automatically parses and records the data lineage generated by various compute tasks, such as MaxCompute SQL and EMR Spark tasks. With the DataWorks OpenAPI, you can programmatically access this lineage information to integrate lineage analysis capabilities into your own data management platform or automated O&M workflows.

Prerequisites: Get entity ID

To query data lineage, you first need the unique identifier for your target data table or column. This identifier, called an entity ID, is a required parameter for metadata and lineage-related API calls.

You can obtain an entity ID in one of the following two ways:

1. Get entity ID from the console

For a small number of known tables or columns, the fastest method is to manually copy the ID from the console.

Get table entity ID

  1. In the DataWorks console, go to the Data Map module.

  2. Search for and navigate to the details page of the target table.

  3. In the Table Basic Information panel on the left, find the Entity ID and copy it.

    The entity ID is in the format maxcompute-table:::<project_name>::<table_name>.

Get column entity ID

  1. On the details page of the target table, switch to the Lineage tab and select Column Lineage.

  2. In the column lineage graph, click the column node you want to inspect.

  3. A details panel appears on the right. Find the Entity ID in the panel and copy it.

    The entity ID is in the format maxcompute-column:::<project_name>::<table_name>::<column_name>.
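If you already know the project, table, and column names, you can assemble the IDs from the two formats shown above instead of copying them one by one. The following is a minimal sketch: the helper names are hypothetical, and it covers only the MaxCompute formats documented here. Other engines or layouts may use different formats, so confirm the result against the console before you rely on it.

```python
def table_entity_id(project_name: str, table_name: str) -> str:
    """Build a MaxCompute table entity ID in the format documented above."""
    if not project_name or not table_name:
        raise ValueError("project_name and table_name must be non-empty")
    return f"maxcompute-table:::{project_name}::{table_name}"


def column_entity_id(project_name: str, table_name: str, column_name: str) -> str:
    """Build a MaxCompute column entity ID in the format documented above."""
    if not project_name or not table_name or not column_name:
        raise ValueError("project_name, table_name, and column_name must be non-empty")
    return f"maxcompute-column:::{project_name}::{table_name}::{column_name}"


if __name__ == "__main__":
    print(table_entity_id("test_project", "test_table"))
    print(column_entity_id("test_project", "test_table", "order_id"))
```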

2. Get entity IDs in bulk by using the API

When you need to obtain entity IDs in bulk, manual operations become tedious. In this case, use the OpenAPI for batch queries:

  • Get table IDs in bulk: Call the ListTables API. For more information, see ListTables.

  • Get column IDs in bulk: Call the ListColumns API. For more information, see ListColumns.

Query lineage by using the ListLineages API

After you obtain the entity ID, you can use the ListLineages API to query the upstream and downstream lineage of the entity.

1. Key parameters

The following table describes the key request parameters of the ListLineages API. You can debug the API online in the OpenAPI portal.

| Parameter | Type | Description |
| --- | --- | --- |
| SrcEntityId | String | Used to query downstream lineage. Pass in the source (upstream) entity ID, and the API returns all downstream lineage of the entity. |
| DstEntityId | String | Used to query upstream lineage. Pass in the destination (downstream) entity ID, and the API returns all upstream lineage of the entity. |
| SrcEntityName | String | Used together with DstEntityId to perform fuzzy search and filter upstream entities. |
| DstEntityName | String | Used together with SrcEntityId to perform fuzzy search and filter downstream entities. |
| NeedAttachRelationship | Boolean | Specifies whether to include detailed lineage relationship information in the response. We recommend that you set this parameter to true to get the full context. |

Important
  • If you specify both SrcEntityId and DstEntityId, the API returns the lineage relationship between the specified upstream and downstream entities.

  • If SrcEntityId and DstEntityId are the same ID, the API returns the self-referencing lineage relationship of that entity.

2. Examples

Assume that you have a MaxCompute table with an entity ID of maxcompute-table:::test_project::test_table.

Example 1: Query downstream lineage of the table

To query all downstream tables of this table, specify it as the source:

  • SrcEntityId: maxcompute-table:::test_project::test_table

  • NeedAttachRelationship: true

To find only downstream tables whose names contain "report", add the DstEntityName parameter:

  • DstEntityName: report

Example 2: Query upstream lineage of the table

To query which tables or tasks produce this table, specify it as the destination:

  • DstEntityId: maxcompute-table:::test_project::test_table

  • NeedAttachRelationship: true

Similarly, you can use the SrcEntityName parameter to filter upstream sources.
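The pairing rule in the two examples (SrcEntityId goes with DstEntityName for downstream queries, DstEntityId goes with SrcEntityName for upstream queries) is easy to get backwards. The sketch below encodes it in one helper that returns a plain dictionary of request parameters. The function name and the "upstream"/"downstream" labels are hypothetical and are not part of the OpenAPI. Only the parameter names come from the table above.

```python
from typing import Dict, Optional, Union

ParamValue = Union[str, bool]


def build_list_lineages_params(
    entity_id: str,
    direction: str,
    name_filter: Optional[str] = None,
    need_attach_relationship: bool = True,
) -> Dict[str, ParamValue]:
    """Return ListLineages request parameters for one query direction."""
    if direction == "downstream":
        # The entity is the source; the name filter applies to destinations.
        params: Dict[str, ParamValue] = {"SrcEntityId": entity_id}
        filter_key = "DstEntityName"
    elif direction == "upstream":
        # The entity is the destination; the name filter applies to sources.
        params = {"DstEntityId": entity_id}
        filter_key = "SrcEntityName"
    else:
        raise ValueError("direction must be 'upstream' or 'downstream'")
    params["NeedAttachRelationship"] = need_attach_relationship
    if name_filter:
        params[filter_key] = name_filter
    return params


if __name__ == "__main__":
    table_id = "maxcompute-table:::test_project::test_table"
    print(build_list_lineages_params(table_id, "downstream", name_filter="report"))
    print(build_list_lineages_params(table_id, "upstream"))
```

You can paste the resulting values into the OpenAPI portal, or map them onto the SDK request object in the language you use.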

3. Understand the API response

After a successful call to the ListLineages API, you receive a list of lineage relationships. Each relationship contains the source entity, destination entity, and their association details.

Sample response for a single lineage relationship (JSON):

{
  "SrcEntity": {
    "Id": "maxcompute-table:::test_project::table_from",
    "Name": "table_from",
    "Attributes": {
      "rawEntityId": "maxcompute-table:::test_project::table_from"
    }
  },
  "DstEntity": {
    "Id": "maxcompute-table:::test_project::table_to",
    "Name": "table_to",
    "Attributes": {
      "project": "test_project",
      "region": "cn-shanghai",
      "table": "table_to"
    }
  },
  "Relationships": [
    {
      "Id": "123456789:maxcompute-table.test_project.table_from:maxcompute-table.test_project.table_to:maxcompute.SQL.76543xxx",
      "CreateTime": 1761089163548,
      "Task": {
        "Id": "76543xxx",
        "Type": "dataworks-sql",
        "Attributes": {
          "engine": "maxcompute",
          "channel": "1st",
          "taskInstanceId": "12345xxx",
          "projectId": "123456",
          "taskId": "76543xxx"
        }
      }
    }
  ]
}

How to interpret the response:

  • SrcEntity and DstEntity: Represent the upstream and downstream entities of the lineage, respectively. You can use their Id to call the GetTable or GetColumn API to obtain more detailed metadata.

  • Relationships: Describes how SrcEntity and DstEntity are associated.

    • Task: Describes the task that generated this lineage relationship. If the task is a DataWorks scheduled task, Task.Attributes contains taskId and taskInstanceId. You can use these IDs to call the GetTask API to obtain the detailed task definition and running status.
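To make this interpretation concrete, the following sketch walks one lineage record in the JSON shape shown above and flattens it into one row per relationship. It works on an abbreviated copy of the sample response rather than a live API call. The function name and the output keys are hypothetical, and the sketch assumes that Task and its Attributes can be missing, so it reads them defensively.

```python
import json
from typing import Any, Dict, List

# Abbreviated copy of the sample response above.
SAMPLE_LINEAGE = """
{
  "SrcEntity": {"Id": "maxcompute-table:::test_project::table_from", "Name": "table_from"},
  "DstEntity": {"Id": "maxcompute-table:::test_project::table_to", "Name": "table_to"},
  "Relationships": [
    {
      "CreateTime": 1761089163548,
      "Task": {
        "Id": "76543xxx",
        "Type": "dataworks-sql",
        "Attributes": {"taskInstanceId": "12345xxx", "taskId": "76543xxx"}
      }
    }
  ]
}
"""


def summarize_lineage(record: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten one lineage record into one row per relationship."""
    src_id = record["SrcEntity"]["Id"]
    dst_id = record["DstEntity"]["Id"]
    rows = []
    for relationship in record.get("Relationships") or []:
        task = relationship.get("Task") or {}
        attributes = task.get("Attributes") or {}
        rows.append({
            "src": src_id,
            "dst": dst_id,
            "task_type": task.get("Type"),
            # Present only for DataWorks scheduled tasks; otherwise None.
            "task_id": attributes.get("taskId"),
            "task_instance_id": attributes.get("taskInstanceId"),
        })
    return rows


if __name__ == "__main__":
    for row in summarize_lineage(json.loads(SAMPLE_LINEAGE)):
        print(row)
```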

Java SDK walkthrough

The following example uses the Java SDK to demonstrate how to implement a complete lineage query workflow in code.

1. Prepare the environment

  • JDK version: Make sure that JDK 8 or later is installed.

  • Maven dependency: Add the following dependency to the pom.xml file of your project. Replace ${latest.version} with the latest SDK version.

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>dataworks_public20240518</artifactId>
    <version>${latest.version}</version>
</dependency>

2. Complete code example

The following code demonstrates how to initialize the client, query upstream and downstream lineage of a specified table, and print key information.

import java.util.List;
import java.util.Map;
import com.aliyun.dataworks_public20240518.Client;
import com.aliyun.dataworks_public20240518.models.GetTableRequest;
import com.aliyun.dataworks_public20240518.models.GetTableResponse;
import com.aliyun.dataworks_public20240518.models.LineageEntity;
import com.aliyun.dataworks_public20240518.models.LineageRelationship;
import com.aliyun.dataworks_public20240518.models.LineageTask;
import com.aliyun.dataworks_public20240518.models.ListLineagesRequest;
import com.aliyun.dataworks_public20240518.models.ListLineagesResponse;
import com.aliyun.dataworks_public20240518.models.ListLineagesResponseBody.ListLineagesResponseBodyPagingInfo;
import com.aliyun.dataworks_public20240518.models.ListLineagesResponseBody.ListLineagesResponseBodyPagingInfoLineages;
import com.aliyun.dataworks_public20240518.models.Table;
import com.aliyun.tea.TeaException;
public class LineageQuerySample {
  /**
   * <b>description</b> :
   * <p>Initialize the client with credentials.</p>
   *
   * @return Client
   * @throws Exception
   */
  public static com.aliyun.dataworks_public20240518.Client createClient() throws Exception {
    com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
      // Your AccessKey ID
      .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      // Your AccessKey Secret
      .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    // For the endpoint, see https://api.aliyun.com/product/dataworks-public
    config.endpoint = "dataworks.cn-hangzhou.aliyuncs.com";
    return new com.aliyun.dataworks_public20240518.Client(config);
  }
  public static void main(String[] args_) throws Exception {
    Client client = LineageQuerySample.createClient();
    // The entity ID of the table to query. Replace this with the entity ID of the MaxCompute table you want to query.
    String tableId = "maxcompute-table:::test_project::test_table";
    try {
      // 1. Query upstream lineage
      ListLineagesRequest listLineagesRequest = new ListLineagesRequest()
        .setDstEntityId(tableId)
        .setNeedAttachRelationship(true)
        .setPageNumber(1)
        // The default number of records per page is 10. The maximum is 100.
        .setPageSize(10);
      // Filter upstream tables by keyword matching on table name
      listLineagesRequest.setSrcEntityName("demo");
      ListLineagesResponse listLineagesResponse = client.listLineages(listLineagesRequest);
      String requestId = listLineagesResponse.getBody().getRequestId();
      System.out.println("\nQuery upstream lineage");
      // Print the request ID for troubleshooting
      System.out.println(requestId);
      ListLineagesResponseBodyPagingInfo pagingInfo = listLineagesResponse.getBody().getPagingInfo();
      if (pagingInfo.getTotalCount() > 0 && pagingInfo.getLineages() != null) {
        for (ListLineagesResponseBodyPagingInfoLineages lineage : pagingInfo.getLineages()) {
          // Get a single lineage record and query the corresponding upstream table
          LineageEntity srcEntity = lineage.getSrcEntity();
          System.out.println("============================================");
          System.out.println("ID: " + srcEntity.getId());
          System.out.println("Name: " + srcEntity.getName());
          // Get upstream table information
          Table table = getTable(client, srcEntity.getId());
          if (table != null) {
            System.out.println("Comment: " + table.getComment());
            System.out.println("Create Time: " + table.getCreateTime());
            System.out.println("Modify Time: " + table.getModifyTime());
          }
        }
      }
      // 2. Query downstream lineage
      listLineagesRequest = new ListLineagesRequest()
        .setSrcEntityId(tableId)
        .setNeedAttachRelationship(true)
        .setPageNumber(1)
        // The default number of records per page is 10. The maximum is 100.
        .setPageSize(10);
      listLineagesResponse = client.listLineages(listLineagesRequest);
      requestId = listLineagesResponse.getBody().getRequestId();
      System.out.println("\nQuery downstream lineage");
      // Print the request ID for troubleshooting
      System.out.println(requestId);
      pagingInfo = listLineagesResponse.getBody().getPagingInfo();
      if (pagingInfo.getTotalCount() > 0 && pagingInfo.getLineages() != null) {
        for (ListLineagesResponseBodyPagingInfoLineages lineage : pagingInfo.getLineages()) {
          // Get a single lineage record and query the corresponding downstream table
          LineageEntity dstEntity = lineage.getDstEntity();
          System.out.println("============================================");
          System.out.println("ID: " + dstEntity.getId());
          System.out.println("Name: " + dstEntity.getName());
          // Get downstream table information
          Table table = getTable(client, dstEntity.getId());
          if (table != null) {
            System.out.println("Comment: " + table.getComment());
            System.out.println("Create Time: " + table.getCreateTime());
            System.out.println("Modify Time: " + table.getModifyTime());
          }
          // Parse lineage relationships
          List<LineageRelationship> relationships = lineage.getRelationships();
          if (relationships != null) {
            for (LineageRelationship relationship : relationships) {
              System.out.println("\n\tRelationshipId: " + relationship.getId());
              System.out.println("\tRelationshipCreateTime: " + relationship.getCreateTime());
              // Parse task details. Guard against a missing task before reading its attributes.
              LineageTask task = relationship.getTask();
              Map<String, String> attributes = task == null ? null : task.getAttributes();
              // For DataWorks scheduled tasks, you can get the task ID and task instance ID from attributes
              if (attributes != null && attributes.containsKey("taskId") && attributes.containsKey("taskInstanceId")) {
                System.out.println("\tTaskId: " + attributes.get("taskId"));
                System.out.println("\tTaskInstanceId: " + attributes.get("taskInstanceId"));
              }
            }
          }
        }
      }
    } catch (TeaException error) {
      // This is for demonstration only. Handle exceptions with care and do not ignore them in production.
      // Error message
      System.out.println(error.getMessage());
      // Diagnostic URL. The data map can be null, so check it before reading.
      if (error.getData() != null) {
        System.out.println(error.getData().get("Recommend"));
      }
      com.aliyun.teautil.Common.assertAsString(error.message);
    } catch (Exception _error) {
      TeaException error = new TeaException(_error.getMessage(), _error);
      // This is for demonstration only. Handle exceptions with care and do not ignore them in production.
      // Error message
      System.out.println(error.getMessage());
      // Diagnostic URL. A wrapped local exception carries no data map, so check for null.
      if (error.getData() != null) {
        System.out.println(error.getData().get("Recommend"));
      }
      com.aliyun.teautil.Common.assertAsString(error.message);
    }
  }
  public static Table getTable(Client client, String tableId) {
    // Query table information by ID
    GetTableRequest getTableRequest = new GetTableRequest()
      .setId(tableId)
      .setIncludeBusinessMetadata(true);
    try {
      GetTableResponse getTableResponse = client.getTable(getTableRequest);
      return getTableResponse.getBody().getTable();
    } catch (Exception e) {
      System.out.println(e.getMessage());
    }
    return null;
  }
}

Python SDK walkthrough

The following example uses the Python SDK to demonstrate how to implement a complete lineage query workflow in code.

1. Prepare the environment

  • Python version: Make sure that Python 3.6 or later is installed.

  • Install the SDK: Install the DataWorks Python SDK by using pip. Replace ${latest.version} with the latest SDK version.

pip install alibabacloud_dataworks_public20240518==${latest.version}

2. Complete code example

The following code demonstrates how to initialize the client, query upstream and downstream lineage of a specified table, and print key information.

# -*- coding: utf-8 -*-
import os
from alibabacloud_dataworks_public20240518.client import Client as dataworks_public20240518Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dataworks_public20240518 import models as dataworks_public_20240518_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class LineageQuerySample:

    @staticmethod
    def create_client():
        """Initialize the client with AccessKey credentials."""
        config = open_api_models.Config(
            # Your AccessKey ID
            access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
            # Your AccessKey Secret
            access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
        )
        # For the endpoint, see https://api.aliyun.com/product/dataworks-public
        config.endpoint = 'dataworks.cn-hangzhou.aliyuncs.com'
        return dataworks_public20240518Client(config)

    @staticmethod
    def get_table(client, table_id):
        """Get table information by entity ID."""
        get_table_request = dataworks_public_20240518_models.GetTableRequest(
            id=table_id,
            include_business_metadata=True
        )
        try:
            response = client.get_table(get_table_request)
            return response.body.table
        except Exception as e:
            print(e)
            return None

    @staticmethod
    def main():
        client = LineageQuerySample.create_client()
        # The entity ID of the table to query. Replace this with the entity ID of the MaxCompute table you want to query.
        table_id = 'maxcompute-table:::test_project::test_table'
        runtime = util_models.RuntimeOptions()

        try:
            # 1. Query upstream lineage
            upstream_request = dataworks_public_20240518_models.ListLineagesRequest(
                dst_entity_id=table_id,
                need_attach_relationship=True,
                page_number=1,
                # The default number of records per page is 10. The maximum is 100.
                page_size=10,
                # Filter upstream tables by keyword matching on table name
                src_entity_name='demo'
            )
            upstream_response = client.list_lineages_with_options(upstream_request, runtime)
            print('\nQuery upstream lineage')
            print(upstream_response.body.request_id)
            paging_info = upstream_response.body.paging_info
            if paging_info.total_count > 0 and paging_info.lineages:
                for lineage in paging_info.lineages:
                    src_entity = lineage.src_entity
                    print('============================================')
                    print(f'ID: {src_entity.id}')
                    print(f'Name: {src_entity.name}')
                    table = LineageQuerySample.get_table(client, src_entity.id)
                    if table:
                        print(f'Comment: {table.comment}')
                        print(f'Create Time: {table.create_time}')
                        print(f'Modify Time: {table.modify_time}')

            # 2. Query downstream lineage
            downstream_request = dataworks_public_20240518_models.ListLineagesRequest(
                src_entity_id=table_id,
                need_attach_relationship=True,
                page_number=1,
                page_size=10
            )
            downstream_response = client.list_lineages_with_options(downstream_request, runtime)
            print('\nQuery downstream lineage')
            print(downstream_response.body.request_id)
            paging_info = downstream_response.body.paging_info
            if paging_info.total_count > 0 and paging_info.lineages:
                for lineage in paging_info.lineages:
                    dst_entity = lineage.dst_entity
                    print('============================================')
                    print(f'ID: {dst_entity.id}')
                    print(f'Name: {dst_entity.name}')
                    table = LineageQuerySample.get_table(client, dst_entity.id)
                    if table:
                        print(f'Comment: {table.comment}')
                        print(f'Create Time: {table.create_time}')
                        print(f'Modify Time: {table.modify_time}')
                    # Parse lineage relationships
                    if lineage.relationships:
                        for relationship in lineage.relationships:
                            print(f'\n\tRelationshipId: {relationship.id}')
                            print(f'\tRelationshipCreateTime: {relationship.create_time}')
                            task = relationship.task
                            # Guard against a missing task before reading its attributes
                            attributes = task.attributes if task else None
                            if attributes and 'taskId' in attributes and 'taskInstanceId' in attributes:
                                print(f'\tTaskId: {attributes["taskId"]}')
                                print(f'\tTaskInstanceId: {attributes["taskInstanceId"]}')

        except Exception as error:
            # This is for demonstration only. Handle exceptions with care and do not ignore them in production.
            print(error)
            UtilClient.assert_as_string(str(error))


if __name__ == '__main__':
    LineageQuerySample.main()
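
Both samples above request only the first page (page number 1, page size 10), so a table with many upstream or downstream entities is only partially listed. The sketch below shows one way to walk every page. It is written against a generic fetch_page callable rather than the SDK so that it runs on its own. iter_all_pages and fake_fetch are hypothetical names, and the stopping rule assumes that the total count reported by the API is the number of records across all pages.

```python
from typing import Any, Callable, Iterator, List, Tuple

# fetch_page(page_number, page_size) -> (total_count, items_on_that_page)
FetchPage = Callable[[int, int], Tuple[int, List[Any]]]


def iter_all_pages(fetch_page: FetchPage, page_size: int = 100) -> Iterator[Any]:
    """Yield every item by requesting pages until total_count is reached."""
    if not 1 <= page_size <= 100:
        # The samples above state a maximum page size of 100.
        raise ValueError("page_size must be between 1 and 100")
    page_number = 1
    seen = 0
    while True:
        total_count, items = fetch_page(page_number, page_size)
        if not items:
            break  # An empty page means there is nothing more to read.
        for item in items:
            yield item
        seen += len(items)
        if seen >= total_count:
            break
        page_number += 1


def fake_fetch(page_number: int, page_size: int) -> Tuple[int, List[str]]:
    """Stand-in for a real API call: serves 25 fake records page by page."""
    data = [f"lineage-{i}" for i in range(25)]
    start = (page_number - 1) * page_size
    return len(data), data[start:start + page_size]


if __name__ == "__main__":
    print(len(list(iter_all_pages(fake_fetch, page_size=10))))
```

To use this with the Python sample above, fetch_page would build a ListLineagesRequest with the given page_number and page_size, call list_lineages_with_options, and return paging_info.total_count together with paging_info.lineages (or an empty list when lineages is empty).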