全部產品
Search
文件中心

E-MapReduce:使用Flink將Kafka資料流式寫入阿里雲OSS

更新時間:Jan 21, 2025

將Kafka資料即時匯入到OSS等湖儲存中來降低儲存成本或者進行查詢分析是常見的使用情境。在EMR-3.37.1及之後的版本中,DataFlow叢集內建了JindoFS相關的依賴,使得您可以在DataFlow叢集中運行Flink作業,將Kafka資料以Exactly-Once語義流式寫入阿里雲OSS。本文通過樣本為您介紹如何在DataFlow叢集中編寫並運行Flink作業來滿足上述情境。

前提條件

  • 已開通E-MapReduce服務和OSS服務。

  • 已完成雲帳號的授權,詳情請參見角色授權

操作流程

  1. 步驟一:準備環境

  2. 步驟二:準備JAR包

  3. 步驟三:建立Kafka Topic並產生資料

  4. 步驟四:運行Flink作業

  5. 步驟五:查看輸出的結果

步驟一:準備環境

  1. 建立包含Flink和Kafka組件的DataFlow叢集,詳情請參見建立叢集

    說明

    本文以EMR-3.43.1版本為例。

  2. 在OSS上建立與DataFlow叢集相同地區的Bucket,詳情請參見控制台建立儲存空間

步驟二:準備JAR包

  1. 下載Demo代碼

    基於JindoFS,您可以在Flink作業中,如同HDFS一樣將資料以流式的方式寫入OSS中(路徑需要以oss://為首碼)。本樣本中使用了Flink的StreamingFileSink方法來示範開啟了檢查點(Checkpoint)之後,Flink如何以Exactly-Once語義寫入OSS。

    下述程式碼片段示範了如何構建Kafka Source與OSS Sink,完整代碼您可以從GitHub連結中下載獲得。

    重要

    JindoFS支援免密讀寫相同阿里雲帳號下的OSS儲存,因此作業中無需聲明相關AccessKey資訊。

    public class OssDemoJob {
    
        public static void main(String[] args) throws Exception {
            ...
    
            // Check output oss dir
            Preconditions.checkArgument(
                    params.get(OUTPUT_OSS_DIR).startsWith("oss://"),
                    "outputOssDir should start with 'oss://'.");
    
            // Set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint is required
            env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
    
            String outputPath = params.get(OUTPUT_OSS_DIR);
    
            // Build Kafka source with new Source API based on FLIP-27
            KafkaSource<Event> kafkaSource =
                    KafkaSource.<Event>builder()
                            .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                            .setTopics(params.get(INPUT_TOPIC_ARG))
                            .setStartingOffsets(OffsetsInitializer.latest())
                            .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                            .setDeserializer(new EventDeSerializationSchema())
                            .build();
            // DataStream Source
            DataStreamSource<Event> source =
                    env.fromSource(
                            kafkaSource,
                            WatermarkStrategy.<Event>forMonotonousTimestamps()
                                    .withTimestampAssigner((event, ts) -> event.getEventTime()),
                            "Kafka Source");
    
            StreamingFileSink<Event> sink =
                    StreamingFileSink.forRowFormat(
                                    new Path(outputPath), new SimpleStringEncoder<Event>("UTF-8"))
                            .withRollingPolicy(OnCheckpointRollingPolicy.build())
                            .build();
            source.addSink(sink);
    
            // Compile and submit the job
            env.execute();
        }
    }
    說明

    本範例程式碼片段給出了主要的樣本程式,您可以根據自身環境進行修改(例如,添加包名以及修改代碼中的Checkpoint間隔)後,進行編譯。關於如何構建Flink作業的JAR包,可以參見Flink官方文檔。如果無需任何修改,您可以直接使用dataflow-oss-demo-1.0-SNAPSHOT.jar包進行操作。

  2. 在命令列中,進入到下載的專案檔的根目錄下,執行以下命令打包檔案。

    mvn clean package

    根據您pom.xml檔案中artifactId的資訊,專案對應目錄dataflow-demo/dataflow-oss-demo/target下會出現dataflow-oss-demo-1.0-SNAPSHOT.jar包。

步驟三:建立Kafka Topic並產生資料

  1. 通過SSH方式串連DataFlow叢集,詳情請參見登入叢集

  2. 執行以下命令,建立測試所需的Topic。

    kafka-topics.sh --create  --bootstrap-server core-1-1:9092 \
        --replication-factor 2  \
        --partitions 3  \
        --topic kafka-test-topic

    建立成功後,命令列會列印如下資訊。

    Created topic kafka-test-topic.
  3. 寫入資料至Kafka Topic。

    1. 在命令列中執行以下命令,進入Kafka Producer Console。

      kafka-console-producer.sh --broker-list core-1-1:9092 --topic  kafka-test-topic
    2. 輸入五條測試資料。

      1,Ken,0,1,1662022777000
      1,Ken,0,2,1662022777000
      1,Ken,0,3,1662022777000
      1,Ken,0,4,1662022777000
      1,Ken,0,5,1662022777000
    3. 按下Ctrl+C退出Kafka Producer Console。

步驟四:運行Flink作業

  1. 通過SSH方式串連DataFlow叢集,詳情請參見登入叢集

  2. 上傳打包好的dataflow-oss-demo-1.0-SNAPSHOT.jar至DataFlow叢集的根目錄下。

    說明

    本文樣本中dataflow-oss-demo-1.0-SNAPSHOT.jar是上傳至root根目錄下,您也可以自訂上傳路徑。

  3. 執行以下命令,提交作業。

    本樣本通過Per-Job Cluster模式提交作業,其他方式請參見基礎使用

    flink run -t yarn-per-job -d -c com.alibaba.ververica.dataflow.demo.oss.OssDemoJob \
        /dataflow-oss-demo-1.0-SNAPSHOT.jar  \
        --outputOssDir oss://xung****-flink-dlf-test/oss_kafka_test \
        --kafkaBrokers core-1-1:9092 \
        --inputTopic kafka-test-topic \
        --inputTopicGroup my-group

    參數說明:

    • outputOssDir:指定您計劃寫入的OSS目錄。

    • kafkaBrokers:指定Kafka叢集的broker,使用core-1-1:9092即可。

    • inputTopic:指定計劃讀取的Kafka Topic,使用在步驟三中建立的kafka-test-topic

    • inputTopicGroup:指定計劃使用的Kafka Consumer Group,使用my-group用於測試即可。

    result

    您可以執行以下命令,查看作業狀態。

    flink list -t yarn-per-job -Dyarn.application.id=<appId>
    說明

    <appId>為作業運行後返回的Application ID。例如,本樣本截圖中的application_1670236019397_0003。

步驟五:查看輸出的結果

  • 作業正常運行後,您可以在OSS控制台查看輸出結果。

    1. 登入OSS管理主控台

    2. 單擊建立的儲存空間。

    3. 在檔案管理頁面指定的輸出目錄下查看輸出結果,輸出結果如下圖所示。OSS results

      重要

      由於該作業為流式作業會持續運行,產生較多輸出檔案,應在完成驗證後,及時在命令列中通過yarn application -kill <appId>命令終止該作業。

  • 您也可以在DataFlow叢集中,通過命令列運行hdfs dfs -cat oss://<YOUR_TARGET_BUCKET>/oss_kafka_test/<DATE_DIR>/part-0-0來展示實際儲存到OSS中的資料,如下圖所示。OSS樣本

    重要
    • 為了保證Exactly-Once語義,在Flink作業每完成一次Checkpoint(本樣本中Checkpoint間隔為30s),資料檔案才會落盤到OSS中。

    • 此外,由於該作業為流式作業會持續運行,會產生較多輸出檔案,應在完成驗證後,及時在命令列中通過yarn application -kill <appId>命令終止該作業。