Jika Anda tidak memerlukan urutan hasil kueri, Anda dapat menggunakan fitur pemindaian paralel untuk mendapatkan hasil kueri secara lebih efisien.
Tablestore SDK for Java V5.6.0 atau yang lebih baru mendukung fitur pemindaian paralel. Pastikan Anda menggunakan versi Tablestore SDK for Java yang sesuai sebelum mengaktifkan fitur ini. Untuk informasi lebih lanjut tentang riwayat versi Tablestore SDK for Java, lihat Riwayat Versi Tablestore SDK for Java.
Informasi Latar Belakang
Fitur indeks pencarian memungkinkan Anda memanggil operasi Search untuk menggunakan semua fitur kueri serta kemampuan analitik seperti pengurutan dan agregasi. Hasil kueri dikembalikan dari operasi Search dalam urutan tertentu.
Dalam beberapa kasus, misalnya ketika menghubungkan Tablestore ke lingkungan komputasi seperti Spark atau Presto, atau jika Anda ingin menanyai kelompok objek tertentu, kecepatan kueri mungkin lebih penting daripada urutan hasil. Untuk meningkatkan kecepatan kueri, Tablestore menyediakan operasi ParallelScan untuk fitur indeks pencarian.
Dibandingkan dengan operasi Search, operasi ParallelScan mendukung semua fitur kueri tetapi tidak menyediakan kemampuan analitik seperti pengurutan dan agregasi. Dengan pendekatan ini, kecepatan kueri meningkat lebih dari lima kali lipat. Anda dapat memanggil operasi ParallelScan untuk mengekspor ratusan juta baris data dalam satu menit. Kemampuan ekspor data dapat diskalakan secara horizontal tanpa batas atas.
Jumlah maksimum baris yang dapat dikembalikan oleh setiap panggilan ParallelScan lebih besar dibandingkan dengan jumlah maksimum baris yang dapat dikembalikan oleh setiap panggilan Search. Operasi Search mengembalikan hingga 100 baris per panggilan, sedangkan operasi ParallelScan mengembalikan hingga 2.000 baris per panggilan. Fitur pemindaian paralel memungkinkan Anda menggunakan beberapa utas untuk memulai permintaan secara paralel dalam sesi, sehingga mempercepat proses ekspor data.
Skenario
Gunakan operasi Search jika Anda ingin mengurutkan atau mengumpulkan hasil kueri, atau jika permintaan kueri berasal dari pengguna akhir.
Gunakan operasi ParallelScan jika Anda tidak memerlukan urutan hasil kueri dan ingin mengembalikan semua hasil yang cocok secara efisien, atau jika data ditarik oleh lingkungan komputasi seperti Spark atau Presto.
Fitur
Berikut adalah perbedaan antara operasi ParallelScan dan operasi Search:
Hasil yang stabil
Tugas pemindaian paralel bersifat stateful. Dalam sesi, set hasil data yang dipindai ditentukan oleh status data saat permintaan pertama diinisiasi. Jika data dimasukkan atau dimodifikasi setelah permintaan pertama dikirim, set hasil tidak terpengaruh.
Sesi
PentingJika ID sesi sulit diperoleh, Anda dapat memanggil operasi ParallelScan tanpa ID sesi. Namun, jika Anda mengirim permintaan tanpa ID sesi, ada kemungkinan sangat rendah terjadinya data duplikat dalam set hasil.
Operasi terkait pemindaian paralel menggunakan sesi. ID sesi digunakan untuk menentukan set hasil data yang dipindai. Berikut adalah cara memperoleh dan menggunakan ID sesi:
Panggil operasi ComputeSplits untuk menanyakan jumlah maksimum tugas pemindaian paralel dan ID sesi saat ini.
Mulai beberapa permintaan pemindaian paralel untuk membaca data. Anda harus menentukan ID sesi saat ini dan ID tugas pemindaian paralel dalam permintaan tersebut.
Tablestore mengembalikan kode kesalahan OTSSessionExpired jika terjadi pengecualian jaringan, pengecualian utas, modifikasi dinamis pada skema, atau pergantian indeks selama proses pemindaian paralel, sehingga pemindaian data berhenti. Dalam kasus ini, Anda harus memulai tugas pemindaian paralel lain untuk melanjutkan pemindaian data.
Tugas pemindaian paralel dengan ID sesi yang sama dan nilai parameter ScanQuery yang sama dianggap sebagai satu tugas. Tugas pemindaian paralel dimulai saat Anda mengirim permintaan ParallelScan pertama dan berakhir ketika semua data telah dipindai atau token kedaluwarsa.
Jumlah maksimum tugas pemindaian paralel dalam satu permintaan
Jumlah maksimum tugas pemindaian paralel dalam satu permintaan yang didukung oleh operasi ParallelScan ditentukan oleh respons permintaan ComputeSplits. Volume data yang lebih besar memerlukan lebih banyak tugas pemindaian paralel dalam sesi.
Satu permintaan ditentukan oleh satu pernyataan kueri. Misalnya, jika Anda menggunakan operasi Search untuk menanyakan hasil di mana nilai city adalah Hangzhou, semua data yang cocok dengan kondisi ini dikembalikan dalam hasil. Namun, jika Anda menggunakan operasi ParallelScan dan jumlah tugas pemindaian paralel dalam sesi adalah 2, setiap permintaan ParallelScan mengembalikan setengah dari hasil. Set hasil lengkap terdiri dari dua set hasil paralel.
Kinerja
Kecepatan kueri permintaan ParallelScan yang mencakup tugas pemindaian paralel lima kali lebih cepat dibandingkan dengan kecepatan kueri permintaan Search. Saat menggunakan fitur pemindaian paralel, kecepatan kueri meningkat seiring dengan jumlah tugas pemindaian paralel dalam sesi. Misalnya, jika delapan tugas pemindaian paralel termasuk dalam sesi, kecepatan kueri dapat ditingkatkan hingga empat kali lipat.
Biaya
Permintaan ParallelScan mengonsumsi lebih sedikit sumber daya dan ditawarkan dengan harga yang lebih rendah. Untuk mengekspor sejumlah besar data, kami sarankan Anda menggunakan operasi ParallelScan.
Batasan
Jumlah maksimum tugas pemindaian paralel adalah 10. Anda dapat menyesuaikan batas ini sesuai kebutuhan bisnis Anda.
Hanya kolom yang ada yang dapat dikembalikan dari indeks pencarian. Namun, kolom bertipe DATE dan NESTED tidak dapat dikembalikan.
Operasi ParallelScan dapat mengembalikan nilai kolom ARRAY dan GEOPOINT. Namun, nilai yang dikembalikan diformat ulang dan mungkin berbeda dari nilai yang ditulis ke tabel data. Misalnya, jika Anda menulis [1,2, 3, 4] ke kolom ARRAY, operasi ParallelScan mengembalikan [1,2,3,4] sebagai nilainya. Jika Anda menulis
10,50ke kolom GEOPOINT, operasi ParallelScan mengembalikan10.0,50.0sebagai nilainya.Anda dapat mengatur parameter ReturnType ke RETURN_ALL_INDEX atau RETURN_SPECIFIED, tetapi tidak ke RETURN_ALL.
Jumlah maksimum baris yang dapat dikembalikan oleh setiap panggilan ParallelScan ditentukan oleh parameter limit. Nilai default parameter limit adalah 2.000. Jika Anda menentukan nilai lebih besar dari 2.000, kinerja hampir tidak berubah dengan peningkatan limit.
Operasi API
Anda dapat memanggil operasi API berikut untuk menggunakan fitur pemindaian paralel:
ComputeSplits: Anda dapat memanggil operasi ini untuk menanyakan jumlah maksimum tugas pemindaian paralel untuk satu permintaan ParallelScan.
ParallelScan: Anda dapat memanggil operasi ini untuk mengekspor data.
Gunakan SDK Tablestore
Anda dapat menggunakan SDK Tablestore berikut untuk memindai data secara paralel:
Tablestore SDK for Java: Pemindaian Paralel
Tablestore SDK for Go: Pemindaian Paralel
Tablestore SDK for Python: Pemindaian Paralel
Tablestore SDK for Node.js: Pemindaian Paralel
Tablestore SDK for .NET: Pemindaian Paralel
Tablestore SDK for PHP: Pemindaian Paralel
Parameter
Parameter | Deskripsi | |
tableName | Nama tabel data. | |
indexName | Nama indeks pencarian. | |
scanQuery | query | Pernyataan kueri untuk indeks pencarian. Operasi ini mendukung kueri tepat, kueri fuzzy, kueri rentang, kueri geo, dan kueri bersarang, yang mirip dengan operasi Search. |
limit | Jumlah maksimum baris yang dapat dikembalikan oleh setiap panggilan ParallelScan. | |
maxParallel | Jumlah maksimum tugas pemindaian paralel per permintaan. Jumlah maksimum tugas pemindaian paralel per permintaan bervariasi berdasarkan volume data. Volume data yang lebih besar memerlukan lebih banyak tugas pemindaian paralel per permintaan. Anda dapat menggunakan operasi ComputeSplits untuk menanyakan jumlah maksimum tugas pemindaian paralel per permintaan. | |
currentParallelId | ID tugas pemindaian paralel dalam permintaan. Nilai valid: [0, Nilai maxParallel) | |
token | Token yang digunakan untuk membagi halaman hasil kueri. Hasil permintaan ParallelScan berisi token untuk halaman berikutnya. Anda dapat menggunakan token untuk mengambil halaman berikutnya. | |
aliveTime | Periode validitas tugas pemindaian paralel saat ini. Periode validitas ini juga merupakan periode validitas token. Unit: detik. Nilai default: 60. Kami sarankan Anda menggunakan nilai default. Jika permintaan berikutnya tidak diinisiasi dalam periode validitas, tidak ada data yang dapat ditanyakan. Waktu validitas token diperbarui setiap kali Anda mengirim permintaan. Catatan Sesi kadaluwarsa lebih awal jika indeks beralih secara dinamis dalam skema, server tunggal gagal, atau load balancing dilakukan di sisi server. Dalam hal ini, Anda harus membuat ulang sesi. | |
columnsToGet | Nama kolom yang akan dikembalikan dalam hasil pengelompokan. Anda dapat menambahkan nama kolom ke Columns. Jika Anda ingin semua kolom dikembalikan dalam indeks pencarian, Anda dapat menggunakan operasi ReturnAllFromIndex yang lebih ringkas. Penting ReturnAll tidak dapat digunakan di sini. | |
sessionId | ID sesi tugas pemindaian paralel. Anda dapat memanggil operasi ComputeSplits untuk membuat sesi dan menanyakan jumlah maksimum tugas pemindaian paralel yang didukung oleh permintaan pemindaian paralel. | |
Contoh
Anda dapat memindai data menggunakan satu utas atau beberapa utas sekaligus, tergantung pada kebutuhan bisnis Anda.
Pemindaian Data Menggunakan Satu Utas
Saat menggunakan pemindaian paralel, kode untuk permintaan yang menggunakan satu utas lebih sederhana dibandingkan dengan kode untuk permintaan yang menggunakan beberapa utas. Parameter currentParallelId dan maxParallel tidak diperlukan untuk permintaan yang menggunakan satu utas. Permintaan ParallelScan yang menggunakan satu utas memberikan throughput yang lebih tinggi dibandingkan dengan permintaan Search, namun lebih rendah dibandingkan dengan permintaan ParallelScan yang menggunakan beberapa utas.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
import com.alicloud.openservices.tablestore.model.search.query.Query;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
public class Test {
public static List<Row> scanQuery(final SyncClient client) {
String tableName = "<TableName>";
String indexName = "<IndexName>";
// Menanyakan ID sesi dan jumlah maksimum tugas pemindaian paralel yang didukung oleh permintaan.
ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
computeSplitsRequest.setTableName(tableName);
computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
byte[] sessionId = computeSplitsResponse.getSessionId();
int splitsSize = computeSplitsResponse.getSplitsSize();
/*
* Membuat permintaan pemindaian paralel.
*/
ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
parallelScanRequest.setTableName(tableName);
parallelScanRequest.setIndexName(indexName);
ScanQuery scanQuery = new ScanQuery();
// Kueri ini menentukan rentang data yang akan dipindai. Anda dapat membuat kueri bersarang dan kompleks.
Query query = new MatchAllQuery();
scanQuery.setQuery(query);
// Tentukan jumlah maksimum baris yang dapat dikembalikan oleh setiap panggilan ParallelScan.
scanQuery.setLimit(2000);
parallelScanRequest.setScanQuery(scanQuery);
ColumnsToGet columnsToGet = new ColumnsToGet();
columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
parallelScanRequest.setColumnsToGet(columnsToGet);
parallelScanRequest.setSessionId(sessionId);
/*
* Gunakan pembuat untuk membuat permintaan pemindaian paralel yang memiliki fitur yang sama dengan permintaan sebelumnya.
*/
ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
.tableName(tableName)
.indexName(indexName)
.scanQuery(ScanQuery.newBuilder()
.query(QueryBuilders.matchAll())
.limit(2000)
.build())
.addColumnsToGet("col_1", "col_2")
.sessionId(sessionId)
.build();
List<Row> result = new ArrayList<>();
/*
* Gunakan API asli untuk memindai data.
*/
{
ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
// Menanyakan token ScanQuery untuk permintaan berikutnya.
byte[] nextToken = parallelScanResponse.getNextToken();
// Memperoleh data.
List<Row> rows = parallelScanResponse.getRows();
result.addAll(rows);
while (nextToken != null) {
// Tentukan token.
parallelScanRequest.getScanQuery().setToken(nextToken);
// Lanjutkan memindai data.
parallelScanResponse = client.parallelScan(parallelScanRequest);
// Memperoleh data.
rows = parallelScanResponse.getRows();
result.addAll(rows);
nextToken = parallelScanResponse.getNextToken();
}
}
/*
* Metode yang direkomendasikan.
* Gunakan iterator untuk memindai semua data yang cocok. Metode ini memiliki kecepatan kueri yang sama tetapi lebih mudah digunakan dibandingkan metode sebelumnya.
*/
{
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
result.add(row);
// Memperoleh nilai spesifik.
String col_1 = row.getLatestColumn("col_1").getValue().asString();
long col_2 = row.getLatestColumn("col_2").getValue().asLong();
}
}
/*
* Jika operasi gagal, ulangi operasi. Jika pemanggil fungsi ini memiliki mekanisme ulang atau jika Anda tidak ingin mengulang operasi yang gagal, Anda dapat mengabaikan bagian ini.
* Untuk memastikan ketersediaan, kami sarankan Anda memulai tugas pemindaian paralel baru saat pengecualian terjadi.
* Pengecualian berikut mungkin terjadi saat Anda mengirim permintaan ParallelScan:
* 1. Penyimpangan sesi terjadi di sisi server. Kode kesalahan adalah OTSSessionExpired.
* 2. Penyimpangan seperti pengecualian jaringan terjadi di sisi klien.
*/
try {
// Jalankan logika pemrosesan.
{
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
// Proses baris data. Jika Anda memiliki sumber daya memori yang cukup, Anda dapat menambahkan baris ke daftar.
result.add(row);
}
}
} catch (Exception ex) {
// Ulangi logika pemrosesan.
{
result.clear();
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
// Proses baris data. Jika Anda memiliki sumber daya memori yang cukup, Anda dapat menambahkan baris ke daftar.
result.add(row);
}
}
}
return result;
}
}Pemindaian Data Menggunakan Beberapa Utas
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
public class Test {
public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
// Menanyakan jumlah core CPU di klien.
final int cpuProcessors = Runtime.getRuntime().availableProcessors();
// Tentukan jumlah utas paralel untuk klien. Kami sarankan Anda menentukan jumlah core CPU di klien sebagai jumlah utas paralel untuk klien untuk mencegah dampak pada kinerja kueri.
final Semaphore semaphore = new Semaphore(cpuProcessors);
// Menanyakan ID sesi dan jumlah maksimum tugas pemindaian paralel yang didukung oleh permintaan.
ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
computeSplitsRequest.setTableName(tableName);
computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
final byte[] sessionId = computeSplitsResponse.getSessionId();
final int maxParallel = computeSplitsResponse.getSplitsSize();
// Buat objek AtomicLong jika Anda perlu memperoleh jumlah baris untuk bisnis Anda.
AtomicLong rowCount = new AtomicLong(0);
/*
* Jika Anda ingin melakukan multithreading menggunakan fungsi, Anda dapat membangun kelas internal untuk mewarisi utas.
* Anda juga dapat membangun kelas eksternal untuk mengatur kode.
*/
final class ThreadForScanQuery extends Thread {
private final int currentParallelId;
private ThreadForScanQuery(int currentParallelId) {
this.currentParallelId = currentParallelId;
this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId); // Tentukan nama utas.
}
@Override
public void run() {
System.out.println("memulai utas:" + this.getName());
try {
// Jalankan logika pemrosesan.
{
ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
.tableName(tableName)
.indexName(indexName)
.scanQuery(ScanQuery.newBuilder()
.query(QueryBuilders.range("col_long").lessThan(10_0000)) // Tentukan data yang akan ditanyakan.
.limit(2000)
.currentParallelId(currentParallelId)
.maxParallel(maxParallel)
.build())
.addColumnsToGet("col_long", "col_keyword", "col_bool") // Tentukan bidang yang akan dikembalikan dari indeks pencarian. Untuk mengembalikan semua bidang dari indeks pencarian, atur returnAllColumnsFromIndex ke true.
//.returnAllColumnsFromIndex(true)
.sessionId(sessionId)
.build();
// Gunakan iterator untuk memperoleh semua data.
RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
long count = 0;
while (ltr.hasNext()) {
Row row = ltr.next();
// Tambahkan logika pemrosesan kustom. Contoh kode berikut menunjukkan cara menambahkan logika pemrosesan kustom untuk menghitung jumlah baris:
count++;
}
rowCount.addAndGet(count);
System.out.println("utas[" + this.getName() + "] selesai. utas ini mendapatkan baris:" + count);
}
} catch (Exception ex) {
// Jika pengecualian terjadi, Anda dapat mengulang logika pemrosesan.
} finally {
semaphore.release();
}
}
}
// Eksekusi utas secara simultan. Nilai valid currentParallelId: [0, Nilai maxParallel).
List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
threadList.add(thread);
}
// Mulai utas secara simultan.
for (ThreadForScanQuery thread : threadList) {
// Tentukan nilai untuk semaphore untuk membatasi jumlah utas yang dapat dimulai pada saat yang sama untuk mencegah hambatan di sisi klien.
semaphore.acquire();
thread.start();
}
// Utas utama diblokir sampai semua utas selesai.
for (ThreadForScanQuery thread : threadList) {
thread.join();
}
System.out.println("semua utas selesai! total baris:" + rowCount.get());
}
}