All Products
Search
Document Center

Realtime Compute for Apache Flink:Referensi SDK Python

Last Updated:Jun 21, 2026

Topik ini menjelaskan cara menginstal dan menggunakan SDK Python untuk Realtime Compute for Apache Flink.

Prasyarat

  • Buat pasangan Kunci Akses (AccessKey pair). Untuk informasi selengkapnya, lihat Create an AccessKey pair.

    Catatan

    Untuk menghindari risiko keamanan akibat terpaparnya pasangan Kunci Akses Akun Alibaba Cloud Anda, kami menyarankan agar Anda menggunakan pasangan Kunci Akses milik Pengguna RAM. Buat Pengguna RAM, berikan izin yang diperlukan untuk mengakses Realtime Compute for Apache Flink, lalu gunakan pasangan Kunci Akses pengguna tersebut untuk memanggil SDK. Untuk informasi selengkapnya, lihat topik berikut:

  • Anda telah menginstal Python 3.6 atau versi yang lebih baru.

  • Akun Anda memiliki izin yang diperlukan. Untuk informasi selengkapnya, lihat Manage permissions.

Instal Flink Python SDK

Instal SDK Python menggunakan pip.

  • Saat melakukan operasi seperti pengembangan pekerjaan (job development) dan O&M, Anda harus memanggil API Realtime Compute Development Console. Untuk detail instalasi dan penggunaan, lihat Development Console SDK Center.

    pip3 install alibabacloud_ververica20220718==1.2.1
  • Untuk melihat informasi ruang kerja (workspace), membeli ruang kerja, atau menyesuaikan sumber daya, Anda perlu memanggil API Realtime Compute Selling Console. Untuk informasi selengkapnya mengenai instalasi dan penggunaan, lihat Realtime Compute Selling Console SDK Center.

    pip3 install alibabacloud_foasconsole20211028==1.0.2

Uji API dan hasilkan contoh SDK secara online

OpenAPI Explorer menyederhanakan penggunaan API. Anda dapat menggunakannya untuk melakukan panggilan API, menghasilkan kode contoh SDK secara dinamis, serta mencari operasi API dengan cepat guna mempercepat proses pengembangan Anda. Anda dapat melihat dan mengunduh kode contoh SDK pada halaman referensi API untuk development console dan Realtime Compute Selling Console. Untuk langkah-langkah lengkapnya, lihat Quick starts.

Pada bagian SDK Sample, pilih Python lalu klik Download Complete Project untuk mendapatkan proyek contoh SDK lengkap untuk API tersebut.

Contoh kode

Catatan
  • Titik akhir (endpoints) untuk Realtime Compute Selling Console tercantum di Endpoints.

  • Titik akhir (endpoints) untuk development console tercantum di Endpoints.

Lihat ruang kerja yang telah dibeli

Contoh ini menunjukkan cara mengkueri detail ruang kerja yang telah dibeli di wilayah tertentu. Parameter permintaan berikut wajib.

Region: ID wilayah. Contohnya, cn-hangzhou..

# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_foasconsole20211028.client import Client as foasconsole20211028Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_foasconsole20211028 import models as foasconsole_20211028_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    def __init__(self):
        pass
    @staticmethod
    def create_client() -> foasconsole20211028Client:
        """
        Gunakan pasangan Kunci Akses untuk menginisialisasi client.
        @return: Client
        @throws Exception
        """
        # Menyematkan pasangan Kunci Akses secara langsung ke dalam kode proyek Anda dapat menimbulkan risiko keamanan. Kami menyarankan menggunakan metode yang lebih aman, seperti STS. Kode berikut hanya sebagai referensi.
        config = open_api_models.Config(
            # Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID telah disetel di lingkungan runtime Anda.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah disetel di lingkungan runtime Anda.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Sesuaikan endpoint sesuai kebutuhan aktual Anda.
        config.endpoint = f'foasconsole.aliyuncs.com'
        return foasconsole20211028Client(config)
    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Panggil API dan cetak respons.
            response=client.describe_instances_with_options(describe_instances_request, runtime)
            print(response)
        except Exception as error:
            # Ini hanya untuk demonstrasi. Terapkan penanganan error yang tepat di kode produksi Anda dan jangan abaikan exception.
            # Pesan error
            print(error.message)
            # URL pemecahan masalah
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)
    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Jika Anda menyalin kode ini untuk dijalankan, cetak sendiri respons API-nya.
            await client.describe_instances_with_options_async(describe_instances_request, runtime)
        except Exception as error:
            # Ini hanya untuk demonstrasi. Terapkan penanganan error yang tepat di kode produksi Anda dan jangan abaikan exception.
            # Pesan error
            print(error.message)
            # URL pemecahan masalah
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Daftar deployment

Contoh ini menunjukkan cara mendaftar semua deployment dalam suatu namespace. Parameter permintaan berikut wajib.

  • workspace: ID ruang kerja. Anda dapat memperoleh ID ini dari ResourceId yang dikembalikan oleh contoh View purchased workspaces. Contoh: adf9e5147a****.

  • namespace: Nama namespace. Contoh: script****-default.

# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    def __init__(self):
        pass
    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Gunakan pasangan Kunci Akses untuk menginisialisasi client.
        @return: Client
        @throws Exception
        """
        # Menyematkan pasangan Kunci Akses secara langsung ke dalam kode proyek Anda dapat menimbulkan risiko keamanan. Kami menyarankan menggunakan metode yang lebih aman, seperti STS. Kode berikut hanya sebagai referensi.
        config = open_api_models.Config(
            # Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID telah disetel di lingkungan runtime Anda.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah disetel di lingkungan runtime Anda.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Sesuaikan endpoint sesuai kebutuhan aktual Anda.
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)
    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # Panggil API dan cetak respons.
            request=client.list_deployments_with_options('namespace', list_deployments_request, list_deployments_headers, runtime)
            print(request)
        except Exception as error:
            # Ini hanya untuk demonstrasi. Terapkan penanganan error yang tepat di kode produksi Anda dan jangan abaikan exception.
            # Pesan error
            print(error.message)
            # URL pemecahan masalah
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)
    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # Jika Anda menyalin kode ini untuk dijalankan, cetak sendiri respons API-nya. Parameter `namespace` menentukan nama namespace.
            await client.list_deployments_with_options_async('namespace', list_deployments_request, list_deployments_headers, runtime)
        except Exception as error:
            # Ini hanya untuk demonstrasi. Terapkan penanganan error yang tepat di kode produksi Anda dan jangan abaikan exception.
            # Pesan error
            print(error.message)
            # URL pemecahan masalah
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Mulai Pekerjaan

Contoh ini menunjukkan cara menjalankan pekerjaan dari suatu deployment dalam namespace. Parameter permintaan berikut wajib.

  • workspace: ID ruang kerja. Contoh: adf9e5147a****.

  • namespace: Nama namespace. Contoh: script****-default.

  • deploymentId: ID deployment. Anda dapat memperoleh ID ini dengan memanggil operasi ListDeployments. Contoh: 3171d4d1-5952-4d02-b978-e762493b****.

  • kind: Jenis offset awal. Nilai yang valid: NONE (start tanpa status), LATEST_SAVEPOINT (start dari titik simpan terbaru), FROM_SAVEPOINT (start dari titik simpan tertentu), dan LATEST_STATE (start dari status terbaru).

# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    def __init__(self):
        pass
    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Gunakan pasangan Kunci Akses untuk menginisialisasi client.
        @return: Client
        @throws Exception
        """
        # Menyematkan pasangan Kunci Akses secara langsung ke dalam kode proyek Anda dapat menimbulkan risiko keamanan. Kami menyarankan menggunakan metode yang lebih aman, seperti STS. Kode berikut hanya sebagai referensi.
        config = open_api_models.Config(
            # Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID telah disetel di lingkungan runtime Anda.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah disetel di lingkungan runtime Anda.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Sesuaikan endpoint sesuai kebutuhan aktual Anda.
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)
    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Jika Anda menyalin kode ini untuk dijalankan, cetak sendiri respons API-nya.
            client.start_job_with_params_with_options('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # Ini hanya untuk demonstrasi. Terapkan penanganan error yang tepat di kode produksi Anda dan jangan abaikan exception.
            # Pesan error
            print(error.message)
            # URL pemecahan masalah
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)
    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            # Strategi pemulihan untuk pekerjaan.
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Jika Anda menyalin kode ini untuk dijalankan, cetak sendiri respons API-nya.
            await client.start_job_with_params_with_options_async('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # Ini hanya untuk demonstrasi. Terapkan penanganan error yang tepat di kode produksi Anda dan jangan abaikan exception.
            # Pesan error
            print(error.message)
            # URL pemecahan masalah
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Daftar pekerjaan (jobs)

Contoh ini menunjukkan cara mendaftar semua pekerjaan untuk deployment tertentu. Parameter permintaan berikut wajib.

  • workspace: ID ruang kerja. Contoh: adf9e5147a****.

  • namespace: Nama namespace. Contoh: script****-default.

  • deploymentId: ID deployment. Anda dapat memperoleh ID ini dengan memanggil operasi ListDeployments. Contoh: 3171d4d1-5952-4d02-b978-e762493b****.

# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    def __init__(self):
        pass
    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Gunakan pasangan Kunci Akses untuk menginisialisasi client.
        @return: Client
        @throws Exception
        """
        # Menyematkan pasangan Kunci Akses secara langsung ke dalam kode proyek Anda dapat menimbulkan risiko keamanan. Kami menyarankan menggunakan metode yang lebih aman, seperti STS. Kode berikut hanya sebagai referensi.
        config = open_api_models.Config(
            # Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID telah disetel di lingkungan runtime Anda.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah disetel di lingkungan runtime Anda.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Sesuaikan endpoint sesuai kebutuhan aktual Anda.
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)
    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Panggil API dan cetak respons.
            request=client.list_jobs_with_options('namespace', list_jobs_request, list_jobs_headers, runtime)
            print(request)
        except Exception as error:
            # Ini hanya untuk demonstrasi. Terapkan penanganan error yang tepat di kode produksi Anda dan jangan abaikan exception.
            # Pesan error
            print(error.message)
            # URL pemecahan masalah
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)
    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Jika Anda menyalin kode ini untuk dijalankan, cetak sendiri respons API-nya.
            await client.list_jobs_with_options_async('namespace', list_jobs_request, list_jobs_headers, runtime)
        except Exception as error:
            # Ini hanya untuk demonstrasi. Terapkan penanganan error yang tepat di kode produksi Anda dan jangan abaikan exception.
            # Pesan error
            print(error.message)
            # URL pemecahan masalah
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Hentikan Pekerjaan

Contoh ini menunjukkan cara menghentikan pekerjaan. Parameter permintaan berikut wajib.

  • workspace: ID ruang kerja. Contoh: adf9e5147a****.

  • namespace: Nama namespace. Contoh: script****-default.

  • jobId: ID pekerjaan. Anda dapat memperoleh ID ini dengan memanggil operasi ListJobs. Contoh: 3171d4d1-5952-4d02-b978-e762493b****.

  • stopStrategy: Strategi penghentian. Nilai yang valid: NONE (hentikan segera), STOP_WITH_SAVEPOINT (buat titik simpan sebelum menghentikan), dan STOP_WITH_DRAIN (hentikan dengan drain).

# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    def __init__(self):
        pass
    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Gunakan pasangan Kunci Akses untuk menginisialisasi client.
        @return: Client
        @throws Exception
        """
        # Menyematkan pasangan Kunci Akses secara langsung ke dalam kode proyek Anda dapat menimbulkan risiko keamanan. Kami menyarankan menggunakan metode yang lebih aman, seperti STS. Kode berikut hanya sebagai referensi.
        config = open_api_models.Config(
            # Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID telah disetel di lingkungan runtime Anda.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Wajib. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah disetel di lingkungan runtime Anda.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Sesuaikan endpoint sesuai kebutuhan aktual Anda.
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)
    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            # Strategi penghentian untuk pekerjaan.
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Jika Anda menyalin kode ini untuk dijalankan, cetak sendiri respons API-nya.
            client.stop_job_with_options('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # Ini hanya untuk demonstrasi. Terapkan penanganan error yang tepat di kode produksi Anda dan jangan abaikan exception.
            # Pesan error
            print(error.message)
            # URL pemecahan masalah
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)
    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Jika Anda menyalin kode ini untuk dijalankan, cetak sendiri respons API-nya.
            await client.stop_job_with_options_async('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # Ini hanya untuk demonstrasi. Terapkan penanganan error yang tepat di kode produksi Anda dan jangan abaikan exception.
            # Pesan error
            print(error.message)
            # URL pemecahan masalah
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Dokumentasi terkait

Untuk detail SDK Java, lihat Java SDK reference.