すべてのプロダクト
Search
ドキュメントセンター

DataWorks:DataWorks OpenAPI を使用したデータリネージのクエリ

最終更新日:Jun 17, 2026

このガイドでは、DataWorks OpenAPI (2024-05-18) を使用して、データテーブルとフィールドのデータリネージをプログラムでクエリする方法について説明します。自動化された大規模なリネージ分析のために、API 呼び出しの例と SDK コードを提供します。

データリネージとは

四半期売上が大幅に増加したことを示すビジネスレポートを確認しているとします。データアナリストやマネージャーとして、次のような疑問が生じるかもしれません。

  • この「売上」指標は どのように計算 されているのか?

  • その 生のビジネスデータ のソースは何か?注文テーブルから来ているのか、それとも支払いトランザクションテーブルから来ているのか?

  • データは、生の段階から最終レポートに至るまで、クレンジング、変換、集計など、どのような 処理ステップ を経たのか?

  • この指標のデータが不正確な場合、どのダウンストリームの レポートやアプリケーション が影響を受けるのか?

    image

明確なデータリネージには、以下の主要な利点があります。

  1. データ追跡とトラブルシューティング

    データの異常やエラーを発見した場合、データリネージをアップストリームにたどることで、問題の原因となった計算ロジックやソースデータを迅速に特定できます。これにより、トラブルシューティングの時間が大幅に短縮されます。
































































  2. 影響分析

    データテーブルの構造、フィールド、または計算ロジックを変更する必要がある場合、ダウンストリームのリネージを分析して、どのデータやビジネスレポートが影響を受けるかを正確に評価できます。これにより、変更による予期せぬ結果を回避できます。
































































  3. データガバナンスと信頼

    明確なデータリネージは、データ資産管理、データ標準の実装、データ品質監視の基盤です。データライフサイクルを透過的にし、ステークホルダーのデータに対する信頼を高めます。
































































  4. コスト最適化と資産インベントリ

    データリネージを分析することで、ダウンストリームのコンシューマーがいないデータテーブルや計算タスクを特定できます。これにより、データウェアハウスのコストを最適化し、廃止された資産を整理できます。
































































DataWorks では、システムが MaxCompute SQL や EMR Spark タスクなど、さまざまな計算タスクによって生成されたデータリネージを自動的に解析・記録します。 DataWorks OpenAPI を使用すると、このリネージ情報にプログラムでアクセスし、リネージ分析機能を独自のデータ管理プラットフォームや自動化された O&M ワークフローに統合できます。

前提条件:エンティティ ID の取得

データリネージをクエリするには、まず対象のデータテーブルまたはフィールドの一意な識別子が必要です。この識別子は エンティティ ID と呼ばれ、メタデータおよびリネージ関連の API 呼び出しに必須のパラメーターです。

エンティティ ID は、以下の 2 つの方法のいずれかで取得できます。

1. コンソールからのエンティティ ID の取得

少数の既知のテーブルやフィールドの場合、最も速い方法は、コンソールから手動で ID をコピーすることです。

テーブルのエンティティ ID の取得

  1. DataWorks コンソールで、[データマップ] モジュールに移動します。

  2. 対象のテーブルを検索し、その詳細ページに移動します。

  3. 左側の Table Basic Information パネルで、Entity ID を見つけてコピーします。

    エンティティ ID の形式は maxcompute-table:::<project_name>::<table_name> です。

フィールドのエンティティ ID の取得

  1. 対象のテーブルの詳細ページで、[リネージ] タブに移動し、[フィールドリネージ] を選択します。

  2. フィールドリネージグラフで、目的のフィールドノードをクリックします。

  3. 右側にフィールドの詳細パネルが表示されます。パネルで、Entity ID を見つけてコピーします。

    エンティティ ID の形式は maxcompute-column:::<project_name>::<table_name>::<field_name> です。

2. API によるエンティティ ID の一括取得

複数のエンティティ ID を取得する必要がある場合、手動操作は手間がかかります。この場合、OpenAPI を使用して一括クエリを実行します。

ListLineages API によるデータリネージのクエリ

エンティティ ID を取得した後、主要な ListLineages API を使用して、そのアップストリームおよびダウンストリームのリネージ関係をクエリできます。

1. 主要な API パラメーター

ListLineages API の主要なリクエストパラメーターを次の表に示します。OpenAPI ポータルで API をオンラインでデバッグできます。

パラメーター

タイプ

説明

SrcEntityId

String

このパラメーターを使用して ダウンストリームリネージをクエリ します。ソース (アップストリーム) のエンティティ ID を渡して、そのすべてのダウンストリームリネージをクエリします。

DstEntityId

String

このパラメーターを使用して アップストリームリネージをクエリ します。宛先 (ダウンストリーム) のエンティティ ID を渡して、そのすべてのアップストリームリネージをクエリします。

SrcEntityName

String

DstEntityId とともに使用して、あいまい検索を実行し、アップストリームエンティティをフィルタリングします。

DstEntityName

String

SrcEntityId とともに使用して、あいまい検索を実行し、ダウンストリームエンティティをフィルタリングします。

NeedAttachRelationship

Boolean

レスポンスに詳細なリネージ関係情報を含めるかどうかを指定します。完全なコンテキストを取得するには、このパラメーターを true に設定します。

重要
  • SrcEntityIdDstEntityId の両方を指定すると、API は指定されたアップストリームエンティティとダウンストリームエンティティ間のリネージ関係を返します。

  • SrcEntityIdDstEntityId が同じ場合、API はエンティティが自身を指すリネージ関係を返します。

2. 呼び出し例

MaxCompute テーブルがあり、その エンティティ IDmaxcompute-table:::test_project::test_table であるとします。

例 1:ダウンストリームリネージのクエリ

このテーブルのすべてのダウンストリームテーブルをクエリするには、これを ソース として指定します。

  • SrcEntityIdmaxcompute-table:::test_project::test_table

  • NeedAttachRelationshiptrue

名前に "report" を含むダウンストリームテーブルのみを検索するには、DstEntityName パラメーターを追加します。

  • DstEntityNamereport

例 2:アップストリームリネージのクエリ

どのテーブルやタスクがこのテーブルを生成するかを判断するには、これを 宛先 として指定します。

  • DstEntityIdmaxcompute-table:::test_project::test_table

  • NeedAttachRelationshiptrue

SrcEntityName パラメーターを使用して、アップストリームソースをフィルタリングすることもできます。

3. API レスポンス

ListLineages の呼び出しが成功すると、リネージ関係のリストが返されます。各リネージ関係には、ソースエンティティ、宛先エンティティ、およびそれらの関連付けに関する情報が含まれます。

単一のリネージ関係のサンプルレスポンス (JSON):

{
  "SrcEntity": {
    "Id": "maxcompute-table:::test_project::table_from",
    "Name": "table_from",
    "Attributes": {
      "rawEntityId": "maxcompute-table:::test_project::table_from"
    }
  },
  "DstEntity": {
    "Id": "maxcompute-table:::test_project::table_to",
    "Name": "table_to",
    "Attributes": {
      "project": "test_project",
      "region": "cn-shanghai",
      "table": "table_to"
    }
  },
  "Relationships": [
    {
      "Id": "123456789:maxcompute-table.test_project.table_from:maxcompute-table.test_project.table_to:maxcompute.SQL.76543xxx",
      "CreateTime": 1681089163548,
      "Task": {
        "Id": "76543xxx",
        "Type": "dataworks-sql",
        "Attributes": {
          "engine": "maxcompute",
          "channel": "1st",
          "taskInstanceId": "12345xxx",
          "projectId": "123456",
          "taskId": "76543xxx"
        }
      }
    }
  ]
}

レスポンスの解釈方法:

  • SrcEntityDstEntity:それぞれリネージのアップストリームエンティティとダウンストリームエンティティを表します。これらの Id を使用して、GetTable または GetColumn API を呼び出し、より詳細なメタデータを取得できます。

  • RelationshipsSrcEntityDstEntity がどのように関連付けられているかを説明します。

    • Task:このリネージ関係を作成したタスクを説明します。タスクが DataWorks のスケジュールされたタスクである場合、Task.Attributes には taskIdtaskInstanceId が含まれます。これらの ID を使用して GetTask API を呼び出し、詳細なタスク定義と実行ステータスを取得できます。

Java SDK ウォークスルー

次の例は、Java SDK を使用して完全なリネージクエリワークフローを実装する方法を示しています。

1. 前提条件

  • JDK バージョン:JDK 8 以降。

  • Maven 依存関係:プロジェクトの pom.xml ファイルに次の依存関係を追加します。${latest.version}SDK の最新バージョン番号に置き換えます。

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>dataworks_public20240518</artifactId>
    <version>${latest.version}</version>
</dependency>

2. 完全なコード例

次のコードは、クライアントを初期化し、指定されたテーブルのアップストリームおよびダウンストリームのリネージをクエリし、主要な情報を出力する方法を示しています。

import java.util.List;
import java.util.Map;
import com.aliyun.dataworks_public20240518.Client;
import com.aliyun.dataworks_public20240518.models.GetTableRequest;
import com.aliyun.dataworks_public20240518.models.GetTableResponse;
import com.aliyun.dataworks_public20240518.models.LineageEntity;
import com.aliyun.dataworks_public20240518.models.LineageRelationship;
import com.aliyun.dataworks_public20240518.models.LineageTask;
import com.aliyun.dataworks_public20240518.models.ListLineagesRequest;
import com.aliyun.dataworks_public20240518.models.ListLineagesResponse;
import com.aliyun.dataworks_public20240518.models.ListLineagesResponseBody.ListLineagesResponseBodyPagingInfo;
import com.aliyun.dataworks_public20240518.models.ListLineagesResponseBody.ListLineagesResponseBodyPagingInfoLineages;
import com.aliyun.dataworks_public20240518.models.Table;
import com.aliyun.tea.TeaException;
public class LineageQuerySample {
  /**
     * <b>description</b> :
     * <p>認証情報を使用してクライアントを初期化します。</p>
     *
     * @return Client
     * @throws Exception
     */
  public static com.aliyun.dataworks_public20240518.Client createClient() throws Exception {
    com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
      // AccessKey ID
      .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      // AccessKey Secret
      .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    // エンドポイントについては、https://api.aliyun.com/product/dataworks-public をご参照ください。
    config.endpoint = "dataworks.cn-hangzhou.aliyuncs.com";
    return new com.aliyun.dataworks_public20240518.Client(config);
  }
  public static void main(String[] args_) throws Exception {
    Client client = LineageQuerySample.createClient();
    // クエリ対象のテーブルのエンティティ ID。お使いの MaxCompute テーブルのエンティティ ID に置き換えてください。
    String tableId = "maxcompute-table:::test_project::test_table";
    try {
      // 1. アップストリームリネージをクエリします。
      ListLineagesRequest listLineagesRequest = new ListLineagesRequest()
        .setDstEntityId(tableId)
        .setNeedAttachRelationship(true)
        .setPageNumber(1)
        // デフォルトでは 10 件のレコードが返されます。最大値は 100 です。
        .setPageSize(10);
      // キーワードベースのあいまい一致と、アップストリームテーブル名のフィルタリングをサポートしています。
      listLineagesRequest.setSrcEntityName("demo");
      ListLineagesResponse listLineagesResponse = client.listLineages(listLineagesRequest);
      String requestId = listLineagesResponse.getBody().getRequestId();
      System.out.println("\nQuery upstream lineage");
      // トラブルシューティングのためにリクエスト ID を出力します。
      System.out.println(requestId);
      ListLineagesResponseBodyPagingInfo pagingInfo = listLineagesResponse.getBody().getPagingInfo();
      if (pagingInfo.getTotalCount() > 0 && pagingInfo.getLineages() != null) {
        for (ListLineagesResponseBodyPagingInfoLineages lineage : pagingInfo.getLineages()) {
          // 単一のリネージ関係を取得し、対応するアップストリームテーブルをクエリします。
          LineageEntity srcEntity = lineage.getSrcEntity();
          System.out.println("============================================");
          System.out.println("ID: " + srcEntity.getId());
          System.out.println("Name: " + srcEntity.getName());
          // アップストリームテーブルの情報を取得します。
          Table table = getTable(client, srcEntity.getId());
          if (table != null) {
            System.out.println("Comment: " + table.getComment());
            System.out.println("Create Time: " + table.getCreateTime());
            System.out.println("Modify Time: " + table.getModifyTime());
          }
        }
      }
            // 2. ダウンストリームリネージをクエリします。
            listLineagesRequest = new ListLineagesRequest()
                    .setSrcEntityId(tableId)
                    .setNeedAttachRelationship(true)
                    .setPageNumber(1)
                    // デフォルトでは 10 件のレコードが返されます。最大値は 100 です。
                    .setPageSize(10);
            listLineagesResponse = client.listLineages(listLineagesRequest);
            requestId = listLineagesResponse.getBody().getRequestId();
            System.out.println("\nQuery downstream lineage");
            // トラブルシューティングのためにリクエスト ID を出力します。
            System.out.println(requestId);
            pagingInfo = listLineagesResponse.getBody().getPagingInfo();
            if (pagingInfo.getTotalCount() > 0 && pagingInfo.getLineages() != null) {
                for (ListLineagesResponseBodyPagingInfoLineages lineage : pagingInfo.getLineages()) {
                    // 単一のリネージ関係を取得し、対応するダウンストリームテーブルをクエリします。
                    LineageEntity dstEntity = lineage.getDstEntity();
                    System.out.println("============================================");
                    System.out.println("ID: " + dstEntity.getId());
                    System.out.println("Name: " + dstEntity.getName());
                    // ダウンストリームテーブルの情報を取得します。
                    Table table = getTable(client, dstEntity.getId());
                    if (table != null) {
                        System.out.println("Comment: " + table.getComment());
                        System.out.println("Create Time: " + table.getCreateTime());
                        System.out.println("Modify Time: " + table.getModifyTime());
                    }
                    // リネージ関係を解析します。
                    List<LineageRelationship> relationships = lineage.getRelationships();
                    if (relationships != null) {
                        for (LineageRelationship relationship : relationships) {
                            System.out.println("\n\tRelationshipId: " + relationship.getId());
                            System.out.println("\tRelationshipCreateTime: " + relationship.getCreateTime());
                            // タスク詳細を解析します。
                            LineageTask task = relationship.getTask();
                            Map<String, String> attributes = task.getAttributes();
                            // DataWorks のスケジュールされたタスクの場合、属性からタスク ID とタスクインスタンス ID を取得できます。
                            if (attributes != null && attributes.containsKey("taskId") && attributes.containsKey("taskInstanceId")) {
                                System.out.println("\tTaskId: " + attributes.get("taskId"));
                                System.out.println("\tTaskInstanceId: " + attributes.get("taskInstanceId"));
                            }
                        }
                    }
                }
            }
        } catch (TeaException error) {
            // 本番環境では、堅牢な例外処理を実装してください。
            // エラーメッセージ
            System.out.println(error.getMessage());
            // 診断 URL
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        } catch (Exception _error) {
            TeaException error = new TeaException(_error.getMessage(), _error);
            // 本番環境では、堅牢な例外処理を実装してください。
            // エラーメッセージ
            System.out.println(error.getMessage());
            // 診断 URL
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        }
    }
    public static Table getTable(Client client, String tableId) {
        // ID を指定してテーブル情報をクエリします。
        GetTableRequest getTableRequest = new GetTableRequest()
                .setId(tableId)
                .setIncludeBusinessMetadata(true);
        try {
            GetTableResponse getTableResponse = client.getTable(getTableRequest);
            return getTableResponse.getBody().getTable();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return null;
    }
}

Python SDK ウォークスルー

次の例は、Python SDK を使用して完全なリネージクエリワークフローを実装する方法を示しています。

1. 前提条件

  • Python バージョン:Python 3.6 以降。

  • SDK のインストール:pip を使用して DataWorks Python SDK をインストールします。${latest.version}SDK の最新バージョン番号に置き換えます。

pip install alibabacloud_dataworks_public20240518==${latest.version}

2. 完全なコード例

次のコードは、クライアントを初期化し、指定されたテーブルのアップストリームおよびダウンストリームのリネージをクエリし、主要な情報を出力する方法を示しています。

# -*- coding: utf-8 -*-
import os
import sys
from alibabacloud_dataworks_public20240518.client import Client as dataworks_public20240518Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dataworks_public20240518 import models as dataworks_public_20240518_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class LineageQuerySample:

    @staticmethod
    def create_client():
        """AccessKey を使用してクライアントを初期化します。"""
        config = open_api_models.Config(
            # AccessKey ID
            access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
            # AccessKey Secret
            access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
        )
        # エンドポイントについては、https://api.aliyun.com/product/dataworks-public をご参照ください。
        config.endpoint = 'dataworks.cn-hangzhou.aliyuncs.com'
        return dataworks_public20240518Client(config)

    @staticmethod
    def get_table(client, table_id):
        """エンティティ ID を指定してテーブル情報を取得します。"""
        get_table_request = dataworks_public_20240518_models.GetTableRequest(
            id=table_id,
            include_business_metadata=True
        )
        try:
            response = client.get_table(get_table_request)
            return response.body.table
        except Exception as e:
            print(e)
            return None

    @staticmethod
    def main():
        client = LineageQuerySample.create_client()
        # クエリ対象のテーブルのエンティティ ID。お使いの MaxCompute テーブルのエンティティ ID に置き換えてください。
        table_id = 'maxcompute-table:::test_project::test_table'
        runtime = util_models.RuntimeOptions()

        try:
            # 1. アップストリームリネージをクエリします。
            upstream_request = dataworks_public_20240518_models.ListLineagesRequest(
                dst_entity_id=table_id,
                need_attach_relationship=True,
                page_number=1,
                # デフォルトでは 10 件のレコードが返されます。最大値は 100 です。
                page_size=10,
                # キーワードベースのあいまい一致と、アップストリームテーブル名のフィルタリングをサポートしています。
                src_entity_name='demo'
            )
            upstream_response = client.list_lineages_with_options(upstream_request, runtime)
            print('\nQuery upstream lineage')
            print(upstream_response.body.request_id)
            paging_info = upstream_response.body.paging_info
            if paging_info.total_count > 0 and paging_info.lineages:
                for lineage in paging_info.lineages:
                    src_entity = lineage.src_entity
                    print('============================================')
                    print(f'ID: {src_entity.id}')
                    print(f'Name: {src_entity.name}')
                    table = LineageQuerySample.get_table(client, src_entity.id)
                    if table:
                        print(f'Comment: {table.comment}')
                        print(f'Create Time: {table.create_time}')
                        print(f'Modify Time: {table.modify_time}')

            # 2. ダウンストリームリネージをクエリします。
            downstream_request = dataworks_public_20240518_models.ListLineagesRequest(
                src_entity_id=table_id,
                need_attach_relationship=True,
                page_number=1,
                page_size=10
            )
            downstream_response = client.list_lineages_with_options(downstream_request, runtime)
            print('\nQuery downstream lineage')
            print(downstream_response.body.request_id)
            paging_info = downstream_response.body.paging_info
            if paging_info.total_count > 0 and paging_info.lineages:
                for lineage in paging_info.lineages:
                    dst_entity = lineage.dst_entity
                    print('============================================')
                    print(f'ID: {dst_entity.id}')
                    print(f'Name: {dst_entity.name}')
                    table = LineageQuerySample.get_table(client, dst_entity.id)
                    if table:
                        print(f'Comment: {table.comment}')
                        print(f'Create Time: {table.create_time}')
                        print(f'Modify Time: {table.modify_time}')
                    # リネージ関係を解析します。
                    if lineage.relationships:
                        for relationship in lineage.relationships:
                            print(f'\n\tRelationshipId: {relationship.id}')
                            print(f'\tRelationshipCreateTime: {relationship.create_time}')
                            task = relationship.task
                            attributes = task.attributes
                            if attributes and 'taskId' in attributes and 'taskInstanceId' in attributes:
                                print(f'\tTaskId: {attributes["taskId"]}')
                                print(f'\tTaskInstanceId: {attributes["taskInstanceId"]}')

        except Exception as error:
            # 本番環境では、堅牢な例外処理を実装してください。
            print(error)
            UtilClient.assert_as_string(str(error))


if __name__ == '__main__':
    LineageQuerySample.main()