Topik ini menjelaskan cara menginstal dan menggunakan kit pengembangan perangkat lunak (SDK) Python untuk Realtime Compute for Apache Flink.
Prasyarat
AccessKey telah dibuat. Untuk informasi selengkapnya, lihat Create an AccessKey.
CatatanUntuk mencegah risiko keamanan akibat kebocoran AccessKey, kami menyarankan agar Anda tidak menggunakan AccessKey akun root. Sebagai gantinya, buat Pengguna Resource Access Management (RAM), berikan izin akses Flink yang diperlukan kepada pengguna RAM tersebut, lalu gunakan AccessKey pengguna RAM untuk memanggil SDK. Untuk informasi selengkapnya, lihat topik berikut:
Untuk membuat pengguna RAM dan AccessKey, lihat Create a RAM user atau Create an AccessKey.
Untuk memberikan izin kepada pengguna RAM, lihat Authorize a RAM user in the management console.
Diperlukan lingkungan Python 3.6 atau versi yang lebih baru.
Akun Anda memiliki izin akses dan operasi yang diperlukan. Untuk informasi selengkapnya, lihat Permission management.
Instal Flink Python SDK
Anda dapat menginstal Python SDK menggunakan pip.
Untuk melakukan operasi seperti pengembangan pekerjaan dan O&M, Anda dapat memanggil API Konsol pengembangan untuk Realtime Compute for Apache Flink. Untuk informasi selengkapnya tentang cara menginstal dan menggunakan SDK, lihat Development Console SDK Center.
pip3 install alibabacloud_ververica20220718==1.2.1Untuk melakukan operasi seperti melihat informasi ruang kerja, membeli ruang kerja, dan mengonfigurasi ulang resource dalam ruang kerja, Anda dapat memanggil API Konsol management untuk Realtime Compute for Apache Flink. Untuk informasi selengkapnya tentang cara menginstal dan menggunakan SDK, lihat SDK Center.
pip3 install alibabacloud_foasconsole20211028==1.0.2
Debugging online dan pembuatan contoh SDK
OpenAPI Explorer menyediakan fitur seperti pemanggilan API online, pembuatan kode contoh SDK secara dinamis, dan pengambilan API cepat. Fitur-fitur ini menyederhanakan penggunaan API. Anda dapat melihat dan mengunduh contoh SDK untuk API yang diperlukan di halaman APIs for the development console dan APIs for the management console. Untuk informasi selengkapnya, lihat Quick start.

Contoh
Untuk informasi selengkapnya tentang titik akhir Konsol management Realtime Compute for Apache Flink, lihat Endpoints.
Untuk informasi selengkapnya tentang titik akhir Konsol pengembangan Realtime Compute for Apache Flink, lihat Service Endpoints.
Lihat ruang kerja yang telah dibeli
Anda dapat mengkueri detail ruang kerja Flink yang telah dibeli di wilayah tertentu. Parameter permintaan yang diperlukan tercantum di bawah ini.
Region: ID wilayah. Contoh: cn-hangzhou.
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_foasconsole20211028.client import Client as foasconsole20211028Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_foasconsole20211028 import models as foasconsole_20211028_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> foasconsole20211028Client:
"""
Menginisialisasi klien dengan ID AccessKey dan rahasia AccessKey.
@return: Client
@throws Exception
"""
# Kebocoran kode proyek dapat menyebabkan kebocoran AccessKey dan mengancam keamanan semua resource di bawah akun Anda. Gunakan metode yang lebih aman, seperti menggunakan STS. Kode contoh berikut hanya untuk referensi.
config = open_api_models.Config(
# Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID telah diatur.
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah diatur.
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Ubah titik akhir sesuai kebutuhan.
config.endpoint = f'foasconsole.aliyuncs.com'
return foasconsole20211028Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
region='cn-hangzhou'
)
runtime = util_models.RuntimeOptions()
try:
# Panggil API dan cetak nilai kembali.
response=client.describe_instances_with_options(describe_instances_request, runtime)
print(response)
except Exception as error:
# Ini hanya untuk demonstrasi. Tangani pengecualian dengan hati-hati. Jangan abaikan pengecualian dalam proyek Anda.
# Pesan kesalahan
print(error.message)
# URL pemecahan masalah
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
region='cn-hangzhou'
)
runtime = util_models.RuntimeOptions()
try:
# Jika Anda menyalin kode untuk menjalankannya, cetak sendiri nilai kembali API.
await client.describe_instances_with_options_async(describe_instances_request, runtime)
except Exception as error:
# Ini hanya untuk demonstrasi. Tangani pengecualian dengan hati-hati. Jangan abaikan pengecualian dalam proyek Anda.
# Pesan kesalahan
print(error.message)
# URL pemecahan masalah
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])Dapatkan daftar pekerjaan yang telah diterapkan
Anda dapat memperoleh informasi tentang semua deployment dalam namespace. Parameter permintaan yang diperlukan tercantum di bawah ini.
workspace: ID ruang kerja. Anda dapat memperoleh ID ini dari ResourceId yang dikembalikan oleh operasi yang dijelaskan dalam View purchased workspaces. Contoh: adf9e5147a****.namespace: Nama proyek. Contoh: script****-default.
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
Menginisialisasi klien dengan ID AccessKey dan rahasia AccessKey.
@return: Client
@throws Exception
"""
# Kebocoran kode proyek dapat menyebabkan kebocoran AccessKey dan mengancam keamanan semua resource di bawah akun Anda. Gunakan metode yang lebih aman, seperti menggunakan STS. Kode contoh berikut hanya untuk referensi.
config = open_api_models.Config(
# Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID telah diatur.
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah diatur.
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Ubah titik akhir sesuai kebutuhan.
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
workspace='workspace'
)
list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
runtime = util_models.RuntimeOptions()
try:
# Panggil API dan cetak nilai kembali.
request=client.list_deployments_with_options('namespace', list_deployments_request, list_deployments_headers, runtime)
print(request)
except Exception as error:
# Ini hanya untuk demonstrasi. Tangani pengecualian dengan hati-hati. Jangan abaikan pengecualian dalam proyek Anda.
# Pesan kesalahan
print(error.message)
# URL pemecahan masalah
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
workspace='workspace'
)
list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
runtime = util_models.RuntimeOptions()
try:
# Jika Anda menyalin kode untuk menjalankannya, cetak sendiri nilai kembali API. namespace adalah nama proyek.
await client.list_deployments_with_options_async('namespace', list_deployments_request, list_deployments_headers, runtime)
except Exception as error:
# Ini hanya untuk demonstrasi. Tangani pengecualian dengan hati-hati. Jangan abaikan pengecualian dalam proyek Anda.
# Pesan kesalahan
print(error.message)
# URL pemecahan masalah
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])Jalankan pekerjaan
Operasi ini menjalankan pekerjaan yang telah diterapkan dalam proyek. Parameter permintaan berikut diperlukan.
workspace: ID ruang kerja. Contoh: adf9e5147a****.namespace: Nama proyek. Contoh: script****-default.deploymentId: ID deployment pekerjaan. Anda dapat memperoleh ID ini sebagaimana dijelaskan dalam Retrieve the list of deployed jobs. Contoh: 3171d4d1-5952-4d02-b978-e762493b****.kind: Jenis offset awal. Nilai yang valid: `NONE` (start tanpa status), `LATEST_SAVEPOINT` (mulai dari snapshot terbaru), `FROM_SAVEPOINT` (mulai dari snapshot tertentu), dan `LATEST_STATE` (mulai dari state terbaru).
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
Menginisialisasi klien dengan ID AccessKey dan rahasia AccessKey.
@return: Client
@throws Exception
"""
# Kebocoran kode proyek dapat menyebabkan kebocoran AccessKey dan mengancam keamanan semua resource di bawah akun Anda. Gunakan metode yang lebih aman, seperti menggunakan STS. Kode contoh berikut hanya untuk referensi.
config = open_api_models.Config(
# Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID telah diatur.
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah diatur.
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Ubah titik akhir sesuai kebutuhan.
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
workspace='workspace'
)
job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
kind='NONE'
)
job_start_parameters = ververica_20220718_models.JobStartParameters(
deployment_id='deploymentId',
restore_strategy=job_start_parameters_deployment_restore_strategy
)
start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
body=job_start_parameters
)
runtime = util_models.RuntimeOptions()
try:
# Jika Anda menyalin kode untuk menjalankannya, cetak sendiri nilai kembali API.
client.start_job_with_params_with_options('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
except Exception as error:
# Ini hanya untuk demonstrasi. Tangani pengecualian dengan hati-hati. Jangan abaikan pengecualian dalam proyek Anda.
# Pesan kesalahan
print(error.message)
# URL pemecahan masalah
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
workspace='workspace'
)
job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
# Kebijakan start pekerjaan
kind='NONE'
)
job_start_parameters = ververica_20220718_models.JobStartParameters(
deployment_id='deploymentId',
restore_strategy=job_start_parameters_deployment_restore_strategy
)
start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
body=job_start_parameters
)
runtime = util_models.RuntimeOptions()
try:
# Jika Anda menyalin kode untuk menjalankannya, cetak sendiri nilai kembali API.
await client.start_job_with_params_with_options_async('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
except Exception as error:
# Ini hanya untuk demonstrasi. Tangani pengecualian dengan hati-hati. Jangan abaikan pengecualian dalam proyek Anda.
# Pesan kesalahan
print(error.message)
# URL pemecahan masalah
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])Dapatkan daftar instans pekerjaan
Anda dapat memperoleh informasi tentang semua instans pekerjaan dalam deployment. Parameter permintaan yang diperlukan tercantum di bawah ini.
workspace: ID ruang kerja. Contoh: adf9e5147a****.namespace: Nama proyek. Contoh: script****-default.deploymentId: ID deployment pekerjaan. Anda dapat memperoleh ID ini sebagaimana dijelaskan dalam Retrieve the list of deployed jobs. Contoh: 3171d4d1-5952-4d02-b978-e762493b****.
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
Menginisialisasi klien dengan ID AccessKey dan rahasia AccessKey.
@return: Client
@throws Exception
"""
# Kebocoran kode proyek dapat menyebabkan kebocoran AccessKey dan mengancam keamanan semua resource di bawah akun Anda. Gunakan metode yang lebih aman, seperti menggunakan STS. Kode contoh berikut hanya untuk referensi.
config = open_api_models.Config(
# Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID telah diatur.
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah diatur.
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Ubah titik akhir sesuai kebutuhan.
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
workspace='workspace'
)
list_jobs_request = ververica_20220718_models.ListJobsRequest(
deployment_id='deploymentId'
)
runtime = util_models.RuntimeOptions()
try:
# Panggil API dan cetak nilai kembali.
request=client.list_jobs_with_options('namespace', list_jobs_request, list_jobs_headers, runtime)
print(request)
except Exception as error:
# Ini hanya untuk demonstrasi. Tangani pengecualian dengan hati-hati. Jangan abaikan pengecualian dalam proyek Anda.
# Pesan kesalahan
print(error.message)
# URL pemecahan masalah
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
workspace='workspace'
)
list_jobs_request = ververica_20220718_models.ListJobsRequest(
deployment_id='deploymentId'
)
runtime = util_models.RuntimeOptions()
try:
# Jika Anda menyalin kode untuk menjalankannya, cetak sendiri nilai kembali API.
await client.list_jobs_with_options_async('namespace', list_jobs_request, list_jobs_headers, runtime)
except Exception as error:
# Ini hanya untuk demonstrasi. Tangani pengecualian dengan hati-hati. Jangan abaikan pengecualian dalam proyek Anda.
# Pesan kesalahan
print(error.message)
# URL pemecahan masalah
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])Hentikan instans pekerjaan
Anda dapat menghentikan instans pekerjaan. Parameter permintaan yang diperlukan tercantum di bawah ini. .
workspace: ID ruang kerja. Contoh: adf9e5147a****.namespace: Nama proyek. Contoh: script****-default.jobId: ID instans pekerjaan. Anda dapat memperoleh ID ini sebagaimana dijelaskan dalam Retrieve the list of job instances. Contoh: 3171d4d1-5952-4d02-b978-e762493b****.stopStrategy: Kebijakan yang digunakan untuk menghentikan pekerjaan. Nilai yang valid: `NONE` (hentikan segera), `STOP_WITH_SAVEPOINT` (buat snapshot lalu hentikan), dan `STOP_WITH_DRAIN` (hentikan dengan drain).
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
Menginisialisasi klien dengan ID AccessKey dan rahasia AccessKey.
@return: Client
@throws Exception
"""
# Kebocoran kode proyek dapat menyebabkan kebocoran AccessKey dan mengancam keamanan semua resource di bawah akun Anda. Gunakan metode yang lebih aman, seperti menggunakan STS. Kode contoh berikut hanya untuk referensi.
config = open_api_models.Config(
# Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID telah diatur.
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah diatur.
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Ubah titik akhir sesuai kebutuhan.
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
stop_job_headers = ververica_20220718_models.StopJobHeaders(
workspace='workspace'
)
stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
# Kebijakan henti pekerjaan
stop_strategy='stopStrategy'
)
stop_job_request = ververica_20220718_models.StopJobRequest(
body=stop_job_request_body
)
runtime = util_models.RuntimeOptions()
try:
# Jika Anda menyalin kode untuk menjalankannya, cetak sendiri nilai kembali API.
client.stop_job_with_options('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
except Exception as error:
# Ini hanya untuk demonstrasi. Tangani pengecualian dengan hati-hati. Jangan abaikan pengecualian dalam proyek Anda.
# Pesan kesalahan
print(error.message)
# URL pemecahan masalah
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
stop_job_headers = ververica_20220718_models.StopJobHeaders(
workspace='workspace'
)
stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
stop_strategy='stopStrategy'
)
stop_job_request = ververica_20220718_models.StopJobRequest(
body=stop_job_request_body
)
runtime = util_models.RuntimeOptions()
try:
# Jika Anda menyalin kode untuk menjalankannya, cetak sendiri nilai kembali API.
await client.stop_job_with_options_async('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
except Exception as error:
# Ini hanya untuk demonstrasi. Tangani pengecualian dengan hati-hati. Jangan abaikan pengecualian dalam proyek Anda.
# Pesan kesalahan
print(error.message)
# URL pemecahan masalah
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])Referensi
Untuk informasi selengkapnya, lihat Java SDK Reference.