Stream data from an E-MapReduce (EMR) Kafka cluster to OSS-HDFS using Apache Flume. This guide shows you how to configure a Flume agent with a Kafka Source and an OSS-HDFS Sink, then verify that data flows end to end.
Prerequisites
Before you begin, ensure that you have:
OSS-HDFS enabled and authorized. See Enable the OSS-HDFS service.
A DataLake cluster created with the Flume service selected. See Create a cluster.
A DataFlow cluster created with the Kafka service selected. See Create a cluster.
Configure Flume
Go to the Flume configuration page
Log on to the EMR console.
In the left-side navigation pane, click EMR on ECS.
In the top menu bar, select a region and resource group.
On the EMR on ECS page, click Cluster Service in the Actions column for your cluster.
On the Cluster Service tab, click Configure in the FLUME service section.
Set the maximum JVM heap size
Writing to OSS-HDFS consumes a significant amount of Java Virtual Machine (JVM) memory. Increase the maximum heap size (-Xmx) to avoid out-of-memory errors.
Click the flume-env.sh tab.
The steps below describe the global configuration method. To configure by node instead, select Independent Node Configuration from the drop-down list on the Configuration page of the FLUME service.
Update the JAVA_OPTS parameter. For example, to set the maximum JVM heap size to 1 GB, set the value to -Xmx1g.
Click Save.
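For reference, a minimal sketch of the resulting entry in flume-env.sh, assuming a 1 GB heap (the variable's existing contents in your cluster may differ):

# Set the maximum JVM heap size of the Flume agent to 1 GB
JAVA_OPTS="-Xmx1g"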
Configure flume-conf.properties
Click the flume-conf.properties tab.
The steps below describe the global configuration method. To configure by node instead, select Independent Node Configuration from the drop-down list on the Configuration page of the FLUME service.
Add the following configuration to the editor.
Important: The agent name default-agent in the configuration must match the agent_name parameter value on the Configuration page of the FLUME service.

# Agent topology
default-agent.sources = source1
default-agent.sinks = k1
default-agent.channels = c1

# Source: Kafka
default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
default-agent.sources.source1.kafka.topics = flume-test
default-agent.sources.source1.kafka.consumer.group.id = flume-test-group

# Sink: OSS-HDFS
default-agent.sinks.k1.type = hdfs
default-agent.sinks.k1.hdfs.path = oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path}
default-agent.sinks.k1.hdfs.fileType = DataStream

# Channel: memory buffer
default-agent.channels.c1.type = memory
default-agent.channels.c1.capacity = 100
default-agent.channels.c1.transactionCapacity = 100

# Bind Source and Sink to the Channel
default-agent.sources.source1.channels = c1
default-agent.sinks.k1.channel = c1

Replace the placeholders with your actual values:
Placeholder | Description | Example
<kafka-host1:port1,kafka-host2:port2...> | Hostnames and port numbers of the Kafka brokers | broker-1:9092,broker-2:9092
{yourBucketName} | Name of the bucket with OSS-HDFS enabled | my-bucket
{yourBucketRegion} | Region ID where the bucket is located | cn-hangzhou
{path} | Target directory in OSS-HDFS | result

Review the key configuration parameters:

Parameter | Description
default-agent.sources.source1.kafka.bootstrap.servers | Kafka broker addresses (comma-separated host:port pairs)
default-agent.sinks.k1.hdfs.path | OSS-HDFS output path. Format: oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path}. Example: oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result
default-agent.channels.c1.capacity | Maximum number of events stored in the channel. Adjust based on your throughput.
default-agent.channels.c1.transactionCapacity | Maximum number of events each transaction reads from the Source or writes to the Sink. Adjust based on your throughput.

Click Save.
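If many small files accumulate at the output path, you can optionally tune the HDFS Sink's standard file-rolling parameters in the same properties file. The following is a sketch with illustrative values, not part of the required configuration:

# Roll a new output file every 300 seconds or at 128 MB, whichever comes first;
# 0 disables event-count-based rolling.
default-agent.sinks.k1.hdfs.rollInterval = 300
default-agent.sinks.k1.hdfs.rollSize = 134217728
default-agent.sinks.k1.hdfs.rollCount = 0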
Verify data synchronization
Use Secure Shell (SSH) to log on to the DataFlow cluster. See Log on to a cluster.
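A typical logon command, assuming you connect as the root user and substitute your master node's public IP address for the placeholder:

ssh root@<master-node-public-IP>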
Create a Kafka topic named flume-test:

kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
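To confirm that the topic exists, you can optionally describe it against the same ZooKeeper address:

kafka-topics.sh --describe --zookeeper master-1-1:2181/emr-kafka --topic flume-test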
Start a console producer and send a test message:

kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

Type abc and press Enter.

Confirm that Flume has written the data to OSS-HDFS. A file named FlumeData.xxxx, where xxxx is the creation timestamp in milliseconds, should appear at the output path:

oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result/FlumeData.xxxx
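You can also check from the cluster's command line. A minimal sketch using the HDFS shell, assuming the cluster can already access the bucket through OSS-HDFS (the xxxx suffix is a placeholder; use the actual file name from the listing):

# List the files that Flume has written to the output path
hdfs dfs -ls oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result
# Print the contents of a written file; abc should appear in the output
hdfs dfs -cat oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result/FlumeData.xxxx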