このトピックでは、Realtime Compute for Apache Flink Python 用 SDK のインストール方法と使用方法について説明します。
前提条件
AccessKey ペアが作成されていること。詳細については、「AccessKey ペアの作成」をご参照ください。
説明Alibaba Cloud アカウントの AccessKey ペアを保護するために、Resource Access Management (RAM) ユーザーを作成し、Realtime Compute for Apache Flink へのアクセス権限を RAM ユーザーに付与してから、RAM ユーザーの AccessKey ペアを使用して Realtime Compute for Apache Flink SDK を呼び出すことをお勧めします。
RAM ユーザーの作成方法と RAM ユーザーの AccessKey ペアの取得方法の詳細については、「RAM ユーザーの作成」または「AccessKey ペアの取得」をご参照ください。
RAM ユーザーに権限を付与する方法の詳細については、「RAM ユーザーへの権限の付与」をご参照ください。
Python 環境が準備されていること。Python のバージョンは 3.6 以降である必要があります。
使用するアカウントに必要なアクセス権限と操作権限が付与されていること。詳細については、「権限管理」をご参照ください。
Realtime Compute for Apache Flink Python 用 SDK のインストール
pip を使用して Realtime Compute for Apache Flink Python 用 SDK をインストールします。
ドラフト開発やデプロイ O&M などの操作を実行するには、Realtime Compute for Apache Flink の開発コンソールの API 操作を呼び出す必要があります。SDK のインストール方法と使用方法の詳細については、SDK Center をご参照ください。
pip3 install alibabacloud_ververica20220718==1.2.1
ワークスペース情報の表示、ワークスペースの購入、ワークスペース内のリソースの再構成などの操作を実行するには、Realtime Compute for Apache Flink の管理コンソールの API 操作を呼び出す必要があります。SDK のインストール方法と使用方法の詳細については、SDK Center をご参照ください。
pip3 install alibabacloud_foasconsole20211028==1.0.2
オンラインデバッグの実行と SDK サンプルコードの生成
OpenAPI Explorer を使用して、API 操作をオンラインで呼び出し、API 操作を検索し、SDK サンプルコードを動的に生成できます。これにより、API 操作の使用が簡素化されます。必要な API 操作の SDK サンプルコードは、OpenAPI Explorer の Realtime Compute for Apache Flink ページと Realtime Compute Selling Console ページで表示およびダウンロードできます。詳細については、「IDE での Alibaba Cloud SDK for Python の使用」をご参照ください。
例
購入したワークスペースの表示
指定したリージョンで購入した Realtime Compute for Apache Flink ワークスペースの詳細をクエリできます。必須のリクエストパラメータ:
Region
:リージョン ID(例:cn-hangzhou)。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_foasconsole20211028.client import Client as foasconsole20211028Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_foasconsole20211028 import models as foasconsole_20211028_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> foasconsole20211028Client:
"""
AccessKey ID と AccessKey シークレットを使用してクライアントを初期化します。
@return: Client
@throws Exception
"""
# プロジェクトコードが漏洩した場合、AccessKey ペアが漏洩し、アカウントのすべてのリソースでセキュリティ問題が発生する可能性があります。Security Token Service (STS) トークンを使用することをお勧めします。次のコードは参照用です。
config = open_api_models.Config(
# 必須。ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が設定されていることを確認してください。
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必須。ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# ビジネス要件に基づいてエンドポイントを変更します。
config.endpoint = f'foasconsole.aliyuncs.com'
return foasconsole20211028Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
region='cn-hangzhou'
)
runtime = util_models.RuntimeOptions()
try:
# API 操作を呼び出し、戻り値を出力します。
response=client.describe_instances_with_options(describe_instances_request, runtime)
print(response)
except Exception as error:
# 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージは参照用にのみ出力されます。
# エラーメッセージを出力します。
print(error.message)
# トラブルシューティング用の URL を表示します。
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
region='cn-hangzhou'
)
runtime = util_models.RuntimeOptions()
try:
# サンプルコードを実行して、API 操作の戻り値を取得します。
await client.describe_instances_with_options_async(describe_instances_request, runtime)
except Exception as error:
# 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージは参照用にのみ出力されます。
# エラーメッセージを出力します。
print(error.message)
# トラブルシューティング用の URL を表示します。
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])
すべてのデプロイに関する情報の取得
名前空間内のすべてのデプロイに関する情報を取得できます。必須のリクエストパラメータ:
workspace
:ワークスペースの ID。購入したワークスペースの表示 の ResourceId パラメータの戻り値からこのパラメータの値を取得できます。例:adf9e5147a****。namespace
:名前空間の名前(例:script****-default)。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
AccessKey ID と AccessKey シークレットを使用してクライアントを初期化します。
@return: Client
@throws Exception
"""
# プロジェクトコードが漏洩した場合、AccessKey ペアが漏洩し、アカウントのすべてのリソースでセキュリティ問題が発生する可能性があります。STS トークンを使用することをお勧めします。次のコードは参照用です。
config = open_api_models.Config(
# 必須。ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が設定されていることを確認してください。
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必須。ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# ビジネス要件に基づいてエンドポイントを変更します。
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
workspace='workspace'
)
list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
runtime = util_models.RuntimeOptions()
try:
# API 操作を呼び出し、戻り値を出力します。
request=client.list_deployments_with_options('namespace', list_deployments_request, list_deployments_headers, runtime)
print(request)
except Exception as error:
# 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージは参照用にのみ出力されます。
# エラーメッセージを出力します。
print(error.message)
# トラブルシューティング用の URL を表示します。
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
workspace='workspace'
)
list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
runtime = util_models.RuntimeOptions()
try:
# サンプルコードを実行して、API 操作の戻り値を取得します。namespace は名前空間の名前を示します。
await client.list_deployments_with_options_async('namespace', list_deployments_request, list_deployments_headers, runtime)
except Exception as error:
# 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージは参照用にのみ出力されます。
# エラーメッセージを出力します。
print(error.message)
# トラブルシューティング用の URL を表示します。
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])
デプロイの開始
名前空間内のデプロイを開始できます。必須のリクエストパラメータ:
workspace
:ワークスペースの ID(例:adf9e5147a****)。namespace
:名前空間の名前(例:script****-default)。deploymentId
:デプロイの ID。すべてのデプロイに関する情報の取得 でこのパラメータの値を取得できます。例:3171d4d1-5952-4d02-b978-e762493b****。kind
:開始オフセットのタイプ。有効な値:NONE、LATEST_SAVEPOINT、FROM_SAVEPOINT、LATEST_STATE。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
AccessKey ID と AccessKey シークレットを使用してクライアントを初期化します。
@return: Client
@throws Exception
"""
# プロジェクトコードが漏洩した場合、AccessKey ペアが漏洩し、アカウントのすべてのリソースでセキュリティ問題が発生する可能性があります。STS トークンを使用することをお勧めします。次のコードは参照用です。
config = open_api_models.Config(
# 必須。ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が設定されていることを確認してください。
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必須。ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# ビジネス要件に基づいてエンドポイントを変更します。
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
workspace='workspace'
)
job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
kind='NONE'
)
job_start_parameters = ververica_20220718_models.JobStartParameters(
deployment_id='deploymentId',
restore_strategy=job_start_parameters_deployment_restore_strategy
)
start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
body=job_start_parameters
)
runtime = util_models.RuntimeOptions()
try:
# サンプルコードを実行して、API 操作の戻り値を取得します。
client.start_job_with_params_with_options('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
except Exception as error:
# 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージは参照用にのみ出力されます。
# エラーメッセージを出力します。
print(error.message)
# トラブルシューティング用の URL を表示します。
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
workspace='workspace'
)
job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
# デプロイの起動戦略。
kind='NONE'
)
job_start_parameters = ververica_20220718_models.JobStartParameters(
deployment_id='deploymentId',
restore_strategy=job_start_parameters_deployment_restore_strategy
)
start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
body=job_start_parameters
)
runtime = util_models.RuntimeOptions()
try:
# サンプルコードを実行して、API 操作の戻り値を取得します。
await client.start_job_with_params_with_options_async('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
except Exception as error:
# 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージは参照用にのみ出力されます。
# エラーメッセージを出力します。
print(error.message)
# トラブルシューティング用の URL を表示します。
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])
すべてのジョブに関する情報の取得
デプロイ内のすべてのジョブに関する情報を取得できます。必須のリクエストパラメータ:
workspace
:ワークスペースの ID(例:adf9e5147a****)。namespace
:名前空間の名前(例:script****-default)。deploymentId
:デプロイの ID。すべてのデプロイに関する情報の取得 で取得できます。例:3171d4d1-5952-4d02-b978-e762493b****。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
AccessKey ID と AccessKey シークレットを使用してクライアントを初期化します。
@return: Client
@throws Exception
"""
# プロジェクトコードが漏洩した場合、AccessKey ペアが漏洩し、アカウントのすべてのリソースでセキュリティ問題が発生する可能性があります。STS トークンを使用することをお勧めします。次のコードは参照用です。
config = open_api_models.Config(
# 必須。ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が設定されていることを確認してください。
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必須。ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# ビジネス要件に基づいてエンドポイントを変更します。
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
workspace='workspace'
)
list_jobs_request = ververica_20220718_models.ListJobsRequest(
deployment_id='deploymentId'
)
runtime = util_models.RuntimeOptions()
try:
# API 操作を呼び出し、戻り値を出力します。
request=client.list_jobs_with_options('namespace', list_jobs_request, list_jobs_headers, runtime)
print(request)
except Exception as error:
# 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージは参照用にのみ出力されます。
# エラーメッセージを出力します。
print(error.message)
# トラブルシューティング用の URL を表示します。
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
workspace='workspace'
)
list_jobs_request = ververica_20220718_models.ListJobsRequest(
deployment_id='deploymentId'
)
runtime = util_models.RuntimeOptions()
try:
# サンプルコードを実行して、API 操作の戻り値を取得します。
await client.list_jobs_with_options_async('namespace', list_jobs_request, list_jobs_headers, runtime)
except Exception as error:
# 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージは参照用にのみ出力されます。
# エラーメッセージを出力します。
print(error.message)
# トラブルシューティング用の URL を表示します。
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])
ジョブの停止
デプロイ内のジョブを停止できます。パラメータの説明:
workspace
:ワークスペースの ID(例:adf9e5147a****)。namespace
:名前空間の名前(例:script****-default)。jobId
:ジョブの ID。すべてのジョブに関する情報の取得 で取得できます。例:3171d4d1-5952-4d02-b978-e762493b****。stopStrategy
:ジョブの停止に使用する戦略。有効な値:NONE、STOP_WITH_SAVEPOINT、STOP_WITH_DRAIN。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
AccessKey ID と AccessKey シークレットを使用してクライアントを初期化します。
@return: Client
@throws Exception
"""
# プロジェクトコードが漏洩した場合、AccessKey ペアが漏洩し、アカウントのすべてのリソースでセキュリティ問題が発生する可能性があります。STS トークンを使用することをお勧めします。次のコードは参照用です。
config = open_api_models.Config(
# 必須。ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が設定されていることを確認してください。
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必須。ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# ビジネス要件に基づいてエンドポイントを変更します。
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
stop_job_headers = ververica_20220718_models.StopJobHeaders(
workspace='workspace'
)
stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
# ジョブの停止に使用する戦略。
stop_strategy='stopStrategy'
)
stop_job_request = ververica_20220718_models.StopJobRequest(
body=stop_job_request_body
)
runtime = util_models.RuntimeOptions()
try:
# サンプルコードを実行して、API 操作の戻り値を取得します。
client.stop_job_with_options('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
except Exception as error:
# 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージは参照用にのみ出力されます。
# エラーメッセージを出力します。
print(error.message)
# トラブルシューティング用の URL を表示します。
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
stop_job_headers = ververica_20220718_models.StopJobHeaders(
workspace='workspace'
)
stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
stop_strategy='stopStrategy'
)
stop_job_request = ververica_20220718_models.StopJobRequest(
body=stop_job_request_body
)
runtime = util_models.RuntimeOptions()
try:
# サンプルコードを実行して、API 操作の戻り値を取得します。
await client.stop_job_with_options_async('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
except Exception as error:
# 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージは参照用にのみ出力されます。
# エラーメッセージを出力します。
print(error.message)
# トラブルシューティング用の URL を表示します。
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])
参考文献
Java 用 SDK の詳細については、「Java 用 SDK」をご参照ください。