This topic describes how to configure an interceptor, a channel selector, and a sink processor for Flume in E-MapReduce (EMR).
Interceptors
Type | Description |
---|---|
Timestamp interceptor | Adds a UNIX timestamp property to an event header. |
Host interceptor | Adds a host property to an event header. |
Static interceptor | Adds a fixed key-value pair property to an event header. |
Remove header interceptor | Drops one or more properties from an event header. |
UUID interceptor | Sets a universally unique identifier (UUID) in an event. If the application layer does not have a UUID, you can use this interceptor to add a UUID. |
Morphline interceptor | Uses a morphline configuration file to filter events, modify event headers, or insert event headers. |
Search interceptor | Uses a Java regular expression to search for event bodies. |
Replace interceptor | Uses a Java regular expression to replace an event body. |
Regex filtering interceptor | Interprets the event body as text and matches it against a specified regular expression. Events that match can be either kept or dropped. |
- Example 1: An event body contains 1:2:3.4foobar5. To extract the three digits into event headers by using a regular expression (in Apache Flume, the serializers properties belong to an interceptor of the regex_extractor type), use the following configuration:
  a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
  a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
  a1.sources.r1.interceptors.i1.serializers.s1.name = one
  a1.sources.r1.interceptors.i1.serializers.s2.name = two
  a1.sources.r1.interceptors.i1.serializers.s3.name = three
  The event body remains unchanged, but the following headers are added: one=>1, two=>2, three=>3.
- Example 2: An event body contains 2012-10-18 18:47:57,614 some log line. To extract the time at the start of the body into a timestamp header, use the following configuration:
  a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
  a1.sources.r1.interceptors.i1.serializers = s1
  a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
  a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
  a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
  The event body remains unchanged, but the timestamp=>1350611220000 header is added.
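The two extraction examples can be reproduced outside Flume to check a regular expression before it is deployed. The following Python code is a minimal sketch, not Flume code: the helper names extract_headers and to_millis are hypothetical, and Python's re module stands in for Java regular expressions, which behave the same for these patterns. The value 1350611220000 corresponds to 18:47 on 2012-10-18 in a UTC-7 time zone, so the sketch assumes that offset. On a real agent, the result likely depends on the time zone of the host.

```python
import re
from datetime import datetime, timedelta, timezone


def extract_headers(body, regex, names):
    """Map each capture group of the first match to a header name (hypothetical helper)."""
    match = re.search(regex, body)
    if match is None:
        return {}
    return dict(zip(names, match.groups()))


def to_millis(text, pattern, tz):
    """Parse text with a strptime pattern in time zone tz and return epoch milliseconds."""
    parsed = datetime.strptime(text, pattern).replace(tzinfo=tz)
    return int(parsed.timestamp() * 1000)


# Example 1: three single digits separated by colons become three headers.
headers = extract_headers("1:2:3.4foobar5", r"(\d):(\d):(\d)", ["one", "two", "three"])
print(headers)

# Example 2: the leading date and minute become a millisecond timestamp.
# ASSUMED_ZONE is an assumption inferred from the expected value in the text.
ASSUMED_ZONE = timezone(timedelta(hours=-7))
body = "2012-10-18 18:47:57,614 some log line"
match = re.search(r"^(?:\n)?(\d\d\d\d-\d\d-\d\d\s\d\d:\d\d)", body)
timestamp = to_millis(match.group(1), "%Y-%m-%d %H:%M", ASSUMED_ZONE)
print(timestamp)
```

Note that the header values are strings, and that the seconds and milliseconds in the body are ignored because the regular expression captures the time only down to the minute.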
Channel selector
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
In this example, source r1 routes each event to one or more of the channels c1, c2, c3, and c4 based on the value of the state header. If the value is CZ, r1 sends the event to c1. If the value is US, r1 sends the event to both c2 and c3. In all other cases, including when the header is missing, r1 sends the event to c4.
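The routing rule of the multiplexing selector amounts to a lookup with a fallback. The following Python code is a minimal sketch of that rule for the configuration above. The names MAPPING, DEFAULT_CHANNELS, and select_channels are hypothetical and do not correspond to Flume classes.

```python
# Mirrors selector.mapping.* and selector.default from the configuration.
MAPPING = {"CZ": ["c1"], "US": ["c2", "c3"]}
DEFAULT_CHANNELS = ["c4"]


def select_channels(headers, header_name="state"):
    """Return the channels for an event, based on one header value."""
    value = headers.get(header_name)
    return MAPPING.get(value, DEFAULT_CHANNELS)


print(select_channels({"state": "CZ"}))
print(select_channels({"state": "US"}))
print(select_channels({"state": "DE"}))
print(select_channels({}))
```

An event whose state header has an unmapped value, or no state header at all, falls through to the default channel.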
Sink processor
For the position of a sink processor in the Flume data flow, see the Channel selector section.
- Example 1: The work mode is failover.
  a1.sinkgroups = g1
  a1.sinkgroups.g1.sinks = k1 k2
  a1.sinkgroups.g1.processor.type = failover
  a1.sinkgroups.g1.processor.priority.k1 = 5
  a1.sinkgroups.g1.processor.priority.k2 = 10
  a1.sinkgroups.g1.processor.maxpenalty = 10000
  In this example, the sink group g1 contains sinks k1 and k2. The priority of k1 is 5 and the priority of k2 is 10. A larger value indicates a higher priority, so k2 is used first and k1 takes over if k2 fails. The maxpenalty parameter limits the backoff period of a failed sink to 10,000 milliseconds.
- Example 2: The work mode is load balancing.
  a1.sinkgroups = g1
  a1.sinkgroups.g1.sinks = k1 k2
  a1.sinkgroups.g1.processor.type = load_balance
  a1.sinkgroups.g1.processor.backoff = true
  a1.sinkgroups.g1.processor.selector = random
  In this example, the load is distributed across sinks k1 and k2 at random. You can also set the selector to round_robin to distribute the load across the sinks in turn. The a1.sinkgroups.g1.processor.backoff parameter specifies whether failed sinks are backed off exponentially. If this parameter is set to true, the sink processor temporarily skips a failed sink, and the skip period grows with repeated failures.
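The failover choice and the capped backoff can be illustrated with a short simulation. The following Python code is a sketch under stated assumptions, not Flume's implementation: the function names are hypothetical, and the 1,000-millisecond base and the doubling per consecutive failure are assumed values chosen only to show how a cap such as maxpenalty bounds an exponential backoff.

```python
def pick_failover_sink(priorities, failed):
    """Return the live sink with the highest priority, or None if every sink has failed."""
    live = {name: prio for name, prio in priorities.items() if name not in failed}
    return max(live, key=live.get) if live else None


def penalty_ms(consecutive_failures, max_penalty_ms=10000, base_ms=1000):
    """Exponential backoff capped at max_penalty_ms (base_ms is an assumed value)."""
    return min(max_penalty_ms, base_ms * 2 ** consecutive_failures)


priorities = {"k1": 5, "k2": 10}  # from processor.priority.k1 and processor.priority.k2

print(pick_failover_sink(priorities, failed=set()))         # k2 has the larger priority
print(pick_failover_sink(priorities, failed={"k2"}))        # k1 takes over
print(pick_failover_sink(priorities, failed={"k1", "k2"}))  # no sink is available

# The penalty doubles with each failure until it reaches the configured cap.
print([penalty_ms(n) for n in range(1, 6)])
```

With these assumed values, the penalty reaches the 10,000-millisecond cap on the fourth consecutive failure and stays there.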