I. Introduction
Apache Flume is a distributed, reliable, and highly available system that efficiently collects, aggregates, and moves log data from different data sources to a centralized data store. ODPS Sink is a Flume plug-in built on the ODPS DataHub service that imports Flume event data into ODPS. The plug-in is compatible with Flume's original functions and features, supports user-defined ODPS table partitions, and can create partitions automatically.

II. Environment requirements

1. JDK (1.6 or later; 1.7 recommended)
2. Flume-NG 1.x

III. Plug-in deployment

1. Download the ODPS Sink plug-in aliyun-odps-flume-plugin and decompress it.
2. Download Flume-NG 1.x from https://flume.apache.org/download.html.
(1) Download apache-flume-1.6.0-bin.tar.gz.
(2) Download apache-flume-1.6.0-src.tar.gz.
3. Install Flume.
(1) Decompress apache-flume-1.6.0-src.tar.gz and apache-flume-1.6.0-bin.tar.gz.
(2) Copy the files in apache-flume-1.6.0-src to apache-flume-1.6.0-bin.
4. Deploy the ODPS Sink plug-in by moving the odps_sink directory into the Flume installation directory:

```
$ mkdir {YOUR_APACHE_FLUME_DIR}/plugins.d
$ mv odps_sink/ {YOUR_APACHE_FLUME_DIR}/plugins.d/
```

After moving the directory, verify that the ODPS Sink plug-in resides in the target directory:

```
$ ls {YOUR_APACHE_FLUME_DIR}/plugins.d
odps_sink
```

After deployment, set the type field of the sink in the Flume configuration file to:

com.aliyun.odps.flume.sink.OdpsSink

IV. Configuration example

Example: structured data in log files is parsed and uploaded to an ODPS table. The log file to be uploaded has the following format (each line is one record; fields are separated by commas):

test_basic.log

```
some,log,line1
some,log,line2
...
```

Step 1: Create an ODPS Datahub table in an ODPS project. The following statement creates the table:

```
CREATE TABLE hub_table_basic (col1 STRING, col2 STRING)
PARTITIONED BY (pt STRING)
INTO 1 SHARDS
HUBLIFECYCLE 1;
```

Step 2: Create a Flume job configuration file. In the conf/ directory of the Flume installation directory, create a file named odps_basic.conf with the following content. Note that serializer.fieldnames is col1,,col2: each log line has three fields but the table has only two columns, and the empty entry corresponds to the middle field, which is not uploaded.

```
# odps_basic.conf
# A single-node Flume configuration for ODPS

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log

# Describe the sink
a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSink
a1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}
a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}
a1.sinks.k1.odps.endPoint = http://service.odps.aliyun.com/api
a1.sinks.k1.odps.datahub.endPoint = http://dh.odps.aliyun.com
a1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}
a1.sinks.k1.odps.table = hub_table_basic
a1.sinks.k1.odps.partition = 20150814
a1.sinks.k1.batchSize = 100
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = col1,,col2
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
a1.sinks.k1.autoCreatePartition = true

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
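Note: the exec source above simply re-runs cat on a single file each time the agent starts and offers no delivery guarantees if the agent dies. For ongoing collection, the standard Apache Flume spooling-directory source is often a safer choice. The sketch below is only a minimal alternative source block, assuming that finished log files are dropped into a directory such as /var/log/flume-spool (the path is illustrative and not from the original post); the sink and channel sections stay the same as in odps_basic.conf.

```
# Alternative source: read completed files from a spooling directory
# instead of cat-ing a single file with the exec source.
a1.sources = r1
a1.sources.r1.type = spooldir
# Directory that completed log files are moved into (illustrative path)
a1.sources.r1.spoolDir = /var/log/flume-spool
# Suffix appended to files once they have been fully ingested
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.channels = c1
```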
Step 3: Start Flume. Start Flume and specify the agent name and the configuration file path. You can add -Dflume.root.logger=INFO,console to output logs to the console in real time.

```
$ cd {YOUR_APACHE_FLUME_DIR}
$ bin/flume-ng agent -n a1 -c conf -f conf/odps_basic.conf -Dflume.root.logger=INFO,console
```

If the command succeeds, information similar to the following is displayed:

```
...
Write success. Event count: 2
...
```

You can then query the written data in the ODPS Datahub table.

Uploading data from multiple sources to ODPS

To upload data from multiple sources to ODPS, you only need to configure the corresponding sources and channels. The following topologies are supported:

(1) Multiple sources, one channel, and one sink
(2) Multiple sources, multiple channels, and one sink
(3) Multiple sources, channels, sinks, and storage locations
(4) Multiple agents

The following provides the configuration for method (1):

```
# odps_basic.conf
# A single-node Flume configuration for ODPS

# Name the components on this agent
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

# Describe/configure source 1
a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log

# Configure source 2
a1.sources.r2.type = exec
a1.sources.r2.command = cat {YOUR_LOG_DIRECTORY}/test_basic2.log

# Describe the sink
a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSink
a1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}
a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}
a1.sinks.k1.odps.endPoint = http://service.odps.aliyun.com/api
a1.sinks.k1.odps.datahub.endPoint = http://dh.odps.aliyun.com
a1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}
a1.sinks.k1.odps.table = hub_table_basic
a1.sinks.k1.odps.partition = 20150814
a1.sinks.k1.batchSize = 100
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = col1,,col2
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
a1.sinks.k1.autoCreatePartition = true

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind both sources and the sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1
```

Possible problems:

1. A data error occurs in the sink and data cannot be transferred. The error is caused by a comment line above the data: by default the comment is read as data, and its contents do not match the field layout defined in the configuration file. Delete the comment to resolve the problem.

2. An OOM problem occurs. Flume reports one of the following errors:

```
java.lang.OutOfMemoryError: GC overhead limit exceeded
```

or

```
java.lang.OutOfMemoryError: Java heap space
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space
```

When Flume starts, the default maximum heap size is 20 MB, so OOM errors can easily occur in a production environment. Add the following JVM startup parameters to flume-env.sh:

```
JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
```

Include the -c conf argument in the agent start command; otherwise, the environment variables in flume-env.sh will not be loaded.
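Beyond the heap settings above, note that the memory channel keeps all in-flight events in the JVM heap and loses them if the agent crashes. If durability matters more than raw throughput, the standard Apache Flume file channel is worth considering. The sketch below is a minimal drop-in replacement for the c1 definition in odps_basic.conf, assuming local directories such as /var/flume/checkpoint and /var/flume/data that are illustrative and not from the original post.

```
# Durable alternative to the memory channel: persist events to local disk.
a1.channels.c1.type = file
# Directory for the channel's checkpoint metadata (illustrative path)
a1.channels.c1.checkpointDir = /var/flume/checkpoint
# One or more directories where event data is persisted (illustrative path)
a1.channels.c1.dataDirs = /var/flume/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000
```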
1st Reply#
Posted time: Feb 2, 2019 23:15
Good one. But if files move continuously from the source and the sink writes directly to an ODPS table, the table does allow a larger margin of partitions, though I am not sure how much. For continuous movement, I feel that staging in HDFS can act as a safeguard unless and until the ODPS sink table has been rigorously tested against multiple workload scenarios.
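One way to realize the staging idea from this reply is to replicate the same source into two channels, writing one copy to an HDFS sink and one to the ODPS sink, so the HDFS files remain available for replay if the ODPS side misbehaves. The sketch below only illustrates that fan-out pattern on top of the odps_basic.conf example; the HDFS path, roll interval, and component names k2/c2 are assumptions, not something tested in this thread.

```
# Sketch: replicate each event to an HDFS staging sink and to the ODPS sink.
# k1/c1 are the ODPS sink and channel configured as in odps_basic.conf.
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Fan the source out to both channels
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# k1 = ODPS sink on channel c1 (as in odps_basic.conf)
a1.sinks.k1.channel = c1

# k2 = HDFS staging copy on channel c2 (illustrative path and settings)
a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c2
a1.sinks.k2.hdfs.path = hdfs://namenode/flume/staging/%Y%m%d
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.rollInterval = 300
```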