すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Python SDK

最終更新日:Mar 10, 2026

このトピックでは、Realtime Compute for Apache Flink の Python SDK のインストール方法と使用方法について説明します。

前提条件

  • AccessKey が作成されていること。詳細については、「AccessKey の作成」をご参照ください。

    説明

    AccessKey の漏洩によるセキュリティリスクを防ぐため、ルートアカウントの AccessKey を使用しないことを推奨します。代わりに、Resource Access Management (RAM) ユーザーを作成し、その RAM ユーザーに必要な Flink のアクセス権限を付与した上で、RAM ユーザーの AccessKey を使用して SDK を呼び出してください。詳細については、以下のトピックをご参照ください。

  • Python 3.6 以降の環境が必要です。

  • ご利用のアカウントに必要なアクセス権限と操作権限が付与されていること。詳細については、「権限管理」をご参照ください。

Flink Python SDK のインストール

pip を使用して Python SDK をインストールできます。

  • ジョブ開発や運用保守などの操作を実行するには、Realtime Compute for Apache Flink の開発コンソールの API を呼び出します。SDK のインストールと使用方法の詳細については、「開発コンソール SDK センター」をご参照ください。

    pip3 install alibabacloud_ververica20220718==1.2.1
  • ワークスペース情報の表示、ワークスペースの購入、ワークスペース内のリソースの再設定などの操作を実行するには、Realtime Compute for Apache Flink の管理コンソールの API を呼び出します。SDK のインストールと使用方法の詳細については、「SDK センター」をご参照ください。

    pip3 install alibabacloud_foasconsole20211028==1.0.2

オンラインデバッグと SDK サンプルコードの生成

OpenAPI Explorer は、オンラインでの API 呼び出し、動的な SDK サンプルコードの生成、迅速な API 検索などの機能を提供します。これらの機能により、API の使用が簡素化されます。必要な API の SDK サンプルは、開発コンソールの API および 管理コンソールの API のページで表示およびダウンロードできます。詳細については、「クイックスタート」をご参照ください。

image

説明
  • Realtime Compute for Apache Flink の管理コンソールのエンドポイントの詳細については、「エンドポイント」をご参照ください。

  • Realtime Compute for Apache Flink の開発コンソールのエンドポイントの詳細については、「サービスエンドポイント」をご参照ください。

購入済みワークスペースの表示

指定されたリージョンで購入した Flink ワークスペースの詳細をクエリできます。必須のリクエストパラメーターは以下の通りです。

Region:リージョン ID です。例:cn-hangzhou。

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_foasconsole20211028.client import Client as foasconsole20211028Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_foasconsole20211028 import models as foasconsole_20211028_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> foasconsole20211028Client:
        """
        AccessKey ID と AccessKey Secret を使用してクライアントを初期化します。
        @return: Client
        @throws Exception
        """
        # プロジェクトコードが漏洩すると、AccessKey が漏洩し、アカウントのすべてのリソースのセキュリティが脅かされる可能性があります。STS などのより安全な方法を使用してください。以下のサンプルコードは参照用です。
        config = open_api_models.Config(
            # 必須。ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が設定されていることを確認してください。
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必須。ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # 必要に応じてエンドポイントを変更してください。
        config.endpoint = f'foasconsole.aliyuncs.com'
        return foasconsole20211028Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # API を呼び出し、戻り値を表示します。
            response=client.describe_instances_with_options(describe_instances_request, runtime)
            print(response)
        except Exception as error:
            # これはデモンストレーション用です。例外の処理には注意してください。プロジェクトで例外を無視しないでください。
            # エラーメッセージ
            print(error.message)
            # トラブルシューティング URL
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # コードをコピーして実行する場合は、API の戻り値を自分で表示してください。
            await client.describe_instances_with_options_async(describe_instances_request, runtime)
        except Exception as error:
            # これはデモンストレーション用です。例外の処理には注意してください。プロジェクトで例外を無視しないでください。
            # エラーメッセージ
            print(error.message)
            # トラブルシューティング URL
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

デプロイ済みジョブのリストの取得

名前空間内のすべてのデプロイメントに関する情報を取得できます。必須のリクエストパラメーターは以下の通りです。

  • workspace:ワークスペース ID。この ID は、「購入済みワークスペースの表示」で説明されている操作によって返される ResourceId から取得できます。例:adf9e5147a****。

  • namespace:プロジェクト名。例:script****-default。

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        AccessKey ID と AccessKey Secret を使用してクライアントを初期化します。
        @return: Client
        @throws Exception
        """
        # プロジェクトコードが漏洩すると、AccessKey が漏洩し、アカウントのすべてのリソースのセキュリティが脅かされる可能性があります。STS などのより安全な方法を使用してください。以下のサンプルコードは参照用です。
        config = open_api_models.Config(
            # 必須。ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が設定されていることを確認してください。
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必須。ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # 必要に応じてエンドポイントを変更してください。
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # API を呼び出し、戻り値を表示します。
            request=client.list_deployments_with_options('namespace', list_deployments_request, list_deployments_headers, runtime)
            print(request)
        except Exception as error:
            # これはデモンストレーション用です。例外の処理には注意してください。プロジェクトで例外を無視しないでください。
            # エラーメッセージ
            print(error.message)
            # トラブルシューティング URL
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # コードをコピーして実行する場合は、API の戻り値を自分で表示してください。namespace はプロジェクト名です。
            await client.list_deployments_with_options_async('namespace', list_deployments_request, list_deployments_headers, runtime)
        except Exception as error:
            # これはデモンストレーション用です。例外の処理には注意してください。プロジェクトで例外を無視しないでください。
            # エラーメッセージ
            print(error.message)
            # トラブルシューティング URL
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

ジョブの開始

この操作は、プロジェクトにデプロイされたジョブを開始します。以下のリクエストパラメーターが必要です。

  • workspace:ワークスペース ID。例:adf9e5147a****。

  • namespace:プロジェクト名。例:script****-default。

  • deploymentId:ジョブのデプロイメント ID。この ID は、「デプロイ済みジョブのリストの取得」で説明されているように取得できます。例:3171d4d1-5952-4d02-b978-e762493b****。

  • kind:開始オフセットのタイプ。有効な値:`NONE` (ステートレスで開始)、`LATEST_SAVEPOINT` (最新のスナップショットから開始)、`FROM_SAVEPOINT` (指定したスナップショットから開始)、`LATEST_STATE` (最新のステートから開始)。

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        AccessKey ID と AccessKey Secret を使用してクライアントを初期化します。
        @return: Client
        @throws Exception
        """
        # プロジェクトコードが漏洩すると、AccessKey が漏洩し、アカウントのすべてのリソースのセキュリティが脅かされる可能性があります。STS などのより安全な方法を使用してください。以下のサンプルコードは参照用です。
        config = open_api_models.Config(
            # 必須。ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が設定されていることを確認してください。
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必須。ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # 必要に応じてエンドポイントを変更してください。
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # コードをコピーして実行する場合は、API の戻り値を自分で表示してください。
            client.start_job_with_params_with_options('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # これはデモンストレーション用です。例外の処理には注意してください。プロジェクトで例外を無視しないでください。
            # エラーメッセージ
            print(error.message)
            # トラブルシューティング URL
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            # ジョブ開始ポリシー
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # コードをコピーして実行する場合は、API の戻り値を自分で表示してください。
            await client.start_job_with_params_with_options_async('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # これはデモンストレーション用です。例外の処理には注意してください。プロジェクトで例外を無視しないでください。
            # エラーメッセージ
            print(error.message)
            # トラブルシューティング URL
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

ジョブインスタンスのリストの取得

デプロイメント内のすべてのジョブインスタンスに関する情報を取得できます。必須のリクエストパラメーターは以下の通りです。

  • workspace:ワークスペース ID。例:adf9e5147a****。

  • namespace:プロジェクト名。例:script****-default。

  • deploymentId:ジョブのデプロイメント ID。この ID は、「デプロイ済みジョブのリストの取得」で説明されているように取得できます。例:3171d4d1-5952-4d02-b978-e762493b****。

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        AccessKey ID と AccessKey Secret を使用してクライアントを初期化します。
        @return: Client
        @throws Exception
        """
        # プロジェクトコードが漏洩すると、AccessKey が漏洩し、アカウントのすべてのリソースのセキュリティが脅かされる可能性があります。STS などのより安全な方法を使用してください。以下のサンプルコードは参照用です。
        config = open_api_models.Config(
            # 必須。ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が設定されていることを確認してください。
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必須。ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # 必要に応じてエンドポイントを変更してください。
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # API を呼び出し、戻り値を表示します。
            request=client.list_jobs_with_options('namespace', list_jobs_request, list_jobs_headers, runtime)
            print(request)
        except Exception as error:
            # これはデモンストレーション用です。例外の処理には注意してください。プロジェクトで例外を無視しないでください。
            # エラーメッセージ
            print(error.message)
            # トラブルシューティング URL
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # コードをコピーして実行する場合は、API の戻り値を自分で表示してください。
            await client.list_jobs_with_options_async('namespace', list_jobs_request, list_jobs_headers, runtime)
        except Exception as error:
            # これはデモンストレーション用です。例外の処理には注意してください。プロジェクトで例外を無視しないでください。
            # エラーメッセージ
            print(error.message)
            # トラブルシューティング URL
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

ジョブインスタンスの停止

ジョブインスタンスを停止できます。必須のリクエストパラメーターは以下の通りです。。

  • workspace:ワークスペース ID。例:adf9e5147a****。

  • namespace:プロジェクト名。例:script****-default。

  • jobId:ジョブインスタンス ID。この ID は、「ジョブインスタンスのリストの取得」で説明されているように取得できます。例:3171d4d1-5952-4d02-b978-e762493b****。

  • stopStrategy:ジョブを停止するために使用されるポリシー。有効な値:`NONE` (即時停止)、`STOP_WITH_SAVEPOINT` (スナップショットを作成してから停止)、`STOP_WITH_DRAIN` (ドレインして停止)。

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        AccessKey ID と AccessKey Secret を使用してクライアントを初期化します。
        @return: Client
        @throws Exception
        """
        # プロジェクトコードが漏洩すると、AccessKey が漏洩し、アカウントのすべてのリソースのセキュリティが脅かされる可能性があります。STS などのより安全な方法を使用してください。以下のサンプルコードは参照用です。
        config = open_api_models.Config(
            # 必須。ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が設定されていることを確認してください。
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必須。ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # 必要に応じてエンドポイントを変更してください。
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            # ジョブ停止ポリシー
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # コードをコピーして実行する場合は、API の戻り値を自分で表示してください。
            client.stop_job_with_options('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # これはデモンストレーション用です。例外の処理には注意してください。プロジェクトで例外を無視しないでください。
            # エラーメッセージ
            print(error.message)
            # トラブルシューティング URL
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # コードをコピーして実行する場合は、API の戻り値を自分で表示してください。
            await client.stop_job_with_options_async('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # これはデモンストレーション用です。例外の処理には注意してください。プロジェクトで例外を無視しないでください。
            # エラーメッセージ
            print(error.message)
            # トラブルシューティング URL
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

参考文献

詳細については、「Java SDK リファレンス」をご参照ください。