This topic describes how to publish events to EventBridge by using SDK for Python to call the required API operation.
Prerequisites
Before you start, make sure that the following prerequisites are met:
Publish an event
Sample code:
# -*- coding: utf-8 -*-
from alibabacloud_eventbridge.client import Client as EventBridgeClient
from alibabacloud_eventbridge import models as event_bridge_models
from alibabacloud_tea_console.client import Client as ConsoleClient
from alibabacloud_tea_util.client import Client as UtilClient
class put_events(object):
def __init__(self):
pass
@staticmethod
def create_client():
"""
Uses the create_client() function to initialize common request parameters.
"""
config = event_bridge_models.Config(
)
# Your AccessKey ID.
config.access_key_id = "<accessKeyId>"
# Your AccessKey secret.
config.access_key_secret = "<accessKeySecret>"
# Your endpoint.
config.endpoint = "<endpoint>"
return EventBridgeClient(config)
@staticmethod
def put_events(client):
"""
PutEvents
"""
event = event_bridge_models.CloudEvent(
)
event.datacontenttype = "application/json"
event.data = UtilClient.to_bytes("test")
event.id = "a5074581-7e74-4e4c-868f-47e7afdf****"
event.source = "acs.oss"
event.specversion = "1.0"
event.type = "oss:ObjectCreated:PostObject"
event.time = "2020-08-24T13:54:05.965Asia/Shanghai"
event.subject = "1.0"
event.type = "acs:oss:cn-hangzhou:1234567:xls-papk/game_apk/123.jpg"
event.extensions = {
"aliyuneventbusname": "demo-bus"
}
try:
resp = client.put_events([
event
])
ConsoleClient.log("--------------------Publish event to the aliyun EventBus--------------------")
ConsoleClient.log(UtilClient.to_jsonstring(resp.to_map()))
except Exception as error:
ConsoleClient.log(error.message)
@staticmethod
def main(args):
client = put_events.create_client()
put_events.put_events(client)
put_events.main("")