本文主要介紹如何使用SQLAlchemy將Python DataFrame的資料匯入至AnalyticDB for MySQL。
前提條件
已安裝Python環境,且Python版本為3.7及以上版本。
已建立AnalyticDB for MySQL叢集的資料庫帳號。
如果是通過阿里雲帳號訪問,只需建立高許可權帳號。
如果是通過RAM使用者訪問,需要建立高許可權帳號和普通帳號、授予普通帳號相應的庫表許可權並將RAM使用者綁定到普通帳號上。
已將運行Python應用程式的伺服器IP地址添加至AnalyticDB for MySQL叢集的白名單中。
注意事項
通過SQLAlchemy在AnalyticDB for MySQL中建立表時,需要配置checkfirst=False參數(例如 metadata.create_all(engine,checkfirst=False)),否則會報錯。
準備工作
進入SQL開發頁面。
企業版、基礎版或湖倉版:
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。
在左側導覽列,單擊。
在SQLConsole視窗,選擇XIHE引擎和Interactive型資源群組。
數倉版:串連叢集。
建立資料庫。
CREATE DATABASE demo_db;
操作步驟
本樣本涵蓋了自動產生動態隨機資料、構造待用資料、建立資料庫連接、建立表結構和寫入資料的過程,為您示範如何將資料匯入AnalyticDB for MySQL的內表。
執行以下代碼,下載Python依賴。
pip install pandas pip install pymysql替換以下代碼中的相關配置參數並執行,將DataFrame資料匯入至AnalyticDB for MySQL內表。
import pandas as pd from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer, Float, DateTime, inspect import pytz import datetime import random # 定義一個函數用於動態產生隨機資料 def generate_random_book_data(num_records=4): """ Define a function to dynamically generate random book data parameter: num_records (int): The number of records to generate, default is 4. redsponse: pd.DataFrame: A Pandas DataFrame containing random book data. """ book_titles = [ "The Great Gatsby", "To Kill a Mockingbird", "1984", "Pride and Prejudice", "Moby Dick", "War and Peace", "The Catcher in the Rye", "Brave New World", "The Hobbit", "Crime and Punishment" ] timezones = ["America/New_York", "America/Chicago", "Europe/London", "Europe/Dublin", "Asia/Tokyo"] min_publication_year = 1700 max_publication_year = 2023 min_pages = 50 max_pages = 1000 current_year = datetime.datetime.now().year records = [] for i in range(num_records): book_title = random.choice(book_titles) publication_year = random.randint(min_publication_year, max_publication_year) pages = random.randint(min_pages, max_pages) timezone = random.choice(timezones) publication_date_local = datetime.datetime( publication_year, random.randint(1, 12), random.randint(1, 28), random.randint(0, 23), random.randint(0, 59), random.randint(0, 59) ) publication_date_utc = pytz.timezone(timezone).localize(publication_date_local).astimezone(pytz.utc) ebook_min_year = publication_year + 1 ebook_max_year = min(current_year + 10, publication_year + 50) if ebook_min_year > ebook_max_year: ebook_release_year = ebook_min_year else: ebook_release_year = random.randint(ebook_min_year, ebook_max_year) ebook_release = datetime.datetime( ebook_release_year, random.randint(1, 12), random.randint(1, 28), random.randint(0, 23), random.randint(0, 59), random.randint(0, 59) ) record = { "book_title": book_title, "publication_year": publication_year, "pages": pages, "publication_date": publication_date_utc, "ebook_release": ebook_release, } records.append(record) dataframe = pd.DataFrame(records) dataframe.index = [f"B{random.randint(1000, 9999)}" for _ in range(num_records)] dataframe.index.name = "book_id" return dataframe.reset_index() if __name__ == '__main__': # 配置AnalyticDB for MySQL串連資訊 # AnalyticDB for MySQL資料庫帳號 username = "user" # AnalyticDB for MySQL資料庫帳號的密碼 password = "password****" # AnalyticDB for MySQL的串連地址 host = "amv-t4ny9j5****.ads.aliyuncs.com" # AnalyticDB for MySQL的連接埠號碼,預設為3306 port = 3306 # AnalyticDB for MySQL的資料庫名稱 database = "demo_db" # AnalyticDB for MySQL的表名稱,若該表不存在,則自動建立,且會在建立時定義book_id為主鍵 table_name = "test_dataframe_insert_table" connection_string = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}" # 建立SQLAlchemy串連 try: engine = create_engine(connection_string) print("The connection with ADB MySQL was created successfully!") except Exception as e: print(f"Failed to create connection with ADB MySQL!: {e}") exit() # 構建樣本資料 records_fixed = [ { "book_title": "The Great Gatsby", "publication_year": 1925, "pages": 218, "publication_date": pytz.timezone("America/New_York") .localize(datetime.datetime(1925, 4, 10, 9, 0, 0)) .astimezone(pytz.utc), "ebook_release": datetime.datetime(2000, 6, 15, 10, 0, 0), }, { "book_title": "To Kill a Mockingbird", "publication_year": 1960, "pages": 281, "publication_date": pytz.timezone("America/Chicago") .localize(datetime.datetime(1960, 7, 11, 15, 30, 0)) .astimezone(pytz.utc), "ebook_release": datetime.datetime(2002, 8, 20, 12, 0, 0), }, { "book_title": "1984", "publication_year": 1949, "pages": 328, "publication_date": pytz.timezone("Europe/London") .localize(datetime.datetime(1949, 6, 8, 11, 0, 0)) .astimezone(pytz.utc), "ebook_release": datetime.datetime(1998, 5, 1, 9, 0, 0), }, { "book_title": "Pride and Prejudice", "publication_year": 1813, "pages": 432, "publication_date": pytz.timezone("Europe/Dublin") .localize(datetime.datetime(1813, 1, 28, 14, 0, 0)) .astimezone(pytz.utc), "ebook_release": datetime.datetime(2003, 12, 10, 14, 30, 0), }, ] # 轉換為Pandas DataFrame dataframe_fixed = pd.DataFrame( records_fixed, columns=[ "book_title", "publication_year", "pages", "publication_date", "ebook_release", ], index=pd.Index(["B1001", "B1002", "B1003", "B1004"], name="book_id"), ) # 將索引列添加為DataFrame的列 dataframe_fixed.reset_index(inplace=True) # 定義表結構 metadata = MetaData() books_table = Table( table_name, metadata, Column("book_id", String(10), primary_key=True), Column("book_title", String(255)), Column("publication_year", Integer), Column("pages", Integer), Column("publication_date", DateTime), Column("ebook_release", DateTime) ) try: inspector = inspect(engine) existing_tables = inspector.get_table_names() if table_name in existing_tables: print(f"Table {table_name} already exists, no need to recreate!") else: metadata.create_all(engine,checkfirst=False) print(f"Table {table_name} Successfully created!") except Exception as e: print(f"Error of checking or creating table:{e}") # 寫入待用資料 try: dataframe_fixed.to_sql(name=table_name, con=engine, if_exists="append", index=False) print(f"Static data is successfully written to the table {table_name}") except Exception as e: print(f"Error of writing static data::{e}") # 產生動態資料並寫入表 dataframe_dynamic = generate_random_book_data(100000) try: dataframe_dynamic.to_sql(name=table_name, con=engine, if_exists="append", index=False) print(f"Dynamic data is successfully written to the table {table_name}") except Exception as e: print(f"Error writing dynamic data:{e}") finally: # 關閉串連 engine.dispose()參數說明:
參數
說明
username
AnalyticDB for MySQL資料庫帳號。
password
AnalyticDB for MySQL資料庫帳號的密碼。
host
AnalyticDB for MySQL的串連地址。
port
AnalyticDB for MySQL的連接埠號碼,預設為3306。
database
AnalyticDB for MySQL的資料庫名稱。
table_name
AnalyticDB for MySQL的表名稱。若該表不存在,則自動建立,且會在建立時定義book_id為主鍵。
說明如果您想瞭解DataFrame資料的完整匯入流程,請參見SQLAlchemy官方文檔。
在AnalyticDB for MySQL中查詢資料是否匯入成功。
SELECT * FROM `demo_db`.`test_dataframe_insert_table` LIMIT 20;返回結果如下:
+---------+------------------------+------------------+-------+---------------------+---------------------+ | book_id | book_title | publication_year | pages | publication_date | ebook_release | +---------+------------------------+------------------+-------+---------------------+---------------------+ | B1752 | 1984 | 1861 | 493 | 1861-11-08 00:40:32 | 1906-08-15 08:12:56 | | B5420 | To Kill a Mockingbird | 1856 | 175 | 1856-02-26 00:50:57 | 1894-02-12 22:42:06 | | B4033 | Crime and Punishment | 1886 | 675 | 1886-11-20 08:23:29 | 1912-02-14 04:32:22 | | B3597 | Moby Dick | 1719 | 779 | 1719-10-05 02:15:13 | 1734-02-21 06:53:00 | | B4708 | To Kill a Mockingbird | 1811 | 951 | 1811-05-08 06:31:02 | 1830-11-03 22:23:42 | | B9493 | The Great Gatsby | 1976 | 843 | 1976-10-23 10:57:41 | 1986-06-11 22:18:59 | | B8066 | Crime and Punishment | 1750 | 689 | 1750-07-01 12:45:52 | 1766-10-06 00:35:15 | | B6823 | 1984 | 1921 | 462 | 1921-04-12 18:48:16 | 1953-09-14 03:30:51 | | B3875 | War and Peace | 1927 | 893 | 1927-03-12 19:08:28 | 1950-12-14 06:05:49 | | B3528 | The Hobbit | 1713 | 774 | 1713-11-05 13:59:30 | 1734-08-12 13:58:10 | | B7133 | Moby Dick | 1835 | 347 | 1835-07-02 15:03:11 | 1838-03-27 22:29:10 | | B1140 | War and Peace | 1837 | 496 | 1837-11-27 00:27:22 | 1881-08-10 13:37:35 | | B3244 | The Catcher in the Rye | 1842 | 739 | 1842-08-02 13:17:45 | 1876-10-07 07:35:05 | | B4300 | Brave New World | 1764 | 654 | 1764-06-26 08:56:17 | 1775-03-15 20:47:09 | | B4958 | The Catcher in the Rye | 1867 | 321 | 1867-12-21 16:55:14 | 1907-09-15 14:54:07 | | B6145 | Brave New World | 1844 | 872 | 1844-07-17 20:51:49 | 1866-08-06 14:17:23 | | B5163 | The Catcher in the Rye | 1798 | 952 | 1798-05-16 02:48:41 | 1813-09-12 15:07:39 | | B1310 | 1984 | 1975 | 588 | 1975-02-27 07:03:15 | 1985-03-09 23:27:13 | | B1857 | Crime and Punishment | 1990 | 59 | 1990-02-28 07:02:26 | 2015-10-19 08:21:48 | | B3155 | To Kill a Mockingbird | 1928 | 236 | 1928-05-17 13:04:56 | 1958-11-01 16:19:38 | +---------+------------------------+------------------+-------+---------------------+---------------------+