全部產品
Search
文件中心

Hologres:Python

更新時間:Jun 07, 2025

Psycopg是Python程式設計語言新設計的PostgreSQL資料庫適配器。由於Hologres相容PostgreSQL 11,因此您可以通過Psycopg訪問Hologres。本文將指導您使用Psycopg 3訪問Hologres。

前提條件

已安裝3.7及以上版本的Python環境。

安裝Psycopg 3

執行如下命令安裝Psycopg 3

pip  install   --upgrade pip             # 升級 pip 到 20.3 以上版本
pip install "psycopg[binary]"

串連Hologres

Psycopg 3安裝完成之後,您可以執行如下操作並串連Hologres。

  1. 載入Psycopg 3。

    您可以執行以下命令,載入安裝的Psycopg 3。

    import psycopg
  2. 建立資料庫連接。

    您可以通過psycopg.connect()函數串連Hologres,具體文法和參數說明如下所示。

    conn = psycopg.connect(
        host="<Endpoint>",
        port=<Port>, 
        dbname="<databases>", 
        user="<Access ID>", 
        password="<Access Key>",
        keepalives=<keepalives>, 
        keepalives_idle=<keepalives_idle>,
        keepalives_interval=<keepalives_interval>, 
        keepalives_count=<keepalives_count>
    )
    

    參數

    描述

    Endpoint

    Hologres執行個體的網路地址和連接埠。

    進入Hologres管理主控台,選擇左側導覽列執行個體列表,單擊目標執行個體,在執行個體詳情網路資訊中擷取網路地址和連接埠。

    重要

    請根據代碼運行所在網路環境選擇正確的網路地址和連接埠,否則將無法正常串連。

    Port

    databases

    Hologres建立的資料庫名稱。

    Access ID

    當前阿里雲帳號的AccessKey ID。

    您可以單擊AccessKey 管理,擷取AccessKey ID。

    Access Key

    當前阿里雲帳號的AccessKey Secret。

    keepalives

    可選(推薦使用),串連方式:

    • 1表示使用長串連。

    • 0表示非長串連。

    keepalives_idle

    空閑時,保持串連連通的時間間隔,單位秒(s)。

    keepalives_interval

    沒得到回應時,等待重新嘗試保持連通的時間間隔,單位秒(s)。

    keepalives_count

    嘗試重新保持連通最大次數。

    程式碼範例如下。

    conn = psycopg.connect(
        host="<Endpoint>",
        port=<Port>, 
        dbname="<databases>", 
        user="<Access ID>", 
        password="<Access Key>",
        keepalives=1, # 保持串連
        keepalives_idle=130, # 空閑時,每130秒保持串連連通
        keepalives_interval=10, # 沒得到回應時,等待10秒重新嘗試保持連通
        keepalives_count=15, # 嘗試最多15次重新保持連通
        application_name="<Application Name>"
    )
    說明

    配置Application Name參數可以協助您在歷史慢Query列表中快速查看發起請求的應用。

使用Hologres

當您成功串連Hologres資料庫之後,即可通過Psycopg 3進行資料開發操作。如下內容將指導您建立表、插入資料、查詢和釋放資源等操作。如果需要使用Fixed Plan能力實現更高效能的讀寫操作,需要配置相關GUC參數,請參見Fixed Plan加速SQL執行

  1. 建立遊標。

    在進行資料開發之前,您需要執行命令cur = conn.cursor()來建立串連的遊標。

  2. 資料開發。

    1. 建立表

      您可以執行如下命令,建立一個表holo_test並定義表的資料類型為integer。您也可以根據業務需求定義表名稱和資料類型。

      cur.execute("CREATE TABLE holo_test (num integer);")
    2. 插入資料

      您可以執行如下命令,為建立的表holo_test插入資料1~1000。

      cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))
    3. 查詢資料

      cur.execute("SELECT sum(num) FROM holo_test;")
      cur.fetchone()
  3. 提交事務。

    上述樣本存在DDL、DML和DQL三種情況,您需要在每個SQL後執行命令conn.commit()提交事務,才能確保操作已經提交。建議您直接在conn串連代碼之後把autocommit參數設定為true,實現SQL命令的自動認可。範例程式碼如下:

    • 同步調用樣本

      conn = psycopg.connect(
          host="<Endpoint>",
          port=<Port>, 
          dbname="<databases>", 
          user="<Access ID>", 
          password="<Access Key>",
          keepalives=1, # 保持串連
          keepalives_idle=130, # 空閑時,每130秒保持串連連通
          keepalives_interval=10, # 沒得到回應時,等待10秒重新嘗試保持連通
          keepalives_count=15, # 嘗試最多15次重新保持連通
          application_name="<Application Name>"
      )
      conn.autocommit = "True"
    • 非同步呼叫樣本

      async with await psycopg.AsyncConnection.connect(
          host="<Endpoint>",
          port=<Port>, 
          dbname="<databases>", 
          user="<Access ID>", 
          password="<Access Key>",
          application_name="<Application Name>",
          autocommit = "True"
          ) as aconn:
          async with aconn.cursor() as acur:
              await acur.execute(
                  "INSERT INTO test (num, data) VALUES (%s, %s)",
                  (100, "abc'def"))
              await acur.execute("SELECT * FROM test")
              await acur.fetchone()
              # will return (1, 100, "abc'def")
              async for record in acur:
                  print(record)
  4. 釋放資源。

    為避免影響後續的操作,當操作執行完成後,您需要執行如下命令關閉遊標並斷開資料庫連接。

    cur.close()
    conn.close()

Pandas DataFrame快速寫入Hologres最佳實務

使用Python時,經常會使用Pandas將資料轉換為DataFrame,並對DataFrame進行處理,最終將DataFrame匯入Hologres,此時希望將DataFrame快速匯入Hologres。

# pip install Pandas==1.5.1

推薦使用COPY模式進行寫入,Python範例程式碼如下。

import psycopg
import pandas as pd

# 串連Hologres
conn = psycopg.connect(
    host="hgpostcn-cn-xxxxx-cn-hangzhou.hologres.aliyuncs.com",
    port=80,
    dbname="db",
    user="xxx",
    password="xxx",
    application_name="psycopg3"
)

cur = conn.cursor()

# 刪除多餘的表
cur.execute("""
            DROP TABLE IF EXISTS df_data;
            """)
conn.commit()

# 建立測試表,用於寫入資料
cur.execute("""
            CREATE TABLE IF NOT EXISTS df_data(
                col1 int,
                col2 int,
                col3 int,
                primary key(col1)
            );
            """)
conn.commit()

# 構建dataframe
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)




# 一批一批寫
# 在這裡用StringIO將DataFrame轉換成CSV格式字串
from io import StringIO

# 建立一個緩衝區
buffer = StringIO()
        
# 將DataFrame寫入緩衝區CSV格式
pd_data.to_csv(buffer, index=False, header=False)
        
# 將緩衝區位置重設到開始
buffer.seek(0)

with cur.copy("COPY df_data(col1,col2,col3) FROM STDIN WITH (STREAM_MODE TRUE,ON_CONFLICT UPDATE,FORMAT CSV);") as copy:
    while data := buffer.read(1024):
        copy.write(data)
conn.commit()

# 查看資料
cur.execute("SELECT * FROM df_data")
cur.fetchone()
cur.commit()

查看歷史查詢,驗證已經使用COPY方式寫入資料至Hologres。