This topic describes how to use the Flume service to synchronize data from an E-MapReduce (EMR) Dataflow cluster to the Hive service of an EMR data lake cluster.
Prerequisites
- An EMR data lake cluster is created, and Flume is selected from the optional services during cluster creation. For more information, see Create a cluster.
- An EMR Dataflow cluster is created, and Kafka is selected from the optional services during cluster creation. For more information, see Create a cluster.
Procedure
- Log on to the data lake cluster in SSH mode. For more information, see Log on to a cluster.
- Create a Hive table. The Flume Hive sink writes data in transactions, so you must set the transactional property when you create the Hive table. In this example, a table named flume_test is created.
create table flume_test (id int, content string) clustered by (id) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
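Optionally, you can verify that the table carries the transactional property before you continue. The exact output layout depends on your Hive version:
-- The property should appear as transactional=true under Table Parameters in the output.
desc formatted flume_test;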
- Configure the Flume service.
- Go to the Configure tab of the Flume service.
- Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.
- In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
- On the EMR on ECS page, find the cluster that you want to manage and click Services in the Actions column.
- On the Services tab, click Configure in the Flume service section.
- On the Configure tab, click the flume-conf.properties subtab. In this example, the global configuration is used. If you want to configure the cluster by node, you can select Independent Node Configuration from the drop-down list on the Configure subtab of the Flume service.
- Add the following content to the value of the flume-conf.properties configuration item:
default-agent.sources = source1
default-agent.sinks = k1
default-agent.channels = c1
default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
default-agent.sources.source1.channels = c1
default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
default-agent.sources.source1.kafka.topics = flume-test
default-agent.sources.source1.kafka.consumer.group.id = flume-test-group

# Describe the sink
default-agent.sinks.k1.type = hive
default-agent.sinks.k1.hive.metastore = thrift://xxxx:9083
default-agent.sinks.k1.hive.database = default
default-agent.sinks.k1.hive.table = flume_test
default-agent.sinks.k1.serializer = DELIMITED
default-agent.sinks.k1.serializer.delimiter = ","
default-agent.sinks.k1.serializer.serdeSeparator = ','
default-agent.sinks.k1.serializer.fieldnames = id,content

default-agent.channels.c1.type = memory
default-agent.channels.c1.capacity = 100
default-agent.channels.c1.transactionCapacity = 100

default-agent.sinks.k1.channel = c1
| Parameter | Description |
| --- | --- |
| default-agent.sources.source1.kafka.bootstrap.servers | The hostnames and port numbers of the brokers in the Kafka cluster. |
| default-agent.channels.c1.capacity | The maximum number of events that can be stored in the channel. Modify this parameter based on your business requirements. |
| default-agent.channels.c1.transactionCapacity | The maximum number of events that the channel accepts from a source or delivers to a sink in each transaction. Modify this parameter based on your business requirements. |
| default-agent.sinks.k1.hive.metastore | The uniform resource identifier (URI) of the Hive metastore. Configure this parameter in the format of thrift://emr-header-1.cluster-xxx:9083. emr-header-1.cluster-xxx is the hostname of the emr-header-1 node. You can run the hostname command on the node to obtain the hostname. |
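Note that the memory channel in the preceding configuration buffers events in RAM, so events that have not yet been written to Hive are lost if the agent restarts. If you cannot tolerate such loss, Flume's file channel is a durable alternative. The following is a minimal sketch; the directories are example paths, not values from this guide:
# Durable alternative to the memory channel (example paths; adjust to your disks):
default-agent.channels.c1.type = file
default-agent.channels.c1.checkpointDir = /mnt/disk1/flume/checkpoint
default-agent.channels.c1.dataDirs = /mnt/disk1/flume/data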
- Save the configurations.
- Click Save in the lower-left corner.
- In the dialog box that appears, enter an execution reason and click Save.
- Start the Flume service.
- On the Status tab of the Flume service, find the FlumeAgent component and choose More > Restart in the Actions column.
- In the dialog box that appears, enter an execution reason and click OK.
- In the Confirm message, click OK.
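To verify that the agent came back up after the restart, you can check for the Flume process on the node. This is a generic check rather than an EMR-specific command:
ps aux | grep -i flume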
- Test data synchronization.
- Use Secure Shell (SSH) to log on to the Dataflow cluster. For more information, see Log on to a cluster.
- Run the following command to create a topic named flume-test:
kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
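Optionally, you can describe the topic to confirm that it was created with the expected partition and replica counts:
kafka-topics.sh --describe --zookeeper master-1-1:2181/emr-kafka --topic flume-test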
- Generate test data.
kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092
For example, enter 1,a and press the Enter key. The Hive sink splits each line on the configured comma delimiter and maps the fields to the id and content columns. To confirm that the record reached the topic, you can consume it from another session, as shown below.
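This check is optional. A minimal sketch that reads the topic from the beginning (press Ctrl+C to exit the consumer):
kafka-console-consumer.sh --bootstrap-server master-1-1:9092 --topic flume-test --from-beginning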
- Log on to the data lake cluster in SSH mode and run the following commands in the Hive CLI to configure the parameters that transactional tables require:
set hive.support.concurrency=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
Execute the following statement to query data in the flume_test table:
select * from flume_test;
The following output is returned:
OK
1	a
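To confirm that synchronization is continuous, you can produce another record on the Dataflow cluster and query the table again. A minimal sketch; the record value is only an example:
kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092
Enter a record such as 2,b, press the Enter key, and then execute select * from flume_test; on the data lake cluster again. The new row should appear after the Flume sink commits its next transaction.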