Psycopg は、Python プログラミング言語用に新しく設計された PostgreSQL データベースアダプタです。 Hologres は PostgreSQL 11 と互換性があるため、Psycopg を使用して Hologres にアクセスできます。このトピックでは、Psycopg 3 を使用して Hologres にアクセスする方法について説明します。
前提条件
Python 3.7 以降がインストールされている。
Psycopg 3 をインストールする
次のコマンドを実行して、Psycopg 3 をインストールする:
pip install --upgrade pip # pip をバージョン 20.3 以降にアップグレードします。
pip install "psycopg[binary]"
Hologres に接続する
Psycopg 3 をインストールした後、次の操作を実行して Hologres に接続できます。
Psycopg 3 をロードします。
次のコマンドを実行して、インストールされている Psycopg 3 をロードできます。
import psycopgデータベース接続を作成します。
psycopg.connect()関数を使用して Hologres に接続できます。次のコードは、構文とパラメータを示しています。conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=<keepalives>, keepalives_idle=<keepalives_idle>, keepalives_interval=<keepalives_interval>, keepalives_count=<keepalives_count> )パラメータ
説明
Endpoint
Hologres インスタンスのエンドポイントとポート番号。
Hologres コンソール にログインします。左側のナビゲーションウィンドウで、[インスタンス] をクリックします。[インスタンス] ページで、インスタンスの ID をクリックします。[インスタンスの詳細] ページの [ネットワーク情報] セクションで、エンドポイントとポート番号を確認します。
重要コードが実行されるネットワーク環境に基づいて、正しいエンドポイントとポート番号を選択してください。そうしないと、接続に失敗します。
Port
databases
Hologres データベースの名前。
Access ID
Hologres インスタンスへの接続に使用する Alibaba Cloud アカウントの AccessKey ID。
AccessKey ページにアクセスして、AccessKey ID を取得できます。
Access Key
Hologres インスタンスへの接続に使用する Alibaba Cloud アカウントの AccessKey シークレット。
keepalives
オプション。接続方法。このパラメータを設定することをお勧めします。有効な値:
1: 持続的接続を使用します。
0: 非持続的接続を使用します。
keepalives_idle
Hologres データベースへの接続がアイドル状態になったときにキープアライブメッセージが送信される間隔。単位: 秒。
keepalives_interval
応答が返されない場合にキープアライブメッセージが再送信される間隔。単位: 秒。
keepalives_count
キープアライブメッセージが送信される最大回数。
例:
conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=1, # 接続を維持します。 keepalives_idle=130, # 接続がアイドル状態のときに 130 秒ごとにキープアライブメッセージを送信します。 keepalives_interval=10, # 応答が返されない場合、キープアライブメッセージを再送信する前に 10 秒待機します。 keepalives_count=15, # 最大 15 回キープアライブメッセージを送信します。 application_name="<アプリケーション名>" )説明アプリケーション名パラメータを設定すると、履歴スロークエリリストでリクエストを開始したアプリケーションをすばやく表示できます。
Hologres を使用する
Hologres データベースに接続した後、Psycopg 3 を使用してデータ開発操作を実行できます。次の手順を実行して、テーブルを作成し、テーブルにデータを挿入し、データをクエリしてから、リソースを解放できます。固定プラン機能を使用して、読み取りおよび書き込み操作のパフォーマンスを向上させる場合は、関連する GUC パラメータを設定する必要があります。詳細については、「固定プランを使用して SQL 文の実行を高速化する」をご参照ください。
カーソルを作成します。
データ開発操作を実行する前に、
cur = conn.cursor()コマンドを実行して、接続のカーソルを作成する必要があります。データ開発操作を実行します。
テーブルを作成します。
次のコマンドを実行して、
holo_testという名前のテーブルを作成し、テーブルのデータ型を整数として定義できます。ビジネス要件に基づいて、カスタムテーブル名を設定し、カスタムデータ型を指定できます。cur.execute("CREATE TABLE holo_test (num integer);")テーブルにデータを挿入します。
次のコマンドを実行して、作成された
holo_testテーブルに 1 から 1000 までのデータを挿入できます。cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))テーブルからデータをクエリします。
cur.execute("SELECT sum(num) FROM holo_test;") cur.fetchone()
トランザクションをコミットします。
前の例には、DDL、DML、および DQL 操作が含まれています。各 SQL 文の後に
conn.commit()コマンドを実行してトランザクションをコミットし、操作がコミットされていることを確認する必要があります。conn接続コードの後に autocommit パラメータを true に設定して、SQL コマンドを自動的にコミットすることをお勧めします。次のコードは例を示しています。同期呼び出しの例
conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=1, # 接続を維持します。 keepalives_idle=130, # 接続がアイドル状態のときに 130 秒ごとにキープアライブメッセージを送信します。 keepalives_interval=10, # 応答が返されない場合、キープアライブメッセージを再送信する前に 10 秒待機します。 keepalives_count=15, # 最大 15 回キープアライブメッセージを送信します。 application_name="<アプリケーション名>" ) conn.autocommit = "True"非同期呼び出しの例
async with await psycopg.AsyncConnection.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", application_name="<アプリケーション名>", autocommit = "True" ) as aconn: async with aconn.cursor() as acur: await acur.execute( "INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def")) await acur.execute("SELECT * FROM test") await acur.fetchone() # will return (1, 100, "abc'def") (1, 100, "abc'def") が返されます。 async for record in acur: print(record)
リソースを解放します。
後続の操作に影響を与えないように、上記の操作が完了したら、次のコマンドを実行してカーソルを閉じ、Psycopg 3 を Hologres データベースから切断します。
cur.close() conn.close()
DataFrame を Hologres に効率的にインポートするためのベストプラクティス
Python を使用する場合、Pandas を使用してデータエントリを DataFrame に変換し、DataFrame を処理してから、DataFrame を Hologres にインポートするのが一般的です。このような状況では、インポートを迅速に完了したい場合があります。
# pip install Pandas==1.5.1データのインポートには COPY モードを使用することをお勧めします。次の Python コードは例を示しています。
import psycopg
import pandas as pd
# Hologres に接続します。
conn = psycopg.connect(
host="hgpostcn-cn-xxxxx-cn-hangzhou.hologres.aliyuncs.com",
port=80,
dbname="db",
user="xxx",
password="xxx",
application_name="psycopg3"
)
cur = conn.cursor()
# 冗長なテーブルを削除します。
cur.execute("""
DROP TABLE IF EXISTS df_data;
""")
conn.commit()
# データインポート用のテストテーブルを作成します。
cur.execute("""
CREATE TABLE IF NOT EXISTS df_data(
col1 int,
col2 int,
col3 int,
primary key(col1)
);
""")
conn.commit()
# DataFrame を構築します。
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)
# データをバッチで書き込みます。
# StringIO を使用して、DataFrame を CSV 形式の文字列に変換します。
from io import StringIO
# バッファを作成します。
buffer = StringIO()
# DataFrame を CSV 形式でバッファに書き込みます。
pd_data.to_csv(buffer, index=False, header=False)
# バッファの位置を先頭にリセットします。
buffer.seek(0)
with cur.copy("COPY df_data(col1,col2,col3) FROM STDIN WITH (STREAM_MODE TRUE,ON_CONFLICT UPDATE,FORMAT CSV);") as copy:
while data := buffer.read(1024):
copy.write(data)
conn.commit()
# データをクエリします。
cur.execute("SELECT * FROM df_data")
cur.fetchone()
cur.commit()履歴クエリを表示して、COPY モードを使用してデータが Hologres にインポートされたことを確認します。