This topic describes how to use Alibaba Cloud Logstash to synchronize data from Azure Event Hubs to an Alibaba Cloud Elasticsearch cluster.
Procedure
Step 1: Make preparations
Create an Alibaba Cloud Elasticsearch cluster and enable the Auto Indexing feature for the cluster, as shown in the sketch at the end of this step. In this example, a V7.10 cluster is used.
For more information, see Create an Alibaba Cloud Elasticsearch cluster and Configure the YML file.
Create an Alibaba Cloud Logstash cluster. In this example, a V7.4 cluster is used.
For more information, see Create an Alibaba Cloud Logstash cluster.
The Alibaba Cloud Logstash cluster is deployed in a virtual private cloud (VPC). To connect the cluster to Azure Event Hubs over the Internet, you must configure a Network Address Translation (NAT) gateway and use the gateway to connect the cluster to the Internet. For more information, see Configure a NAT gateway for data transmission over the Internet.
Note: If you use a self-managed Logstash cluster, you must deploy it on an Elastic Compute Service (ECS) instance that resides in the same VPC as your Alibaba Cloud Elasticsearch cluster. Purchase such an instance if you do not already have one, and bind an elastic IP address (EIP) to the ECS instance so that it can access the Internet.
Prepare the Azure Event Hubs environment that serves as the data source.
For more information, see Azure Event Hubs documentation.
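The Auto Indexing feature mentioned above corresponds to the action.auto_create_index setting that is described in Configure the YML file. The following is a minimal sketch of the relevant YML line; allowing automatic creation for all indexes is an assumption that suits testing, and you can instead restrict the setting to specific index patterns.
action.auto_create_index: true
# Alternatively, allow automatic creation only for indexes whose names match specific patterns:
# action.auto_create_index: "test-log*,logstash-*"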
Step 2: Create and configure a Logstash pipeline
Go to the Logstash Clusters page of the Alibaba Cloud Elasticsearch console.
Navigate to the desired cluster.
In the top navigation bar, select the region where the cluster resides.
On the Logstash Clusters page, find the cluster and click its ID.
In the left-side navigation pane of the page that appears, click Pipelines.
On the Pipelines page, click Create Pipeline.
In the Create wizard, enter a pipeline ID and configure the pipeline.
In this example, the following configurations are used for the pipeline:
input {
  azure_event_hubs {
    event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"]
    initial_position => "beginning"
    threads => 2
    decorate_events => true
    consumer_group => "group-kl"
    storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn"
    storage_container => "lettie_container"
  }
}
filter {
}
output {
  elasticsearch {
    hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"]
    index => "test-log"
    password => "xxxxxx"
    user => "elastic"
  }
}
Table 1. Parameters in the input part
Parameter
Description
event_hub_connections
The connection strings of the event hubs from which you want to synchronize data. Each connection string must include EntityPath. For more information, see event_hub_connections.
Note: The event_hub_connections parameter is defined for each event hub. All other parameters are shared among event hubs.
initial_position
The position from which data is read. Valid values: beginning, end, and look_back. Default value: beginning. For more information, see initial_position.
threads
The total number of threads used to process data. For more information, see threads.
decorate_events
Specifies whether to synchronize the metadata of the event hubs, including the name, consumer_group, processor_host, partition, offset, sequence, timestamp, and event_size. For more information, see decorate_events.
consumer_group
The consumer group that is used to read data from the event hubs. You must create a dedicated consumer group for the Logstash cluster and make sure that all nodes in the cluster use this consumer group. This way, the nodes can work together. For more information, see consumer_group.
storage_connection
The connection string for blob account storage. Blob account storage persists offsets between restarts and ensures that the nodes in the Logstash cluster process different partitions. If you configure this parameter, processing resumes from the position at which it was interrupted after a restart. If you do not configure this parameter, processing resumes from the position indicated by the initial_position parameter after a restart. For more information, see storage_connection.
storage_container
The name of the storage container that is used to persist offsets and allow multiple nodes in the Logstash cluster to work together. For more information, see storage_container.
Note: To avoid overwriting offsets, you can use different storage containers. If you want the same data to be written to different services, you must specify different storage containers for the pipelines.
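The filter part of the preceding configuration is empty. If the events that you write to the event hubs are JSON strings, you can parse them into structured fields before they are indexed. The following is a minimal sketch that uses the standard Logstash json filter; the message source field is the default field for plain payloads, and the parsed_event target field name is a hypothetical choice.
filter {
  json {
    # Parse the raw JSON payload in the message field into structured fields.
    source => "message"
    # Store the parsed fields under a dedicated object field to avoid mapping conflicts.
    target => "parsed_event"
    # Events that cannot be parsed are tagged with _jsonparsefailure and kept unchanged.
  }
}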
Table 2. Parameters in the output part
Parameter
Description
hosts
The endpoint of the Elasticsearch cluster. The value of this parameter must be in the following format:
http://<Elasticsearch cluster ID>.elasticsearch.aliyuncs.com:9200
index
The name of the destination index.
user
The username that is used to access the Elasticsearch cluster. The default username is elastic.
password
The password of the elastic account. The password is specified when you create the cluster. If you forget the password, you can reset it. For more information about the procedure and precautions for resetting a password, see Reset the access password for an Elasticsearch cluster.
For more information, see Logstash configuration files.
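Because Auto Indexing is enabled in Step 1, the test-log index is created automatically when the first event is written. If you prefer to control the mapping yourself, the following is an optional sketch of creating the destination index in the Kibana console in advance; the field names are assumptions based on the default message field and may not match your actual data.
PUT test-log
{
  "mappings": {
    "properties": {
      "message": { "type": "text" },
      "@timestamp": { "type": "date" }
    }
  }
}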
Click Next to configure pipeline parameters.

Parameter
Description
Pipeline Workers
The number of worker threads that run the filter and output plug-ins of the pipeline in parallel. If a backlog of events exists or some CPU resources are not used, we recommend that you increase the number of threads to maximize CPU utilization. The default value of this parameter is the number of vCPUs.
Pipeline Batch Size
The maximum number of events that a single worker thread can collect from input plug-ins before it attempts to run filter and output plug-ins. A larger value allows a single worker thread to collect more events but consumes more memory. If you want to make sure that the worker thread has sufficient memory to collect more events, specify the LS_HEAP_SIZE variable to increase the Java virtual machine (JVM) heap size. Default value: 125.
Pipeline Batch Delay
The maximum amount of time that Logstash waits for new events before it dispatches an undersized batch to a pipeline worker thread. This wait occurs after batch tasks are created for pipeline events. Default value: 50. Unit: milliseconds.
Queue Type
The internal queue model for buffering events. Valid values:
MEMORY: traditional memory-based queue. This is the default value.
PERSISTED: disk-based ACKed queue, which is a persistent queue.
Queue Max Bytes
The maximum data size for a queue. Unit: MB. Valid values: integers from 1 to 2^53 - 1. Default value: 1024.
Note: The value must be less than the total capacity of your disk.
Queue Checkpoint Writes
The maximum number of events that are written before a checkpoint is enforced when persistent queues are enabled. The value 0 indicates no limit. Default value: 1024.
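For reference, these console parameters map to standard Logstash pipeline settings. The following is a sketch of the equivalent pipelines.yml entries for a self-managed Logstash deployment; the pipeline ID is a placeholder, and the values mirror the defaults that are described above.
- pipeline.id: azure-eventhubs-to-es
  pipeline.workers: 2            # Pipeline Workers; defaults to the number of vCPUs
  pipeline.batch.size: 125       # Pipeline Batch Size
  pipeline.batch.delay: 50       # Pipeline Batch Delay, in milliseconds
  queue.type: persisted          # Queue Type: memory or persisted
  queue.max_bytes: 1024mb        # Queue Max Bytes; must be less than the disk capacity
  queue.checkpoint.writes: 1024  # Queue Checkpoint Writes; 0 indicates no limit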
Warning: After you configure the parameters, you must save the settings and deploy the pipeline. This triggers a restart of the Logstash cluster. Before you proceed, make sure that the restart does not affect your business.
Click Save or Save and Deploy.
Save: After you click this button, the system stores the pipeline settings and triggers a cluster change. However, the settings do not take effect. After you click Save, the Pipelines page appears. On the Pipelines page, find the created pipeline and click Deploy Now in the Actions column. Then, the system restarts the Logstash cluster to make the settings take effect.
Save and Deploy: After you click this button, the system restarts the Logstash cluster to make the settings take effect.
Step 3: Verify the result
Log on to the Kibana console of your Elasticsearch cluster and go to the homepage of the Kibana console as prompted.
For more information about how to log on to the Kibana console, see Log on to the Kibana console.
Note: In this example, an Elasticsearch V7.10.0 cluster is used. Operations on clusters of other versions may differ. The actual operations in the console prevail.
In the upper-right corner of the page that appears, click Dev Tools.
On the Console tab of the page that appears, run the following command to view the synchronized data:
GET test-log/_search
{
  "query": {
    "match": {
      "message": "L23"
    }
  }
}
If the command is successfully run, the synchronized data is returned.
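You can also check the number of synchronized documents. The following is a minimal sketch that uses the _count API and can be run on the same Console tab.
GET test-log/_count
{
  "query": {
    "match_all": {}
  }
}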
