Psycopg是Python用于操作PostgreSQL的库。Hologres兼容PostgreSQL 11,因此您可以通过psycopg访问Hologres。本文将指导您使用pyscopg2访问Hologres,示例使用的操作环境为基于CentOS 7系统的Python 3.8版本。
安装Python3.8
您可以基于Miniconda、Anaconda安装Python 3.8环境。如下内容以CentOS 7系统为例,安装Python 3.8版本。
连接Hologres
Python3.8环境和psycopg2安装完成之后,您可以执行如下操作并连接Hologres。
使用Hologres
当您成功连接Hologres数据库之后,即可通过psycopg2进行数据开发操作。如下内容将指导您创建表、插入数据、查询和释放资源等操作。如果需要使用Fixed Plan能力实现更高性能的读写操作,需要配置相关GUC参数,请参见Fixed Plan加速SQL执行。
Pandas DataFrame快速写入Hologres最佳实践
使用Python时,经常会使用Pandas将数据转换为DataFrame,并对DataFrame进行处理,最终将DataFrame导入Hologres,此时希望将DataFrame快速导入Hologres。导入时候常用to_sql
函数,详情请参见Pandas。
需要Pandas为V1.4.2及以上版本,您可以执行如下命令强制安装V1.5.1版本的Pandas库。
# pip install Pandas==1.5.1
推荐使用
to_sql
函数的callable方式,使用copy方式导入数据,样例的Python代码如下。# 加载依赖
import pandas as pd
import psycopg2
# 生成连接字符串
host="hgpostcn-cn-xxxxxx-cn-hangzhou.hologres.aliyuncs.com"
port=80
dbname="demo"
user="LTAI5xxxxx"
password="fa8Kdgxxxxx"
application_name="Python Test"
conn = "postgresql+psycopg2://{}:{}@{}:{}/{}?application_name={}".format(user, password, host, port, dbname,application_name)
print(conn)
# 生成dataframe
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)
# 定义callable函数
import csv
from io import StringIO
def psql_insert_copy(table, conn, keys, data_iter):
"""
Execute SQL statement inserting data
Parameters
----------
table : pandas.io.sql.SQLTable
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
keys : list of str
Column names
data_iter : Iterable that iterates the values to be inserted
"""
# gets a DBAPI connection that can provide a cursor
dbapi_conn = conn.connection
with dbapi_conn.cursor() as cur:
s_buf = StringIO()
writer = csv.writer(s_buf)
writer.writerows(data_iter)
s_buf.seek(0)
columns = ', '.join('"{}"'.format(k) for k in keys)
if table.schema:
table_name = '{}.{}'.format(table.schema, table.name)
else:
table_name = table.name
sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
table_name, columns)
cur.copy_expert(sql=sql, file=s_buf)
# 导入数据
pd_data.to_sql(
name="pd_data",
con=conn,
if_exists="replace",
index=False,
method=psql_insert_copy
)
查看历史查询,验证已经使用COPY方式写入数据至Hologres。
