Hologres はフルマネージド Flink との互換性が高く、ほとんどの場合、Flink SQL を使用して Hologres のソーステーブル、ディメンションテーブル、結果テーブルを宣言し、SQL でデータ処理ロジックを定義できます。しかし、特殊なビジネスシナリオで Flink SQL がコンピューティングニーズを満たせない場合は、DataStream を使用してデータを読み書きする必要があります。このトピックでは、Flink 1.17 向けの Ververica Runtime (VVR) 8.0.8 を例に、Hologres コネクタを使用する DataStream ジョブのデバッグと開発方法を説明します。
前提条件
-
Hologres インスタンスを購入し、データベースを作成済みであること。 詳細については、「データベースの作成」をご参照ください。
-
ローカルでのコードデバッグのために、IntelliJ IDEA などのコード開発プラットフォームをインストール済みであること。
ステップ 1: コネクタの依存関係のダウンロード
DataStream を使用して Hologres のデータを読み書きするには、フルマネージド Flink 用の Hologres コネクタをダウンロードする必要があります。 リリースされたコネクタのバージョン一覧については、「Hologres DataStream Connector」をご参照ください。
-
次の 2 つの依存関係 JAR ファイルをダウンロードします。
-
ververica-connector-hologres-1.17-vvr-8.0.8.jar: ローカルデバッグ用。
-
ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar: ローカルデバッグおよびオンラインデプロイ用。
説明Flink 1.15 向けの VVR 6.0 以降、商用コネクタをローカルでデバッグする際には、対応する uber JAR を使用する必要があります。 詳細については、「コネクタを含むジョブのローカルでの実行とデバッグ」をご参照ください。
-
-
ダウンロードが完了したら、次のコマンドを実行して、ververica-connector-hologres-1.17-vvr-8.0.8.jar ファイルをローカルの Maven リポジトリにインストールします。
mvn install:install-file -Dfile=$path/ververica-connector-hologres-1.17-vvr-8.0.8.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.17-vvr-8.0.8 -Dpackaging=jarコマンド内の
$pathは、ververica-connector-hologres-1.17-vvr-8.0.8.jar ファイルが保存されているローカルディレクトリの絶対パスです。
ステップ 2: ローカルでの開発とデバッグ
フルマネージド Flink コンソールでプロジェクトをデプロイして実行する前に、ローカルで開発する必要があります。 たとえば、Binlog ソーステーブル のプロジェクトコードと pom.xml ファイルは次のとおりです。
-
ローカルでコードを記述します。
-
DataStream API デモコード:
import com.alibaba.ververica.connectors.hologres.binlog.HologresBinlogConfigs; import com.alibaba.ververica.connectors.hologres.binlog.StartupMode; import com.alibaba.ververica.connectors.hologres.binlog.source.HologresBinlogSource; import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam; import com.alibaba.ververica.connectors.hologres.config.JDBCOptions; import com.alibaba.ververica.connectors.hologres.utils.JDBCUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import java.util.Collections; public class HologresBinlogSourceDemo { public static void main(String[] args) throws Exception { Configuration envConf = new Configuration(); // ローカルでデバッグする際は、uber JAR の絶対パスを指定します。アップロード用にパッケージ化する際は、この行をコメントアウトします。 envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>"); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(envConf); // 読み取るテーブルのスキーマを初期化します。スキーマは Hologres テーブルのフィールドと一致する必要があります。フィールドのサブセットのみを定義することも可能です。 TableSchema schema = TableSchema.builder() .field("<id>", DataTypes.INT().notNull()) .primaryKey("<id>") .build(); // Hologres パラメーター。 Configuration config = new Configuration(); config.setString(HologresConfigs.ENDPOINT, "<yourEndpoint>"); config.setString(HologresConfigs.USERNAME, "<yourUserName>"); config.setString(HologresConfigs.PASSWORD, "<yourPassword>"); config.setString(HologresConfigs.DATABASE, "<yourDatabaseName>"); config.setString(HologresConfigs.TABLE, "<yourTableName>"); config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true); config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true); // JDBCOptions のビルド。 JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config); // HologresBinlogSource のビルド。 long startTimeMs = 0; HologresBinlogSource source = new HologresBinlogSource( new HologresConnectionParam(config), schema, config, jdbcOptions, startTimeMs, StartupMode.INITIAL, "", "", -1, Collections.emptySet()); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print(); env.execute(); } }パラメーターの説明:
パラメーター
説明
path_to_uber_jar
ローカルの uber JAR の絶対パス。 Windows の場合は、ディスクパーティションを追加する必要があります。例:
file:///D:/path/to/a-uber.jarid
読み取るテーブルのスキーマ。 スキーマは Hologres テーブルのフィールドと一致する必要があります。 フィールドのサブセットのみを定義することも可能です。
yourEndpoint
Hologres インスタンスのネットワークドメイン名です。Hologres コンソールのインスタンス製品ページに移動し、Network Information セクションからドメイン名を取得します。
yourUserName
ご利用の Alibaba Cloud アカウントの AccessKey ID。 AccessKey 管理 ページに移動して AccessKey ID を取得します。
yourPassword
ご利用の Alibaba Cloud アカウントに対応する AccessKey Secret。
yourDatabaseName
Hologres データベースの名前。
yourTableName
読み取る Hologres テーブルの名前。
-
pom.xml ファイル:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-flink-demo</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.17.2</flink.version> <vvr.version>1.17-vvr-8.0.8</vvr.version> <target.java.version>1.8</target.java.version> <scala.binary.version>2.12</scala.binary.version> <maven.compiler.source>${target.java.version}</maven.compiler.source> <maven.compiler.target>${target.java.version}</maven.compiler.target> <log4j.version>1.7.21</log4j.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-hologres</artifactId> <version>${vvr.version}</version> </dependency> <!-- ログ実装 log4j 依存関係 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${log4j.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>jar-with-dependencies</shadedClassifierName> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
-
-
ローカルでのデバッグと実行。
-
必要な ClassLoader JAR ファイル (ververica-classloader-1.15-vvr-6.0-SNAPSHOT.jar) を構成します。 詳細については、「ステップ 2: 必要な ClassLoader JAR ファイルの構成」をご参照ください。
-
(任意)
org.apache.flink.configuration.Configurationなどの共通 Flink クラスが見つからないためにエラーが発生した場合は、「Modify options」で [Add dependencies with provided scope to classpath] を選択します。
構成が完了したら、プロジェクトをローカルでデバッグおよび実行し、正常に実行されることを確認します。
-
ローカルデバッグの詳細については、「コネクタを含むジョブのローカルでの実行とデバッグ」をご参照ください。
ステップ 3: パッケージ化と実行
プロジェクトのローカルでのデバッグに成功したら、プロジェクトをパッケージ化し、uber JAR とともに Flink コンソールにアップロードします。
-
パッケージ化する前に、次のコードをコメントアウトします。
envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>"); -
コンパイルとパッケージ化。
Maven を使用して、アプリケーションとその依存関係をコンパイルおよびパッケージ化します。 コマンドは次のとおりです。
mvn clean package -DskipTestsパッケージ化が完了すると、ローカルディレクトリに hologres-flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar という名前のファイルが生成されます。
-
JAR ファイルのアップロード。
Flink コンソールの[リソース管理]ページで、パッケージ化されたアプリケーションの JAR ファイルと ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar ファイルをアップロードします。詳細については、「ステップ 2: テスト JAR ファイルとデータファイルをアップロードする」をご参照ください。
-
JAR ジョブのデプロイ。
Flink コンソールの [ジョブ O&M] ページで、JAR ジョブをデプロイできます。詳細およびパラメーターについては、ステップ 3: JAR ジョブをデプロイするをご参照ください。
-
ジョブを開始して Flink のコンピューティング結果を表示します。
説明JAR ファイルを更新した場合は、再度アップロードし、ジョブを再デプロイしてからジョブを開始する必要があります。
-
Flink コンソールの[ジョブ運用管理]ページで、対象のジョブを見つけ、起動をActions列でクリックします。
-
リソース情報と基本設定を構成します。
ジョブの開始パラメーターの詳細については、「ジョブの開始」をご参照ください。
-
起動 をクリックします。
開始をクリックすると、ジョブのStatus がRunning に変化し、ジョブが正しく実行されていることを示します。
-
よくある質問
-
質問 1: Realtime Compute for Apache Flink の商用コネクタへの依存関係を含む Flink ジョブを IntelliJ IDEA で実行およびデバッグすると、コネクタ関連のクラスが見つからないことを示す実行時エラーが発生することがあります。 例:
Caused by: java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.hologres.binlog.source.reader.HologresBinlogRecordEmitter-
原因: この種のエラーは通常、ローカルデバッグ中に uber JAR が正しく使用されなかったために発生します。
-
解決策: このトピックまたは「コネクタを含むジョブのローカルでの実行とデバッグ」を参照して、デバッグに uber JAR を正しく使用する方法を確認してください。
-
-
質問 2: 共通の Flink クラスが欠落しているため、プログラムを実行できないというエラーメッセージが表示されます。 例:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.configuration.Configuration-
原因: 依存関係が欠落しているか、正しくロードされなかった可能性があります。
-
解決策:
-
pom.xml ファイルに必要な依存関係 (多くの場合 flink-connector-base) がインポートされていません。 例外パッケージパスを検索して、必要な Flink の依存関係を特定することもできます。
-
`provided` 依存関係が実行時にロードされなかった可能性があります。 IntelliJ IDEA で、「Modify options」に移動し、[Add dependencies with provided scope to classpath] にチェックを入れます。
-
-
-
質問 3: 実行時に
Incompatible magic valueエラーが報告されます。-
原因:
-
原因 1: uber JAR のバージョンがコネクタのバージョンと一致しない可能性があります。
-
原因 2: ClassLoader の設定が正しくない可能性があります。
-
-
解決策:
-
原因 1 の場合: このトピックを参照して、コネクタと uber JAR の正しいバージョンを選択してください。
-
原因 2 の場合: 「必要な ClassLoader JAR ファイルの構成」を参照して、設定を再構成してください。
-
-
-
質問 4: 実行時に
Unable to load flink-decryption library java.io.FileNotFoundException: Decryption library native/windows/x86/flink-decryption.dll not found例外がスローされます。-
原因: uber JAR は現在、Windows システム上の 32 ビット Java をサポートしていません。
-
解決策: 64 ビット Java をインストールしてください。
java -versionコマンドを実行して、Java のインストール情報を確認できます。 出力に64-Bitが含まれていない場合は、32 ビット Java を使用しています。
-
-
質問 5: 実行時に
Caused by: java.lang.ClassFormatError例外がスローされます。-
原因: IntelliJ IDEA で構成されている JDK のバージョンが原因である可能性があります。
-
解決策: JDK 8 または JDK 11 の新しいバージョンを使用してください。
-