Java High Level REST Client adalah klien REST tingkat tinggi yang disediakan oleh Elasticsearch dan mendukung API yang lebih mudah digunakan. LindormSearch kompatibel dengan fitur-fitur Elasticsearch versi 7.10 dan versi sebelumnya. Untuk melakukan query kompleks, analisis, atau menggunakan fitur canggih yang disediakan oleh Elasticsearch, Anda dapat menggunakan Java High Level REST Client untuk terhubung ke LindormSearch. Dengan cara ini, Anda dapat dengan mudah merancang dan mengelola indeks pencarian serta dokumen.
Prasyarat
Lingkungan Java dengan JDK 1.8 atau versi yang lebih baru telah diinstal.
Anda telah mengaktifkan mesin pencarian. Untuk informasi selengkapnya, lihat Panduan Pengaktifan.
Alamat IP klien Anda telah ditambahkan ke daftar putih instance Lindorm. Untuk informasi lebih lanjut, lihat Konfigurasikan daftar putih.
Prosedur
Instal Java High Level REST Client. Untuk proyek Maven, tambahkan dependensi berikut ke file
pom.xmldi bawah elemendependencies.<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.10.0</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.20.0</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.20.0</version> </dependency>PentingJava High Level REST Client bersifat forward compatible. Sebagai contoh, Java High Level REST Client versi 6.7.0 dapat berkomunikasi dengan kluster Elasticsearch versi 6.7.0 atau yang lebih baru. Untuk memanfaatkan sepenuhnya fitur-fitur baru dari klien tersebut, kami menyarankan penggunaan Java High Level REST Client versi 7.10.0 atau versi sebelumnya untuk .
Konfigurasikan parameter koneksi dan gunakan metode RestClient.builder() untuk membuat objek RestHighLevelClient.
// Tentukan titik akhir LindormSearch untuk Elasticsearch. String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com"; int search_port = 30070; String username = "user"; String password = "test"; final CredentialsProvider credentials_provider = new BasicCredentialsProvider(); credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); RestHighLevelClient highClient = new RestHighLevelClient( RestClient.builder(new HttpHost( search_url, search_port, "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider); } }) );Parameter
Parameter
Deskripsi
search_url
Titik akhir yang kompatibel dengan Elasticsearch untuk mesin pencarian. Untuk mendapatkan titik akhir tersebut, lihat Alamat yang kompatibel dengan Elasticsearch.
PentingJika aplikasi Anda diterapkan pada instance ECS, kami sarankan Anda menggunakan VPC untuk terhubung ke instance Lindorm demi memastikan keamanan yang lebih tinggi dan latensi jaringan yang lebih rendah.
Jika aplikasi Anda berjalan di lingkungan on-premises, aktifkan titik akhir publik di Konsol sebelum melakukan koneksi melalui jaringan publik. Untuk mengaktifkannya: Di panel navigasi sebelah kiri, pilih Database Connections, klik tab Search Engine, lalu klik Enable Public Endpoint di pojok kanan atas.
Saat mengakses instans Lindorm melalui VPC, atur search_url ke alamat VPC dari titik akhir yang kompatibel dengan Elasticsearch. Saat mengakses melalui jaringan publik, atur search_url ke alamat Internet dari titik akhir yang kompatibel dengan Elasticsearch.
search_port
Port untuk fitur Elasticsearch Compatibility pada mesin pencarian Lindorm adalah 30070.
username
Username dan password untuk mengakses mesin pencarian.
Untuk mendapatkan username dan password default: Di panel navigasi sebelah kiri, pilih Database Connections, klik tab Search Engine, lalu lihat kredensial di tab Search Engine.
password
Gunakan mesin pencarian.
Kode contoh mencakup hal-hal berikut:
Buat indeks pencarian: Indeks pencarian bernama lindorm_index dibuat.
Tulis data: Satu catatan data ditulis ke dokumen dengan ID test. Kemudian, 100.000 dokumen ditulis ke indeks secara batch.
Query data: Kirim permintaan refresh untuk menampilkan data yang ditulis ke LindormSearch. Dalam kode contoh, dua permintaan dikirim untuk masing-masing mengquery semua dokumen di indeks dan dokumen dengan ID test.
Hapus data: Hapus dokumen dengan ID test dan hapus indeks lindorm_index.
try { String index_name = "lindorm_index"; // Buat permintaan CreateIndex untuk membuat indeks pencarian. CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name); // Tentukan pengaturan indeks. Map<String, Object> settingsMap = new HashMap<>(); settingsMap.put("index.number_of_shards", 4); createIndexRequest.settings(settingsMap); CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS); if (createIndexResponse.isAcknowledged()) { System.out.println("Buat indeks [" + index_name + "] berhasil."); } // Tentukan ID dokumen. Jika Anda tidak menentukan ID dokumen, sistem akan secara otomatis menghasilkan ID untuk dokumen tersebut. Dalam kasus ini, dokumen dapat memberikan performa tulis yang lebih baik. String doc_id = "test"; // Tentukan bidang dalam dokumen. Ganti bidang dan nilai dalam kode contoh dengan yang sebenarnya dalam bisnis Anda. Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("field1", "value1"); jsonMap.put("field2", "value2"); // Buat permintaan untuk menulis satu catatan data ke dokumen. Tentukan ID dokumen dan bidang yang ingin Anda tulis ke dokumen. IndexRequest indexRequest = new IndexRequest(index_name); indexRequest.id(doc_id).source(jsonMap); IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS); System.out.println("Indeks dokumen dengan id[" + indexResponse.getId() + "] berhasil."); // Tulis data secara batch. int bulkTotal = 100000; AtomicLong failedBulkItemCount = new AtomicLong(); // Buat objek BulkProcessor untuk memulai permintaan Bulk. BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener), new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) {} @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { // Anda dapat memperoleh respons dari setiap permintaan dalam permintaan Bulk. Kode contoh menghitung jumlah item Bulk gagal berdasarkan respons. for (BulkItemResponse bulkItemResponse : response) { if (bulkItemResponse.isFailed()) { failedBulkItemCount.incrementAndGet(); } } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { // Jika kegagalan ditangkap oleh blok kode ini, semua permintaan dalam permintaan Bulk tidak dieksekusi. if (null != failure) { failedBulkItemCount.addAndGet(request.numberOfActions()); } } }); // Tentukan jumlah maksimum permintaan Bulk bersamaan. Nilai default: 1. builder.setConcurrentRequests(10); // Tentukan ambang batas berdasarkan mana objek BulkProcessor mengirim permintaan Bulk. Anda dapat menentukan interval waktu, jumlah operasi, atau ukuran permintaan sebagai ambang batas. builder.setFlushInterval(TimeValue.timeValueSeconds(5)); builder.setBulkActions(5000); builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); BulkProcessor bulkProcessor = builder.build(); Random random = new Random(); for (int i = 0; i < bulkTotal; i++) { // Ganti bidang dan nilai dalam kode contoh dengan yang sebenarnya dalam bisnis Anda. Map<String, Object> map = new HashMap<>(); map.put("field1", random.nextInt() + ""); map.put("field2", random.nextInt() + ""); IndexRequest bulkItemRequest = new IndexRequest(index_name); bulkItemRequest.source(map); // Tambahkan operasi ke objek BulkProcessor. bulkProcessor.add(bulkItemRequest); } // Anda dapat menggunakan metode awaitClose untuk menunggu hingga semua operasi dieksekusi. bulkProcessor.awaitClose(120, TimeUnit.SECONDS); long failure = failedBulkItemCount.get(), success = bulkTotal - failure; System.out.println("Bulk menggunakan BulkProcessor selesai dengan [" + success + "] permintaan berhasil, [" + failure + "] permintaan gagal."); // Buat permintaan refresh untuk menampilkan data yang ditulis. RefreshRequest refreshRequest = new RefreshRequest(index_name); RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS); System.out.println("Refresh pada indeks [" + index_name + "] berhasil."); // Buat permintaan pencarian untuk mengquery semua data. SearchRequest searchRequest = new SearchRequest(index_name); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); QueryBuilder queryMatchAllBuiler = new MatchAllQueryBuilder(); searchSourceBuilder.query(queryMatchAllBuiler); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS); long totalHit = searchResponse.getHits().getTotalHits().value; System.out.println("Pencarian query cocok semua hits [" + totalHit + "] total."); // Buat permintaan pencarian untuk mengquery data berdasarkan ID. QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id); searchSourceBuilder.query(queryByIdBuilder); searchRequest.source(searchSourceBuilder); searchResponse = highClient.search(searchRequest, COMMON_OPTIONS); for (SearchHit searchHit : searchResponse.getHits()) { System.out.println("Pencarian query berdasarkan id respons [" + searchHit.getSourceAsString() + "]"); } // Buat permintaan hapus untuk menghapus dokumen tunggal dengan ID tertentu. DeleteRequest deleteRequest = new DeleteRequest(index_name); deleteRequest.id(doc_id); DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS); System.out.println("Hapus dokumen dengan id [" + deleteResponse.getId() + "] berhasil."); // Buat permintaan DeleteIndex untuk menghapus indeks. DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name); AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS); if (deleteIndexResponse.isAcknowledged()) { System.out.println("Hapus indeks [" + index_name + "] berhasil."); } highClient.close(); } catch (Exception exception) { // Tangani pengecualian. System.out.println("msg " + exception); }
Contoh lengkap
Berikut ini adalah kode contoh lengkap:
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class RestHClientTest {
private static final RequestOptions COMMON_OPTIONS;
static {
// Buat permintaan dan konfigurasikan parameter. Dalam kode contoh, ukuran cache maksimum diatur menjadi 30 MB. Secara default, ukuran cache maksimum adalah 100 MB.
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
builder.setHttpAsyncResponseConsumerFactory(
new HttpAsyncResponseConsumerFactory
.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}
public static void main(String[] args) {
// Tentukan titik akhir LindormSearch untuk Elasticsearch.
String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
int search_port = 30070;
// Tentukan nama pengguna dan kata sandi yang digunakan untuk terhubung ke LindormSearch. Anda dapat memperolehnya di konsol Lindorm.
String username = "user";
String password = "test";
final CredentialsProvider credentials_provider = new BasicCredentialsProvider();
credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestHighLevelClient highClient = new RestHighLevelClient(
RestClient.builder(new HttpHost( search_url, search_port, "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider);
}
})
);
try {
String index_name = "lindorm_index";
// Buat permintaan CreateIndex untuk membuat indeks pencarian.
CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name);
// Tentukan pengaturan indeks.
Map<String, Object> settingsMap = new HashMap<>();
settingsMap.put("index.number_of_shards", 4);
createIndexRequest.settings(settingsMap);
CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS);
if (createIndexResponse.isAcknowledged()) {
System.out.println("Buat indeks [" + index_name + "] berhasil.");
}
// Tentukan ID dokumen. Jika Anda tidak menentukan ID dokumen, sistem akan secara otomatis menghasilkan ID untuk dokumen tersebut. Dalam kasus ini, dokumen dapat memberikan performa tulis yang lebih baik.
String doc_id = "test";
// Tentukan bidang dalam dokumen. Ganti bidang dan nilai dalam kode contoh dengan yang sebenarnya dalam bisnis Anda.
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("field1", "value1");
jsonMap.put("field2", "value2");
// Buat permintaan untuk menulis satu catatan data ke dokumen. Tentukan ID dokumen dan bidang yang ingin Anda tulis ke dokumen.
IndexRequest indexRequest = new IndexRequest(index_name);
indexRequest.id(doc_id).source(jsonMap);
IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
System.out.println("Indeks dokumen dengan id[" + indexResponse.getId() + "] berhasil.");
// Tulis data secara batch.
int bulkTotal = 100000;
AtomicLong failedBulkItemCount = new AtomicLong();
// Buat objek BulkProcessor untuk memulai permintaan Bulk.
BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener),
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// Anda dapat memperoleh respons dari setiap permintaan dalam permintaan Bulk. Kode contoh menghitung jumlah item Bulk gagal berdasarkan respons.
for (BulkItemResponse bulkItemResponse : response) {
if (bulkItemResponse.isFailed()) {
failedBulkItemCount.incrementAndGet();
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// Jika kegagalan ditangkap oleh blok kode ini, semua permintaan dalam permintaan Bulk tidak dieksekusi.
if (null != failure) {
failedBulkItemCount.addAndGet(request.numberOfActions());
}
}
});
// Tentukan jumlah maksimum permintaan Bulk bersamaan. Nilai default: 1.
builder.setConcurrentRequests(10);
// Tentukan ambang batas berdasarkan mana objek BulkProcessor mengirim permintaan Bulk. Anda dapat menentukan interval waktu, jumlah operasi, atau ukuran permintaan sebagai ambang batas.
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
builder.setBulkActions(5000);
builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
BulkProcessor bulkProcessor = builder.build();
Random random = new Random();
for (int i = 0; i < bulkTotal; i++) {
// Ganti bidang dan nilai dalam kode contoh dengan yang sebenarnya dalam bisnis Anda.
Map<String, Object> map = new HashMap<>();
map.put("field1", random.nextInt() + "");
map.put("field2", random.nextInt() + "");
IndexRequest bulkItemRequest = new IndexRequest(index_name);
bulkItemRequest.source(map);
// Tambahkan operasi ke objek BulkProcessor.
bulkProcessor.add(bulkItemRequest);
}
// Anda dapat menggunakan metode awaitClose untuk menunggu hingga semua operasi dieksekusi.
bulkProcessor.awaitClose(120, TimeUnit.SECONDS);
long failure = failedBulkItemCount.get(),
success = bulkTotal - failure;
System.out.println("Bulk menggunakan BulkProcessor selesai dengan [" + success + "] permintaan berhasil, [" + failure + "] permintaan gagal.");
// Buat permintaan refresh untuk menampilkan data yang ditulis.
RefreshRequest refreshRequest = new RefreshRequest(index_name);
RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS);
System.out.println("Refresh pada indeks [" + index_name + "] berhasil.");
// Buat permintaan pencarian untuk mengquery semua data.
SearchRequest searchRequest = new SearchRequest(index_name);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
QueryBuilder queryMatchAllBuiler = new MatchAllQueryBuilder();
searchSourceBuilder.query(queryMatchAllBuiler);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
long totalHit = searchResponse.getHits().getTotalHits().value;
System.out.println("Pencarian query cocok semua hits [" + totalHit + "] total.");
// Buat permintaan pencarian untuk mengquery data berdasarkan ID.
QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id);
searchSourceBuilder.query(queryByIdBuilder);
searchRequest.source(searchSourceBuilder);
searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
for (SearchHit searchHit : searchResponse.getHits()) {
System.out.println("Pencarian query berdasarkan id respons [" + searchHit.getSourceAsString() + "]");
}
// Buat permintaan hapus untuk menghapus dokumen tunggal dengan ID tertentu.
DeleteRequest deleteRequest = new DeleteRequest(index_name);
deleteRequest.id(doc_id);
DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS);
System.out.println("Hapus dokumen dengan id [" + deleteResponse.getId() + "] berhasil.");
// Buat permintaan DeleteIndex untuk menghapus indeks.
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name);
AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
if (deleteIndexResponse.isAcknowledged()) {
System.out.println("Hapus indeks [" + index_name + "] berhasil.");
}
highClient.close();
} catch (Exception exception) {
// Tangani pengecualian.
System.out.println("msg " + exception);
}
}
}Hasil berikut dikembalikan:
Buat indeks [lindorm_index] berhasil.
Indeks dokumen dengan id[test] berhasil.
Bulk menggunakan BulkProcessor selesai dengan [100000] permintaan berhasil, [0] permintaan gagal.
Refresh pada indeks [lindorm_index] berhasil.
Pencarian query cocok semua hits [10000] total.
Pencarian query berdasarkan id respons [{"field1":"value1","field2":"value2"}]
Hapus dokumen dengan id [test] berhasil.
Hapus indeks [lindorm_index] berhasil.Menurut hasil tersebut, 100.000 catatan data ditulis ke indeks. Namun, hanya 10.000 catatan data yang dikembalikan untuk permintaan yang mengquery semua data di indeks. Hal ini karena hingga 10.000 catatan data yang diquery dikembalikan secara default.
Untuk memperoleh jumlah total catatan yang cocok, atur properti trackTotalHits dari objek SearchSourceBuilder ke true, seperti searchSourceBuilder.trackTotalHits(true);.