This topic describes how to configure an interceptor, a channel selector, and a sink processor for Flume in E-MapReduce (EMR).

Interceptors

An interceptor sits between a source and a channel and modifies or drops events before they reach the channel. The following figure shows the position of an interceptor in the data flow.
The following table describes the types of interceptors.
Type | Description
--- | ---
Timestamp interceptor | Adds a UNIX timestamp property to an event header.
Host interceptor | Adds a host property to an event header.
Static interceptor | Adds a fixed key-value pair property to an event header.
Remove header interceptor | Drops one or more properties from an event header.
UUID interceptor | Sets a universally unique identifier (UUID) in an event. If the application layer does not provide a UUID, you can use this interceptor to add one.
Morphline interceptor | Uses a morphline configuration file to filter events, modify event headers, or insert event headers.
Search interceptor | Uses a Java regular expression to search event bodies.
Replace interceptor | Uses a Java regular expression to replace matched text in event bodies.
Regex filtering interceptor | Filters events by matching the event body against a specified regular expression. Events that match can be either kept or dropped.
Examples:
  • Example 1: An event body contains 1:2:3.4foobar5. To extract the three digits into event headers, use a regex extractor interceptor with the following configurations:
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = regex_extractor
    a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
    a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
    a1.sources.r1.interceptors.i1.serializers.s1.name = one
    a1.sources.r1.interceptors.i1.serializers.s2.name = two
    a1.sources.r1.interceptors.i1.serializers.s3.name = three

    The event body remains unchanged, and the following headers are added: one=>1, two=>2, three=>3.

  • Example 2: An event body contains 2012-10-18 18:47:57,614 some log line. To extract the date and time from the body and store them as a millisecond timestamp in a header, use a regex extractor interceptor with the following configurations:
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = regex_extractor
    a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
    a1.sources.r1.interceptors.i1.serializers = s1
    a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
    a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
    a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

    The event body remains unchanged, and the timestamp=>1350611220000 header is added. The exact value depends on the time zone in which the time is interpreted; 1350611220000 corresponds to 2012-10-18 18:47 in UTC-7.
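
The built-in interceptors listed in the table can also be chained on one source. The following fragment is a minimal sketch, assuming an agent named a1 with a source named r1 as in the preceding examples. The header key datacenter, its value, and the regular expression are hypothetical placeholders. Interceptors run in the order in which they are listed.

```properties
# List the interceptors in the order in which they should run.
a1.sources.r1.interceptors = i1 i2 i3 i4
# Add a timestamp header that holds the processing time.
a1.sources.r1.interceptors.i1.type = timestamp
# Add a header named hostname that holds the host name instead of the IP address.
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.useIP = false
a1.sources.r1.interceptors.i2.hostHeader = hostname
# Add a fixed key-value pair (hypothetical key and value).
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datacenter
a1.sources.r1.interceptors.i3.value = dc-example
# Drop events whose body starts with DEBUG (hypothetical pattern).
a1.sources.r1.interceptors.i4.type = regex_filter
a1.sources.r1.interceptors.i4.regex = ^DEBUG.*
a1.sources.r1.interceptors.i4.excludeEvents = true
```

With excludeEvents set to false, which is the default, the regex filtering interceptor keeps only the events that match the expression instead of dropping them.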

Channel selector

A channel selector determines which channels receive an event when one source maps to multiple channels. The following figure shows the position of a channel selector in the data flow.
Flume has a built-in replicating selector and a built-in multiplexing selector. By default, a replicating selector is used. A replicating selector sends every event to all channels. A multiplexing selector routes each event to one or more channels based on the value of a specified event header. Example:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

In this example, r1 routes events to the channels c1, c2, c3, and c4 based on the value of the state header. If the value is CZ, r1 sends the event to c1. If the value is US, r1 sends the event to both c2 and c3. If the header is missing or has any other value, r1 sends the event to the default channel c4.
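
For comparison, the default replicating selector can be written out explicitly. The following fragment is a minimal sketch, assuming the same agent a1 and source r1 with three channels. Marking c3 as optional is an illustrative choice, not part of the original example.

```properties
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.channels = c1 c2 c3
# replicating is the default type, so this line can be omitted.
a1.sources.r1.selector.type = replicating
# c1 and c2 are required channels; c3 is optional.
a1.sources.r1.selector.optional = c3
```

A failed write to an optional channel is ignored, whereas a failed write to a required channel causes the transaction to fail.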

Sink processor

The figure in the Channel selector section also shows the position of a sink processor in the data flow.

A sink processor is used when multiple sinks consume data from one channel queue. The sink processor sets the work mode of the sinks to load balancing or failover. By default, sinks and channels have a one-to-one mapping.

If the work mode is load balancing, events are distributed among the sinks based on the specified load balancing mechanism. If the work mode is failover, one sink works as the primary sink and the other sinks work as secondary sinks. If the primary sink fails, the sink processor switches to a secondary sink so that events continue to be delivered.
Examples:
  • Example 1: The work mode is failover.
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000

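    In this example, sinks k1 and k2 are used. The priority of k1 is 5 and the priority of k2 is 10. The sink with the larger priority value is activated first, so k2 works as the primary sink and k1 takes over if k2 fails. The maxpenalty parameter limits the backoff period of a failed sink to 10,000 milliseconds.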

  • Example 2: The work mode is load balancing.
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = random

    In this example, sinks k1 and k2 are used, and events are distributed between them at random. To distribute events to the sinks in turn, set the selector to round_robin instead. The a1.sinkgroups.g1.processor.backoff parameter specifies whether to exponentially back off failed sinks. If this parameter is set to true, the sink processor temporarily excludes a failed sink from selection, and the exclusion period grows exponentially while the sink keeps failing.
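
The sinks in a group are expected to consume from the same channel, as described at the start of this section. The following fragment is a minimal sketch of that wiring with round-robin load balancing, assuming an agent a1 and a channel c1. The avro sink type, host names, and port are hypothetical placeholders.

```properties
a1.channels = c1
a1.sinks = k1 k2
# Both sinks drain the same channel.
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = collector1.example.com
a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = collector2.example.com
a1.sinks.k2.port = 4545
# Group the sinks and alternate between them.
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.backoff = true
# Upper limit, in milliseconds, for the exponential backoff of a failed sink.
a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000
```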