
Implementing a simple SqlHelper in Python


When we regularly connect to a MySQL database from Python, it helps to write a SqlHelper class that simplifies database access. We use DBUtils' PooledDB to create a connection pool and take a connection from the pool each time we need one.

First, install pymysql and DBUtils:

pip install pymysql==1.0.2
pip install DBUtils==3.0.2

Building SqlHelper

import pymysql
from dbutils.pooled_db import PooledDB


class SqlHelper(object):
    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,  # DB-API module used to create the connections
            maxconnections=6,  # maximum number of connections the pool allows; 0 or None means no limit
            mincached=2,  # idle connections created when the pool is initialized; 0 means none
            blocking=True,  # when no connection is available: True waits, False raises an error
            ping=0,
            # When to ping the MySQL server to check that it is available:
            # 0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='192.168.171.45',
            port=3306,
            user='root',
            password='123456',
            database='vincent',
            charset='utf8'
        )

    def open(self):
        conn = self.pool.connection()
        cursor = conn.cursor()
        return conn, cursor

    def close(self, cursor, conn):
        cursor.close()
        conn.close()  # returns the connection to the pool

    def fetchall(self, sql, *args):
        """Fetch all rows."""
        conn, cursor = self.open()
        cursor.execute(sql, args)
        result = cursor.fetchall()
        self.close(cursor, conn)  # argument order matches close(cursor, conn)
        return result

    def fetchone(self, sql, *args):
        """Fetch a single row."""
        conn, cursor = self.open()
        cursor.execute(sql, args)
        result = cursor.fetchone()
        self.close(cursor, conn)
        return result

    def __enter__(self):
        return self.open()[1]

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Python always passes three exception arguments to __exit__
        pass


db = SqlHelper()
print(db)
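
The ping values listed in the comment above are bit flags, so they can be combined, and 7 is simply 1, 2 and 4 together. A minimal sketch of that arithmetic follows; the constant names and the combine function are hypothetical labels used only for illustration, since PooledDB itself just takes the plain integer.

```python
# Hypothetical names for the documented ping values; DBUtils does not
# export these, it only accepts the integers themselves.
PING_NEVER = 0
PING_ON_REQUEST = 1   # whenever a connection is requested from the pool
PING_ON_CURSOR = 2    # when a cursor is created
PING_ON_EXECUTE = 4   # when a query is executed


def combine(*flags):
    """OR the given flags together into a single ping value."""
    value = PING_NEVER
    for flag in flags:
        value |= flag
    return value


ping_always = combine(PING_ON_REQUEST, PING_ON_CURSOR, PING_ON_EXECUTE)
print(ping_always)
```

The resulting integer is what would be passed as ping= when creating the pool.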

Testing the connection pool:

from threading import Thread

from SqlHelper import db


def task(num):
    # Get a connection from the pool
    conn, cursor = db.open()
    cursor.execute('select sleep(3)')  # make the database sleep for 3 seconds
    result = cursor.fetchall()
    cursor.close()
    # Return the connection to the pool
    conn.close()
    print(num, '------------>', result)


for i in range(57):
    t = Thread(target=task, args=(i,))
    t.start()

Each task issues an SQL request that makes the database pause for 3 seconds, which lets us check the maximum number of connections in the pool. When the program runs, the output appears in batches of 6 roughly every 3 seconds, which shows that the pool allows at most 6 connections at a time.
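
The batching can be reproduced without a database. The sketch below is only an illustration under stated assumptions: a BoundedSemaphore stands in for the pool's limit of 6, a short time.sleep stands in for select sleep(3), and the stats dictionary is a hypothetical helper for counting. It is not how DBUtils implements blocking internally.

```python
import threading
import time

MAX_CONNECTIONS = 6  # mirrors maxconnections=6 in the pool settings
slots = threading.BoundedSemaphore(MAX_CONNECTIONS)
lock = threading.Lock()
stats = {'active': 0, 'peak': 0}  # tasks holding a slot now, and the highest count seen


def task(num):
    with slots:  # blocks while all 6 slots are taken, like blocking=True
        with lock:
            stats['active'] += 1
            stats['peak'] = max(stats['peak'], stats['active'])
        time.sleep(0.05)  # stands in for the 3-second query
        with lock:
            stats['active'] -= 1


threads = [threading.Thread(target=task, args=(i,)) for i in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('peak concurrent tasks:', stats['peak'])
```

However many threads are started, the peak never exceeds the limit, which is why the real test prints its results six at a time.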

The with context manager

We can use the with statement to simplify the calling code:

from threading import Thread

from SqlHelper import db


def task(num):
    with db as cursor:
        cursor.execute('select sleep(3)')
        result = cursor.fetchall()


for i in range(57):
    t = Thread(target=task, args=(i,))
    t.start()

There is a problem, though. The with statement automatically calls __enter__ and binds its return value to cursor, but in a multithreaded program how do we automatically call close() so that each thread releases its own connection? Because db is a singleton (there is only one SqlHelper object, shared by every thread), anything stored directly on it would be overwritten by other threads and the threads would interfere with each other. We therefore need somewhere to keep the connection that is tied to the current thread.

So we introduce threading.local().
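
As a brief illustration of what threading.local() provides, the sketch below has three threads write to the same attribute name and read it back. The names local_data, worker and seen are hypothetical and used only for this example.

```python
import threading
import time

local_data = threading.local()
seen = {}  # thread label -> value that thread read back


def worker(label):
    local_data.value = label  # stored separately for each thread
    time.sleep(0.01)          # give the other threads time to write as well
    seen[label] = local_data.value


threads = [threading.Thread(target=worker, args=('t%d' % i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never assigned local_data.value, so it has no such attribute.
main_has_value = hasattr(local_data, 'value')
print(seen, main_has_value)
```

Each thread reads back only what it wrote itself, which is the property SqlHelper needs in order to remember which connection belongs to which thread.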

threading.local()

import threading

import pymysql
from dbutils.pooled_db import PooledDB


class SqlHelper(object):
    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,  # DB-API module used to create the connections
            maxconnections=6,  # maximum number of connections the pool allows; 0 or None means no limit
            mincached=2,  # idle connections created when the pool is initialized; 0 means none
            blocking=True,  # when no connection is available: True waits, False raises an error
            ping=0,
            # When to ping the MySQL server to check that it is available:
            # 0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='192.168.171.45',
            port=3306,
            user='root',
            password='123456',
            database='vincent',
            charset='utf8'
        )
        self.local = threading.local()  # per-thread storage

    def open(self):
        conn = self.pool.connection()
        cursor = conn.cursor()
        return conn, cursor

    def close(self, cursor, conn):
        cursor.close()
        conn.close()  # returns the connection to the pool

    def fetchall(self, sql, *args):
        """Fetch all rows."""
        conn, cursor = self.open()
        cursor.execute(sql, args)
        result = cursor.fetchall()
        self.close(cursor, conn)  # argument order matches close(cursor, conn)
        return result

    def fetchone(self, sql, *args):
        """Fetch a single row."""
        conn, cursor = self.open()
        cursor.execute(sql, args)
        result = cursor.fetchone()
        self.close(cursor, conn)
        return result

    def __enter__(self):
        conn, cursor = self.open()
        # Each thread keeps its own stack of (conn, cursor) pairs
        stack = getattr(self.local, 'stack', None)
        if stack is None:
            stack = self.local.stack = []
        stack.append((conn, cursor))
        return cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close the conn and cursor that belong to the current thread
        stack = getattr(self.local, 'stack', None)
        if not stack:
            return
        conn, cursor = stack.pop()
        cursor.close()
        conn.close()


db = SqlHelper()

Test:

from threading import Thread


def task(num):
    with db as cursor:
        cursor.execute('select sleep(3)')
        result = cursor.fetchall()


for i in range(57):
    t = Thread(target=task, args=(i,))
    t.start()
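
One likely reason for keeping a stack per thread, rather than a single (conn, cursor) pair, is that with blocks can be nested inside the same thread. The sketch below replays the __enter__ and __exit__ logic with stand-in objects so that it runs without a database. FakeResource and StackHelper are hypothetical names used only for illustration, and the sketch is an assumption about the intent rather than part of the original helper. Python calls __exit__ with three exception arguments, so the sketch declares them.

```python
import threading


class FakeResource(object):
    """Stand-in for a pooled connection or cursor; records each close()."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def close(self):
        self.log.append('close ' + self.name)


class StackHelper(object):
    """Hypothetical helper with the same enter/exit logic but no database."""

    def __init__(self):
        self.local = threading.local()
        self.log = []
        self.count = 0

    def __enter__(self):
        self.count += 1
        conn = FakeResource('conn%d' % self.count, self.log)
        cursor = FakeResource('cursor%d' % self.count, self.log)
        stack = getattr(self.local, 'stack', None)
        if stack is None:
            stack = self.local.stack = []
        stack.append((conn, cursor))
        return cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        conn, cursor = self.local.stack.pop()
        cursor.close()
        conn.close()


helper = StackHelper()
with helper as outer:
    with helper as inner:
        pass  # the inner block ends first, so the second pair is closed first
print(helper.log)
```

The most recently opened pair is popped and closed first, so the outer block's cursor stays usable until its own with block ends.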
