全部產品
Search
文件中心

Tablestore:使用TableStoreWriter並發寫入資料

更新時間:May 27, 2025

通過本文您可以瞭解在Table Store中如何通過TableStoreWriter實現高並發資料寫入。

背景介紹

在日誌、物聯網(例如軌跡追蹤或溯源)等情境中,系統會在短時間內產生大量的資料,並將資料寫入到資料庫中。因此資料庫需要提供高並發、高吞吐率的寫入效能,滿足每秒上萬行甚至上百萬行的寫入吞吐率,而Table Store的 BatchWriteRow 介面限制單次只能寫入200行資料。

TableStoreWriter是Table Store基於Java SDK實現的簡單易用、高效能的資料匯入工具類,它封裝了用於高並發、高吞吐率資料匯入的介面,可以實現高並發寫入資料,同時支援行層級回調以及自訂配置功能。更多資訊,請參見附錄 1:TableStoreWriter 實現原理

說明

TableStoreWriter只適用於寬表模型

適用情境

如果您的業務情境具備以下特點,則可以使用TableStoreWriter進行資料寫入,典型的應用情境有日誌儲存、即時通訊(IM)系統、分布式隊列等。

  • 高並發,對吞吐率要求很高

  • 對單條資料的寫入延遲沒有要求

  • 寫入可非同步化(可採用生產者消費者模型)

  • 同一條資料可重複寫入

準備工作

操作步驟

步驟 1:安裝 Tablestore SDK

如果您使用的是Maven專案,請在專案的pom.xml檔案中添加如下依賴:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.17.4</version>
</dependency>                 

更多關於安裝Tablestore SDK的資訊,請參見安裝Tablestore SDK

步驟 2:初始化 TableStoreWriter

初始化TableStoreWriter時,您需要配置執行個體和表的資訊、身份認證資訊,也可以自訂TableStoreWriter的配置參數和回呼函數。使用多線程時,建議共用一個TableStoreWriter對象,TableStoreWriter的初始化範例程式碼如下所示。

TableStoreWriter支援的參數配置和回呼函數,請參見附錄 2:TableStoreWriter 配置參數附錄 3:TableStoreWriter 回呼函數
private static TableStoreWriter createTablesStoreWriter() {
    
    /**
     * 一般情況下保持預設配置即可,您也可以按需自訂 TableStoreWriter 配置。
     * 更多參數說明請參見“配置 TableStoreWriter”文檔。
     * */
    WriterConfig config = new WriterConfig();
    // 配置一次大量匯入的行數上限,預設值為 200。
    config.setMaxBatchRowsCount(200); 
    // 配置最大並發數,預設值為 10。建議保持預設。                         
    config.setConcurrency(10);    
            
    /**
     * 自訂行層級 Callback。
     * 該樣本通過成功、失敗計數,簡單展示回調能力。
     * */
    TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
        @Override
        public void onCompleted(RowChange rowChange, RowWriteResult cc) {
            succeedRows.incrementAndGet();
        }

        @Override
        public void onFailed(RowChange rowChange, Exception ex) {
            failedRows.incrementAndGet();
        }
    };

    /** 配置訪問憑證。 **/
    ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

    /**
     * 推薦使用內部構建的線程池與 Client,方便使用者使用,隔離初始化和釋放的邏輯。
     * */
    DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
        endpoint, credentials, instanceName, tableName, config, resultCallback);

    return writer;
}

步驟 3:寫入資料

您可以根據不同的增刪改操作構造RowChange,然後將RowChange添加到TableStoreWriter中。

單行寫入資料

以下範例程式碼使用單行寫入方式寫入1000行資料。

public void writeSingleRowWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Single Row With Future");
    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
        futures.add(future);
    }

    System.out.println("Write thread finished.");
    // 對緩衝區中的資料進行 flush。TableStoreWriter也會根據flushInterval和maxBatchSize的配置決定緩衝區的flush時機。其中flushInterval是根據時間定期進行flush,maxBatchSize是根據緩衝區的資料量決定是否進行flush。
    writer.flush();
    
    // 列印Future過程。
    // printFutureResult(futures);

    System.out.println("=========================================================[Finish]");
}

批量寫入資料

以下範例程式碼使用批量寫入方式寫入1000行資料。

public void writeRowListWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Row List With Future");

    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    List<RowChange> rowChanges = new LinkedList<RowChange>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        rowChanges.add(rowChange);
        if (Math.random() > 0.995 || index == rowsCount - 1) {
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
            futures.add(future);
            rowChanges.clear();
        }
    }

    System.out.println("Write thread finished.");
    // 對緩衝區中的資料進行 flush。TableStoreWriter也會根據flushInterval和maxBatchSize的配置決定緩衝區的flush時機。其中flushInterval是根據時間定期進行flush,maxBatchSize是根據緩衝區的資料量決定是否進行flush。
    writer.flush();
    
    // 列印Future過程。
    // printFutureResult(futures);
    
    System.out.println("=========================================================[Finish]");
}

步驟 4:關閉 TableStoreWriter

建議您在退出應用程式前手動關閉TableStoreWriter。在TableStoreWriter關閉前,系統會先flush掉緩衝區中的所有資料。

說明

如果在TableStoreWriter關閉過程中或者關閉之後仍然調用addRowChange方法向緩衝區中寫入資料,該部分資料不保證會寫入Table Store。

// 主動關閉Writer,內部等候所有隊列資料寫入後,自動關閉client與內部的線程池。
writer.close();

完整範例程式碼

以下樣本用於建立一張新的資料表,並通過並發寫入的方式將資料寫入到資料表中。

import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;

import com.aliyuncs.exceptions.ClientException;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.commons.codec.digest.DigestUtils.md5Hex;

public class TableStoreWriterDemo {

    // yourInstanceName 填寫您的執行個體名稱
    private static String instanceName = "yourInstanceName";
    // yourEndpoint 填寫您的執行個體訪問地址
    private static String endpoint = "yourEndpoint";
    // 擷取環境變數裡的 AccessKey ID 和 AccessKey Secret
    private static String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    private static String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    private static String tableName = "<TABLE_NAME>";

    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws ClientException {
        TableStoreWriterDemo sample = new TableStoreWriterDemo();

        /**
         * 使用Writer前確保表已存在。
         * 1、writer會校正表的存在性.
         * 2、校正寫入資料是否與表的欄位、類型一致。
         * */
        sample.tryCreateTable();

        /**
         * 初始化建議使用。
         * DefaultTableStoreWriter(
         *      String endpoint,                                                   // 執行個體的服務地址。
         *      ServiceCredentials credentials,                                    // 認證資訊:含 AK,也支援 token
         *      String instanceName,                                               // 執行個體名。
         *      String tableName,                                                  // 表名:一個 writer 僅針對一個表。
         *      WriterConfig config,                                               // writer 的配置。
         *      TableStoreCallback<RowChange, RowWriteResult> resultCallback       // 行層級回調。
         * )
         * */
        TableStoreWriter writer = sample.createTablesStoreWriter();

        /**
         * Future使用:單行寫
         * */
        sample.writeSingleRowWithFuture(writer);
        /**
         * Future使用:批量寫
         * */   
        //sample.writeRowListWithFuture(writer);

        System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
        System.out.println("Count by WriterStatics: " + writer.getWriterStatistics());

        /**
         * 您需要主動關閉Writer,內部等候所有隊列資料寫入後,自動關閉 client 與內部的線程池。
         * */
        writer.close();
    }

    private static TableStoreWriter createTablesStoreWriter() {

        WriterConfig config = new WriterConfig();
        // 配置一次大量匯入的行數上限,預設值為 200。如果希望一次寫入超過 200 行資料,請調大該值。
        config.setMaxBatchRowsCount(200); 
        // 配置最大並發數,預設值為 10。建議保持預設即可。                         
        config.setConcurrency(10);                                   

        /**
         * 自訂的行層級 Callback。
         * 該樣本通過成功、失敗計數,簡單展示回調能力。
         * */
        TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
            @Override
            public void onCompleted(RowChange rowChange, RowWriteResult cc) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(RowChange rowChange, Exception ex) {
                failedRows.incrementAndGet();
            }
        };

        ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);


        /**
         * 推薦使用內部構建的線程池與 client,方便使用者使用,隔離初始化和釋放的邏輯。
         * */
        DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
                endpoint, credentials, instanceName, tableName, config, resultCallback);

        return writer;
    }


    private static void tryCreateTable() throws ClientException {
        SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        try {
            ots.deleteTable(new DeleteTableRequest(tableName));
        } catch (Exception e) {
        }

        TableMeta tableMeta = new TableMeta(tableName);
        tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
        TableOptions tableOptions = new TableOptions(-1, 1);
        CreateTableRequest request = new CreateTableRequest(
                tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));

        try {
            CreateTableResponse res = ots.createTable(request);
        } catch (Exception e) {
            throw new ClientException(e);
        } finally {
            ots.shutdown();
        }
    }

    public static void writeSingleRowWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Single Row With Future");
        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
            futures.add(future);
        }

        System.out.println("Write thread finished.");
        writer.flush();
        // 列印future過程。
        // printFutureResult(futures);

        System.out.println("=========================================================[Finish]");
    }
    
    public void writeRowListWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Row List With Future");

        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        List<RowChange> rowChanges = new LinkedList<RowChange>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                    .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                    .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                    .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            rowChanges.add(rowChange);
            if (Math.random() > 0.995 || index == rowsCount - 1) {
                Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
                futures.add(future);
                rowChanges.clear();
            }
    }

    System.out.println("Write thread finished.");
    writer.flush();
    // 列印future過程。
    // printFutureResult(futures);
    System.out.println("=========================================================[Finish]");
    }


    private static void printFutureResult(List<Future<WriterResult>> futures) {
        int totalRow = 0;

        for (int index = 0; index < futures.size(); index++) {
            try {
                WriterResult result = futures.get(index).get();
                totalRow += result.getTotalCount();
                System.out.println(String.format(
                        "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
                        index, result.getFailedRows().size(), result.getSucceedRows().size(),
                        result.getTotalCount(), totalRow));

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

執行結果如下所示。

=========================================================[Start]
Write Single Row With Future
Write thread finished.
=========================================================[Finish]
Count by TablestoreCallback: failedRow=0, succeedRow=1000
Count by WriterStatics: WriterStatistics: {
    totalRequestCount=6,
    totalRowsCount=1000,
    totalSucceedRowsCount=1000,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}

常見問題

使用Java SDK寫入資料時報錯:The count of attribute columns exceeds the maximum:128

附錄

附錄 1:TableStoreWriter 實現原理

TableStoreWriter是基於SDK處理層介面重新封裝的工具類,它依賴了以下介面。

  • TableStoreWriter初始化依賴AsyncClient非同步介面。

  • TableStoreWriter匯入資料使用BatchWriteRow介面。

  • TableStoreWriter異常重試依賴RetryStrategy介面。

代碼分層架構如下圖所示。

image

TableStoreWriter在介面效能和易用性上做了最佳化,具備以下特性:

  • 使用非同步介面:使用更少的線程,提供更高的並發。

  • 自動資料彙總:在記憶體中使用緩衝隊列,讓一次發給Table Store的批量寫請求盡量大,提高寫入吞吐率。

  • 採用生產者消費者模式:更易於非同步化和資料聚集。

  • 使用高效能資料交換隊列:選用Disruptor RingBuffer,採用多生產者單消費者的模型,提供更好的效能。

  • 屏蔽複雜請求封裝:屏蔽調用BatchWriteRow介面細節,通過預檢查自動過濾髒資料(例如主鍵格式與表預定義的不符、行大小超限、行列數超限等),自動處理要求節流(例如一次批量的行數限制、一次批量的大小限制等)。

  • 行層級Callback:相對於Table StoreJava SDK提供的請求層級的Callback,TableStoreWriter提供行層級的Callback,讓商務邏輯可以實現行層級資料處理。

  • 行層級重試:請求層級重試失敗會根據特定的錯誤碼進行行層級重試,最大程度保證資料寫入成功率。

TableStoreWriter的實現和封裝細節如下圖所示。

image

附錄 2:TableStoreWriter 配置參數

初始化TableStoreWriter時,您可以通過修改 WriterConfig 自訂TableStoreWriter的配置參數。

參數

類型

說明

bucketCount

整型

Writer內部的分桶數,預設值為3。一個分桶相當於一個緩衝空間,用於緩衝部分資料,此參數可用於提升按序寫並發,當未達機器瓶頸時,分桶數與寫入速率正相關。

說明

當分桶內的寫入模式為並發寫時,保持預設配置即可。

bufferSize

整型

記憶體中緩衝隊列的大小,預設值為1024。參數值必須為2的指數倍。

enableSchemaCheck

布爾值

資料寫入到緩衝區時,是否進行schema檢查,預設值為true。

  • 開啟schema檢查時,在行資料寫入緩衝區前,TableStoreWriter會對該行資料進行如下檢查。如果行資料未通過上述檢查,則TableStoreWriter會判定行資料為髒資料,不會寫入到緩衝區中。

    • 該行的主鍵的schema是否與表定義的相同。

    • 該行的主鍵列或屬性列的值大小是否超過限制。

    • 該行的屬性列的個數是否超過限制。

    • 屬性列中是否有列名與主鍵列相同。

    • 該行的大小是否超過一次批量請求匯入的最巨量資料量限制。

  • 不開啟schema檢查(設定為false)時,如果緩衝區中的部分行資料為髒資料,則TableStoreWriter將行資料寫入到Table Store時,對應行資料會寫入失敗。

dispatchMode

DispatchMode

資料寫入到緩衝區時,將資料分發到分桶內的模式。取值範圍如下:

  • HASH_PARTITION_KEY:預設值,基於分區鍵雜湊值做分桶進行分發,保證同分區的資料處於一桶內按序寫入。

  • HASH_PRIMARY_KEY:基於主鍵雜湊值做分桶進行分發,保證同主鍵的資料處於一個桶內按序寫入。

  • ROUND_ROBIN:迴圈遍曆每個分桶進行分發。資料隨機分散在不同分桶中。

重要

當分桶數大於等於2時,此參數才有效。

batchRequestType

BatchRequestType

Writer將緩衝區資料發送到Table Store時,構建的請求類型。取值範圍如下:

  • BATCH_WRITE_ROW:預設值,構建BatchWriteRowRequest。

  • BULK_IMPORT:構建BulkImportRequest。

concurrency

整型

Writer將緩衝區資料發送到Table Store時的最大請求並發數,預設值為10。

writeMode

WriteMode

Writer將緩衝區資料寫入到Table Store時,每個分桶內資料寫入到Table Store中的模式。取值範圍如下:

  • PARALLEL:預設值,並發寫。不同桶間並發,同一個桶內也會並行請求。

  • SEQUENTIAL:串列寫。不同桶間並發,同一個桶內串列請求。

allowDuplicatedRowInBatchRequest

布爾值

構建批量請求將資料寫入Table Store時,是否允許有主鍵相同的行,預設值為true。

重要

當資料表上存在二級索引時,Table Store會忽略此參數的配置,不允許有主鍵相同的行。此時TableStoreWriter在構建請求時會將主鍵相同的行加入到不同請求中。

maxBatchSize

整型

一次批量請求寫入Table Store的最巨量資料量,預設值為4 MB,單位為位元組。

maxBatchRowsCount

整型

一次批量請求寫入Table Store的最大行數,預設值為200,最大值為200。

callbackThreadCount

整型

Writer內部Callback啟動並執行線程池線程數,預設值為處理器個數。

callbackThreadPoolQueueSize

整型

Writer內部Callback啟動並執行線程池隊列大小,預設值為1024。

maxColumnsCount

整型

寫入資料到緩衝區時,一行資料的最大列數,預設值為128。

maxAttrColumnSize

整型

寫入資料到緩衝區時,單一屬性列值的最大大小,預設值為2 MB,單位為位元組。

maxPKColumnSize

整型

寫入資料到緩衝區時,單一主鍵列值的最大大小,預設值為1 KB,單位為位元組。

flushInterval

整型

Writer將緩衝區資料發送到Table Store時,自動flush緩衝區的時間間隔,預設值為10000,單位為毫秒。

logInterval

整型

Writer將緩衝區資料發送到Table Store時,自動列印任務狀態的時間間隔,預設值為10000,單位為毫秒。

clientMaxConnections

整型

內部構建Client時使用的最大串連數配置,預設值為300。

writerRetryStrategy

WriterRetryStrategy

內部構建Client時使用的重試策略,取值範圍如下:

  • CERTAIN_ERROR_CODE_NOT_RETRY:預設值,給定的錯誤碼不做重試,其它錯誤均會重試。不做重試的錯誤碼包括:

    • OTSParameterInvalid

    • OTSConditionCheckFail

    • OTSRequestBodyTooLarge

    • OTSInvalidPK

    • OTSOutOfColumnCountLimit

    • OTSOutOfRowSizeLimit

  • CERTAIN_ERROR_CODE_RETRY:只對給定的錯誤碼進行重試,其他錯誤均不重試。進行重試的錯誤碼包括:

    • OTSInternalServerError

    • OTSRequestTimeout

    • OTSPartitionUnavailable

    • OTSTableNotReady

    • OTSRowOperationConflict

    • OTSTimeout

    • OTSServerUnavailable

    • OTSServerBusy

配置樣本

WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);

附錄 3:TableStoreWriter 回呼函數

TableStoreWriter通過Callback來反饋寫入行資料的成功或者失敗資訊。如果行資料寫入成功,則Writer會調用 onCompleted 函數,如果行資料寫入失敗,則Writer會根據異常的類別調用對應的 onFailed 函數。

以下樣本用於統計成功和失敗的行資料數量。

private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
    @Override
    public void onCompleted(RowChange rowChange, RowWriteResult cc) {
        //統計成功行數。
        succeedRows.incrementAndGet();
    }

    @Override
    public void onFailed(RowChange rowChange, Exception ex) {
        //統計失敗行數。
        failedRows.incrementAndGet();
    }
};