
AnalyticDB:Data import scenarios

Last Updated:Nov 05, 2024

This topic describes how to test the data import performance of AnalyticDB for MySQL in several scenarios. You can use these tests to gain a full understanding of service performance.

Test services and specifications

| Service | Specifications |
| --- | --- |
| AnalyticDB for MySQL 3.0 in the Alibaba Cloud public cloud | An AnalyticDB for MySQL cluster in elastic mode for Cluster Edition with one worker node (24 cores). |
| Elasticsearch 6.7.0 | An Elasticsearch cluster of the Standard Edition with one node (24 cores). |
| AnalyticDB for MySQL 3.0 on physical servers | Three physical servers. Each server has 32 vCPUs, 128 GiB of memory, a 3.84 TB SSD, a 960 GB SSD, and twelve 8 TB HDDs. |

Test environment

| Test environment | Configuration |
| --- | --- |
| ECS | Two Elastic Compute Service (ECS) instances with 32 vCPUs, 128 GiB of memory, and 3,576 GiB of local NVMe SSD storage. |

Note

The ECS instances must be deployed in the same zone as the AnalyticDB for MySQL and Elasticsearch clusters and must have sufficient bandwidth.

Test methods

  • Method to write data in real time to AnalyticDB for MySQL in the Alibaba Cloud public cloud: On the ECS instances, a Java program reads multiple local TPC-H fragment files and imports the data over JDBC from multiple threads in batches of 2,000 rows. The following SQL statement is used for the import: INSERT INTO lineitem values (...).

  • Method to write data in real time to AnalyticDB for MySQL on physical servers: On the ECS instances, a Java program reads multiple local TPC-H fragment files and imports the data over JDBC from multiple threads in batches of 2,000 rows. The size of each row is 350 bits. Imported data becomes queryable in less than 1 second. The following SQL statement is used for the import: INSERT INTO lineitem values (...).

  • Method to write data in real time to Elasticsearch: On the ECS instances, a Python program reads multiple local TPC-H fragment files and imports the data from multiple threads in batches of 2,000 rows by using the Elasticsearch Python client.
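
The batching step of the import methods above can be sketched in Python (the benchmark itself used a Java program over JDBC; the helper names `chunked` and `build_insert` are illustrative assumptions, and only statement construction is shown, not execution):

```python
from itertools import islice

BATCH_SIZE = 2000  # rows per INSERT batch, as in the test methods above

def chunked(rows, size=BATCH_SIZE):
    """Yield lists of at most `size` rows from an iterable of rows."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

def build_insert(batch):
    """Build one multi-row statement of the form used by the benchmark:
    INSERT INTO lineitem VALUES (...), (...), ...
    Each row is a pre-formatted sequence of SQL literals; a production
    import should instead use parameterized statements via the driver."""
    values = ", ".join("(" + ", ".join(row) + ")" for row in batch)
    return "INSERT INTO lineitem VALUES " + values
```

In the benchmark, each concurrent client thread executes one such statement per batch of 2,000 rows.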

Note

The data used in the test comes from the TPC-H dataset. For more information, visit the TPC-H official website.

Test results

| Number of concurrent client threads | TPS (AnalyticDB for MySQL in the Alibaba Cloud public cloud) | TPS (AnalyticDB for MySQL on physical servers) | TPS (Elasticsearch) |
| --- | --- | --- | --- |
| 8 | 33033 | 120192 | 12211 |
| 16 | 56816 | 218472 | 7165 |
| 32 | 95083 | 398087 | 6267 |
| 64 | 153857 | 643618 | 5890 |
| 128 | 186732 | 787572 | 5516 |
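
One way to read the results is per-thread throughput (TPS divided by thread count): AnalyticDB for MySQL gains total throughput as concurrency grows, while Elasticsearch throughput declines. The short script below recomputes this from the figures in the results table (the dictionary layout is an illustrative choice):

```python
# TPS figures from the results table above: thread count ->
# (AnalyticDB public cloud, AnalyticDB physical servers, Elasticsearch)
results = {
    8: (33033, 120192, 12211),
    16: (56816, 218472, 7165),
    32: (95083, 398087, 6267),
    64: (153857, 643618, 5890),
    128: (186732, 787572, 5516),
}

def per_thread_tps(threads):
    """Return rounded per-thread TPS for each service at the given concurrency."""
    return tuple(round(tps / threads) for tps in results[threads])
```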

Table creation statements

The following statement is used to create a table in the AnalyticDB for MySQL cluster:

CREATE TABLE `lineitem` (
 `l_orderkey` bigint NOT NULL COMMENT '',
 `l_partkey` int NOT NULL COMMENT '',
 `l_suppkey` int NOT NULL COMMENT '',
 `l_linenumber` int NOT NULL COMMENT '',
 `l_quantity` decimal(15, 2) NOT NULL COMMENT '',
 `l_extendedprice` decimal(15, 2) NOT NULL COMMENT '',
 `l_discount` decimal(15, 2) NOT NULL COMMENT '',
 `l_tax` decimal(15, 2) NOT NULL COMMENT '',
 `l_returnflag` varchar NOT NULL COMMENT '',
 `l_linestatus` varchar NOT NULL COMMENT '',
 `l_shipdate` date NOT NULL COMMENT '',
 `l_commitdate` date NOT NULL COMMENT '',
 `l_receiptdate` date NOT NULL COMMENT '',
 `l_shipinstruct` varchar NOT NULL COMMENT '',
 `l_shipmode` varchar NOT NULL COMMENT '',
 `l_comment` varchar NOT NULL COMMENT '',
 PRIMARY KEY (`l_orderkey`)
) DISTRIBUTED BY HASH(`l_orderkey`) INDEX_ALL='Y';

The following statement is used to create a table in the Elasticsearch cluster:

curl -X PUT 'http://es_ip:9200/tpch' \
-H 'Content-Type: application/json' \
-d '{
    "settings": {
        "number_of_shards": 32,
        "number_of_replicas" : 2
    },
    "mappings": {
         "lineitem": { 
              "properties": {
               "L_ORDERKEY": {
                  "type": "integer"
               },
               "L_PARTKEY": {
                  "type": "integer"
               },
               "L_SUPPKEY": {
                  "type": "integer"
               },
               "L_LINENUMBER": {
                  "type": "integer"
               },
               "L_QUANTITY": {
                  "type": "double"
               },
               "L_EXTENDEDPRICE": {
                  "type": "double"
               },
               "L_DISCOUNT": {
                  "type": "double"
               },
               "L_TAX": {
                  "type": "double"
               },
               "L_RETURNFLAG": {
                  "type": "keyword"
               },
               "L_LINESTATUS": {
                  "type": "keyword"
               },
               "L_SHIPDATE": {
                  "type": "date"
               },
               "L_COMMITDATE": {
                  "type": "date"
               },
               "L_RECEIPTDATE": {
                  "type": "date"
               },
               "L_SHIPINSTRUCT": {
                  "type": "keyword"
               },
               "L_SHIPMODE": {
                  "type": "keyword"
               },
               "L_COMMENT": {
                  "type": "keyword"
               }
            }
          }
     }
}'

The following script is used to import data:

from threading import Thread

from elasticsearch import Elasticsearch


def func(i):
    # Each thread imports one local TPC-H fragment file, lineitem.tbl.{i}.
    es = Elasticsearch(hosts=[
        "es_ip:9200"
    ])
    idx = 0
    with open(r"lineitem.tbl.{}".format(i)) as f:
        actions = []
        while 1:
            # readlines(2000) returns whole lines totalling roughly
            # 2,000 bytes; an empty result means end of file.
            lines = f.readlines(2000)
            if not lines:
                break
            for line in lines:
                data = line.split('|')
                body = {
                    'L_ORDERKEY': int(data[0]),
                    'L_PARTKEY': int(data[1]),
                    'L_SUPPKEY': int(data[2]),
                    'L_LINENUMBER': int(data[3]),
                    'L_QUANTITY': float(data[4]),
                    'L_EXTENDEDPRICE': float(data[5]),
                    'L_DISCOUNT': float(data[6]),
                    'L_TAX': float(data[7]),
                    'L_RETURNFLAG': data[8],
                    'L_LINESTATUS': data[9],
                    'L_SHIPDATE': data[10],
                    'L_COMMITDATE': data[11],
                    'L_RECEIPTDATE': data[12],
                    'L_SHIPINSTRUCT': data[13],
                    'L_SHIPMODE': data[14],
                    'L_COMMENT': data[15]
                }
                # Pair each document with its bulk "index" action header and
                # route the document by its order key.
                actions.append({"index": {"_index": "tpch", "_type": "lineitem", "routing": int(data[0])}})
                actions.append(body)
                idx += 1
            es.bulk(actions)
            actions = []
            print(idx)


if __name__ == '__main__':
    # Start 16 import threads, one per fragment file lineitem.tbl.1 to lineitem.tbl.16.
    for i in range(0, 16):
        Thread(target=func, args=(i + 1,)).start()
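
The per-line parsing in the script above can be checked in isolation. The sketch below extracts that logic into a helper (the name `parse_lineitem` and the sample row are illustrative assumptions) and returns the action/document pair the script appends for one `|`-delimited TPC-H row:

```python
def parse_lineitem(line):
    """Split one '|'-delimited TPC-H lineitem row into the bulk action
    header and document body used by the import script above."""
    data = line.split('|')
    body = {
        'L_ORDERKEY': int(data[0]),
        'L_PARTKEY': int(data[1]),
        'L_SUPPKEY': int(data[2]),
        'L_LINENUMBER': int(data[3]),
        'L_QUANTITY': float(data[4]),
        'L_EXTENDEDPRICE': float(data[5]),
        'L_DISCOUNT': float(data[6]),
        'L_TAX': float(data[7]),
        'L_RETURNFLAG': data[8],
        'L_LINESTATUS': data[9],
        'L_SHIPDATE': data[10],
        'L_COMMITDATE': data[11],
        'L_RECEIPTDATE': data[12],
        'L_SHIPINSTRUCT': data[13],
        'L_SHIPMODE': data[14],
        'L_COMMENT': data[15]
    }
    # Route the document by order key, as the import script does.
    action = {"index": {"_index": "tpch", "_type": "lineitem", "routing": int(data[0])}}
    return action, body
```

Feeding one sample row through the helper shows the shape of each action/document pair before it is sent in bulk.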