全部產品
Search
文件中心

E-MapReduce:同步EMR Kafka資料至Hive

更新時間:Jul 01, 2024

本文為您介紹如何使用Flume同步EMR DataFlow叢集的資料至EMR DataLake叢集的Hive。

前提條件

  • 已建立DataLake叢集,並且選擇了Flume服務,詳情請參見建立叢集
  • 已建立DataFlow叢集,並且選擇了Kafka服務,詳情請參見建立叢集

操作步驟

  1. 通過SSH方式串連DataLake叢集,詳情請參見登入叢集
  2. 建立Hive表。
    Flume使用事務操作將資料寫入Hive,需要在建立Hive表(flume_test)時設定transactional屬性。
    create table flume_test (id int, content string)
    clustered by (id) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');

    Hive的基礎操作,請參見Hive基礎操作

  3. 配置Flume。
    1. 進入Flume的配置頁面。
      1. 在頂部功能表列處,根據實際情況選擇地區和資源群組
      2. 叢集管理頁面,單擊目的地組群操作列的叢集服務
      3. 叢集服務頁面,單擊FLUME服務地區的配置
    2. 單擊flume-conf.properties頁簽。
      本文樣本採用的是全域配置方式,如果您想按照節點配置,可以在FLUME服務配置頁面的下拉式清單中選擇獨立節點配置
    3. flume-conf.properties的參數值中,添加以下內容。
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      default-agent.sources.source1.channels = c1
      default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      default-agent.sources.source1.kafka.topics = flume-test
      default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
      
      # Describe the sink
      default-agent.sinks.k1.type = hive
      default-agent.sinks.k1.hive.metastore = thrift://xxxx:9083
      default-agent.sinks.k1.hive.database = default
      default-agent.sinks.k1.hive.table = flume_test
      default-agent.sinks.k1.serializer = DELIMITED
      default-agent.sinks.k1.serializer.delimiter = ","
      default-agent.sinks.k1.serializer.serdeSeparator = ','
      default-agent.sinks.k1.serializer.fieldnames =id,content
      
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 100
      default-agent.channels.c1.transactionCapacity = 100
      
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      參數描述
      default-agent.sources.source1.kafka.bootstrap.serversKafka叢集Broker的Host和連接埠號碼。
      default-agent.channels.c1.capacity通道中儲存的最大事件數目。請根據實際環境修改該參數值。
      default-agent.channels.c1.transactionCapacity每個事務通道將從源接收或提供給接收器的最大事件數目。請根據實際環境修改該參數值。
      default-agent.sinks.k1.hive.metastoreHive metastore的URI,格式為thrift://emr-header-1.cluster-xxx:9083。其中emr-header-1.cluster-xxx您可以通過hostname擷取。
    4. 儲存配置。
      1. 單擊下方的儲存
      2. 在彈出的對話方塊中,輸入執行原因,單擊確定
  4. 啟動服務。
    1. 在FLUME服務頁面,選擇更多操作 > 重啟
    2. 在彈出的對話方塊中,輸入執行原因,單擊確定
    3. 確認對話方塊中,單擊確定
  5. 測試資料同步情況。
    1. 通過SSH方式串連DataFlow叢集,詳情請參見登入叢集
    2. 建立名稱為flume-test的Topic。
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
    3. 產生測試資料。
      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      例如輸入abc並斷行符號。

    4. 通過SSH方式串連DataLake叢集,在用戶端配置Hive參數並查詢表中的資料。
      set hive.support.concurrency=true;
      set hive.exec.dynamic.partition.mode=nonstrict;
      set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
      配置好後查詢flume_test表中的資料。
      select * from flume_test;
      返回資訊如下:
      OK
      1    a