一般的なシナリオでは、ストレージコストを削減したり、クエリと分析を実行したりするために、Kafka データは Alibaba Cloud Object Storage Service(OSS)などのデータレイクストレージサービスにリアルタイムでインポートされます。 E-MapReduce(EMR)V3.37.1 以降では、Dataflow クラスタは JindoFS に関連する組み込みの依存関係を提供します。 Dataflow クラスタで Flink ジョブを実行して、exactly-once セマンティクスを使用して Kafka データをストリーミングモードで OSS に書き込むことができます。このトピックでは、前述のシナリオのビジネス要件を満たすために、Flink ジョブのコードを記述し、Dataflow クラスタでジョブを実行する方法について説明します。
前提条件
EMR と OSS がアクティブ化されている。
使用する Alibaba Cloud アカウントまたは RAM ユーザーに必要な権限が付与されている。 詳細については、Alibaba Cloud アカウントへのロールの割り当てをご参照ください。
手順
手順 1:環境を準備する
手順 2:JAR パッケージを準備する
デモコードをダウンロードします。 デモコードについては、dataflow-demo を参照してください。
JindoFS に基づいて Flink ジョブを実行し、ストリーミングモードで OSS にデータを書き込むことができます。 データ書き込み手順は、Hadoop Distributed File System(HDFS)にデータを書き込む手順と似ています。 データを書き込むパスの名前は oss:// で始まる必要があります。 次の例では、Flink は StreamingFileSink メソッドと exactly-once セマンティクスを使用して、Flink でチェックポイントが有効になった後に Kafka データを OSS に書き込みます。
次のサンプルコードスニペットは、Kafka ソースと OSS シンクを作成する方法の例を示しています。 完全なコードは GitHub からダウンロードできます。 詳細については、dataflow-demo を参照してください。
重要OSS バケットと JindoFS がデプロイされている Dataflow クラスタが同じ Alibaba Cloud アカウントを使用して作成されている場合、JindoFS ではパスワードなしモードで OSS バケットからデータを読み書きできます。
public class OssDemoJob { public static void main(String[] args) throws Exception { // ... 必要なパラメータなどを設定 // 出力 oss ディレクトリを確認 Preconditions.checkArgument( params.get(OUTPUT_OSS_DIR).startsWith("oss://"), "outputOssDir should start with 'oss://'."); // ストリーミング実行環境を設定 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // チェックポイントは必須 env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE); String outputPath = params.get(OUTPUT_OSS_DIR); // FLIP-27 に基づく新しい Source API を使用して Kafka ソースを構築 KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder() .setBootstrapServers(params.get(KAFKA_BROKERS_ARG)) .setTopics(params.get(INPUT_TOPIC_ARG)) .setStartingOffsets(OffsetsInitializer.latest()) .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG)) .setDeserializer(new EventDeSerializationSchema()) .build(); // DataStream ソース DataStreamSource<Event> source = env.fromSource( kafkaSource, WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner((event, ts) -> event.getEventTime()), "Kafka Source"); StreamingFileSink<Event> sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder<Event>("UTF-8")) .withRollingPolicy(OnCheckpointRollingPolicy.build()) .build(); source.addSink(sink); // ジョブをコンパイルして送信 env.execute(); } }
説明サンプルコードスニペットは、メインサンプルプログラムを提供します。 ビジネス要件に基づいてサンプルプログラムを変更し、コンパイルに進むことができます。 たとえば、メインサンプルプログラムにパッケージ名を追加したり、チェックポイント間隔を変更したりできます。 Flink ジョブの JAR パッケージをビルドする方法については、Flink 公式ドキュメント を参照してください。 サンプルプログラムを変更する必要がない場合は、dataflow-oss-demo-1.0-SNAPSHOT.jar パッケージを直接使用して後続の操作を実行できます。
CLI で、ダウンロードしたプロジェクトファイルのルートディレクトリに移動し、次のコマンドを実行してファイルをパッケージ化します。
mvn clean package
dataflow-oss-demo-1.0-SNAPSHOT.jar パッケージは、pom.xml ファイルの artifactId 情報に基づいて、プロジェクトの dataflow-demo/dataflow-oss-demo/target ディレクトリに表示されます。
手順 3:Kafka トピックを作成し、データを生成する
SSH モードで Dataflow クラスタにログオンします。 EMR クラスタへのログオン方法については、クラスタへのログオン をご参照ください。
次のコマンドを実行して、テスト用の Kafka トピックを作成します。
kafka-topics.sh --create --bootstrap-server core-1-1:9092 \ --replication-factor 2 \ --partitions 3 \ --topic kafka-test-topic
Kafka トピックが作成されると、CLI に次の情報が返されます。
Created topic kafka-test-topic.
Kafka トピックにデータを書き込みます。
CLI で次のコマンドを実行して、Kafka プロデューサーコンソールに移動します。
kafka-console-producer.sh --broker-list core-1-1:9092 --topic kafka-test-topic
5 つのテストデータを入力します。
1,Ken,0,1,1662022777000 1,Ken,0,2,1662022777000 1,Ken,0,3,1662022777000 1,Ken,0,4,1662022777000 1,Ken,0,5,1662022777000
Ctrl+C を押して、Kafka プロデューサーコンソールを終了します。
手順 4:Flink ジョブを実行する
SSH モードで Dataflow クラスタにログオンします。 EMR クラスタへのログオン方法については、クラスタへのログオン をご参照ください。
dataflow-oss-demo-1.0-SNAPSHOT.jar パッケージを Dataflow クラスタのルートディレクトリにアップロードします。
説明この例では、dataflow-oss-demo-1.0-SNAPSHOT.jar パッケージは root ディレクトリにアップロードされます。 ビジネス要件に基づいてアップロードディレクトリを指定することもできます。
次のコマンドを実行して、Flink ジョブを送信します。
この例では、Flink ジョブは Per-Job モードで送信されます。 他のモードで Flink ジョブを送信する方法については、基本的な使い方 をご参照ください。
flink run -t yarn-per-job -d -c com.alibaba.ververica.dataflow.demo.oss.OssDemoJob \ /dataflow-oss-demo-1.0-SNAPSHOT.jar \ --outputOssDir oss://xung****-flink-dlf-test/oss_kafka_test \ --kafkaBrokers core-1-1:9092 \ --inputTopic kafka-test-topic \ --inputTopicGroup my-group
パラメータの説明:
outputOssDir:データを書き込む OSS バケットのディレクトリ。
kafkaBrokers:Kafka クラスタで指定された Kafka ブローカー。 値を
core-1-1:9092
に設定します。inputTopic:データを読み取る Kafka トピック。 手順 3 で作成した
kafka-test-topic
に設定します。inputTopicGroup:使用する Kafka コンシューマーグループ。 値を
my-group
に設定します。
次のコマンドを実行して、ジョブのステータスを表示します。
flink list -t yarn-per-job -Dyarn.application.id=<appId>
説明コマンドでは、
<appId>
は、ジョブが正常に実行された後に返されるアプリケーション ID を指定します。 この例では、アプリケーション ID は application_1670236019397_0003 です。
手順 5:出力を表示する
ジョブが正常に実行された後、OSS コンソールで出力を表示できます。
OSS コンソールにログオンします。
左側のナビゲーションペインで、[バケット] をクリックします。 [バケット] ページで、作成したバケットの名前をクリックします。
[オブジェクト] ページで、指定した出力ディレクトリの出力を表示します。
重要ジョブは継続的に実行されるストリーミングジョブであり、多数の出力ファイルを生成する可能性があります。 出力を見た後は、できるだけ早く CLI で
yarn application -kill <appId>
コマンドを実行してジョブを終了することをお勧めします。
Dataflow クラスタの CLI で
hdfs dfs -cat oss://<YOUR_TARGET_BUCKET>/oss_kafka_test/<DATE_DIR>/part-0-0
コマンドを実行して、OSS バケットに保存されているデータを表示することもできます。 次の図は例を示しています。重要exactly-once セマンティクスを確保するために、Flink ジョブがチェックポイントを完了するたびに、データファイルは OSS バケットに保存されます。 この例では、チェックポイント間隔は 30 秒です。
ジョブは継続的に実行されるストリーミングジョブであり、多数の出力ファイルを生成する可能性があります。 出力を見た後は、できるだけ早く CLI で yarn application -kill <appId> コマンドを実行してジョブを終了することをお勧めします。