DataHub plug-ins for Logstash
Logstash
Logstash is a distributed log collection framework with simple but powerful processing logic. It is often used together with Elasticsearch and Kibana for log data analysis; the three are known as the ELK stack. To support a wider variety of data sources, DataHub provides output and input plug-ins for transferring data between Logstash and DataHub. The Logstash open source community supports more than 30 types of data sources, such as files, Syslog logs, Redis logs, Log4j logs, Apache logs, and NGINX logs, as well as filter plug-ins for customizing the fields to be transferred. This topic describes how to use Logstash to efficiently import log data to or export log data from DataHub.
The DataHub output and input plug-ins for Logstash comply with the Apache License 2.0.
Install Logstash and the plug-ins
Requirement
Java Runtime Environment (JRE) 7 or later is required to run Logstash. If your JRE version does not meet this requirement, some Logstash features are unavailable.
Installation methods
Alibaba Cloud provides the following two methods to install Logstash and the plug-ins:
Complete installation
You can use the complete installation package to install both Logstash and the DataHub plug-ins for Logstash in a single step.
The following installation package is applicable to Logstash 6 and Logstash 7:
tar -xzvf logstash-with-datahub-6.4.0-1.0.10.tar.gz
cd logstash-with-datahub-6.4.0
The following installation package is applicable to Logstash 8:
tar -xzvf logstash-with-datahub-8.10.3-1.0.12.tar.gz
cd logstash-with-datahub-8.10.3
Separate installation
Install Logstash: For more information about how to install Logstash, see Logstash Reference. The latest version of Logstash requires Java 7 or later.
Install the DataHub plug-ins for Logstash: Download and install the plug-ins that you require.
To import data to DataHub, use the DataHub output plug-in for Logstash.
logstash-output-datahub-1.0.10.gem is applicable to Logstash 6 and Logstash 7.
logstash-output-datahub-1.0.12.gem is applicable to Logstash 8.
To export data from DataHub, use the DataHub input plug-in for Logstash.
logstash-input-datahub-1.0.10.gem is applicable to Logstash 6 and Logstash 7.
logstash-input-datahub-1.0.12.gem is applicable to Logstash 8.
Run the following commands to install the output and input plug-ins:
$ {LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-output-datahub-1.0.10.gem
$ {LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-input-datahub-1.0.10.gem
An error similar to the following error may occur during installation:
WARNING: can not set Session#timeout=(0) no session context
In this case, check whether your server can access the Internet. If it can, you can switch to a gem mirror in the Chinese mainland. For more information, visit https://gems.ruby-china.com/.
Offline installation
If your server cannot access the Internet, you cannot install the DataHub plug-ins for Logstash by using the preceding methods. In this case, you must build an offline installation package for the Logstash version that you require. We recommend that you first install the plug-ins on a test server by using the preceding methods, refer to Offline Plugin Management to build an offline installation package, and then upload it to your online server for offline installation.
Use the DataHub output plug-in for Logstash to import data to DataHub
Import Log4j logs to DataHub
This example shows how to collect unstructured Log4j logs and parse the logs into structured data by using Logstash. The following code shows a sample Log4j log:
20:04:30.359 [qtp1453606810-20] INFO AuditInterceptor - [13pn9kdr5tl84stzkmaa8vmg] end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test GET, 187 ms
If you want to collect and parse the preceding Log4j log and import the structured data after parsing to DataHub, configure the schema of the DataHub topic and the Logstash task based on the descriptions in this section.
The following table describes the schema of the DataHub topic to which the parsed data is to be imported.
Field | Data type
request_time | STRING
thread_id | STRING
log_level | STRING
class_name | STRING
request_id | STRING
detail | STRING
The following code shows the configuration of the Logstash task:
input {
file {
# On Windows, use forward slashes (/) instead of backslashes (\) in the path.
path => "${APP_HOME}/log/bayes.log"
start_position => "beginning"
}
}
filter{
grok {
match => {
"message" => "(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class_name>\w+)\s+\-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)"
}
}
}
output {
datahub {
access_id => "Your accessId"
access_key => "Your accessKey"
endpoint => "Endpoint"
project_name => "project"
topic_name => "topic"
#shard_id => "0"
#shard_keys => ["thread_id"]
dirty_data_continue => true
dirty_data_file => "/Users/ph0ly/trash/dirty.data"
dirty_data_file_max_size => 1000
}
}
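The grok pattern above can be sanity-checked outside Logstash. The following Python sketch applies an equivalent regular expression to the sample log line; note that Python uses `(?P<name>...)` for named groups where grok's Oniguruma syntax uses `(?<name>...)`:

```python
import re

# Equivalent of the grok pattern, rewritten with Python's (?P<name>...) groups.
pattern = re.compile(
    r"(?P<request_time>\d\d:\d\d:\d\d\.\d+)\s+"
    r"\[(?P<thread_id>[\w\-]+)\]\s+"
    r"(?P<log_level>\w+)\s+"
    r"(?P<class_name>\w+)\s+-\s+"
    r"\[(?P<request_id>\w+)\]\s+"
    r"(?P<detail>.+)"
)

line = ("20:04:30.359 [qtp1453606810-20] INFO AuditInterceptor - "
        "[13pn9kdr5tl84stzkmaa8vmg] end /web/v1/project/"
        "fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test "
        "GET, 187 ms")

# Each named group becomes one field of the structured record.
fields = pattern.match(line).groupdict()
```

If the pattern fails to match your logs, Logstash tags the event with _grokparsefailure instead of producing the fields.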
Import CSV files to DataHub
This example shows how to use Logstash to collect CSV files. The following code shows a sample CSV file:
1111,1.23456789012E9,true,14321111111000000,string_dataxxx0,
2222,2.23456789012E9,false,14321111111000000,string_dataxxx1
The following table describes the schema of the DataHub topic to which the CSV file is to be imported.
Field | Data type
col1 | BIGINT
col2 | DOUBLE
col3 | BOOLEAN
col4 | TIMESTAMP
col5 | STRING
The following code shows the configuration of the Logstash task:
input {
file {
path => "${APP_HOME}/data.csv"
start_position => "beginning"
}
}
filter{
csv {
columns => ['col1', 'col2', 'col3', 'col4', 'col5']
}
}
output {
datahub {
access_id => "Your accessId"
access_key => "Your accessKey"
endpoint => "Endpoint"
project_name => "project"
topic_name => "topic"
#shard_id => "0"
#shard_keys => ["thread_id"]
dirty_data_continue => true
dirty_data_file => "/Users/ph0ly/trash/dirty.data"
dirty_data_file_max_size => 1000
}
}
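The csv filter extracts string values; DataHub then casts them to the topic's data types. As a rough illustration only (not the plug-in's actual conversion code; the SCHEMA list below is a hypothetical helper), the casts for one sample row look like this in Python:

```python
import csv
import io

# Hypothetical per-column casts matching the topic schema above.
SCHEMA = [
    ("col1", int),                            # BIGINT
    ("col2", float),                          # DOUBLE
    ("col3", lambda s: s.lower() == "true"),  # BOOLEAN
    ("col4", int),                            # TIMESTAMP (microseconds since epoch)
    ("col5", str),                            # STRING
]

sample = "2222,2.23456789012E9,false,14321111111000000,string_dataxxx1\n"
row = next(csv.reader(io.StringIO(sample)))
record = {name: cast(value) for (name, cast), value in zip(SCHEMA, row)}
```

A value that cannot be cast to the topic's type is treated as a dirty record and handled according to the dirty_data_continue setting.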
Import JSON files to DataHub
One-to-one synchronization: imports one JSON file to one topic
The following code shows a sample JSON file named test_topic_22.json in the /home/software/data/22 directory:
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
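Each line of the sample file is a JSON array. When the root of a parsed line is an array, the json codec emits one event per element, which can be sketched as:

```python
import json

line = '[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]'

parsed = json.loads(line)
# A JSON array at the root becomes one event per element;
# a single JSON object becomes exactly one event.
events = parsed if isinstance(parsed, list) else [parsed]
```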
The following code shows the configuration of the Logstash task:
input {
file {
path => "/home/software/data/22/test_topic_22.json"
start_position => "beginning"
# The storage path of the synchronization offset.
sincedb_path => "/home/software/sincedb/filedatahub"
codec => json {
charset => "UTF-8"
}
}
}
filter{
mutate {
# Rename fields here if the JSON keys differ from the topic schema.
# In this example, the source and destination names already match.
rename => {
"col1" => "col1"
"col2" => "col2"
}
}
}
output {
datahub {
access_id => "xxxx"
access_key => "xxxx"
endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
project_name => "test_dh1"
topic_name => "topic_test_2"
#shard_id => "0"
#shard_keys => ["thread_id"]
dirty_data_continue => true
dirty_data_file => "/home/software/dirty_vehIdInfo.data"
dirty_data_file_max_size => 1000
}
}
Run the following command and view the results:
bash bin/logstash -f config/logstash.conf
Many-to-many synchronization: imports multiple JSON files to multiple topics
The following code shows a sample JSON file named test_topic_22.json in the /home/software/data/22 directory:
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
The following code shows a sample JSON file named test_topic.json in the /home/software/data/11 directory:
[{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]
[{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]
[{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]
The following code shows the configuration of the Logstash task:
input {
file {
path => "/home/software/data/**/*.json"
start_position => "beginning"
# The storage path of the synchronization offset.
sincedb_path => "/home/software/sincedb/filedatahub"
codec => json {
charset => "UTF-8"
}
}
}
filter{
# Use paths to distinguish between the two JSON files.
if "22" in [path] {
mutate {
rename => {
"col1" => "col1"
"col2" => "col2"
}
}
}
if "11" in [path] {
mutate {
rename => {
"col1" => "col1"
"col2" => "col2"
}
}
}
}
output {
if "22" in [path] {
datahub {
access_id => "xxxxx"
access_key => "xxxxx"
endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
project_name => "test_dh1"
topic_name => "topic_test_2"
#shard_id => "0"
#shard_keys => ["thread_id"]
dirty_data_continue => true
dirty_data_file => "/home/software/dirty_vehIdInfo.data"
dirty_data_file_max_size => 1000
}
}
if "11" in [path] {
datahub {
access_id => "xxxxx"
access_key => "xxxxx"
endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
project_name => "test_dh1"
topic_name => "topic_test"
#shard_id => "0"
#shard_keys => ["thread_id"]
dirty_data_continue => true
dirty_data_file => "/home/software/dirty.data"
dirty_data_file_max_size => 1000
}
}
}
Run the following command and view the results:
bash bin/logstash -f config/logstash.conf
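The conditionals in the output section route each event by the directory in its source path. The routing logic amounts to a substring test on the path, sketched here as a first-match approximation (route_topic is an illustrative helper; topic names are taken from the config above):

```python
def route_topic(path):
    # Mirrors the output conditionals: pick a DataHub topic by source directory.
    if "22" in path:
        return "topic_test_2"
    if "11" in path:
        return "topic_test"
    return None  # no conditional matches; the event is not sent
```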
One-to-many synchronization: imports one JSON file to different topics based on a key
The following code shows a sample JSON file named test_topic_22.json in the /home/software/data/22 directory:
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"}]
[{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"}]
The following code shows the configuration of the Logstash task:
input {
file {
path => "/home/software/data/22/test_topic_22.json"
start_position => "beginning"
sincedb_path => "/home/software/sincedb/filedatahub"
codec => json {
charset => "UTF-8"
}
}
}
filter{
mutate {
rename => {
"col1" => "col1"
"col2" => "col2"
}
}
}
output {
# Route records to different topics based on the value of the col1 field.
if [col1] == "aaaaa" {
datahub {
access_id => "xxxxx"
access_key => "xxxxx"
endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
project_name => "test_dh1"
topic_name => "topic_test_2"
#shard_id => "0"
#shard_keys => ["thread_id"]
dirty_data_continue => true
dirty_data_file => "/home/software/dirty_vehIdInfo.data"
dirty_data_file_max_size => 1000
}
}
else {
datahub {
access_id => "xxxxx"
access_key => "xxxxx"
endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
project_name => "test_dh1"
topic_name => "topic_test"
#shard_id => "0"
#shard_keys => ["thread_id"]
dirty_data_continue => true
dirty_data_file => "/home/software/dirty_vehIdInfo.data"
dirty_data_file_max_size => 1000
}
}
}
Run the following command and view the results:
bash bin/logstash -f config/logstash.conf
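Here the routing key is a field value rather than the file path. The if/else in the output section reduces to the following (route_by_col1 is an illustrative helper; topic names are taken from the config above):

```python
def route_by_col1(record):
    # Records with col1 == "aaaaa" go to topic_test_2; everything else
    # falls through to topic_test, matching the else branch above.
    return "topic_test_2" if record.get("col1") == "aaaaa" else "topic_test"
```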
Synchronize MySQL data to DataHub
The following code shows the configuration of the Logstash task:
input {
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.47.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://xxxx:3306/database"
jdbc_user => "username"
jdbc_password => "password"
use_column_value => true
tracking_column => "modifytime"
schedule => "* * * * *"
# Use :sql_last_value so that each scheduled run only picks up rows
# whose tracking column advanced since the previous run.
statement => "SELECT * FROM suyang_test WHERE modifytime > :sql_last_value"
}
}
output {
datahub {
access_id => "Your accessId"
access_key => "Your accessKey"
endpoint => "Endpoint"
project_name => "project"
topic_name => "topic"
#shard_id => "0"
#shard_keys => ["thread_id"]
dirty_data_continue => true
dirty_data_file => "/Users/ph0ly/trash/dirty.data"
dirty_data_file_max_size => 1000
}
}
Start Logstash
Run the following command to start Logstash:
logstash -f <The path of the preceding configuration file>
The batch_size parameter specifies the number of records that are sent to DataHub in each batch. The default value is 125. To start Logstash with a custom batch size, run the following command:
logstash -f <The path of the preceding configuration file> -b 256
Parameters
The following part describes the parameters in the preceding configuration file:
access_id: required. The AccessKey ID of your Alibaba Cloud account.
access_key: required. The AccessKey secret of your Alibaba Cloud account.
endpoint: required. The endpoint of DataHub.
project_name: required. The name of the DataHub project.
topic_name: required. The name of the DataHub topic.
retry_times: optional. The maximum number of retries allowed. The value -1 indicates unlimited retries. The value 0 indicates no retries. A value greater than 0 indicates the specified number of retries. Default value: -1.
retry_interval: optional. The interval between two consecutive retries. Unit: seconds. Default value: 5.
skip_after_retry: optional. Specifies whether to skip the import of data in the current batch if the number of retries initiated by a DataHub exception exceeds the value of the retry_times parameter. Default value: false.
approximate_request_bytes: optional. The approximate maximum number of bytes that can be sent in each request. The default value is 2048576, which is approximately 2 MB. This parameter prevents a request from being rejected because the request body is oversized.
shard_keys: optional. An array of field names. The hash values of these fields determine the shard to which each record is imported. If neither the shard_keys nor the shard_id parameter is set, the system polls the shards to determine the shard to which each record is imported.
shard_id: optional. The ID of the shard to which records are imported. If neither the shard_keys nor the shard_id parameter is set, the system polls the shards to determine the shard to which each record is imported.
dirty_data_continue: optional. Specifies whether to ignore dirty records. The value true indicates that dirty records are to be ignored. Default value: false. If you set this parameter to true, you must set the dirty_data_file parameter.
dirty_data_file: optional. The name of the file that stores dirty records. If you set the dirty_data_continue parameter to true, you must set this parameter. The file is divided into .part 1 and .part 2. The most recent records are stored in .part 2.
dirty_data_file_max_size: optional. The maximum size of the file that stores dirty records. This value is only for reference.
enable_pb: optional. Specifies whether to enable Protocol Buffers (Protobuf) for data transfer. If you want to disable Protobuf in Apsara Stack, set this parameter to false.
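Shard selection follows the precedence described above: an explicit shard_id wins, then a hash of the shard_keys values, then round-robin polling. The following Python sketch illustrates that precedence only; make_shard_router is an illustrative helper, and the plug-in's actual hash function is an internal detail:

```python
import zlib
from itertools import cycle

def make_shard_router(shard_ids, shard_keys=None, shard_id=None):
    rr = cycle(shard_ids)  # round-robin fallback over all shards
    def pick(record):
        if shard_id is not None:
            return shard_id                     # fixed shard always wins
        if shard_keys:
            # Hash the key fields to map the record to a shard.
            key = "|".join(str(record.get(k, "")) for k in shard_keys)
            return shard_ids[zlib.crc32(key.encode()) % len(shard_ids)]
        return next(rr)                         # poll the shards in turn
    return pick
```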
Use the DataHub input plug-in for Logstash to export data from DataHub
Export data from DataHub
Use the following configuration of the Logstash task in this example:
input {
datahub {
access_id => "Your accessId"
access_key => "Your accessKey"
endpoint => "Endpoint"
project_name => "test_project"
topic_name => "test_topic"
interval=> 5
#cursor => {
# "0"=>"20000000000000000000000003110091"
# "2"=>"20000000000000000000000003110091"
# "1"=>"20000000000000000000000003110091"
# "4"=>"20000000000000000000000003110091"
# "3"=>"20000000000000000000000003110000"
#}
shard_ids => []
pos_file => "/home/admin/logstash/pos_file"
}
}
output {
file {
path => "/home/admin/logstash/output"
}
}
Parameters
access_id: required. The AccessKey ID of your Alibaba Cloud account.
access_key: required. The AccessKey secret of your Alibaba Cloud account.
endpoint: required. The endpoint of DataHub.
project_name: required. The name of the DataHub project.
topic_name: required. The name of the DataHub topic.
retry_times: optional. The maximum number of retries allowed. The value -1 indicates unlimited retries. The value 0 indicates no retries. A value greater than 0 indicates the specified number of retries.
retry_interval: optional. The interval between two consecutive retries. Unit: seconds.
shard_ids: optional. The IDs of the shards in which records are to be consumed. If you do not set this parameter, records in all the shards are consumed.
cursor: optional. The sequence number of the record from which the consumption begins. By default, this parameter is empty, which indicates that the consumption starts from the earliest record.
pos_file: required. The checkpoint file, which records consumption offsets so that consumption can resume from the last checkpoint after a restart.
enable_pb: optional. Specifies whether to enable Protobuf for data transfer. If you want to disable Protobuf in Apsara Stack, set this parameter to false.
compress_method: optional. The compression algorithm for data transfer. By default, this parameter is left empty, indicating that compression is not performed. Valid values: lz4 and deflate.
print_debug_info: optional. Specifies whether to return the debugging information of DataHub. Default value: false. If you set this parameter to true, a large amount of information is returned. The information is used only for debugging.
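The pos_file records a per-shard cursor so that consumption resumes where it stopped. Conceptually, checkpointing works as sketched below; load_cursors and save_cursors are illustrative helpers, and the plug-in's actual file format is internal:

```python
import json
import os

def load_cursors(pos_file):
    # Resume each shard from its saved cursor, if a checkpoint exists.
    if os.path.exists(pos_file):
        with open(pos_file) as f:
            return json.load(f)
    return {}  # no checkpoint yet: start from the configured cursor

def save_cursors(pos_file, cursors):
    # Persist the latest cursor per shard after records are processed.
    with open(pos_file, "w") as f:
        json.dump(cursors, f)
```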
FAQ
Q: How do I import built-in fields in Logstash to DataHub?
A: Each record in Logstash carries built-in fields such as @timestamp. However, DataHub does not support field names that contain special characters. To import such a field, rename it in a filter. For example, the following configuration renames @timestamp to column_name:
filter{
mutate {
rename => {
"@timestamp" => "column_name"
}
}
}
References
For more information about the configuration parameters, visit the Logstash official website and see the ELK stack guide.