Dave
Assistant Engineer

Optimization practices of high-concurrency IM system architecture

Posted time: Jan 20, 2017 14:52
In social IM and circle-of-friends applications, a basic requirement is to deliver a user's messages and updates to the user's friends promptly and accurately. To achieve this, each message or update is usually assigned an incrementing serial number or ID, so that receiving terminals can process every message completely and in the correct order. If the total number of messages or the write concurrency is high, a NoSQL storage product is often used to store the messages. However, common NoSQL products do not provide an auto-increment column feature, so external components are usually required to generate the incrementing serial numbers and IDs. This complicates the overall architecture and increases the latency of the entire link.
Features
The auto-increment feature of primary key columns provided by Table Store meets the requirements of the scenario above. To use it, declare a primary key column as an auto-increment column when creating a table. When writing a new row, the application then enters a placeholder rather than an actual value in the auto-increment column. Table Store automatically generates a value for the auto-increment column upon receiving the new row, and ensures that a value generated later is greater than any value generated earlier in the same partitioning key range.
The auto-increment feature of primary key columns has the following characteristics:
• The architecture exclusive to Table Store and the implementation method of auto-incrementing primary key columns ensure that the value generated for the auto-incrementing column is unique and strictly incrementing.
• Currently, a table can have multiple primary key columns, and the first one must be the partitioning key. To ensure the even distribution of data, the partitioning key column cannot be set as the auto-incrementing column.
• Given that restriction, the auto-increment of the primary key column is actually at the partitioning key level.
• Any primary key columns except the partitioning key column can be set as the auto-incrementing column.
• Currently, only one primary key column can be set as the auto-incrementing column for each table.
• The property column is not allowed to be set as auto-incrementing.
• The values generated by the auto-incrementing column are 64-bit signed long integers.
• The auto-incrementing column feature is table-oriented, namely the same instance can have both tables with auto-incrementing columns and tables without auto-incrementing columns.
• The auto-incrementing column can be set only when you create a table. You cannot set an auto-incrementing column in an existing table.
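The characteristics above can be pictured with a toy in-memory model. This is only a sketch of the documented behavior, not Table Store's implementation: the class name AutoIncrementModel and its putRow method are hypothetical, and the model increments by exactly 1 for simplicity, whereas the feature as described only promises that later values are greater than earlier ones for the same partitioning key.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of an auto-increment primary key column (hypothetical names).
final class AutoIncrementModel {
    // One independent sequence per partitioning key, as the feature works
    // at the partitioning key level.
    private final Map<String, Long> lastIdByPartitionKey = new HashMap<>();

    // The caller supplies no ID; the "store" generates it and returns it.
    // Assumption: the model steps by 1, which the real service does not promise.
    long putRow(String partitionKey) {
        return lastIdByPartitionKey.merge(partitionKey, 1L, Long::sum);
    }

    public static void main(String[] args) {
        AutoIncrementModel store = new AutoIncrementModel();
        System.out.println("user_a: " + store.putRow("user_a"));
        System.out.println("user_a: " + store.putRow("user_a"));
        System.out.println("user_b: " + store.putRow("user_b"));
    }
}
```

In this model, two rows written under the same partitioning key always receive increasing values, while rows under different partitioning keys draw from unrelated sequences.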
The next section explains how to use the auto-increment feature of primary key columns of Table Store in an actual scenario.
Scenario
In this example scenario, we will build an IM chat tool to describe the roles and usage of the auto-increment feature.
Features
This IM chat tool needs to support the following features:
• One-on-one chat
• Group chat
• Multi-terminal message sync for the same user
Existing architecture
Step 1: Determine a messaging model
 
• The figure above shows the messaging model.
• A message sent by the sender will be pushed to the background system by the sender's client.
• The background system will first store the message temporarily.
• After storing the message successfully, the background system will push the message to the receiver's client.
Step 2: Determine background architecture
 
• The background architecture mainly consists of two parts: the logic layer and the storage layer.
• The logic layer includes the application server, the queuing service and the auto-increment ID generator. This layer is the core of the entire background architecture and is responsible for the core service logic, including receiving, pushing and notifying messages and writing a copy of each group message for every group member.
• The storage layer is mainly used for persistence of message data and other data as needed.
• For one-on-one chat, the sender first sends a message to the application server. The application server stores the message in the table whose primary key is based on the receiver, and notifies the message push service in the application server of the new message. Finally, the message push service uses the ID of the last message pushed to the receiver as the starting primary key value, reads all messages with a greater ID from the storage system and pushes them to the receiver.
• For group chat, the messaging logic is more complicated. The logic uses an async queue to perform extensive writing (write fan-out): a message sent to the group is stored once for each member of the group.
 
• The figure above shows the group messaging process without the storage layer.
• The process uses extensive writing (fan-out on write) rather than extensive reading (fan-out on read) for the following two reasons:
     o The number of members in a group is usually small, so storage costs are low. The cost is even lower with the data compression function enabled.
     o Because each message is written to every member's storage space (inbox), the system only needs to check a receiver's own inbox when pushing messages to that receiver. The processing logic of group chat is then the same as that of one-on-one chat and is easy to implement.
• After a message is sent by the sender, the sender's client pushes it to the application server. The application server distributes the message to a queue according to the receiver ID, so that messages to the same receiver are stored in the same queue. Messages in a queue are processed in order. Specifically, a new message ID is obtained for each message from the auto-increment ID generator, and then the message is written to the Table Store. Only after the message is written successfully is the next message processed in the same way.
• Normally, messages to the same receiver are stored in the same queue. However, a single queue may contain messages to multiple receivers.
• For group chat, two users may send messages at the same time. The two messages may reach different application servers, but each application server sends messages for the same receiver to the same queue of the queuing service, so both messages end up in the same queue for that receiver, as shown in the figure below:
 
• Data in each queue is processed serially. Whenever a message is written to the Table Store, it is assigned a new ID that is greater than any previous ID. To keep message IDs strictly incrementing even if the write of a previous message fails, a user-level lock is taken when data is written to the storage system. The lock prevents other messages from being written until the previous write succeeds, which ensures that all messages are written in order, and it is released once the write succeeds.
• If a queue fails, all of its messages need to be reprocessed. These messages are moved to a new queue, which must assign message IDs greater than any previous ID. However, the new queue does not know the greatest ID assigned so far and cannot generate auto-incrementing IDs by itself, so a global auto-increment ID generator is required.
• To support multiple terminals, the application server creates a session for each terminal, and each session holds the ID of the latest message pushed to that terminal. When a new message is notified, all messages later than the ID held by the session are read from the storage system. This keeps each terminal in sync without interference when multiple terminals are online at the same time, as shown in the figure below.
 
• In multi-terminal scenarios, when an online terminal goes offline, the session of the terminal will be stored to another table by the application server. When the terminal comes online again, the session will be restored from the storage system and unread messages to the terminal will be pushed.
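The flow described in this step can be sketched as a small in-memory model. It is an illustration under stated assumptions rather than the actual system: QueueBasedPipeline and all of its members are hypothetical names, an AtomicLong stands in for the global auto-increment ID generator, a TreeMap per receiver stands in for the Table Store inbox, and locking and failure handling are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of the queue-based architecture (all names are hypothetical).
final class QueueBasedPipeline {
    private final List<Queue<String[]>> queues = new ArrayList<>();
    // Stands in for the global auto-increment ID generator.
    private final AtomicLong idGenerator = new AtomicLong();
    // Stands in for the storage layer: one inbox per receiver, ordered by message ID.
    private final Map<String, TreeMap<Long, String>> inboxes = new HashMap<>();

    QueueBasedPipeline(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    // Messages to the same receiver always land in the same queue,
    // but one queue may hold messages for several receivers.
    int queueIndex(String receiverId) {
        return Math.floorMod(receiverId.hashCode(), queues.size());
    }

    // Extensive writing: enqueue one copy of the message per group member.
    void sendToGroup(List<String> memberIds, String content) {
        for (String memberId : memberIds) {
            queues.get(queueIndex(memberId)).add(new String[] {memberId, content});
        }
    }

    // Each queue is drained serially: fetch an ID, then store the message.
    void drainQueues() {
        for (Queue<String[]> queue : queues) {
            String[] message;
            while ((message = queue.poll()) != null) {
                long id = idGenerator.incrementAndGet();
                inboxes.computeIfAbsent(message[0], k -> new TreeMap<>()).put(id, message[1]);
            }
        }
    }

    // A terminal session passes the ID of the last message it has received
    // and gets back every later message in ID order.
    List<String> readAfter(String receiverId, long lastMessageId) {
        TreeMap<Long, String> inbox = inboxes.getOrDefault(receiverId, new TreeMap<>());
        return new ArrayList<>(inbox.tailMap(lastMessageId, false).values());
    }
}
```

Because every terminal keeps its own last message ID, two terminals of the same user can call readAfter independently and neither affects what the other receives.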
Step 3: Determine a storage system
The Alibaba Cloud Table Store is selected as the storage system for the following reasons:
• Write operations support both the single-row writing and multi-row batch writing modes, meeting high-concurrency data writing requirements.
• Range-specific reading and paging are supported for massive messages.
• Data lifecycle management is supported to clear expired data automatically, saving storage costs.
• The Alibaba Cloud Table Store is a commercial cloud service with proven reliability and stability.
• The Table Store is cost-effective, and special-offer packages are available for users with mass data scenarios.
• The storage system features a high reading/writing performance. For chat messages, the latency is as low as milliseconds or even microseconds.
Step 4: Determine a table structure
The determined table structure of the Table Store is as follows:


• The table structure of the Table Store consists of two parts: primary key columns and property columns. Primary key columns support up to 4 primary keys while the first primary key is the partitioning key.
• Before using the storage system, you must determine the structure of primary key columns, which cannot be changed during use. Comparatively, property columns are schema-free and customizable. Given that property columns for each row of data can be different, you only need to design the structure of primary key columns.
• The first primary key is the partitioning key, which is designed to distribute data and requests evenly and avoid hotspots. Given that messages are eventually read by receiver, you can use the receiver's ID as the partitioning key. For a more even distribution, you can instead use part of the MD5 value of the receiver's ID (such as the first 4 characters) as the partitioning key.
• Because the first primary key then holds only part of a hash of the receiver's ID, the full receiver's ID must also be stored in order to locate the messages to that receiver. Therefore, you can use the receiver's ID as the second primary key.
• The third primary key can be the message ID, and the values of this primary key must be monotonically incrementing so that the latest message can be queried.
• Property columns can store message content and metadata.
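The partitioning key described above can be derived as follows. This is a minimal sketch: the class and method names are hypothetical, and it assumes the receiver's ID is hashed as UTF-8 text and that the first 4 hexadecimal characters of the MD5 digest are used.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper that derives the first primary key from the receiver's ID.
final class PartitionKeys {
    // Returns the first 4 hex characters of MD5(receiverId).
    static String partitionKey(String receiverId) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(receiverId.getBytes(StandardCharsets.UTF_8));
            // Zero-pad to 32 hex characters so leading zero bytes are not lost.
            String hex = String.format("%032x", new BigInteger(1, digest));
            return hex.substring(0, 4);
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(partitionKey("abc")); // prints 9001
    }
}
```

A 4-character hexadecimal prefix yields 65,536 possible partitioning key values, so receivers are spread across many key ranges while all rows of one receiver still share the same prefix.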
So far, a complete chat system has been built with high-concurrency processing capabilities and high performance. However, this system is also exposed to certain challenges.
Challenges
• When messages to multiple users share a queue, they are processed serially, and a lock is required during processing to keep message IDs strictly incrementing. This introduces a problem: if one user receives a massive number of messages, that user's queue fills up and the messages of other users in the same queue are blocked behind them, resulting in delayed delivery for those other users.
• When chat volume surges during major events, festivals or holidays, the queue capacity must be expanded. Otherwise, the overall system latency may increase dramatically, or the system may even crash due to overload.
Problem 2 (the second challenge above) can be resolved by adding hardware, but problem 1 cannot. Is there a solution to both problems?
New architecture
Both problems stem from having to enforce strictly incrementing message IDs in the application layer. Using the auto-increment feature of primary key columns moves that work into the storage layer and simplifies the upper application layer.
The new architecture, with the auto-increment feature of primary key columns of Table Store enabled, is as follows:
 
• The major difference of the simplified architecture lies in the removal of two components: the queuing service and the auto-increment ID generator.
• When the application server receives a message, it writes the message directly to the Table Store. For message_id, the auto-incrementing primary key column, the application enters a specific placeholder instead of an actual value when writing data. The value is generated inside the Table Store.
• In the new architecture, auto-increment operations are executed inside the Table Store. Even when multiple application servers write messages to the same receiver at the same time, the storage system still ensures that the messages are processed serially and that each message has a unique and strictly incrementing message ID. Therefore, the queuing service is no longer required in the new architecture. This resolves problem 1.
• On the other hand, given that the Table Store is a cloud service and supports the pay-as-you-go billing mode, you do not need to consider the capacity of the system and this completely resolves problem 2.
• In the former architecture, messages to the same user must be in the same queue. In the new architecture, by contrast, those messages can be processed concurrently by multiple application servers, so a message volume surge for a single user does not delay other users. Instead, the load is distributed evenly.
• By using the auto-increment feature of primary key columns, application servers can directly write data to the Table Store without waiting in queues and obtaining message IDs, delivering a better performance.
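The effect of moving ID generation into the storage layer can be illustrated with a small concurrent model. This is a sketch under assumptions, not a description of Table Store internals: DirectWriteModel and its methods are hypothetical, threads stand in for application servers, and an AtomicLong per receiver stands in for the storage-side auto-increment column.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of the new architecture: writers call the store directly,
// and the store itself assigns the message ID (hypothetical names).
final class DirectWriteModel {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();
    private final Map<String, ConcurrentSkipListMap<Long, String>> inboxes = new ConcurrentHashMap<>();

    // No queue and no external ID generator: the ID is produced inside the store.
    long write(String receiverId, String content) {
        long id = counters.computeIfAbsent(receiverId, k -> new AtomicLong()).incrementAndGet();
        inboxes.computeIfAbsent(receiverId, k -> new ConcurrentSkipListMap<>()).put(id, content);
        return id;
    }

    int inboxSize(String receiverId) {
        ConcurrentSkipListMap<Long, String> inbox = inboxes.get(receiverId);
        return inbox == null ? 0 : inbox.size();
    }

    // Simulates several application servers writing to the same receiver at once.
    static DirectWriteModel runDemo(int servers, int messagesPerServer) {
        DirectWriteModel store = new DirectWriteModel();
        ExecutorService pool = Executors.newFixedThreadPool(servers);
        for (int s = 0; s < servers; s++) {
            final int server = s;
            pool.submit(() -> {
                for (int m = 0; m < messagesPerServer; m++) {
                    store.write("alice", "server" + server + "-msg" + m);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store;
    }
}
```

Since the inbox is keyed by message ID, its size equals the number of writes only if every write received a distinct ID, which is the property the application servers no longer have to enforce themselves.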
Implementation
You can implement the architecture above by using the Java SDK. Currently, Java SDK v4.2.0 already supports the auto-increment feature of primary key columns.
Step 1: Create a table
According to the aforementioned design, the table structure is as follows:


The third primary key column, message_id, is the auto-incrementing primary key column. When creating the table, set the type of the message_id column to INTEGER and its option to AUTO_INCREMENT.
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.*;

private static void createTable(SyncClient client) {
    TableMeta tableMeta = new TableMeta("message_table");

    //The first column is the partitioning key column.
    tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("partition_key", PrimaryKeyType.STRING));

    //The second column is the receiver's ID column.
    tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("receive_id", PrimaryKeyType.STRING));

    //The third column is the message ID column. This column is the auto-incrementing column: its type is INTEGER and its option is PrimaryKeyOption.AUTO_INCREMENT.
    tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("message_id", PrimaryKeyType.INTEGER, PrimaryKeyOption.AUTO_INCREMENT));

    int timeToLive = -1;  //-1 means the data never expires. If you set a validity period instead, expired data is deleted automatically.
    int maxVersions = 1;  //Keep only one version. Multiple versions are also supported.

    TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);

    CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);

    client.createTable(request);
}


By executing the code above, you can create a table where the primary key column in the third column is auto-incrementing.
Step 2: Write data
Currently, both the PutRow and BatchWriteRow writing modes support the auto-increment feature of primary key columns. When writing data, enter a placeholder instead of an actual value in the third column, message_id, which is the auto-incrementing column.
//hash() is an application-defined helper (not part of the SDK) that returns a hex string, for example the MD5 value of its argument.
private static void putRow(SyncClient client, String receive_id, String content) {
    //Construct primary key columns.
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();

    //Values of the first column are the first 4 characters of hash(receive_id).
    primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(hash(receive_id).substring(0, 4)));

    //Values of the second column are receivers' IDs.
    primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));

    //Values of the third column are message IDs. This column is the auto-incrementing column and its values are generated by Table Store. Enter the placeholder AUTO_INCREMENT instead of an actual value.
    primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.AUTO_INCREMENT);
    PrimaryKey primaryKey = primaryKeyBuilder.build();

    RowPutChange rowPutChange = new RowPutChange("message_table", primaryKey);

    //Set the return type to RT_PK so that the response includes the primary key, including the generated message_id. If ReturnType is not set, nothing is returned by default.
    rowPutChange.setReturnType(ReturnType.RT_PK);

    //Add a property column whose value is the message content.
    rowPutChange.addColumn(new Column("content", ColumnValue.fromString(content)));

    //Write data to Table Store.
    PutRowResponse response = client.putRow(new PutRowRequest(rowPutChange));

    //Print the returned primary key.
    Row returnRow = response.getRow();
    if (returnRow != null) {
        System.out.println("PrimaryKey:" + returnRow.getPrimaryKey().toString());
    }

    //Print the consumed CU.
    CapacityUnit cu = response.getConsumedCapacity().getCapacityUnit();
    System.out.println("Read CapacityUnit:" + cu.getReadCapacityUnit());
    System.out.println("Write CapacityUnit:" + cu.getWriteCapacityUnit());
}


Step 3: Read data
To read data, use GetRange to read the latest messages. The start value of the message_id primary key column is the message_id of the last message already read plus 1, and the end value is INF_MAX. In this way, you can always read the messages that have not yet been pushed and send them to the client.
//hash() is an application-defined helper (not part of the SDK) that returns a hex string, for example the MD5 value of its argument.
private static void getRange(SyncClient client, String receive_id, long lastMessageId) {
    RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("message_table");

    //The first column holds the first 4 characters of hash(receive_id). It is the same for the start and end keys.
    String partitionKey = hash(receive_id).substring(0, 4);

    //Set a starting primary key value.
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(partitionKey));

    //Values of the second column are receivers' IDs.
    primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));

    //Values of the third column are message IDs, starting right after the ID of the last message already read.
    primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.fromLong(lastMessageId + 1));
    rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());

    //Set an ending primary key value.
    primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(partitionKey));
    primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));

    //INF_MAX is greater than any message ID, so the range covers all later messages.
    primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.INF_MAX);
    rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());

    rangeRowQueryCriteria.setMaxVersions(1);

    System.out.println("GetRange result is:");
    while (true) {
        GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
        for (Row row : getRangeResponse.getRows()) {
            System.out.println(row);
        }

        //If nextStartPrimaryKey is not null, more rows remain, so continue reading from that key.
        if (getRangeResponse.getNextStartPrimaryKey() != null) {
            rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
        } else {
            break;
        }
    }
}


The example above shows how to apply the Table Store and the auto-increment feature of primary key columns in a chat system. This example is also helpful for other scenarios, and we hope we can explore that together.