全部产品
Search
文档中心

云原生数据库 PolarDB:使用程序进行大数据导入

更新时间:Oct 09, 2020

本文介绍如何通过编写代码的方式,离线导入大数据量到DRDS数据库。

假设当前数据库有一个表需要导入到DRDS数据库中,数据量大致为814万,以下是目标表的表结构。

  1. CREATE TABLE `post` (
  2. `postingType` int NOT NULL,
  3. `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  4. `acceptedAnswer` bigint(20) DEFAULT NULL,
  5. `parentId` bigint(20) DEFAULT NULL,
  6. `score` int DEFAULT NULL
  7. `tags` varchar(128) DEFAULT NULL,
  8. PRIMARY KEY (`id`)
  9. );

导出源数据

数据库之间大数据量的迁移,建议把原始数据导出成一个文本文件,然后通过程序或者命令的方式导入到目标数据库。

对于上一节的post表,可以通过SELECT INTO语法将数据从MySQL导出到一个名为stackoverflow.csv的文件中。在MySQL客户端执行以下命令:

  1. SELECT postingType,id,acceptedAnswer,parentId,score,tags
  2. INTO OUTFILE '/tmp/stackoverflow.csv'
  3. FIELDS TERMINATED BY ','
  4. OPTIONALLY ENCLOSED BY '"'
  5. LINES TERMINATED BY '\n'
  6. FROM test_table;

在DRDS数据库上建表

由于导出的数据文件不包括表结构,所以需要手动在DRDS目标数据库上建立表,并且根据实际情况设置拆分键。

例如以下是按照ID对post表进行分库。

  1. CREATE TABLE `post` (
  2. `postingType` int NOT NULL,
  3. `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  4. `acceptedAnswer` bigint(20) DEFAULT NULL,
  5. `parentId` bigint(20) DEFAULT NULL,
  6. `score` int DEFAULT NULL,
  7. `tags` varchar(128) DEFAULT NULL,
  8. PRIMARY KEY (`id`)
  9. ) DBPARTITION BY hash(id) ENGINE=InnoDB DEFAULT CHARSET=utf8;

导入数据到DRDS数据库

导出数据文件以后,可以通过代码的方式读取文件内容,然后导入到DRDS数据库中。为了提高效率,建议通过批量插入的方式。

以下是一个Java示例。

测试场景:插入8143801条数据,耗时916秒,TPS在9000左右。

测试客户端配置:i5、8G、SSD。

测试DRDS配置:4C4G。

  1. public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException,
  2. SQLException {
  3. URL url = Main.class.getClassLoader().getResource("stackoverflow.csv");
  4. File dataFile = new File(url.toURI());
  5. String sql = "insert into post(postingType,id,acceptedAnswer,parentId,score,tags) values(?,?,?,?,?,?)";
  6. int batchSize = 10000;
  7. try (
  8. Connection connection = getConnection("XXXXX.drds.aliyuncs.com", 3306, "XXXXX",
  9. "XXXX",
  10. "XXXX");
  11. BufferedReader br = new BufferedReader(new FileReader(dataFile))) {
  12. String line;
  13. PreparedStatement st = connection.prepareStatement(sql);
  14. long startTime = System.currentTimeMillis();
  15. int batchCount = 0;
  16. while ((line = br.readLine()) != null) {
  17. String[] data = line.split(",");
  18. st.setInt(1, Integer.valueOf(data[0]));
  19. st.setInt(2, Integer.valueOf(data[1]));
  20. st.setObject(3, "".equals(data[2]) ? null : Integer.valueOf(data[2]));
  21. st.setObject(4, "".equals(data[3]) ? null : Integer.valueOf(data[3]));
  22. st.setObject(5, "".equals(data[4]) ? null : Integer.valueOf(data[4]));
  23. if (data.length >= 6) {
  24. st.setObject(6, data[5]);
  25. }
  26. st.addBatch();
  27. if (++batchCount % batchSize == 0) {
  28. st.executeBatch();
  29. System.out.println(String.format("insert %d record", batchCount));
  30. }
  31. }
  32. if (batchCount % batchSize != 0) {
  33. st.executeBatch();
  34. }
  35. long cost = System.currentTimeMillis() - startTime;
  36. System.out.println(String.format("Take %d second,insert %d record, tps %d", cost/1000,batchCount, batchCount/(cost/1000) ));
  37. }
  38. }
  39. /**
  40. * 获取数据库连接
  41. *
  42. * @param host 数据库地址
  43. * @param port 端口
  44. * @param database 数据库名称
  45. * @param username 用户名
  46. * @param password 密码
  47. * @return
  48. * @throws ClassNotFoundException
  49. * @throws SQLException
  50. */
  51. private static Connection getConnection(String host, int port, String database, String username, String password)
  52. throws ClassNotFoundException, SQLException {
  53. Class.forName("com.mysql.jdbc.Driver");
  54. String url = String.format(
  55. "jdbc:mysql://%s:%d/%s?autoReconnect=true&socketTimeout=600000&rewriteBatchedStatements=true", host, port,
  56. database);
  57. Connection con = DriverManager.getConnection(url, username, password);
  58. return con;
  59. }