すべてのプロダクト
Search
ドキュメントセンター

PolarDB:プログラムを通じてビッグデータをインポートする

最終更新日:May 29, 2024

このトピックでは、コードをコンパイルして大量のデータを分散リレーショナルデータベースサービス (DRDS) データベースにオフラインでインポートする方法について説明します。

背景情報

現在のデータベースのテーブルをDRDSデータベースにインポートする必要があるとします。 インポートされるデータレコードの数は約8.14万です。 テーブルスキーマについては、以下を参照してください。

CREATE TABLE 'post' (
  'postingType' int NOT NULL、
  'id' bigint(20) 符号なしNOT NULL AUTO_INCREMENT、
  'acceptedAnswer' bigint(20) DEFAULT NULL、
  'parentId' bigint(20) DEFAULT NULL、
  'score' int DEFAULT NULL
  'tags' varchar(128) DEFAULT NULL、
  主要なキー ('id')
); 

データベース間で大量のデータを移行する場合は、ソースデータをテキストファイルとしてエクスポートし、プログラムまたはコマンドを使用してターゲットデータベースにインポートすることをお勧めします。

上記のポストテーブルでは、SELECT INTOを使用して、MySQLデータベースからstackoverflow.csvという名前のファイルにデータをエクスポートできます。 MySQLクライアントで次のコマンドを実行します。

SELECT postingType,id,acceptedAnswer,parentId, スコア, タグ
INTO OUTFILE '/tmp/stackoverflow.csv'
'、' によって終了するフィールド
最適に '"' で囲まれています
'\n' によって中断されたライン
FROM test_table; 

DRDSデータベースにテーブルを作成する

エクスポートされたデータファイルにはテーブルスキーマが含まれていないため、ターゲットDRDSデータベースにテーブルを手動で作成し、実際の状況に応じてシャードキーを設定する必要があります。

たとえば、次のコマンドは、投稿テーブルのデータをIDに基づいてデータベースシャードに分割します。

CREATE TABLE 'post' (
  'postingType' int NOT NULL、
  'id' bigint(20) 符号なしNOT NULL AUTO_INCREMENT、
  'acceptedAnswer' bigint(20) DEFAULT NULL、
  'parentId' bigint(20) DEFAULT NULL、
  'score' int DEFAULT NULL、
  'tags' varchar(128) DEFAULT NULL、
  主要なキー ('id')
) DBPARTITION BY hash(id) ENGINE=InnoDB DEFAULT CHARSET=utf8; 

DRDSデータベースへのデータのインポート

データファイルをエクスポートした後、コードを使用してファイルの内容を読み取り、その内容をDRDSデータベースにインポートできます。 効率を向上させるために、データをバッチで挿入することを推奨します。

以下は、Javaでコンパイルされたコードのデモです。

テストケース: 8,143,801データレコードが挿入されます。 それは約916秒かかります。 TPSは約9,000です。

テストクライアントの設定: i5、8 GB、SSD

テストDRDSデータベースの構成: 4コア4 GB

public static void main(String[] args) は、IOException、URISyntaxException、ClassNotFoundException、
        SQLException {

        URL url = Main.class.getClassLoader().getResource("stackoverflow.csv");

        ファイルdataFile=新しいファイル (url.toURI());

        String sql = "insert into post(postingType,id,acceptedAnswer,parentId,score,tags) 値 (?, ?, ?, ?,?)";

        int batchSize = 10000;

        トライ (

            Connection connection = getConnection("XXXXX.drds.aliyuncs.com" 、3306、"XXXXX" 、
                "XXXX" 、
                "XXXX");
            BufferedReader br = new BufferedReader(new FileReader(dataFile)) {
            文字列ライン;
            PreparedStatement st = connection.prepareStatement(sql);
            long startTime = System.currentTimeMillis();
            int batchCount = 0;
            while (((line = br.readLine())) != null) {
                String[] data = line.split(",");
                st.setInt(1, Integer.valueOf(data[0]));
                st.setInt(2, Integer.valueOf(data[1]));

                st.setObject(3, "".equals(data[2]))? null : Integer.valueOf(data[2]);
                st.setObject(4, "".equals(data[3]) ? null : Integer.valueOf(data[3]);
                st.setObject(5, "".equals(data[4]))? null : Integer.valueOf(data[4]);
                if (data.length >= 6) {
                    st.setObject(6, data[5]);
                }
                st.addBatch();
                if (++ batchCount % batchSize == 0) {
                    st.exe cuteBatch();
                    System.out.println(String.format("insert % d record", batchCount));
                }
            }
            if (batchCount % batchSize != 0) {
                st.exe cuteBatch();
            }
            long cost = System.currentTimeMillis() - startTime;

            System.out.println(String.format("Take % d second,insert % d record, tps % d", cost/1000,batchCount, batchCount/(cost/1000) ));

        }

    }

    /**
     * データベース接続を取得する
     *
     * @ paramホストデータベースのアドレス
     * @ paramポートポート番号
     * @ paramデータベースデータベース名
     * @ paramユーザー名ユーザー名
     * @ paramパスワードパスワード
     * @return
     * @ throws ClassNotFoundException
     * @ SQLExceptionをスロー
     */
    プライベート静的接続getConnection(String host、int port、String database、String username、String password)
        throws ClassNotFoundException、SQLException {
        Class.forName("com.mysql.jdbc.Driver");
        String url = String.format (String.format)
            "jdbc:mysql:// % s:% d/% s?autoReconnect=true&socketTimeout=600000&rewriteBatchedStatements=true" 、ホスト、ポート、
            データベース);
        接続con = DriverManager.getConnection(url、ユーザー名、パスワード);
        リターンcon;
    }