If you do not have requirements on the order of query results, you can use the parallel scan feature to obtain query results in an efficient manner.
Tablestore SDK for Java V5.6.0 or later supports the parallel scan feature. Before you use this feature, make sure that you have downloaded the correct version of Tablestore SDK for Java. For more information about the version history of Tablestore SDK for Java, see Version history of Tablestore SDK for Java.
Prerequisites
A Tablestore client is initialized. For more information, see Initialize a Tablestore client. A minimal initialization sketch is shown after this list.
A data table is created and data is written to the data table. For more information, see Create a data table and Write data.
A search index is created for the data table. For more information, see Create a search index.
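The following minimal sketch shows one way to initialize a synchronous Tablestore client before you call the parallel scan operations. The class name and the placeholder values are for illustration only; replace the placeholders with the endpoint of your instance, your AccessKey pair, and the name of your instance.
import com.alicloud.openservices.tablestore.SyncClient;

public class ClientInitialization {

    public static SyncClient createClient() {
        // Replace the placeholders with the endpoint of your instance, your AccessKey pair, and the name of your instance.
        String endpoint = "<Endpoint>";
        String accessKeyId = "<AccessKeyId>";
        String accessKeySecret = "<AccessKeySecret>";
        String instanceName = "<InstanceName>";
        return new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
    }
}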
Parameters
Parameter | Sub-parameter | Description
--- | --- | ---
tableName | N/A | The name of the data table.
indexName | N/A | The name of the search index.
scanQuery | query | The query statement for the search index. The operation supports term query, fuzzy query, range query, geo query, and nested query, which is similar to the Search operation.
scanQuery | limit | The maximum number of rows that each ParallelScan call can return.
scanQuery | maxParallel | The maximum number of parallel scan tasks per request. This value varies based on the volume of data: a larger volume of data requires more parallel scan tasks per request. You can use the ComputeSplits operation to query the maximum number of parallel scan tasks per request.
scanQuery | currentParallelId | The ID of the parallel scan task in the request. Valid values: [0, Value of maxParallel).
scanQuery | token | The token that is used to paginate the query results. The result of a ParallelScan request contains the token for the next page. You can use the token to retrieve the next page.
scanQuery | aliveTime | The validity period of the current parallel scan task, which is also the validity period of the token. Unit: seconds. Default value: 60. We recommend that you use the default value. If the next request is not initiated within the validity period, no more data can be queried. The validity period of the token is refreshed each time you send a request. Note: The session expires earlier if the schema of the index is dynamically modified, a single server fails, or load balancing is performed on the server side. In this case, you must create another session.
columnsToGet | N/A | The names of the columns that you want to return in the query results. You can add the column names to Columns. If you want to return all columns in the search index, you can use the more concise ReturnAllFromIndex operation. Important: ReturnAll cannot be used here.
sessionId | N/A | The session ID of the parallel scan task. You can call the ComputeSplits operation to create a session and query the maximum number of parallel scan tasks that are supported by the parallel scan request.
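For reference, the following sketch shows how the scanQuery parameters in the preceding table can be set on a ScanQuery object by using setters. It assumes that the setter names mirror the parameter names (for example, setMaxParallel and setAliveTime); verify the names against your SDK version. The builder-based code in the Examples section shows the equivalent configuration.
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;

public class ScanQueryConfiguration {

    // Hypothetical helper that is used for illustration only.
    public static ScanQuery buildScanQuery(int maxParallel, int currentParallelId) {
        ScanQuery scanQuery = new ScanQuery();
        scanQuery.setQuery(new MatchAllQuery());            // query: determines the range of the data to scan.
        scanQuery.setLimit(2000);                           // limit: the maximum number of rows per ParallelScan call.
        scanQuery.setMaxParallel(maxParallel);              // maxParallel: use the value returned by ComputeSplits.
        scanQuery.setCurrentParallelId(currentParallelId);  // currentParallelId: a value in [0, maxParallel).
        scanQuery.setAliveTime(60);                         // aliveTime (assumed setter name): validity period in seconds.
        return scanQuery;
    }
}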
Examples
You can scan data by using a single thread or multiple threads based on your business requirements.
Scan data by using a single thread
When you use parallel scan, the code for a request that uses a single thread is simpler than the code for a request that uses multiple threads. The currentParallelId and maxParallel parameters are not required for a request that uses a single thread. A ParallelScan request that uses a single thread delivers higher throughput than a Search request, but lower throughput than a ParallelScan request that uses multiple threads.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
import com.alicloud.openservices.tablestore.model.search.query.Query;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

public class Test {

    public static List<Row> scanQuery(final SyncClient client) {
        String tableName = "<TableName>";
        String indexName = "<IndexName>";

        // Query the session ID and the maximum number of parallel scan tasks supported by the request.
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        byte[] sessionId = computeSplitsResponse.getSessionId();
        int splitsSize = computeSplitsResponse.getSplitsSize();

        /*
         * Create a parallel scan request.
         */
        ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
        parallelScanRequest.setTableName(tableName);
        parallelScanRequest.setIndexName(indexName);
        ScanQuery scanQuery = new ScanQuery();
        // This query determines the range of the data to scan. You can create a nested and complex query.
        Query query = new MatchAllQuery();
        scanQuery.setQuery(query);
        // Specify the maximum number of rows that can be returned by each ParallelScan call.
        scanQuery.setLimit(2000);
        parallelScanRequest.setScanQuery(scanQuery);
        ColumnsToGet columnsToGet = new ColumnsToGet();
        columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
        parallelScanRequest.setColumnsToGet(columnsToGet);
        parallelScanRequest.setSessionId(sessionId);

        /*
         * Use builder to create a parallel scan request that has the same features as the preceding request.
         */
        ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
            .tableName(tableName)
            .indexName(indexName)
            .scanQuery(ScanQuery.newBuilder()
                .query(QueryBuilders.matchAll())
                .limit(2000)
                .build())
            .addColumnsToGet("col_1", "col_2")
            .sessionId(sessionId)
            .build();
        List<Row> result = new ArrayList<>();

        /*
         * Use the native API operation to scan data.
         */
        {
            ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
            // Query the token of ScanQuery for the next request.
            byte[] nextToken = parallelScanResponse.getNextToken();
            // Obtain the data.
            List<Row> rows = parallelScanResponse.getRows();
            result.addAll(rows);
            while (nextToken != null) {
                // Specify the token.
                parallelScanRequest.getScanQuery().setToken(nextToken);
                // Continue to scan the data.
                parallelScanResponse = client.parallelScan(parallelScanRequest);
                // Obtain the data.
                rows = parallelScanResponse.getRows();
                result.addAll(rows);
                nextToken = parallelScanResponse.getNextToken();
            }
        }

        /*
         * Recommended method.
         * Use an iterator to scan all matched data. This method has the same query speed but is easier to use compared with the previous method.
         */
        {
            RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
            while (iterator.hasNext()) {
                Row row = iterator.next();
                result.add(row);
                // Obtain the specific values.
                String col_1 = row.getLatestColumn("col_1").getValue().asString();
                long col_2 = row.getLatestColumn("col_2").getValue().asLong();
            }
        }

        /*
         * If the operation fails, retry the operation. If the caller of this function has a retry mechanism or if you do not want to retry the failed operation, you can ignore this part.
         * To ensure availability, we recommend that you start a new parallel scan task when exceptions occur.
         * The following exceptions may occur when you send a ParallelScan request:
         * 1. A session exception occurs on the server side. The error code is OTSSessionExpired.
         * 2. An exception such as a network exception occurs on the client side.
         */
        try {
            // Execute the processing logic.
            {
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    // Process rows of data. If you have sufficient memory resources, you can add the rows to a list.
                    result.add(row);
                }
            }
        } catch (Exception ex) {
            // Retry the processing logic.
            {
                result.clear();
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    // Process rows of data. If you have sufficient memory resources, you can add the rows to a list.
                    result.add(row);
                }
            }
        }
        return result;
    }
}

Scan data by using multiple threads
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

public class Test {

    public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
        // Query the number of CPU cores on the client.
        final int cpuProcessors = Runtime.getRuntime().availableProcessors();
        // Specify the number of parallel threads for the client. We recommend that you specify the number of CPU cores on the client as the number of parallel threads for the client to prevent impact on the query performance.
        final Semaphore semaphore = new Semaphore(cpuProcessors);

        // Query the session ID and the maximum number of parallel scan tasks supported by the request.
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        final byte[] sessionId = computeSplitsResponse.getSessionId();
        final int maxParallel = computeSplitsResponse.getSplitsSize();

        // Create an AtomicLong object if you need to obtain the row count for your business.
        AtomicLong rowCount = new AtomicLong(0);

        /*
         * If you want to perform multithreading by using a function, you can build an internal class to inherit the threads.
         * You can also build an external class to organize the code.
         */
        final class ThreadForScanQuery extends Thread {
            private final int currentParallelId;

            private ThreadForScanQuery(int currentParallelId) {
                this.currentParallelId = currentParallelId;
                this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId); // Specify the thread name.
            }

            @Override
            public void run() {
                System.out.println("start thread:" + this.getName());
                try {
                    // Execute the processing logic.
                    {
                        ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
                            .tableName(tableName)
                            .indexName(indexName)
                            .scanQuery(ScanQuery.newBuilder()
                                .query(QueryBuilders.range("col_long").lessThan(10_0000)) // Specify the data to query.
                                .limit(2000)
                                .currentParallelId(currentParallelId)
                                .maxParallel(maxParallel)
                                .build())
                            .addColumnsToGet("col_long", "col_keyword", "col_bool") // Specify the fields to return from the search index. To return all fields from the search index, set returnAllColumnsFromIndex to true.
                            //.returnAllColumnsFromIndex(true)
                            .sessionId(sessionId)
                            .build();
                        // Use an iterator to obtain all the data.
                        RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
                        long count = 0;
                        while (ltr.hasNext()) {
                            Row row = ltr.next();
                            // Add a custom processing logic. The following sample code shows how to add a custom processing logic to count the number of rows:
                            count++;
                        }
                        rowCount.addAndGet(count);
                        System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
                    }
                } catch (Exception ex) {
                    // If exceptions occur, you can retry the processing logic.
                } finally {
                    semaphore.release();
                }
            }
        }

        // Simultaneously execute threads. Valid values of currentParallelId: [0, Value of maxParallel).
        List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
        for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
            ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
            threadList.add(thread);
        }

        // Simultaneously initiate the threads.
        for (ThreadForScanQuery thread : threadList) {
            // Specify a value for semaphore to limit the number of threads that can be initiated at the same time to prevent bottlenecks on the client.
            semaphore.acquire();
            thread.start();
        }

        // The main thread is blocked until all threads are complete.
        for (ThreadForScanQuery thread : threadList) {
            thread.join();
        }

        System.out.println("all thread finished! total rows:" + rowCount.get());
    }
}