このトピックでは、MaxComputeからHologresデータベースにアクセスする方法について説明します。
背景情報
Hologresは、Alibaba Cloudが提供するリアルタイムデータ処理のためのインタラクティブな分析サービスです。 Hologresを使用すると、高い同時実行性でリアルタイムデータを書き込み、クエリし、高いパフォーマンスでデータを分析できます。 HologresとMaxComputeを使用して、フェデレーションデータウェアハウスソリューションを構築できます。これにより、データ移行の必要がなくなり、リアルタイムとオフラインのデータ処理を組み合わせることができます。 このソリューションは、大規模なオフライン分析、リアルタイムの運用分析、インタラクティブなクエリなどのシナリオに適しています。 ソリューションを実装するには、次の方法を使用してMaxComputeからHologresにアクセスします。
外部テーブルを使用します。
MaxComputeでSparkを使用します。
前提条件
次のMaxCompute設定は完了です。
MaxComputeが有効化され、MaxComputeプロジェクトが作成されます。
MaxComputeの有効化方法については、「MaxComputeとDataWorksの有効化」をご参照ください。 MaxComputeプロジェクトの作成方法については、「MaxCompute プロジェクトを作成する」をご参照ください。
MaxComputeクライアントがインストールされています。
詳細については、「MaxComputeクライアント (odpscmd) 」をご参照ください。
開発環境がMaxComputeのSparkに設定されています。
このトピックでは、LinuxオペレーティングシステムとSpark-2.4.5パッケージを使用します。 詳細については、「Linux開発環境の設定」をご参照ください。
DataWorksが有効化されています。
詳細については、「DataWorksの有効化」をご参照ください。
Hologresインスタンスが購入され、HoloWebに接続されます。
詳細については、「Hologresインスタンスの購入」および「HoloWebに接続してクエリを実行する」をご参照ください。
PostgreSQL Java Database Connectivity (JDBC) ドライバがダウンロードされます。
このトピックでは、
postgresql-42.2.16.jar
パッケージを使用します。 パッケージは、Linuxオペレーティングシステムの /home/postgreSQLパスに保存されます。 PostgreSQL JDBCドライバーをダウンロードするには、公式ダウンロードページに移動します。
外部テーブルの使用
では、Hologresコンソール購入したインスタンスを選択し、という名前のHologresデータベースを作成します。
mc_db_holo
.Hologresデータベースの作成方法については、「データベースの作成」をご参照ください。
では、HoloWebコンソールで次のステートメントを実行します。
mc_db_holo
という名前のテーブルを作成するデータベースmc_sql_holo
テーブルにデータを挿入します。Hologresテーブルの作成方法については、「create table」をご参照ください。
CREATE TABLE mc_sql_holo( id INTEGER, name TEXT ); INSERT INTO mc_sql_holo VALUES (1,'zhangsan'), (2,'lisi'), (3,'wangwu') ;
では、Resource Access Management (RAM) コンソールという名前のRAMロールを作成します。
AliyunOdpsHoloRole
信頼ポリシーを変更します。詳細については、「Hologres外部テーブル」トピックの「STSモードでのHologres外部テーブルの作成」をご参照ください。
説明この例では、信頼できるAccount用にRAMロールが作成されています。
RAMロールの付与
AliyunOdpsHoloRole
Hologresインスタンスへのアクセス。詳細については、「Hologres外部テーブル」トピックの「STSモードでのHologres外部テーブルの作成」をご参照ください。
MaxComputeクライアントで、次のステートメントを実行して、
mc_externaltable_holo
という名前の外部テーブルを作成します。create external table if not exists mc_externaltable_holo ( id int , name string ) stored by 'com.aliyun.odps.jdbc.JdbcStorageHandler' with serdeproperties ( 'odps.properties.rolearn'='acs:ram::13969******5947:role/aliyunodpsholorole') LOCATION 'jdbc:postgresql://hgprecn-cn-2r42******-cn-hangzhou-internal.hologres.aliyuncs.com:80/mc_db_holo?currentSchema=public&useSSL=false&table=mc_sql_holo/' TBLPROPERTIES ( 'mcfed.mapreduce.jdbc.driver.class'='org.postgresql.Driver', 'odps.federation.jdbc.target.db.type'='holo', 'odps.federation.jdbc.colmapping'='id:id,name:name' );
説明ステートメントのパラメーターの詳細については、「Hologres外部テーブル」をご参照ください。
MaxComputeクライアントで、次のステートメントを実行して、外部テーブルからデータを取得します。
set odps.sql.split.hive.bridge=true; set odps.sql.hive.compatible=true; select * from mc_externaltable_holo limit 10;
説明SET操作のパラメーターについては、「SET操作」をご参照ください。
次の応答が返されます。
+----+----------+ | id | name | +----+----------+ | 1 | zhangsan | | 2 | lisi | | 3 | wangwu | +----+----------+
MaxComputeクライアントで、次のステートメントを実行して、外部テーブルにデータを挿入します。
set odps.sql.split.hive.bridge=true; set odps.sql.hive.compatible=true; insert into mc_externaltable_holo values (4,'alice');
HoloWebコンソールで、Hologresテーブル
mc_sql_holo
のデータを照会します。 前の手順で挿入したデータは、テーブルに含まれます。select * from mc_sql_holo;
ローカルモードでMaxComputeでSparkを使用する
では、HoloWebコンソールで次のステートメントを実行します。
mc_db_holo
という名前のテーブルを作成するデータベースmc_jdbc_holo
.Hologresテーブルの作成方法については、「create table」をご参照ください。
CREATE TABLE mc_jdbc_holo( id INTEGER, name TEXT );
に移動します。Go to the/home/pythoncodeという名前のPythonスクリプトを作成します。
holo_local.py
.サンプルPythonスクリプト:
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Spark_local") \ .config("spark.eventLog.enabled","false") \ .getOrCreate() jdbcDF = spark.read.format("jdbc"). \ options( url='jdbc:postgresql://hgprecn-cn-2r42******-cn-hangzhou.hologres.aliyuncs.com:80/mc_db_holo', dbtable='mc_jdbc_holo', user='LTAI****************', password='********************', driver='org.postgresql.Driver').load() jdbcDF.printSchema()
パラメーター:
url:
JDBC: postgresql://
プレフィックスを含むjdbc URL。hgprecn-cn-2r42 ****** -cn-hangzhou.hologres.aliyuncs.com:80: インターネット経由でHologresインスタンスにアクセスするために使用されるエンドポイント。 エンドポイントの取得方法については、「インスタンス設定」をご参照ください。
mc_db_holo: 接続するHologresデータベースの名前。 この例では、Hologresデータベース名はmc_db_holoです。
dbtable: データの読み取り元またはデータの書き込み先のHologresテーブルの名前。 この例では、Hologresテーブル名はmc_db_holoです。
user: 許可されたAlibaba CloudアカウントまたはRAMユーザーのAccessKey ID。 AccessKeyペアページでAccessKey IDを取得できます。
password: ユーザーパラメーターで指定したAccessKey IDに対応するAccessKeyシークレット。 AccessKeyペアページでAccessKey secretを取得できます。
driver: PostgreSQLドライバ。 このパラメーターをorg.postgresql.Driverに設定します。
Linuxオペレーティングシステムのディレクトリから次のspark-submitコマンドを実行して、ローカルモードでジョブを送信します。
spark-submit --master local --driver-class-path /home/postgreSQL/postgresql-42.2.16.jar --jars /home/postgreSQL/postgresql-42.2.16.jar /home/pythoncode/holo_local.py
出力ログを表示します。 印刷されたスキーマ情報がHologresの
mc_jdbc_holo
テーブルのスキーマ情報と同じである場合、MaxComputeからHologresにアクセスできます。
クラスタモードでMaxComputeでSparkを使用する
では、HoloWebコンソールで次のステートメントを実行します。
mc_db_holo
という名前のテーブルを作成するデータベースmc_jdbc_holo
.Hologresテーブルの作成方法については、「create table」をご参照ください。
CREATE TABLE mc_jdbc_holo( id INTEGER, name TEXT );
に移動します。Go to the/home/pythoncodeという名前のPythonスクリプトを作成します。
holo_yarncluster.py
.サンプルPythonスクリプト:
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Spark_yarn") \ .getOrCreate() jdbcDF = spark.read.format("jdbc"). \ options( url='jdbc:postgresql://hgprecn-cn-2r42******-cn-hangzhou-internal.hologres.aliyuncs.com:80/mc_db_holo', dbtable='mc_jdbc_holo', user='LTAI****************', password='********************', driver='org.postgresql.Driver').load() jdbcDF.printSchema()
パラメーター:
url: JDBC: postgresql:// プレフィックスを含むjdbc URL。
hgprecn-cn-2r42 ****** -cn-hangzhou-internal.hologres.aliyuncs.com:80: クラシックネットワーク経由でHologresインスタンスにアクセスするために使用されるエンドポイント。 エンドポイントの取得方法については、「インスタンス設定」をご参照ください。
mc_db_holo: 接続するHologresデータベースの名前。 この例では、Hologresデータベース名はmc_db_holoです。
dbtable: データの読み取り元またはデータの書き込み先のHologresテーブルの名前。 この例では、Hologresテーブル名はmc_db_holoです。
user: 許可されたAlibaba CloudアカウントまたはRAMユーザーのAccessKey ID。 AccessKeyペアページでAccessKey IDを取得できます。
password: ユーザーパラメーターで指定したAccessKey IDに対応するAccessKeyシークレット。 AccessKeyペアページでAccessKey secretを取得できます。
driver: PostgreSQLドライバ。 このパラメーターをorg.postgresql.Driverに設定します。
に移動します。Go to the/home/spark2.4.5/spark-2.4.5-odps0.33.2/confディレクトリに移動し、spark-defaults.confファイルを変更します。
# Modify the following configurations: spark.hadoop.odps.project.name = <MaxCompute_Project_Name> spark.hadoop.odps.end.point = <Endpoint> spark.hadoop.odps.runtime.end.point = <VPC_Endpoint> spark.hadoop.odps.access.id = <AccessKey_ID> spark.hadoop.odps.access.key = <AccessKey_Secret> spark.hadoop.odps.cupid.trusted.services.access.list = <Hologres_Classic_Network> # Retain the following configurations: spark.master = yarn-cluster spark.driver.cores = 2 spark.driver.memory = 4g spark.dynamicAllocation.shuffleTracking.enabled = true spark.dynamicAllocation.shuffleTracking.timeout = 20s spark.dynamicAllocation.enabled = true spark.dynamicAllocation.maxExecutors = 10 spark.dynamicAllocation.initialExecutors = 2 spark.executor.cores = 2 spark.executor.memory = 8g spark.eventLog.enabled = true spark.eventLog.overwrite = true spark.eventLog.dir = odps://admin_task_project/cupidhistory/sparkhistory spark.sql.catalogImplementation = hive spark.sql.sources.default = hive
パラメーター:
MaxCompute_Project_Name: 作成したMaxComputeプロジェクトの名前。
プロジェクト名を取得するには、MaxComputeコンソールにログインし、左上隅のリージョンを選択し、左側のナビゲーションウィンドウで [ワークスペース] > [プロジェクト] を選択します。
access_id: MaxComputeプロジェクトへのアクセスに使用されるAccessKey id。
AccessKeyペアページでAccessKey IDを取得できます。
AccessKey Secret: access_idパラメーターで指定したAccessKey IDに対応するAccessKeyシークレット。
AccessKeyペアページでAccessKey secretを取得できます。
Endpoint: インターネット経由でMaxComputeプロジェクトにアクセスするために使用されるエンドポイント。
各リージョンのパブリックエンドポイントの詳細については、「異なるリージョンのエンドポイント (インターネット) 」をご参照ください。
VPC_Endpoint: MaxComputeプロジェクトが存在するVirtual Private Cloud (VPC) へのアクセスに使用されるエンドポイント。
各リージョンのVPCエンドポイントの詳細については、「異なるリージョンのエンドポイント (VPC) 」をご参照ください。
Hologres_Classic_Network: クラシックネットワーク経由でHologresにアクセスするために使用されるエンドポイント。 この設定では、MaxComputeサンドボックス環境でHologresインスタンスに接続するためのネットワークポリシーを指定します。 それ以外の場合、MaxComputeは外部サービスにアクセスできません。
Linuxオペレーティングシステムのディレクトリから次のspark-submitコマンドを実行して、クラスターモードでジョブを送信します。
spark-submit --master yarn-cluster --driver-class-path /home/postgreSQL/postgresql-42.2.16.jar --jars /home/postgreSQL/postgresql-42.2.16.jar /home/pythoncode/holo_yarncluster.py
ジョブを送信した後、出力ログで次の情報を使用してジョブを監視できます。
LogView URL。
Spark UIのJobView URL。
Logview URLを開きます。 ジョブのステータスがsuccessの場合、
jdbcDF.printSchema()
メソッドの結果を表示します。 の順にクリックし、Stdoutセクションに印刷されたスキーマ情報がHologresの
mc_jdbc_holo
テーブルのスキーマ情報と同じである場合、MaxComputeからHologresにアクセスできます。説明Spark UIのジョブビューURLを開いて、ジョブの詳細を表示することもできます。
DataWorksモードでMaxComputeでSparkを使用する
では、HoloWebコンソールで次のステートメントを実行します。
mc_db_holo
という名前のテーブルを作成するデータベースmc_jdbc_holo
.Hologresテーブルの作成方法については、「create table」をご参照ください。
CREATE TABLE mc_jdbc_holo( id INTEGER, name TEXT );
に移動します。Go to the/home/spark2.4.5/spark-2.4.5-odps0.33.2/confディレクトリに移動し、spark-defaults.confファイルを変更します。
# Modify the following configurations: spark.hadoop.odps.project.name = <MaxCompute_Project_Name> spark.hadoop.odps.end.point = <Endpoint> spark.hadoop.odps.runtime.end.point = <VPC_Endpoint> spark.hadoop.odps.access.id = <AccessKey_ID> spark.hadoop.odps.access.key = <AccessKey_Secret> spark.hadoop.odps.cupid.trusted.services.access.list = <Hologres_Classic_Network> # Retain the following configurations: spark.master = yarn-cluster spark.driver.cores = 2 spark.driver.memory = 4g spark.dynamicAllocation.shuffleTracking.enabled = true spark.dynamicAllocation.shuffleTracking.timeout = 20s spark.dynamicAllocation.enabled = true spark.dynamicAllocation.maxExecutors = 10 spark.dynamicAllocation.initialExecutors = 2 spark.executor.cores = 2 spark.executor.memory = 8g spark.eventLog.enabled = true spark.eventLog.overwrite = true spark.eventLog.dir = odps://admin_task_project/cupidhistory/sparkhistory spark.sql.catalogImplementation = hive spark.sql.sources.default = hive
パラメーター:
MaxCompute_Project_Name: 作成したMaxComputeプロジェクトの名前。
プロジェクト名を取得するには、MaxComputeコンソールにログインし、左上隅のリージョンを選択し、左側のナビゲーションウィンドウで [ワークスペース] > [プロジェクト] を選択します。
access_id: MaxComputeプロジェクトへのアクセスに使用されるAccessKey id。
AccessKeyペアページでAccessKey IDを取得できます。
AccessKey Secret: access_idパラメーターで指定したAccessKey IDに対応するAccessKeyシークレット。
AccessKeyペアページでAccessKey secretを取得できます。
Endpoint: インターネット経由でMaxComputeプロジェクトにアクセスするために使用されるエンドポイント。
各リージョンのパブリックエンドポイントの詳細については、「異なるリージョンのエンドポイント (インターネット) 」をご参照ください。
VPC_Endpoint: MaxComputeプロジェクトが存在するVirtual Private Cloud (VPC) へのアクセスに使用されるエンドポイント。
各リージョンのVPCエンドポイントの詳細については、「異なるリージョンのエンドポイント (VPC) 」をご参照ください。
Hologres_Classic_Network: クラシックネットワーク経由でHologresにアクセスするために使用されるエンドポイント。 この設定では、MaxComputeサンドボックス環境でHologresインスタンスに接続するためのネットワークポリシーを指定します。 それ以外の場合、MaxComputeは外部サービスにアクセスできません。
DataWorksコンソール.にログインします。
左側のナビゲーションウィンドウで、ワークスペース.をくり区します。
On theワークスペースページで、表示したいワークスペースを見つけて選択します。ショートカット>データ開発で、アクション列を作成します。
PostgreSQL JDBCリソースとODPS Sparkノードを作成します。
左側のナビゲーションウィンドウで、使用するビジネスフローを右クリックし、 を選択します。 [リソースの作成] ダイアログボックスで、ダウンロードしたPostgreSQL JDBCパッケージをアップロードし、[作成] をクリックします。
説明ビジネスフローの作成方法については、「ワークフローの作成」をご参照ください。
MaxComputeリソースの作成方法については、「MaxComputeリソースの作成と使用」をご参照ください。
使用するビジネスフローを右クリックし、 を選択します。 [リソースの作成] ダイアログボックスで、[名前] フィールドにリソース名を入力し、[作成] をクリックします。
この例では、リソース名は
read_holo.py
です。次のコードをコピーして、
read_holo.py
ファイルをクリックし、アイコンが表示されます。
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Spark") \ .getOrCreate() jdbcDF = spark.read.format("jdbc"). \ options( url='jdbc:postgresql://hgprecn-cn-2r42******-cn-hangzhou-internal.hologres.aliyuncs.com:80/mc_db_holo', dbtable='mc_jdbc_holo', user='LTAI****************', password='********************', driver='org.postgresql.Driver').load() jdbcDF.printSchema()
パラメーター:
url:
JDBC: postgresql://
プレフィックスを含むjdbc URL。hgprecn-cn-2r42 ****** -cn-hangzhou.hologres.aliyuncs.com:80: インターネット経由でHologresインスタンスにアクセスするために使用されるエンドポイント。 エンドポイントの取得方法については、「インスタンス設定」をご参照ください。
mc_db_holo: 接続するHologresデータベースの名前。 この例では、Hologresデータベース名はmc_db_holoです。
dbtable: データの読み取り元またはデータの書き込み先のHologresテーブルの名前。 この例では、Hologresテーブル名はmc_db_holoです。
user: 許可されたAlibaba CloudアカウントまたはRAMユーザーのAccessKey ID。 AccessKeyペアページでAccessKey IDを取得できます。
password: ユーザーパラメーターで指定したAccessKey IDに対応するAccessKeyシークレット。 AccessKeyペアページでAccessKey secretを取得できます。
driver: PostgreSQLドライバ。 このパラメーターをorg.postgresql.Driverに設定します。
使用するビジネスフローを右クリックし、です。 を選択します。 [ノードの作成] ダイアログボックスで、[名前] フィールドにノード名を入力し、[確認] をクリックします。 この例では、ノード名はspark_read_holo
次の図の手順に従って、
spark_read_holo
ノード.[設定項目] フィールドにエントリを追加します。 キーを
spark.hadoop.odps.cupid.trusted.services.access.list
に設定します。値を
hgprecn-cn-2r42 ****** -cn-hangzhou-internal.hologres.aliyuncs.com:80
に設定します。これは、クラシックネットワーク経由でHologresにアクセスするために使用されるエンドポイントを指定します。説明この設定では、MaxComputeサンドボックス環境でHologresインスタンスに接続するためのネットワークポリシーを指定します。 それ以外の場合、MaxComputeは外部サービスにアクセスできません。
ビジネスワークフローのキャンバスで、を選択します。 .
ジョブを送信した後、診断情報、LogView URL、Spark UIのJobview URLなど、ジョブの詳細を出力ログに表示できます。
Logview URLを開きます。 ジョブのステータスがsuccessの場合、
jdbcDF.printSchema()
メソッドの結果を表示します。 の順にクリックし、Stdoutセクションに印刷されたスキーマ情報がHologresの
mc_jdbc_holo
テーブルのスキーマ情報と同じである場合、MaxComputeからHologresにアクセスできます。説明Spark UIのジョブビューURLを開いて、ジョブの詳細を表示することもできます。