すべてのプロダクト
Search
ドキュメントセンター

Object Storage Service:Flumeを使用してEMR KafkaクラスターからOSS-HDFSが有効なバケットにデータを同期する

最終更新日:Dec 20, 2023

このトピックでは、Flumeを使用してEMR KafkaクラスターからOSS-HDFSが有効になっているObject Storage Service (OSS) バケットにデータを同期する方法について説明します。

前提条件

  • バケットのOSS-HDFSが有効になり、OSS-HDFSにアクセスする権限が付与されます。 詳細については、「OSS-HDFSの有効化とアクセス許可の付与」をご参照ください。

  • データレイク分析用のクラスターが作成され、Flumeが選択されます。 詳細については、「クラスターの作成」をご参照ください。

  • DataFlowクラスターが作成され、Kafkaが選択されます。 詳細については、「クラスターの作成」をご参照ください。

手順

  1. Flumeを設定します。

    1. Flume設定ページに移動します。

      1. EMRコンソールの [EMR on ECS] ページに移動します。

      2. 上部のナビゲーションバーで、クラスターが存在するリージョンを選択し、ビジネス要件に基づいてリソースグループを選択します。

      3. [ECS上のEMR] ページで、目的のクラスターを見つけ、[操作] 列の [サービス] をクリックします。

      4. [サービス] タブで、[Flume] セクションの [設定] をクリックします。

    2. Java仮想マシン (JVM) で使用可能な最大メモリを設定します。

      FlumeからJVMにデータを書き込むと、大量のOSS-HDFSメモリが消費されます。 FlumeエージェントのXmxオプションの値を増やすことを推奨します。 最大JVMメモリを増やすには、次の手順を実行します。

      1. flume-env.shタブをクリックします。

        このトピックでは、グローバル設定を使用します。 特定のノードにのみ設定を適用する場合は、FLUMEサービスの [設定] タブの2番目のドロップダウンリストから [独立ノード設定] を選択します。

      2. JAVA_OPTSの値を変更します。

        たとえば、JVMで使用可能な最大メモリを1 GBに設定する場合は、値を-Xmx1gに設定します。

      3. [保存] をクリックします。

    3. flume-conf.propertiesを変更します。

      1. [設定] タブで、[flume-conf.properties] タブをクリックします。

        このトピックでは、グローバル設定を使用します。 特定のノードにのみ設定を適用する場合は、FLUMEサービスの [設定] タブの2番目のドロップダウンリストから [独立ノード設定] を選択します。

      2. flume-conf.propertiesの横にあるエディターで、次の設定項目を入力します。

        説明

        次の設定のdefault-agentの値は、FLUMEサービスの [設定] タブのagent_nameパラメーターの値と同じである必要があります。

        default-agent.sources = source1
        default-agent.sinks = k1
        default-agent.channels = c1
        
        default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
        default-agent.sources.source1.channels = c1
        default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1: ポート1,kafka-host2: ポート2 ..>
        default-agent.sources.source1.kafka.topics = flume-test
        default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
        
        default-agent.sinks.k1.type = hdfs
        default-agent.sinks.k1.hdfs.path = oss://<examplebucket> 。<exampleregio n>.oss-dls.aliyuncs.com/<exampledir>
        default-agent.sinks.k1.hdfs.fileType=DataStream
        
        # メモリ内のイベントをバッファリングするチャネルを使用する
        default-agent.channels.c1.type=メモリ
        default-agent.channels.c1.capacity = 100
        default-agent.channels.c1.transactionCapacity = 100
        
        # ソースとシンクをチャネルにバインドする
        default-agent.sources.source1.channels = c1
        default-agent.sinks.k1.channel = c1 

        パラメーター

        説明

        default-agent.sources.source1.kafka.bootstrap.servers

        Kafkaクラスター内のブローカーのホスト名とポート番号。

        default-agent.sinks.k1.hdfs.path

        OSS-HDFSへのアクセスに使用されるパス。 パスの形式はoss://<examplebucket>.<exampleregio n>.oss-dls.aliyuncs.com/<exampledir> です。 パスの例はoss:// flume-test.cn-hangzhou.oss-dls.aliyuncs.com/resultです。

        パスコンポーネント:

        • <examplebucket>: OSS-HDFSサービスが有効になっているバケットの名前。

        • <exampleregion>: バケットが配置されているリージョンのID。

        • <exampledir>: OSS-HDFSサービスのディレクトリ名。

        default-agent.channels.c1.ca平和

        チャネルに保存されるイベントの最大数。 ビジネス要件に基づいてこのパラメーターを変更します。

        default-agent.channels.c1.transactionCapacity

        各トランザクションチャネルがソースから受信する、または受信機に提供するイベントの最大数。 ビジネス要件に基づいてこのパラメーターを変更します。

      3. [保存] をクリックします。

  2. データ同期の結果を確認します。

    1. Secure Shell (SSH) を使用してDataflowクラスターにログインします。 詳細については、「クラスターへのログイン」をご参照ください。
    2. 次のコマンドを実行して、flume-testという名前のトピックを作成します。
      kafka-topics.sh-パーティション10-レプリケーションファクター2-zookeeper master-1-1:2181/emr-kafka-topic flume-test-create
    3. テストデータを生成します。

      kafka-console-producer.sh -- topic flume-test -- broker-list master-1-1:9092

      たとえば、abcと入力し、enterキーを押します。

      FlumeData.xxxxファイルはoss:// flume-test.cn-hangzhou.oss-dls.aliyuncs.com/resultで生成されます。xxxxは、ファイル生成のミリ秒単位のUNIXタイムスタンプです。