このトピックでは、コードをコンパイルして大量のデータを分散リレーショナルデータベースサービス (DRDS) データベースにオフラインでインポートする方法について説明します。
背景情報
現在のデータベースのテーブルをDRDSデータベースにインポートする必要があるとします。 インポートされるデータレコードの数は約8.14万です。 テーブルスキーマについては、以下を参照してください。
CREATE TABLE 'post' (
'postingType' int NOT NULL、
'id' bigint(20) 符号なしNOT NULL AUTO_INCREMENT、
'acceptedAnswer' bigint(20) DEFAULT NULL、
'parentId' bigint(20) DEFAULT NULL、
'score' int DEFAULT NULL
'tags' varchar(128) DEFAULT NULL、
主要なキー ('id')
);
データベース間で大量のデータを移行する場合は、ソースデータをテキストファイルとしてエクスポートし、プログラムまたはコマンドを使用してターゲットデータベースにインポートすることをお勧めします。
上記のポストテーブルでは、SELECT INTOを使用して、MySQLデータベースからstackoverflow.csv
という名前のファイルにデータをエクスポートできます。 MySQLクライアントで次のコマンドを実行します。
SELECT postingType,id,acceptedAnswer,parentId, スコア, タグ
INTO OUTFILE '/tmp/stackoverflow.csv'
'、' によって終了するフィールド
最適に '"' で囲まれています
'\n' によって中断されたライン
FROM test_table;
DRDSデータベースにテーブルを作成する
エクスポートされたデータファイルにはテーブルスキーマが含まれていないため、ターゲットDRDSデータベースにテーブルを手動で作成し、実際の状況に応じてシャードキーを設定する必要があります。
たとえば、次のコマンドは、投稿テーブルのデータをIDに基づいてデータベースシャードに分割します。
CREATE TABLE 'post' (
'postingType' int NOT NULL、
'id' bigint(20) 符号なしNOT NULL AUTO_INCREMENT、
'acceptedAnswer' bigint(20) DEFAULT NULL、
'parentId' bigint(20) DEFAULT NULL、
'score' int DEFAULT NULL、
'tags' varchar(128) DEFAULT NULL、
主要なキー ('id')
) DBPARTITION BY hash(id) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DRDSデータベースへのデータのインポート
データファイルをエクスポートした後、コードを使用してファイルの内容を読み取り、その内容をDRDSデータベースにインポートできます。 効率を向上させるために、データをバッチで挿入することを推奨します。
以下は、Javaでコンパイルされたコードのデモです。
テストケース: 8,143,801データレコードが挿入されます。 それは約916秒かかります。 TPSは約9,000です。
テストクライアントの設定: i5、8 GB、SSD
テストDRDSデータベースの構成: 4コア4 GB
public static void main(String[] args) は、IOException、URISyntaxException、ClassNotFoundException、
SQLException {
URL url = Main.class.getClassLoader().getResource("stackoverflow.csv");
ファイルdataFile=新しいファイル (url.toURI());
String sql = "insert into post(postingType,id,acceptedAnswer,parentId,score,tags) 値 (?, ?, ?, ?,?)";
int batchSize = 10000;
トライ (
Connection connection = getConnection("XXXXX.drds.aliyuncs.com" 、3306、"XXXXX" 、
"XXXX" 、
"XXXX");
BufferedReader br = new BufferedReader(new FileReader(dataFile)) {
文字列ライン;
PreparedStatement st = connection.prepareStatement(sql);
long startTime = System.currentTimeMillis();
int batchCount = 0;
while (((line = br.readLine())) != null) {
String[] data = line.split(",");
st.setInt(1, Integer.valueOf(data[0]));
st.setInt(2, Integer.valueOf(data[1]));
st.setObject(3, "".equals(data[2]))? null : Integer.valueOf(data[2]);
st.setObject(4, "".equals(data[3]) ? null : Integer.valueOf(data[3]);
st.setObject(5, "".equals(data[4]))? null : Integer.valueOf(data[4]);
if (data.length >= 6) {
st.setObject(6, data[5]);
}
st.addBatch();
if (++ batchCount % batchSize == 0) {
st.exe cuteBatch();
System.out.println(String.format("insert % d record", batchCount));
}
}
if (batchCount % batchSize != 0) {
st.exe cuteBatch();
}
long cost = System.currentTimeMillis() - startTime;
System.out.println(String.format("Take % d second,insert % d record, tps % d", cost/1000,batchCount, batchCount/(cost/1000) ));
}
}
/**
* データベース接続を取得する
*
* @ paramホストデータベースのアドレス
* @ paramポートポート番号
* @ paramデータベースデータベース名
* @ paramユーザー名ユーザー名
* @ paramパスワードパスワード
* @return
* @ throws ClassNotFoundException
* @ SQLExceptionをスロー
*/
プライベート静的接続getConnection(String host、int port、String database、String username、String password)
throws ClassNotFoundException、SQLException {
Class.forName("com.mysql.jdbc.Driver");
String url = String.format (String.format)
"jdbc:mysql:// % s:% d/% s?autoReconnect=true&socketTimeout=600000&rewriteBatchedStatements=true" 、ホスト、ポート、
データベース);
接続con = DriverManager.getConnection(url、ユーザー名、パスワード);
リターンcon;
}