すべてのプロダクト
Search
ドキュメントセンター

Hologres:Python

最終更新日:Jun 11, 2025

Psycopg は、Python プログラミング言語用に新しく設計された PostgreSQL データベースアダプタです。 Hologres は PostgreSQL 11 と互換性があるため、Psycopg を使用して Hologres にアクセスできます。このトピックでは、Psycopg 3 を使用して Hologres にアクセスする方法について説明します。

前提条件

Python 3.7 以降がインストールされている。

Psycopg 3 をインストールする

次のコマンドを実行して、Psycopg 3 をインストールする:

pip  install   --upgrade pip             # pip をバージョン 20.3 以降にアップグレードします。
pip install "psycopg[binary]"

Hologres に接続する

Psycopg 3 をインストールした後、次の操作を実行して Hologres に接続できます。

  1. Psycopg 3 をロードします。

    次のコマンドを実行して、インストールされている Psycopg 3 をロードできます。

    import psycopg
  2. データベース接続を作成します。

    psycopg.connect() 関数を使用して Hologres に接続できます。次のコードは、構文とパラメータを示しています。

    conn = psycopg.connect(
        host="<Endpoint>",
        port=<Port>, 
        dbname="<databases>", 
        user="<Access ID>", 
        password="<Access Key>",
        keepalives=<keepalives>, 
        keepalives_idle=<keepalives_idle>,
        keepalives_interval=<keepalives_interval>, 
        keepalives_count=<keepalives_count>
    )
    

    パラメータ

    説明

    Endpoint

    Hologres インスタンスのエンドポイントとポート番号。

    Hologres コンソール にログインします。左側のナビゲーションウィンドウで、[インスタンス] をクリックします。[インスタンス] ページで、インスタンスの ID をクリックします。[インスタンスの詳細] ページの [ネットワーク情報] セクションで、エンドポイントとポート番号を確認します。

    重要

    コードが実行されるネットワーク環境に基づいて、正しいエンドポイントとポート番号を選択してください。そうしないと、接続に失敗します。

    Port

    databases

    Hologres データベースの名前。

    Access ID

    Hologres インスタンスへの接続に使用する Alibaba Cloud アカウントの AccessKey ID。

    AccessKey ページにアクセスして、AccessKey ID を取得できます。

    Access Key

    Hologres インスタンスへの接続に使用する Alibaba Cloud アカウントの AccessKey シークレット。

    keepalives

    オプション。接続方法。このパラメータを設定することをお勧めします。有効な値:

    • 1: 持続的接続を使用します。

    • 0: 非持続的接続を使用します。

    keepalives_idle

    Hologres データベースへの接続がアイドル状態になったときにキープアライブメッセージが送信される間隔。単位: 秒。

    keepalives_interval

    応答が返されない場合にキープアライブメッセージが再送信される間隔。単位: 秒。

    keepalives_count

    キープアライブメッセージが送信される最大回数。

    例:

    conn = psycopg.connect(
        host="<Endpoint>",
        port=<Port>, 
        dbname="<databases>", 
        user="<Access ID>", 
        password="<Access Key>",
        keepalives=1, # 接続を維持します。
        keepalives_idle=130, # 接続がアイドル状態のときに 130 秒ごとにキープアライブメッセージを送信します。
        keepalives_interval=10, # 応答が返されない場合、キープアライブメッセージを再送信する前に 10 秒待機します。
        keepalives_count=15, # 最大 15 回キープアライブメッセージを送信します。
        application_name="<アプリケーション名>"
    )
    説明

    アプリケーション名パラメータを設定すると、履歴スロークエリリストでリクエストを開始したアプリケーションをすばやく表示できます。

Hologres を使用する

Hologres データベースに接続した後、Psycopg 3 を使用してデータ開発操作を実行できます。次の手順を実行して、テーブルを作成し、テーブルにデータを挿入し、データをクエリしてから、リソースを解放できます。固定プラン機能を使用して、読み取りおよび書き込み操作のパフォーマンスを向上させる場合は、関連する GUC パラメータを設定する必要があります。詳細については、「固定プランを使用して SQL 文の実行を高速化する」をご参照ください。

  1. カーソルを作成します。

    データ開発操作を実行する前に、cur = conn.cursor() コマンドを実行して、接続のカーソルを作成する必要があります。

  2. データ開発操作を実行します。

    1. テーブルを作成します。

      次のコマンドを実行して、holo_test という名前のテーブルを作成し、テーブルのデータ型を整数として定義できます。ビジネス要件に基づいて、カスタムテーブル名を設定し、カスタムデータ型を指定できます。

      cur.execute("CREATE TABLE holo_test (num integer);")
    2. テーブルにデータを挿入します。

      次のコマンドを実行して、作成された holo_test テーブルに 1 から 1000 までのデータを挿入できます。

      cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))
    3. テーブルからデータをクエリします。

      cur.execute("SELECT sum(num) FROM holo_test;")
      cur.fetchone()
  3. トランザクションをコミットします。

    前の例には、DDL、DML、および DQL 操作が含まれています。各 SQL 文の後に conn.commit() コマンドを実行してトランザクションをコミットし、操作がコミットされていることを確認する必要があります。conn 接続コードの後に autocommit パラメータを true に設定して、SQL コマンドを自動的にコミットすることをお勧めします。次のコードは例を示しています。

    • 同期呼び出しの例

      conn = psycopg.connect(
          host="<Endpoint>",
          port=<Port>, 
          dbname="<databases>", 
          user="<Access ID>", 
          password="<Access Key>",
          keepalives=1, # 接続を維持します。
          keepalives_idle=130, # 接続がアイドル状態のときに 130 秒ごとにキープアライブメッセージを送信します。
          keepalives_interval=10, # 応答が返されない場合、キープアライブメッセージを再送信する前に 10 秒待機します。
          keepalives_count=15, # 最大 15 回キープアライブメッセージを送信します。
          application_name="<アプリケーション名>"
      )
      conn.autocommit = "True"
    • 非同期呼び出しの例

      async with await psycopg.AsyncConnection.connect(
          host="<Endpoint>",
          port=<Port>, 
          dbname="<databases>", 
          user="<Access ID>", 
          password="<Access Key>",
          application_name="<アプリケーション名>",
          autocommit = "True"
          ) as aconn:
          async with aconn.cursor() as acur:
              await acur.execute(
                  "INSERT INTO test (num, data) VALUES (%s, %s)",
                  (100, "abc'def"))
              await acur.execute("SELECT * FROM test")
              await acur.fetchone()
              # will return (1, 100, "abc'def")  (1, 100, "abc'def") が返されます。
              async for record in acur:
                  print(record)
  4. リソースを解放します。

    後続の操作に影響を与えないように、上記の操作が完了したら、次のコマンドを実行してカーソルを閉じ、Psycopg 3 を Hologres データベースから切断します。

    cur.close()
    conn.close()

DataFrame を Hologres に効率的にインポートするためのベストプラクティス

Python を使用する場合、Pandas を使用してデータエントリを DataFrame に変換し、DataFrame を処理してから、DataFrame を Hologres にインポートするのが一般的です。このような状況では、インポートを迅速に完了したい場合があります。

# pip install Pandas==1.5.1

データのインポートには COPY モードを使用することをお勧めします。次の Python コードは例を示しています。

import psycopg
import pandas as pd

# Hologres に接続します。
conn = psycopg.connect(
    host="hgpostcn-cn-xxxxx-cn-hangzhou.hologres.aliyuncs.com",
    port=80,
    dbname="db",
    user="xxx",
    password="xxx",
    application_name="psycopg3"
)

cur = conn.cursor()

# 冗長なテーブルを削除します。
cur.execute("""
            DROP TABLE IF EXISTS df_data;
            """)
conn.commit()

# データインポート用のテストテーブルを作成します。
cur.execute("""
            CREATE TABLE IF NOT EXISTS df_data(
                col1 int,
                col2 int,
                col3 int,
                primary key(col1)
            );
            """)
conn.commit()

# DataFrame を構築します。
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)




# データをバッチで書き込みます。
# StringIO を使用して、DataFrame を CSV 形式の文字列に変換します。
from io import StringIO

# バッファを作成します。
buffer = StringIO()
        
# DataFrame を CSV 形式でバッファに書き込みます。
pd_data.to_csv(buffer, index=False, header=False)
        
# バッファの位置を先頭にリセットします。
buffer.seek(0)

with cur.copy("COPY df_data(col1,col2,col3) FROM STDIN WITH (STREAM_MODE TRUE,ON_CONFLICT UPDATE,FORMAT CSV);") as copy:
    while data := buffer.read(1024):
        copy.write(data)
conn.commit()

# データをクエリします。
cur.execute("SELECT * FROM df_data")
cur.fetchone()
cur.commit()

履歴クエリを表示して、COPY モードを使用してデータが Hologres にインポートされたことを確認します。