全部產品
Search
文件中心

ApsaraDB for OceanBase:PyMySQL 串連 OceanBase 資料庫樣本程式

更新時間:Jul 01, 2024

本文將介紹如何使用 PyMySQL 庫和 OceanBase 資料庫構建一個應用程式,實現基本的資料庫操作,包括建立表、插入資料、查詢資料和刪除表等。

image.png點擊下載 python-pymysql 樣本工程

前提條件

  • 您已安裝 Python 3.x 和 pip。

  • 您已安裝 OceanBase 資料庫並且建立了 MySQL 模式租戶。

操作步驟

  1. 檢查Pythonpip的版本。

  2. 安裝 PyMySQL 庫。

  3. 擷取 OceanBase 資料庫連接資訊。

  4. 修改config.py檔案中的資料庫連接資訊。

  5. 運行main.py檔案。

步驟一:檢查 Python 和 pip 的版本

開啟命令提示字元或 PowerShell 終端,運行python --versionpip --version命令,確保 Python 和 pip 正常安裝。

樣本如下:

PS C:\Windows\system32> python --version
Python 3.7.0
PS C:\Windows\system32> pip --version
pip 22.3.1 from d:\python\python37\lib\site-packages\pip (python 3.7)

步驟二:安裝 PyMySQL 庫

開啟命令提示字元或 PowerShell 終端,運行以下命令,安裝 PyMySQL 庫。

  1. 運行以下命令,進入代碼的python-pymysql目錄。

    樣本如下:

    cd python-pymysql
  2. 運行以下命令,安裝專案所需的 Python 庫。

    樣本如下:

    pip install -r requirements.txt
說明

您也可以直接開啟命令提示字元或 PowerShell 終端運行pip install pymysql命令安裝 PyMySQL 庫。

步驟三:擷取 OceanBase 資料庫連接資訊

聯絡 OceanBase 資料庫部署人員或者管理員擷取相應的資料庫連接串。

obclient -h$host -P$port -u$user_name -p$password -D$database_name

參數說明:

  • $hostOceanBase 資料庫連接的網域名稱。

  • $port:OceanBase 資料庫連接連接埠,MySQL 模式租戶預設是 3306。

  • $database_name需要訪問的資料庫名稱。

  • $user_name:租戶的串連帳號。

  • $password提供賬戶密碼。

更多串連串的資訊,請參見 擷取串連參數

樣本如下:

obclient -hxxx.xxx.xxx.xxx -P3306 -utest_user001 -p****** -Dtest

步驟四:修改 config.py 檔案中的資料庫連接資訊

根據 步驟三:擷取 OceanBase 資料庫連接資訊 中的資訊修改專案檔python-pymysql/config.py中的資料庫連接資訊。

  1. 進入python-pymysql專案檔夾。

  2. 修改config.py檔案中的資料庫連接資訊。

    • 在 Windows 環境下,使用文字編輯器開啟config.py檔案,修改檔案中的資料庫連接資訊,確保與實際情況相符。

    • 在 Linux 環境下,可以使用vi config.py或者vim config.py命令編輯config.py檔案,修改檔案中的資料庫連接資訊,確保與實際情況相符。

    config.py檔案中的資料庫連接資訊樣本如下:

    DB_CONFIG = {
        'host': '10.10.10.1',
        'port': 3306,
        'user': 'test_user001',
        'password': '******',
        'database': 'test',
        'charset': 'utf8mb4'
    }

步驟五:運行 main.py 檔案

開啟命令提示字元或 PowerShell 終端,運行main.py檔案,查詢資料並輸出結果。

  1. 進入python-pymysql專案目錄下。

    樣本如下:

    cd D:\demo\demo\python-pymysql
  2. 運行main.py檔案。

    樣本如下:

    python main.py

    返回結果如下:

    2023-11-10 16:56:48,021 - INFO - Start executing the script
    2023-11-10 16:56:48,021 - INFO - Start creating the table
    2023-11-10 16:56:48,281 - INFO - Table creation successful
    2023-11-10 16:56:48,281 - INFO - Start inserting data
    2023-11-10 16:56:48,540 - INFO - Data insertion successful
    (1, 'John', 20)
    (2, 'Lucy', 25)
    (3, 'Tom', 30)
    2023-11-10 16:56:48,737 - INFO - Start dropping the table
    2023-11-10 16:56:48,999 - INFO - Table dropped successfully
    2023-11-10 16:56:48,999 - INFO - Script execution completed

專案代碼介紹

點擊 python-pymysql 下載專案代碼,是一個名稱為python-pymysql.zip的壓縮包。

解壓後,得到一個名為python-pymysql的檔案夾。目錄結構如下所示:

python-pymysql
├── config.py
├── test_sql.py
├── main.py
└── requirements.txt

檔案說明:

  • config.py:用於儲存資料庫連接資訊。

  • test_sql.py:用於儲存 SQL 陳述式。

  • main.py:主程式入口,用於執行資料庫的基本操作,包括建立表、插入資料、查詢資料和刪除表。

  • requirements.txt:用於儲存專案所需要的 Python 包及其版本資訊。

    說明

    本文擷取的代碼中只列出了 PyMySQL 庫的版本要求,可以通過sudo pip install -r requirements.txt命令安裝,執行以上命令後,會自動安裝所需的庫。

config.py 代碼介紹

本文擷取的config.py檔案中

的代碼定義了資料庫連接資訊。資料庫連接資訊主要包括以下幾個部分:

指定串連資料庫的 IP 位址、連接埠號碼、使用者名稱、密碼、資料庫名稱和字元集。

代碼如下:

DB_CONFIG = {
    'host': '$host',
    'port': $port,
    'user': '$user_name',
    'password': '$password',
    'database': '$database_name',
    'charset': 'utf8mb4'
}

參數解釋:

  • hostOceanBase 資料庫連接的網域名稱。

  • port:提供 OceanBase 資料庫連接連接埠,預設連接埠是 3306。

  • user提供租戶的串連賬戶。

  • password提供賬戶密碼。

  • database:需要串連的資料庫名。

  • charset:串連資料庫時使用的字元集。

重要

這裡的參數值是根據具體環境和資料庫設定而定的,需要根據實際情況進行修改。

test_sql.py 代碼介紹

本文擷取的test_sql.py檔案中的代碼定義了資料庫操作的 SQL 陳述式,包括建立表、插入資料、查詢資料和刪除表,這些 SQL 陳述式可以通過 PyMySQL 串連資料庫後執行,以實現相應的功能。

該檔案中的代碼主要包括以下幾個部分:

  1. 建立表的 SQL 陳述式。

    定義建立表test_pymysql的 SQL 陳述式,在表中定義了三個欄位,分別為idnameage,其中id是自增長的主鍵。

    代碼如下:

    CREATE_TABLE_SQL = '''
    CREATE TABLE test_pymysql (
    id INT(11) NOT NULL AUTO_INCREMENT,
    name VARCHAR(128) NOT NULL,
    age INT(11) NOT NULL,
    PRIMARY KEY (id)
    )
    '''
  2. 插入資料的 SQL 陳述式。

    定義一個向test_pymysql表插入資料的 SQL 陳述式,插入的資料將包含三條,每條資料包含兩個欄位:nameage。每個欄位的值將在執行 SQL 陳述式時,通過預留位置%s的形式被傳入。

    代碼如下:

    INSERT_DATA_SQL = '''
    INSERT INTO test_pymysql (name, age) VALUES 
    (%s, %s),
    (%s, %s),
    (%s, %s)
    '''
  3. 查詢資料的 SQL 陳述式。

    定義查詢資料的 SQL 陳述式,從test_pymysql表中查詢所有資料。

    代碼如下:

    SELECT_DATA_SQL = '''
    SELECT * FROM test_pymysql
    '''
  4. 刪除表的 SQL 陳述式。

    定義刪除表的 SQL 陳述式,將test_pymysql表刪除。

    代碼如下:

    DROP_TABLE_SQL = '''
    DROP TABLE test_pymysql
    '''

main.py 代碼介紹

本文擷取的main.py檔案中的代碼通過調用pymysql模組,串連 MySQL 資料庫,以及調用logging模組,輸出日誌資訊,實現建立表、插入資料、查詢資料和刪除表的操作。

main.py檔案中的代碼主要包括以下幾個部分:

  1. 匯入所需的模組。

    1. 匯入logging模組。

    2. 匯入pymysql模組。

    3. 匯入config.py模組,其中定義了資料庫連接資訊。

    4. 匯入test_sql.py模組,其中定義了資料庫操作的 SQL 陳述式。

    代碼如下:

    import logging
    import pymysql
    from config import DB_CONFIG
    from test_sql import CREATE_TABLE_SQL, INSERT_DATA_SQL, SELECT_DATA_SQL, DROP_TABLE_SQL
  2. 設定日誌記錄的層級和格式,並輸出一條 INFO 層級的日誌資訊,表示開始執行指令碼。

    代碼如下:

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logging.info('Start executing the script')
  3. 定義建立表的函數。

    定義一個名為create_table()的函數,輸出一條 INFO 層級的日誌資訊,表示開始建立表。使用with語句管理資料庫串連和遊標對象的生命週期,保證了資料庫連接和遊標對象的安全關閉,避免了記憶體流失等問題。執行建立表的 SQL 陳述式,提交事務並輸出日誌資訊,或復原事務並輸出錯誤記錄檔資訊。

    代碼如下:

    def create_table():
        logging.info('Start creating the table')
        with pymysql.connect(**DB_CONFIG) as conn:
            with conn.cursor() as cursor:
                try:
                    cursor.execute(CREATE_TABLE_SQL)
                    conn.commit()
                    logging.info('Table creation successful')
                except Exception as e:
                    conn.rollback()
                    logging.error('Table creation failed, Reason:%s' % e)
  4. 定義插入資料的函數。

    定義一個名為insert_data()的函數,輸出一條 INFO 層級的日誌資訊,表示開始插入資料。使用with語句管理資料庫串連和遊標對象的生命週期,保證了資料庫連接和遊標對象的安全關閉,避免了記憶體流失等問題。執行插入資料的 SQL 陳述式,提交事務並輸出日誌資訊,或復原事務並輸出錯誤記錄檔資訊。

    代碼如下:

    def insert_data():
        logging.info('Start inserting data')
        with pymysql.connect(**DB_CONFIG) as conn:
            with conn.cursor() as cursor:
                try:
                    data = [('John', 20), ('Lucy', 25), ('Tom', 30)]
                    flattened_data = [d for item in data for d in item]
                    cursor.executemany(INSERT_DATA_SQL, [flattened_data])
                    conn.commit()
                    logging.info('Data insertion successful')
                except Exception as e:
                    conn.rollback()
                    logging.error('Data insertion failed, Reason:%s' % e)
  5. 定義查詢資料的函數。

    定義一個名為select_data()的函數,用於從資料庫中查詢資料。使用with語句管理資料庫串連和遊標對象的生命週期,保證了資料庫連接和遊標對象的安全關閉,避免了記憶體流失等問題。使用execute()方法執行SELECT_DATA_SQL定義的 SQL 陳述式查詢資料。使用fetchall()方法擷取查詢結果,並通過for迴圈逐行輸出結果。

    代碼如下:

    def select_data():
        with pymysql.connect(**DB_CONFIG) as conn:
            with conn.cursor() as cursor:
                cursor.execute(SELECT_DATA_SQL)
                result = cursor.fetchall()
                for row in result:
                    print(row)
  6. 定義刪除表的函數。

    定義一個名為drop_table()函數,該函數使用了預先定義的資料庫連接資訊(DB_CONFIG)和刪除表的 SQL 陳述式(DROP_TABLE_SQL)。函數會執行刪除表的操作,並列印相應的日誌資訊表示操作成功或失敗。如果刪除表操作失敗,會列印出錯誤資訊。

    代碼如下:

    def drop_table():
        logging.info('Start dropping the table')
        with pymysql.connect(**DB_CONFIG) as conn:
            with conn.cursor() as cursor:
                try:
                    cursor.execute(DROP_TABLE_SQL)
                    conn.commit()
                    logging.info('Table dropped successfully')
                except Exception as e:
                    conn.rollback()
                    logging.error('Table drop failed, Reason:%s' % e)
  7. 定義程式的進入點,主要用於執行資料庫操作的函數。

    首先判斷當前模組是否作為主程式運行,如果是,則執行以下操作:

    1. 調用create_table()函數,建立資料庫表。

    2. 調用insert_data()函數,向表中插入資料。

    3. 調用select_data()函數,從表中查詢資料。

    4. 調用drop_table()函數,刪除資料庫表。

    代碼如下:

    if __name__ == '__main__':
        create_table()
        insert_data()
        select_data()
        drop_table()
  8. 輸出一條 INFO 層級的日誌資訊,表示指令碼執行完成。

    代碼如下:

    logging.info('Script execution completed')

完整的代碼展示

config.py

# Database Connection
DB_CONFIG = {
    'host': '$host',
    'port': $port,
    'user': '$user_name',
    'password': '$password',
    'database': '$database_name',
    'charset': 'utf8mb4'
}

test_sql.py

Create table
CREATE_TABLE_SQL = '''
CREATE TABLE test_pymysql (
  id INT(11) NOT NULL AUTO_INCREMENT,
  name VARCHAR(128) NOT NULL,
  age INT(11) NOT NULL,
  PRIMARY KEY (id)
  )
'''

Insert data
INSERT_DATA_SQL = '''
INSERT INTO test_pymysql (name, age) VALUES 
(%s, %s),
(%s, %s),
(%s, %s)
'''

Query data
SELECT_DATA_SQL = '''
SELECT * FROM test_pymysql
'''

Delete table
DROP_TABLE_SQL = '''
DROP TABLE test_pymysql
'''

main.py

import logging
import pymysql
from config import DB_CONFIG
from test_sql import CREATE_TABLE_SQL, INSERT_DATA_SQL, SELECT_DATA_SQL, DROP_TABLE_SQL

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info('Start executing the script')

Create table 
def create_table():
    logging.info('Start creating the table')
    with pymysql.connect(**DB_CONFIG) as conn:
        with conn.cursor() as cursor:
            try:
                cursor.execute(CREATE_TABLE_SQL)
                conn.commit()
                logging.info('Table creation successful')
            except Exception as e:
                conn.rollback()
                logging.error('Table creation failed, Reason:%s' % e)

Insert data
def insert_data():
    logging.info('Start inserting data')
    with pymysql.connect(**DB_CONFIG) as conn:
        with conn.cursor() as cursor:
            try:
                data = [('John', 20), ('Lucy', 25), ('Tom', 30)]
                flattened_data = [d for item in data for d in item]
                cursor.executemany(INSERT_DATA_SQL, [flattened_data])
                conn.commit()
                logging.info('Data insertion successful')
            except Exception as e:
                conn.rollback()
                logging.error('Data insertion failed, Reason:%s' % e)

Query data
def select_data():
    with pymysql.connect(**DB_CONFIG) as conn:
        with conn.cursor() as cursor:
            cursor.execute(SELECT_DATA_SQL)
            result = cursor.fetchall()
            for row in result:
                print(row)

Delete table
def drop_table():
    logging.info('Start dropping the table')
    with pymysql.connect(**DB_CONFIG) as conn:
        with conn.cursor() as cursor:
            try:
                cursor.execute(DROP_TABLE_SQL)
                conn.commit()
                logging.info('Table dropped successfully')
            except Exception as e:
                conn.rollback()
                logging.error('Table drop failed, Reason:%s' % e)

if __name__ == '__main__':
    create_table()
    insert_data()
    select_data()
    drop_table()

logging.info('Script execution completed')

相關文檔