すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:JARドラフトの開発

最終更新日:Jan 09, 2025

Realtime Compute for Apache Flink DataStreamは、柔軟なプログラミングモデルとAPIを提供します。これらのプログラミングモデルとAPIを使用して、さまざまなデータ変換、操作、および演算子を定義し、複雑なビジネスロジックとデータ処理の要件を満たすことができます。このトピックでは、Realtime Compute for Apache FlinkでJARドラフトを開発する方法について説明します。

Apache Flinkのサポート

Realtime Compute for Apache FlinkでサポートされているDataStream APIは、Apache Flinkと完全に互換性があります。詳細については、「What is Apache Flink?」および「Flink DataStream API Programming Guide」をご参照ください。

環境要件

  • IntelliJ IDEAなどの開発ツールがインストールされている。

  • Maven 3.6.3以降がインストールされている。

  • Java Development Kit(JDK) 8またはJDK 11がインストールされている。

  • Realtime Compute for Apache Flinkの開発コンソールで実行するためにドラフトをデプロイする前に、オンプレミス環境でドラフトを開発する必要があります。

準備

このトピックでは、コネクタの使用方法を示す例が提供されています。コネクタのデータソースを準備する必要があります。

説明

ドラフトを開発する

Apache Flinkの環境依存関係を構成する

説明

JARパッケージの依存関係間の競合を防ぐために、次の点に注意してください。

  • ${flink.version}を使用して、ドラフトの実行時環境に対応するFlinkバージョンを指定します。指定されたFlinkバージョンは、[デプロイメント] ページに表示されるデプロイメントのVerverica Runtime(VVR)バージョンに対応するFlinkバージョンと同じである必要があります。たとえば、[デプロイメント] ページでドラフトに選択したエンジンバージョンが vvr-8.0.9-flink-1.17 の場合、Flinkバージョンは 1.17.2 です。デプロイメントで使用されているエンジンバージョンを表示する方法の詳細については、「デプロイメントで使用されているRealtime Compute for Apache Flinkのエンジンバージョンを表示するにはどうすればよいですか?」をご参照ください。

  • Apache Flinkの依存関係に <scope>provided</scope> を指定します。 org.apache.flink グループで名前が flink- で始まる非コネクタの依存関係が主に必要です。

  • Apache Flinkのソースコードで@Publicまたは@PublicEvolvingと明示的にマークされているメソッドのみを呼び出します。Alibaba Cloud Realtime Compute for Apache Flinkは、これらのメソッドとの互換性のみを保証します。

  • Realtime Compute for Apache Flinkの組み込みコネクタでサポートされているDataStream APIを使用する場合は、Realtime Compute for Apache Flinkの組み込み依存関係を使用します。

次のサンプルコードは、Flink関連の依存関係の例を示しています。ログファイルに関連する依存関係を追加する必要がある場合もあります。完全な依存関係の詳細については、このトピックの「完全なサンプルコード」セクションを参照してください。

Flink関連の依存関係
         <!-- Apache Flink dependencies -->
        <!-- Provide the dependencies to prevent them from being packaged in a JAR file.   -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

コネクタの依存関係を使用する

DataStreamモードでデータを読み書きする場合は、関連するDataStreamコネクタを使用してRealtime Compute for Apache Flinkに接続する必要があります。Maven中央リポジトリに保存されている VVR DataStreamコネクタ を使用して、ドラフトを開発できます。

重要

コネクタのインターフェースとパラメータは、将来変更される可能性があります。「サポートされているコネクタ」でDataStream APIを提供するように指定されているコネクタを使用することをお勧めします。

次のいずれかの方法でコネクタを使用できます。

(推奨)コネクタのuber JARファイルをアップロードし、uber JARファイルを追加の依存関係ファイルとして呼び出す

  1. 必要なコネクタをプロジェクトの依存関係としてドラフトのMavenプロジェクトのpom.xmlファイルに追加し、依存関係に <scope>provided</scope> を指定します。完全な依存関係の詳細については、このトピックの「完全なサンプルコード」セクションを参照してください。

    説明
    • ${vvr.version} は、ドラフトの実行時環境に対応するエンジンバージョンを示します。ドラフトのエンジンバージョンが vvr-8.0.9-flink-1.17 の場合、Flinkバージョンは 1.17.2 です。最新バージョンのVVRを使用することをお勧めします。詳細については、「エンジンの更新」をご参照ください。

    • このトピックでは、コネクタのuber JARファイルは追加の依存関係ファイルとして呼び出されます。この場合、必要なコネクタをドラフトのJARファイルにパッケージ化する必要はありません。したがって、コネクタの依存関係に <scope>provided</scope> を指定する必要があります。

            <!-- Kafka connector dependency -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-kafka</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
            <!-- MySQL connector dependency -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-mysql</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
  2. 新しいコネクタを開発したり、既存のコネクタの拡張機能を使用したりする場合は、コネクタのパブリックパッケージ flink-connector-base または ververica-connector-common をプロジェクトの依存関係として追加します。

            <!-- Basic dependency of the public interface of Flink connectors -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- Basic dependency of the public interface of Alibaba Cloud connectors -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-common</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  3. DataStreamコネクタの構成情報とサンプルコードを表示します。詳細については、DataStreamコネクタのドキュメントのトピックを参照してください。

    DataStreamコネクタの詳細については、「サポートされているコネクタ」をご参照ください。

  4. ドラフトをデプロイし、[JARデプロイメントの作成] ダイアログボックスの [追加の依存関係] フィールドにuber JARパッケージを追加します。詳細については、「JARデプロイメントを作成する」をご参照ください。開発したコネクタ、またはRealtime Compute for Apache Flinkによって提供されるコネクタのJARファイルをアップロードできます。Realtime Compute for Apache Flinkによって提供されるコネクタの公式JARファイルのダウンロードリンクは、「コネクタ」で入手できます。次の図は例を示しています。

    image

コネクタをプロジェクトの依存関係としてドラフトのJARファイルにパッケージ化する

  1. 必要なコネクタをプロジェクトの依存関係としてドラフトのMavenプロジェクトのpom.xmlファイルに追加します。たとえば、KafkaコネクタとMySQLコネクタをプロジェクトの依存関係としてドラフトのMavenプロジェクトのpom.xmlファイルに追加できます。

    説明
    • ${vvr.version} は、ドラフトの実行時環境に対応するエンジンバージョンを示します。ドラフトのエンジンバージョンが vvr-8.0.9-flink-1.17 の場合、Flinkバージョンは 1.17.2 です。最新バージョンのVVRを使用することをお勧めします。詳細については、「エンジンの更新」をご参照ください。

    • このトピックでは、コネクタはプロジェクトの依存関係としてドラフトのJARファイルにパッケージ化されます。この場合、依存関係にはデフォルトのスコープ(コンパイル)を使用する必要があります。

            <!-- Kafka connector dependency -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-kafka</artifactId>
                <version>${vvr.version}</version>
            </dependency>
            <!-- MySQL connector dependency -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-mysql</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  2. 新しいコネクタを開発したり、既存のコネクタの拡張機能を使用したりする場合は、コネクタのパブリックパッケージ flink-connector-base または ververica-connector-common をプロジェクトの依存関係として追加します。

            <!-- Basic dependency of the public interface of Flink connectors -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- Basic dependency of the public interface of Alibaba Cloud connectors -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-common</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  3. DataStreamコネクタの構成情報とサンプルコードを表示します。詳細については、DataStreamコネクタのドキュメントのトピックを参照してください。

    DataStreamコネクタの詳細については、「サポートされているコネクタ」をご参照ください。

OSSに保存されている依存関係ファイルからデータを読み取る

Realtime Compute for Apache FlinkのJARデプロイメントのMain関数を使用して、ローカル構成ファイルからデータを読み取ることはできません。この場合、構成ファイルをObject Storage Service(OSS)バケットにアップロードし、JARデプロイメントを作成するときに [JARデプロイメントの作成] ダイアログボックスの [追加の依存関係] フィールドに構成ファイルを追加して、構成ファイルからデータを読み取ることができます。例:

  1. 構成ファイルconfig.propertiesを作成し、コードでプレーンテキストを使用しないでください。

    # Kafka 
    bootstrapServers=host1:9092,host2:9092,host3:9092
    inputTopic=topic
    groupId=groupId
    # MySQL
    database.url=jdbc:mysql://localhost:3306/my_database
    database.username=username
    database.password=password
  2. JARドラフトでコードを実行して、OSSバケットに保存されている構成ファイルconfig.propertiesからデータを読み取ります。

    方法 1:ワークスペースに関連付けられているOSSバケットの構成ファイルからデータを読み取る

    1. Realtime Compute for Apache Flink の開発コンソールの左側のナビゲーションペインで、[成果物] をクリックします。 [成果物] ページで、ファイルをアップロードします。

    2. デプロイメントの実行時に、追加の依存関係/flink/usrlib フィールドに追加された追加の依存関係ファイルは、デプロイメントが実行されているポッドの ディレクトリにロードされます。

    3. JARドラフトで次のコードを実行して、構成ファイルからデータを読み取ります。

                  Properties properties = new Properties();
                  Map<String,String> configMap = new HashMap<>();
      
                  try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
                      // プロパティファイルをロードします。
                      properties.load(input);
                      // プロパティ値を取得します。
                      configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
                      configMap.put("inputTopic",properties.getProperty("inputTopic"));
                      configMap.put("groupId",properties.getProperty("groupId"));
                      configMap.put("url",properties.getProperty("database.url")) ;
                      configMap.put("username",properties.getProperty("database.username"));
                      configMap.put("password",properties.getProperty("database.password"));
                  } catch (IOException ex) {
                      ex.printStackTrace();
                  }

    方法 2:Realtime Compute for Apache Flinkのワークスペースがアクセス許可を持つOSSバケットの構成ファイルからデータを読み取る

    1. 構成ファイルconfig.propertiesをOSSバケットにアップロードします。

    2. JARドラフトでOSSClientを使用して、OSSバケットの構成ファイルからデータを読み取ります。詳細については、「ストリーミングダウンロード」および「アクセス認証情報の管理」をご参照ください。サンプルコード:

      OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret");
      try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties");
           BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
          // ファイルを読み取って処理します...
      } finally {
          if (ossClient != null) {
              ossClient.shutdown();
          }
      }

ビジネスコードを作成する

  1. 外部データソースをRealtime Compute for Apache Flinkのデータストリーミングプログラムに統合します。 Watermarks は、Realtime Compute for Apache Flinkの時間セマンティクスに基づく計算戦略です。ほとんどの場合、ウォーターマークはタイムスタンプと一緒に使用されます。この例では、ウォーターマーク戦略は使用されていません。詳細については、「Generating Watermarks」をご参照ください。

             // 外部データソースをRealtime Compute for Apache Flinkのデータストリーミングプログラムに統合します。
            // WatermarkStrategy.noWatermarks()を指定します。これは、ウォーターマーク戦略が使用されていないことを示します。
            DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
  2. 演算子を変換します。この例では、DataStream<String> 演算子が DataStream<Student> に変換されます。より複雑な演算子を変換および処理する方法の詳細については、「演算子」をご参照ください。

              // データ構造がstudentである演算子を変換します。
              DataStream<student> source = stream
                    .map(new MapFunction<String, student>() {
                        @Override
                        public student map(String s) throws Exception {
                            // データをコンマ(,)で区切ります。
                            String[] data = s.split(",");
                            return new student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
                        }
                    }).filter(student -> student.score >=60); // scoreが60以上のデータレコードを取得します。

ドラフトのJARファイルをパッケージ化する

maven-shade-pluginを使用して、ドラフトのJARファイルをパッケージ化します。

重要
  • コネクタのuber JARファイルをRealtime Compute for Apache Flinkの開発コンソールにアップロードし、DataStreamドラフトでuber JARファイルを追加の依存関係ファイルとして呼び出す場合は、ドラフトのJARファイルをパッケージ化するときに、コネクタの依存関係に <scope>provided</scope> を指定する必要があります。

  • コネクタをプロジェクトの依存関係としてドラフトのJARファイルにパッケージ化する場合は、ドラフトのJARファイルをパッケージ化するときに、コネクタの依存関係に <scope>compile</scope> を指定する必要があります。

参照用のmaven-shade-pluginの構成

<build>
        <plugins>
            <!-- Java compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- maven-shade-pluginを使用して、必要なすべての依存関係を含むfat JARファイルを作成します。 -->
            <!-- プログラムのエントリポイントが変更された場合は、<mainClass>の値を変更します。 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <!-- 不要な依存関係を削除します。 -->
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- META-INFフォルダの署名をコピーしないでください。そうしないと、JARファイルの使用時にセキュリティエラーが発生する可能性があります。 -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.FlinkDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

ドラフトをテストしてデプロイする

  • デフォルトでは、Realtime Compute for Apache Flinkはインターネットにアクセスできません。この場合、オンプレミスマシンでコードを直接テストできない可能性があります。個別に単体テストを実行することをお勧めします。詳細については、「コネクタを含むFlinkデプロイメントをオンプレミス環境で実行またはデバッグする」をご参照ください。

  • ドラフトのJARデプロイメントを作成する方法の詳細については、「JARデプロイメントを作成する」をご参照ください。

    説明
    • ドラフトのデプロイ時に、コネクタのuber JARファイルを使用し、DataStreamドラフトでuber JARファイルを追加の依存関係ファイルとして呼び出す場合は、コネクタのuber JARファイルをRealtime Compute for Apache Flinkの開発コンソールにアップロードする必要があります。

    • 構成ファイルを読み取る場合は、[JARデプロイメントの作成] ダイアログボックスの [追加の依存関係] フィールドに構成ファイルを追加する必要があります。

    image

完全なサンプルコード

この例では、Kafkaデータソースのデータが処理され、MySQLデータベースに書き込まれます。この例は参照用です。コードスタイルと品質ガイドラインの詳細については、「Apache Flink Code Style and Quality Guide」をご参照ください。

説明

サンプルコードには、チェックポイント、有効期間 (TTL)、再試行ポリシーなどのパラメーターは含まれていません。ドラフトがデプロイされた後、[構成] タブでこれらのパラメーターを構成できます。コード内の構成項目の方が優先順位が高くなります。ドラフトのデプロイ後にカスタム構成を指定することをお勧めします。これにより、後続の変更と適用が容易になります。詳細については、「デプロイメントの構成」をご参照ください。

FlinkDemo.java

package com.aliyun;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class FlinkDemo {
    // データ構造を定義します。
    public static class Student {
        public int id;
        public String name;
        public int score;

        public Student(int id, String name, int score) {
            this.id = id;
            this.name = name;
            this.score = score;
        }
    }

    public static void main(String[] args) throws Exception {
        // Flink実行環境を作成します。
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        Map<String,String> configMap = new HashMap<>();

        try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
            // プロパティファイルをロードします。
            properties.load(input);
            // プロパティ値を取得します。
            configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
            configMap.put("inputTopic",properties.getProperty("inputTopic"));
            configMap.put("groupId",properties.getProperty("groupId"));
            configMap.put("url",properties.getProperty("database.url")) ;
            configMap.put("username",properties.getProperty("database.username"));
            configMap.put("password",properties.getProperty("database.password"));
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        // Kafkaソースを構築します
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                        .setBootstrapServers(configMap.get("bootstrapServers"))
                        .setTopics(configMap.get("inputTopic"))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(configMap.get("groupId"))
                        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                        .build();

        // 外部データソースをFlinkデータストリーミングプログラムに統合します。
        // WatermarkStrategy.noWatermarks()を指定します。これは、ウォーターマーク戦略が使用されていないことを示します。
        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");

        // scoreが60以上のデータレコードを取得します。
        DataStream<Student> source = stream
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String s) throws Exception {
                        String[] data = s.split(",");
                        return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
                    }
                }).filter(Student -> Student.score >=60);

        source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
                new JdbcStatementBuilder<Student>() {
                    public void accept(PreparedStatement ps, Student data) {
                        try {
                            ps.setInt(1, data.id);
                            ps.setString(2, data.name);
                            ps.setInt(3, data.score);
                        } catch (SQLException e) {
                            throw new RuntimeException(e);
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5) // 各バッチに書き込まれるレコードの数。
                        .withBatchIntervalMs(2000) // 再試行に許容される最大遅延。単位:ミリ秒。
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(configMap.get("url"))
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername(configMap.get("username"))
                        .withPassword(configMap.get("password"))
                        .build()
        )).name("Sink MySQL");

        env.execute("Flink Demo");
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>FlinkDemo</name>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.14.1</log4j.version>
    </properties>
    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- JARファイルにパッケージ化されないように、依存関係を提供します。 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- コネクタの依存関係を追加し、依存関係に <scope>compile</scope> を指定します。 -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        <!-- デプロイメントの実行時にRealtime Compute for Apache Flinkの開発コンソールに出力データを生成するためのログフレームワークを追加します。 -->
        <!-- デフォルトでは、これらの依存関係はプログラムのJARファイルから除外されます。 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- maven-shade-pluginを使用して、必要なすべての依存関係を含むfat JARファイルを作成します。 -->
            <!-- プログラムのエントリポイントが変更された場合は、<mainClass>の値を変更します。 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <!-- 不要な依存関係を削除します。 -->
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- META-INFフォルダの署名をコピーしないでください。そうしないと、JARファイルの使用時にセキュリティエラーが発生する可能性があります。 -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.FlinkDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

関連情報