本文介绍如何通过编写代码的方式,离线导入大数据量到DRDS数据库。
假设当前数据库有一个表需要导入到DRDS数据库中,数据量大致为814万,以下是目标表的表结构。
CREATE TABLE `post` (`postingType` int NOT NULL,`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,`acceptedAnswer` bigint(20) DEFAULT NULL,`parentId` bigint(20) DEFAULT NULL,`score` int DEFAULT NULL`tags` varchar(128) DEFAULT NULL,PRIMARY KEY (`id`));
导出源数据
数据库之间大数据量的迁移,建议把原始数据导出成一个文本文件,然后通过程序或者命令的方式导入到目标数据库。
对于上一节的post表,可以通过SELECT INTO语法将数据从MySQL导出到一个名为stackoverflow.csv的文件中。在MySQL客户端执行以下命令:
SELECT postingType,id,acceptedAnswer,parentId,score,tagsINTO OUTFILE '/tmp/stackoverflow.csv'FIELDS TERMINATED BY ','OPTIONALLY ENCLOSED BY '"'LINES TERMINATED BY '\n'FROM test_table;
在DRDS数据库上建表
由于导出的数据文件不包括表结构,所以需要手动在DRDS目标数据库上建立表,并且根据实际情况设置拆分键。
例如以下是按照ID对post表进行分库。
CREATE TABLE `post` (`postingType` int NOT NULL,`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,`acceptedAnswer` bigint(20) DEFAULT NULL,`parentId` bigint(20) DEFAULT NULL,`score` int DEFAULT NULL,`tags` varchar(128) DEFAULT NULL,PRIMARY KEY (`id`)) DBPARTITION BY hash(id) ENGINE=InnoDB DEFAULT CHARSET=utf8;
导入数据到DRDS数据库
导出数据文件以后,可以通过代码的方式读取文件内容,然后导入到DRDS数据库中。为了提高效率,建议通过批量插入的方式。
以下是一个Java示例。
测试场景:插入8143801条数据,耗时916秒,TPS在9000左右。
测试客户端配置:i5、8G、SSD。
测试DRDS配置:4C4G。
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException,SQLException {URL url = Main.class.getClassLoader().getResource("stackoverflow.csv");File dataFile = new File(url.toURI());String sql = "insert into post(postingType,id,acceptedAnswer,parentId,score,tags) values(?,?,?,?,?,?)";int batchSize = 10000;try (Connection connection = getConnection("XXXXX.drds.aliyuncs.com", 3306, "XXXXX","XXXX","XXXX");BufferedReader br = new BufferedReader(new FileReader(dataFile))) {String line;PreparedStatement st = connection.prepareStatement(sql);long startTime = System.currentTimeMillis();int batchCount = 0;while ((line = br.readLine()) != null) {String[] data = line.split(",");st.setInt(1, Integer.valueOf(data[0]));st.setInt(2, Integer.valueOf(data[1]));st.setObject(3, "".equals(data[2]) ? null : Integer.valueOf(data[2]));st.setObject(4, "".equals(data[3]) ? null : Integer.valueOf(data[3]));st.setObject(5, "".equals(data[4]) ? null : Integer.valueOf(data[4]));if (data.length >= 6) {st.setObject(6, data[5]);}st.addBatch();if (++batchCount % batchSize == 0) {st.executeBatch();System.out.println(String.format("insert %d record", batchCount));}}if (batchCount % batchSize != 0) {st.executeBatch();}long cost = System.currentTimeMillis() - startTime;System.out.println(String.format("Take %d second,insert %d record, tps %d", cost/1000,batchCount, batchCount/(cost/1000) ));}}/*** 获取数据库连接** @param host 数据库地址* @param port 端口* @param database 数据库名称* @param username 用户名* @param password 密码* @return* @throws ClassNotFoundException* @throws SQLException*/private static Connection getConnection(String host, int port, String database, String username, String password)throws ClassNotFoundException, SQLException {Class.forName("com.mysql.jdbc.Driver");String url = String.format("jdbc:mysql://%s:%d/%s?autoReconnect=true&socketTimeout=600000&rewriteBatchedStatements=true", host, port,database);Connection con = DriverManager.getConnection(url, username, password);return con;}