edit-icon download-icon

Benchmark test of writing capabilities

Last Updated: May 11, 2018

This document introduces how to use the following program to test HiTSD’s data writing capabilities, including multithread writing, batch writing, write throttling, and error retry.

Test file

pom.xml

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>HiTSDBDemo</groupId>
  5. <artifactId>Perf</artifactId>
  6. <version>0.0.1-SNAPSHOT</version>
  7. <packaging>jar</packaging>
  8. <name>Perf</name>
  9. <url>http://maven.apache.org</url>
  10. <properties>
  11. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  12. </properties>
  13. <dependencies>
  14. <dependency>
  15. <groupId>com.alibaba</groupId>
  16. <artifactId>fastjson</artifactId>
  17. <version>1.2.13</version>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.apache.httpcomponents</groupId>
  21. <artifactId>httpclient</artifactId>
  22. <version>4.3.3</version>
  23. </dependency>
  24. <dependency>
  25. <groupId>junit</groupId>
  26. <artifactId>junit</artifactId>
  27. <version>3.8.1</version>
  28. <scope>test</scope>
  29. </dependency>
  30. </dependencies>
  31. <build>
  32. <plugins>
  33. <plugin>
  34. <artifactId>maven-compiler-plugin</artifactId>
  35. <version>2.3.2</version>
  36. <configuration>
  37. <source>1.6</source>
  38. <target>1.6</target>
  39. <encoding>UTF-8</encoding>
  40. </configuration>
  41. </plugin>
  42. <plugin>
  43. <artifactId>maven-assembly-plugin</artifactId>
  44. <version>2.6</version>
  45. <configuration>
  46. <finalName>PerfDemo</finalName>
  47. <appendAssemblyId>false</appendAssemblyId>
  48. <descriptorRefs>
  49. <descriptorRef>jar-with-dependencies</descriptorRef>
  50. </descriptorRefs>
  51. <archive>
  52. <manifest>
  53. <mainClass>demo.perf.PerfMain</mainClass>
  54. </manifest>
  55. </archive>
  56. </configuration>
  57. <executions>
  58. <execution>
  59. <id>make-assembly-put</id>
  60. <phase>package</phase>
  61. <goals>
  62. <goal>single</goal>
  63. </goals>
  64. </execution>
  65. </executions>
  66. </plugin>
  67. </plugins>
  68. </build>
  69. </project>

src/main/java/demo/perf/PerfMain.java

  1. package demo.perf;
  2. import java.util.ArrayList;
  3. import java.util.HashMap;
  4. import java.util.List;
  5. import java.util.Map;
  6. import java.util.Timer;
  7. import java.util.TimerTask;
  8. import java.util.concurrent.ArrayBlockingQueue;
  9. import java.util.concurrent.BlockingQueue;
  10. import java.util.concurrent.TimeUnit;
  11. import java.util.concurrent.atomic.AtomicLong;
  12. import org.apache.http.HttpResponse;
  13. import org.apache.http.client.methods.HttpPost;
  14. import org.apache.http.entity.StringEntity;
  15. import org.apache.http.impl.client.CloseableHttpClient;
  16. import org.apache.http.impl.client.HttpClients;
  17. import com.alibaba.fastjson.JSON;
  18. public class PerfMain {
  19. // Number of tags of the simulation data
  20. private static final int TAG;
  21. // Number of data points generated per second
  22. private static final int SOURCE_RATE;
  23. // Number of data points written in batch
  24. private static final int BATCH;
  25. // Concurrency
  26. private static final int THREADS;
  27. // Instance address
  28. private static final String HITSDB_ADDR;
  29. // Instance port
  30. private static final int HITSDB_PORT;
  31. public static int getEnvInt(String envName, int value) {
  32. String env = System.getenv(envName);
  33. if (env != null && !env.equals("")) {
  34. try {
  35. value = Integer.parseInt(env);
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. return value;
  41. }
  42. // Timeout
  43. private static final int SYNC_TIMEOUT_MS;
  44. // Number of retries
  45. private static final int MAX_TRY;
  46. static {
  47. TAG = getEnvInt("TAG", 6);
  48. SOURCE_RATE = getEnvInt("SOURCE_RATE", 50000);
  49. BATCH = getEnvInt("BATCH", 400);
  50. THREADS = getEnvInt("THREADS", 256);
  51. HITSDB_PORT = getEnvInt("HITSDB_PORT", 8242);
  52. HITSDB_ADDR = System.getenv("HITSDB_ADDR");
  53. MAX_TRY = getEnvInt("MAX_TRY", 4);
  54. SYNC_TIMEOUT_MS = getEnvInt("SYNC_TIMEOUT_MS", 2 * 60 * 1000);
  55. System.out.println("TAG:" + TAG);
  56. System.out.println("SOURCE_RATE:" + SOURCE_RATE);
  57. System.out.println("BATCH:" + BATCH);
  58. System.out.println("THREADS:" + THREADS);
  59. System.out.println("HITSDB_ADDR:" + HITSDB_ADDR);
  60. System.out.println("HITSDB_PORT:" + HITSDB_PORT);
  61. System.out.println("SYNC_TIMEOUT_MS:" + SYNC_TIMEOUT_MS);
  62. System.out.println("MAX_TRY:" + MAX_TRY);
  63. }
  64. // Path into which the data is written
  65. private final String putUrl = "http://" + HITSDB_ADDR + ":" + HITSDB_PORT + "/api/put?sync_timeout="
  66. + SYNC_TIMEOUT_MS;
  67. private final AtomicLong successed = new AtomicLong();
  68. private final DataSource dataSource = new DataSource();
  69. private final ArrayList<Thread> THREAD_POOL = new ArrayList<Thread>();
  70. // Type of the data point
  71. static class DataPoint {
  72. public String metric;
  73. public Long timestamp;
  74. public Double value;
  75. public Map<String, String> tags;
  76. }
  77. class DataSource {
  78. private final BlockingQueue<String> flowQueue = new ArrayBlockingQueue<String>(10000);
  79. private final BlockingQueue<String> dataQueue = new ArrayBlockingQueue<String>(10000);
  80. private final BlockingQueue<String> retryQueue = new ArrayBlockingQueue<String>(10000);
  81. private final Timer timer = new Timer(true);
  82. private final List<Thread> threads = new ArrayList<Thread>();
  83. public void start() {
  84. // Simulation data generator
  85. for (int i = 0; i < 4; ++i) {
  86. Thread thread = new Thread(new Runnable() {
  87. @Override
  88. public void run() {
  89. Long loop = 0L;
  90. // Generate the data model
  91. ArrayList<DataPoint> builder = new ArrayList<DataPoint>();
  92. for (int b = 0; b < BATCH; ++b) {
  93. DataPoint dataPoint = new DataPoint();
  94. dataPoint.timestamp = System.currentTimeMillis();
  95. dataPoint.value = 3.14;
  96. dataPoint.metric = "perf_metric_" + b;
  97. dataPoint.tags = new HashMap<String, String>();
  98. for (int t = 0; t < TAG; ++t) {
  99. dataPoint.tags.put("tagk" + t, "tagv" + t);
  100. }
  101. builder.add(dataPoint);
  102. }
  103. while (true) {
  104. try {
  105. String data = null;
  106. if (retryQueue.size() > 0) {
  107. data = retryQueue.poll(0, TimeUnit.MILLISECONDS);
  108. }
  109. if (data != null) {
  110. dataQueue.put(data);
  111. } else {
  112. // Generate data on each time series
  113. for (int i = 0; i < BATCH; ++i) {
  114. DataPoint dataPoint = builder.get(i);
  115. dataPoint.timestamp += loop * 1000;
  116. }
  117. dataQueue.put(JSON.toJSONString(builder));
  118. ++loop;
  119. }
  120. } catch (Exception e) {
  121. }
  122. }
  123. }
  124. });
  125. thread.setDaemon(true);
  126. thread.start();
  127. threads.add(thread);
  128. }
  129. try {
  130. Thread.sleep(1000);
  131. } catch (InterruptedException e1) {
  132. }
  133. // Traffic control
  134. timer.schedule(new TimerTask() {
  135. @Override
  136. public void run() {
  137. for (int i = 0; i < SOURCE_RATE; i += BATCH) {
  138. String elem = dataQueue.peek();
  139. if (elem == null) {
  140. System.out.println("pull null, data gen too low");
  141. break;
  142. } else {
  143. if (flowQueue.offer(elem)) {
  144. try {
  145. dataQueue.take();
  146. } catch (InterruptedException e) {
  147. break;
  148. }
  149. }
  150. }
  151. }
  152. }
  153. }, 0, 1000);
  154. }
  155. public String pull() throws InterruptedException {
  156. return flowQueue.poll(Long.MAX_VALUE, TimeUnit.SECONDS);
  157. }
  158. public void retryPush(String data) throws InterruptedException {
  159. retryQueue.put(data);
  160. if(retryQueue.size() > 1000) {
  161. System.out.println("Retry queued " + retryQueue.size());
  162. }
  163. }
  164. }
  165. public void start() {
  166. dataSource.start();
  167. // Concurrent writing
  168. for (int i = 0; i < THREADS; ++i) {
  169. Thread thread = new Thread(new Runnable() {
  170. @Override
  171. public void run() {
  172. while (true) {
  173. try {
  174. String data = dataSource.pull();
  175. int try_count = 0;
  176. for (; try_count < MAX_TRY; ++try_count) {
  177. CloseableHttpClient httpClient = HttpClients.createDefault();
  178. HttpPost httpPost = new HttpPost(putUrl);
  179. StringEntity eStringEntity = new StringEntity(data, "utf-8");
  180. eStringEntity.setContentType("application/json");
  181. httpPost.setEntity(eStringEntity);
  182. HttpResponse response = httpClient.execute(httpPost);
  183. int statusCode = response.getStatusLine().getStatusCode();
  184. if (statusCode != 200 && statusCode != 204) {
  185. httpClient.close();
  186. Thread.sleep(100);
  187. } else {
  188. successed.addAndGet(BATCH);
  189. httpClient.close();
  190. break;
  191. }
  192. }
  193. if (try_count == MAX_TRY) {
  194. dataSource.retryPush(data);
  195. }
  196. } catch (Exception e) {
  197. System.err.println(e.getMessage());
  198. e.printStackTrace();
  199. }
  200. }
  201. }
  202. });
  203. thread.setDaemon(true);
  204. thread.start();
  205. THREAD_POOL.add(thread);
  206. }
  207. }
  208. public void test() throws InterruptedException {
  209. // Collect the TPS interval
  210. final Long showTimeMs = 10 * 1000L;
  211. final AtomicLong lastSuccess = new AtomicLong();
  212. start();
  213. Timer tps = new Timer(true);
  214. tps.schedule(new TimerTask() {
  215. @Override
  216. public void run() {
  217. Long nowSuccess = successed.get();
  218. System.out.println("total success: " + nowSuccess.longValue() + " tps: "
  219. + (nowSuccess - lastSuccess.get()) * 1000 / showTimeMs);
  220. lastSuccess.set(nowSuccess);
  221. }
  222. }, showTimeMs, showTimeMs);
  223. }
  224. public static void main(String[] args) throws InterruptedException {
  225. PerfMain demo = new PerfMain();
  226. demo.test();
  227. // Test the execution time
  228. final Long testRunSeconds = (long) getEnvInt("RUN_SECONDS", 5 * 60);
  229. System.out.println("RUN_SECONDS:" + testRunSeconds);
  230. Thread.sleep(testRunSeconds * 1000);
  231. System.exit(0);
  232. }
  233. }

build.sh

  1. #!/bin/sh
  2. mvn -DskipTests=true package
  3. cp target/PerfDemo.jar ./bin

bin/run.sh

  1. #!/bin/sh
  2. SHELL_DIR=$(cd `dirname $0`; pwd)
  3. cd ${SHELL_DIR}
  4. # TAG count
  5. export TAG=8
  6. # Data generation rate
  7. export SOURCE_RATE=50000
  8. # Number of data points written in batch
  9. export BATCH=400
  10. # Concurrency
  11. export THREADS=256
  12. # Address, domain name or IP address into which the data is written
  13. export HITSDB_ADDR="100.81.155.3"
  14. # Port into which the data is written
  15. export HITSDB_PORT=8242
  16. mem=`cat /proc/meminfo | grep MemTotal | awk '{print $2}'`
  17. max_mem_cost=`expr ${mem} / 1024 \* 80 / 100`
  18. java -jar -Xmx${max_mem_cost}m PerfDemo.jar

Build

Depends on JDK 6 and Maven 3.

  1. sh build.sh

Run

  1. # For instructions on the parameter configurations, see the test script.
  2. sh bin/run.sh
Thank you! We've received your feedback.