
Last Updated: Apr 17, 2024

DataHub plug-ins for Logstash

Logstash


Logstash is a distributed log collection framework. It is simple yet powerful and is often used together with Elasticsearch and Kibana for log data analysis. The three tools are collectively known as the ELK stack. To support a wider variety of data sources, DataHub provides output and input plug-ins for data transfer with Logstash. The Logstash open source community supports more than 30 types of data sources, such as files, Syslog logs, Redis logs, Log4j logs, Apache logs, and NGINX logs. Logstash also provides filter plug-ins that let you customize the fields to be transferred. This topic describes how to use Logstash to efficiently import log data to and export log data from DataHub.

Note

The DataHub output and input plug-ins for Logstash comply with the Apache License 2.0.

Install Logstash and the plug-ins

Requirement

Java Runtime Environment (JRE) 7 or later is required to run Logstash. If the JRE version does not meet this requirement, some features of Logstash are unavailable.
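
You can check the JRE version on your server before you install Logstash. A quick check:

java -version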

Installation methods

Alibaba Cloud provides the following methods to install Logstash and the plug-ins:

  • Complete installation

    You can use the complete installation package to install both Logstash and the DataHub plug-ins for Logstash with a few clicks.

  • Separate installation

    Install Logstash: For more information about how to install Logstash, see Logstash Reference. The latest version of Logstash requires Java 7 or later.

    Install the DataHub plug-ins for Logstash: Download and install the plug-ins that you require.

    To import data to DataHub, use the DataHub output plug-in for Logstash. To export data from DataHub, use the DataHub input plug-in for Logstash.

    Run the following commands to install the output and input plug-ins:

    $ {LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-output-datahub-1.0.10.gem
    $ {LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-input-datahub-1.0.10.gem

    An error similar to the following error may occur during installation:

    WARNING: can not set Session#timeout=(0) no session context

    In this case, check whether your server can access the Internet. If the server can access the Internet, you can use a gem mirror in the Chinese mainland. For more information, visit https://gems.ruby-china.com/.

  • Offline installation

If your server cannot access the Internet, you cannot install the DataHub plug-ins for Logstash by using the preceding methods. In this case, you must build an offline installation package for the Logstash version that you require. We recommend that you first install the plug-ins on a test server by using the preceding methods, build an offline installation package by referring to Offline Plugin Management, and then upload the package to your production server for offline installation.
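
After you install the plug-ins by using any of the preceding methods, you can check that they are registered. A quick verification, assuming a standard Logstash directory layout:

$ {LOG_STASH_HOME}/bin/logstash-plugin list | grep datahub
# The logstash-input-datahub and logstash-output-datahub plug-ins should appear in the output.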

Use the DataHub output plug-in for Logstash to import data to DataHub

Import Log4j logs to DataHub

This example shows how to collect unstructured Log4j logs and parse the logs into structured data by using Logstash. The following code shows a sample Log4j log:

20:04:30.359 [qtp1453606810-20] INFO  AuditInterceptor - [13pn9kdr5tl84stzkmaa8vmg] end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test GET, 187 ms

To collect and parse the preceding Log4j log and import the parsed data to DataHub, configure the schema of the DataHub topic and the Logstash task as described in this section. The following table describes the schema of the DataHub topic.

Parameter       Data type
request_time    STRING
thread_id       STRING
log_level       STRING
class_name      STRING
request_id      STRING
detail          STRING

The following code shows the configuration of the Logstash task:

input {
    file {
        # On Windows, use forward slashes (/) instead of backslashes (\) in the path.
        path => "${APP_HOME}/log/bayes.log"
        start_position => "beginning"
    }
}

filter{
    grok {
        match => {
           "message" => "(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class_name>\w+)\s+\-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)"
        }
    }
}

output {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "project"
        topic_name => "topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/ph0ly/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}
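
Applied to the sample log line above, the grok pattern produces a structured record whose fields map to the topic schema. The following values are taken from the sample log and are shown for illustration only:

request_time: 20:04:30.359
thread_id:    qtp1453606810-20
log_level:    INFO
class_name:   AuditInterceptor
request_id:   13pn9kdr5tl84stzkmaa8vmg
detail:       end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test GET, 187 ms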

Import CSV files to DataHub

This example shows how to use Logstash to collect CSV files. The following code shows a sample CSV file:

1111,1.23456789012E9,true,14321111111000000,string_dataxxx0,
2222,2.23456789012E9,false,14321111111000000,string_dataxxx1

The following table describes the schema of the DataHub topic to which the CSV file is to be imported.

Parameter    Data type
col1         BIGINT
col2         DOUBLE
col3         BOOLEAN
col4         TIMESTAMP
col5         STRING

The following code shows the configuration of the Logstash task:

input {
    file {
        path => "${APP_HOME}/data.csv"
        start_position => "beginning"
    }
}

filter{
    csv {
        columns => ['col1', 'col2', 'col3', 'col4', 'col5']
    }
}

output {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "project"
        topic_name => "topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/ph0ly/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}
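
If type mismatches between the CSV strings and the BIGINT, DOUBLE, and BOOLEAN fields in the topic schema cause dirty records, the convert option of the csv filter can cast the columns before they reach the output. A sketch based on the schema above:

filter{
    csv {
        columns => ['col1', 'col2', 'col3', 'col4', 'col5']
        # Cast the columns so that they match the data types in the topic schema.
        convert => {
            "col1" => "integer"
            "col2" => "float"
            "col3" => "boolean"
        }
    }
}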

Import JSON files to DataHub

One-to-one synchronization: imports one JSON file to one topic

The following code shows a sample JSON file named test_topic_22.json in the /home/software/data/22 directory:

[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]

The following code shows the configuration of the Logstash task:

input {
    file {
        path => "/home/software/data/22/test_topic_22.json"
        start_position => "beginning"
        # The storage path of the synchronization offset.
        sincedb_path => "/home/software/sincedb/filedatahub"
        codec => json {
            charset => "UTF-8"
        }
    }
}

filter{
    # Rename the JSON keys here if they differ from the field names in the topic schema.
    # The mapping below keeps col1 and col2 unchanged and serves only as a placeholder.
    mutate {
      rename => {
        "col1"    =>    "col1"
        "col2"    =>    "col2"
      }
    }

}
output {

   datahub {
      access_id => "xxxx"
      access_key => "xxxx"
      endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
      project_name => "test_dh1"
      topic_name => "topic_test_2"
      #shard_id => "0"
      #shard_keys => ["thread_id"]
      dirty_data_continue => true
      dirty_data_file => "/home/software/dirty_vehIdInfo.data"
      dirty_data_file_max_size => 1000
   }

}

Run the following command and view the results:

bash bin/logstash -f config/logstash.conf


Many-to-many synchronization: imports multiple JSON files to multiple topics

The following code shows a sample JSON file named test_topic_22.json in the /home/software/data/22 directory:

[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]

The following code shows a sample JSON file named test_topic.json in the /home/software/data/11 directory:

[{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]
[{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]
[{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]

The following code shows the configuration of the Logstash task:

input {
    file {
        path => "/home/software/data/**/*.json"
        start_position => "beginning"
        # The storage path of the synchronization offset.
        sincedb_path => "/home/software/sincedb/filedatahub"
        codec => json {
            charset => "UTF-8"
        }
    }
}

filter{
  # Use paths to distinguish between the two JSON files.
    if "22"  in [path] {
        mutate {
          rename => {
            "col1"        =>    "col1"
            "col2"        =>    "col2"
          }
        }
    }
    if "11" in [path] {
        mutate {
          rename => {
            "col1"    =>    "col1"
            "col2"    =>    "col2"
          }
        }
    }


}
output {

    if "22"  in [path] {
      datahub {
        access_id => "xxxxx"
        access_key => "xxxxx"
        endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
        project_name => "test_dh1"
        topic_name => "topic_test_2"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/home/software/dirty_vehIdInfo.data"
        dirty_data_file_max_size => 1000
      }
    }

    if "11"  in [path] {
      datahub {
        access_id => "xxxxx"
        access_key => "xxxxx"
        endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
        project_name => "test_dh1"
        topic_name => "topic_test"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/home/software/dirty.data"
        dirty_data_file_max_size => 1000
      }
    }


}

Run the following command and view the results:

bash bin/logstash -f config/logstash.conf


One-to-many synchronization: imports one JSON file to different topics based on a key

The following code shows a sample JSON file:

[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"}]
[{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"}]

The following code shows the configuration of the Logstash task:

input {
    file {
        path => "/home/software/data/22/test_topic_22.json"
        start_position => "beginning"
        sincedb_path => "/home/software/sincedb/filedatahub"
        codec => json {
            charset => "UTF-8"
        }
    }
}

filter{
    mutate {
      rename => {
        "col1"    =>    "col1"
        "col2"    =>    "col2"
      }
    }

}
output {

   # Route records to different topics based on the value of the col1 field.
   if [col1] == "aaaaa" {
     datahub {
        access_id => "xxxxx"
        access_key => "xxxxx"
        endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
        project_name => "test_dh1"
        topic_name => "topic_test_2"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/home/software/dirty_vehIdInfo.data"
        dirty_data_file_max_size => 1000
     }
   }
   else {
     datahub {
          access_id => "xxxxx"
          access_key => "xxxxx"
          endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
          project_name => "test_dh1"
          topic_name => "topic_test"
          #shard_id => "0"
          #shard_keys => ["thread_id"]
          dirty_data_continue => true
          dirty_data_file => "/home/software/dirty_vehIdInfo.data"
          dirty_data_file_max_size => 1000
     }
   }

}

Run the following command and view the results:

bash bin/logstash -f config/logstash.conf


Import MySQL data to DataHub

(Figure: sample MySQL table data)

The following code shows the configuration of the Logstash task:

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.47.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://xxxx:3306/database"
    jdbc_user => "username"
    jdbc_password => "password"
    use_column_value => true
    tracking_column => "modifytime"
    schedule => "* * * * *"
    statement => "select * from suyang_test"
   }
}

output {
      datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "project"
        topic_name => "topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/ph0ly/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}
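
Because use_column_value and tracking_column are set, the query can also be made incremental so that each scheduled run reads only the rows that changed since the previous run. A sketch of the input section under that assumption (the modifytime column is assumed to be a timestamp):

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.47.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://xxxx:3306/database"
    jdbc_user => "username"
    jdbc_password => "password"
    use_column_value => true
    tracking_column => "modifytime"
    tracking_column_type => "timestamp"
    schedule => "* * * * *"
    # :sql_last_value is replaced with the last tracked value that Logstash persisted.
    statement => "SELECT * FROM suyang_test WHERE modifytime > :sql_last_value"
  }
}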

Start Logstash

Run the following command to start Logstash:

logstash -f <The path of the preceding configuration file>

The batch_size parameter specifies the number of records to be sent to DataHub at a time. The default value is 125. To set the batch_size parameter to an appropriate value, start Logstash with the -b option:

logstash -f <The path of the preceding configuration file> -b 256

Parameters

The following list describes the parameters in the preceding configuration file:

  • access_id: required. The AccessKey ID of your Alibaba Cloud account.
  • access_key: required. The AccessKey secret of your Alibaba Cloud account.
  • endpoint: required. The endpoint of DataHub.
  • project_name: required. The name of the DataHub project.
  • topic_name: required. The name of the DataHub topic.
  • retry_times: optional. The maximum number of retries allowed. The value -1 indicates unlimited retries. The value 0 indicates no retries. A value greater than 0 indicates the specified number of retries. Default value: -1.
  • retry_interval: optional. The interval between two consecutive retries. Unit: seconds. Default value: 5.
  • skip_after_retry: optional. Specifies whether to skip the import of data in the current batch if the number of retries caused by a DataHub exception exceeds the value of the retry_times parameter. Default value: false.
  • approximate_request_bytes: optional. The approximate number of bytes that can be sent in each request. The default value is 2048576, which is about 2 MB. This parameter is used to prevent a request from being rejected because the request body is oversized.
  • shard_keys: optional. The keys of the shards. The hash value of a key is used to map the ID of the shard to which a record is imported. The value of this parameter is an array. If neither the shard_keys nor the shard_id parameter is set, the system polls the shards to determine the shard to which the records are imported.
  • shard_id: optional. The ID of the shard to which records are imported. If neither the shard_keys nor the shard_id parameter is set, the system polls the shards to determine the shard to which the records are imported.
  • dirty_data_continue: optional. Specifies whether to ignore dirty records. The value true indicates that dirty records are ignored. Default value: false. If you set this parameter to true, you must also set the dirty_data_file parameter.
  • dirty_data_file: optional. The name of the file that stores dirty records. If you set the dirty_data_continue parameter to true, you must set this parameter. The file is divided into .part 1 and .part 2, and the most recent records are stored in .part 2.
  • dirty_data_file_max_size: optional. The maximum size of the file that stores dirty records. This value is only for reference.
  • enable_pb: optional. Specifies whether to enable Protocol Buffers (Protobuf) for data transfer. If you want to disable Protobuf in Apsara Stack, set this parameter to false.
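
For example, the retry and request-size behavior described above can be tuned directly in the datahub output block. A sketch with illustrative values:

output {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "project"
        topic_name => "topic"
        # Retry up to 3 times at 10-second intervals, then skip the failed batch.
        retry_times => 3
        retry_interval => 10
        skip_after_retry => true
        # Keep each request body at roughly 1 MB.
        approximate_request_bytes => 1048576
    }
}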

Use the DataHub input plug-in for Logstash to export data from DataHub

Export data from DataHub

The following code shows the configuration of the Logstash task in this example:

input {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "test_project"
        topic_name => "test_topic"
        interval => 5
        #cursor => {
        #    "0"=>"20000000000000000000000003110091"
        #    "2"=>"20000000000000000000000003110091"
        #    "1"=>"20000000000000000000000003110091"
        #    "4"=>"20000000000000000000000003110091"
        #    "3"=>"20000000000000000000000003110000"
        #}
        shard_ids => []
        pos_file => "/home/admin/logstash/pos_file"
    }
}

output {
    file {
        path => "/home/admin/logstash/output"
    }
}
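
Start the export task in the same way as the import examples:

logstash -f <The path of the preceding configuration file>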

Parameters

  • access_id: required. The AccessKey ID of your Alibaba Cloud account.
  • access_key: required. The AccessKey secret of your Alibaba Cloud account.
  • endpoint: required. The endpoint of DataHub.
  • project_name: required. The name of the DataHub project.
  • topic_name: required. The name of the DataHub topic.
  • retry_times: optional. The maximum number of retries allowed. The value -1 indicates unlimited retries. The value 0 indicates no retries. A value greater than 0 indicates the specified number of retries.
  • retry_interval: optional. The interval between two consecutive retries. Unit: seconds.
  • shard_ids: optional. The IDs of the shards whose records are to be consumed. If you do not set this parameter, records in all shards are consumed.
  • cursor: optional. The sequence number of the record from which consumption starts. By default, this parameter is empty, which indicates that consumption starts from the earliest record.
  • pos_file: required. The checkpoint file, which records the consumption offset.
  • enable_pb: optional. Specifies whether to enable Protobuf for data transfer. If you want to disable Protobuf in Apsara Stack, set this parameter to false.
  • compress_method: optional. The compression algorithm for data transfer. By default, this parameter is empty, which indicates that data is not compressed. Valid values: lz4 and deflate.
  • print_debug_info: optional. Specifies whether to return the debugging information of DataHub. Default value: false. If you set this parameter to true, a large amount of information is returned. The information is used only for debugging.
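
For example, to consume only specific shards and compress the transfer, the parameters described above can be combined as follows. A sketch with illustrative values:

input {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "test_project"
        topic_name => "test_topic"
        # Consume only shards 0 and 1 and compress the transfer with lz4.
        shard_ids => ["0", "1"]
        compress_method => "lz4"
        pos_file => "/home/admin/logstash/pos_file"
    }
}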

FAQ

Q: How do I import built-in fields in Logstash to DataHub?

A: Each record in Logstash carries additional built-in fields, such as @timestamp. However, DataHub does not support field names that contain special characters. In this case, you can rename the field by adding a filter configuration. For example, the following configuration renames the @timestamp field to column_name:

filter{
    mutate {
      rename => {
        "@timestamp" => "column_name"
      }
    }
}
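
Similarly, built-in fields that you do not want to import can be dropped with the remove_field option of the mutate filter. A minimal sketch that removes the built-in @version field:

filter{
    mutate {
      remove_field => ["@version"]
    }
}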

References

For more information about the configuration parameters, visit the Logstash official website and see the ELK stack guide.