Psycopg是Python程式設計語言新設計的PostgreSQL資料庫適配器。由於Hologres相容PostgreSQL 11,因此您可以通過Psycopg訪問Hologres。本文將指導您使用Psycopg 3訪問Hologres。
前提條件
已安裝3.7及以上版本的Python環境。
安裝Psycopg 3
執行如下命令安裝Psycopg 3。
pip install --upgrade pip # 升級 pip 到 20.3 以上版本
pip install "psycopg[binary]"
串連Hologres
Psycopg 3安裝完成之後,您可以執行如下操作並串連Hologres。
載入Psycopg 3。
您可以執行以下命令,載入安裝的Psycopg 3。
import psycopg
建立資料庫連接。
您可以通過
psycopg.connect()
函數串連Hologres,具體文法和參數說明如下所示。conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=<keepalives>, keepalives_idle=<keepalives_idle>, keepalives_interval=<keepalives_interval>, keepalives_count=<keepalives_count> )
參數
描述
Endpoint
Hologres執行個體的網路地址和連接埠。
進入Hologres管理主控台,選擇左側導覽列執行個體列表,單擊目標執行個體,在執行個體詳情頁網路資訊中擷取網路地址和連接埠。
重要請根據代碼運行所在網路環境選擇正確的網路地址和連接埠,否則將無法正常串連。
Port
databases
Hologres建立的資料庫名稱。
Access ID
當前阿里雲帳號的AccessKey ID。
您可以單擊AccessKey 管理,擷取AccessKey ID。
Access Key
當前阿里雲帳號的AccessKey Secret。
keepalives
可選(推薦使用),串連方式:
1表示使用長串連。
0表示非長串連。
keepalives_idle
空閑時,保持串連連通的時間間隔,單位秒(s)。
keepalives_interval
沒得到回應時,等待重新嘗試保持連通的時間間隔,單位秒(s)。
keepalives_count
嘗試重新保持連通最大次數。
程式碼範例如下。
conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=1, # 保持串連 keepalives_idle=130, # 空閑時,每130秒保持串連連通 keepalives_interval=10, # 沒得到回應時,等待10秒重新嘗試保持連通 keepalives_count=15, # 嘗試最多15次重新保持連通 application_name="<Application Name>" )
說明配置Application Name參數可以協助您在歷史慢Query列表中快速查看發起請求的應用。
使用Hologres
當您成功串連Hologres資料庫之後,即可通過Psycopg 3進行資料開發操作。如下內容將指導您建立表、插入資料、查詢和釋放資源等操作。如果需要使用Fixed Plan能力實現更高效能的讀寫操作,需要配置相關GUC參數,請參見Fixed Plan加速SQL執行。
建立遊標。
在進行資料開發之前,您需要執行命令
cur = conn.cursor()
來建立串連的遊標。資料開發。
建立表
您可以執行如下命令,建立一個表
holo_test
並定義表的資料類型為integer。您也可以根據業務需求定義表名稱和資料類型。cur.execute("CREATE TABLE holo_test (num integer);")
插入資料
您可以執行如下命令,為建立的表
holo_test
插入資料1~1000。cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))
查詢資料
cur.execute("SELECT sum(num) FROM holo_test;") cur.fetchone()
提交事務。
上述樣本存在DDL、DML和DQL三種情況,您需要在每個SQL後執行命令
conn.commit()
提交事務,才能確保操作已經提交。建議您直接在conn
串連代碼之後把autocommit參數設定為true,實現SQL命令的自動認可。範例程式碼如下:同步調用樣本
conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=1, # 保持串連 keepalives_idle=130, # 空閑時,每130秒保持串連連通 keepalives_interval=10, # 沒得到回應時,等待10秒重新嘗試保持連通 keepalives_count=15, # 嘗試最多15次重新保持連通 application_name="<Application Name>" ) conn.autocommit = "True"
非同步呼叫樣本
async with await psycopg.AsyncConnection.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", application_name="<Application Name>", autocommit = "True" ) as aconn: async with aconn.cursor() as acur: await acur.execute( "INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def")) await acur.execute("SELECT * FROM test") await acur.fetchone() # will return (1, 100, "abc'def") async for record in acur: print(record)
釋放資源。
為避免影響後續的操作,當操作執行完成後,您需要執行如下命令關閉遊標並斷開資料庫連接。
cur.close() conn.close()
Pandas DataFrame快速寫入Hologres最佳實務
使用Python時,經常會使用Pandas將資料轉換為DataFrame,並對DataFrame進行處理,最終將DataFrame匯入Hologres,此時希望將DataFrame快速匯入Hologres。
# pip install Pandas==1.5.1
推薦使用COPY模式進行寫入,Python範例程式碼如下。
import psycopg
import pandas as pd
# 串連Hologres
conn = psycopg.connect(
host="hgpostcn-cn-xxxxx-cn-hangzhou.hologres.aliyuncs.com",
port=80,
dbname="db",
user="xxx",
password="xxx",
application_name="psycopg3"
)
cur = conn.cursor()
# 刪除多餘的表
cur.execute("""
DROP TABLE IF EXISTS df_data;
""")
conn.commit()
# 建立測試表,用於寫入資料
cur.execute("""
CREATE TABLE IF NOT EXISTS df_data(
col1 int,
col2 int,
col3 int,
primary key(col1)
);
""")
conn.commit()
# 構建dataframe
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)
# 一批一批寫
# 在這裡用StringIO將DataFrame轉換成CSV格式字串
from io import StringIO
# 建立一個緩衝區
buffer = StringIO()
# 將DataFrame寫入緩衝區CSV格式
pd_data.to_csv(buffer, index=False, header=False)
# 將緩衝區位置重設到開始
buffer.seek(0)
with cur.copy("COPY df_data(col1,col2,col3) FROM STDIN WITH (STREAM_MODE TRUE,ON_CONFLICT UPDATE,FORMAT CSV);") as copy:
while data := buffer.read(1024):
copy.write(data)
conn.commit()
# 查看資料
cur.execute("SELECT * FROM df_data")
cur.fetchone()
cur.commit()
查看歷史查詢,驗證已經使用COPY方式寫入資料至Hologres。