Topik ini menjelaskan cara menggunakan TableStoreWriter untuk menulis data ke Tablestore dengan konkurensi tinggi.
Informasi latar belakang
Dalam skenario seperti pencatatan log dan pelacakan IoT, sistem menghasilkan sejumlah besar data dalam waktu singkat dan menulis data tersebut ke database. Database perlu menyediakan throughput penulisan yang tinggi hingga puluhan ribu atau bahkan jutaan baris per detik. Namun, operasi BatchWriteRow dari Tablestore hanya dapat menulis hingga 200 baris dalam satu batch.
TableStoreWriter adalah alat impor data berperforma tinggi dan mudah digunakan yang disediakan oleh Tablestore SDK untuk Java. Kelas ini membungkus operasi yang digunakan untuk mengimpor data dengan konkurensi dan throughput tinggi. Anda dapat menggunakan TableStoreWriter untuk menulis data ke tabel Tablestore dengan konkurensi tinggi serta menentukan callback tingkat baris dan konfigurasi kustom. Untuk informasi lebih lanjut, lihat Lampiran 1: Prinsip Kerja TableStoreWriter.
TableStoreWriter hanya cocok untuk model Wide Column.
Skenario
Jika Anda memiliki persyaratan bisnis berikut, Anda dapat menggunakan TableStoreWriter untuk menulis data ke Tablestore dalam skenario seperti penyimpanan log, pesan instan, dan antrian terdistribusi:
Throughput tinggi diperlukan untuk mendukung konkurensi tinggi aplikasi.
Tidak ada persyaratan untuk latensi penulisan pada baris data tunggal.
Data dapat ditulis secara asinkron (mode produsen-konsumen dapat digunakan).
Baris data yang sama dapat ditulis ulang.
Prasyarat
Tablestore telah diaktifkan dan sebuah instance telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Tablestore dan Buat Instance.
Pasangan AccessKey dari akun Alibaba Cloud Anda atau Pengguna Resource Access Management (RAM) telah diperoleh. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Prosedur
Langkah 1: Instal Tablestore SDK untuk Java
Jika Anda menggunakan Maven untuk mengelola proyek Java, tambahkan dependensi berikut ke file pom.xml:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>5.17.4</version>
</dependency> Untuk informasi tentang cara menginstal Tablestore SDK untuk Java, lihat Instal Tablestore SDK untuk Java.
Langkah 2: Inisialisasi TableStoreWriter
Saat menginisialisasi TableStoreWriter, Anda harus menentukan informasi instance dan tabel serta informasi autentikasi identitas. Anda juga dapat menentukan parameter TableStoreWriter kustom dan fungsi callback. Kami merekomendasikan agar beberapa thread berbagi objek TableStoreWriter yang sama. Contoh kode berikut memberikan contoh cara menginisialisasi TableStoreWriter.
Untuk informasi tentang parameter dan fungsi callback yang didukung oleh TableStoreWriter, lihat Lampiran 2: Parameter TableStoreWriter dan Lampiran 3: Fungsi Callback TableStoreWriter.
private static TableStoreWriter createTablesStoreWriter() {
/**
* Dalam kebanyakan kasus, Anda dapat mempertahankan nilai default untuk parameter TableStoreWriter. Anda juga dapat mengatur parameter tersebut ke nilai lain berdasarkan kebutuhan bisnis Anda.
* Untuk informasi lebih lanjut tentang parameter, lihat Lampiran 2: Parameter TableStoreWriter.
* */
WriterConfig config = new WriterConfig();
// Tentukan jumlah maksimum baris yang dapat ditulis ke Tablestore dalam permintaan penulisan batch. Nilai default: 200.
config.setMaxBatchRowsCount(200);
// Tentukan jumlah maksimum permintaan paralel yang digunakan TableStoreWriter untuk menulis data dalam buffer ke Tablestore. Nilai default: 10. Kami merekomendasikan agar Anda mempertahankan nilai default.
config.setConcurrency(10);
/**
* Konfigurasikan callback tingkat baris.
* Dalam contoh ini, callback dikonfigurasikan untuk menghitung jumlah baris yang berhasil ditulis ke Tablestore dan jumlah baris yang gagal ditulis ke Tablestore.
* */
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
failedRows.incrementAndGet();
}
};
/** Konfigurasikan kredensial akses. **/
ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
/**
* Kami merekomendasikan agar Anda menggunakan thread pools dan client bawaan, yang lebih mudah digunakan dan mengisolasi logika inisialisasi dan pelepasan.
* */
DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
endpoint, credentials, instanceName, tableName, config, resultCallback);
return writer;
}Langkah 3: Menulis data
Anda dapat membuat RowChanges berdasarkan operasi tambah, hapus, dan modifikasi yang berbeda, lalu tambahkan RowChanges ke TableStoreWriter.
Tulis satu baris data sekaligus
Contoh kode berikut memberikan contoh cara menulis 1.000 baris data ke tabel dengan menulis satu baris data sekaligus:
public void writeSingleRowWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Mulai]");
System.out.println("Tulis Satu Baris dengan Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
futures.add(future);
}
System.out.println("Thread penulisan selesai.");
// Flush data dalam buffer ke Tablestore. TableStoreWriter menentukan kapan data dalam buffer diflush ke Tablestore berdasarkan parameter flushInterval dan maxBatchSize. Parameter flushInterval menentukan interval di mana data dalam buffer diflush ke Tablestore. Parameter maxBatchSize menentukan apakah data dalam buffer diflush ke Tablestore berdasarkan jumlah data dalam buffer.
writer.flush();
// Tampilkan objek Future.
// printFutureResult(futures);
System.out.println("=========================================================[Selesai]");
}Tulis beberapa baris data sekaligus
Contoh kode berikut memberikan contoh cara menulis 1.000 baris ke tabel dengan menulis beberapa baris data sekaligus:
public void writeRowListWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Mulai]");
System.out.println("Tulis Daftar Baris dengan Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
List<RowChange> rowChanges = new LinkedList<RowChange>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
rowChanges.add(rowChange);
if (Math.random() > 0.995 || index == rowsCount - 1) {
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
futures.add(future);
rowChanges.clear();
}
}
System.out.println("Thread penulisan selesai.");
// Flush data dalam buffer ke Tablestore. TableStoreWriter menentukan kapan data dalam buffer diflush ke Tablestore berdasarkan parameter flushInterval dan maxBatchSize. Parameter flushInterval menentukan interval di mana data dalam buffer diflush ke Tablestore. Parameter maxBatchSize menentukan apakah data dalam buffer diflush ke Tablestore berdasarkan jumlah data dalam buffer.
writer.flush();
// Tampilkan objek Future.
// printFutureResult(futures);
System.out.println("=========================================================[Selesai]");
}Langkah 4: Matikan TableStoreWriter
Kami merekomendasikan agar Anda mematikan TableStoreWriter secara manual sebelum keluar dari aplikasi. Sebelum Anda mematikan TableStoreWriter, sistem akan me-flush semua data dalam buffer ke Tablestore.
Jika Anda memanggil metode addRowChange untuk menulis data ke buffer saat Anda sedang mematikan TableStoreWriter atau setelah Anda mematikan TableStoreWriter, data mungkin tidak akan ditulis ke Tablestore.
// Matikan TableStoreWriter secara proaktif. Setelah semua data dalam antrian ditulis ke Tablestore, sistem secara otomatis mematikan client dan thread pool internal.
writer.close();Kode sampel lengkap
Contoh kode berikut memberikan contoh cara membuat tabel dan menulis data secara bersamaan ke tabel:
import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;
import com.aliyuncs.exceptions.ClientException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.commons.codec.digest.DigestUtils.md5Hex;
public class TableStoreWriterDemo {
// Tentukan nama instance.
private static String instanceName = "yourInstanceName";
// Tentukan endpoint instance.
private static String endpoint = "yourEndpoint";
// Dapatkan ID AccessKey dan rahasia AccessKey dari variabel lingkungan.
private static String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
private static String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
private static String tableName = "<TABLE_NAME>";
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws ClientException {
TableStoreWriterDemo sample = new TableStoreWriterDemo();
/**
* Pastikan tabel sudah ada sebelum Anda menggunakan TableStoreWriter.
* 1. TableStoreWriter memverifikasi apakah tabel ada.
* 2. Periksa apakah bidang dan tipe bidang data yang ingin Anda tulis sama dengan bidang dan tipe bidang tabel.
* */
sample.tryCreateTable();
/**
* Kami merekomendasikan agar Anda menggunakan DefaultTablestoreWriter untuk menginisialisasi TableStoreWriter.
* DefaultTableStoreWriter(
* String endpoint, // Endpoint instance.
* ServiceCredentials credentials, // Kredensial akses, termasuk pasangan AccessKey. Token didukung.
* String instanceName, // Nama instance.
* String tableName, // Nama tabel. TableStoreWriter dapat menulis data ke satu tabel saja.
* WriterConfig config, // Konfigurasi TableStoreWriter.
* TableStoreCallback<RowChange, RowWriteResult> resultCallback // Callback tingkat baris.
* )
* */
TableStoreWriter writer = sample.createTablesStoreWriter();
/**
* Metode untuk menulis satu baris data sekaligus digunakan dalam objek Future.
* */
sample.writeSingleRowWithFuture(writer);
/**
* Metode untuk menulis beberapa baris data sekaligus digunakan dalam objek Future.
* */
//sample.writeRowListWithFuture(writer);
System.out.println("Hitungan oleh TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
System.out.println("Hitungan oleh WriterStatics: " + writer.getWriterStatistics());
/**
* Matikan TableStoreWriter secara proaktif. Setelah semua data dalam antrian ditulis ke Tablestore, sistem secara otomatis mematikan client dan thread pool internal.
* */
writer.close();
}
private static TableStoreWriter createTablesStoreWriter() {
WriterConfig config = new WriterConfig();
// Tentukan jumlah maksimum baris yang dapat ditulis ke Tablestore dalam permintaan penulisan batch. Nilai default: 200. Jika Anda ingin menulis lebih dari 200 baris data dalam permintaan penulisan batch, tentukan nilai yang lebih besar untuk parameter ini.
config.setMaxBatchRowsCount(200);
// Tentukan jumlah maksimum permintaan paralel yang digunakan TableStoreWriter untuk menulis data dalam buffer ke Tablestore. Nilai default: 10. Kami merekomendasikan agar Anda mempertahankan nilai default.
config.setConcurrency(10);
/**
* Konfigurasikan callback tingkat baris.
* Dalam contoh ini, callback dikonfigurasikan untuk menghitung jumlah baris yang berhasil ditulis ke Tablestore dan jumlah baris yang gagal ditulis ke Tablestore.
* */
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
failedRows.incrementAndGet();
}
};
ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
/**
* Kami merekomendasikan agar Anda menggunakan thread pools dan client bawaan, yang lebih mudah digunakan dan mengisolasi logika inisialisasi dan pelepasan.
* */
DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
endpoint, credentials, instanceName, tableName, config, resultCallback);
return writer;
}
private static void tryCreateTable() throws ClientException {
SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
try {
ots.deleteTable(new DeleteTableRequest(tableName));
} catch (Exception e) {
}
TableMeta tableMeta = new TableMeta(tableName);
tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
TableOptions tableOptions = new TableOptions(-1, 1);
CreateTableRequest request = new CreateTableRequest(
tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));
try {
CreateTableResponse res = ots.createTable(request);
} catch (Exception e) {
throw new ClientException(e);
} finally {
ots.shutdown();
}
}
public static void writeSingleRowWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Mulai]");
System.out.println("Tulis Satu Baris dengan Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
futures.add(future);
}
System.out.println("Thread penulisan selesai.");
writer.flush();
// Tampilkan objek Future.
// printFutureResult(futures);
System.out.println("=========================================================[Selesai]");
}
public void writeRowListWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Mulai]");
System.out.println("Tulis Daftar Baris dengan Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
List<RowChange> rowChanges = new LinkedList<RowChange>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
rowChanges.add(rowChange);
if (Math.random() > 0.995 || index == rowsCount - 1) {
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
futures.add(future);
rowChanges.clear();
}
}
System.out.println("Thread penulisan selesai.");
writer.flush();
// Tampilkan objek Future.
// printFutureResult(futures);
System.out.println("=========================================================[Selesai]");
}
private static void printFutureResult(List<Future<WriterResult>> futures) {
int totalRow = 0;
for (int index = 0; index < futures.size(); index++) {
try {
WriterResult result = futures.get(index).get();
totalRow += result.getTotalCount();
System.out.println(String.format(
"Future[%d] selesai:\tgagal: %d\tberhasil: %d\tfutureBatch: %d\ttotalFinished: %d",
index, result.getFailedRows().size(), result.getSucceedRows().size(),
result.getTotalCount(), totalRow));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}Dalam contoh ini, hasil berikut dikembalikan:
=========================================================[Mulai]
Tulis Satu Baris dengan Future
Thread penulisan selesai.
=========================================================[Selesai]
Hitungan oleh TablestoreCallback: failedRow=0, succeedRow=1000
Hitungan oleh WriterStatics: WriterStatistics: {
totalRequestCount=6,
totalRowsCount=1000,
totalSucceedRowsCount=1000,
totalFailedRowsCount=0,
totalSingleRowRequestCount=0,
}FAQ
Lampiran
Lampiran 1: Prinsip kerja TableStoreWriter
TableStoreWriter adalah kelas alat yang direenkapsulasi berdasarkan operasi pada lapisan pemrosesan SDK. TableStoreWriter bergantung pada operasi berikut:
AsyncClient, digunakan untuk menginisialisasi TableStoreWriter.
BatchWriteRow, digunakan untuk mengimpor data ke Tablestore dengan TableStoreWriter.
RetryStrategy, digunakan untuk mencoba kembali oleh TableStoreWriter ketika terjadi kegagalan penulisan.
Gambar berikut menunjukkan arsitektur hierarki kode.

TableStoreWriter dioptimalkan dalam hal performa dan kemudahan penggunaan dan menyediakan fitur-fitur berikut:
Operasi Asinkron: Lebih sedikit thread digunakan untuk menyediakan konkurensi yang lebih tinggi.
Aggregasi Data Otomatis: Antrian buffer digunakan dalam memori untuk memaksimalkan jumlah permintaan penulisan yang dikirim ke Tablestore pada saat yang sama. Ini meningkatkan throughput penulisan.
Mode Produsen-Konsumen: Mode ini memfasilitasi pemrosesan asinkron dan agregasi data.
Antrian Pertukaran Data Berperforma Tinggi: Disruptor RingBuffer memberikan performa yang lebih baik dalam mode multi-produsen dan single-konsumen.
Penyembunyian Enkapsulasi Permintaan Kompleks: Detail pemanggilan operasi BatchWriteRow disembunyikan. Data kotor disaring menggunakan pemeriksaan awal, dan batasan permintaan, seperti batasan pada jumlah atau ukuran baris yang dapat diimpor dalam batch, diproses secara otomatis. Data kotor mengacu pada baris yang skemanya berbeda dari tabel, ukurannya melebihi batas atas, atau di mana jumlah kolom melebihi batas atas.
Callback Tingkat Baris: Dibandingkan dengan callback tingkat permintaan yang disediakan oleh Tablestore SDK untuk Java, callback tingkat baris yang disediakan oleh TableStoreWriter memungkinkan logika bisnis memproses data pada tingkat baris.
Retries Tingkat Baris: Jika retries tingkat permintaan gagal, retries tingkat baris dilakukan untuk kode kesalahan tertentu untuk memastikan tingkat keberhasilan penulisan tertinggi yang mungkin.
Gambar berikut menunjukkan mekanisme kerja dan detail enkapsulasi TableStoreWriter.

Lampiran 2: Parameter TableStoreWriter
Saat Anda menginisialisasi TableStoreWriter, Anda dapat memodifikasi WriterConfig untuk menentukan parameter TableStoreWriter berdasarkan kebutuhan bisnis Anda.
Parameter | Tipe | Deskripsi |
bucketCount | Integer | Jumlah bucket di TableStoreWriter. Nilai default: 3. Bucket setara dengan buffer yang digunakan untuk menyimpan data sementara. Anda dapat menentukan parameter ini untuk meningkatkan jumlah permintaan penulisan berurutan paralel. Jika bottleneck mesin belum tercapai, jumlah bucket berkorelasi positif dengan laju penulisan. Catatan Jika mode penulisan bucket adalah penulisan bersamaan, pertahankan nilai default. |
bufferSize | Integer | Jumlah maksimum baris yang dapat disertakan dalam antrian buffer di memori. Nilai default: 1024. Nilai parameter ini harus merupakan kelipatan eksponensial dari 2. |
enableSchemaCheck | Boolean | Menentukan apakah akan memeriksa skema saat data diflush ke buffer. Nilai default: true.
|
dispatchMode | Mode di mana data didistribusikan ke bucket saat data diflush ke buffer. Nilai yang valid:
Penting Parameter ini hanya berlaku jika jumlah bucket lebih besar dari atau sama dengan 2. | |
batchRequestType | Tipe permintaan yang digunakan TableStoreWriter untuk menulis data dalam buffer ke Tablestore. Nilai yang valid:
| |
concurrency | Integer | Jumlah maksimum permintaan paralel yang digunakan TableStoreWriter untuk menulis data dalam buffer ke Tablestore. Nilai default: 10. |
writeMode | Mode di mana data dalam bucket ditulis ke Tablestore ketika TableStoreWriter menulis data dalam buffer ke Tablestore. Nilai yang valid:
| |
allowDuplicatedRowInBatchRequest | Boolean | Menentukan apakah baris yang memiliki nilai kunci utama yang sama diizinkan ketika TableStoreWriter membuat permintaan penulisan batch. Nilai default: true. Penting Jika indeks sekunder dibuat untuk tabel data, Tablestore mengabaikan pengaturan parameter ini dan tidak mengizinkan baris yang memiliki nilai kunci utama yang sama. Dalam hal ini, TableStoreWriter menambahkan baris yang memiliki nilai kunci utama yang sama ke permintaan yang berbeda ketika TableStoreWriter membuat permintaan. |
maxBatchSize | Integer | Jumlah maksimum data yang dapat ditulis ke Tablestore dalam permintaan penulisan batch. Unit: byte. Secara default, hingga 4 MB data dapat ditulis ke Tablestore dalam permintaan penulisan batch. |
maxBatchRowsCount | Integer | Jumlah maksimum baris yang dapat ditulis ke Tablestore dalam permintaan penulisan batch. Nilai default: 200. Nilai maksimum: 200. |
callbackThreadCount | Integer | Jumlah thread dalam thread pool yang menjalankan callback di dalam TableStoreWriter. Nilai default parameter ini adalah jumlah Prosesor. |
callbackThreadPoolQueueSize | Integer | Ukuran antrian thread pool yang menjalankan callback di dalam TableStoreWriter. Nilai default: 1024. |
maxColumnsCount | Integer | Jumlah maksimum kolom dalam satu baris saat data diflush ke buffer. Nilai default: 128. |
maxAttrColumnSize | Integer | Ukuran maksimum nilai dari satu kolom atribut saat data diflush ke buffer. Unit: byte. Secara default, nilai setiap kolom atribut dapat mencapai hingga 2 MB. |
maxPKColumnSize | Integer | Ukuran maksimum nilai dari satu kolom kunci utama saat data diflush ke buffer. Unit: byte. Secara default, nilai setiap kolom kunci utama dapat mencapai hingga 1 KB. |
flushInterval | Integer | Interval di mana TableStoreWriter secara otomatis menulis data dalam buffer ke Tablestore. Nilai default: 10000. Unit: milidetik. |
logInterval | Integer | Interval di mana status tugas ditampilkan secara otomatis ketika TableStoreWriter menulis data dalam buffer ke Tablestore. Nilai default: 10000. Unit: milidetik. |
clientMaxConnections | Integer | Jumlah maksimum koneksi yang digunakan saat client dibangun secara internal. Nilai default: 300. |
writerRetryStrategy | Kebijakan retry yang digunakan saat client dibangun secara internal. Nilai yang valid:
|
Contoh konfigurasi
WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);Lampiran 3: Fungsi callback TableStoreWriter
TableStoreWriter menggunakan callback untuk melaporkan keberhasilan atau kegagalan penulisan. Jika baris data berhasil ditulis ke Tablestore, TableStoreWriter memanggil fungsi onCompleted. Jika baris data gagal ditulis ke Tablestore, TableStoreWriter memanggil fungsi onFailed berdasarkan kategori pengecualian.
Contoh kode berikut memberikan contoh cara menggunakan callback untuk menghitung jumlah baris yang berhasil ditulis ke Tablestore dan jumlah baris yang gagal ditulis ke Tablestore:
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
// Hitung jumlah baris yang berhasil ditulis ke Tablestore.
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
// Hitung jumlah baris yang gagal ditulis ke Tablestore.
failedRows.incrementAndGet();
}
};