Elasticsearch: Use Logstash to synchronize data from Azure Event Hubs to an Alibaba Cloud Elasticsearch cluster

Last Updated: May 06, 2025

This topic describes how to use Alibaba Cloud Logstash to synchronize data from Azure Event Hubs to an Alibaba Cloud Elasticsearch cluster.

Procedure

  1. Step 1: Make preparations

  2. Step 2: Create and configure a Logstash pipeline

  3. Step 3: Verify the result

Step 1: Make preparations

  1. Create an Alibaba Cloud Elasticsearch cluster and enable the Auto Indexing feature for the cluster. In this example, a V7.10 cluster is used.

  2. Create an Alibaba Cloud Logstash cluster. In this example, a V7.4 cluster is used.

    For more information, see Create an Alibaba Cloud Logstash cluster.

    The Alibaba Cloud Logstash cluster is deployed in a virtual private cloud (VPC). To allow the cluster to access Azure Event Hubs over the Internet, you must configure a Network Address Translation (NAT) gateway for the cluster. For more information, see Configure a NAT gateway for data transmission over the Internet.

    Note

    If you use a self-managed Logstash cluster, you must deploy it on an Elastic Compute Service (ECS) instance that resides in the same VPC as your Alibaba Cloud Elasticsearch cluster and bind an elastic IP address (EIP) to the ECS instance.

  3. Prepare your Azure Event Hubs environment.

    For more information, see Azure Event Hubs documentation.

Step 2: Create and configure a Logstash pipeline

  1. Go to the Logstash Clusters page of the Alibaba Cloud Elasticsearch console.

  2. Navigate to the desired cluster.

    1. In the top navigation bar, select the region where the cluster resides.

    2. On the Logstash Clusters page, find the cluster and click its ID.

  3. In the left-side navigation pane of the page that appears, click Pipelines.

  4. On the Pipelines page, click Create Pipeline.

  5. In the Create wizard, enter a pipeline ID and configure the pipeline.

    In this example, the following configurations are used for the pipeline:

    input {
      # Read events from Azure Event Hubs over the Internet.
      azure_event_hubs {
        event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"]
        initial_position => "beginning"
        threads => 2
        decorate_events => true
        consumer_group => "group-kl"
        storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn"
        storage_container => "lettie_container"
      }
    }
    filter {
      # No filtering is performed in this example.
    }
    output {
      # Write the events to the destination index on the Alibaba Cloud Elasticsearch cluster.
      elasticsearch {
        hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"]
        index => "test-log"
        user => "elastic"
        password => "xxxxxx"
      }
    }

    Table 1. Parameters in the input part

    • event_hub_connections: The connection strings of the event hubs from which you want to synchronize data. Each connection string must include EntityPath. For more information, see event_hub_connections.

      Note: The event_hub_connections parameter is defined for each event hub. All the other parameters are shared among event hubs. For a configuration that reads from multiple event hubs, see the sketch after this table.

    • initial_position: The position from which data is read. Valid values: beginning, end, and look_back. Default value: beginning. For more information, see initial_position.

    • threads: The total number of threads used to process data. For more information, see threads.

    • decorate_events: Specifies whether to synchronize the metadata of the event hubs, including the name, consumer_group, processor_host, partition, offset, sequence, timestamp, and event_size. For more information, see decorate_events.

    • consumer_group: The consumer group that is used to read data from the event hubs. Create a dedicated consumer group for the Logstash cluster and make sure that all nodes in the cluster use this consumer group so that the nodes can work together. For more information, see consumer_group.

    • storage_connection: The connection string of the Azure Blob storage account. The storage account persists offsets between restarts and ensures that the nodes in the Logstash cluster process different partitions. If you configure this parameter, a restart resumes from the position at which processing was interrupted. If you do not configure this parameter, a restart resumes from the position specified by the initial_position parameter. For more information, see storage_connection.

    • storage_container: The name of the storage container that is used to persist offsets and allow multiple nodes in the Logstash cluster to work together. For more information, see storage_container.

      Note: To prevent pipelines from overwriting each other's offsets, use a different storage container for each pipeline. If you want to write the same data to different destinations, specify a separate storage container for each pipeline.
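    The event_hub_connections parameter accepts multiple connection strings, whereas the other parameters are shared among the event hubs. The following is a minimal sketch of an input part that reads from two event hubs. All namespace, key, and entity names in the sketch are hypothetical placeholders. The storage_container parameter is omitted so that the plugin falls back to its default value, which is the name of each event hub, and the offsets of the two event hubs do not overwrite each other.

    input {
      azure_event_hubs {
        # Each connection string identifies one event hub through EntityPath.
        event_hub_connections => [
          "Endpoint=sb://example-ns.servicebus.windows.net/;SharedAccessKeyName=logstash-consumer;SharedAccessKey=******;EntityPath=hub-a",
          "Endpoint=sb://example-ns.servicebus.windows.net/;SharedAccessKeyName=logstash-consumer;SharedAccessKey=******;EntityPath=hub-b"
        ]
        # The following parameters are shared by both event hubs.
        consumer_group => "logstash"
        decorate_events => true
        storage_connection => "DefaultEndpointsProtocol=https;AccountName=example;AccountKey=******;EndpointSuffix=core.windows.net"
      }
    }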

    Table 2. Parameters in the output part

    • hosts: The endpoint of the Elasticsearch cluster, in the following format: http://<Elasticsearch cluster ID>.elasticsearch.aliyuncs.com:9200.

    • index: The name of the destination index.

    • user: The username that is used to access the Elasticsearch cluster. The default username is elastic.

    • password: The password of the elastic account. The password is specified when you create the cluster. If you forget the password, you can reset it. For more information about the procedure and precautions for resetting a password, see Reset the access password for an Elasticsearch cluster.

    For more information, see Logstash configuration files.
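    The filter part is left empty in this example. If the events in your event hubs are JSON strings, you can parse them into separate fields before they are written to the Elasticsearch cluster. The following is a minimal sketch that uses the json filter plugin, which is bundled with Logstash by default:

    filter {
      # Parse the JSON string in the message field into top-level fields.
      # Events that cannot be parsed are tagged with _jsonparsefailure.
      json {
        source => "message"
      }
    }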

  6. Click Next to configure pipeline parameters.

    Pipeline parameters:

    • Pipeline Workers: The number of worker threads that run the filter and output plug-ins of the pipeline in parallel. If events are backlogged or some CPU resources are idle, we recommend that you increase this number to maximize CPU utilization. Default value: the number of vCPUs.

    • Pipeline Batch Size: The maximum number of events that a single worker thread collects from input plug-ins before it runs the filter and output plug-ins. A larger value allows a single worker thread to collect more events but consumes more memory. To make sure that worker threads have sufficient memory to collect more events, specify the LS_HEAP_SIZE variable to increase the Java virtual machine (JVM) heap size. Default value: 125.

    • Pipeline Batch Delay: The maximum amount of time that Logstash waits for a new event after a batch of events is created and before an undersized batch is dispatched to a pipeline worker thread. Default value: 50. Unit: milliseconds.

    • Queue Type: The internal queue model for buffering events. Valid values:

      • MEMORY: traditional memory-based queue. This is the default value.

      • PERSISTED: disk-based ACKed queue, which is a persistent queue.

    • Queue Max Bytes: The maximum size of a queue. Unit: MB. Valid values: integers from 1 to 2^53 - 1. Default value: 1024.

      Note: The value must be less than the total capacity of your disk.

    • Queue Checkpoint Writes: The maximum number of events that are written before a checkpoint is enforced when persistent queues are enabled. The value 0 indicates no limit. Default value: 1024.
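    These parameters correspond to standard Logstash settings. For reference, the following sketch shows the equivalent settings in the logstash.yml file of a self-managed Logstash deployment, using the default values described above. The value of pipeline.workers is an assumption for a node with two vCPUs.

    # logstash.yml equivalents of the preceding console parameters (sketch)
    pipeline.workers: 2             # Pipeline Workers. Assumes two vCPUs.
    pipeline.batch.size: 125        # Pipeline Batch Size
    pipeline.batch.delay: 50        # Pipeline Batch Delay, in milliseconds
    queue.type: memory              # Queue Type: memory or persisted
    queue.max_bytes: 1024mb         # Queue Max Bytes
    queue.checkpoint.writes: 1024   # Queue Checkpoint Writes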

    Warning

    After you configure the parameters, you must save the settings and deploy the pipeline. This triggers a restart of the Logstash cluster. Before you can proceed, make sure that the restart does not affect your business.

  7. Click Save or Save and Deploy.

    • Save: After you click this button, the system stores the pipeline settings and triggers a cluster change. However, the settings do not take effect immediately. After you click Save, the Pipelines page appears. On the Pipelines page, find the created pipeline and click Deploy Now in the Actions column. Then, the system restarts the Logstash cluster to make the settings take effect.

    • Save and Deploy: After you click this button, the system restarts the Logstash cluster to make the settings take effect.

Step 3: Verify the result

  1. Log on to the Kibana console of your Elasticsearch cluster and go to its homepage as prompted.

    For more information about how to log on to the Kibana console, see Log on to the Kibana console.

    Note

    In this example, an Elasticsearch V7.10.0 cluster is used. Operations on clusters of other versions may differ. The actual operations in the console prevail.

  2. In the upper-right corner of the page that appears, click Dev Tools.

  3. On the Console tab of the page that appears, run the following command to query the synchronized data in the destination index:

    GET test-log/_search
    {
      "query": {
        "match": {
          "message": "L23"
        }
      }
    }

    If the command is successfully run, the synchronized data is returned in the response.
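    You can also check how many documents have been synchronized by running a count request against the destination index. The following sketch assumes the test-log index that is specified in the output part of the pipeline:

    GET test-log/_count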