Test specifications

Service                    Specifications
AnalyticDB for MySQL 3.0   An AnalyticDB for MySQL elastic cluster with one worker node (8 cores).
Elasticsearch 6.7.0        An Elasticsearch cluster of the X-Pack version with one node (8 cores).
ECS                        Two ECS instances with 32 vCPUs, 128 GiB of memory, and 3576 GiB of local NVMe SSD storage.

Note: The ECS instances and the AnalyticDB for MySQL and Elasticsearch clusters are deployed in the same zone with sufficient bandwidth.

Test method

  • Method to write data to AnalyticDB for MySQL in real time: On the ECS instances, a Java program reads multiple local TPC-H fragment files and imports the data over JDBC with multiple threads, 2,000 rows per batch. The import uses the following SQL statement: insert into lineitem values (...).
  • Method to write data to Elasticsearch in real time: On the ECS instances, a Python program reads multiple local TPC-H fragment files and imports the data with multiple threads, 2,000 entries per batch, using the elasticsearch Python client.
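The batching logic shared by both import paths can be sketched in Python (the actual AnalyticDB import uses a Java JDBC program; the helper names, sample rows, and inline value formatting below are illustrative only; a real importer would use driver-level parameter binding):

```python
from itertools import islice

BATCH_SIZE = 2000  # rows per INSERT batch, as in the test method


def batches(rows, size=BATCH_SIZE):
    """Yield successive lists of at most `size` rows."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def build_insert(batch):
    """Build one multi-row INSERT statement for a batch of rows."""
    values = ", ".join(
        "(" + ", ".join(str(v) for v in row) + ")" for row in batch
    )
    return "insert into lineitem values " + values


# Tiny example: 3 rows with a batch size of 2 yields 2 statements.
rows = [(1, 10), (2, 20), (3, 30)]
stmts = [build_insert(b) for b in batches(rows, size=2)]
```

Each generated statement is then executed on its own thread, which is how the client-side concurrency levels in the results table below are produced.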

Test results

Number of concurrent client threads   TPS (AnalyticDB for MySQL)   TPS (Elasticsearch)
8                                     33033                        12211
16                                    56816                        7165
32                                    95083                        6267
64                                    153857                       5890
128                                   186732                       5516

Table creation statements

The following statement is used to create the table in the AnalyticDB for MySQL cluster:

create table `lineitem` (
 `l_orderkey` bigint NOT NULL COMMENT '',
 `l_partkey` int NOT NULL COMMENT '',
 `l_suppkey` int NOT NULL COMMENT '',
 `l_linenumber` int NOT NULL COMMENT '',
 `l_quantity` decimal(15, 2) NOT NULL COMMENT '',
 `l_extendedprice` decimal(15, 2) NOT NULL COMMENT '',
 `l_discount` decimal(15, 2) NOT NULL COMMENT '',
 `l_tax` decimal(15, 2) NOT NULL COMMENT '',
 `l_returnflag` varchar NOT NULL COMMENT '',
 `l_linestatus` varchar NOT NULL COMMENT '',
 `l_shipdate` date NOT NULL COMMENT '',
 `l_commitdate` date NOT NULL COMMENT '',
 `l_receiptdate` date NOT NULL COMMENT '',
 `l_shipinstruct` varchar NOT NULL COMMENT '',
 `l_shipmode` varchar NOT NULL COMMENT '',
 `l_comment` varchar NOT NULL COMMENT ''
) DISTRIBUTE BY HASH(`l_orderkey`) INDEX_ALL='Y';

The following command is used to create the index and mapping in the Elasticsearch cluster:

curl -X PUT 'http://es_ip:9200/tpch' \
-H 'Content-Type: application/json' \
-d '{
    "settings": {
        "number_of_shards": 32,
        "number_of_replicas" : 2
    },
    "mappings": {
         "lineitem": { 
              "properties": {
               "L_ORDERKEY": {
                  "type": "integer"
               },
               "L_PARTKEY": {
                  "type": "integer"
               },
               "L_SUPPKEY": {
                  "type": "integer"
               },
               "L_LINENUMBER": {
                  "type": "integer"
               },
               "L_QUANTITY": {
                  "type": "double"
               },
               "L_EXTENDEDPRICE": {
                  "type": "double"
               },
               "L_DISCOUNT": {
                  "type": "double"
               },
               "L_TAX": {
                  "type": "double"
               },
               "L_RETURNFLAG": {
                  "type": "keyword"
               },
               "L_LINESTATUS": {
                  "type": "keyword"
               },
               "L_SHIPDATE": {
                  "type": "date"
               },
               "L_COMMITDATE": {
                  "type": "date"
               },
               "L_RECEIPTDATE": {
                  "type": "date"
               },
               "L_SHIPINSTRUCT": {
                  "type": "keyword"
               },
               "L_SHIPMODE": {
                  "type": "keyword"
               },
               "L_COMMENT": {
                  "type": "keyword"
               }
            }
          }
     }
}'
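Each line of a TPC-H lineitem.tbl fragment file is '|'-delimited, with fields in the same order as the mapping above. A minimal sketch of the per-line conversion into an Elasticsearch document (the sample line and the `to_document` helper are illustrative, not part of the original program):

```python
# Field names and Python casts, in TPC-H lineitem column order.
FIELDS = [
    ("L_ORDERKEY", int), ("L_PARTKEY", int), ("L_SUPPKEY", int),
    ("L_LINENUMBER", int), ("L_QUANTITY", float), ("L_EXTENDEDPRICE", float),
    ("L_DISCOUNT", float), ("L_TAX", float), ("L_RETURNFLAG", str),
    ("L_LINESTATUS", str), ("L_SHIPDATE", str), ("L_COMMITDATE", str),
    ("L_RECEIPTDATE", str), ("L_SHIPINSTRUCT", str), ("L_SHIPMODE", str),
    ("L_COMMENT", str),
]


def to_document(line):
    """Convert one '|'-delimited lineitem row into an Elasticsearch document."""
    data = line.rstrip('\n').split('|')
    return {name: cast(value) for (name, cast), value in zip(FIELDS, data)}


sample = ("1|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|"
          "1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the\n")
doc = to_document(sample)
```

The import script below performs the same conversion inline for each row before appending it to a bulk request.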

The following Python script is used to import data into Elasticsearch:

from threading import Thread

from elasticsearch import Elasticsearch

BATCH_SIZE = 2000  # documents per bulk request, matching the test method


def func(i):
    es = Elasticsearch(hosts=[
        "es_ip:9200"
    ])
    idx = 0
    actions = []
    # Each fragment file holds '|'-delimited TPC-H lineitem rows.
    with open(r"lineitem.tbl.{}".format(i)) as f:
        for line in f:
            data = line.rstrip('\n').split('|')
            body = {
                'L_ORDERKEY': int(data[0]),
                'L_PARTKEY': int(data[1]),
                'L_SUPPKEY': int(data[2]),
                'L_LINENUMBER': int(data[3]),
                'L_QUANTITY': float(data[4]),
                'L_EXTENDEDPRICE': float(data[5]),
                'L_DISCOUNT': float(data[6]),
                'L_TAX': float(data[7]),
                'L_RETURNFLAG': data[8],
                'L_LINESTATUS': data[9],
                'L_SHIPDATE': data[10],
                'L_COMMITDATE': data[11],
                'L_RECEIPTDATE': data[12],
                'L_SHIPINSTRUCT': data[13],
                'L_SHIPMODE': data[14],
                'L_COMMENT': data[15]
            }
            # The bulk API takes alternating action-metadata and document entries.
            actions.append({"index": {"_index": "tpch", "_type": "lineitem", "routing": str(data[0])}})
            actions.append(body)
            idx += 1
            if idx % BATCH_SIZE == 0:
                es.bulk(actions)
                actions = []
                print(idx)
    if actions:
        # Flush the final partial batch.
        es.bulk(actions)
        print(idx)


if __name__ == '__main__':
    # One importer thread per fragment file: lineitem.tbl.1 ... lineitem.tbl.16.
    for i in range(16):
        Thread(target=func, args=(i + 1,)).start()