本文介紹如何通過編寫代碼的方式,離線匯入巨量資料量到PolarDB-X 1.0資料庫。
背景資訊
假設當前資料庫有一個表需要匯入到PolarDB-X 1.0資料庫中,資料量大致為814萬,表結構如下。
CREATE TABLE `post` (
`postingType` int NOT NULL,
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`acceptedAnswer` bigint(20) DEFAULT NULL,
`parentId` bigint(20) DEFAULT NULL,
`score` int DEFAULT NULL
`tags` varchar(128) DEFAULT NULL,
PRIMARY KEY (`id`)
);
資料庫之間巨量資料量的遷移,建議把未經處理資料匯出成一個文字檔,然後通過程式或者命令的方式匯入到目標資料庫。
對於上一節的 post 表,可以通過 SELECT INTO 文法將資料從MySQL匯出到一個名為stackoverflow.csv的檔案中。在MySQL用戶端執行以下命令:
SELECT postingType,id,acceptedAnswer,parentId,score,tags
INTO OUTFILE '/tmp/stackoverflow.csv'
FIELDS TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n'
FROM test_table;
在PolarDB-X 1.0資料庫上建表
由於匯出的資料檔案不包括表結構,所以需要手工在PolarDB-X 1.0目標資料庫上建立表,並且根據實際情況設定拆分鍵。
例如以下是按照 id 對 post 表進行分庫。
CREATE TABLE `post` (
`postingType` int NOT NULL,
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`acceptedAnswer` bigint(20) DEFAULT NULL,
`parentId` bigint(20) DEFAULT NULL,
`score` int DEFAULT NULL,
`tags` varchar(128) DEFAULT NULL,
PRIMARY KEY (`id`)
) DBPARTITION BY hash(id) ENGINE=InnoDB DEFAULT CHARSET=utf8;
匯入資料到PolarDB-X 1.0資料庫
匯出資料檔案以後,可以通過代碼的方式讀取檔案內容,然後匯入到PolarDB-X 1.0資料庫中。為了提高效率,建議通過批量插入的方式。
以下是用 Java 寫的一個 Demo。
測試情境:插入8143801條資料,耗時916秒,TPS 在9000左右。
測試用戶端配置:i5、8G、SSD。
測試PolarDB-X 1.0配置:4C4G。
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException,
SQLException {
URL url = Main.class.getClassLoader().getResource("stackoverflow.csv");
File dataFile = new File(url.toURI());
String sql = "insert into post(postingType,id,acceptedAnswer,parentId,score,tags) values(?,?,?,?,?,?)";
int batchSize = 10000;
try (
Connection connection = getConnection("XXXXX.drds.aliyuncs.com", 3306, "XXXXX",
"XXXX",
"XXXX");
BufferedReader br = new BufferedReader(new FileReader(dataFile))) {
String line;
PreparedStatement st = connection.prepareStatement(sql);
long startTime = System.currentTimeMillis();
int batchCount = 0;
while ((line = br.readLine()) != null) {
String[] data = line.split(",");
st.setInt(1, Integer.valueOf(data[0]));
st.setInt(2, Integer.valueOf(data[1]));
st.setObject(3, "".equals(data[2]) ? null : Integer.valueOf(data[2]));
st.setObject(4, "".equals(data[3]) ? null : Integer.valueOf(data[3]));
st.setObject(5, "".equals(data[4]) ? null : Integer.valueOf(data[4]));
if (data.length >= 6) {
st.setObject(6, data[5]);
}
st.addBatch();
if (++batchCount % batchSize == 0) {
st.executeBatch();
System.out.println(String.format("insert %d record", batchCount));
}
}
if (batchCount % batchSize != 0) {
st.executeBatch();
}
long cost = System.currentTimeMillis() - startTime;
System.out.println(String.format("Take %d second,insert %d record, tps %d", cost/1000,batchCount, batchCount/(cost/1000) ));
}
}
/**
* 擷取資料庫連接
*
* @param host 資料庫地址
* @param port 連接埠
* @param database 資料庫名稱
* @param username 使用者名稱
* @param password 密碼
* @return
* @throws ClassNotFoundException
* @throws SQLException
*/
private static Connection getConnection(String host, int port, String database, String username, String password)
throws ClassNotFoundException, SQLException {
Class.forName("com.mysql.jdbc.Driver");
String url = String.format(
"jdbc:mysql://%s:%d/%s?autoReconnect=true&socketTimeout=600000&rewriteBatchedStatements=true", host, port,
database);
Connection con = DriverManager.getConnection(url, username, password);
return con;
}