すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:MaxCompute Sparkを使用したPhoenixデータへのアクセス

最終更新日:Jan 17, 2025

このトピックでは、MaxCompute Sparkを使用してPhoenixデータにアクセスし、ApsaraDB for HBaseからMaxComputeにデータを書き込む方法について説明します。

背景情報

Phoenixは、ApsaraDB for HBaseが提供するSQLレイヤーであり、同時実行性が高く、レイテンシが低く、簡単なクエリが必要なシナリオに適しています。 MaxCompute Sparkを使用してPhoenixデータにアクセスするには、Phoenixテーブルを作成し、テーブルにデータを書き込み、IntelliJ IDEAでSparkコードを書き込み、DataWorksコンソールでコードのスモークテストを実行する必要があります。 このトピックでは、MaxCompute Sparkを使用してPhoenixデータにアクセスするプロセス全体について説明します。

前提条件

次の前提条件が満たされていることを確認します。

  • MaxComputeが有効化され、MaxComputeプロジェクトが作成されます。 詳細については、「MaxComputeとDataWorksの有効化」および「MaxComputeプロジェクトの作成」をご参照ください。

  • DataWorksが有効化されています。 詳しくは、「購入ガイド」をご参照ください。

  • ApsaraDB for HBaseが有効化されています。 詳細については、「クラスターの購入」をご参照ください。

    説明

    このトピックでは、ApsaraDB for HBase V1.1を使用します。 ビジネス要件に基づいて、別のバージョンのApsaraDB for HBaseを使用できます。

  • Phoenix 4.12.0パッケージがダウンロードされ、インストールされます。 詳細については、「HBase SQL (Phoenix) 4.xの使用」をご参照ください。

    説明

    ApsaraDB for HBase V1.1はPhoenix 4.12.0に対応しています。 開発中にバージョンマッピングを確認します。

  • Virtual Private Cloud (VPC) が有効化されています。 ApsaraDB for HBaseクラスターのセキュリティグループとホワイトリストが設定されています。 詳細については、「ネットワーク接続プロセス」をご参照ください。

    説明

    このトピックでは、ApsaraDB for HBaseクラスターはVPCにあります。 したがって、セキュリティグループのポート2181、10600、および16020が有効になります。 ApsaraDB for HBaseクラスターとMaxComputeが存在するVPCのvSwitchの100.104.0.0/16 CIDRブロックが、MaxComputeのIPアドレスホワイトリストに追加されます。

手順

  1. Phoenixのbinディレクトリに移動し、次のコマンドを実行してPhoenixクライアントを起動します。

    ./sqlline.py hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181,hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181,hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181
    説明

    hb-2zecxg2ltnpeg8me4-master *-*** .hbase.rds.aliyuncs.com:2181、hb-2zecxg2ltnpeg8me4-master *-*** .hbase.rds.aliyuncs.com:2181、hb-2zecxg2ltnpeg8me4-master *-*** .hbase.rds.aliyuncs.com:2181 ZooKeeperのエンドポイントです。 ApsaraDB for HBaseコンソールにログインし、HBaseクラスターの詳細ページの [データベース接続] ページでZooKeeperのエンドポイントを取得できます。

  2. Phoenixクライアントで、次のステートメントを実行してusersという名前のテーブルを作成し、テーブルにデータを挿入します。

    CREATE TABLE IF NOT EXISTS users(
    id UNSIGNED_INT,
    username char(50),
    password char(50)
    CONSTRAINT my_ph PRIMARY KEY (id));
    UPSERT INTO users(id,username,password) VALUES (1,'kongxx','Letmein');
    説明

    Phoenix構文の詳細については、「HBase SQL (Phoenix) の使用を開始する」をご参照ください。

  3. Phoenixクライアントで、次のステートメントを実行して、usersテーブルのデータを照会します。

    select * from users;
  4. IntelliJ IDEAでSparkコードを作成してパッケージ化します。

    1. Scalaを使用して、テスト用のSparkコードを記述します。

      IntelliJ IDEAの関連するPOMファイルに基づいて、オンプレミス開発環境を設定します。 パブリックエンドポイントを使用して、コードロジックをテストできます。 コードロジックが検証されたら、サンプルコードのspark.hadoop.odps.end.pointパラメーターの設定を変更します。 パブリックエンドポイントを取得するには、ApsaraDB for HBaseコンソールにログインし、ApsaraDB for HBaseクラスターの詳細ページのデータベース接続ページに移動します。 サンプルコード:

      package com.phoenix
      import org.apache.hadoop.conf.Configuration
      import org.apache.spark.sql.SparkSession
      import org.apache.phoenix.spark._
      /**
        * This example applies to Phoenix 4.x. 
        */
      object SparkOnPhoenix4xSparkSession {
        def main(args: Array[String]): Unit = {
          // The ZooKeeper endpoint of the ApsaraDB for HBase cluster. 
          val zkAddress = hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181,hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181,hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181
          // The name of the Phoenix table. 
          val phoenixTableName = users
          // The name of the MaxCompute Spark table. 
          val ODPSTableName = users_phoenix
          val sparkSession = SparkSession
            .builder()
            .appName("SparkSQL-on-MaxCompute")
            .config("spark.sql.broadcastTimeout", 20 * 60)
            .config("spark.sql.crossJoin.enabled", true)
            .config("odps.exec.dynamic.partition.mode", "nonstrict")
            // You must set spark.master to local[N] to run the code. N indicates the parallelism. 
            //.config("spark.master", "local[4]") 
            .config("spark.hadoop.odps.project.name", "***")
            .config("spark.hadoop.odps.access.id", "***")
            .config("spark.hadoop.odps.access.key", "***")
            //.config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
            .config("spark.hadoop.odps.end.point", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
            .config("spark.sql.catalogImplementation", "odps")
            .getOrCreate()
          var df = sparkSession.read.format("org.apache.phoenix.spark").option("table", phoenixTableName).option("zkUrl",zkAddress).load()
          df.show()
          df.write.mode("overwrite").insertInto(ODPSTableName)
        }
      }
                              

      POMファイルには次の内容が含まれています。

      <?xml version="1.0" encoding="UTF-8"?>
      <!--
        Licensed under the Apache License, Version 2.0 (the "License");
        you may not use this file except in compliance with the License.
        You may obtain a copy of the License at
          http://www.apache.org/licenses/LICENSE-2.0
        Unless required by applicable law or agreed to in writing, software
        distributed under the License is distributed on an "AS IS" BASIS,
        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        See the License for the specific language governing permissions and
        limitations under the License. See accompanying LICENSE file.
      -->
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
          <properties>
              <spark.version>2.3.0</spark.version>
              <cupid.sdk.version>3.3.8-public</cupid.sdk.version>        <scala.version>2.11.8</scala.version>
              <scala.binary.version>2.11</scala.binary.version>
              <phoenix.version>4.12.0-HBase-1.1</phoenix.version>
          </properties>
          <groupId>com.aliyun.odps</groupId>
          <artifactId>Spark-Phonix</artifactId>
          <version>1.0.0-SNAPSHOT</version>
          <packaging>jar</packaging>
          <dependencies>
              <dependency>
                  <groupId>org.jpmml</groupId>
                  <artifactId>pmml-model</artifactId>
                  <version>1.3.8</version>
              </dependency>
              <dependency>
                  <groupId>org.jpmml</groupId>
                  <artifactId>pmml-evaluator</artifactId>
                  <version>1.3.10</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-core_${scala.binary.version}</artifactId>
                  <version>${spark.version}</version>
                  <scope>provided</scope>
                  <exclusions>
                      <exclusion>
                          <groupId>org.scala-lang</groupId>
                          <artifactId>scala-library</artifactId>
                      </exclusion>
                      <exclusion>
                          <groupId>org.scala-lang</groupId>
                          <artifactId>scalap</artifactId>
                      </exclusion>
                  </exclusions>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_${scala.binary.version}</artifactId>
                  <version>${spark.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-mllib_${scala.binary.version}</artifactId>
                  <version>${spark.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-streaming_${scala.binary.version}</artifactId>
                  <version>${spark.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>com.aliyun.odps</groupId>
                  <artifactId>cupid-sdk</artifactId>
                  <version>${cupid.sdk.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>com.aliyun.phoenix</groupId>
                  <artifactId>ali-phoenix-core</artifactId>
                  <version>4.12.0-AliHBase-1.1-0.8</version>
                  <exclusions>
                      <exclusion>
                          <groupId>com.aliyun.odps</groupId>
                          <artifactId>odps-sdk-mapred</artifactId>
                      </exclusion>
                      <exclusion>
                          <groupId>com.aliyun.odps</groupId>
                          <artifactId>odps-sdk-commons</artifactId>
                      </exclusion>
                  </exclusions>
              </dependency>
              <dependency>
                  <groupId>com.aliyun.phoenix</groupId>
                  <artifactId>ali-phoenix-spark</artifactId>
                  <version>4.12.0-AliHBase-1.1-0.8</version>
                  <exclusions>
                      <exclusion>
                          <groupId>com.aliyun.phoenix</groupId>
                          <artifactId>ali-phoenix-core</artifactId>
                      </exclusion>
                  </exclusions>
              </dependency>
          </dependencies>
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-shade-plugin</artifactId>
                      <version>2.4.3</version>
                      <executions>
                          <execution>
                              <phase>package</phase>
                              <goals>
                                  <goal>shade</goal>
                              </goals>
                              <configuration>
                                  <minimizeJar>false</minimizeJar>
                                  <shadedArtifactAttached>true</shadedArtifactAttached>
                                  <artifactSet>
                                      <includes>
                                          <!-- Include here the dependencies you
                                              want to be packed in your fat jar -->
                                          <include>*:*</include>
                                      </includes>
                                  </artifactSet>
                                  <filters>
                                      <filter>
                                          <artifact>*:*</artifact>
                                          <excludes>
                                              <exclude>META-INF/*.SF</exclude>
                                              <exclude>META-INF/*.DSA</exclude>
                                              <exclude>META-INF/*.RSA</exclude>
                                              <exclude>**/log4j.properties</exclude>
                                          </excludes>
                                      </filter>
                                  </filters>
                                  <transformers>
                                      <transformer
                                              implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                          <resource>reference.conf</resource>
                                      </transformer>
                                      <transformer
                                              implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                          <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
                                      </transformer>
                                  </transformers>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
                  <plugin>
                      <groupId>net.alchim31.maven</groupId>
                      <artifactId>scala-maven-plugin</artifactId>
                      <version>3.3.2</version>
                      <executions>
                          <execution>
                              <id>scala-compile-first</id>
                              <phase>process-resources</phase>
                              <goals>
                                  <goal>compile</goal>
                              </goals>
                          </execution>
                          <execution>
                              <id>scala-test-compile-first</id>
                              <phase>process-test-resources</phase>
                              <goals>
                                  <goal>testCompile</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      </project>
    2. コードと依存関係ファイルをIntelliJ IDEAのJARファイルにパッケージ化し、MaxComputeクライアントを使用してJARファイルをMaxComputeプロジェクトにアップロードします。 詳細については、「リソースの追加」をご参照ください。

      説明

      DataWorksコンソールにアップロードされるJARファイルのサイズは、50 MBを超えることはできません。 したがって、MaxComputeクライアントはJARファイルのアップロードに使用されます。

  5. DataWorksコンソールでスモークテストを実行します。

    1. 次のステートメントを実行して、DataWorksコンソールでMaxComputeテーブルを作成します。MaxComputeテーブルの作成と管理.

      CREATE TABLE IF NOT EXISTS users_phoenix
      (
          id       INT   ,
          username STRING,
          password STRING
      ) ;
    2. DataWorksコンソールで、目的のMaxComputeプロジェクトを選択し、JARファイルをデータ開発環境にアップロードします。 詳細については、「MaxComputeリソースの作成と使用」をご参照ください。

    3. ODPS Sparkノードを作成し、ノードのパラメーターを設定します。 詳細については、「MaxCompute Sparkタスクの開発」をご参照ください。

      次のサンプルコードは、spark.hadoop.odps.cupid.vpc.domain.listパラメーターの設定を示しています。 ApsaraDB for HBaseクラスターに基づいてパラメーターを設定します。

      {
        "regionId":"cn-beijing",
        "vpcs":[
          {
            "vpcId":"vpc-2zeaeq21********0exox",
            "zones":[
              {
                "urls":[
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":2181
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":2181
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":2181
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16000
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16000
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16000
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
      
                  {
                    "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
                    "port":16020
                  },
                ]
              }
            ]
          }
        ]
      }
    4. アイコンをクリックし、image煙のテストを開始します。

  6. スモークテストが成功したら、アドホッククエリノードで次のステートメントを実行します。

    select * from users_phoenix;