DISCLAIMER
Please note that all content presented on this page is provided by Tuya (HK) Limited. Alibaba Cloud makes no representations and warranties, explicit or implied, as to the authenticity or accuracy of any such content, nor makes any guarantee to the condition, quality, durability, performance, reliability, merchantability or fitness for a particular purpose, or non-infringement of any products and/or services made available by Tuya (HK) Limited.
Manage Consumer Groups
A consumer group is the identity of a message consumer. Multiple consumers form a message consumer group to access IoT Platform. After the server subscription is configured, IoT Platform forwards the received device messages to the corresponding consumer group, and each message that is forwarded to the consumer group will be received by one of the consumers. This topic describes how to create, view, and delete consumer groups on IoT Platform.
Working Principle
Server Subscription forwards messages of the specified types from all devices of a product to one or more consumer groups. Within each consumer group, one randomly selected consumer receives each message. Consumer groups are distinguished by their consumer group ID. Each client can have only one consumer group ID, and each consumer group can contain up to 64 clients. Create a consumer group first, configure the consumer group ID on the client, and then configure the server subscription. For example:
●**Subscription 1:** Forwards messages from product 1 to consumer group 1 and consumer group 2.
●**Subscription 2:** Forwards messages from product 2 to consumer group 2.
Server Subscription can forward only messages of the specified types, and always from all devices of a product. If you need more flexible forwarding, such as forwarding only some device messages to clients for consumption, use the cloud product forwarding feature of IoT Platform: first forward the messages of the specified device topics to a consumer group used by server subscription, and the messages are then delivered to the clients that consume from that consumer group.
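The forwarding model in this section can be illustrated with a small in-memory simulation. This is a minimal sketch and not IoT Platform code: the class `SubscriptionModel` and its methods are hypothetical names chosen for illustration, and the random selection only mimics the behaviour described here. It shows that a message from a product is copied to every subscribed consumer group and that, inside each group, exactly one client receives it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Illustrative model only; all names here are hypothetical. */
public class SubscriptionModel {
    // Limit stated in this topic: up to 64 clients per consumer group.
    private static final int MAX_CLIENTS_PER_GROUP = 64;

    // Consumer group ID -> client IDs that connected with that group ID.
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // Product -> consumer group IDs subscribed to that product.
    private final Map<String, List<String>> subscriptions = new LinkedHashMap<>();
    private final Random random = new Random();

    public void addClient(String groupId, String clientId) {
        List<String> clients = groups.computeIfAbsent(groupId, k -> new ArrayList<>());
        if (clients.size() >= MAX_CLIENTS_PER_GROUP) {
            throw new IllegalStateException(
                "group " + groupId + " already has " + MAX_CLIENTS_PER_GROUP + " clients");
        }
        clients.add(clientId);
    }

    public void subscribe(String product, String groupId) {
        subscriptions.computeIfAbsent(product, k -> new ArrayList<>()).add(groupId);
    }

    /** Returns, for each subscribed group, the single client that receives the message. */
    public Map<String, String> forward(String product) {
        Map<String, String> receivers = new LinkedHashMap<>();
        List<String> noGroups = Collections.emptyList();
        for (String groupId : subscriptions.getOrDefault(product, noGroups)) {
            List<String> clients = groups.getOrDefault(groupId, noGroups);
            if (!clients.isEmpty()) {
                // One randomly chosen client per group gets the message.
                receivers.put(groupId, clients.get(random.nextInt(clients.size())));
            }
        }
        return receivers;
    }

    public static void main(String[] args) {
        SubscriptionModel model = new SubscriptionModel();
        model.addClient("group1", "client-a");
        model.addClient("group1", "client-b");
        model.addClient("group2", "client-c");
        model.subscribe("product1", "group1"); // Subscription 1
        model.subscribe("product1", "group2");
        model.subscribe("product2", "group2"); // Subscription 2
        System.out.println("product1 -> " + model.forward("product1"));
        System.out.println("product2 -> " + model.forward("product2"));
    }
}
```

Running the sketch several times shows that the receiver within group1 varies between the two clients, while group2, which has a single client, always delivers to that client.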
Create a consumer group
1 Log on to the IoT Platform console.
2 In the left navigation bar, choose Message Forwarding > Server Subscription, and then click the Consumer Group List tab.
3 Click Create a Consumer Group.
4 In the Create a Consumer Group dialog box, enter a group name and click Confirm.
Consumer group names can contain Chinese characters, English letters, Japanese characters, digits, and underscores (_), and must be 4 to 30 characters in length. A Chinese or Japanese character counts as 2 characters.
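The naming rule can be checked on the client side before a name is submitted. The following is a minimal sketch under stated assumptions: the class `ConsumerGroupNameCheck` is hypothetical, and mapping "Chinese or Japanese" to the Unicode scripts Han, Hiragana, and Katakana is an assumption; the console may accept a slightly different set of characters.

```java
/** Illustrative check only; the exact character classes accepted by the console are an assumption. */
public class ConsumerGroupNameCheck {

    /** Assumption: "Chinese or Japanese" means the Han, Hiragana, and Katakana scripts. */
    public static boolean isChineseOrJapanese(int codePoint) {
        Character.UnicodeScript script = Character.UnicodeScript.of(codePoint);
        return script == Character.UnicodeScript.HAN
            || script == Character.UnicodeScript.HIRAGANA
            || script == Character.UnicodeScript.KATAKANA;
    }

    public static boolean isAllowed(int codePoint) {
        return (codePoint >= 'a' && codePoint <= 'z')
            || (codePoint >= 'A' && codePoint <= 'Z')
            || (codePoint >= '0' && codePoint <= '9')
            || codePoint == '_'
            || isChineseOrJapanese(codePoint);
    }

    /** Counts each Chinese or Japanese character as 2 and every other character as 1. */
    public static int weightedLength(String name) {
        int length = 0;
        for (int i = 0; i < name.length(); ) {
            int codePoint = name.codePointAt(i);
            length += isChineseOrJapanese(codePoint) ? 2 : 1;
            i += Character.charCount(codePoint);
        }
        return length;
    }

    public static boolean isValidName(String name) {
        if (name == null || !name.codePoints().allMatch(ConsumerGroupNameCheck::isAllowed)) {
            return false;
        }
        int length = weightedLength(name);
        return length >= 4 && length <= 30;
    }

    public static void main(String[] args) {
        System.out.println(isValidName("order_group_1"));      // true
        System.out.println(isValidName("abc"));                // false: only 3 characters
        // Three Chinese characters, written as Unicode escapes; they count as 6.
        System.out.println(isValidName("\u6d88\u8d39\u7ec4")); // true
    }
}
```

One known limitation of this sketch: some marks used in Japanese text, such as the prolonged sound mark, belong to the Common script and are rejected here.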
View consumer group
You can view the subscribed products and the consumption logs of a consumer group.
1 In the Consumer Group List, find the consumer group that you want to view and click View.
2 On the Consumer Group Details page, click the Consumption Log tab to view the consumption records.
Delete a consumer group
Warning: After a consumer group is deleted, all consumers in the group stop receiving messages and the server subscriptions that use the group become unavailable, which may interrupt your business. Proceed with caution.
Consumer groups that you create can be deleted, but the default consumer group of IoT Platform cannot be deleted.
1 Unsubscribe. If the consumer group is associated with a subscription, cancel the subscription first. If the consumer group has no subscription, skip this step.
2 On the Consumer Group List tab of the Server Subscription page, click Delete for the consumer group, and then click Confirm.
Configure Server Subscription
If you need device-reported data in real time, calling the cloud API may not be enough: it returns only TSL data and may not return it in real time. With server subscription, your business server receives device-reported messages reliably and in real time. This topic describes how to configure server subscription.
Prerequisites
●A product is created.
●A consumer group is created. You can use the default consumer group (DEFAULT_GROUP) or create one.
●Data that devices report to object model (TSL) topics must match the required format to trigger a server subscription. Data reported to custom topics has no format requirement.
Restrictions on use
●After the connection is established, the authentication request needs to be sent immediately. If the authentication is not successful within 15 seconds, the server will actively close the connection.
How to configure
Step 1: Create a consumer group
1 Create a consumer group as described above.
Step 2: Create a Server Subscription
Create a subscription in the IoT Platform console to associate the consumer group with the device message types that you want to receive.
1 In the left navigation bar, choose Message Forwarding > Server Subscription.
2 On the Server Subscription page, click Create Subscription.
3 In the Create Subscription dialog box, complete the configuration and click Confirm.
Parameters | Description |
Product | IoT Platform forwards messages from all devices of the selected product. Only one server subscription of each type can be created for a product. |
Subscription Type | Two types are supported: AMQP and Kafka. |
Consumer Group | IoT Platform provides a default consumer group. Select Consumer Group List and, in the Select Target Consumer Group panel on the right, select one or more consumer groups. You can also click Create a Consumer Group in the lower-right corner. |
Push message type | ●TSL model historical data reporting: If you subscribe to the response data of asynchronous service calls, the Id in the response message returned by the device must be the same as the Id sent by IoT Platform. Otherwise, the data cannot be subscribed to as expected. ●OTA module version number reporting: Messages forwarded when a device reports its OTA module version number and the version number has changed. ●OTA upgrade device status notification: Event notifications for device upgrade success, failure, cancellation, and progress, covering upgrade package validation and batch upgrades. ●OTA upgrade batch status notification: Notifications of changes to the status of a device OTA upgrade batch. ●Device status change notification: Messages sent when a device of the product goes online or offline. ●Gateway sub-device discovery report: The gateway reports information about discovered sub-devices to IoT Platform. This requires application support on the gateway and is specific to gateway products. ●Device topology change: Creation and removal of the topological relationship between a sub-device and a gateway. Specific to gateway products. ●Device lifecycle change: Messages for device creation, deletion, and similar events. |
Step 3: Run the client
1 AMQP client
// See the table below for descriptions of the parameters. The fields clientId, accessKey,
// accessSecret, consumerGroupId, host, logger, and messageListener are not shown in this excerpt.
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;

// Number of connections started by a single process.
private static int connectionCount = 4;

// Asynchronous thread pool for business processing.
private final static ExecutorService executorService = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(),
    Runtime.getRuntime().availableProcessors() * 2, 3600, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(50000));

public static void main(String[] args) throws Exception {
    List<Connection> connections = new ArrayList<>();
    for (int i = 0; i < connectionCount; i++) {
        long timeStamp = System.currentTimeMillis();
        // Signature method.
        String signMethod = "hmacsha1";
        String userName = clientId + "-" + i + "|authMode=aksign"
            + ",signMethod=" + signMethod
            + ",timestamp=" + timeStamp
            + ",authId=" + accessKey
            + ",consumerGroupId=" + consumerGroupId
            + "|";
        // See the AMQP client access documentation for how to calculate the signature and assemble the password.
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent, accessSecret, signMethod);
        String connectionUrl = "failover:(amqp://" + host + ":5672?amqp.idleTimeout=80000)"
            + "?failover.reconnectDelay=30";
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", connectionUrl);
        hashtable.put("queue.QUEUE", consumerGroupId);
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
        Destination queue = (Destination)context.lookup("QUEUE");
        // Create a connection.
        Connection connection = cf.createConnection(userName, password);
        connections.add(connection);
        ((JmsConnection)connection).addConnectionListener(myJmsConnectionListener);
        // Create a session.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        // Create a receiver link.
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(messageListener);
    }
    logger.info("amqp demo is started successfully, and will exit after 30 minutes");
    // Keep the demo running for 30 minutes, then shut down.
    Thread.sleep(60 * 1000 * 30);
    logger.info("run shutdown");
    connections.forEach(c -> {
        try {
            c.close();
        } catch (JMSException e) {
            logger.error("failed to close connection", e);
        }
    });
    executorService.shutdown();
    if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {
        logger.info("shutdown success");
    } else {
        logger.info("failed to handle messages");
    }
}

/**
 * Handles the business logic after a message is received.
 */
private static void processMessage(Message message) {
    try {
        byte[] body = message.getBody(byte[].class);
        String content = new String(body);
        String topic = message.getStringProperty("topic");
        String messageId = message.getStringProperty("messageId");
        long generateTime = message.getLongProperty("generateTime");
        logger.info("receive message"
            + ",\n topic = " + topic
            + ",\n messageId = " + messageId
            + ",\n generateTime = " + generateTime
            + ",\n content = " + content);
    } catch (Exception e) {
        logger.error("processMessage occurs error ", e);
    }
}

private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {...};

/**
 * Calculates the signature.
 */
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
    SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
    Mac mac = Mac.getInstance(signMethod);
    mac.init(signingKey);
    byte[] rawHmac = mac.doFinal(toSignString.getBytes());
    return Base64.encodeBase64String(rawHmac);
}
The parameters are described as follows:
Parameters | Description |
AccessKey | The key ID that the platform issues to developers for calling the API. Obtain it in System Management > Key Management. |
AccessSecret | The key secret that the platform issues together with the AccessKey. Obtain it in System Management > Key Management. |
ConsumerGroupId | The ID of the consumer group in the current IoT Platform instance. Log on to the IoT Platform console and choose Message Forwarding > Server Subscription > Consumer Group Management to view the ID of your consumer group. |
ClientId | The client ID, which is user-defined and cannot exceed 64 characters in length. We recommend that you use unique identifiers such as the UUID, MAC address, and IP address of the server where your AMQP client is located. |
SignMethod | Signature method. The following three types are supported: ●hmacmd5 ●hmacsha1 ●hmacsha256 |
Host | The AMQP access domain name (${YourHost}). To find it, check the host of the AMQP server in System Administration > Development Configuration in the IoT console. |
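The way these parameters combine into the AMQP user name and password can be sketched with the JDK alone. This is a minimal illustration, not the official client: the class `AmqpCredentials` and its methods are hypothetical, `java.util.Base64` is used in place of the Apache Commons Codec class in the sample, and the sign content is assumed to be `authId=<AccessKey>&timestamp=<timestamp>`, following the sample code. The access key, secret, and group ID in `main` are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import java.util.UUID;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Illustrative helper; class and method names are hypothetical. */
public class AmqpCredentials {

    /** Maps the SignMethod values from the table to standard JCA algorithm names. */
    public static String jcaName(String signMethod) {
        switch (signMethod) {
            case "hmacmd5": return "HmacMD5";
            case "hmacsha1": return "HmacSHA1";
            case "hmacsha256": return "HmacSHA256";
            default: throw new IllegalArgumentException("unsupported signMethod: " + signMethod);
        }
    }

    /** Assembles the user name in the same layout as the sample code. */
    public static String userName(String clientId, String signMethod, long timestamp,
                                  String accessKey, String consumerGroupId) {
        if (clientId.length() > 64) {
            throw new IllegalArgumentException("clientId must not exceed 64 characters");
        }
        return clientId + "|authMode=aksign"
            + ",signMethod=" + signMethod
            + ",timestamp=" + timestamp
            + ",authId=" + accessKey
            + ",consumerGroupId=" + consumerGroupId
            + "|";
    }

    /** Signs "authId=...&timestamp=..." with the access secret and Base64-encodes the result. */
    public static String password(String accessKey, String accessSecret,
                                  long timestamp, String signMethod) {
        String signContent = "authId=" + accessKey + "&timestamp=" + timestamp;
        String algorithm = jcaName(signMethod);
        try {
            Mac mac = Mac.getInstance(algorithm);
            mac.init(new SecretKeySpec(accessSecret.getBytes(StandardCharsets.UTF_8), algorithm));
            byte[] rawHmac = mac.doFinal(signContent.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(rawHmac);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("cannot compute signature", e);
        }
    }

    public static void main(String[] args) {
        long timestamp = System.currentTimeMillis();
        String clientId = UUID.randomUUID().toString(); // a UUID string is 36 characters long
        // Placeholder credentials; replace them with your own values.
        System.out.println(userName(clientId, "hmacsha1", timestamp, "yourAccessKey", "DEFAULT_GROUP"));
        System.out.println(password("yourAccessKey", "yourAccessSecret", timestamp, "hmacsha1"));
    }
}
```

The same timestamp must be used in both the user name and the signed content, which is why the sketch takes it as a parameter instead of reading the clock inside each method.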
2 Kafka client
In the IoT console, choose System Administration > Development Configuration to obtain the Kafka server address and port.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "alikafka-post-cn-29t3s87td001-1-vpc.alikafka.aliyuncs.com:9094,alikafka-post-cn-29t3s87td001-2-vpc.alikafka.aliyuncs.com:9094,alikafka-post-cn-29t3s87td001-3-vpc.alikafka.aliyuncs.com:9094"); // Kafka server addresses
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DEFAULT_GROUP"); // Your consumer group ID; the default is DEFAULT_GROUP
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("DEFAULT_GROUP")); // Subscribe to the topic; by default, the topic name is your consumer group ID
try {
    while (true) {
        // poll(long) is deprecated in recent Kafka clients; use the Duration overload.
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
            System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}
Manage server subscriptions
After the server subscription is created, find the subscribed product on the Server Subscription page and perform the following operations.
●Edit: In the Operation column of the product, click Edit. In the Edit Subscription dialog box, modify the consumer group or the push message type.
●Delete: In the Operation column of the product, click Delete, and then click Confirm.