本文將介紹如何使用 PyMySQL 庫和 OceanBase 資料庫構建一個應用程式,實現基本的資料庫操作,包括建立表、插入資料、查詢資料和刪除表等。
前提條件
您已安裝 Python 3.x 和 pip。
您已安裝 OceanBase 資料庫並且建立了 MySQL 模式租戶。
操作步驟
檢查
Python和pip的版本。安裝 PyMySQL 庫。
擷取 OceanBase 資料庫連接資訊。
修改
config.py檔案中的資料庫連接資訊。運行
main.py檔案。
步驟一:檢查 Python 和 pip 的版本
開啟命令提示字元或 PowerShell 終端,運行python --version和pip --version命令,確保 Python 和 pip 正常安裝。
樣本如下:
PS C:\Windows\system32> python --version
Python 3.7.0
PS C:\Windows\system32> pip --version
pip 22.3.1 from d:\python\python37\lib\site-packages\pip (python 3.7)步驟二:安裝 PyMySQL 庫
開啟命令提示字元或 PowerShell 終端,運行以下命令,安裝 PyMySQL 庫。
運行以下命令,進入代碼的
python-pymysql目錄。樣本如下:
cd python-pymysql運行以下命令,安裝專案所需的 Python 庫。
樣本如下:
pip install -r requirements.txt
您也可以直接開啟命令提示字元或 PowerShell 終端運行pip install pymysql命令安裝 PyMySQL 庫。
步驟三:擷取 OceanBase 資料庫連接資訊
聯絡 OceanBase 資料庫部署人員或者管理員擷取相應的資料庫連接串。
obclient -h$host -P$port -u$user_name -p$password -D$database_name參數說明:
$host:OceanBase 資料庫連接的網域名稱。$port:OceanBase 資料庫連接連接埠,MySQL 模式租戶預設是 3306。$database_name:需要訪問的資料庫名稱。$user_name:租戶的串連帳號。$password:提供賬戶密碼。
更多串連串的資訊,請參見 擷取串連參數。
樣本如下:
obclient -hxxx.xxx.xxx.xxx -P3306 -utest_user001 -p****** -Dtest步驟四:修改 config.py 檔案中的資料庫連接資訊
根據 步驟三:擷取 OceanBase 資料庫連接資訊 中的資訊修改專案檔python-pymysql/config.py中的資料庫連接資訊。
進入
python-pymysql專案檔夾。修改
config.py檔案中的資料庫連接資訊。在 Windows 環境下,使用文字編輯器開啟
config.py檔案,修改檔案中的資料庫連接資訊,確保與實際情況相符。在 Linux 環境下,可以使用
vi config.py或者vim config.py命令編輯config.py檔案,修改檔案中的資料庫連接資訊,確保與實際情況相符。
config.py檔案中的資料庫連接資訊樣本如下:DB_CONFIG = { 'host': '10.10.10.1', 'port': 3306, 'user': 'test_user001', 'password': '******', 'database': 'test', 'charset': 'utf8mb4' }
步驟五:運行 main.py 檔案
開啟命令提示字元或 PowerShell 終端,運行main.py檔案,查詢資料並輸出結果。
進入
python-pymysql專案目錄下。樣本如下:
cd D:\demo\demo\python-pymysql運行
main.py檔案。樣本如下:
python main.py返回結果如下:
2023-11-10 16:56:48,021 - INFO - Start executing the script 2023-11-10 16:56:48,021 - INFO - Start creating the table 2023-11-10 16:56:48,281 - INFO - Table creation successful 2023-11-10 16:56:48,281 - INFO - Start inserting data 2023-11-10 16:56:48,540 - INFO - Data insertion successful (1, 'John', 20) (2, 'Lucy', 25) (3, 'Tom', 30) 2023-11-10 16:56:48,737 - INFO - Start dropping the table 2023-11-10 16:56:48,999 - INFO - Table dropped successfully 2023-11-10 16:56:48,999 - INFO - Script execution completed
專案代碼介紹
點擊 python-pymysql 下載專案代碼,是一個名稱為python-pymysql.zip的壓縮包。
解壓後,得到一個名為python-pymysql的檔案夾。目錄結構如下所示:
python-pymysql
├── config.py
├── test_sql.py
├── main.py
└── requirements.txt檔案說明:
config.py:用於儲存資料庫連接資訊。test_sql.py:用於儲存 SQL 陳述式。main.py:主程式入口,用於執行資料庫的基本操作,包括建立表、插入資料、查詢資料和刪除表。requirements.txt:用於儲存專案所需要的 Python 包及其版本資訊。說明本文擷取的代碼中只列出了 PyMySQL 庫的版本要求,可以通過
sudo pip install -r requirements.txt命令安裝,執行以上命令後,會自動安裝所需的庫。
config.py 代碼介紹
本文擷取的config.py檔案中
的代碼定義了資料庫連接資訊。資料庫連接資訊主要包括以下幾個部分:
指定串連資料庫的 IP 位址、連接埠號碼、使用者名稱、密碼、資料庫名稱和字元集。
代碼如下:
DB_CONFIG = {
'host': '$host',
'port': $port,
'user': '$user_name',
'password': '$password',
'database': '$database_name',
'charset': 'utf8mb4'
}參數解釋:
host:OceanBase 資料庫連接的網域名稱。port:提供 OceanBase 資料庫連接連接埠,預設連接埠是 3306。user:提供租戶的串連賬戶。password:提供賬戶密碼。database:需要串連的資料庫名。charset:串連資料庫時使用的字元集。
這裡的參數值是根據具體環境和資料庫設定而定的,需要根據實際情況進行修改。
test_sql.py 代碼介紹
本文擷取的test_sql.py檔案中的代碼定義了資料庫操作的 SQL 陳述式,包括建立表、插入資料、查詢資料和刪除表,這些 SQL 陳述式可以通過 PyMySQL 串連資料庫後執行,以實現相應的功能。
該檔案中的代碼主要包括以下幾個部分:
建立表的 SQL 陳述式。
定義建立表
test_pymysql的 SQL 陳述式,在表中定義了三個欄位,分別為id、name和age,其中id是自增長的主鍵。代碼如下:
CREATE_TABLE_SQL = ''' CREATE TABLE test_pymysql ( id INT(11) NOT NULL AUTO_INCREMENT, name VARCHAR(128) NOT NULL, age INT(11) NOT NULL, PRIMARY KEY (id) ) '''插入資料的 SQL 陳述式。
定義一個向
test_pymysql表插入資料的 SQL 陳述式,插入的資料將包含三條,每條資料包含兩個欄位:name和age。每個欄位的值將在執行 SQL 陳述式時,通過預留位置%s的形式被傳入。代碼如下:
INSERT_DATA_SQL = ''' INSERT INTO test_pymysql (name, age) VALUES (%s, %s), (%s, %s), (%s, %s) '''查詢資料的 SQL 陳述式。
定義查詢資料的 SQL 陳述式,從
test_pymysql表中查詢所有資料。代碼如下:
SELECT_DATA_SQL = ''' SELECT * FROM test_pymysql '''刪除表的 SQL 陳述式。
定義刪除表的 SQL 陳述式,將
test_pymysql表刪除。代碼如下:
DROP_TABLE_SQL = ''' DROP TABLE test_pymysql '''
main.py 代碼介紹
本文擷取的main.py檔案中的代碼通過調用pymysql模組,串連 MySQL 資料庫,以及調用logging模組,輸出日誌資訊,實現建立表、插入資料、查詢資料和刪除表的操作。
main.py檔案中的代碼主要包括以下幾個部分:
匯入所需的模組。
匯入
logging模組。匯入
pymysql模組。匯入
config.py模組,其中定義了資料庫連接資訊。匯入
test_sql.py模組,其中定義了資料庫操作的 SQL 陳述式。
代碼如下:
import logging import pymysql from config import DB_CONFIG from test_sql import CREATE_TABLE_SQL, INSERT_DATA_SQL, SELECT_DATA_SQL, DROP_TABLE_SQL設定日誌記錄的層級和格式,並輸出一條 INFO 層級的日誌資訊,表示開始執行指令碼。
代碼如下:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.info('Start executing the script')定義建立表的函數。
定義一個名為
create_table()的函數,輸出一條 INFO 層級的日誌資訊,表示開始建立表。使用with語句管理資料庫串連和遊標對象的生命週期,保證了資料庫連接和遊標對象的安全關閉,避免了記憶體流失等問題。執行建立表的 SQL 陳述式,提交事務並輸出日誌資訊,或復原事務並輸出錯誤記錄檔資訊。代碼如下:
def create_table(): logging.info('Start creating the table') with pymysql.connect(**DB_CONFIG) as conn: with conn.cursor() as cursor: try: cursor.execute(CREATE_TABLE_SQL) conn.commit() logging.info('Table creation successful') except Exception as e: conn.rollback() logging.error('Table creation failed, Reason:%s' % e)定義插入資料的函數。
定義一個名為
insert_data()的函數,輸出一條 INFO 層級的日誌資訊,表示開始插入資料。使用with語句管理資料庫串連和遊標對象的生命週期,保證了資料庫連接和遊標對象的安全關閉,避免了記憶體流失等問題。執行插入資料的 SQL 陳述式,提交事務並輸出日誌資訊,或復原事務並輸出錯誤記錄檔資訊。代碼如下:
def insert_data(): logging.info('Start inserting data') with pymysql.connect(**DB_CONFIG) as conn: with conn.cursor() as cursor: try: data = [('John', 20), ('Lucy', 25), ('Tom', 30)] flattened_data = [d for item in data for d in item] cursor.executemany(INSERT_DATA_SQL, [flattened_data]) conn.commit() logging.info('Data insertion successful') except Exception as e: conn.rollback() logging.error('Data insertion failed, Reason:%s' % e)定義查詢資料的函數。
定義一個名為
select_data()的函數,用於從資料庫中查詢資料。使用with語句管理資料庫串連和遊標對象的生命週期,保證了資料庫連接和遊標對象的安全關閉,避免了記憶體流失等問題。使用execute()方法執行SELECT_DATA_SQL定義的 SQL 陳述式查詢資料。使用fetchall()方法擷取查詢結果,並通過for迴圈逐行輸出結果。代碼如下:
def select_data(): with pymysql.connect(**DB_CONFIG) as conn: with conn.cursor() as cursor: cursor.execute(SELECT_DATA_SQL) result = cursor.fetchall() for row in result: print(row)定義刪除表的函數。
定義一個名為
drop_table()函數,該函數使用了預先定義的資料庫連接資訊(DB_CONFIG)和刪除表的 SQL 陳述式(DROP_TABLE_SQL)。函數會執行刪除表的操作,並列印相應的日誌資訊表示操作成功或失敗。如果刪除表操作失敗,會列印出錯誤資訊。代碼如下:
def drop_table(): logging.info('Start dropping the table') with pymysql.connect(**DB_CONFIG) as conn: with conn.cursor() as cursor: try: cursor.execute(DROP_TABLE_SQL) conn.commit() logging.info('Table dropped successfully') except Exception as e: conn.rollback() logging.error('Table drop failed, Reason:%s' % e)定義程式的進入點,主要用於執行資料庫操作的函數。
首先判斷當前模組是否作為主程式運行,如果是,則執行以下操作:
調用
create_table()函數,建立資料庫表。調用
insert_data()函數,向表中插入資料。調用
select_data()函數,從表中查詢資料。調用
drop_table()函數,刪除資料庫表。
代碼如下:
if __name__ == '__main__': create_table() insert_data() select_data() drop_table()輸出一條 INFO 層級的日誌資訊,表示指令碼執行完成。
代碼如下:
logging.info('Script execution completed')
完整的代碼展示
config.py
# Database Connection
DB_CONFIG = {
'host': '$host',
'port': $port,
'user': '$user_name',
'password': '$password',
'database': '$database_name',
'charset': 'utf8mb4'
}test_sql.py
Create table
CREATE_TABLE_SQL = '''
CREATE TABLE test_pymysql (
id INT(11) NOT NULL AUTO_INCREMENT,
name VARCHAR(128) NOT NULL,
age INT(11) NOT NULL,
PRIMARY KEY (id)
)
'''
Insert data
INSERT_DATA_SQL = '''
INSERT INTO test_pymysql (name, age) VALUES
(%s, %s),
(%s, %s),
(%s, %s)
'''
Query data
SELECT_DATA_SQL = '''
SELECT * FROM test_pymysql
'''
Delete table
DROP_TABLE_SQL = '''
DROP TABLE test_pymysql
'''main.py
import logging
import pymysql
from config import DB_CONFIG
from test_sql import CREATE_TABLE_SQL, INSERT_DATA_SQL, SELECT_DATA_SQL, DROP_TABLE_SQL
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info('Start executing the script')
Create table
def create_table():
logging.info('Start creating the table')
with pymysql.connect(**DB_CONFIG) as conn:
with conn.cursor() as cursor:
try:
cursor.execute(CREATE_TABLE_SQL)
conn.commit()
logging.info('Table creation successful')
except Exception as e:
conn.rollback()
logging.error('Table creation failed, Reason:%s' % e)
Insert data
def insert_data():
logging.info('Start inserting data')
with pymysql.connect(**DB_CONFIG) as conn:
with conn.cursor() as cursor:
try:
data = [('John', 20), ('Lucy', 25), ('Tom', 30)]
flattened_data = [d for item in data for d in item]
cursor.executemany(INSERT_DATA_SQL, [flattened_data])
conn.commit()
logging.info('Data insertion successful')
except Exception as e:
conn.rollback()
logging.error('Data insertion failed, Reason:%s' % e)
Query data
def select_data():
with pymysql.connect(**DB_CONFIG) as conn:
with conn.cursor() as cursor:
cursor.execute(SELECT_DATA_SQL)
result = cursor.fetchall()
for row in result:
print(row)
Delete table
def drop_table():
logging.info('Start dropping the table')
with pymysql.connect(**DB_CONFIG) as conn:
with conn.cursor() as cursor:
try:
cursor.execute(DROP_TABLE_SQL)
conn.commit()
logging.info('Table dropped successfully')
except Exception as e:
conn.rollback()
logging.error('Table drop failed, Reason:%s' % e)
if __name__ == '__main__':
create_table()
insert_data()
select_data()
drop_table()
logging.info('Script execution completed')相關文檔
更多串連 OceanBase 資料庫的資訊,請參見 串連方式概述。
