DBUtils is a Python connection pool library that reduces the overhead of repeatedly opening and closing database connections. Use it when your application frequently creates short-lived connections to an ApsaraDB RDS for MySQL instance, or when the instance is approaching its maximum connection limit.
Prerequisites
Before you begin, ensure that you have:
Python 3.7–3.12 installed on your application server
The server's IP address added to an IP address whitelist of the RDS instance (see Configure an IP address whitelist)
If your application runs on an Elastic Compute Service (ECS) instance in the same region and virtual private cloud (VPC) as the RDS instance, no IP address whitelist configuration is required.
Install dependencies
Install DBUtils and PyMySQL on your server. PyMySQL serves as the underlying database driver.
pip install DBUtils==3.1.0
pip install pymysql
For DBUtils documentation, see DBUtils.
Set up the connection pool
How it works
PooledDB maintains a pool of reusable connections.
Calling connection() retrieves an available connection from the pool.
Calling close() on a pooled connection returns it to the pool; it does not close the underlying connection.
When a connection has been reused maxusage times, DBUtils automatically closes and replaces it.
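The behavior described above can be illustrated with a small stand-in pool. This is a conceptual sketch only, not DBUtils code: ToyPool and PooledHandle are hypothetical names, and sqlite3 replaces the MySQL driver so that the example runs without a database server.

```python
import queue
import sqlite3


class ToyPool:
    """Illustrative stand-in for a connection pool; not DBUtils internals."""

    def __init__(self, creator, maxcached=2, maxusage=3):
        self._creator = creator      # zero-argument callable returning a raw connection
        self._maxusage = maxusage    # replace a raw connection after this many checkouts
        self._idle = queue.Queue(maxsize=maxcached)
        self.opened = 0              # number of raw connections ever created

    def _new_entry(self):
        self.opened += 1
        return [self._creator(), 0]  # [raw connection, usage count]

    def connection(self):
        try:
            entry = self._idle.get_nowait()  # reuse an idle connection if one exists
        except queue.Empty:
            entry = self._new_entry()
        if entry[1] >= self._maxusage:       # used up: close and replace
            entry[0].close()
            entry = self._new_entry()
        entry[1] += 1
        return PooledHandle(self, entry)

    def _give_back(self, entry):
        try:
            self._idle.put_nowait(entry)     # keep it for the next caller
        except queue.Full:
            entry[0].close()                 # idle cache is full: really close it


class PooledHandle:
    """Wrapper whose close() returns the raw connection to the pool."""

    def __init__(self, pool, entry):
        self._pool, self._entry = pool, entry

    def cursor(self):
        return self._entry[0].cursor()

    def close(self):
        if self._entry is not None:
            self._pool._give_back(self._entry)
            self._entry = None


pool = ToyPool(lambda: sqlite3.connect(":memory:"), maxcached=2, maxusage=3)
for _ in range(4):
    conn = pool.connection()
    conn.cursor().execute("SELECT 1")
    conn.close()   # returns the connection to the pool; nothing is closed here
print(pool.opened)  # prints 2
```

Four checkouts open only two raw connections: the first is reused three times, then replaced on the fourth checkout because maxusage is 3 in this sketch.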
Step 1: Import modules
from dbutils.pooled_db import PooledDB
import importlib
import pymysql
Step 2: Build the DBUtilsDemo class
Create a DBUtilsDemo class that encapsulates the pool configuration and exposes _connect and _close helper methods.
class DBUtilsDemo:
    def __init__(self, url, user, password, database):
        db_config = {
            "host": url,
            "port": 3306,
            "user": user,
            "db": database,
            "password": password,
            "charset": "utf8",
            "connect_timeout": 3,  # Seconds to wait when establishing a connection
            "read_timeout": 10,  # Seconds to wait for a read operation
            "write_timeout": 10  # Seconds to wait for a write operation
        }
        # See "Connection pool parameters" for descriptions of each parameter.
        self.pooled = PooledDB(pymysql, maxcached=20, maxshared=0, maxconnections=100, maxusage=20, **db_config)

    # Retrieve a connection from the pool.
    def _connect(self):
        try:
            r = self.pooled.connection()
            return r
        except Exception as e:
            print("Failed to connect:" + str(e))

    # Return a connection to the pool.
    # Calling close() on a pooled connection does not close it; it makes the connection
    # available for reuse.
    def _close(self, conn, stmt):
        if stmt:
            stmt.close()
        if conn:
            conn.close()
Step 3: Initialize the pool and connect
In the __main__ block of your script, initialize DBUtilsDemo with your RDS instance endpoint and credentials.
if __name__ == '__main__':
    # Use the internal endpoint if your server is in the same VPC as the RDS instance;
    # use the public endpoint otherwise.
    url = 'rm-bp**************.mysql.rds.aliyuncs.com'
    user = 'dbuser'
    password = '****'
    database = 'dbtest'
    poolUtils = DBUtilsDemo(url, user, password, database)
Connection pool parameters
Configure PooledDB parameters in the __init__ method. Test parameter changes in a non-production environment before applying them in production.
Parameters to set explicitly
These parameters affect connection reliability and should always be set.
| Parameter | Default | Recommended | Description |
|---|---|---|---|
| maxcached | 0 | 20 | Maximum number of idle connections. 0 or None means no limit. Set this to reserve connections for traffic bursts. |
| maxusage | None | 10–20 | Maximum times a connection is reused before DBUtils closes and replaces it. Prevents stale idle connections from occupying resources. |
| connect_timeout | 10 | 3 | Seconds to wait when establishing a connection. Valid range: 1–31,536,000. For connection pools, a value between 1 and 10 is appropriate; adjust based on network latency. |
| read_timeout | None | 10–60 | Seconds to wait for a read operation. If reads are slow, investigate your SQL statements or database load before reducing this value. |
| write_timeout | None | 10–60 | Seconds to wait for a write operation. Same guidance as read_timeout. |
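As a sketch of how the timeout values above might be assembled and checked before they reach the driver, the helper below builds the keyword arguments used in db_config. The function name build_driver_options is hypothetical, and the rule that read and write timeouts must be positive is an assumption of this example rather than something stated in the table.

```python
def build_driver_options(host, user, password, database,
                         connect_timeout=3, read_timeout=10, write_timeout=10):
    """Return driver keyword arguments, rejecting out-of-range timeouts."""
    # Range taken from the connect_timeout row of the table above.
    if not 1 <= connect_timeout <= 31_536_000:
        raise ValueError("connect_timeout must be between 1 and 31,536,000 seconds")
    # Assumption of this sketch: None disables the timeout, otherwise it must be positive.
    for name, value in (("read_timeout", read_timeout), ("write_timeout", write_timeout)):
        if value is not None and value <= 0:
            raise ValueError(f"{name} must be positive or None")
    return {
        "host": host,
        "port": 3306,
        "user": user,
        "password": password,
        "db": database,
        "charset": "utf8",
        "connect_timeout": connect_timeout,
        "read_timeout": read_timeout,
        "write_timeout": write_timeout,
    }


options = build_driver_options("rm-example.mysql.rds.aliyuncs.com", "dbuser", "****", "dbtest")
print(options["connect_timeout"], options["read_timeout"], options["write_timeout"])
```

The returned dictionary can be unpacked into PooledDB in the same way as db_config in the DBUtilsDemo class. The host name shown here is a placeholder.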
Parameters to tune by workload
| Parameter | Default | Recommended | Description |
|---|---|---|---|
| maxconnections | 0 | 100 | Maximum connections allowed in the pool. 0 or None means no limit. Set this to the maximum number of concurrent connections your database can handle. |
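Each application process that creates its own PooledDB object holds a separate pool, so the database limit is shared across all of them. The arithmetic below is a rule-of-thumb sketch, not guidance from the DBUtils documentation; the function name, the reserved headroom, and the sample numbers are all assumptions.

```python
def pool_size_per_process(db_max_connections, app_processes, reserved=10):
    """Split the database connection limit evenly across application processes.

    reserved keeps some connections free for administration and monitoring.
    """
    usable = db_max_connections - reserved
    if app_processes < 1 or usable < app_processes:
        raise ValueError("not enough connections for the given number of processes")
    return usable // app_processes


# Hypothetical figures: a limit of 2000 connections, 8 worker processes, 40 reserved.
print(pool_size_per_process(2000, 8, reserved=40))  # prints 245
```

The result is an upper bound for maxconnections in each process; a smaller value is often sufficient.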
Parameters that work well at their defaults
| Parameter | Default | Description |
|---|---|---|
| mincached | 0 | Initial number of idle connections created at startup. 0 means connections are created on demand, which is the recommended approach. |
| maxshared | 0 | Maximum shared connections. 0 or None means all connections are dedicated. |
| blocking | False | Behavior when the pool is exhausted. True blocks the request until a connection is available; False raises an error immediately. |
| setsession | None | List of SQL statements to run when a connection is established. Use for session-level initialization. |
| reset | True | How a connection is reset when it is returned to the pool. True always issues a rollback; False or None rolls back only transactions started with begin(). |
| failures | None | Exception class, or tuple of exception classes, for which DBUtils applies its connection failover mechanism. None uses the defaults (OperationalError, InterfaceError, InternalError). |
| ping | 1 | When DBUtils checks a connection with ping(). 0 or None: never; 1: whenever the connection is fetched from the pool; 2: when a cursor is created; 4: when a query is executed; 7: always. |
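For reference, the pool parameters from the three tables can be collected in one dictionary and unpacked into PooledDB. The fragment below is a configuration sketch: blocking=True and the setsession statement are example choices made for illustration, not recommendations from this topic.

```python
# Pool options gathered in one place; values mirror the tables above
# except where a comment says otherwise.
pool_options = {
    "mincached": 0,         # create connections on demand
    "maxcached": 20,        # keep up to 20 idle connections
    "maxshared": 0,         # all connections are dedicated
    "maxconnections": 100,  # hard cap for this pool
    "blocking": True,       # example choice: wait for a free connection instead of raising
    "maxusage": 20,         # replace a connection after 20 uses
    "setsession": ["SET SESSION time_zone = '+00:00'"],  # example session setup statement
    "reset": True,          # always roll back when a connection is returned
    "ping": 1,              # check the connection whenever it is fetched from the pool
}

# Inside DBUtilsDemo.__init__, the options would be applied like this:
# self.pooled = PooledDB(pymysql, **pool_options, **db_config)
print(sorted(pool_options))
```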
Execute database operations
Add methods to DBUtilsDemo to run read and write operations. All methods follow the same pattern: get a connection, execute, then return the connection to the pool.
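The get, execute, return pattern can also be packaged as a context manager so that the release step cannot be forgotten. The sketch below is one possible shape, not part of DBUtils: pooled_cursor, SingleSqlitePool, and _Borrowed are hypothetical names, and sqlite3 stands in for the MySQL driver so that the example runs locally. Note that sqlite3 uses ? placeholders, whereas PyMySQL uses %s.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def pooled_cursor(pool):
    """Yield a cursor; commit on success, roll back on error, always release."""
    conn = pool.connection()
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
        conn.close()  # with PooledDB, this returns the connection to the pool


class _Borrowed:
    """Delegates to the raw connection; close() is a no-op to mimic a pool."""

    def __init__(self, raw):
        self._raw = raw

    def __getattr__(self, name):
        return getattr(self._raw, name)

    def close(self):
        pass


class SingleSqlitePool:
    """Stand-in that exposes connection() like PooledDB, for a local run only."""

    def __init__(self):
        self._raw = sqlite3.connect(":memory:")

    def connection(self):
        return _Borrowed(self._raw)


pool = SingleSqlitePool()
with pooled_cursor(pool) as cur:
    cur.execute("CREATE TABLE tb (id TEXT, name TEXT, address TEXT)")
    cur.execute("INSERT INTO tb VALUES (?, ?, ?)", ("i001", "n001", "a001"))

with pooled_cursor(pool) as cur:
    cur.execute("SELECT name FROM tb WHERE id = ?", ("i001",))
    row = cur.fetchone()
print(row)  # prints ('n001',)
```

With the DBUtilsDemo class, the same helper could be called as pooled_cursor(poolUtils.pooled), with the placeholders changed to %s.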
Read operations
Add select_row and select_rows to query one or multiple rows.
    # Query a single row.
    def select_row(self, sql):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            statement.execute(sql)
            row = statement.fetchone()
            return row
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)

    # Query multiple rows.
    def select_rows(self, sql):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            statement.execute(sql)
            rows = statement.fetchall()
            return rows
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)
Call these methods from __main__:
if __name__ == '__main__':
    url = 'rm-bp**************.mysql.rds.aliyuncs.com'
    user = 'dbuser'
    password = '****'
    database = 'dbtest'
    poolUtils = DBUtilsDemo(url, user, password, database)
    # Query a single row.
    row = poolUtils.select_row("select * from tb where id = 'i001' limit 1")
    print(row)
    # Query multiple rows.
    rows = poolUtils.select_rows("select * from tb")
    print(rows)
Write operations
Add upsert_data and upsert_data_prams to handle INSERT, UPDATE, DELETE, and CREATE TABLE statements. Both methods call connection.commit() after execution.
    # Write without bound parameters.
    def upsert_data(self, sql_upsert):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            statement.execute(sql_upsert)
            connection.commit()
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)

    # Write with bound parameters (recommended to prevent SQL injection).
    def upsert_data_prams(self, sql_upsert, params):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            statement.execute(sql_upsert, params)
            connection.commit()
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)
Call these methods from __main__:
if __name__ == '__main__':
    url = 'rm-bp**************.mysql.rds.aliyuncs.com'
    user = 'dbuser'
    password = '****'
    database = 'dbtest'
    poolUtils = DBUtilsDemo(url, user, password, database)
    # Write without bound parameters.
    poolUtils.upsert_data("insert into tb(id,name,address) values ('i001','n001','a001')")
    # Write with bound parameters.
    params = ['i002', 'n002', 'a002']
    poolUtils.upsert_data_prams("insert into tb(id,name,address) values (%s,%s,%s)", params)
What's next
For Java connection pooling, see Use Druid to connect to an ApsaraDB RDS for MySQL instance.
For Go database connectivity, see Use Go-MySQL-Driver to connect to an ApsaraDB RDS for MySQL instance.
For server-side connection pooling managed by the RDS instance, see Configure the connection pooling feature.