As the number of monitoring metrics and the volume of data traffic grow, monitoring systems become more complex and must process data with lower latency. This topic describes how to build a high concurrency fine-grained monitoring system by using TairTS.

Overview of TairTS

TairTS is a time series module developed in-house for Tair. It supports real-time, high concurrency queries and writes based on the multi-threading model of ApsaraDB for Redis Enhanced Edition (Tair). With TairTS, you can update existing time series data or add to (increment) it, use the Gorilla compression algorithm and specific storage to significantly reduce storage costs, and specify time to live (TTL) settings for skeys so that the skeys roll over automatically based on time windows. For more information, see TairTS commands.
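
To illustrate why Gorilla-style compression suits regularly sampled metrics, the following minimal sketch shows the delta-of-delta timestamp encoding that the Gorilla algorithm is known for. It is not TairTS code, and this topic does not describe how TairTS implements compression internally. The class DeltaOfDeltaSketch and its methods are hypothetical names used only for illustration. Actual Gorilla encoders also pack these numbers into variable-length bit fields and compress values by using XOR, which the sketch omits.

```java
import java.util.Arrays;

class DeltaOfDeltaSketch {

    /** Encodes timestamps as: first timestamp, first delta, then one delta-of-delta per point. */
    static long[] encode(long[] ts) {
        long[] out = new long[ts.length];
        for (int i = 0; i < ts.length; i++) {
            if (i == 0) {
                out[i] = ts[0];
            } else if (i == 1) {
                out[i] = ts[1] - ts[0];
            } else {
                out[i] = (ts[i] - ts[i - 1]) - (ts[i - 1] - ts[i - 2]);
            }
        }
        return out;
    }

    /** Reverses encode() by accumulating the deltas. */
    static long[] decode(long[] enc) {
        long[] ts = new long[enc.length];
        long delta = 0;
        for (int i = 0; i < enc.length; i++) {
            if (i == 0) {
                ts[i] = enc[0];
            } else {
                delta += enc[i]; // for i == 1, enc[1] is the first delta itself
                ts[i] = ts[i - 1] + delta;
            }
        }
        return ts;
    }

    public static void main(String[] args) {
        // Four points at a 1-second interval, then one point that arrives 2 seconds later.
        long[] ts = {1597049266000L, 1597049267000L, 1597049268000L, 1597049269000L, 1597049271000L};
        long[] enc = encode(ts);
        System.out.println(Arrays.toString(enc));               // [1597049266000, 1000, 0, 0, 1000]
        System.out.println(Arrays.equals(ts, decode(enc)));     // true
    }
}
```

When metrics are sampled at a fixed interval, most encoded entries are 0, which is why this representation can be stored in very few bits.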

Overview of fine-grained monitoring

Figure 1. Architecture of fine-grained monitoring

The preceding figure shows the architecture of a fine-grained monitoring system. The console sends fine-grained monitoring configurations to the application. The application writes the configurations to the collector by using the MQ Telemetry Transport (MQTT) protocol. The collector processes the configuration data and then writes the data to ApsaraDB for Redis databases.

  • High concurrency queries

    During high concurrency queries, TairTS maintains query performance and supports aggregation in scenarios such as downsampling, attribute-based filtering, batch queries, and multi-level filtering and querying with multiple numerical functions. You can run a batch query with aggregation in a single command, which reduces network round trips, returns responses within milliseconds, and helps you identify issues as early as possible.

  • High concurrency writes

    As applications grow, a single collector may be unable to handle high concurrency writes. TairTS allows multiple collectors to update or add to existing time series data, which keeps concurrent writes from multiple collectors accurate and reduces memory usage. The following code provides an example of how to concurrently write data:

    import com.aliyun.tair.tairts.TairTs;
    import com.aliyun.tair.tairts.params.ExtsAggregationParams;
    import com.aliyun.tair.tairts.params.ExtsAttributesParams;
    import com.aliyun.tair.tairts.results.ExtsSkeyResult;
    import redis.clients.jedis.Jedis;
    
    public class Test {

        private static final String HOST = "127.0.0.1";
        private static final int PORT = 6379;

        public static void main(String[] args) {
            // The third argument is the timeout in milliseconds. The connection is closed automatically.
            try (Jedis jedis = new Jedis(HOST, PORT, 2000 * 100)) {
                if (!"PONG".equals(jedis.ping())) {
                    System.err.println("Failed to connect to " + HOST + ":" + PORT);
                    System.exit(-1);
                }
                TairTs tairTs = new TairTs(jedis);
                //Use the following code if you want to work with a cluster instance:
                //TairTsCluster tairTsCluster = new TairTsCluster(jedisCluster);

                String pkey = "cpu_load";
                String skey1 = "app1";
                // Align both timestamps to whole seconds.
                long startTs = (System.currentTimeMillis() - 100000) / 1000 * 1000;
                long endTs = System.currentTimeMillis() / 1000 * 1000;
                String startTsStr = String.valueOf(startTs);
                String endTsStr = String.valueOf(endTs);

                // Delete the skey so that the example starts from an empty time series.
                tairTs.extsdel(pkey, skey1);
                long num = 5;

                // Query parameters: return up to 10 data points, averaged over 1,000 ms windows.
                ExtsAggregationParams paramsAgg = new ExtsAggregationParams();
                paramsAgg.maxCountSize(10);
                paramsAgg.aggAvg(1000);

                // Collector A writes its increments.
                writeIncrements(tairTs, pkey, skey1, startTs, num);
                System.out.println("Updated result of Collector A:");
                printDataPoints(tairTs.extsrange(pkey, skey1, startTsStr, endTsStr, paramsAgg));

                // Collector B writes increments to the same timestamps.
                // Each value is added to the data point that Collector A already wrote.
                writeIncrements(tairTs, pkey, skey1, startTs, num);
                System.out.println("Updated result of Collector B:");
                printDataPoints(tairTs.extsrange(pkey, skey1, startTsStr, endTsStr, paramsAgg));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Adds the value i to the data point at startTs + i seconds, for i = 0 to num - 1.
        private static void writeIncrements(TairTs tairTs, String pkey, String skey, long startTs, long num) {
            for (int i = 0; i < num; i++) {
                double val = i;
                String tsStr = String.valueOf(startTs + i * 1000);
                ExtsAttributesParams params = new ExtsAttributesParams();
                // Set the expiration time of the data points.
                params.dataEt(1000000000);
                tairTs.extsrawincr(pkey, skey, tsStr, val, params);
            }
        }

        // Prints every returned data point. Iterating over the actual result size avoids
        // an IndexOutOfBoundsException if fewer points are returned than were written.
        private static void printDataPoints(ExtsSkeyResult result) {
            for (int i = 0; i < result.getDataPoints().size(); i++) {
                System.out.println("    ts: " + result.getDataPoints().get(i).getTs() + ", value: " + result.getDataPoints().get(i).getDoubleValue());
            }
        }
    }

    Execution results:

    Updated result of Collector A:
        ts: 1597049266000, value: 0.0
        ts: 1597049267000, value: 1.0
        ts: 1597049268000, value: 2.0
        ts: 1597049269000, value: 3.0
        ts: 1597049270000, value: 4.0
    Updated result of Collector B:
        ts: 1597049266000, value: 0.0
        ts: 1597049267000, value: 2.0
        ts: 1597049268000, value: 4.0
        ts: 1597049269000, value: 6.0
        ts: 1597049270000, value: 8.0
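
The two behaviors described in the preceding list, incremental writes from multiple collectors and downsampled queries, can also be pictured without a Tair instance. The following minimal sketch uses an in-memory map as a stand-in for one time series. It is an illustration under stated assumptions, not the TairTS implementation: the class MonitoringSketch and the methods incr, writeFromTwoCollectors, and averageByBucket are hypothetical names, and the alignment of buckets to multiples of the bucket width is an assumption made for the sketch.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

class MonitoringSketch {

    /** Adds delta to the point at ts, or creates the point if it does not exist. */
    static void incr(ConcurrentSkipListMap<Long, Double> series, long ts, double delta) {
        series.merge(ts, delta, Double::sum);
    }

    /** Two collector threads each add the value i at startTs + i seconds, for i = 0 to num - 1. */
    static ConcurrentSkipListMap<Long, Double> writeFromTwoCollectors(long startTs, int num) {
        ConcurrentSkipListMap<Long, Double> series = new ConcurrentSkipListMap<>();
        Runnable collector = () -> {
            for (int i = 0; i < num; i++) {
                incr(series, startTs + i * 1000L, i);
            }
        };
        Thread a = new Thread(collector, "collector-A");
        Thread b = new Thread(collector, "collector-B");
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for collectors", e);
        }
        return series;
    }

    /** Downsamples by averaging all points that fall into the same bucketMs-wide window. */
    static Map<Long, Double> averageByBucket(Map<Long, Double> series, long bucketMs) {
        Map<Long, double[]> acc = new TreeMap<>(); // bucket start -> {sum, count}
        for (Map.Entry<Long, Double> p : series.entrySet()) {
            long bucket = Math.floorDiv(p.getKey(), bucketMs) * bucketMs;
            double[] sumCount = acc.computeIfAbsent(bucket, k -> new double[2]);
            sumCount[0] += p.getValue();
            sumCount[1] += 1;
        }
        Map<Long, Double> out = new TreeMap<>();
        for (Map.Entry<Long, double[]> e : acc.entrySet()) {
            out.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, Double> series = writeFromTwoCollectors(1597049266000L, 5);
        System.out.println(series);                         // raw points after both collectors finish
        System.out.println(averageByBucket(series, 5000));  // the same data averaged per 5-second window
    }
}
```

Because addition is commutative, the final values do not depend on the order in which the two collectors run: the raw points end at 0.0, 2.0, 4.0, 6.0, and 8.0, which matches the "Updated result of Collector B" output above. If the collectors overwrote values instead of incrementing them, one collector's contribution would be lost.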