這篇文章主要介紹了在Python中編寫數(shù)據(jù)庫模塊的教程,本文代碼基于Python2.x版本,需要的朋友可以參考下
在一個Web App中,所有數(shù)據(jù),包括用戶信息、發(fā)布的日志、評論等,都存儲在數(shù)據(jù)庫中。在awesome-python-app中,我們選擇MySQL作為數(shù)據(jù)庫。
Web App里面有很多地方都要訪問數(shù)據(jù)庫。訪問數(shù)據(jù)庫需要創(chuàng)建數(shù)據(jù)庫連接、游標(biāo)對象,然后執(zhí)行SQL語句,最后處理異常,清理資源。這些訪問數(shù)據(jù)庫的代碼如果分散到各個函數(shù)中,勢必?zé)o法維護,也不利于代碼復(fù)用。
此外,在一個Web App中,有多個用戶會同時訪問,系統(tǒng)以多進程或多線程模式來處理每個用戶的請求。假設(shè)以多線程為例,每個線程在訪問數(shù)據(jù)庫時,都必須創(chuàng)建僅屬于自身的連接,對別的線程不可見,否則,就會造成數(shù)據(jù)庫操作混亂。
所以,我們還要創(chuàng)建一個簡單可靠的數(shù)據(jù)庫訪問模型,在一個線程中,能既安全又簡單地操作數(shù)據(jù)庫。
為什么不選擇SQLAlchemy?SQLAlchemy太龐大,過度地面向?qū)ο笤O(shè)計導(dǎo)致API太復(fù)雜。
所以我們決定自己設(shè)計一個封裝基本的SELECT、INSERT、UPDATE和DELETE操作的db模塊:transwarp.db。
設(shè)計db接口
設(shè)計底層模塊的原則是,根據(jù)上層調(diào)用者設(shè)計簡單易用的API接口,然后,實現(xiàn)模塊內(nèi)部代碼。
假設(shè)transwarp.db模塊已經(jīng)編寫完畢,我們希望以這樣的方式來調(diào)用它:
首先,初始化數(shù)據(jù)庫連接信息,通過create_engine()函數(shù):
from transwarp import db
db.create_engine(user='root', password='password', database='test', host='127.0.0.1', port=3306)
然后,就可以直接操作SQL了。
如果需要做一個查詢,可以直接調(diào)用select()方法,返回的是list,每一個元素是用dict表示的對應(yīng)的行:
users = db.select('select * from user')
# users =>
# [
# { "id": 1, "name": "Michael"},
# { "id": 2, "name": "Bob"},
# { "id": 3, "name": "Adam"}
# ]
如果要執(zhí)行INSERT、UPDATE或DELETE操作,執(zhí)行update()方法,返回受影響的行數(shù):
n = db.update('insert into user(id, name) values(?, ?)', 4, 'Jack')
update()函數(shù)簽名為:
update(sql, *args)
統(tǒng)一用?作為占位符,并傳入可變參數(shù)來綁定,從根本上避免SQL注入攻擊。
每個select()或update()調(diào)用,都隱含地自動打開并關(guān)閉了數(shù)據(jù)庫連接,這樣,上層調(diào)用者就完全不必關(guān)心數(shù)據(jù)庫底層連接。
但是,如果要在一個數(shù)據(jù)庫連接里執(zhí)行多個SQL語句怎么辦?我們用一個with語句實現(xiàn):
with db.connection():
db.select('...')
db.update('...')
db.update('...')
如果要在一個數(shù)據(jù)庫事務(wù)中執(zhí)行多個SQL語句怎么辦?我們還是用一個with語句實現(xiàn):
with db.transaction():
db.select('...')
db.update('...')
db.update('...')
實現(xiàn)db模塊
由于模塊是全局對象,模塊變量是全局唯一變量,所以,有兩個重要的模塊變量:
# db.py
# 數(shù)據(jù)庫引擎對象:
class _Engine(object):
def __init__(self, connect):
self._connect = connect
def connect(self):
return self._connect()
engine = None
# 持有數(shù)據(jù)庫連接的上下文對象:
class _DbCtx(threading.local):
def __init__(self):
self.connection = None
self.transactions = 0
def is_init(self):
return not self.connection is None
def init(self):
self.connection = _LasyConnection()
self.transactions = 0
def cleanup(self):
self.connection.cleanup()
self.connection = None
def cursor(self):
return self.connection.cursor()
_db_ctx = _DbCtx()
由于_db_ctx是threadlocal對象,所以,它持有的數(shù)據(jù)庫連接對于每個線程看到的都是不一樣的。任何一個線程都無法訪問到其他線程持有的數(shù)據(jù)庫連接。
有了這兩個全局變量,我們繼續(xù)實現(xiàn)數(shù)據(jù)庫連接的上下文,目的是自動獲取和釋放連接:
class _ConnectionCtx(object):
def __enter__(self):
global _db_ctx
self.should_cleanup = False
if not _db_ctx.is_init():
_db_ctx.init()
self.should_cleanup = True
return self
def __exit__(self, exctype, excvalue, traceback):
global _db_ctx
if self.should_cleanup:
_db_ctx.cleanup()
def connection():
return _ConnectionCtx()
定義了__enter__()和__exit__()的對象可以用于with語句,確保任何情況下__exit__()方法可以被調(diào)用。
把_ConnectionCtx的作用域作用到一個函數(shù)調(diào)用上,可以這么寫:
with connection():
do_some_db_operation()
但是更簡單的寫法是寫個@decorator:
@with_connection
def do_some_db_operation():
pass
這樣,我們實現(xiàn)select()、update()方法就更簡單了:
@with_connection
def select(sql, *args):
pass
@with_connection
def update(sql, *args):
pass
注意到Connection對象是存儲在_DbCtx這個threadlocal對象里的,因此,嵌套使用with connection()也沒有問題。_DbCtx永遠(yuǎn)檢測當(dāng)前是否已存在Connection,如果存在,直接使用,如果不存在,則打開一個新的Connection。
對于transaction也是類似的,with transaction()定義了一個數(shù)據(jù)庫事務(wù):
with db.transaction():
db.select('...')
db.update('...')
db.update('...')
函數(shù)作用域的事務(wù)也有一個簡化的@decorator:
@with_transaction
def do_in_transaction():
pass
事務(wù)也可以嵌套,內(nèi)層事務(wù)會自動合并到外層事務(wù)中,這種事務(wù)模型足夠滿足99%的需求。
事務(wù)嵌套比Connection嵌套復(fù)雜一點,因為事務(wù)嵌套需要計數(shù),每遇到一層嵌套就+1,離開一層嵌套就-1,最后到0時提交事務(wù):
class _TransactionCtx(object):
def __enter__(self):
global _db_ctx
self.should_close_conn = False
if not _db_ctx.is_init():
_db_ctx.init()
self.should_close_conn = True
_db_ctx.transactions = _db_ctx.transactions + 1
return self
def __exit__(self, exctype, excvalue, traceback):
global _db_ctx
_db_ctx.transactions = _db_ctx.transactions - 1
try:
if _db_ctx.transactions==0:
if exctype is None:
self.commit()
else:
self.rollback()
finally:
if self.should_close_conn:
_db_ctx.cleanup()
def commit(self):
global _db_ctx
try:
_db_ctx.connection.commit()
except:
_db_ctx.connection.rollback()
raise
def rollback(self):
global _db_ctx
_db_ctx.connection.rollback()
最后,把select()和update()方法實現(xiàn)了,db模塊就完成了。
更多信息請查看IT技術(shù)專欄