All Products
Search
Document Center

ApsaraDB RDS:Use DBUtils to connect to an ApsaraDB RDS for MySQL instance

Last Updated:Mar 28, 2026

DBUtils is a Python connection pool library that reduces the overhead of repeatedly opening and closing database connections. Use it when your application frequently creates short-lived connections to an ApsaraDB RDS for MySQL instance, or when the instance is approaching its maximum connection limit.

Prerequisites

Before you begin, ensure that you have:

  • Python 3.7–3.12 installed on your application server

  • The server's IP address added to an IP address whitelist of the RDS instance (see Configure an IP address whitelist)

If your application runs on an Elastic Compute Service (ECS) instance in the same region and virtual private cloud (VPC) as the RDS instance, no IP address whitelist configuration is required.

Install dependencies

Install DBUtils and PyMySQL on your server. PyMySQL serves as the underlying database driver.

pip install DBUtils==3.1.0
pip install pymysql

For DBUtils documentation, see DBUtils.

Set up the connection pool

How it works

  • PooledDB maintains a pool of reusable connections.

  • Calling connection() retrieves an available connection from the pool.

  • Calling close() on a pooled connection returns it to the pool—it does not close the underlying connection.

  • When a connection has been reused maxusage times, DBUtils automatically closes and replaces it.

Step 1: Import modules

from dbutils.pooled_db import PooledDB
import importlib
import pymysql

Step 2: Build the DBUtilsDemo class

Create a DBUtilsDemo class that encapsulates the pool configuration and exposes _connect and _close helper methods.

class DBUtilsDemo:
    def __init__(self, url, user, password, database):
        db_config = {
            "host": url,
            "port": 3306,
            "user": user,
            "db": database,
            "password": password,
            "charset": "utf8",
            "connect_timeout": 3,   # Seconds to wait when establishing a connection
            "read_timeout": 10,     # Seconds to wait for a read operation
            "write_timeout": 10     # Seconds to wait for a write operation
        }
        # See "Connection pool parameters" for descriptions of each parameter.
        self.pooled = PooledDB(pymysql, maxcached=20, maxshared=0, maxconnections=100, maxusage=20, **db_config)

    # Retrieve a connection from the pool.
    def _connect(self):
        try:
            r = self.pooled.connection()
            return r
        except Exception as e:
            print("Failed to connect:" + str(e))

    # Return a connection to the pool.
    # Calling close() on a pooled connection does not close it—it makes the connection
    # available for reuse.
    def _close(self, conn, stmt):
        if stmt:
            stmt.close()
        if conn:
            conn.close()

Step 3: Initialize the pool and connect

In your __main__ function, initialize DBUtilsDemo with your RDS instance credentials.

if __name__ == '__main__':
    # Use the internal endpoint if your server is in the same VPC as the RDS instance;
    # use the public endpoint otherwise.
    url = 'rm-bp**************.mysql.rds.aliyuncs.com'
    user = 'dbuser'
    password = '****'
    database = 'dbtest'

    poolUtils = DBUtilsDemo(url, user, password, database)

Connection pool parameters

Configure PooledDB parameters in the __init__ method. Test parameter changes in a non-production environment before applying them in production.

Parameters to set explicitly

These parameters affect connection reliability and should always be set.

ParameterDefaultRecommendedDescription
maxcached020Maximum number of idle connections. 0 or None means no limit. Set this to reserve connections for traffic bursts.
maxusageNone10–20Maximum times a connection is reused before DBUtils closes and replaces it. Prevents stale idle connections from occupying resources.
connect_timeout103Seconds to wait when establishing a connection. Valid range: 1–31,536,000. For connection pools, a value between 1 and 10 is appropriate; adjust based on network latency.
read_timeoutNone10–60Seconds to wait for a read operation. If reads are slow, investigate your SQL statements or database load before reducing this value.
write_timeoutNone10–60Seconds to wait for a write operation. Same guidance as read_timeout.

Parameters to tune by workload

ParameterDefaultRecommendedDescription
maxconnections0100Maximum connections allowed in the pool. 0 or None means no limit. Set this to the maximum number of concurrent connections your database can handle.

Parameters that work well at their defaults

ParameterDefaultDescription
mincached0Initial number of idle connections created at startup. 0 means connections are created on demand—the recommended approach.
maxshared0Maximum shared connections. 0 or None means all connections are dedicated.
blockingFalseBehavior when the pool is exhausted. True blocks the request until a connection is available; False raises an error immediately.
setsessionNoneSQL statements to run when a connection is established. Use for session-level initialization.
resetTrueWhether to reset a connection's state when it is returned to the pool.
failuresNoneNumber of reconnection attempts when a connection fails.
ping1Heartbeat check for idle connections. Used to verify connection availability.

Execute database operations

Add methods to DBUtilsDemo to run read and write operations. All methods follow the same pattern: get a connection, execute, then return the connection to the pool.

Read operations

Add select_row and select_rows to query one or multiple rows.

# Query a single row.
def select_row(self, sql):
    connection = self._connect()
    statement = None
    try:
        statement = connection.cursor()
        statement.execute(sql)
        row = statement.fetchone()
        return row
    except Exception as e:
        print(e)
    finally:
        self._close(connection, statement)

# Query multiple rows.
def select_rows(self, sql):
    connection = self._connect()
    statement = None
    try:
        statement = connection.cursor()
        statement.execute(sql)
        rows = statement.fetchall()
        return rows
    except Exception as e:
        print(e)
    finally:
        self._close(connection, statement)

Call these methods from __main__:

if __name__ == '__main__':
    url = 'rm-bp**************.mysql.rds.aliyuncs.com'
    user = 'dbuser'
    password = '****'
    database = 'dbtest'
    poolUtils = DBUtilsDemo(url, user, password, database)

    # Query a single row.
    row = poolUtils.select_row("select * from tb where id = 'i001' limit 1")
    print(row)

    # Query multiple rows.
    rows = poolUtils.select_rows("select * from tb")
    print(rows)

Write operations

Add upsert_data and upsert_data_prams to handle INSERT, UPDATE, DELETE, and CREATE TABLE statements. Both methods call connection.commit() after execution.

# Write without bound parameters.
def upsert_data(self, sql_upsert):
    connection = self._connect()
    statement = None
    try:
        statement = connection.cursor()
        statement.execute(sql_upsert)
        connection.commit()
    except Exception as e:
        print(e)
    finally:
        self._close(connection, statement)

# Write with bound parameters (recommended to prevent SQL injection).
def upsert_data_prams(self, sql_upsert, params):
    connection = self._connect()
    statement = None
    try:
        statement = connection.cursor()
        statement.execute(sql_upsert, params)
        connection.commit()
    except Exception as e:
        print(e)
    finally:
        self._close(connection, statement)

Call these methods from __main__:

if __name__ == '__main__':
    url = 'rm-bp**************.mysql.rds.aliyuncs.com'
    user = 'dbuser'
    password = '****'
    database = 'dbtest'
    poolUtils = DBUtilsDemo(url, user, password, database)

    # Write without bound parameters.
    poolUtils.upsert_data("insert into tb(id,name,address) values ('i001','n001','a001')")

    # Write with bound parameters.
    params = ['i002', 'n002', 'a002']
    poolUtils.upsert_data_prams("insert into tb(id,name,address) values (%s,%s,%s)", params)

What's next