すべてのプロダクト
Search
ドキュメントセンター

Tablestore:TableStoreWriter を使用してデータを同時書き込みする

最終更新日:Mar 28, 2025

このトピックでは、TableStoreWriter を使用して Tablestore に高並列でデータを書き込む方法について説明します。

背景情報

ロギングや IoT トレースなどのシナリオでは、システムは短期間に大量のデータを生成し、データベースに書き込みます。 データベースは高い書き込み並列性と、毎秒数万行、あるいは数百万行もの高い書き込みスループットを提供する必要があります。 ただし、Tablestore の BatchWriteRow 操作を使用して、バッチで最大 200 行までしか書き込むことができません。

TableStoreWriter は、パフォーマンス専有型で使いやすいデータインポートツールクラスであり、Tablestore SDK for Java によって提供されます。 このクラスは、高並列および高スループットでデータをインポートするために使用される操作をカプセル化します。 TableStoreWriter を使用すると、高並列で Tablestore データテーブルにデータを書き込み、行レベルのコールバックとカスタム構成を指定できます。 詳細については、「付録 1:TableStoreWriter の動作原理」をご参照ください。

説明

TableStoreWriter は、Wide Column モデル にのみ適しています。

シナリオ

以下のビジネス要件がある場合は、TableStoreWriter を使用して、ログストレージ、インスタントメッセージ (IM)、分散キューなどのシナリオで Tablestore にデータを書き込むことができます。

  • アプリケーションの高い同時実行性をサポートするために高いスループットが必要です。

  • 単一行のデータの書き込みレイテンシに対する要件はありません。

  • データは非同期で書き込むことができます(プロデューサー - コンシューマーモードを使用できます)。

  • 同じ行のデータを繰り返し書き込むことができます。

前提条件

手順

ステップ 1:Tablestore SDK for Java をインストールする

Maven を使用して Java プロジェクトを管理する場合は、pom.xml ファイルに次の依存関係を追加します。

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.17.4</version>
</dependency>                 

Tablestore SDK for Java のインストール方法については、「Tablestore SDK for Java をインストールする」をご参照ください。

ステップ 2:TableStoreWriter を初期化する

TableStoreWriter を初期化するときは、インスタンスとテーブルの情報、および身分認証情報を指定する必要があります。 カスタム TableStoreWriter パラメーターとコールバック関数を指定することもできます。 複数のスレッドで同じ TableStoreWriter オブジェクトを共有することをお勧めします。 次のサンプルコードは、TableStoreWriter を初期化する方法の例を示しています。

TableStoreWriter でサポートされているパラメーターとコールバック関数については、「付録 2:TableStoreWriter パラメーター」および「付録 3:TableStoreWriter コールバック関数」をご参照ください。
private static TableStoreWriter createTablesStoreWriter() {
    
    /**
     * ほとんどの場合、TableStoreWriter パラメーターのデフォルト値を保持できます。 ビジネス要件に基づいて他の値に設定することもできます。
     * パラメーターの詳細については、「付録 2:TableStoreWriter パラメーター」をご参照ください。
     * */
    WriterConfig config = new WriterConfig();
    // バッチ書き込みリクエストで Tablestore に書き込むことができる行の最大数を指定します。 デフォルト値:200。
    config.setMaxBatchRowsCount(200); 
    // TableStoreWriter がバッファ内のデータを Tablestore に書き込むために使用する並列リクエストの最大数を指定します。 デフォルト値:10。デフォルト値を保持することをお勧めします。                          
    config.setConcurrency(10);    
            
    /**
     * 行レベルのコールバックを構成します。
     * この例では、コールバックは、Tablestore に書き込まれた行数と、Tablestore への書き込みに失敗した行数をカウントするように構成されています。
     * */
    TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
        @Override
        public void onCompleted(RowChange rowChange, RowWriteResult cc) {
            succeedRows.incrementAndGet();
        }

        @Override
        public void onFailed(RowChange rowChange, Exception ex) {
            failedRows.incrementAndGet();
        }
    };

    /** アクセス認証情報を構成します。   **/
    ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

    /**
     * 使いやすく、初期化と解放ロジックを分離できる、組み込みのスレッドプールとクライアントを使用することをお勧めします。
     * */
    DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
        endpoint, credentials, instanceName, tableName, config, resultCallback);

    return writer;
}

ステップ 3:データを書き込む

さまざまな追加、削除、変更操作に基づいて RowChanges を構築し、RowChanges を TableStoreWriter に追加できます。

一度に 1 行のデータを書き込む

次のサンプルコードは、一度に 1 行のデータを書き込むことによって、データテーブルに 1,000 行のデータを書き込む方法の例を示しています。

public void writeSingleRowWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Single Row With Future");
    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
        futures.add(future);
    }

    System.out.println("Write thread finished.");
    // バッファ内のデータを Tablestore にフラッシュします。 TableStoreWriter は、flushInterval パラメーターと maxBatchSize パラメーターに基づいて、バッファ内のデータを Tablestore にフラッシュするタイミングを決定します。 flushInterval パラメーターは、バッファ内のデータを Tablestore にフラッシュする間隔を指定します。 maxBatchSize パラメーターは、バッファ内のデータ量に基づいてバッファ内のデータを Tablestore にフラッシュするかどうかを指定します。
    writer.flush();
    
    // Future オブジェクトを表示します。
    // printFutureResult(futures);

    System.out.println("=========================================================[Finish]");
}

一度に複数行のデータを書き込む

次のサンプルコードは、一度に複数行のデータを書き込むことによって、データテーブルに 1,000 行を書き込む方法の例を示しています。

public void writeRowListWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Row List With Future");

    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    List<RowChange> rowChanges = new LinkedList<RowChange>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        rowChanges.add(rowChange);
        if (Math.random() > 0.995 || index == rowsCount - 1) {
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
            futures.add(future);
            rowChanges.clear();
        }
    }

    System.out.println("Write thread finished.");
    // バッファ内のデータを Tablestore にフラッシュします。 TableStoreWriter は、flushInterval パラメーターと maxBatchSize パラメーターに基づいて、バッファ内のデータを Tablestore にフラッシュするタイミングを決定します。 flushInterval パラメーターは、バッファ内のデータを Tablestore にフラッシュする間隔を指定します。 maxBatchSize パラメーターは、バッファ内のデータ量に基づいてバッファ内のデータを Tablestore にフラッシュするかどうかを指定します。
    writer.flush();
    
    // Future オブジェクトを表示します。
    // printFutureResult(futures);
    
    System.out.println("=========================================================[Finish]");
}

ステップ 4:TableStoreWriter をシャットダウンする

アプリケーションを終了する前に、TableStoreWriter を手動でシャットダウンすることをお勧めします。 TableStoreWriter をシャットダウンする前に、システムはバッファ内のすべてのデータを Tablestore にフラッシュします。

説明

TableStoreWriter のシャットダウン中またはシャットダウン後に addRowChange メソッドを呼び出してバッファにデータを書き込むと、データが Tablestore に書き込まれない場合があります。

// TableStoreWriter を事前にシャットダウンします。 キュー内のすべてのデータが Tablestore に書き込まれた後、システムはクライアントと内部スレッドプールを自動的にシャットダウンします。
writer.close();

完全なサンプルコード

次のサンプルコードは、データテーブルを作成し、テーブルにデータを同時書き込みする方法の例を示しています。

import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;

import com.aliyuncs.exceptions.ClientException;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.commons.codec.digest.DigestUtils.md5Hex;

public class TableStoreWriterDemo {

    // インスタンスの名前を指定します。
    private static String instanceName = "yourInstanceName";
    // インスタンスのエンドポイントを指定します。
    private static String endpoint = "yourEndpoint";
    // 環境変数から AccessKey ID と AccessKey シークレットを取得します。
    private static String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    private static String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    private static String tableName = "<TABLE_NAME>";

    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws ClientException {
        TableStoreWriterDemo sample = new TableStoreWriterDemo();

        /**
         * TableStoreWriter を使用する前に、テーブルがすでに存在することを確認してください。
         * 1. TableStoreWriter は、テーブルが存在するかどうかを確認します。
         * 2. 書き込みたいデータのフィールドとフィールドタイプが、テーブルのフィールドとフィールドタイプと同じであるかどうかを確認します。
         * */
        sample.tryCreateTable();

        /**
         * TableStoreWriter を初期化するには、DefaultTablestoreWriter を使用することをお勧めします。
         * DefaultTableStoreWriter(
         *      String endpoint,                                                   // インスタンスのエンドポイント。
         *      ServiceCredentials credentials,                                    // アクセス認証情報(AccessKey ペアを含む)。 トークンがサポートされています。
         *      String instanceName,                                               // インスタンスの名前。
         *      String tableName,                                                  // テーブルの名前。 TableStoreWriter は 1 つのテーブルにのみデータを書き込むことができます。
         *      WriterConfig config,                                               // TableStoreWriter の構成。
         *      TableStoreCallback<RowChange, RowWriteResult> resultCallback       // 行レベルのコールバック。
         * )
         * */
        TableStoreWriter writer = sample.createTablesStoreWriter();

        /**
         * Future オブジェクトでは、一度に 1 行のデータを書き込むメソッドが使用されます。
         * */
        sample.writeSingleRowWithFuture(writer);
        /**
         * Future オブジェクトでは、一度に複数行のデータを書き込むメソッドが使用されます。
         * */   
        //sample.writeRowListWithFuture(writer);

        System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
        System.out.println("Count by WriterStatics: " + writer.getWriterStatistics());

        /**
         * TableStoreWriter を事前にシャットダウンします。 キュー内のすべてのデータが Tablestore に書き込まれた後、システムはクライアントと内部スレッドプールを自動的にシャットダウンします。
         * */
        writer.close();
    }

    private static TableStoreWriter createTablesStoreWriter() {

        WriterConfig config = new WriterConfig();
        // バッチ書き込みリクエストで Tablestore に書き込むことができる行の最大数を指定します。 デフォルト値:200。バッチ書き込みリクエストで 200 行を超えるデータを書き込みたい場合は、このパラメーターにより大きな値を指定します。
        config.setMaxBatchRowsCount(200); 
        // TableStoreWriter がバッファ内のデータを Tablestore に書き込むために使用する並列リクエストの最大数を指定します。 デフォルト値:10。デフォルト値を保持することをお勧めします。                          
        config.setConcurrency(10);                                   

        /**
         * 行レベルのコールバックを構成します。
         * この例では、コールバックは、Tablestore に書き込まれた行数と、Tablestore への書き込みに失敗した行数をカウントするように構成されています。
         * */
        TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
            @Override
            public void onCompleted(RowChange rowChange, RowWriteResult cc) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(RowChange rowChange, Exception ex) {
                failedRows.incrementAndGet();
            }
        };

        ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);


        /**
         * 使いやすく、初期化と解放ロジックを分離できる、組み込みのスレッドプールとクライアントを使用することをお勧めします。
         * */
        DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
                endpoint, credentials, instanceName, tableName, config, resultCallback);

        return writer;
    }


    private static void tryCreateTable() throws ClientException {
        SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        try {
            ots.deleteTable(new DeleteTableRequest(tableName));
        } catch (Exception e) {
        }

        TableMeta tableMeta = new TableMeta(tableName);
        tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
        TableOptions tableOptions = new TableOptions(-1, 1);
        CreateTableRequest request = new CreateTableRequest(
                tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));

        try {
            CreateTableResponse res = ots.createTable(request);
        } catch (Exception e) {
            throw new ClientException(e);
        } finally {
            ots.shutdown();
        }
    }

    public static void writeSingleRowWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Single Row With Future");
        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
            futures.add(future);
        }

        System.out.println("Write thread finished.");
        writer.flush();
        // Future オブジェクトを表示します。
        // printFutureResult(futures);

        System.out.println("=========================================================[Finish]");
    }
    
    public void writeRowListWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Row List With Future");

        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        List<RowChange> rowChanges = new LinkedList<RowChange>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                    .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                    .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                    .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            rowChanges.add(rowChange);
            if (Math.random() > 0.995 || index == rowsCount - 1) {
                Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
                futures.add(future);
                rowChanges.clear();
            }
    }

    System.out.println("Write thread finished.");
    writer.flush();
    // Future オブジェクトを表示します。
    // printFutureResult(futures);
    System.out.println("=========================================================[Finish]");
    }


    private static void printFutureResult(List<Future<WriterResult>> futures) {
        int totalRow = 0;

        for (int index = 0; index < futures.size(); index++) {
            try {
                WriterResult result = futures.get(index).get();
                totalRow += result.getTotalCount();
                System.out.println(String.format(
                        "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
                        index, result.getFailedRows().size(), result.getSucceedRows().size(),
                        result.getTotalCount(), totalRow));

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

この例では、次の結果が返されます。

=========================================================[Start]
Write Single Row With Future
Write thread finished.
=========================================================[Finish]
Count by TablestoreCallback: failedRow=0, succeedRow=1000
Count by WriterStatics: WriterStatistics: {
    totalRequestCount=6,
    totalRowsCount=1000,
    totalSucceedRowsCount=1000,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}

FAQ

Tablestore SDK for Java を使用して Tablestore データテーブルにデータを書き込むときに、「属性列の数が最大値 128 を超えています」というエラーが報告された場合はどうすればよいですか?

付録

付録 1:TableStoreWriter の動作原理

TableStoreWriter は、SDK 処理レイヤーでの操作に基づいて再カプセル化されたツールクラスです。 TableStoreWriter は、次の操作に依存します。

  • AsyncClient。TableStoreWriter の初期化に使用されます。

  • BatchWriteRow。TableStoreWriter を使用して Tablestore にデータをインポートするために使用されます。

  • RetryStrategy。書き込みエラーが発生した場合に TableStoreWriter による再試行に使用されます。

次の図は、コード階層アーキテクチャを示しています。

EN

TableStoreWriter は、パフォーマンスと使いやすさの点で最適化されており、次の機能を提供します。

  • 非同期操作:より少ないスレッドを使用して、より高い並列性を提供します。

  • 自動データ集約:メモリ内のバッファキューを使用して、同時に Tablestore に送信される書き込みリクエストの数を最大化します。 これにより、書き込みスループットが向上します。

  • プロデューサー - コンシューマーモード:このモードは、非同期処理とデータ集約を容易にします。

  • パフォーマンス専有型データ交換キュー:Disruptor RingBuffer は、マルチプロデューサーおよびシングルコンシューマーモードでより良いパフォーマンスを提供します。

  • 複雑なリクエストカプセル化のマスキング:BatchWriteRow 操作を呼び出す詳細はマスキングされます。 事前チェックを使用してダーティデータが除外され、バッチでインポートできる行の数やサイズなどのリクエスト制限が自動的に処理されます。 ダーティデータとは、スキーマがテーブルのスキーマと異なる行、サイズが上限を超える行、または列の数が上限を超える行のことです。

  • 行レベルのコールバック:Tablestore SDK for Java によって提供されるリクエストレベルのコールバックと比較して、TableStoreWriter によって提供される行レベルのコールバックにより、ビジネスロジックは行レベルでデータを処理できます。

  • 行レベルの再試行:リクエストレベルの再試行が失敗した場合、特定のエラーコードに対して行レベルの再試行が実行され、可能な限り高い書き込み成功率が確保されます。

次の図は、TableStoreWriter の動作メカニズムとカプセル化の詳細を示しています。

EN2

付録 2:TableStoreWriter パラメーター

TableStoreWriter を初期化するときは、WriterConfig を変更して、ビジネス要件に基づいて TableStoreWriter パラメーターを指定できます。

パラメーター

タイプ

説明

bucketCount

Integer

TableStoreWriter 内のバケットの数。 デフォルト値:3。バケットは、データをキャッシュするために使用されるバッファに相当します。 このパラメーターを指定して、並列シーケンシャル書き込みリクエストの数を増やすことができます。 マシンのボトルネックに達していない場合、バケットの数は書き込みレートと正の相関があります。

説明

バケットの書き込みモードが同時書き込みの場合は、デフォルト値を保持します。

bufferSize

Integer

メモリ内のバッファキューに含めることができる行の最大数。 デフォルト値:1024。このパラメーターの値は、2 の指数乗の倍数である必要があります。

enableSchemaCheck

ブール値

データをバッファにフラッシュするときにスキーマをチェックするかどうかを指定します。 デフォルト値:true。

  • true の値は、行データをバッファにフラッシュする前に、TableStoreWriter が行データに対して次のチェックを実行することを指定します。 行データが上記のチェックに失敗した場合、TableStoreWriter は行データをダーティデータと見なします。 ダーティデータはバッファにフラッシュされません。

    • 行のプライマリキーのスキーマがテーブルのスキーマと同じであるかどうかを確認します。

    • 行の各プライマリキー列または属性列の値のサイズが上限を超えているかどうかを確認します。

    • 行の属性列の数が上限を超えているかどうかを確認します。

    • 属性列の名前が行のプライマリキー列の名前と同じであるかどうかを確認します。

    • 行のサイズが、リクエストを使用して同時にインポートできるデータの最大量を超えているかどうかを確認します。

  • false の値は、バッファ内の特定の行データがダーティデータの場合、TableStoreWriter は行データを Tablestore に書き込むことができないことを指定します。

dispatchMode

DispatchMode

データをバッファにフラッシュするときに、データをバケットにディスパッチするモード。 有効な値:

  • HASH_PARTITION_KEY:パーティションキーのハッシュ値に基づいてデータをバケットにディスパッチします。 パーティションキーのハッシュ値が同じデータは、同じバケットに順番に書き込まれます。 これはデフォルト値です。

  • HASH_PRIMARY_KEY:プライマリキーのハッシュ値に基づいてデータをバケットにディスパッチします。 プライマリキーのハッシュ値が同じデータは、同じバケットに順番に書き込まれます。

  • ROUND_ROBIN:各バケットを走査して、ループでデータをディスパッチします。 データは異なるバケットにランダムにディスパッチされます。

重要

このパラメーターは、バケットの数が 2 以上の場合にのみ有効になります。

batchRequestType

BatchRequestType

TableStoreWriter がバッファ内のデータを Tablestore に書き込むために使用するリクエストのタイプ。 有効な値:

  • BATCH_WRITE_ROW:BatchWriteRowRequest。 これはデフォルト値です。

  • BULK_IMPORT:BulkImportRequest。

concurrency

Integer

TableStoreWriter がバッファ内のデータを Tablestore に書き込むために使用する並列リクエストの最大数。 デフォルト値:10。

writeMode

WriteMode

TableStoreWriter がバッファ内のデータを Tablestore に書き込むときに、バケット内のデータが Tablestore に書き込まれるモード。 有効な値:

  • PARALLEL:バケットから Tablestore にデータを並列書き込みし、各バケットから Tablestore に同時書き込みします。 これはデフォルト値です。

  • SEQUENTIAL:バケットから Tablestore にデータを並列書き込みし、各バケットから Tablestore に順番に書き込みします。

allowDuplicatedRowInBatchRequest

ブール値

TableStoreWriter がバッチ書き込みリクエストを作成するときに、プライマリキー値が同じ行を許可するかどうかを指定します。 デフォルト値:true。

重要

データテーブルにセカンダリインデックスが作成されている場合、Tablestore はこのパラメーターの設定を無視し、プライマリキー値が同じ行を許可しません。 この場合、TableStoreWriter はリクエストを作成するときに、プライマリキー値が同じ行を異なるリクエストに追加します。

maxBatchSize

Integer

バッチ書き込みリクエストで Tablestore に書き込むことができるデータの最大量。 単位:バイト。 デフォルトでは、バッチ書き込みリクエストで最大 4 MB のデータを Tablestore に書き込むことができます。

maxBatchRowsCount

Integer

バッチ書き込みリクエストで Tablestore に書き込むことができる行の最大数。 デフォルト値:200。最大値:200。

callbackThreadCount

Integer

TableStoreWriter 内でコールバックを実行するスレッドプールのスレッド数。 このパラメーターのデフォルト値は、プロセッサの数です。

callbackThreadPoolQueueSize

Integer

TableStoreWriter 内でコールバックを実行するスレッドプールのキューサイズ。 デフォルト値:1024。

maxColumnsCount

Integer

データをバッファにフラッシュするときの、行の列の最大数。 デフォルト値:128。

maxAttrColumnSize

Integer

データをバッファにフラッシュするときの、単一属性列の値の最大サイズ。 単位:バイト。 デフォルトでは、各属性列の値のサイズは最大 2 MB です。

maxPKColumnSize

Integer

データをバッファにフラッシュするときの、単一プライマリキー列の値の最大サイズ。 単位:バイト。 デフォルトでは、各プライマリキー列の値のサイズは最大 1 KB です。

flushInterval

Integer

TableStoreWriter がバッファ内のデータを Tablestore に自動的に書き込む間隔。 デフォルト値:10000。単位:ミリ秒。

logInterval

Integer

TableStoreWriter がバッファ内のデータを Tablestore に書き込むときに、タスクステータスが自動的に表示される間隔。 デフォルト値:10000。単位:ミリ秒。

clientMaxConnections

Integer

クライアントが内部的に構築されるときに使用される接続の最大数。 デフォルト値:300。

writerRetryStrategy

WriterRetryStrategy

クライアントが内部的に構築されるときに使用される再試行ポリシー。 有効な値:

  • CERTAIN_ERROR_CODE_NOT_RETRY:特定のエラーコードに対して再試行を実行せず、他のエラーコードに対して再試行を実行します。 これはデフォルト値です。 再試行が実行されないエラーコード:

    • OTSParameterInvalid

    • OTSConditionCheckFail

    • OTSRequestBodyTooLarge

    • OTSInvalidPK

    • OTSOutOfColumnCountLimit

    • OTSOutOfRowSizeLimit

  • CERTAIN_ERROR_CODE_RETRY:特定のエラーコードに対して再試行を実行し、他のエラーコードに対して再試行を実行しません。 再試行が実行されるエラーコード:

    • OTSInternalServerError

    • OTSRequestTimeout

    • OTSPartitionUnavailable

    • OTSTableNotReady

    • OTSRowOperationConflict

    • OTSTimeout

    • OTSServerUnavailable

    • OTSServerBusy

構成例

WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);

付録 3:TableStoreWriter コールバック関数

TableStoreWriter は、コールバックを使用して書き込みの成功または失敗を報告します。 データの行が Tablestore に書き込まれると、TableStoreWriter は onCompleted 関数を呼び出します。 データの行が Tablestore に書き込めないと、TableStoreWriter は例外のカテゴリに基づいて onFailed 関数を呼び出します。

次のサンプルコードは、コールバックを使用して、Tablestore に書き込まれた行数と、Tablestore への書き込みに失敗した行数をカウントする方法の例を示しています。

private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
    @Override
    public void onCompleted(RowChange rowChange, RowWriteResult cc) {
        // Tablestore に書き込まれた行数をカウントします。
        succeedRows.incrementAndGet();
    }

    @Override
    public void onFailed(RowChange rowChange, Exception ex) {
        // Tablestore への書き込みに失敗した行数をカウントします。
        failedRows.incrementAndGet();
    }
};