DBUtils は、データベースへの接続に使用される Python 接続プールです。このトピックでは、DBUtils を使用して LindormTable に接続する方法について説明します。
前提条件
Python 3.8 以降がインストールされていること。
クライアントの IP アドレスが Lindorm インスタンスのホワイトリストに追加されていること。詳細については、「ホワイトリストの設定」をご参照ください。
LindormTable のバージョンが 2.3.1 以降であること。 LindormTable のバージョンを表示またはアップグレードする方法の詳細については、「LindormTable のリリースノート」および「Lindorm インスタンスのマイナーエンジンバージョンのアップグレード」をご参照ください。
準備
次のコードを実行して、phoenixdb V1.2.0 および DBUtils V3.0.2 をインストールします。
pip install phoenixdb==1.2.0
pip install DBUtils==3.0.2サンプルコード
#!/usr/bin/python3
from dbutils.pooled_db import PooledDB
import importlib
class DBUtilsDemo:
def __init__(self, url, user, password, database):
config = {
'url': url,
'lindorm_user': user,
'lindorm_password': password,
'database': database,
'autocommit': True
}
db_creator = importlib.import_module("phoenixdb")
# DBUtilsに基づいて接続プールを作成します。
self.pooled = PooledDB(db_creator,
maxcached=10,
# ビジネス要件に基づいて、接続プール内のアイドル接続の最大数を指定します。
maxconnections=50,
# ビジネス要件に基づいて、接続プール内の接続の最大数を指定します。
blocking=True,
# 接続プールにアイドル接続がない場合に、クライアントが待機するかどうかを指定します。True は、クライアントが接続が利用可能になるまで待機することを示します。False は、クライアントがアイドル接続を待機しないことを示します。
ping=1,
# サーバーにアクセスできるかどうかを確認します。
**config)
# 接続プールから接続を取得します。
def _connect(self):
try:
r = self.pooled.connection()
return r
except Exception as e:
print("Failed to connect:" + str(e))
# 接続を接続プールに戻します。
def _close(self, conn, stmt):
if stmt:
stmt.close()
if conn:
conn.close()
# 1 行のデータをクエリします。
def select_row(self, sql):
connection = self._connect()
statement = None
try:
statement = connection.cursor()
statement.execute(sql)
row = statement.fetchone()
return row
except Exception as e:
print(e)
finally:
self._close(connection, statement)
# 複数の行のデータをクエリします。
def select_rows(self, sql):
connection = self._connect()
statement = None
try:
statement = connection.cursor()
print(sql)
statement.execute(sql)
rows = statement.fetchall()
return rows
except Exception as e:
print(e)
finally:
self._close(connection, statement)
# テーブルにデータを更新および挿入します。
def upsert_data(self, sql_upsert):
connection = self._connect()
statement = None
try:
statement = connection.cursor()
statement.execute(sql_upsert)
connection.commit()
except Exception as e:
print(e)
finally:
self._close(connection, statement)
# パラメーターを指定して、テーブルにデータを更新および挿入します。
def upsert_data_prams(self, sql_upsert, prams):
connection = self._connect()
statement = None
try:
statement = connection.cursor()
statement.execute(sql_upsert, prams)
connection.commit()
except Exception as e:
print(e)
finally:
self._close(connection, statement)
if __name__ == '__main__':
# SQL を使用して LindormTable に接続するために使用するエンドポイントを指定します。
url = 'http://ld-bp1p7e07ohamf****-proxy-lindorm-pub.lindorm.rds.aliyuncs.com:30060'
# LindormTable への接続に使用する実際のユーザー名を指定します。LindormTable に接続するために使用するユーザー名とパスワードは、LindormTable クラスタ管理システムで確認できます。
user = 'root'
# LindormTable への接続に使用する実際のパスワードを指定します。
password = 'root'
# 接続するデータベースの名前を指定します。
database = 'test'
poolUtils = DBUtilsDemo(url, user, password, database)
poolUtils.upsert_data("upsert into tb(id,name,address) values ('i001','n001','a001')")
params = ['i002', 'n002', 'a002']
poolUtils.upsert_data_prams("upsert into tb(id,name,address) values (?,?,?)", params)
rows = poolUtils.select_rows("select * from tb")
print(rows)
row = poolUtils.select_row("select * from tb limit 1")
print(row)
row = poolUtils.select_row("select * from tb where id = 'i001' limit 1")
print(row)
SQL を使用して LindormTable に接続するために使用するエンドポイントを取得する方法の詳細については、「LindormTable のエンドポイントの表示」をご参照ください。
URL
jdbc:lindorm:table:url=jdbc:lindorm:table:url=http://ld-bp1p7e07ohamf****-proxy-lindorm-pub.lindorm.rds.aliyuncs.com:30060URLhttp://ld-bp1p7e07ohamf****-proxy-lindorm-pub.lindorm.rds.aliyuncs.com:30060パラメーターの値は、LindormTable SQL エンドポイントから の部分を削除することで取得できる HTTP アドレスです。たとえば、Lindorm コンソールから取得した LindormTable SQL エンドポイントが の場合、 パラメーターの値を に設定します。ping コマンドを実行して、DBUtils を使用して確立された接続の接続性をテストできます。ただし、この方法は phoenixdb を使用して確立された接続には適用できません。
接続が 10 分以上使用されていない場合、NoSuchConnection 例外が返されます。この場合、接続プールから新しい接続を取得する必要があります。