Flink DataStream API は、カスタムのデータ変換、操作、オペレーターを定義できる柔軟なプログラミングモデルを提供します。この柔軟性は、複雑なビジネスロジックやデータ処理要件に最適です。このトピックでは、Flink JAR ジョブの開発方法について説明します。
オープンソースの Apache Flink のサポート
Realtime Compute for Apache Flink の DataStream API は、オープンソースの Apache Flink バージョンと完全に互換性があります。詳細については、「Introduction to Apache Flink」および「Flink DataStream API Developer Guide」をご参照ください。
開発環境の要件
IntelliJ IDEA などの統合開発環境 (IDE) がインストールされていること。
Maven 3.6.3 以降がインストールされていること。
ジョブ開発は JDK 8 および JDK 11 のみをサポートします。
JAR ジョブをオフラインで開発し、Realtime Compute for Apache Flink コンソールでデプロイして実行できます。
前提条件
この例では、データソースコネクタの使用方法を示します。事前に必要なデータソースを準備する必要があります。
この例では、データソースとして Alibaba Cloud Message Queue for Kafka 2.6.2 と ApsaraDB RDS for MySQL 8.0 を使用します。
インターネット経由または VPC 間でセルフマネージドのデータソースにアクセスする方法については、「ネットワーク接続方法の選択」をご参照ください。
Message Queue for Kafka データソースがない場合は、Kafka インスタンスを購入してデプロイする必要があります。詳細については、「ステップ 2:インスタンスの購入とデプロイ」をご参照ください。インスタンスをデプロイする際は、ご利用の Realtime Compute for Apache Flink ワークスペースと同じ VPC 内にあることを確認してください。
ApsaraDB RDS for MySQL データソースがない場合は、RDS for MySQL インスタンスを購入する必要があります。詳細については、「ステップ 1:ApsaraDB RDS for MySQL インスタンスの作成とデータベースの設定」をご参照ください。インスタンスを購入する際は、ご利用の Realtime Compute for Apache Flink ワークスペースと同じリージョンおよび VPC 内にあることを確認してください。
ジョブの開発
Flink 環境の依存関係の設定
JAR パッケージの依存関係の競合を避けるために、次の点にご注意ください:
${flink.version}は、ジョブのランタイムに対応する Flink のバージョンです。デプロイメントページで選択した Ververica Runtime (VVR) エンジンと同じ Flink バージョンを使用してください。例えば、デプロイメントページでvvr-8.0.9-flink-1.17エンジンを選択した場合、対応する Flink のバージョンは1.17.2です。VVR エンジンのバージョンの詳細については、「現在のジョブの Flink バージョンを確認する方法」をご参照ください。Flink 関連の依存関係については、スコープを `provided` に設定してください。これを行うには、依存関係に
<scope>provided</scope>を追加します。これは主に、org.apache.flinkグループ配下でflink-で始まる非コネクタの依存関係に適用されます。Flink のソースコードでは、@Public または @PublicEvolving で明示的にアノテーションが付けられたメソッドのみが public と見なされます。Realtime Compute for Apache Flink は、これらのメソッドに対してのみ互換性を保証します。
Realtime Compute for Apache Flink の組み込みコネクタが DataStream API をサポートしている場合は、その組み込みの依存関係を使用する必要があります。
以下のコードは、いくつかの基本的な Flink の依存関係を示しています。ログファイルの依存関係を追加する必要がある場合もあります。依存関係の完全なリストについては、このトピックの最後にある「完全なサンプルコード」をご参照ください。
Flink 関連の依存関係
コネクタの依存関係と使用方法
DataStream を使用してデータを読み書きするには、対応する DataStream コネクタを使用する必要があります。Maven 中央リポジトリには、ジョブ開発中に直接使用できる VVR DataStream コネクタ が含まれています。
「サポートされているコネクタ」で DataStream API をサポートすると指定されているコネクタを使用する必要があります。他のコネクタは、将来インターフェイスやパラメーターが変更される可能性があるため、使用しないでください。
コネクタを使用するには、次のいずれかの方法を使用できます:
(推奨) コネクタの JAR パッケージを追加の依存関係ファイルとしてアップロード
ジョブの Maven POM ファイルで、必要なコネクタをプロジェクトの依存関係として追加し、そのスコープを `provided` に設定します。完全な依存関係ファイルについては、このトピックの最後にある「完全なサンプルコード」をご参照ください。
説明${vvr.version}は、ジョブのランタイムエンジンのバージョンです。例えば、ジョブがvvr-8.0.9-flink-1.17エンジンで実行される場合、対応する Flink のバージョンは1.17.2です。最新のエンジンバージョンを使用することを推奨します。エンジンのバージョンに関する詳細については、「エンジン」をご参照ください。コネクタの JAR パッケージは追加の依存関係としてインポートされるため、ジョブの JAR ファイルにパッケージ化する必要はありません。したがって、スコープを
providedに設定する必要があります。
<!-- Kafka コネクタの依存関係 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> <scope>provided</scope> </dependency> <!-- MySQL コネクタの依存関係 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> <scope>provided</scope> </dependency>新しいコネクタを開発したり、既存のコネクタの機能を拡張したりする場合、プロジェクトには
flink-connector-baseまたはververica-connector-commonの public パッケージも必要です。<!-- Flink コネクタ共通インターフェイスの基本依存関係 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- Alibaba Cloud コネクタ共通インターフェイスの基本依存関係 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>DataStream 接続の設定情報とコードサンプルについては、対応する DataStream コネクタのドキュメントをご参照ください。
DataStream タイプとして使用できるコネクタのリストについては、「サポートされているコネクタ」をご参照ください。
ジョブをデプロイする際に、[追加の依存関係] セクションに対応するコネクタの JAR パッケージを追加します。詳細については、「JAR ジョブのデプロイ」をご参照ください。独自に開発したコネクタや、Realtime Compute for Apache Flink が提供するコネクタをアップロードできます。ダウンロードリンクについては、「コネクタリスト」をご参照ください。以下の図は一例です。

コネクタをプロジェクトの依存関係としてジョブの JAR ファイルにパッケージ化
ジョブの Maven POM ファイルで、必要なコネクタをプロジェクトの依存関係として追加します。例えば、Kafka コネクタと MySQL コネクタをインポートできます。
説明${vvr.version}は、ジョブのランタイムエンジンのバージョンです。例えば、ジョブがvvr-8.0.9-flink-1.17エンジンで実行される場合、対応する Flink のバージョンは1.17.2です。最新のエンジンバージョンを使用することを推奨します。エンジンのバージョンに関する詳細については、「エンジン」をご参照ください。コネクタはプロジェクトの依存関係として JAR ファイルに直接パッケージ化されるため、デフォルトの `compile` スコープを使用する必要があります。
<!-- Kafka コネクタの依存関係 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> </dependency> <!-- MySQL コネクタの依存関係 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> </dependency>新しいコネクタを開発したり、既存のコネクタの機能を拡張したりする場合、プロジェクトには
flink-connector-baseまたはververica-connector-commonの public パッケージも必要です。<!-- Flink コネクタ共通インターフェイスの基本依存関係 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- Alibaba Cloud コネクタ共通インターフェイスの基本依存関係 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>DataStream 接続の設定情報とコードサンプルについては、対応する DataStream コネクタのドキュメントをご参照ください。
DataStream タイプとして使用できるコネクタのリストについては、「サポートされているコネクタ」をご参照ください。
OSS からの追加の依存関係ファイルの読み取り
Flink JAR ジョブは、`main` 関数内からローカル設定ファイルを読み取ることをサポートしていません。代わりに、設定ファイルをご利用の Flink ワークスペースに関連付けられた OSS バケットにアップロードできます。ジョブをデプロイする際に、追加の依存関係として追加することでファイルを読み取ることができます。以下のセクションで例を示します。
コード内でプレーンテキストのパスワードを使用しないように、`config.properties` という名前の設定ファイルを作成します。
# Kafka bootstrapServers=host1:9092,host2:9092,host3:9092 inputTopic=topic groupId=groupId # MySQL database.url=jdbc:mysql://localhost:3306/my_database database.username=username database.password=passwordJAR ジョブ内のコードを使用して、OSS バケットに保存されている `config.properties` ファイルを読み取ります。
方法1:ワークスペースにアタッチされた OSS バケットからの読み取り
Realtime Compute for Apache Flink 開発コンソールで、左側のナビゲーションウィンドウの [リソース管理] ページに移動し、ファイルをアップロードします。
ジョブの実行時に、デプロイメント中に追加した追加の依存関係ファイルは、ジョブが実行される Pod の /flink/usrlib ディレクトリにロードされます。
以下のコードは、設定ファイルを読み取る方法の例です。
Properties properties = new Properties(); Map<String,String> configMap = new HashMap<>(); try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) { // プロパティファイルをロードします。 properties.load(input); // プロパティ値を取得します。 configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ; configMap.put("inputTopic",properties.getProperty("inputTopic")); configMap.put("groupId",properties.getProperty("groupId")); configMap.put("url",properties.getProperty("database.url")) ; configMap.put("username",properties.getProperty("database.username")); configMap.put("password",properties.getProperty("database.password")); } catch (IOException ex) { ex.printStackTrace(); }
方法2:ワークスペースがアクセス権を持つ OSS バケットからの読み取り
設定ファイルを宛先の OSS バケットにアップロードします。
`OSSClient` を使用して、OSS に保存されているファイルを直接読み取ることができます。詳細については、「ストリーム」および「アクセス認証情報の管理」をご参照ください。以下はサンプルコードです。
OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret"); try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties"); BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) { // ファイルを読み込んで処理... } finally { if (ossClient != null) { ossClient.shutdown(); } }
ビジネスコードの記述
以下のコードは、外部データソースを Flink データストリームプログラムに統合する方法を示しています。
ウォーターマークは、イベント時間の進行を測定する Flink のメカニズムであり、タイムスタンプと共によく使用されます。この例では、ウォーターマークポリシーは使用しません。詳細については、「Watermark Strategies」をご参照ください。// 外部データソースを Flink データストリームプログラムに統合します。 // WatermarkStrategy.noWatermarks() は、ウォーターマークポリシーを使用しないことを示します。 DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");以下のコードは、オペレーター変換を示しています。この例では、
DataStream<String>をDataStream<Student>に変換します。複雑なオペレーター変換と処理方法の詳細については、「Flink Operators」をご参照ください。// データ構造を student に変換するオペレーター。 DataStream<student> source = stream .map(new MapFunction<String, student>() { @Override public student map(String s) throws Exception { // データはカンマで区切られます。 String[] data = s.split(","); return new student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2])); } }).filter(student -> student.score >=60); // データをフィルタリングして、スコアが 60 以上のものを見つけます。
ジョブのパッケージ化
`maven-shade-plugin` プラグインを使用してジョブをパッケージ化できます。
コネクタを追加の依存関係ファイルとしてインポートすることを選択した場合、ジョブをパッケージ化する際に、コネクタ関連の依存関係のスコープが
providedに設定されていることを確認してください。コネクタを依存関係としてパッケージ化することを選択した場合、デフォルトの `compile` スコープを使用できます。
ジョブのテストとデプロイ
Realtime Compute for Apache Flink は、デフォルトではインターネットにアクセスできません。そのため、ローカルでコードをテストできない場合があります。単体テストを個別に行うことを推奨します。詳細については、「コネクタを含むジョブをローカルで実行およびデバッグする」をご参照ください。
JAR ジョブのデプロイ方法については、「JAR ジョブのデプロイ」をご参照ください。
説明ジョブをデプロイする際に、コネクタを追加の依存関係ファイルとしてアップロードすることを選択した場合は、関連するコネクタの JAR パッケージをアップロードする必要があります。
設定ファイルを読み取るには、それを追加の依存関係ファイルとしてアップロードする必要もあります。

完全なサンプルコード
このサンプルコードは、Kafka データソースからのデータが処理され、MySQL に書き込まれる方法を示しています。この例は参照用です。コードスタイルと品質ガイドラインの詳細については、「Code Style and Quality Guide」をご参照ください。
この例には、チェックポイント、Time to Live (TTL)、再起動ポリシーなどの実行時パラメーターの設定は含まれていません。これらの設定は、ジョブがデプロイされた後、[デプロイメントの詳細] ページでカスタマイズできます。コードで設定された構成は、ページで設定された構成よりも優先度が高くなります。デプロイ後にページでこれらの構成をカスタマイズすることを推奨します。このアプローチにより、将来の変更と再利用が簡素化されます。詳細については、「ジョブのデプロイメント情報の設定」をご参照ください。
FlinkDemo.java
package com.aliyun;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class FlinkDemo {
// データ構造を定義します。
public static class Student {
public int id;
public String name;
public int score;
public Student(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
}
public static void main(String[] args) throws Exception {
// Flink 実行環境を作成します。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
Map<String,String> configMap = new HashMap<>();
try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
// プロパティファイルをロードします。
properties.load(input);
// プロパティ値を取得します。
configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
configMap.put("inputTopic",properties.getProperty("inputTopic"));
configMap.put("groupId",properties.getProperty("groupId"));
configMap.put("url",properties.getProperty("database.url")) ;
configMap.put("username",properties.getProperty("database.username"));
configMap.put("password",properties.getProperty("database.password"));
} catch (IOException ex) {
ex.printStackTrace();
}
// Kafka ソースをビルドします
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(configMap.get("bootstrapServers"))
.setTopics(configMap.get("inputTopic"))
.setStartingOffsets(OffsetsInitializer.latest())
.setGroupId(configMap.get("groupId"))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
// 外部データソースを Flink データストリームプログラムに統合します。
// WatermarkStrategy.noWatermarks() は、ウォーターマークポリシーを使用しないことを示します。
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
// データをフィルタリングして、スコアが 60 以上のものを見つけます。
DataStream<Student> source = stream
.map(new MapFunction<String, Student>() {
@Override
public Student map(String s) throws Exception {
String[] data = s.split(",");
return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
}
}).filter(Student -> Student.score >=60);
source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
new JdbcStatementBuilder<Student>() {
public void accept(PreparedStatement ps, Student data) {
try {
ps.setInt(1, data.id);
ps.setString(2, data.name);
ps.setInt(3, data.score);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
},
new JdbcExecutionOptions.Builder()
.withBatchSize(5) // 各バッチで書き込まれるレコード数。
.withBatchIntervalMs(2000) // 再試行の最大遅延時間 (ミリ秒)。
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(configMap.get("url"))
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername(configMap.get("username"))
.withPassword(configMap.get("password"))
.build()
)).name("Sink MySQL");
env.execute("Flink Demo");
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>FlinkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>FlinkDemo</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.1</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.14.1</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink の依存関係 -->
<!-- これらの依存関係は JAR ファイルにパッケージ化すべきではないため、provided に設定します。 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- ここにコネクタの依存関係を追加します。これらはデフォルトのスコープ (compile) である必要があります。 -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<!-- 実行時にコンソール出力を生成するためのロギングフレームワークを追加します。 -->
<!-- デフォルトでは、これらの依存関係はアプリケーション JAR から除外されます。 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java コンパイラ -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- maven-shade-plugin を使用して、必要なすべての依存関係を含む fat jar を作成します。 -->
<!-- プログラムのエントリポイントが変更された場合は、<mainClass> の値を変更してください。 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<!-- 不要な依存関係をいくつか削除します。 -->
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- META-INF フォルダ内の署名をコピーしないでください。そうしないと、JAR ファイルを使用する際にセキュリティ例外が発生する可能性があります。 -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.aliyun.FlinkDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>参考
DataStream タイプとして使用できるコネクタのリストについては、「サポートされているコネクタ」をご参照ください。
Flink JAR ジョブ開発プロセスの完全な例については、「Flink JAR ジョブのクイックスタート」をご参照ください。
Realtime Compute for Apache Flink は、SQL ジョブと Python ジョブもサポートしています。これらのジョブの開発方法については、「ジョブ開発マップ」および「Python ジョブの開発」をご参照ください。