This topic describes how to use the Flume service to synchronize data from an E-MapReduce (EMR) Dataflow cluster to the Hive service of an EMR data lake cluster.

Prerequisites

  • An EMR data lake cluster is created, and Flume is selected from the optional services during cluster creation. For more information, see Create a cluster.
  • An EMR Dataflow cluster is created, and Kafka is selected from the optional services during cluster creation. For more information, see Create a cluster.

Procedure

  1. Log on to the data lake cluster in SSH mode. For more information, see Log on to a cluster.
  2. Create a Hive table.
    The Hive sink of Flume writes data to Hive by using transactions. Therefore, you must set the transactional property to true when you create the Hive table. In this example, a table named flume_test is created.
    create table flume_test (id int, content string)
    clustered by (id) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
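    To confirm that the table is transactional, you can optionally run the following statement. This is only a verification sketch; the exact properties that are listed in the output depend on your Hive version.
    -- Check that 'transactional'='true' appears in the table properties
    show tblproperties flume_test;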
  3. Configure the Flume service.
    1. Go to the Configure tab of the Flume service.
      1. Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.
      2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
      3. On the EMR on ECS page, find the cluster that you want to manage and click Services in the Actions column.
      4. On the Services tab, click Configure in the Flume service section.
    2. On the Configure tab, click the flume-conf.properties subtab.
      In this example, the global configuration is used. If you want to configure the cluster by node, you can select Independent Node Configuration from the drop-down list on the Configure subtab of the Flume service.
    3. Add the following content to the value of the flume-conf.properties configuration item:
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      default-agent.sources.source1.channels = c1
      default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      default-agent.sources.source1.kafka.topics = flume-test
      default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
      
      # Describe the sink
      default-agent.sinks.k1.type = hive
      default-agent.sinks.k1.hive.metastore = thrift://xxxx:9083
      default-agent.sinks.k1.hive.database = default
      default-agent.sinks.k1.hive.table = flume_test
      default-agent.sinks.k1.serializer = DELIMITED
      default-agent.sinks.k1.serializer.delimiter = ","
      default-agent.sinks.k1.serializer.serdeSeparator = ','
      default-agent.sinks.k1.serializer.fieldnames = id,content
      
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 100
      default-agent.channels.c1.transactionCapacity = 100
      
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      The following list describes the key parameters:
      • default-agent.sources.source1.kafka.bootstrap.servers: The hostnames and port numbers of the brokers in the Kafka (Dataflow) cluster.
      • default-agent.channels.c1.capacity: The maximum number of events that can be stored in the channel. Modify this parameter based on your business requirements.
      • default-agent.channels.c1.transactionCapacity: The maximum number of events that the channel receives from the source or delivers to the sink in each transaction. Modify this parameter based on your business requirements.
      • default-agent.sinks.k1.hive.metastore: The uniform resource identifier (URI) of the Hive metastore. Specify this parameter in the format of thrift://emr-header-1.cluster-xxx:9083, where emr-header-1.cluster-xxx is the hostname of the emr-header-1 node. You can run the hostname command on that node to obtain the hostname.
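      For example, you can obtain the value of default-agent.sinks.k1.hive.metastore by running the hostname command on the node that hosts the Hive metastore in the data lake cluster. The hostname in the following sketch is only an illustration; replace it with your actual output:
      # Run on the node that hosts the Hive metastore in the data lake cluster
      hostname
      # Sample output (illustrative): emr-header-1.cluster-12345
      # The metastore URI is then thrift://emr-header-1.cluster-12345:9083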
    4. Save the configurations.
      1. Click Save in the lower-left corner.
      2. In the dialog box that appears, enter an execution reason and click Save.
  4. Start the Flume service.
    1. On the Status tab of the Flume service, find the FlumeAgent component and choose More > Restart in the Actions column.
    2. In the dialog box that appears, enter an execution reason and click OK.
    3. In the Confirm message, click OK.
  5. Test data synchronization.
    1. Use Secure Shell (SSH) to log on to the Dataflow cluster. For more information, see Log on to a cluster.
    2. Run the following command to create a topic named flume-test:
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
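      To confirm that the topic was created, you can optionally list the topics in the cluster. This sketch assumes the same ZooKeeper connection string as the previous command:
      kafka-topics.sh --list --zookeeper master-1-1:2181/emr-kafka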
    3. Generate test data.
      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      For example, enter 1,a and press the Enter key. The record must use the comma delimiter that is configured for the Hive sink, in which the first field maps to the id column and the second field maps to the content column.
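      You can also send a record without an interactive session by piping it to the producer. The following is a minimal sketch that reuses the broker address from the previous command:
      echo "1,a" | kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092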

    4. Log on to the data lake cluster in SSH mode and run the following commands in the Hive client to configure the parameters that are required to read transactional tables:
      set hive.support.concurrency=true;
      set hive.exec.dynamic.partition.mode=nonstrict;
      set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
      Execute the following statement to query data in the flume_test table:
      select * from flume_test;
      The following output is returned:
      OK
      1    a
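      If the table remains empty, you can check whether the Hive sink has open or aborted transactions. SHOW TRANSACTIONS is a standard statement for Hive clusters with transactional tables enabled; the columns in the output vary with the Hive version:
      show transactions;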