What is the ODPS event mechanism?
ODPS events are used to monitor ODPS resources such as tables and instances (at present only table monitoring is supported). When the state of a table changes, ODPS sends a message to the pre-registered (pre-subscribed) URL. Event notifications are only delivered after you subscribe to the event, and you can subscribe to one or more events in each project. An event is user data: as with table data, you need the corresponding operation permission on the project to create or modify an event. For the event RESTful API, see Article [1].

Why do we need the ODPS event mechanism?

Consider the following scenario: User A cares about the operations performed on Table T (create/delete/insert/modify/...), but Table T was not created by User A. How can User A learn about these operations? One option is to actively poll the table for a given operation, but the drawback of that is obvious. The other option is to register a callback so that User A is notified passively whenever an operation on the table occurs; this spares the user the logic of polling and waiting for operations on the table. The ODPS event mechanism is an implementation of the second option.

In real production this scenario arises frequently, and a rich variety of applications has been built on ODPS events, for example:

• Data map: subscribes to the events of certain projects and displays the metadata of the tables in those projects according to the event notifications.
• Cross-cluster copy: listens to event notifications in order to copy the corresponding tables.
• Ant Financial: relies on the event notification mechanism for workflow management, statistics, authorization, and so on.

In fact, a large number of users subscribe to table events, in their own projects as well as in other projects.

How is the ODPS event mechanism implemented?

In this section we first treat the ODPS event mechanism as a black box and introduce its functions and usage from the user's perspective. Taking that as the starting point, we then analyze the internals of the ODPS event mechanism in depth. Finally, we offer some thoughts on the current event mechanism.

Subscribe to (register) an event and event notifications

In network programming, asynchronous programming driven by event notifications is often used to relieve the pressure of multi-threading; libevent [2] is a typical example. A server program built with this library follows the steps below:

```c
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <event.h>

#define PORT    9876   /* example listening port */
#define BACKLOG 32     /* example listen backlog */

void on_accept(int sock, short event, void* arg) {
    /* Accept and handle the incoming connection here (body omitted in the original example). */
}

int main(int argc, char* argv[]) {
    /* Create and bind the listening socket s. */
    struct sockaddr_in addrin;
    int s = socket(AF_INET, SOCK_STREAM, 0);
    int reuseaddr = 1;
    setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuseaddr, sizeof(reuseaddr));
    memset(&addrin, 0, sizeof(addrin));
    addrin.sin_family = AF_INET;
    addrin.sin_port = htons(PORT);
    addrin.sin_addr.s_addr = INADDR_ANY;
    bind(s, (struct sockaddr*)&addrin, sizeof(struct sockaddr));
    listen(s, BACKLOG);

    /* Create an event base. */
    struct event_base* eb = event_base_new();

    /* Create an event and bind the callback function. */
    struct event e;
    event_set(&e, s, EV_READ | EV_PERSIST, on_accept, NULL);

    /* Register the event with the event base. */
    event_base_set(eb, &e);
    event_add(&e, NULL);

    /* Start the event dispatching loop. */
    event_base_dispatch(eb);
    return 0;
}
```

The logic of the example above can be summarized as follows: create an event base, create an event and bind a callback function to it, then register the event with the event base and start the event dispatcher.
During this process, the event producer is the socket (strictly speaking, the I/O multiplexing interface bound to the socket, such as epoll), the event forwarder is libevent's event base together with its event dispatcher, and the event consumer is the callback function that processes the event.

The same procedure applies to ODPS events, except that you do not need to create the event base or the dispatcher yourself: you create an event, bind the callback processing logic to it, and register the event. The code is as follows:

```java
public class TestOdpsEvent {

    /**
     * Build an event.
     */
    static Event buildEvent(String eventName, String tableName, String callbackUri,
                            String comment) throws URISyntaxException {
        Event event = new Event();
        event.setName(eventName);               // Event name.
        event.setComment(comment);              // Event comment.
        event.setType(Event.SourceType.TABLE);  // Event type; currently only TABLE is supported.

        Event.Config config = new Event.Config();
        config.setName("source");
        config.setValue(tableName);             // Event source (the table name); "*" means all tables.
        event.setConfig(config);

        event.setUri(new URI(callbackUri));     // Callback address.
        return event;
    }

    public static void main(String[] args)
            throws OdpsException, InterruptedException, URISyntaxException {
        Account account = new AliyunAccount("xxxxxx", "xxxxxx");
        Odps odps = new Odps(account);
        String odpsEndpoint = "http://xxxx/xxx";
        odps.setEndpoint(odpsEndpoint);
        odps.setDefaultProject("project1");
        InternalOdps iodps = new InternalOdps(odps);

        // Create an event and bind the callback address (note: this is not the ODPS endpoint).
        String callbackUri = "http://xxx.xxx.xxx.xxx:xxxx/xxxxx";
        Event e = buildEvent("table_create_event_1", "table1", callbackUri, "this is a test event");

        // Register the event.
        iodps.events().create(e);

        // List the created events.
        Iterator<Event> events = iodps.events().iterator();
        while (events.hasNext()) {
            Event event1 = events.next();
            System.out.println("Event found: " + event1.getName());
            System.out.println("Event uri: " + event1.getUri());
            // iodps.events().delete(event1.getName()); // Delete an event.
        }
    }
}
```

The code above specifies a callback address. When the table changes, a notification is delivered to this callback address, where you can process the event with whatever logic is appropriate. The callback address, as the entry point of the event processing logic, supports multiple protocols, including but not limited to Kuafu, HTTP, and HTTPS. Unlike with libevent, the producer, forwarder and consumer of ODPS events can be located in different network areas. Once the event is registered, the ODPS event mechanism notifies the registered callback address as soon as the event occurs.

Analysis on the ODPS event mechanism

The three parts of Figure 3-1 represent the event registration, notification forwarding and event deletion processes respectively. MessageService is the internal messaging service of ODPS; it forwards event notifications to the callback addresses that users have registered. To make it easier to follow, Create topic, Create subscription and Add endpoint can be regarded as the three operations that event registration performs on the MessageService layer. How the event mechanism is implemented on top of MessageService is detailed later.
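The callback address is in essence an HTTP endpoint that receives each notification as a POST request and, as the limits in the Appendix note, should respond with a plain HTTP 200. Below is a minimal sketch of such a receiver built on the JDK's com.sun.net.httpserver package; the port and path are placeholders for whatever URI was registered above, and the log statement stands in for real event-handling logic.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class EventCallbackServer {
    public static void main(String[] args) throws IOException {
        // Listen on the host/port/path registered as the event callback URI
        // (placeholder values; use whatever was passed to setUri() above).
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/odps-events", EventCallbackServer::handle);
        server.start();
    }

    static void handle(HttpExchange exchange) throws IOException {
        // Read the XML notification body pushed by the event mechanism.
        String body;
        try (InputStream in = exchange.getRequestBody()) {
            body = new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }

        // Real event-processing logic goes here; keep it fast or hand the
        // message off to a worker thread so the 200 response is not delayed.
        System.out.println("Received event notification:\n" + body);

        // The event mechanism expects a plain HTTP 200 response (no redirects).
        exchange.sendResponseHeaders(200, -1);
        exchange.close();
    }
}
```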
![]()
Figure 3-1: Event creation, forwarding and deletion

In the event registration process shown in Figure 3-1, a user's request is handled by the createEventHandler of OdpsWorker. The createEventHandler checks, in order, whether the corresponding MessageService topic, subscription and endpoint exist, and creates whichever of them does not.

The event deletion process shown in Figure 3-1 is simpler: the user's request is handled by the deleteEventHandler of OdpsWorker, which deletes the corresponding MessageService subscription directly.

In the event forwarding process shown in Figure 3-1, the event producers are mainly DDL tasks (in fact, due to historical reasons, HiveServer and CREATETABLE events are also issued from here). When you operate on the metadata of a table, for example drop table, add partition, or insert into partition, the corresponding DDL task is triggered and sends an event notification for that operation. The notification is sent to the event forwarder, MessageService, which then delivers it to the callback addresses bound to the matching event.

As the event forwarder, MessageService mainly accomplishes two things:
1) As the event base, it maintains the correspondence between events and callback addresses (one event corresponds to one or more callback addresses).
2) As the event dispatcher, it matches incoming event notifications to the corresponding events and forwards each notification to all callback addresses of the matched event.

At present, events are distinguished by two attributes: the project name and the event source (currently the table name). Both pieces of information are included in the event notifications issued by DDL tasks, and MessageService matches events based on them.

To explain how MessageService performs this matching, we need the basic concepts of ODPS MessageService. (For ease of understanding, some MessageService concepts are simplified in this article; for example, the notion of a partition is hidden. Article [3] covers the design and implementation of MessageService in detail.) See Figure 3-2:

![]()
Figure 3-2: Basic concepts of MessageService

ODPS MessageService has four basic concepts: topic, subscription, filter, and endpoint. MessageService adopts the typical publish/subscribe model: you create a topic and then create one or more subscriptions (each containing one or more endpoints) to subscribe to that topic. A publisher sends messages to the topic, and each message is forwarded to all endpoints of every subscription it matches. The topic creator, the subscription creator, the message sender and the message receiver can all be different users.

When you create a subscription, you specify its filter matcher; when you send a message, you specify the message's filter. When a message arrives at a topic, its filter is matched against the filter matchers of the topic's subscriptions. If a match succeeds, a copy of the message is sent to all endpoints of that subscription; otherwise no copy is sent to that subscription, and matching continues against the filter matchers of the other subscriptions.
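To make the publish/subscribe matching concrete, here is a minimal sketch (plain Java, not the actual MessageService code) of how a topic with subscriptions, filter matchers and endpoints could dispatch a message: the message's filter is compared against each subscription's matcher, and a matching subscription delivers a copy to every one of its endpoints. The class and method names are illustrative only, and exact-string matching with a "*" wildcard is an assumption used to mirror the table-name filters described above.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of MessageService matching, not the actual implementation.
class Subscription {
    final String filterMatcher;        // e.g. a table name, or "*" for all tables
    final List<String> endpoints;      // callback addresses

    Subscription(String filterMatcher, List<String> endpoints) {
        this.filterMatcher = filterMatcher;
        this.endpoints = endpoints;
    }

    // Assumed matching rule: exact match, with "*" matching any filter.
    boolean matches(String filter) {
        return "*".equals(filterMatcher) || filterMatcher.equals(filter);
    }
}

class Topic {
    final String name;                 // e.g. one topic per project
    final List<Subscription> subscriptions = new ArrayList<>();

    Topic(String name) { this.name = name; }

    // Publish a message: deliver a copy to every endpoint of every matching subscription.
    void publish(String filter, String messageBody) {
        for (Subscription sub : subscriptions) {
            if (sub.matches(filter)) {
                for (String endpoint : sub.endpoints) {
                    deliver(endpoint, messageBody);
                }
            }
        }
    }

    private void deliver(String endpoint, String messageBody) {
        // In the real system this would be a push to the callback address.
        System.out.println("deliver to " + endpoint + ": " + messageBody);
    }
}
```

For example, a subscription created with matcher "table1" and one created with matcher "*" would both receive a message published with filter "table1", while a subscription with matcher "table2" would not.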
Examples of the filter and filter_matcher and their matching rules are given below:

![]()

This MessageService mechanism is sufficient to implement the event forwarder described above: an event is expressed as a subscription, an event notification as a message, and every endpoint records one callback address. Each project corresponds to one topic, and the filter is used to distinguish event sources. When an event notification is generated, it is sent to the topic of the project that produced it; after matching, the notification is forwarded to the callback addresses recorded in all endpoints of the matched subscription. Two examples of the event notification message body are shown below:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Notification>
  <Account>ALIYUN$odpstest1@aliyun.com</Account>
  <Project>a_2_test_event</Project>
  <SourceType>Table</SourceType>
  <SourceName>backup_partition</SourceName>
  <Reason>CREATETABLE</Reason>
  <TimeStamp>Sun, 18 Sep 2016 14:21:32 GMT</TimeStamp>
  <Properties/>
  <OdpsMessagerId>1</OdpsMessagerId>
  <OdpsMessagerTime>1474208492</OdpsMessagerTime>
</Notification>
```

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Notification>
  <Account>ALIYUN$odpstest1@aliyun.com</Account>
  <Project>a_2_test_event</Project>
  <SourceType>Table</SourceType>
  <SourceName>backup_partition</SourceName>
  <Reason>ADDPARTITION</Reason>
  <TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>
  <Properties>
    <Property>
      <Name>Name</Name>
      <Value>ds=ds1/pt=pt1</Value>
    </Property>
  </Properties>
  <OdpsMessagerId>4</OdpsMessagerId>
  <OdpsMessagerTime>1474289142</OdpsMessagerTime>
</Notification>
```

After subscribing to events of the table backup_partition in project a_2_test_event, you receive the two notifications above when a CREATETABLE or ADDPARTITION operation occurs on that table. Every event notification is an XML message. SourceType is the type of resource the event refers to; it may be a table event notification or, in the future, a notification for other resource types (at present only table event notifications are supported). SourceName is the name of the table you subscribed to. Reason is the operation performed on the table; in the examples above it is creating a table and adding a partition respectively (more operation types are listed in the Appendix). Properties carries additional attributes of the notification, usually the table partition affected by the operation. OdpsMessagerId is unique across all the tables in a project, and OdpsMessagerTime is the time when the notification was generated.

In the online ODPS service environment, every project p1 corresponds to a topic named SQL_p1 (for historical reasons the prefix SQL_ is hard-coded, but the prefix does not matter as long as it distinguishes the event mechanism's topics from topics used by other applications). This topic is created automatically the first time an event is registered (or it can be created manually when the project is created); all event notifications for p1 are sent to it, and it is deleted when the project is deleted.

ODPS MessageService provides persistence, ordering and failover for the event notifications of the event mechanism, striving to avoid message loss. Even so, message loss cannot be eliminated entirely.
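Based on the message body shown above, a callback receiver can extract the fields it needs with the JDK's standard DOM parser. The following sketch (a hypothetical helper, not part of the ODPS SDK) pulls out the project, source name, reason and OdpsMessagerId, which are typically enough to decide how to handle the notification and to deduplicate it.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

// Hypothetical helper for parsing the <Notification> body; not part of the ODPS SDK.
public class NotificationParser {

    public static void parseAndPrint(String xmlBody) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new ByteArrayInputStream(xmlBody.getBytes(StandardCharsets.UTF_8)));

        String project    = text(doc, "Project");
        String sourceName = text(doc, "SourceName");
        String reason     = text(doc, "Reason");
        String messageId  = text(doc, "OdpsMessagerId");

        // Dispatch on the operation type here, e.g. react only to ADDPARTITION.
        System.out.printf("project=%s table=%s reason=%s id=%s%n",
                project, sourceName, reason, messageId);
    }

    // Returns the text content of the first element with the given tag name, or null.
    private static String text(Document doc, String tag) {
        NodeList nodes = doc.getElementsByTagName(tag);
        return nodes.getLength() > 0 ? nodes.item(0).getTextContent() : null;
    }
}
```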
Next we analyze the cases in which messages may be lost in the event mechanism: failure of the event producer, failure of MessageService, failure of the event receiver, and hot upgrading of MessageService.

1) Event producer failure: The producer of an event notification may fail before the notification reaches MessageService. The probability of losing the message in this case depends on the persistence configuration, failover capability and retry mechanism of the event producer.

2) MessageService failure: MessageService failures fall into two situations: failure before the message arrives at MessageService and failure after it arrives. In the former case, the message client provided by MessageService retries three times at an interval of five milliseconds. Once the message has been sent to MessageService successfully, it is first persisted, and a message in MessageService is deleted only when one of the following conditions is met: a. the message has been delivered successfully; b. delivery has failed and the maximum number of retries has been exceeded (MessageService is currently configured for 3,600 retries at an interval of 60 seconds). So the biggest risk of losing an event (message) lies in the window before it reaches MessageService.

3) Event receiver failure: If the receiver fails after receiving an event notification but before processing it, MessageService offers no interface for redelivering the message to the receiver. One alternative for this problem is to use a MessageService model similar to Kafka as the forwarder, which can ensure high reliability of event notifications. The Kafka model [4] does not actively push messages; it only persists them, achieving high throughput and high reliability. A subscriber pulls messages from a partition of a topic by specifying a range of message IDs, so a subscriber that fails and wants to retrieve historical messages only needs to specify the ID range, and any messages in that range that have not expired can be retrieved. However, this pull model does not provide the full functionality of an event dispatcher and therefore cannot support the asynchronous event notification programming model that ODPS currently requires. In fact, ODPS MessageService was designed precisely for the ODPS event notification mechanism (before MessageService became available, OdpsWorker fulfilled its duties).

4) Hot upgrading of MessageService: Although we call it "hot" upgrading, switching between the old and new services still takes time; in an extreme case an online hot upgrade has taken more than four hours (the time to finish switching over all topics). While a topic is being switched over, it refuses to serve requests from either the old or the new message service, and resumes service only after the switchover completes.

In short, the ODPS event notification mechanism guarantees high availability to some extent but cannot fully eliminate the possibility of message loss. The biggest risk is MessageService being unavailable, in which case the number of messages lost on a given topic is proportional to the duration of the outage.
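Because delivery to the callback address may be retried (up to 3,600 times at 60-second intervals, per the configuration above), a receiver may also see the same notification more than once, for example if it crashes after processing the message but before acknowledging it with HTTP 200. A common mitigation, sketched below under the assumption that OdpsMessagerId is unique within a project as stated earlier, is to deduplicate on (project, OdpsMessagerId) before acting on a notification; the class is illustrative and not part of the ODPS SDK.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative deduplication helper, not part of the ODPS SDK.
// Assumes OdpsMessagerId is unique within a project, as described above.
public class NotificationDeduplicator {

    // Set of already-processed "project:messageId" keys.
    // A real implementation would persist this set and expire old entries.
    private final Set<String> seen = Collections.synchronizedSet(new HashSet<>());

    /**
     * Returns true if this notification has not been seen before and should be processed;
     * returns false for duplicates caused by redelivery.
     */
    public boolean shouldProcess(String project, String odpsMessagerId) {
        return seen.add(project + ":" + odpsMessagerId);
    }
}
```

A receiver would call shouldProcess(...) with the Project and OdpsMessagerId fields parsed from the notification body and skip the message when it returns false.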
Summary

The ODPS event mechanism makes it much easier for users to listen to changes in resources. It adopts the universal asynchronous, event-driven programming model, provides user-friendly interfaces, and supports numerous online services such as the data map and cross-cluster copy. However, it still has shortcomings:

1) The granularity of event listening (subscription/registration) is coarse and cannot be customized. We once received a request from a user who wanted to listen only to the CREATETABLE event of a table, but the current mechanism supports only table-level listening, so the user had to filter the individual events of the table on their own.

2) The reliability of the event mechanism needs to be further improved. We once encountered a MessageService hot-upgrade switchover that lasted four hours; one of the reasons was that the delivery of messages from a topic to an endpoint got stuck and could not exit, which caused many messages on that topic to be lost.

3) There is a large gap between the QPS of the MessageService producer (the production environment receives messages at 1,000-2,000 QPS) and consumer (the consumer's QPS limit has not been tested yet because it depends on the receiving endpoint) and that of open-source messaging systems such as Kafka, whose producer QPS is 50,000 and consumer QPS is 22,000 [4].

If you want to use ODPS MessageService, you should satisfy the following conditions:

1) You can tolerate a small number of lost messages, because a small possibility of message loss does exist.
2) Your event processing system should be able to keep up with the events; preferably it can receive events at more than 500 QPS.
3) Unused events (whose callback URL is unreachable and which are no longer needed) should be deleted. Otherwise they leave permanent junk files in MessageService and cause messages to pile up in Pangu; MessageService cannot determine on its own whether a user event should be deleted.

It may be difficult to solve all of the above problems for the time being, but we will keep improving the event mechanism: reducing the probability of event loss, refining the granularity of event subscription and optimizing the user experience.

References

[1] Event RESTful API.
[2] libevent: http://libevent.org
[3] ODPS Message Service.
[4] Kreps J, Narkhede N, Rao J. Kafka: A distributed messaging system for log processing. Proceedings of NetDB, 2011: 1-7.

Appendix

Event type list of the ODPS event mechanism

When a DDL operation is triggered, the event mechanism sends a POST request to the pre-registered URL with a message body in the following format:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Notification>
  <Account>ALIYUN$odpstest1@aliyun.com</Account>
  <Project>a_2_test_event</Project>
  <SourceType>Table</SourceType>
  <SourceName>backup_partition</SourceName>
  <Reason>ADDPARTITION</Reason>
  <TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>
  <Properties>
    <Property>
      <Name>Name</Name>
      <Value>ds=ds1/pt=pt1</Value>
    </Property>
  </Properties>
  <OdpsMessagerId>4</OdpsMessagerId>
  <OdpsMessagerTime>1474289142</OdpsMessagerTime>
</Notification>
```

Where:

![]()

• Values of SourceType: Table

Limits of use

1) At present, only the project owner can create events, and the project owner cannot authorize others to create events.
2) The URL receiving the POST request should return HTTP status 200, and redirection of the POST request on the server side (such as a 302 redirect) is not supported.
|