このトピックでは、例を使用して Apache NiFi を Hologres に接続する方法について説明します。
背景情報
Apache NiFi は、データ処理と配信のための使いやすく信頼性の高いシステムです。システム間のデータフローの管理を自動化します。Apache NiFi は、強力な対話性と使いやすさを備えた Web ベースのユーザーインターフェースを提供し、システム間またはシステム内でのデータフローの管理と処理を可能にします。
前提条件
-
Hologres がアクティブ化されていること。詳細については、「Hologres インスタンスの購入」をご参照ください。
-
Apache NiFi がインストールされていること。詳細については、「NiFi のインストールと起動方法」をご参照ください。
ローカル JSON ファイルの Hologres への書き込み
次の図は、ローカル JSON ファイルを Hologres に書き込むためのワークフローを示しています。
-
GetFile:JSON 形式のファイルを読み取ります。
-
ConvertJSONToSQL:JSON の要素を SQL の INSERT 文に変換します。
-
PutSQL:前のプロセッサで生成された SQL 文を実行し、JSON 要素をデータベースに挿入します。
-
データベースとテーブルの作成
-
ご利用の Hologres インスタンスにログインし、demo という名前のデータベースを作成します。詳細については、「データベースの作成」をご参照ください。
-
データテーブルを作成します。
次の SQL 文を実行してテーブルを作成します。後でこのテーブルにデータを書き込みます。
DROP TABLE IF EXISTS user_info; CREATE TABLE IF NOT EXISTS user_info ( id int, first_name text, last_name text, email text );
-
-
GetFile プロセッサの構成
-
GetFile プロセッサを追加します。
詳細については、「プロセッサの追加」をご参照ください。
-
JSON ファイルへのパスを入力します。
[プロパティ] タブの [入力ディレクトリ] フィールドに、JSON ファイルが保存されているパスを入力します。たとえば、JSON ファイルを Apache NiFi サーバーの
/opt/nifi/nifi-current/file_sourceに保存し、ファイル名を user_info.json とした場合、ファイルの内容は次のようになります。{ "id": 1, "first_name": "Sig", "last_name": "Olivo", "email": "solivo0@blinklist.com" }次の図は構成例です。

-
[適用] をクリックして構成を保存します。
-
-
ConvertJSONToSQL プロセッサの構成
-
ConvertJSONToSQL プロセッサを追加します。
-
[JDBC 接続プール] でサービスを追加します。[互換性のあるコントローラーサービス] を DBCPConnectionPool に、[コントローラーサービス名] を hologres に設定します。

-
[JDBC 接続プール] 行の右端にある右向き矢印ボタンをクリックして、接続文字列を構成します。
-
先ほど作成した DBCPConnectionPool を見つけ、設定ボタンをクリックします。

-
設定ページの [プロパティ] タブで、次のパラメーターを構成します。

パラメーター名
説明
注意事項
データベース接続 URL
Hologres インスタンスの JDBC 接続文字列は、
jdbc:postgresql://<endpoint>/<database name>の形式を使用します。例:jdbc:postgresql://hgpostcn-cn-xxxxxxxxxxx-cn-shanghai.hologres.aliyuncs.com:80/demo。パブリックネットワークまたは VPC エンドポイントのいずれかを使用します。エンドポイントを取得するには、Hologres 管理コンソールのインスタンス詳細ページに移動します。
データベースドライバークラス名
org.postgresql.Driver
該当なし
データベースドライバーの場所
PostgreSQL JDBC ドライバーへのパス。例:
/opt/nifi/nifi-current/jdbc_driver/postgresql-42.3.4.jar。PostgreSQL の公式サイトから JDBC ドライバーをダウンロードします。バージョン 42.2.25 以降の使用を推奨します。
データベースユーザー
ご利用の Alibaba Cloud アカウントの AccessKey ID。
AccessKey ID を取得するには、AccessKey 管理に移動します。
パスワード
ご利用のアカウントの AccessKey Secret。
-
[OK] をクリックして構成を完了します。
-
[有効化] をクリックしてサービスを開始します。
-
ConvertJSONToSQL プロセッサに戻り、次のパラメーターを構成します。詳細については、公式ドキュメントをご参照ください。
パラメーター名
説明
文の種類
実行する SQL 文の種類。この例では、
INSERTを使用します。テーブル名
データを書き込むテーブルの名前。この例では、
user_infoを使用します。スキーマ名
ターゲットテーブルのスキーマ名。この例では、
publicを使用します。 -
[適用] をクリックして構成を完了します。
-
-
PutSQL プロセッサの構成
-
PutSQL プロセッサを追加します。
-
[JDBC 接続プール] を、前のステップで構成した DBCPConnectionPool に設定します。この例では、DBCPConnectionPool の名前は
hologresです。 -
[フラグメント化トランザクションをサポート] を false に設定します。
-
[適用] をクリックして構成を完了します。
-
-
データ書き込みの開始
これで、すべての構成が完了しました。すべてのノードを実行中の状態に設定します。NiFi は JSON ファイルの読み取りを開始し、Hologres にデータを書き込みます。

-
データのクエリ
Hologres で次のコマンドを実行して user_info テーブルをクエリし、挿入されたデータを表示します。
SELECT * FROM user_info;クエリ結果は次のように表示されます。
