This topic describes how to publish events to EventBridge by using SDK for Python to call the required API operation.

Prerequisites

Before you start, make sure that the following prerequisites are met:

Publish an event

Sample code:

# -*- coding: utf-8 -*-

from alibabacloud_eventbridge.client import Client as EventBridgeClient
from alibabacloud_eventbridge import models as event_bridge_models
from alibabacloud_tea_console.client import Client as ConsoleClient
from alibabacloud_tea_util.client import Client as UtilClient


class put_events(object):
    def __init__(self):
        pass

    @staticmethod
    def create_client():
        """
        Uses the create_client() function to initialize common request parameters.
        """
        config = event_bridge_models.Config(

        )
        # Your AccessKey ID.
        config.access_key_id = "<accessKeyId>"
        # Your AccessKey secret.
        config.access_key_secret = "<accessKeySecret>"
        # Your endpoint.
        config.endpoint = "<endpoint>"
        return EventBridgeClient(config)

    @staticmethod
    def put_events(client):
        """
        PutEvents
        """
        event = event_bridge_models.CloudEvent(

        )
        event.datacontenttype = "application/json"
        event.data = UtilClient.to_bytes("test")
        event.id = "a5074581-7e74-4e4c-868f-47e7afdf****"
        event.source = "acs.oss"
        event.specversion = "1.0"
        event.type = "oss:ObjectCreated:PostObject"
        event.time = "2020-08-24T13:54:05.965Asia/Shanghai"
        event.subject = "1.0"
        event.type = "acs:oss:cn-hangzhou:1234567:xls-papk/game_apk/123.jpg"
        event.extensions = {
            "aliyuneventbusname": "demo-bus"
        }
        try:
            resp = client.put_events([
                event
            ])
            ConsoleClient.log("--------------------Publish event to the aliyun EventBus--------------------")
            ConsoleClient.log(UtilClient.to_jsonstring(resp.to_map()))
        except Exception as error:
            ConsoleClient.log(error.message)

    @staticmethod
    def main(args):
        client = put_events.create_client()
        put_events.put_events(client)

put_events.main("")