Hologres は、フルマネージド Flink と高度に互換性があります。ほとんどの場合、Flink SQL を使用して Hologres ソーステーブル、ディメンションテーブル、および結果テーブルを宣言し、データ処理ロジックを表現できます。特別なビジネスシナリオでは、Flink SQL がビジネス要件を満たせない場合、DataStream コネクタを使用してデータの読み取りと書き込みを行うことができます。このトピックでは、Hologres コネクタに基づいて DataStream ドラフトを開発およびデバッグする方法について説明します。この例では、vvr-8.0.8-flink-1.17 を使用しています。
前提条件
Hologres インスタンスが作成され、Hologres インスタンスのデータベースが作成されます。詳細については、「データベースの作成」をご参照ください。
オンプレミスコードデバッグ用に IntelliJ IDEA などのコード開発プラットフォームがインストールされています。
ステップ 1:コネクタが依存する JAR パッケージをダウンロードする
DataStream コネクタを使用して Hologres からデータを読み取り、Hologres にデータを書き込む場合は、フルマネージド Flink に接続するために Hologres コネクタをダウンロードする必要があります。 Hologres コネクタのバージョンについては、「Hologres DataStream コネクタ」をご参照ください。
次の JAR パッケージをダウンロードする必要があります。
ververica-connector-hologres-1.17-vvr-8.0.8.jar: この JAR パッケージは、ローカルデバッグに使用されます。
ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar: この JAR パッケージは、ローカルデバッグとオンラインデプロイに使用されます。
説明vvr-6.0-flink-1.15 以降を使用する Realtime Compute for Apache Flink の場合、オンプレミス環境でコネクタを使用してデプロイをデバッグするときは、特定バージョンのコネクタの uber JAR ファイルを使用する必要があります。詳細については、「コネクタを含む Flink デプロイをオンプレミス環境で実行またはデバッグする」をご参照ください。
上記の JAR パッケージをダウンロードした後、次のコマンドを実行して、JAR パッケージ ververica-connector-hologres-1.17-vvr-8.0.8.jar をローカル Maven リポジトリにインストールします。
mvn install:install-file -Dfile=$path/ververica-connector-hologres-1.17-vvr-8.0.8.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.17-vvr-8.0.8 -Dpackaging=jar$pathは、JAR パッケージ ververica-connector-hologres-1.17-vvr-8.0.8.jar が格納されている絶対パスです。
ステップ 2:ドラフトを開発し、オンプレミス環境でドラフトをデバッグする
Realtime Compute for Apache Flink の開発コンソールでデプロイを作成して実行する前に、オンプレミス環境でドラフトを開発する必要があります。次のサンプルコードは、Hologres バイナリログソーステーブルからデータを読み取る実装クラスを構築する方法を示しています。
オンプレミス環境でコードを記述します。
DataStream API を使用して Hologres バイナリログソーステーブルからデータを読み取るサンプルコード
import com.alibaba.ververica.connectors.hologres.binlog.HologresBinlogConfigs; import com.alibaba.ververica.connectors.hologres.binlog.StartupMode; import com.alibaba.ververica.connectors.hologres.binlog.source.HologresBinlogSource; import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam; import com.alibaba.ververica.connectors.hologres.config.JDBCOptions; import com.alibaba.ververica.connectors.hologres.utils.JDBCUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import java.util.Collections; public class HologresBinlogSourceDemo { public static void main(String[] args) throws Exception { Configuration envConf = new Configuration(); // ドラフトをオンプレミス環境でデバッグする場合は、uber JAR ファイルの絶対パスを指定し、ドラフトの JAR ファイルをパッケージ化してアップロードする場合は、絶対パスをコメントアウトします。 envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>"); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(envConf); // 読み取るソーステーブルのスキーマを初期化します。テーブルスキーマのフィールドが Hologres テーブルのフィールドと一致していることを確認してください。特定のフィールドのみを定義できます。 TableSchema schema = TableSchema.builder() .field("<id>", DataTypes.INT().notNull()) .primaryKey("<id>") .build(); // Hologres 関連のパラメータを設定します。 Configuration config = new Configuration(); config.setString(HologresConfigs.ENDPOINT, "<yourEndpoint>"); config.setString(HologresConfigs.USERNAME, "<yourUserName>"); config.setString(HologresConfigs.PASSWORD, "<yourPassword>"); config.setString(HologresConfigs.DATABASE, "<yourDatabaseName>"); config.setString(HologresConfigs.TABLE, "<yourTableName>"); config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true); config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true); // Java Database Connectivity( JDBC) オプションを構築します。 JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config); // Hologres バイナリログソーステーブルからデータを読み取るために HologresBinlogSource を構築します。 long startTimeMs = 0; HologresBinlogSource source = new HologresBinlogSource( new HologresConnectionParam(config), schema, config, jdbcOptions, startTimeMs, StartupMode.INITIAL, "", "", -1, Collections.emptySet()); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print(); env.execute(); } }次の表は、パラメータについて説明しています。
パラメータ
説明
path_to_uber_jar
ローカル uber JAR ファイルの絶対パス。Windows オペレーティングシステムを使用している場合は、
file:///D:/path/to/a-uber.jarなどのディスクパーティションをパスに追加する必要があります。id
読み取るソーステーブルのスキーマ。テーブルスキーマのフィールドが Hologres テーブルのフィールドと一致していることを確認してください。特定のフィールドのみを定義できます。
yourEndpoint
Hologres インスタンスのエンドポイント。Hologres コンソール の Hologres インスタンスのインスタンス詳細ページの [ネットワーク情報] セクションで、Hologres インスタンスのエンドポイントを表示できます。
yourUserName
Alibaba Cloud アカウントの AccessKey ID。アクセスキーペア ページから AccessKey ID を取得できます。
yourPassword
Alibaba Cloud アカウントの AccessKey シークレット。
yourDatabaseName
Hologres データベースの名前。
yourTableName
データを読み取る Hologres テーブルの名前。
pom.xml ファイル:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-flink-demo</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.17.2</flink.version> <vvr.version>1.17-vvr-8.0.8</vvr.version> <target.java.version>1.8</target.java.version> <scala.binary.version>2.12</scala.binary.version> <maven.compiler.source>${target.java.version}</maven.compiler.source> <maven.compiler.target>${target.java.version}</maven.compiler.target> <log4j.version>1.7.21</log4j.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-hologres</artifactId> <version>${vvr.version}</version> </dependency> <!-- ログに log4j 依存関係を実装します。 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${log4j.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>jar-with-dependencies</shadedClassifierName> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
オンプレミス環境でドラフトをデバッグして実行します。
ververica-classloader-1.15-vvr-6.0-SNAPSHOT.jar などの ClassLoader JAR パッケージをドラフト構成に追加する必要があります。詳細については、「ステップ 2:デプロイの実行に必要な ClassLoader JAR パッケージを設定する」をご参照ください。
オプション。
org.apache.flink.configuration.Configurationなどの一般的な Flink クラスが見つからないことを示すエラーメッセージが表示された場合は、[オプションの変更] パラメータで [provided スコープを持つ依存関係をクラスパスに追加] を選択する必要があります。
構成が完了したら、オンプレミス環境でドラフトをデバッグして実行できます。
コネクタを含むデプロイのデバッグ方法の詳細については、「コネクタを含む Flink デプロイをオンプレミス環境で実行またはデバッグする」をご参照ください。
ステップ 3:ドラフトのプログラムをパッケージ化し、ドラフトのデプロイを実行する
ローカルデバッグが成功したら、ドラフトのプログラムを JAR ファイルにパッケージ化し、uber JAR ファイルと共に Realtime Compute for Apache Flink の開発コンソールにアップロードできます。
ドラフトのプログラムをパッケージ化する前に、次のコードをコメントアウトします。
envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>");ドラフトのプログラムをコンパイルしてパッケージ化します。
Maven を使用して、ドラフトのプログラムと依存関係をコンパイルしてパッケージ化します。サンプルコマンド:
mvn clean package -DskipTestsパッケージ化操作が完了すると、オンプレミスマシンに hologres-flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar という名前の JAR ファイルが生成されます。
JAR ファイルをアップロードします。
Realtime Compute for Apache Flink の開発コンソールの [アーティファクト] ページで、生成された JAR ファイルと ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar ファイルをアップロードします。詳細については、「ステップ 2:テスト JAR パッケージとデータファイルをアップロードする」をご参照ください。
JAR デプロイを作成します。
Realtime Compute for Apache Flink の開発コンソールの [デプロイ] ページで JAR デプロイを作成します。JAR デプロイの作成方法とパラメータの設定方法の詳細については、「ステップ 3:JAR デプロイを作成する」をご参照ください。
デプロイを開始し、計算結果を表示します。
説明JAR ファイルを更新する場合は、JAR ファイルをアップロードし、JAR デプロイを作成し、デプロイを再起動する必要があります。
Realtime Compute for Apache Flink の開発コンソールの [デプロイ] ページで、目的のデプロイを見つけて、[アクション] 列の [開始] をクリックします。
[ジョブの開始] ダイアログボックスで、パラメータを設定します。
デプロイを開始するときに設定する必要があるパラメータの詳細については、「デプロイメントの開始」をご参照ください。
[開始] をクリックします。
[開始] をクリックすると、[デプロイステータス] が [実行中] に変わります。これは、デプロイが想定どおりに実行されていることを示します。
FAQ
問題 1:IntelliJ IDEA で Alibaba Cloud Realtime Compute for Apache Flink のコネクタを含むデプロイを実行またはデバッグすると、コネクタ関連のクラス が見つからないという問題が発生する場合があります。たとえば、デプロイを実行すると、次のエラーが発生します。
Caused by: java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.hologres.binlog.source.reader.HologresBinlogRecordEmitter。どうすればよいですか?考えられる原因:オンプレミス環境でデプロイをデバッグするときに、uber JAR ファイルが正しく使用されていません。
解決策:ローカルデバッグには uber JAR ファイルを正しく使用してください。詳細については、このトピックまたは「コネクタを含む Flink デプロイをオンプレミス環境で実行またはデバッグする」をご参照ください。
問題 2:一般的な Flink クラス が見つからないことを示すエラーメッセージが表示されます。たとえば、
Caused by: java.lang.ClassNotFoundException: org.apache.flink.configuration.Configurationメッセージが表示されます。どうすればよいですか?考えられる原因:依存関係が見つからないか、想定どおりにロードされていない可能性があります。
解決策:
pom.xml ファイルに依存関係が追加されていないため、エラーメッセージが表示されます。不足している依存関係を特定し、pom.xml ファイルに依存関係を追加できます。ほとんどの場合、flink-connector-base が不足している可能性があります。例外パッケージのパスを検索して、不足している Flink 依存関係を特定することもできます。
provided 依存関係が想定どおりにロードされないため、エラーメッセージが表示されます。IntelliJ IDEA で [オプションの変更] パラメータの [provided スコープを持つ依存関係をクラスパスに追加] を選択できます。
問題 3:デプロイを実行すると
Incompatible magic valueというエラーメッセージが表示された場合はどうすればよいですか?考えられる原因:
原因 1:uber JAR ファイルのバージョンがコネクタのバージョンと異なる可能性があります。
原因 2:ClassLoader JAR パッケージの構成が無効です。
解決策:
原因 1 の解決策:このトピックを参照して、コネクタと uber JAR ファイルの関連バージョンを選択できます。
原因 2 の解決策:ClassLoader JAR パッケージを再構成します。詳細については、「ステップ 2:デプロイの実行に必要な ClassLoader JAR パッケージを設定する」をご参照ください。
問題 4:デプロイを実行すると
Unable to load flink-decryption library java.io.FileNotFoundException: Decryption library native/windows/x86/flink-decryption.dll not foundというエラーメッセージが表示された場合はどうすればよいですか?考えられる原因:uber JAR ファイルは Windows の 32 ビット Java をサポートしていません。
解決策:64 ビット Java をインストールします。
java -versionコマンドを実行して、Java のインストール情報を表示できます。インストール情報に64-Bitが含まれていない場合は、32 ビット Java がインストールされています。
問題 5:デプロイを実行すると
Caused by: java.lang.ClassFormatErrorというエラーメッセージが表示された場合はどうすればよいですか?考えられる原因:IntelliJ IDEA に設定されている Java Development Kit( JDK) バージョンが無効です。
解決策:JDK 8 または JDK 11 を使用します。