Lindorm: Use DBUtils to develop applications

Last Updated: Oct 23, 2023

DBUtils is a Python library that provides connection pooling for database connections. This topic describes how to use DBUtils to connect to LindormTable.

Prerequisites

Preparations

Run the following commands to install phoenixdb V1.2.0 and DBUtils V3.0.2:

pip install phoenixdb==1.2.0
pip install DBUtils==3.0.2

Sample code

#!/usr/bin/python3

from dbutils.pooled_db import PooledDB
import importlib


class DBUtilsDemo:

    def __init__(self, url, user, password, database):
        config = {
            'url': url,
            'lindorm_user': user,
            'lindorm_password': password,
            'database': database,
            'autocommit': True
        }
        db_creator = importlib.import_module("phoenixdb")
        # Create a connection pool based on DBUtils.
        self.pooled = PooledDB(db_creator,
                               # Maximum number of idle connections kept in the pool. Set this based on business requirements.
                               maxcached=10,
                               # Maximum number of connections allowed in the pool. Set this based on business requirements.
                               maxconnections=50,
                               # Behavior when no connection is available: True makes the client wait until one is free. False raises an error instead of waiting.
                               blocking=True,
                               # When DBUtils checks a connection with ping(): 1 means each time a connection is fetched from the pool.
                               ping=1,
                               **config)

    # Obtain a connection from the connection pool.
    def _connect(self):
        try:
            return self.pooled.connection()
        except Exception as e:
            print("Failed to connect: " + str(e))
            # Re-raise so that callers do not continue with a None connection.
            raise

    # Return a connection to the connection pool.
    def _close(self, conn, stmt):
        if stmt:
            stmt.close()
        if conn:
            conn.close()

    # Query a single row of data.
    def select_row(self, sql):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            statement.execute(sql)
            row = statement.fetchone()
            return row
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)

    # Query multiple rows of data.
    def select_rows(self, sql):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            print(sql)
            statement.execute(sql)
            rows = statement.fetchall()
            return rows
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)

    # Upsert (insert or update) data in a table.
    def upsert_data(self, sql_upsert):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            statement.execute(sql_upsert)
            connection.commit()
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)

    # Upsert (insert or update) data in a table by using a parameterized statement.
    def upsert_data_prams(self, sql_upsert, prams):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            statement.execute(sql_upsert, prams)
            connection.commit()
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)

if __name__ == '__main__':
    # Specify the endpoint that is used to connect to LindormTable by using SQL.
    url = 'http://ld-bp1p7e07ohamf****-proxy-lindorm-pub.lindorm.rds.aliyuncs.com:30060'
    # Specify the actual username used to connect to LindormTable. You can view the username and password in the LindormTable cluster management system.
    user = 'root'
    # Specify the actual password used to connect to LindormTable.
    password = 'root'
    # Specify the name of the database to which you want to connect.
    database = 'test'

    poolUtils = DBUtilsDemo(url, user, password, database)
    poolUtils.upsert_data("upsert into tb(id,name,address) values ('i001','n001','a001')")

    params = ['i002', 'n002', 'a002']
    poolUtils.upsert_data_prams("upsert into tb(id,name,address) values (?,?,?)", params)

    rows = poolUtils.select_rows("select * from tb")
    print(rows)

    row = poolUtils.select_row("select * from tb limit 1")
    print(row)

    row = poolUtils.select_row("select * from tb where id = 'i001' limit 1")
    print(row)
Important
  • For more information about how to obtain the endpoint used to connect to LindormTable by using SQL, see View the endpoints of LindormTable.

  • The value of the url parameter is the HTTP address that can be obtained by deleting the jdbc:lindorm:table:url= part from the LindormTable SQL endpoint. For example, if the LindormTable SQL endpoint that is obtained from the Lindorm console is jdbc:lindorm:table:url=http://ld-bp1p7e07ohamf****-proxy-lindorm-pub.lindorm.rds.aliyuncs.com:30060, set the value of the url parameter to http://ld-bp1p7e07ohamf****-proxy-lindorm-pub.lindorm.rds.aliyuncs.com:30060.

  • DBUtils can test the connectivity of a pooled connection by calling the connection's ping method. However, this check is not applicable to connections established by using phoenixdb.

  • If a connection has not been used for more than 10 minutes, a NoSuchConnection exception is raised when the connection is used again. In this case, you must obtain a new connection from the connection pool.
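
Two of the notes above can be sketched in a few lines of Python: deriving the url value from the JDBC-style endpoint, and obtaining a new connection after an idle one has expired. This is a minimal sketch, not part of the official sample. The helper names http_url_from_jdbc, is_stale, and run_with_retry are hypothetical, and the check for the text "NoSuchConnection" in the exception message is an assumption about how the error surfaces in the client. Adjust it to the exception you actually observe.

```python
from contextlib import closing

# Prefix that the Lindorm console shows in front of the HTTP address.
JDBC_PREFIX = "jdbc:lindorm:table:url="


def http_url_from_jdbc(endpoint):
    """Return the HTTP address, dropping the JDBC prefix if it is present."""
    if endpoint.startswith(JDBC_PREFIX):
        return endpoint[len(JDBC_PREFIX):]
    return endpoint


def is_stale(exc):
    # Assumption: the error name appears in the exception text.
    return "NoSuchConnection" in str(exc)


def run_with_retry(get_connection, work, retries=1):
    """Run work(cursor) on a pooled connection.

    If the connection turns out to be stale, fetch a new one from the
    pool and try again, up to `retries` additional times.
    """
    attempt = 0
    while True:
        conn = get_connection()
        try:
            with closing(conn.cursor()) as cursor:
                return work(cursor)
        except Exception as exc:
            if attempt >= retries or not is_stale(exc):
                raise
            attempt += 1
        finally:
            # For a pooled connection, close() returns it to the pool.
            conn.close()
```

With the DBUtilsDemo class from the sample code, a call might look like run_with_retry(poolUtils.pooled.connection, lambda cur: cur.execute("select * from tb limit 1") or cur.fetchone()). Errors other than a stale connection are re-raised unchanged, so real failures are not hidden by the retry.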