DataHub plug-ins for Logstash

Last Updated: Aug 25, 2021

Logstash


Logstash is a distributed log collection framework. Its processing logic is simple but powerful, and it is often used together with Elasticsearch and Kibana as the ELK Stack for log data analysis. To support a wider variety of data inputs, DataHub provides output and input plug-ins for data transfer with Logstash. With Logstash, you can access more than 30 types of data sources from the Logstash open source community, such as files, Syslog logs, Redis logs, Log4j logs, Apache logs, and NGINX logs. Logstash also supports filter plug-ins that let you customize the fields to be transferred. This topic describes how to use Logstash to efficiently import log data to or export log data from DataHub.

Note

The DataHub output and input plug-ins for Logstash comply with the Apache License 2.0.

Install Logstash and the plug-ins

Requirement

Java Runtime Environment (JRE) 7 or later is required to run Logstash. If the JRE version does not meet the requirement, specific features of Logstash are unavailable.
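
Before you install Logstash, you can quickly confirm the Java version on the server. The following command is only a sanity check; the exact output depends on your JDK distribution:

    $ java -version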

Installation methods

Alibaba Cloud provides the following methods to install Logstash and the plug-ins:

  • Complete installation

    You can use the complete installation package to install both Logstash and the DataHub plug-ins for Logstash with a few clicks.

    Run the following commands to decompress the package:

          $ tar -xzvf logstash-with-datahub-6.4.0-1.0.10.tar.gz
          $ cd logstash-with-datahub-6.4.0

    After the package is decompressed, Logstash and the plug-ins are automatically installed.
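
    To confirm that the plug-ins were installed, you can list the installed plug-ins. This is only a quick check and assumes that you are in the decompressed directory:

    $ bin/logstash-plugin list | grep datahub

    The output should include logstash-output-datahub and logstash-input-datahub.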

  • Separate installation

    Install Logstash: For installation instructions, see Logstash Reference. The latest version of Logstash requires Java 7 or later.

    Install the DataHub plug-ins for Logstash: Download and install the plug-ins that you require.

    Download the DataHub output plug-in for Logstash. You can use this plug-in to import data to DataHub.

    Download the DataHub input plug-in for Logstash. You can use this plug-in to export data from DataHub.

    Run the following commands to install the output and input plug-ins:

    $ {LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-output-datahub-1.0.10.gem
    $ {LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-input-datahub-1.0.10.gem

    An error similar to the following may occur during installation:

    WARNING: can not set Session#timeout=(0) no session context

    In this case, check whether your server can access the Internet. If the server can access the Internet, you can switch to a mirror source in mainland China. For more information, visit https://gems.ruby-china.com/.
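
    One common way to switch to the mirror is to change the RubyGems source in the Gemfile of your Logstash installation and then rerun the plug-in installation. The following commands are a sketch that assumes a Linux shell and the default Gemfile location; back up the Gemfile before you edit it:

    # Replace the default RubyGems source with the mirror (assumes GNU sed).
    $ sed -i 's#https://rubygems.org#https://gems.ruby-china.com#g' ${LOG_STASH_HOME}/Gemfile
    $ ${LOG_STASH_HOME}/bin/logstash-plugin install --local logstash-output-datahub-1.0.10.gem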

  • Offline installation

If your online server cannot access the Internet, you cannot install the DataHub plug-ins for Logstash by using the preceding methods. Instead, build an offline installation package. Because an offline installation package is tied to a specific Logstash version, we recommend that you first install the plug-ins on a test server by using the preceding methods, build an offline installation package as described in Offline Plugin Management, and then upload the package to your online server for offline installation.
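
The following commands are a sketch of that workflow, assuming the plug-ins are already installed on the test server; see Offline Plugin Management for the exact options of your Logstash version. The package path is a placeholder:

# On the test server: pack the installed DataHub plug-ins into an offline package.
$ bin/logstash-plugin prepare-offline-pack --output /tmp/logstash-offline-datahub.zip logstash-output-datahub logstash-input-datahub

# On the online server: install the plug-ins from the uploaded package.
$ bin/logstash-plugin install file:///tmp/logstash-offline-datahub.zip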

Use the DataHub output plug-in for Logstash to import data to DataHub

Import Log4j logs to DataHub

This example shows how to collect unstructured Log4j logs and derive a structure out of the logs by using Logstash. The following code shows a sample Log4j log:

20:04:30.359 [qtp1453606810-20] INFO  AuditInterceptor - [13pn9kdr5tl84stzkmaa8vmg] end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test GET, 187 ms

The preceding Log4j log needs to be structured and then imported to DataHub. The following table describes the schema of the DataHub topic to which the Log4j log is imported.

Parameter       Data type
request_time    STRING
thread_id       STRING
log_level       STRING
class_name      STRING
request_id      STRING
detail          STRING

The following shows the Logstash task configuration used in this example:

input {
    file {
        # On Windows, use forward slashes (/) instead of backslashes (\) in the path.
        path => "${APP_HOME}/log/bayes.log"
        start_position => "beginning"
    }
}

filter{
    grok {
        match => {
           "message" => "(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class_name>\w+)\s+\-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)"
        }
    }
}

output {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "project"
        topic_name => "topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/ph0ly/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}
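
For reference, applying the preceding grok pattern to the sample log line would produce fields similar to the following. This is only an illustration of the expected structure:

    request_time: 20:04:30.359
    thread_id:    qtp1453606810-20
    log_level:    INFO
    class_name:   AuditInterceptor
    request_id:   13pn9kdr5tl84stzkmaa8vmg
    detail:       end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test GET, 187 ms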

Import CSV files to DataHub

This example shows how to use Logstash to collect CSV files. The following code shows a sample CSV file:

1111,1.23456789012E9,true,14321111111000000,string_dataxxx0
2222,2.23456789012E9,false,14321111111000000,string_dataxxx1

The following table describes the schema of the DataHub topic to which the CSV file is to be imported.

Parameter   Data type
col1        BIGINT
col2        DOUBLE
col3        BOOLEAN
col4        TIMESTAMP
col5        STRING

The following shows the Logstash task configuration used in this example:

input {
    file {
        path => "${APP_HOME}/data.csv"
        start_position => "beginning"
    }
}

filter{
    csv {
        columns => ['col1', 'col2', 'col3', 'col4', 'col5']
    }
}

output {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "project"
        topic_name => "topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/ph0ly/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}
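
Because the topic columns use BIGINT, DOUBLE, BOOLEAN, and TIMESTAMP types, you may also want the csv filter to cast the parsed string values. The following filter is a sketch that uses the standard convert option of the csv filter; verify the type mapping against your own topic schema:

filter {
    csv {
        columns => ['col1', 'col2', 'col3', 'col4', 'col5']
        # Cast parsed strings to the types expected by the topic (assumed mapping).
        convert => {
            "col1" => "integer"
            "col2" => "float"
            "col3" => "boolean"
            "col4" => "integer"
        }
    }
}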

Import JSON files to DataHub

One-to-one synchronization: imports each JSON file to one topic

The following code shows a sample JSON file named test_topic_22.json in the /home/software/data/22/ directory:

[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]

Configuration file

input {
    file {
        path => "/home/software/data/22/test_topic_22.json"
        start_position => "beginning"
        # The storage path of the synchronization offset.
        sincedb_path => "/home/software/sincedb/filedatahub"
        codec => json {
            charset => "UTF-8"
        }
    }
}

filter{
    # Map the JSON keys to the topic field names. The names are identical here,
    # so this rename is effectively a placeholder that you can adapt as needed.
    mutate {
      rename => {
        "col1" => "col1"
        "col2" => "col2"
      }
    }

}
output {

   datahub {
      access_id => "xxxx"
      access_key => "xxxx"
      endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
      project_name => "test_dh1"
      topic_name => "topic_test_2"
      #shard_id => "0"
      #shard_keys => ["thread_id"]
      dirty_data_continue => true
      dirty_data_file => "/home/software/dirty_vehIdInfo.data"
      dirty_data_file_max_size => 1000
   }

}

Run the following command and view the results:

bash bin/logstash -f config/logstash.conf

Many-to-many synchronization: imports multiple JSON files to multiple topics

The following code shows a sample JSON file named test_topic_22.json in the /home/software/data/22/ directory:

[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]

The following code shows a sample JSON file named test_topic.json in the /home/software/data/11/ directory:

[{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]
[{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]
[{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"},{"col1":"aaaaa","col2":"bbbbbb"}]

Configuration file

input {
    file {
        path => "/home/software/data/**/*.json"
        start_position => "beginning"
        # The storage path of the synchronization offset.
        sincedb_path => "/home/software/sincedb/filedatahub"
        codec => json {
            charset => "UTF-8"
        }
    }
}

filter{
  # Use the path to distinguish between the two JSON files.
    if "22"  in [path] {
        mutate {
          rename => {
            "col1"        =>    "col1"
            "col2"        =>    "col2"
          }
        }
    }
    if "11" in [path] {
        mutate {
          rename => {
            "col1"    =>    "col1"
            "col2"    =>    "col2"
          }
        }
    }


}
output {

    if "22"  in [path] {
      datahub {
        access_id => "xxxxx"
        access_key => "xxxxx"
        endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
        project_name => "test_dh1"
        topic_name => "topic_test_2"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/home/software/dirty_vehIdInfo.data"
        dirty_data_file_max_size => 1000
      }
    }

    if "11"  in [path] {
      datahub {
        access_id => "xxxxx"
        access_key => "xxxxx"
        endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
        project_name => "test_dh1"
        topic_name => "topic_test"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/home/software/dirty.data"
        dirty_data_file_max_size => 1000
      }
    }


}

Run the following command and view the results:

bash bin/logstash -f config/logstash.conf

One-to-many synchronization: imports each JSON file to different topics based on a key

The following code shows a sample JSON file named test_topic_22.json in the /home/software/data/22/ directory. Records are imported to different topics based on the value of the col1 field:
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222},{"col1":"aaaaa","col2":222}]
[{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"}]
[{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"},{"col1":"bbbbb","col2":"ccccc"}]

Configuration file

input {
    file {
        path => "/home/software/data/22/test_topic_22.json"
        start_position => "beginning"
        sincedb_path => "/home/software/sincedb/filedatahub"
        codec => json {
            charset => "UTF-8"
        }
    }
}

filter{
    mutate {
      rename => {
        "col1"    =>    "col1"
        "col2"    =>    "col2"
      }
    }

}
output {

    # Import records to different topics based on the value of the col1 field.
    if [col1] == "aaaaa" {
     datahub {
        access_id => "xxxxx"
        access_key => "xxxxx"
        endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
        project_name => "test_dh1"
        topic_name => "topic_test_2"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/home/software/dirty_vehIdInfo.data"
        dirty_data_file_max_size => 1000
     }
   }
   else {
     datahub {
          access_id => "xxxxx"
          access_key => "xxxxx"
          endpoint => "http://dh-cn-shanghai-int-vpc.aliyuncs.com"
          project_name => "test_dh1"
          topic_name => "topic_test"
          #shard_id => "0"
          #shard_keys => ["thread_id"]
          dirty_data_continue => true
          dirty_data_file => "/home/software/dirty_vehIdInfo.data"
          dirty_data_file_max_size => 1000
     }
   }

}

Run the following command and view the results:

bash bin/logstash -f config/logstash.conf

Synchronize MySQL data to DataHub

Configuration file

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.47.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://xxxx:3306/database"
    jdbc_user => "username"
    jdbc_password => "password"
    use_column_value => true
    tracking_column => "modifytime"
    schedule => "* * * * *"
    statement => "select * from suyang_test"
   }
}

output {
      datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "project"
        topic_name => "topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/ph0ly/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}
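
The jdbc input above sets use_column_value and tracking_column, but its SQL statement still pulls the whole table on every run. For incremental synchronization, a common pattern is to reference the saved offset in the query through :sql_last_value. The following input block is a sketch based on the sample above; tracking_column_type is set on the assumption that modifytime is a timestamp column:

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.47.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://xxxx:3306/database"
    jdbc_user => "username"
    jdbc_password => "password"
    use_column_value => true
    tracking_column => "modifytime"
    tracking_column_type => "timestamp"
    schedule => "* * * * *"
    # Only pull rows that changed since the last recorded offset.
    statement => "select * from suyang_test where modifytime > :sql_last_value"
  }
}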

Start Logstash

Run the following command to start Logstash:

logstash -f <The path of the preceding configuration file>

The batch_size parameter specifies the number of records that are sent to DataHub in each batch. The default value is 125. To change the batch size, start Logstash with the -b option:

logstash -f <The path of the preceding configuration file> -b 256

Parameters

The following list describes the parameters of the DataHub output plug-in that are used in the preceding configuration files. A combined example that shows the optional parameters follows the list:

    access_id: required. The AccessKey ID of your Alibaba Cloud account.
    access_key: required. The AccessKey secret of your Alibaba Cloud account.
    endpoint: required. The endpoint of DataHub.
    project_name: required. The name of the DataHub project.
    topic_name: required. The name of the DataHub topic.
    retry_times: optional. The maximum number of retries allowed. A value of -1 indicates unlimited retries. A value of 0 indicates no retries. A value greater than 0 indicates the specified number of retries. Default value: -1.
    retry_interval: optional. The interval between two consecutive retries. Unit: seconds. Default value: 5.
    skip_after_retry: optional. Specifies whether to skip the import of data in the current batch if the number of retries initiated by a DataHub exception exceeds the value of the retry_times parameter. Default value: false.
    approximate_request_bytes: optional. The approximate number of bytes that can be sent in each request. The default value is 2048576, which is about 2 MB. This parameter is used to prevent a request from being rejected because the request body is oversized.
    shard_keys: optional. The keys of the shards. The hash value of a key is used to map the ID of the shard to which a record is imported. The value of this parameter is an array. If the shard_keys and shard_id parameters are not set, the system polls the shards to determine to which shard the records are imported.
    shard_id: optional. The ID of the shard to which records are imported. If the shard_keys and shard_id parameters are not set, the system polls the shards to determine to which shard the records are imported.
    dirty_data_continue: optional. Specifies whether to ignore dirty records. A value of true indicates that dirty records are to be ignored. Default value: false. If you set this parameter to true, you must set the dirty_data_file parameter.
    dirty_data_file: optional. The name of the file that stores dirty records. If you set the dirty_data_continue parameter to true, you must set this parameter. The file is divided into .part 1 and .part 2. The most recent records are stored in .part 2.
    dirty_data_file_max_size: optional. The maximum size of the file that stores dirty records. This value is only for reference.
    enable_pb: optional. Specifies whether to enable Protocol Buffers (Protobuf) for data transfer. If you want to disable Protobuf in Apsara Stack, set this parameter to false.
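
For example, the optional parameters can be combined in the datahub output as follows. This is only a sketch that shows where the parameters go; all values are placeholders that you should adapt:

output {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "project"
        topic_name => "topic"
        # Retry up to 3 times, 5 seconds apart, and then skip the current batch.
        retry_times => 3
        retry_interval => 5
        skip_after_retry => true
        # Route records to shards by the hash of the thread_id field.
        shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/ph0ly/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}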

Use the DataHub input plug-in for Logstash to export data from DataHub

Export data from DataHub

The following shows the Logstash task configuration used in this example:

input {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "test_project"
        topic_name => "test_topic"
        interval=> 5
        #cursor => {
        #    "0"=>"20000000000000000000000003110091"
        #    "2"=>"20000000000000000000000003110091"
        #    "1"=>"20000000000000000000000003110091"
        #    "4"=>"20000000000000000000000003110091"
        #    "3"=>"20000000000000000000000003110000"
        #}
        shard_ids => []
        pos_file => "/home/admin/logstash/pos_file"
    }
}

output {
    file {
        path => "/home/admin/logstash/output"
    }
}

Parameters

    access_id: required. The AccessKey ID of your Alibaba Cloud account.
    access_key: required. The AccessKey secret of your Alibaba Cloud account.
    endpoint: required. The endpoint of DataHub.
    project_name: required. The name of the DataHub project.
    topic_name: required. The name of the DataHub topic.
    retry_times: optional. The maximum number of retries allowed. A value of -1 indicates unlimited retries. A value of 0 indicates no retries. A value greater than 0 indicates the specified number of retries.
    retry_interval: optional. The interval between two consecutive retries. Unit: seconds.
    shard_ids: optional. The IDs of the shards in which records are to be consumed. If you do not set this parameter, records in all the shards are consumed.
    cursor: optional. The sequence number of the record from which the consumption begins. By default, this parameter is empty, which indicates that the consumption starts from the earliest record.
    pos_file: required. The checkpoint file, which is preferentially used to restore the consumption offset.
    enable_pb: optional. Specifies whether to enable Protobuf for data transfer. If you want to disable Protobuf in Apsara Stack, set this parameter to false.
    compress_method: optional. The compression algorithm for data transfer. By default, this parameter is empty. Valid values: lz4 and deflate.
    print_debug_info: optional. Specifies whether to return the debugging information of DataHub. Default value: false. If you set this parameter to true, a large amount of information is returned. The information is used only for debugging.

FAQ

Q: How do I import built-in fields in Logstash to DataHub?

A: Each Logstash record carries built-in fields such as @timestamp. However, DataHub does not support field names that contain special characters. To import such a field, rename it in the filter configuration. For example, you can add the following configuration to rename the @timestamp field to column_name:

filter{
    mutate {
      rename => {
        "@timestamp" => "column_name"
      }
    }
}

References

For more information about the configuration parameters, see the Logstash official website and the ELK Stack Chinese guide.