Gordon
Assistant Engineer

[Share] Importing Flume Data into ODPS

Posted time: Sep 20, 2016 17:24
I. Introduction
Apache Flume is a distributed, reliable, and available system that can efficiently collect, aggregate, and move log data from many different data sources to a centralized data store.
ODPS Sink is a Flume plug-in built on the ODPS DataHub service that imports Flume event data into ODPS. The plug-in is compatible with Flume's original functions and features, supports user-defined ODPS table partitions, and can create partitions automatically.
II. Environment requirements
1. JDK (1.6 or later; 1.7 recommended)
2. Flume-NG 1.x
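You can quickly verify both requirements from the command line (run flume-ng from the Flume installation directory):
$ java -version          # should report JDK 1.6 or later (1.7 recommended)
$ bin/flume-ng version   # should report a Flume 1.x release, for example 1.6.0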
III. Plug-in deployment
1. Download the ODPS Sink plug-in aliyun-odps-flume-plugin and decompress it.
2. Download Flume-NG 1.x from https://flume.apache.org/download.html.
(1) Download apache-flume-1.6.0-bin.tar.gz.
(2) Download apache-flume-1.6.0-src.tar.gz.
3. Install Flume.
(1) Decompress apache-flume-1.6.0-src.tar.gz and apache-flume-1.6.0-bin.tar.gz.
(2) Copy the files in apache-flume-1.6.0-src to apache-flume-1.6.0-bin.
4. Deploy the ODPS Sink plug-in by moving the odps_sink directory to the Apache Flume installation directory:
$ mkdir {YOUR_APACHE_FLUME_DIR}/plugins.d
$ mv odps_sink/ {YOUR_APACHE_FLUME_DIR}/plugins.d/
After moving the directory, verify that the ODPS Sink plug-in resides in the target directory:
$ ls {YOUR_APACHE_FLUME_DIR}/plugins.d
odps_sink
After deployment, set the sink's type field in the Flume configuration file to:
com.aliyun.odps.flume.sink.OdpsSink
IV. Configuration example
Example: Structured data in log files is parsed and uploaded to an ODPS table.
The format of the log files to be uploaded is as follows (each line represents one record and fields are separated by a comma):
test_basic.log
some,log,line1
some,log,line2
...
Step 1: Create an ODPS Datahub table in an ODPS project.
The following shows the statements for creating the table:
CREATE TABLE hub_table_basic (col1 STRING, col2 STRING)
PARTITIONED BY (pt STRING)
INTO 1 SHARDS
HUBLIFECYCLE 1;
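To confirm that the table was created with the expected columns and partition key, you can describe it in the ODPS console (a simple check run in your ODPS project):
DESC hub_table_basic;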
Step 2: Create a Flume job configuration file.
In the conf/ directory of the Flume installation directory, create the file odps_basic.conf and enter the following content in the file:
odps_basic.conf
# A single-node Flume configuration for ODPS
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log
# Describe the sink
a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSink
a1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}
a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}
a1.sinks.k1.odps.endPoint = http://service.odps.aliyun.com/api
a1.sinks.k1.odps.datahub.endPoint = http://dh.odps.aliyun.com
a1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}
a1.sinks.k1.odps.table = hub_table_basic
a1.sinks.k1.odps.partition = 20150814
a1.sinks.k1.batchSize = 100
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = col1,,col2
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
a1.sinks.k1.autoCreatePartition = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Step 3: Start Flume.
Start Flume and specify the agent name and configuration file path. You can use -Dflume.root.logger=INFO,console to output logs to the console in real time.
$ cd {YOUR_APACHE_FLUME_DIR}
$ bin/flume-ng agent -n a1 -c conf -f conf/odps_basic.conf -Dflume.root.logger=INFO,console
If the command execution is successful, information similar to the following is displayed:
...
Write success. Event count: 2
...
You can then query the written data in the ODPS Datahub table.
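For example, a query similar to the following (run in the ODPS console, using the partition value set in odps_basic.conf) should return the uploaded records once the Datahub data has been committed to the table:
SELECT * FROM hub_table_basic WHERE pt = '20150814';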
V. Uploading data from multiple sources to ODPS
To upload data from multiple sources to ODPS, you only need to configure the corresponding sources and channels. The following upload methods are supported:
(1) Uploading data with multiple sources, one channel and one sink


(2) Uploading data with multiple sources, multiple channels and one sink


(3) Uploading data with multiple sources, channels, sinks and storage locations


(4) Uploading data with multiple agents


The following provides the configuration for method (1); a sketch for method (3) follows it.
odps_basic.conf
# A single-node Flume configuration for ODPS
# Name the components on this agent
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log
# Configuration of source 2
a1.sources.r2.type = exec
a1.sources.r2.command = cat {YOUR_LOG_DIRECTORY}/test_basic2.log
# Describe the sink
a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSink
a1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}
a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}
a1.sinks.k1.odps.endPoint = http://service.odps.aliyun.com/api
a1.sinks.k1.odps.datahub.endPoint = http://dh.odps.aliyun.com
a1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}
a1.sinks.k1.odps.table = hub_table_basic
a1.sinks.k1.odps.partition = 20150814
a1.sinks.k1.batchSize = 100
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = col1,,col2
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
a1.sinks.k1.autoCreatePartition = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Bind source 2 to the same channel (method (1) uses a single channel, so c1 is reused)
a1.sources.r2.channels = c1
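As a reference for method (3), the following is a minimal sketch that runs two independent source-channel-sink pipelines in one agent, each writing to its own ODPS table. The second table name hub_table_basic2 and the second log file test_basic2.log are placeholders for this example; create the second table the same way as hub_table_basic.
# odps_multi.conf (hypothetical file name): method (3), two independent pipelines in one agent
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2
# Source 1 reads the first log file, source 2 reads the second
a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log
a1.sources.r2.type = exec
a1.sources.r2.command = cat {YOUR_LOG_DIRECTORY}/test_basic2.log
# Sink 1 writes to hub_table_basic
a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSink
a1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}
a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}
a1.sinks.k1.odps.endPoint = http://service.odps.aliyun.com/api
a1.sinks.k1.odps.datahub.endPoint = http://dh.odps.aliyun.com
a1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}
a1.sinks.k1.odps.table = hub_table_basic
a1.sinks.k1.odps.partition = 20150814
a1.sinks.k1.batchSize = 100
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = col1,,col2
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
a1.sinks.k1.autoCreatePartition = true
# Sink 2 writes to the placeholder table hub_table_basic2
a1.sinks.k2.type = com.aliyun.odps.flume.sink.OdpsSink
a1.sinks.k2.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}
a1.sinks.k2.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}
a1.sinks.k2.odps.endPoint = http://service.odps.aliyun.com/api
a1.sinks.k2.odps.datahub.endPoint = http://dh.odps.aliyun.com
a1.sinks.k2.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}
a1.sinks.k2.odps.table = hub_table_basic2
a1.sinks.k2.odps.partition = 20150814
a1.sinks.k2.batchSize = 100
a1.sinks.k2.serializer = DELIMITED
a1.sinks.k2.serializer.delimiter = ,
a1.sinks.k2.serializer.fieldnames = col1,,col2
a1.sinks.k2.serializer.charset = UTF-8
a1.sinks.k2.shard.number = 1
a1.sinks.k2.shard.maxTimeOut = 60
a1.sinks.k2.autoCreatePartition = true
# One memory channel per pipeline
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 1000
# Bind each source and sink to its own channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2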
Possible problems:
1. A data error occurs in the sink and data cannot be transferred.


The error is caused by a comment line above the data in the log file. By default, the comment line is read as data, so the number of fields it contains does not match the field names defined in the configuration file. You can delete the comment line to resolve this problem.
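If you cannot edit the log file itself, an alternative (assuming comment lines start with #) is to filter them out in the exec source command, for example:
a1.sources.r1.command = grep -v '^#' {YOUR_LOG_DIRECTORY}/test_basic.log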
2. An OOM problem exists.
Flume reports the following error:
java.lang.OutOfMemoryError: GC overhead limit exceeded
Or
java.lang.OutOfMemoryError: Java heap space
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space
By default, Flume starts with a maximum heap size of only 20 MB, so OOM errors can easily occur in a production environment. Therefore, add the following JVM startup parameters to flume-env.sh:
JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
Include the -c conf argument in the command for starting an agent. Otherwise, the environment variables in flume-env.sh will not be loaded.
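A minimal way to apply this, assuming the standard Flume layout where conf/ ships with flume-env.sh.template:
$ cd {YOUR_APACHE_FLUME_DIR}
$ cp conf/flume-env.sh.template conf/flume-env.sh
# add the JAVA_OPTS line above to conf/flume-env.sh, then restart the agent with -c conf:
$ bin/flume-ng agent -n a1 -c conf -f conf/odps_basic.conf -Dflume.root.logger=INFO,console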

Raja_KT
Architect
1st Reply#
Posted time: Feb 2, 2019 23:15
Good one. But if files move continuously from the source and the sink writes directly to an ODPS table, it does allow a bigger margin of partitions, though I am not sure how much. In the case of continuous movement, I feel that HDFS staging can be a safeguard, unless the ODPS sink table has been rigorously tested against multiple workload scenarios.