すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:EMR Kafka データを Hive に同期する

最終更新日:Mar 27, 2026

Flume サービスを使用して、E-MapReduce (EMR) Dataflow クラスター(Kafka)からストリーミングデータを、EMR データレイククラスターの Hive サービスへ同期します。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

  • Flume をオプションサービスとして選択した EMR データレイククラスター。詳細については、「クラスターの作成」をご参照ください。

  • Kafka をオプションサービスとして選択した EMR Dataflow クラスター。詳細については、「クラスターを作成する」をご参照ください。

ステップ 1:Hive テーブルの作成

SSH モードでデータレイククラスターにログインします。詳細については、「クラスターへのログイン」をご参照ください。

Flume はトランザクション操作を使用してデータを Hive に書き込みます。対象となる Hive テーブルには、transactional プロパティを true に設定し、ORC フォーマットとバケット化を適用する必要があります。

次の文を実行して、flume_test という名前のテーブルを作成します。

create table flume_test (id int, content string)
clustered by (id) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');

ステップ 2:Flume の構成

  1. 構成 タブに移動します。

    1. EMR コンソール にログインします。左側ナビゲーションウィンドウで、ECS 上の EMR をクリックします。

    2. 上部ナビゲーションバーで、ご利用のクラスターが存在するリージョンを選択し、リソースグループを指定します。

    3. ECS 上の EMR ページで、データレイククラスターを検索し、[操作] 列の サービス をクリックします。

    4. サービス タブで、Flume サービスセクションの 構成 をクリックします。

  2. 構成 タブで、flume-conf.properties サブタブをクリックします。デフォルトでは、グローバル構成がすべてのノードに適用されます。個別のノードを構成する場合は、ドロップダウンリストから 独立ノード構成 を選択します。

  3. 次の構成を flume-conf.properties の値に追加します。

    パラメーターデフォルト値説明
    default-agent.sources.source1.kafka.bootstrap.serversKafka クラスター内のブローカーのホスト名およびポート番号。形式:host1:port1,host2:port2,…
    default-agent.sinks.k1.hive.metastoreHive メタストアの URI。形式:thrift://emr-header-1.cluster-xxx:9083hostname コマンドを emr-header-1 ノードで実行して、ホスト名を取得します。
    default-agent.channels.c1.capacity100チャネルに格納可能なイベントの最大数。スループット要件に応じて調整してください。
    default-agent.channels.c1.transactionCapacity100各トランザクションがソースから受信またはシンクへ配信するイベントの最大数。スループット要件に応じて調整してください。
    default-agent.sources = source1
    default-agent.sinks = k1
    default-agent.channels = c1
    
    default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    default-agent.sources.source1.channels = c1
    default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
    default-agent.sources.source1.kafka.topics = flume-test
    default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
    
    # シンクの定義
    default-agent.sinks.k1.type = hive
    default-agent.sinks.k1.hive.metastore = thrift://xxxx:9083
    default-agent.sinks.k1.hive.database = default
    default-agent.sinks.k1.hive.table = flume_test
    default-agent.sinks.k1.serializer = DELIMITED
    default-agent.sinks.k1.serializer.delimiter = ","
    default-agent.sinks.k1.serializer.serdeSeparator = ','
    default-agent.sinks.k1.serializer.fieldnames =id,content
    
    default-agent.channels.c1.type = memory
    default-agent.channels.c1.capacity = 100
    default-agent.channels.c1.transactionCapacity = 100
    
    default-agent.sources.source1.channels = c1
    default-agent.sinks.k1.channel = c1

    次の表に、主なパラメーターの説明を示します。必須パラメーターは 太字 で記載しています。

  4. 構成を保存します。

    1. 左下隅の 保存 をクリックします。

    2. 表示されるダイアログボックスで、実行理由を入力し、保存 をクリックします。

ステップ 3:Flume エージェントの起動

  1. Flume サービスの ステータス タブで、FlumeAgent コンポーネントを検索します。

  2. [操作] 列で、その他再起動 を選択します。

  3. 表示されるダイアログボックスで、実行理由を入力し、OK をクリックします。

  4. 確認メッセージで、OK をクリックします。

ステップ 4:データ同期のテスト

  1. セキュアシェル (SSH) を使用して Dataflow クラスターにログインします。詳細については、「クラスターへのログイン」をご参照ください。

  2. flume-test という名前の Kafka Topic を作成します。

    kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
  3. Kafka プロデューサーを起動し、テストデータを送信します。

    kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

    例として、abc を入力して Enter キーを押します。

  4. SSH モードでデータレイククラスターにログインし、次の Hive パラメーターを実行してトランザクションサポートを有効にします。

    set hive.support.concurrency=true;
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
  5. flume_test テーブルをクエリして、データが書き込まれていることを確認します。

    select * from flume_test;

    期待される出力は次のとおりです。

    OK
    1    a