Java High Level REST Clientは、Elasticsearchが提供する高レベルRESTクライアントであり、より使いやすいAPIをサポートしています。 LindormSearchは、Elasticsearch 7.10以前のバージョンの機能と互換性があります。 複雑なクエリと分析を実行したり、Elasticsearchが提供する高度な機能を使用したりするには、Java High Level REST Clientを使用してLindormSearchに接続できます。 この方法で、検索インデックスとドキュメントを簡単に設計および管理できます。
前提条件
JDK 1.8 以降の Java 環境がインストールされていること。
検索エンジンが有効になっていること。詳細については、「有効化ガイド」をご参照ください。
クライアントのIPアドレスがLindormインスタンスのホワイトリストに追加されている。 詳細については、「ホワイトリストの設定」をご参照ください。
手順
Java High Level REST Client をインストールします。Maven プロジェクトの場合、
pom.xmlファイルのdependenciesセクションに依存関係を追加します。<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.10.0</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.20.0</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.20.0</version> </dependency>重要Java High Level REST Client は上位互換性があります。たとえば、Java High Level REST Client バージョン 6.7.0 は、Elasticsearch クラスターのバージョン 6.7.0 以降と通信できます。新しいクライアント機能を最大限に活用するには、に対して Java High Level REST Client バージョン 7.10.0 またはそれ以前のバージョンを使用することを推奨します。
接続パラメータを設定し、RestClient.builder()メソッドを使用してRestHighLevelClientオブジェクトを作成します。
// Elasticsearch の LindormSearch エンドポイントを指定します。 String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com"; int search_port = 30070; String username = "user"; String password = "test"; final CredentialsProvider credentials_provider = new BasicCredentialsProvider(); credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); RestHighLevelClient highClient = new RestHighLevelClient( RestClient.builder(new HttpHost( search_url, search_port, "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider); } }) );パラメータ
パラメータ
説明
search_url
検索エンジンの Elasticsearch 互換エンドポイントです。エンドポイントを取得する方法については、「Elasticsearch 互換アドレス」をご参照ください。
重要アプリケーションがECSインスタンスにデプロイされている場合は、セキュリティを強化し、ネットワークレイテンシを低減するために、VPCを使用してLindormインスタンスに接続することをお勧めします。
ご利用のアプリケーションがオンプレミスで動作している場合、パブリックネットワーク経由で接続する前に、コンソールでパブリックエンドポイントを有効にしてください。有効化手順:左側のナビゲーションウィンドウで Database Connections を選択し、Search Engine タブをクリックしてから、右上隅の Enable Public Endpoint をクリックします。
VPC 経由で Lindorm インスタンスにアクセスする場合、search_url には Elasticsearch 互換エンドポイントの VPC アドレスを設定します。パブリックネットワーク経由でアクセスする場合、search_url には Elasticsearch 互換エンドポイントの Internet アドレスを設定します。
search_port
Lindorm 検索エンジンの Elasticsearch 互換機能 のポートは 30070 です。
username
検索エンジンにアクセスするためのユーザー名およびパスワードです。
デフォルトのユーザー名およびパスワードを取得するには、左側のナビゲーションウィンドウで Database Connections を選択し、Search Engine タブをクリックしてから、Search Engine タブで認証情報を確認します。
password
検索エンジンを使用します。
サンプルコードには、以下の内容が含まれています。
検索インデックスの作成:lindorm_indexという名前の検索インデックスが作成されます。
データの書き込み:IDがtestのドキュメントに単一のデータレコードが書き込まれます。 次に、100,000 件のドキュメントがバッチでインデックスに書き込まれます。
データのクエリ:更新リクエストを開始して、LindormSearchに書き込まれたデータを表示します。 サンプルコードでは、インデックス内のすべてのドキュメントとIDがtestのドキュメントを個別にクエリするために、2つのリクエストが送信されます。
データの削除:IDがtestのドキュメントを削除し、lindorm_indexインデックスを削除します。
// LindormSearch エンドポイントを指定します String index_name = "lindorm_index"; // CreateIndex リクエストを構築して検索インデックスを作成します。 CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name); // インデックスの設定を指定します。 Map<String, Object> settingsMap = new HashMap<>(); settingsMap.put("index.number_of_shards", 4); createIndexRequest.settings(settingsMap); CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS); if (createIndexResponse.isAcknowledged()) { System.out.println("インデックス [" + index_name + "] が正常に作成されました。"); } // ドキュメント ID を指定します。ドキュメント ID を指定しない場合、システムによってドキュメントの ID が自動的に生成されます。この場合、ドキュメントはより優れた書き込みパフォーマンスを提供できます。 String doc_id = "test"; // ドキュメント内のフィールドを指定します。サンプルコードのフィールドと値をビジネスの実際のフィールドと値に置き換えます。 Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("field1", "value1"); jsonMap.put("field2", "value2"); // ドキュメントに単一のデータレコードを書き込むリクエストを構築します。ドキュメント ID とドキュメントに書き込むフィールドを指定します。 IndexRequest indexRequest = new IndexRequest(index_name); indexRequest.id(doc_id).source(jsonMap); IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS); System.out.println("ID [" + indexResponse.getId() + "] のドキュメントが正常にインデックス化されました。"); // バッチでデータを書き込みます。 int bulkTotal = 100000; AtomicLong failedBulkItemCount = new AtomicLong(); // BulkProcessor オブジェクトを作成して Bulk リクエストを開始します。 BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener), new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) {} @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { // Bulk リクエストの各リクエストのレスポンスを取得できます。サンプルコードでは、レスポンスに基づいて失敗した Bulk アイテムの数をカウントします。 for (BulkItemResponse bulkItemResponse : response) { if (bulkItemResponse.isFailed()) { failedBulkItemCount.incrementAndGet(); } } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { // このコードブロックでエラーがキャプチャされた場合、Bulk リクエスト内のすべてのリクエストは実行されません。 if (null != failure) { failedBulkItemCount.addAndGet(request.numberOfActions()); } } }); // 同時 Bulk リクエストの最大数を指定します。デフォルト値:1。 builder.setConcurrentRequests(10); // BulkProcessor オブジェクトが Bulk リクエストを送信するしきい値を指定します。時間間隔、操作の数、またはリクエストのサイズをしきい値として指定できます。 builder.setFlushInterval(TimeValue.timeValueSeconds(5)); builder.setBulkActions(5000); builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); BulkProcessor bulkProcessor = builder.build(); Random random = new Random(); for (int i = 0; i < bulkTotal; i++) { // サンプルコードのフィールドと値をビジネスの実際のフィールドと値に置き換えます。 Map<String, Object> map = new HashMap<>(); map.put("field1", random.nextInt() + ""); map.put("field2", random.nextInt() + ""); IndexRequest bulkItemRequest = new IndexRequest(index_name); bulkItemRequest.source(map); // BulkProcessor オブジェクトに操作を追加します。 bulkProcessor.add(bulkItemRequest); } // awaitClose メソッドを使用して、すべての操作が実行されるまで待機できます。 bulkProcessor.awaitClose(120, TimeUnit.SECONDS); long failure = failedBulkItemCount.get(), success = bulkTotal - failure; System.out.println("BulkProcessor を使用した Bulk は、[" + success + "] 件のリクエストが成功し、[" + failure + "] 件のリクエストが失敗しました。"); // 更新リクエストを構築して書き込まれたデータを表示します。 RefreshRequest refreshRequest = new RefreshRequest(index_name); RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS); System.out.println("インデックス [" + index_name + "] が正常に更新されました。"); // 検索リクエストを構築してすべてのデータをクエリします。 SearchRequest searchRequest = new SearchRequest(index_name); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); QueryBuilder queryMatchAllBuiler = new MatchAllQueryBuilder(); searchSourceBuilder.query(queryMatchAllBuiler); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS); long totalHit = searchResponse.getHits().getTotalHits().value; System.out.println("検索クエリは合計 [" + totalHit + "] 件ヒットしました。"); // ID に基づいてデータをクエリする検索リクエストを構築します。 QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id); searchSourceBuilder.query(queryByIdBuilder); searchRequest.source(searchSourceBuilder); searchResponse = highClient.search(searchRequest, COMMON_OPTIONS); for (SearchHit searchHit : searchResponse.getHits()) { System.out.println("ID による検索クエリのレスポンス [" + searchHit.getSourceAsString() + "]"); } // 削除リクエストを構築して、指定された ID の単一ドキュメントを削除します。 DeleteRequest deleteRequest = new DeleteRequest(index_name); deleteRequest.id(doc_id); DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS); System.out.println("ID [" + deleteResponse.getId() + "] のドキュメントが正常に削除されました。"); // DeleteIndex リクエストを構築してインデックスを削除します。 DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name); AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS); if (deleteIndexResponse.isAcknowledged()) { System.out.println("インデックス [" + index_name + "] が正常に削除されました。"); } highClient.close(); } catch (Exception exception) { // 例外を処理します。 System.out.println("msg " + exception); }
完全なサンプルコード
以下は完全なサンプルコードです。
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class RestHClientTest {
private static final RequestOptions COMMON_OPTIONS;
static {
// リクエストを作成し、パラメータを設定します。サンプルコードでは、最大キャッシュサイズが 30 MB に設定されています。デフォルトでは、最大キャッシュサイズは 100 MB です。
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
builder.setHttpAsyncResponseConsumerFactory(
new HttpAsyncResponseConsumerFactory
.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}
public static void main(String[] args) {
// Elasticsearch の LindormSearch エンドポイントを指定します。
String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
int search_port = 30070;
// LindormSearch への接続に使用するユーザー名とパスワードを指定します。Lindorm コンソールで取得できます。
String username = "user";
String password = "test";
final CredentialsProvider credentials_provider = new BasicCredentialsProvider();
credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestHighLevelClient highClient = new RestHighLevelClient(
RestClient.builder(new HttpHost( search_url, search_port, "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider);
}
})
);
try {
String index_name = "lindorm_index";
// CreateIndex リクエストを構築して、検索インデックスを作成します。
CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name);
// インデックスの設定を指定します。
Map<String, Object> settingsMap = new HashMap<>();
settingsMap.put("index.number_of_shards", 4);
createIndexRequest.settings(settingsMap);
CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS);
if (createIndexResponse.isAcknowledged()) {
System.out.println("インデックス [" + index_name + "] が正常に作成されました。");
}
// ドキュメント ID を指定します。ドキュメント ID を指定しない場合、システムによってドキュメントの ID が自動的に生成されます。この場合、ドキュメントはより優れた書き込みパフォーマンスを提供できます。
String doc_id = "test";
// ドキュメントのフィールドを指定します。サンプルコードのフィールドと値を、ビジネスの実際のフィールドと値に置き換えてください。
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("field1", "value1");
jsonMap.put("field2", "value2");
// ドキュメントに単一のデータレコードを書き込むリクエストを構築します。ドキュメント ID と、ドキュメントに書き込むフィールドを指定します。
IndexRequest indexRequest = new IndexRequest(index_name);
indexRequest.id(doc_id).source(jsonMap);
IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
System.out.println("ID [" + indexResponse.getId() + "] のドキュメントが正常にインデックス化されました。");
// バッチでデータを書き込みます。
int bulkTotal = 100000;
AtomicLong failedBulkItemCount = new AtomicLong();
// BulkProcessor オブジェクトを作成して、Bulk リクエストを開始します。
BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener),
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// Bulk リクエストの各リクエストのレスポンスを取得できます。サンプルコードでは、レスポンスに基づいて失敗した Bulk アイテムの数をカウントします。
for (BulkItemResponse bulkItemResponse : response) {
if (bulkItemResponse.isFailed()) {
failedBulkItemCount.incrementAndGet();
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// このコードブロックでエラーがキャプチャされた場合、Bulk リクエスト内のすべてのリクエストは実行されません。
if (null != failure) {
failedBulkItemCount.addAndGet(request.numberOfActions());
}
}
});
// 同時 Bulk リクエストの最大数を指定します。デフォルト値:1。
builder.setConcurrentRequests(10);
// BulkProcessor オブジェクトが Bulk リクエストを送信するしきい値を指定します。時間間隔、操作の数、またはリクエストのサイズをしきい値として指定できます。
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
builder.setBulkActions(5000);
builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
BulkProcessor bulkProcessor = builder.build();
Random random = new Random();
for (int i = 0; i < bulkTotal; i++) {
// サンプルコードのフィールドと値を、ビジネスの実際のフィールドと値に置き換えてください。
Map<String, Object> map = new HashMap<>();
map.put("field1", random.nextInt() + "");
map.put("field2", random.nextInt() + "");
IndexRequest bulkItemRequest = new IndexRequest(index_name);
bulkItemRequest.source(map);
// BulkProcessor オブジェクトに操作を追加します。
bulkProcessor.add(bulkItemRequest);
}
// awaitClose メソッドを使用して、すべての操作が実行されるまで待機できます。
bulkProcessor.awaitClose(120, TimeUnit.SECONDS);
long failure = failedBulkItemCount.get(),
success = bulkTotal - failure;
System.out.println("BulkProcessor を使用した Bulk は、[" + success + "] 件のリクエストが成功し、[" + failure + "] 件のリクエストが失敗しました。");
// 更新リクエストを構築して、書き込まれたデータを表示します。
RefreshRequest refreshRequest = new RefreshRequest(index_name);
RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS);
System.out.println("インデックス [" + index_name + "] が正常に更新されました。");
// 検索リクエストを構築して、すべてのデータをクエリします。
SearchRequest searchRequest = new SearchRequest(index_name);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
QueryBuilder queryMatchAllBuiler = new MatchAllQueryBuilder();
searchSourceBuilder.query(queryMatchAllBuiler);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
long totalHit = searchResponse.getHits().getTotalHits().value;
System.out.println("検索クエリは合計 [" + totalHit + "] 件ヒットしました。");
// ID に基づいてデータをクエリする検索リクエストを構築します。
QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id);
searchSourceBuilder.query(queryByIdBuilder);
searchRequest.source(searchSourceBuilder);
searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
for (SearchHit searchHit : searchResponse.getHits()) {
System.out.println("ID による検索クエリのレスポンス [" + searchHit.getSourceAsString() + "]");
}
// 削除リクエストを構築して、指定された ID の単一ドキュメントを削除します。
DeleteRequest deleteRequest = new DeleteRequest(index_name);
deleteRequest.id(doc_id);
DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS);
System.out.println("ID [" + deleteResponse.getId() + "] のドキュメントが正常に削除されました。");
// DeleteIndex リクエストを構築して、インデックスを削除します。
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name);
AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
if (deleteIndexResponse.isAcknowledged()) {
System.out.println("インデックス [" + index_name + "] が正常に削除されました。");
}
highClient.close();
} catch (Exception exception) {
// 例外を処理します。
System.out.println("msg " + exception);
}
}
}次の結果が返されます。
インデックス [lindorm_index] が正常に作成されました。
ID [test] のドキュメントが正常にインデックス化されました。
BulkProcessor を使用した Bulk は、[100000] 件のリクエストが成功し、[0] 件のリクエストが失敗しました。
インデックス [lindorm_index] が正常に更新されました。
検索クエリは合計 [10000] 件ヒットしました。
ID による検索クエリのレスポンス [{"field1":"value1","field2":"value2"}]
ID [test] のドキュメントが正常に削除されました。
インデックス [lindorm_index] が正常に削除されました。結果によると、100,000 件のデータレコードがインデックスに書き込まれています。ただし、インデックス内のすべてのデータをクエリするリクエストに対しては、最大 10,000 件のデータレコードのみが返されます。これは、デフォルトで最大 10,000 件のクエリされたデータレコードが返されるためです。
一致するレコードの総数を取得するには、SearchSourceBuilder オブジェクトの trackTotalHits プロパティを true に設定します。例:searchSourceBuilder.trackTotalHits(true);