This topic describes how to use the aliyun-log-flume plugin to integrate Log Service with Flume and write or consume log data.
Background
The aliyun-log-flume plugin integrates Log Service with Flume so that log data can be exchanged with other systems, such as HDFS and Kafka, through Flume. The plugin provides a sink and a source.
- Sink: Flume reads data from other data sources and writes the data to Log Service.
- Source: Flume consumes log data from Log Service and writes the data to other systems.
For more information, see aliyun-log-flume.
Procedure
- Download and install Flume. For more information, see Flume.
- Download the aliyun-log-flume plugin and place it in the <FLUME_HOME>/lib directory. For more information, see aliyun-log-flume-1.3.jar.
- In the <FLUME_HOME>/conf directory, create a configuration file named flumejob.conf.
- Start Flume.
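The steps above can be sketched as shell commands. The installation path, jar version, and agent name are placeholders for your environment:

```shell
# Placeholder installation directory; adjust to your environment.
export FLUME_HOME=/opt/flume

# Place the plugin jar on Flume's classpath.
cp aliyun-log-flume-1.3.jar "$FLUME_HOME/lib/"

# Start a Flume agent named "agent" with the flumejob.conf configuration.
"$FLUME_HOME/bin/flume-ng" agent \
  --conf "$FLUME_HOME/conf" \
  --conf-file "$FLUME_HOME/conf/flumejob.conf" \
  --name agent \
  -Dflume.root.logger=INFO,console
```

The agent name passed with --name must match the prefix used in the property keys of flumejob.conf.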
Sink
Use a sink to write data from other sources to Log Service through Flume. The following parsing formats are supported:
- SIMPLE: writes the entire Flume event to Log Service as a single field.
- DELIMITED: treats the Flume event body as delimited data, parses it into fields based on the configured column names, and writes the fields to Log Service.
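As an illustration (not plugin code), the DELIMITED format follows standard CSV-style splitting, with the separator, quote, and escape characters set by the separatorChar, quoteChar, and escapeChar parameters described below. A minimal sketch in Python, using hypothetical column names and an invented event body:

```python
import csv
import io

# Hypothetical column configuration and event body, mirroring the
# DELIMITED settings columns=time,level,message and separatorChar=",".
columns = ["time", "level", "message"]
event_body = '1668501234,INFO,"user ""alice"" logged in"'

# Split the event body into fields, honoring the quote character,
# the way a DELIMITED serializer would.
row = next(csv.reader(io.StringIO(event_body), delimiter=",", quotechar='"'))

# Pair each configured column name with the field at the same position.
fields = dict(zip(columns, row))
print(fields)
# {'time': '1668501234', 'level': 'INFO', 'message': 'user "alice" logged in'}
```

Note that the quoted third field is parsed as a single value even though it could contain the separator, and the doubled quotation marks are unescaped.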
The following table describes the sink configuration parameters.
| Parameter | Required | Description |
| --- | --- | --- |
| type | Yes | The class name of the sink. Set the value to the sink class provided by the plugin. |
| endpoint | Yes | The Log Service endpoint of the project. |
| project | Yes | The name of the project. |
| logstore | Yes | The name of the Logstore. |
| accessKeyId | Yes | The AccessKey ID that identifies the user. For security reasons, use the AccessKey pair of a RAM user. For information about how to obtain an AccessKey pair, see AccessKey pair. |
| accessKey | Yes | The AccessKey secret that authenticates the user. For security reasons, use the AccessKey pair of a RAM user. For information about how to obtain an AccessKey pair, see AccessKey pair. |
| batchSize | No | The number of data entries written to Log Service in each batch. Default value: 1000. |
| maxBufferSize | No | The maximum size of the cache queue. Default value: 1000. |
| serializer | No | The serialization format of Flume events. Valid values: SIMPLE and DELIMITED. |
| columns | No | Required when serializer is set to DELIMITED. A comma-separated list of column names, in the same order as the fields in the data. |
| separatorChar | No | The field separator when serializer is set to DELIMITED. The value must be a single character. Default value: a comma (,). |
| quoteChar | No | The quote character when serializer is set to DELIMITED. Default value: a double quotation mark ("). |
| escapeChar | No | The escape character when serializer is set to DELIMITED. Default value: a double quotation mark ("). |
| useRecordTime | No | Specifies whether to use the timestamp field in the data as the log time. If the value is false, the current system time is used. Default value: false. |
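The parameters above can be combined into a minimal sink configuration. The following sketch uses a netcat source and memory channel purely as an example pipeline; the sink class name follows the examples in the plugin's GitHub repository, and the endpoint, project, Logstore, and credential values are placeholders:

```
# Sketch of <FLUME_HOME>/conf/flumejob.conf for writing data to Log Service.
agent.sources = src1
agent.channels = ch1
agent.sinks = sink1

# Example upstream source; replace with your actual data source.
agent.sources.src1.type = netcat
agent.sources.src1.bind = 0.0.0.0
agent.sources.src1.port = 44444
agent.sources.src1.channels = ch1

agent.channels.ch1.type = memory

# Class name as shown in the plugin's GitHub examples; verify against
# your plugin version.
agent.sinks.sink1.type = com.aliyun.loghub.flume.sink.LoghubSink
agent.sinks.sink1.endpoint = cn-hangzhou.log.aliyuncs.com
agent.sinks.sink1.project = my-project
agent.sinks.sink1.logstore = my-logstore
agent.sinks.sink1.accessKeyId = <yourAccessKeyId>
agent.sinks.sink1.accessKey = <yourAccessKeySecret>
agent.sinks.sink1.serializer = DELIMITED
agent.sinks.sink1.columns = time,level,message
agent.sinks.sink1.channel = ch1
```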
For sink configuration examples, see GitHub.
Source
Use a source to consume log data from Log Service and send the data to other systems through Flume. The following output formats are supported:
- DELIMITED: outputs data to Flume as delimited logs.
- JSON: outputs data to Flume as JSON logs.
The following table describes the source configuration parameters.
| Parameter | Required | Description |
| --- | --- | --- |
| type | Yes | The class name of the source. Set the value to the source class provided by the plugin. |
| endpoint | Yes | The Log Service endpoint of the project. |
| project | Yes | The name of the project. |
| logstore | Yes | The name of the Logstore. |
| accessKeyId | Yes | The AccessKey ID that identifies the user. For security reasons, use the AccessKey pair of a RAM user. For information about how to obtain an AccessKey pair, see AccessKey pair. |
| accessKey | Yes | The AccessKey secret that authenticates the user. For security reasons, use the AccessKey pair of a RAM user. For information about how to obtain an AccessKey pair, see AccessKey pair. |
| heartbeatIntervalMs | No | The heartbeat interval between the client and Log Service. Default value: 30000. Unit: milliseconds. |
| fetchIntervalMs | No | The interval at which data is fetched from Log Service. Default value: 100. Unit: milliseconds. |
| fetchInOrder | No | Specifies whether to consume data in order. Default value: false. |
| batchSize | No | The number of data entries read in each batch. Default value: 100. |
| consumerGroup | No | The name of the consumer group. |
| initialPosition | No | The position from which consumption starts. Valid values: begin, end, and timestamp. Default value: begin. Note: If a checkpoint exists on the server, consumption resumes from the checkpoint, regardless of this setting. |
| timestamp | No | Required when initialPosition is set to timestamp. A UNIX timestamp that specifies the start time of consumption. |
| deserializer | Yes | The deserialization format of Flume events. Valid values: DELIMITED and JSON. |
| columns | No | Required when deserializer is set to DELIMITED. A comma-separated list of column names, in the same order as the fields in the data. |
| separatorChar | No | The field separator when deserializer is set to DELIMITED. The value must be a single character. Default value: a comma (,). |
| quoteChar | No | The quote character when deserializer is set to DELIMITED. Default value: a double quotation mark ("). |
| escapeChar | No | The escape character when deserializer is set to DELIMITED. Default value: a double quotation mark ("). |
| appendTimestamp | No | Specifies whether to append the log time as an extra field at the end of each row when deserializer is set to DELIMITED. Default value: false. |
| sourceAsField | No | Specifies whether to add the log source as a field when deserializer is set to JSON. |
| tagAsField | No | Specifies whether to add log tags as fields when deserializer is set to JSON. Each tag is added as a separate field. |
| timeAsField | No | Specifies whether to add the log time as a field when deserializer is set to JSON. |
| useRecordTime | No | Specifies whether to use the original timestamp of the log as the log time. If the value is false, the current system time is used. Default value: false. |
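A minimal source configuration can be sketched in the same way. The logger sink and memory channel below are placeholders for your actual downstream system; the source class name follows the examples in the plugin's GitHub repository, and the endpoint, project, Logstore, and credential values are placeholders:

```
# Sketch of <FLUME_HOME>/conf/flumejob.conf for consuming data from Log Service.
agent.sources = src1
agent.channels = ch1
agent.sinks = sink1

# Class name as shown in the plugin's GitHub examples; verify against
# your plugin version.
agent.sources.src1.type = com.aliyun.loghub.flume.source.LoghubSource
agent.sources.src1.endpoint = cn-hangzhou.log.aliyuncs.com
agent.sources.src1.project = my-project
agent.sources.src1.logstore = my-logstore
agent.sources.src1.accessKeyId = <yourAccessKeyId>
agent.sources.src1.accessKey = <yourAccessKeySecret>
agent.sources.src1.consumerGroup = flume-consumer
agent.sources.src1.deserializer = JSON
agent.sources.src1.channels = ch1

agent.channels.ch1.type = memory

# Example downstream sink; replace with HDFS, Kafka, or another sink.
agent.sinks.sink1.type = logger
agent.sinks.sink1.channel = ch1
```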
For source configuration examples, see GitHub.