全部產品
Search
文件中心

AnalyticDB:資料匯入情境

更新時間:Nov 06, 2024

本文以資料匯入情境為例測試AnalyticDB for MySQL的效能,您可以按照本文介紹自行測試對比,快速瞭解產品效能。

測試產品及規格

產品

規格

AnalyticDB for MySQL 3.0 公用雲端版本

彈性模式叢集版,1 Worker (24-Core)。

Elasticsearch 6.7.0

通用商業版,1節點(24-Core)。

AnalyticDB for MySQL 3.0 物理機部署

3台物理機,每台32 vCPU 128 GiB,兩塊SSD磁碟(3.84 TB和960 GB),12塊8 TB的HDD磁碟。

測試環境配置

測試環境

配置

ECS

2個ECS,32 vCPU 128 GiB,NVMe SSD本地碟儲存:3576 GiB。

說明

ECS與AnalyticDB for MySQL、Elasticsearch同地區可用性區域,頻寬充足。

測試方法

  • AnalyticDB for MySQL公用雲端版即時寫入方法:在ECS上,使用Java程式,讀取本地TPC-H的多個分區檔案,基於JDBC多線程匯入,匯入批次2000條一批。其中匯入SQL為insert into lineitem values (...)

  • AnalyticDB for MySQL物理機版即時寫入(寫入後1秒內可查)方法:使用Java程式,讀取本地TPC-H的多個分區檔案,基於JDBC多線程匯入,匯入批次2000條一批,單行資料長度350 b。其中匯入SQL為insert into lineitem values (...)

  • Elasticsearch即時寫入方法:在ECS上,使用Python程式,讀取本地TPC-H的多個分區檔案,基於Elasticsearch庫多線程匯入,匯入批次2000條一批。

說明

本測試中所用資料均來自於TPC-H。更多詳情,請參見TPC-H官網

測試結果

用戶端並發線程數

TPS(AnalyticDB for MySQL公用雲端版)

TPS(AnalyticDB for MySQL物理機版)

TPS(Elasticsearch)

8

33033

120192

12211

16

56816

218472

7165

32

95083

398087

6267

64

153857

643618

5890

128

186732

787572

5516

建表語句

AnalyticDB for MySQL建表語句如下:

CREATE TABLE `lineitem` (
 `l_orderkey` bigint NOT NULL COMMENT '',
 `l_partkey` int NOT NULL COMMENT '',
 `l_suppkey` int NOT NULL COMMENT '',
 `l_linenumber` int NOT NULL COMMENT '',
 `l_quantity` decimal(15, 2) NOT NULL COMMENT '',
 `l_extendedprice` decimal(15, 2) NOT NULL COMMENT '',
 `l_discount` decimal(15, 2) NOT NULL COMMENT '',
 `l_tax` decimal(15, 2) NOT NULL COMMENT '',
 `l_returnflag` varchar NOT NULL COMMENT '',
 `l_linestatus` varchar NOT NULL COMMENT '',
 `l_shipdate` date NOT NULL COMMENT '',
 `l_commitdate` date NOT NULL COMMENT '',
 `l_receiptdate` date NOT NULL COMMENT '',
 `l_shipinstruct` varchar NOT NULL COMMENT '',
 `l_shipmode` varchar NOT NULL COMMENT '',
 `l_comment` varchar NOT NULL COMMENT ''
PRIMARY KEY(l_orderkey)
) DISTRIBUTED BY HASH(`l_orderkey`) INDEX_ALL='Y'

Elasticsearch建表語句如下:

curl -X PUT 'http://es_ip:9200/tpch' \
-H 'Content-Type: application/json' \
-d '{
    "settings": {
        "number_of_shards": 32,
        "number_of_replicas" : 2
    },
    "mappings": {
         "lineitem": { 
              "properties": {
               "L_ORDERKEY": {
                  "type": "integer"
               },
               "L_PARTKEY": {
                  "type": "integer"
               },
               "L_SUPPKEY": {
                  "type": "integer"
               },
               "L_LINENUMBER": {
                  "type": "integer"
               },
               "L_QUANTITY": {
                  "type": "double"
               },
               "L_EXTENDEDPRICE": {
                  "type": "double"
               },
               "L_DISCOUNT": {
                  "type": "double"
               },
               "L_TAX": {
                  "type": "double"
               },
               "L_RETURNFLAG": {
                  "type": "keyword"
               },
               "L_LINESTATUS": {
                  "type": "keyword"
               },
               "L_SHIPDATE": {
                  "type": "date"
               },
               "L_COMMITDATE": {
                  "type": "date"
               },
               "L_RECEIPTDATE": {
                  "type": "date"
               },
               "L_SHIPINSTRUCT": {
                  "type": "keyword"
               },
               "L_SHIPMODE": {
                  "type": "keyword"
               },
               "L_COMMENT": {
                  "type": "keyword"
               }
            }
          }
     }
}'

資料匯入指令碼如下:

from threading import Thread
from elasticsearch import Elasticsearch


def func(i):
    es = Elasticsearch(hosts=[
        "es_ip:9200"
    ])
    idx = 0
    with open(r"lineitem.tbl.{}".format(i)) as f:
        actions = []
        while 1:
            r = f.readlines(2000)
            if not r:
                break
            for i in r:
                data = i.split('|')
                body = {
                    'L_ORDERKEY': int(data[0]),
                    'L_PARTKEY': int(data[1]),
                    'L_SUPPKEY': int(data[2]),
                    'L_LINENUMBER': int(data[3]),
                    'L_QUANTITY': float(data[4]),
                    'L_EXTENDEDPRICE': float(data[5]),
                    'L_DISCOUNT': float(data[6]),
                    'L_TAX': float(data[7]),
                    'L_RETURNFLAG': data[8],
                    'L_LINESTATUS': data[9],
                    'L_SHIPDATE': data[10],
                    'L_COMMITDATE': data[11],
                    'L_RECEIPTDATE': data[12],
                    'L_SHIPINSTRUCT': data[13],
                    'L_SHIPMODE': data[14],
                    'L_COMMENT': data[15]
                }
                actions.append({"index": {"_index": "tpch", "_type": "lineitem", "routing": int(data[0])}})
                actions.append(body)
                idx += 1
            es.bulk(actions)
            actions = []
            print(idx)


if __name__ == '__main__':
    for i in range(0, 16):
        Thread(target=func, args=(i + 1,)).start()