Flume サービスを使用して、E-MapReduce (EMR) Dataflow クラスター(Kafka)からストリーミングデータを、EMR データレイククラスターの Hive サービスへ同期します。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
Flume をオプションサービスとして選択した EMR データレイククラスター。詳細については、「クラスターの作成」をご参照ください。
Kafka をオプションサービスとして選択した EMR Dataflow クラスター。詳細については、「クラスターを作成する」をご参照ください。
ステップ 1:Hive テーブルの作成
SSH モードでデータレイククラスターにログインします。詳細については、「クラスターへのログイン」をご参照ください。
transactional プロパティを true に設定し、ORC フォーマットとバケット化を適用する必要があります。次の文を実行して、flume_test という名前のテーブルを作成します。
create table flume_test (id int, content string)
clustered by (id) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');ステップ 2:Flume の構成
構成 タブに移動します。
EMR コンソール にログインします。左側ナビゲーションウィンドウで、ECS 上の EMR をクリックします。
上部ナビゲーションバーで、ご利用のクラスターが存在するリージョンを選択し、リソースグループを指定します。
ECS 上の EMR ページで、データレイククラスターを検索し、[操作] 列の サービス をクリックします。
サービス タブで、Flume サービスセクションの 構成 をクリックします。
構成 タブで、flume-conf.properties サブタブをクリックします。デフォルトでは、グローバル構成がすべてのノードに適用されます。個別のノードを構成する場合は、ドロップダウンリストから 独立ノード構成 を選択します。
次の構成を
flume-conf.propertiesの値に追加します。パラメーター デフォルト値 説明 default-agent.sources.source1.kafka.bootstrap.servers — Kafka クラスター内のブローカーのホスト名およびポート番号。形式: host1:port1,host2:port2,…default-agent.sinks.k1.hive.metastore — Hive メタストアの URI。形式: thrift://emr-header-1.cluster-xxx:9083。hostnameコマンドを emr-header-1 ノードで実行して、ホスト名を取得します。default-agent.channels.c1.capacity100 チャネルに格納可能なイベントの最大数。スループット要件に応じて調整してください。 default-agent.channels.c1.transactionCapacity100 各トランザクションがソースから受信またはシンクへ配信するイベントの最大数。スループット要件に応じて調整してください。 default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource default-agent.sources.source1.channels = c1 default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...> default-agent.sources.source1.kafka.topics = flume-test default-agent.sources.source1.kafka.consumer.group.id = flume-test-group # シンクの定義 default-agent.sinks.k1.type = hive default-agent.sinks.k1.hive.metastore = thrift://xxxx:9083 default-agent.sinks.k1.hive.database = default default-agent.sinks.k1.hive.table = flume_test default-agent.sinks.k1.serializer = DELIMITED default-agent.sinks.k1.serializer.delimiter = "," default-agent.sinks.k1.serializer.serdeSeparator = ',' default-agent.sinks.k1.serializer.fieldnames =id,content default-agent.channels.c1.type = memory default-agent.channels.c1.capacity = 100 default-agent.channels.c1.transactionCapacity = 100 default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1次の表に、主なパラメーターの説明を示します。必須パラメーターは 太字 で記載しています。
構成を保存します。
左下隅の 保存 をクリックします。
表示されるダイアログボックスで、実行理由を入力し、保存 をクリックします。
ステップ 3:Flume エージェントの起動
Flume サービスの ステータス タブで、FlumeAgent コンポーネントを検索します。
[操作] 列で、その他 > 再起動 を選択します。
表示されるダイアログボックスで、実行理由を入力し、OK をクリックします。
確認メッセージで、OK をクリックします。
ステップ 4:データ同期のテスト
セキュアシェル (SSH) を使用して Dataflow クラスターにログインします。詳細については、「クラスターへのログイン」をご参照ください。
flume-testという名前の Kafka Topic を作成します。kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --createKafka プロデューサーを起動し、テストデータを送信します。
kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092例として、
abcを入力して Enter キーを押します。SSH モードでデータレイククラスターにログインし、次の Hive パラメーターを実行してトランザクションサポートを有効にします。
set hive.support.concurrency=true; set hive.exec.dynamic.partition.mode=nonstrict; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;flume_testテーブルをクエリして、データが書き込まれていることを確認します。select * from flume_test;期待される出力は次のとおりです。
OK 1 a