This topic describes how to use Flume to synchronize data from an E-MapReduce (EMR) Kafka cluster to the Alibaba Cloud OSS-HDFS service.
Prerequisites
The OSS-HDFS service is enabled and authorized. For more information, see Enable the OSS-HDFS service.
You have created a DataLake cluster and selected the Flume service. For more information, see Create a cluster.
You have created a DataFlow cluster and selected the Kafka service. For more information, see Create a cluster.
Procedure
Configure Flume.
Go to the Flume configuration page.
Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.
In the top menu bar, select a region and resource group as needed.
On the EMR on ECS page, click Cluster Service in the Actions column for the target cluster.
On the Cluster Service tab, click Configure in the FLUME service section.
Set the maximum available memory (Xmx) for the JVM.
Flume consumes a large amount of Java Virtual Machine (JVM) memory when it writes data to OSS-HDFS. You can increase the Xmx value for the Flume agent by performing the following steps:
Click the flume-env.sh tab.
This topic describes the global configuration method. If you want to configure by node, select Independent Node Configuration from the drop-down list on the Configuration page of the FLUME service.
Modify the value of the JAVA_OPTS parameter.
For example, to set the maximum available JVM memory to 1 GB, change the parameter value to -Xmx1g.
Click Save.
Modify the flume-conf.properties configuration.
Click the flume-conf.properties tab.
This topic describes the global configuration method. If you want to configure by node, select Independent Node Configuration from the drop-down list on the Configuration page of the FLUME service.
In the editor for flume-conf.properties, add the following configuration items.
NoteThe value of default-agent in the following example must be the same as the value of the agent_name parameter on the Configuration page of the FLUME service.
default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource default-agent.sources.source1.channels = c1 default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...> default-agent.sources.source1.kafka.topics = flume-test default-agent.sources.source1.kafka.consumer.group.id = flume-test-group default-agent.sinks.k1.type = hdfs default-agent.sinks.k1.hdfs.path = oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path} default-agent.sinks.k1.hdfs.fileType=DataStream # Use a channel which buffers events in memory default-agent.channels.c1.type = memory default-agent.channels.c1.capacity = 100 default-agent.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1Parameter
Description
default-agent.sources.source1.kafka.bootstrap.servers
The hostnames and port numbers of the brokers in the Kafka cluster.
default-agent.sinks.k1.hdfs.path
The path of OSS-HDFS. The format is oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path}. An example value is oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result.
The parameters are described as follows:
{yourBucketName}: The name of the bucket for which the OSS-HDFS service is enabled.
{yourBucketRegion}: The region ID where the bucket is located.
{path}: The directory name for the OSS-HDFS service.
default-agent.channels.c1.capacity
The maximum number of events stored in the channel. Modify this parameter value based on your environment.
default-agent.channels.c1.transactionCapacity
The maximum number of events that each transaction channel receives from the source or provides to the sink. Modify this parameter value based on your environment.
Click Save.
Test the data synchronization.
- Use Secure Shell (SSH) to log on to the Dataflow cluster. For more information, see Log on to a cluster.
- Run the following command to create a topic named flume-test:
kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create Generate test data.
kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092For example, enter
abcand press Enter.A file named FlumeData.xxxx is generated in the oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result path. In the filename, xxxx represents the timestamp in milliseconds when the file was generated.