本文為您介紹如何安裝並使用Realtime ComputeFlink版Python SDK。
前提條件
已建立AccessKey,詳情請參見建立AccessKey。
說明為避免主帳號泄露AccessKey帶來安全風險,建議您建立RAM使用者,授予RAM使用者Flink相關的存取權限,再使用RAM使用者的AccessKey調用SDK。相關文檔請參見:
建立RAM使用者以及對應AccessKey,請參見建立RAM使用者或建立AccessKey。
為RAM使用者授權,請參見RAM授權。
已準備Python環境,要求3.6及以上版本。
帳號具有相關訪問及操作許可權,詳情請參見許可權管理。
安裝 Flink Python SDK
通過pip安裝Python SDK。
進行作業開發和營運等操作時,需要調用Realtime Compute開發控制台API。安裝和使用詳情請參見開發控制台SDK中心。
pip3 install alibabacloud_ververica20220718==1.2.1查看工作空間資訊,進行工作空間的購買、資源調整等操作時,需要調用Realtime Compute管理主控台API。安裝和使用詳情請參見售賣控制台SDK中心。
pip3 install alibabacloud_foasconsole20211028==1.0.2
線上調試和產生SDK樣本
OpenAPI門戶提供了線上調用產品API、動態產生SDK範例程式碼和快速檢索介面等功能,可以顯著降低使用API的難度。您可以在Realtime Compute開發控制台API和Realtime Compute售賣控制台API頁面查看所需API的SDK樣本,並下載使用,具體操作步驟請參見快速開始。

參考樣本
查看已購買的工作空間
查詢目標地區下已購買的Flink工作空間的詳細資料。必填請求參數如下。
Region:地區ID。例如cn-hangzhou。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_foasconsole20211028.client import Client as foasconsole20211028Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_foasconsole20211028 import models as foasconsole_20211028_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> foasconsole20211028Client:
"""
使用AK&SK初始化帳號Client
@return: Client
@throws Exception
"""
# 工程代碼泄露可能會導致 AccessKey 泄露,並威脅帳號下所有資源的安全性,建議使用更安全的 STS 方式。以下程式碼範例僅供參考。
config = open_api_models.Config(
# 必填,請確保代碼運行環境設定了環境變數 ALIBABA_CLOUD_ACCESS_KEY_ID。,
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,請確保代碼運行環境設定了環境變數 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint請根據實際情況修改
config.endpoint = f'foasconsole.aliyuncs.com'
return foasconsole20211028Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
region='cn-hangzhou'
)
runtime = util_models.RuntimeOptions()
try:
# 調用並列印API的傳回值
response=client.describe_instances_with_options(describe_instances_request, runtime)
print(response)
except Exception as error:
# 此處僅做列印展示,請謹慎對待異常處理,在工程專案中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
region='cn-hangzhou'
)
runtime = util_models.RuntimeOptions()
try:
# 複製代碼運行請自行列印 API 的傳回值
await client.describe_instances_with_options_async(describe_instances_request, runtime)
except Exception as error:
# 此處僅做列印展示,請謹慎對待異常處理,在工程專案中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])擷取已部署作業列表
擷取專案空間下所有已部署作業的資訊。必填請求參數如下。
workspace:工作空間ID,可通過查看已購買的工作空間返回的ResourceId擷取。例如adf9e5147a****。namespace:專案空間名稱,例如script****-default。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
使用AK&SK初始化帳號Client
@return: Client
@throws Exception
"""
# 工程代碼泄露可能會導致 AccessKey 泄露,並威脅帳號下所有資源的安全性,建議使用更安全的 STS 方式。以下程式碼範例僅供參考。
config = open_api_models.Config(
# 必填,請確保代碼運行環境設定了環境變數 ALIBABA_CLOUD_ACCESS_KEY_ID。,
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,請確保代碼運行環境設定了環境變數 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint請根據實際情況修改
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
workspace='workspace'
)
list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
runtime = util_models.RuntimeOptions()
try:
#調用並列印API的傳回值
request=client.list_deployments_with_options('namespace', list_deployments_request, list_deployments_headers, runtime)
print(request)
except Exception as error:
# 此處僅做列印展示,請謹慎對待異常處理,在工程專案中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
workspace='workspace'
)
list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
runtime = util_models.RuntimeOptions()
try:
# 複製代碼運行請自行列印 API 的傳回值。namespace為專案空間名稱
await client.list_deployments_with_options_async('namespace', list_deployments_request, list_deployments_headers, runtime)
except Exception as error:
# 此處僅做列印展示,請謹慎對待異常處理,在工程專案中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])啟動作業
啟動專案空間下一個已部署作業。必填請求參數如下。
workspace:工作空間ID,例如adf9e5147a****。namespace:專案空間名稱,例如script****-default。deploymentId:作業部署ID,可通過擷取已部署作業列表擷取。例如3171d4d1-5952-4d02-b978-e762493b****。kind:啟動位點類型。支援NONE(無狀態啟動)、LATEST_SAVEPOINT(最新的作業快照啟動)、FROM_SAVEPOINT(從指定快照啟動)、LATEST_STATE(最新狀態啟動)。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
使用AK&SK初始化帳號Client
@return: Client
@throws Exception
"""
# 工程代碼泄露可能會導致 AccessKey 泄露,並威脅帳號下所有資源的安全性,建議使用更安全的 STS 方式。以下程式碼範例僅供參考。
config = open_api_models.Config(
# 必填,請確保代碼運行環境設定了環境變數 ALIBABA_CLOUD_ACCESS_KEY_ID。,
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,請確保代碼運行環境設定了環境變數 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint請根據實際情況修改
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
workspace='workspace'
)
job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
kind='NONE'
)
job_start_parameters = ververica_20220718_models.JobStartParameters(
deployment_id='deploymentId',
restore_strategy=job_start_parameters_deployment_restore_strategy
)
start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
body=job_start_parameters
)
runtime = util_models.RuntimeOptions()
try:
# 複製代碼運行請自行列印 API 的傳回值
client.start_job_with_params_with_options('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
except Exception as error:
# 此處僅做列印展示,請謹慎對待異常處理,在工程專案中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
workspace='workspace'
)
job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
#作業啟動策略
kind='NONE'
)
job_start_parameters = ververica_20220718_models.JobStartParameters(
deployment_id='deploymentId',
restore_strategy=job_start_parameters_deployment_restore_strategy
)
start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
body=job_start_parameters
)
runtime = util_models.RuntimeOptions()
try:
# 複製代碼運行請自行列印 API 的傳回值
await client.start_job_with_params_with_options_async('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
except Exception as error:
# 此處僅做列印展示,請謹慎對待異常處理,在工程專案中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])擷取工作執行個體列表
擷取某個已部署作業下所有工作執行個體的資訊。必填請求參數如下。
workspace:工作空間ID,例如adf9e5147a****。namespace:專案空間名稱,例如script****-default。deploymentId:作業部署ID,可以通過擷取已部署作業列表擷取。例如3171d4d1-5952-4d02-b978-e762493b****。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
使用AK&SK初始化帳號Client
@return: Client
@throws Exception
"""
# 工程代碼泄露可能會導致 AccessKey 泄露,並威脅帳號下所有資源的安全性,建議使用更安全的 STS 方式。以下程式碼範例僅供參考。
config = open_api_models.Config(
# 必填,請確保代碼運行環境設定了環境變數 ALIBABA_CLOUD_ACCESS_KEY_ID。,
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,請確保代碼運行環境設定了環境變數 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint請根據實際情況修改
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
workspace='workspace'
)
list_jobs_request = ververica_20220718_models.ListJobsRequest(
deployment_id='deploymentId'
)
runtime = util_models.RuntimeOptions()
try:
# 調用並列印API的傳回值
request=client.list_jobs_with_options('namespace', list_jobs_request, list_jobs_headers, runtime)
print(request)
except Exception as error:
# 此處僅做列印展示,請謹慎對待異常處理,在工程專案中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
workspace='workspace'
)
list_jobs_request = ververica_20220718_models.ListJobsRequest(
deployment_id='deploymentId'
)
runtime = util_models.RuntimeOptions()
try:
# 複製代碼運行請自行列印 API 的傳回值
await client.list_jobs_with_options_async('namespace', list_jobs_request, list_jobs_headers, runtime)
except Exception as error:
# 此處僅做列印展示,請謹慎對待異常處理,在工程專案中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])停止工作執行個體
停止一個工作執行個體。必填請求參數如下。
workspace:工作空間ID,例如adf9e5147a****。namespace:專案空間名稱,例如script****-default。jobId:工作執行個體ID,您可以通過擷取工作執行個體列表擷取。例如3171d4d1-5952-4d02-b978-e762493b****。stopStrategy:作業停止策略。支援NONE(直接停止)、STOP_WITH_SAVEPOINT(產生作業快照後停止)、STOP_WITH_DRAIN(以drain的方式停止)。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
使用AK&SK初始化帳號Client
@return: Client
@throws Exception
"""
# 工程代碼泄露可能會導致 AccessKey 泄露,並威脅帳號下所有資源的安全性,建議使用更安全的 STS 方式。以下程式碼範例僅供參考。
config = open_api_models.Config(
# 必填,請確保代碼運行環境設定了環境變數 ALIBABA_CLOUD_ACCESS_KEY_ID。
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,請確保代碼運行環境設定了環境變數 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint請根據實際情況修改
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
stop_job_headers = ververica_20220718_models.StopJobHeaders(
workspace='workspace'
)
stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
# 作業停止策略
stop_strategy='stopStrategy'
)
stop_job_request = ververica_20220718_models.StopJobRequest(
body=stop_job_request_body
)
runtime = util_models.RuntimeOptions()
try:
# 複製代碼運行請自行列印 API 的傳回值
client.stop_job_with_options('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
except Exception as error:
# 此處僅做列印展示,請謹慎對待異常處理,在工程專案中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
stop_job_headers = ververica_20220718_models.StopJobHeaders(
workspace='workspace'
)
stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
stop_strategy='stopStrategy'
)
stop_job_request = ververica_20220718_models.StopJobRequest(
body=stop_job_request_body
)
runtime = util_models.RuntimeOptions()
try:
# 複製代碼運行請自行列印 API 的傳回值
await client.stop_job_with_options_async('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
except Exception as error:
# 此處僅做列印展示,請謹慎對待異常處理,在工程專案中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])相關文檔
Java SDK參考請參見Java SDK參考。