すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Kafka データをストリーミングモードで Alibaba Cloud OSS に書き込むための Flink の使用

最終更新日:May 08, 2025

一般的なシナリオでは、ストレージコストを削減したり、クエリと分析を実行したりするために、Kafka データは Alibaba Cloud Object Storage Service(OSS)などのデータレイクストレージサービスにリアルタイムでインポートされます。 E-MapReduce(EMR)V3.37.1 以降では、Dataflow クラスタは JindoFS に関連する組み込みの依存関係を提供します。 Dataflow クラスタで Flink ジョブを実行して、exactly-once セマンティクスを使用して Kafka データをストリーミングモードで OSS に書き込むことができます。このトピックでは、前述のシナリオのビジネス要件を満たすために、Flink ジョブのコードを記述し、Dataflow クラスタでジョブを実行する方法について説明します。

前提条件

手順

  1. 手順 1:環境を準備する

  2. 手順 2:JAR パッケージを準備する

  3. 手順 3:Kafka トピックを作成し、データを生成する

  4. 手順 4:Flink ジョブを作成して実行する

  5. 手順 5:出力を表示する

手順 1:環境を準備する

  1. Flink サービスと Kafka サービスを含む Dataflow クラスタを作成します。 詳細については、クラスタの作成 をご参照ください。

    説明

    このトピックでは、EMR V3.43.1 を使用しています。

  2. Dataflow クラスタと同じリージョンに存在する OSS バケットを作成します。 詳細については、バケットの作成 をご参照ください。

手順 2:JAR パッケージを準備する

  1. デモコードをダウンロードします。 デモコードについては、dataflow-demo を参照してください。

    JindoFS に基づいて Flink ジョブを実行し、ストリーミングモードで OSS にデータを書き込むことができます。 データ書き込み手順は、Hadoop Distributed File System(HDFS)にデータを書き込む手順と似ています。 データを書き込むパスの名前は oss:// で始まる必要があります。 次の例では、Flink は StreamingFileSink メソッドと exactly-once セマンティクスを使用して、Flink でチェックポイントが有効になった後に Kafka データを OSS に書き込みます。

    次のサンプルコードスニペットは、Kafka ソースと OSS シンクを作成する方法の例を示しています。 完全なコードは GitHub からダウンロードできます。 詳細については、dataflow-demo を参照してください。

    重要

    OSS バケットと JindoFS がデプロイされている Dataflow クラスタが同じ Alibaba Cloud アカウントを使用して作成されている場合、JindoFS ではパスワードなしモードで OSS バケットからデータを読み書きできます。

    public class OssDemoJob {
    
        public static void main(String[] args) throws Exception {
            // ...  必要なパラメータなどを設定
    
            // 出力 oss ディレクトリを確認
            Preconditions.checkArgument(
                    params.get(OUTPUT_OSS_DIR).startsWith("oss://"),
                    "outputOssDir should start with 'oss://'.");
    
            // ストリーミング実行環境を設定
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // チェックポイントは必須
            env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
    
            String outputPath = params.get(OUTPUT_OSS_DIR);
    
            // FLIP-27 に基づく新しい Source API を使用して Kafka ソースを構築
            KafkaSource<Event> kafkaSource =
                    KafkaSource.<Event>builder()
                            .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                            .setTopics(params.get(INPUT_TOPIC_ARG))
                            .setStartingOffsets(OffsetsInitializer.latest())
                            .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                            .setDeserializer(new EventDeSerializationSchema())
                            .build();
            // DataStream ソース
            DataStreamSource<Event> source =
                    env.fromSource(
                            kafkaSource,
                            WatermarkStrategy.<Event>forMonotonousTimestamps()
                                    .withTimestampAssigner((event, ts) -> event.getEventTime()),
                            "Kafka Source");
    
            StreamingFileSink<Event> sink =
                    StreamingFileSink.forRowFormat(
                                    new Path(outputPath), new SimpleStringEncoder<Event>("UTF-8"))
                            .withRollingPolicy(OnCheckpointRollingPolicy.build())
                            .build();
            source.addSink(sink);
    
            // ジョブをコンパイルして送信
            env.execute();
        }
    }
    説明

    サンプルコードスニペットは、メインサンプルプログラムを提供します。 ビジネス要件に基づいてサンプルプログラムを変更し、コンパイルに進むことができます。 たとえば、メインサンプルプログラムにパッケージ名を追加したり、チェックポイント間隔を変更したりできます。 Flink ジョブの JAR パッケージをビルドする方法については、Flink 公式ドキュメント を参照してください。 サンプルプログラムを変更する必要がない場合は、dataflow-oss-demo-1.0-SNAPSHOT.jar パッケージを直接使用して後続の操作を実行できます。

  2. CLI で、ダウンロードしたプロジェクトファイルのルートディレクトリに移動し、次のコマンドを実行してファイルをパッケージ化します。

    mvn clean package

    dataflow-oss-demo-1.0-SNAPSHOT.jar パッケージは、pom.xml ファイルの artifactId 情報に基づいて、プロジェクトの dataflow-demo/dataflow-oss-demo/target ディレクトリに表示されます。

手順 3:Kafka トピックを作成し、データを生成する

  1. SSH モードで Dataflow クラスタにログオンします。 EMR クラスタへのログオン方法については、クラスタへのログオン をご参照ください。

  2. 次のコマンドを実行して、テスト用の Kafka トピックを作成します。

    kafka-topics.sh --create  --bootstrap-server core-1-1:9092 \
        --replication-factor 2  \
        --partitions 3  \
        --topic kafka-test-topic

    Kafka トピックが作成されると、CLI に次の情報が返されます。

    Created topic kafka-test-topic.
  3. Kafka トピックにデータを書き込みます。

    1. CLI で次のコマンドを実行して、Kafka プロデューサーコンソールに移動します。

      kafka-console-producer.sh --broker-list core-1-1:9092 --topic  kafka-test-topic
    2. 5 つのテストデータを入力します。

      1,Ken,0,1,1662022777000
      1,Ken,0,2,1662022777000
      1,Ken,0,3,1662022777000
      1,Ken,0,4,1662022777000
      1,Ken,0,5,1662022777000
    3. Ctrl+C を押して、Kafka プロデューサーコンソールを終了します。

手順 4:Flink ジョブを実行する

  1. SSH モードで Dataflow クラスタにログオンします。 EMR クラスタへのログオン方法については、クラスタへのログオン をご参照ください。

  2. dataflow-oss-demo-1.0-SNAPSHOT.jar パッケージを Dataflow クラスタのルートディレクトリにアップロードします。

    説明

    この例では、dataflow-oss-demo-1.0-SNAPSHOT.jar パッケージは root ディレクトリにアップロードされます。 ビジネス要件に基づいてアップロードディレクトリを指定することもできます。

  3. 次のコマンドを実行して、Flink ジョブを送信します。

    この例では、Flink ジョブは Per-Job モードで送信されます。 他のモードで Flink ジョブを送信する方法については、基本的な使い方 をご参照ください。

    flink run -t yarn-per-job -d -c com.alibaba.ververica.dataflow.demo.oss.OssDemoJob \
        /dataflow-oss-demo-1.0-SNAPSHOT.jar  \
        --outputOssDir oss://xung****-flink-dlf-test/oss_kafka_test \
        --kafkaBrokers core-1-1:9092 \
        --inputTopic kafka-test-topic \
        --inputTopicGroup my-group

    パラメータの説明:

    • outputOssDir:データを書き込む OSS バケットのディレクトリ。

    • kafkaBrokers:Kafka クラスタで指定された Kafka ブローカー。 値を core-1-1:9092 に設定します。

    • inputTopic:データを読み取る Kafka トピック。 手順 3 で作成した kafka-test-topic に設定します。

    • inputTopicGroup:使用する Kafka コンシューマーグループ。 値を my-group に設定します。

    result

    次のコマンドを実行して、ジョブのステータスを表示します。

    flink list -t yarn-per-job -Dyarn.application.id=<appId>
    説明

    コマンドでは、<appId> は、ジョブが正常に実行された後に返されるアプリケーション ID を指定します。 この例では、アプリケーション ID は application_1670236019397_0003 です。

手順 5:出力を表示する

  • ジョブが正常に実行された後、OSS コンソールで出力を表示できます。

    1. OSS コンソールにログオンします。

    2. 左側のナビゲーションペインで、[バケット] をクリックします。 [バケット] ページで、作成したバケットの名前をクリックします。

    3. [オブジェクト] ページで、指定した出力ディレクトリの出力を表示します。OSS results

      重要

      ジョブは継続的に実行されるストリーミングジョブであり、多数の出力ファイルを生成する可能性があります。 出力を見た後は、できるだけ早く CLI で yarn application -kill <appId> コマンドを実行してジョブを終了することをお勧めします。

  • Dataflow クラスタの CLI で hdfs dfs -cat oss://<YOUR_TARGET_BUCKET>/oss_kafka_test/<DATE_DIR>/part-0-0 コマンドを実行して、OSS バケットに保存されているデータを表示することもできます。 次の図は例を示しています。OSS示例

    重要
    • exactly-once セマンティクスを確保するために、Flink ジョブがチェックポイントを完了するたびに、データファイルは OSS バケットに保存されます。 この例では、チェックポイント間隔は 30 秒です。

    • ジョブは継続的に実行されるストリーミングジョブであり、多数の出力ファイルを生成する可能性があります。 出力を見た後は、できるだけ早く CLI で yarn application -kill <appId> コマンドを実行してジョブを終了することをお勧めします。