# Connecting to a MySQL database with Python
#!/usr/bin/env python
# -*-coding:UTF-8-*-
# @Time: 2018/6/6 11:03
# @Author: en:zhu jin
# @File: test.py
# Description: None
from adb.db import getPool
import MySQLdb
import MySQLdb.cursors
from DBUtils.PooledDB import PooledDB
# encoding=utf-8
from util.app_trace import appTrace
from exception.awserverexception import AWServerException
class connMysql:
    """Small helper around a pooled MySQL connection bound to one table.

    Each instance checks one connection out of the pool and opens one cursor
    on it. ``execute`` (and therefore ``select`` / ``insertRecords``) and
    ``insertOne`` close that cursor and connection when they finish, so an
    instance is good for a single statement.

    Values are always sent to the driver as query parameters. The table name
    and the dictionary keys (column names) are still interpolated into the
    SQL text because identifiers cannot be parameterized, so they must come
    from trusted code.
    """

    # Pool shared by every instance; created on first construction.
    pool = None

    def getPool(self):
        """Build and return a new PooledDB pool of MySQLdb connections.

        NOTE(review): host, credentials and schema are hard-coded here;
        consider loading them from configuration.
        """
        pool = PooledDB(MySQLdb,
                        host="10.90.3.206",
                        user="root",
                        passwd="root",
                        db="testbot_aw_dev",
                        port=3306,
                        charset='utf8',
                        mincached=0, maxcached=50, maxshared=0,
                        maxconnections=50, blocking=True,
                        cursorclass=MySQLdb.cursors.DictCursor)
        return pool

    def __init__(self, tableName):
        """Check out a connection and cursor for statements on ``tableName``.

        :param tableName: name of the table the helper methods operate on.
        """
        if not connMysql.pool:
            # Cache on the class, not the instance, so the pool really is
            # created once instead of once per object.
            # NOTE(review): this resolves to the module-level ``getPool``
            # imported from adb.db, not the ``getPool`` method above --
            # confirm which one is intended.
            connMysql.pool = getPool()
        self.tableName = tableName
        self.connection = connMysql.pool.connection()
        self.cursor = self.connection.cursor()

    def execute(self, sql, commit=True, response=False, args=None):
        """Run one statement, then close the cursor and the connection.

        :param sql: statement text; a falsy value is a no-op returning None.
        :param commit: commit the connection after the statement runs.
        :param response: when true, return ``cursor.fetchall()``.
        :param args: optional sequence of parameters for ``%s`` placeholders
            in ``sql``. When None the statement is sent verbatim, exactly as
            before this parameter existed.
        :returns: the fetched rows when ``response`` is true, else None.
        :raises Exception: ``"Failed to run the database"`` after logging
            the underlying error.
        """
        try:
            if not sql:
                return None
            print(sql)
            if args is None:
                # No parameters: do not let the driver %-format the text.
                self.cursor.execute(sql)
            else:
                self.cursor.execute(sql, args)
            if commit:
                self.connection.commit()
            if response:
                return self.cursor.fetchall()
            return None
        except Exception as e:
            appTrace.error('sql is executed failed. error:%s, sql is %s' % (str(e), sql))
            raise Exception("Failed to run the database")
        finally:
            # Always release the cursor and hand the connection back.
            self.cursor.close()
            self.connection.close()

    def select(self, keyValueDict, conj='and'):
        """Select every column of the rows matching the given equalities.

        :param keyValueDict: mapping of column name to required value; a
            falsy mapping selects the whole table.
        :param conj: SQL keyword placed between the conditions.
        :returns: the rows returned by ``cursor.fetchall()``.
        :raises Exception: propagated from ``execute`` on failure.
        """
        sql = 'select * from %s ' % self.tableName
        args = None
        if keyValueDict:
            pairs = list(keyValueDict.items())
            whereClause = conj.join(' %s=%%s ' % str(k) for k, _ in pairs)
            sql += ' where %s' % whereClause
            # Values stay stringified as before, but are now escaped by the
            # driver instead of being pasted into the statement.
            args = tuple(str(v) for _, v in pairs)
        sql += ';'
        return self.execute(sql, commit=False, response=True, args=args)

    def insertRecords(self, kvDictList):
        """Insert several rows with a single multi-row INSERT.

        The column list is taken from the first dictionary, and every row's
        values are read in that same column order. A later row that lacks
        one of those columns makes the call fail (logged, nothing inserted).

        :param kvDictList: list of column-name -> value mappings.
        :returns: None; failures are logged and not re-raised.
        """
        sql = None  # defined up front so the error log can always format it
        try:
            if not kvDictList:
                appTrace.info('kvDictList is not set!')
                return None
            columns = list(kvDictList[0].keys())
            rowTemplate = '(%s)' % ','.join(['%s'] * len(columns))
            sql = ('insert into ' + self.tableName
                   + ' (' + ','.join(columns) + ') values '
                   + ','.join([rowTemplate] * len(kvDictList)) + ';')
            args = []
            for item in kvDictList:
                # Index by column name so values cannot drift out of line
                # with the column list when dict orders differ.
                args.extend(str(item[col]) for col in columns)
            self.execute(sql, commit=True, args=tuple(args))
        except Exception as e:
            appTrace.error('sql is executed failed. error:%s, sql is %s' % (str(e), sql))
        return None

    def insertOne(self, kvDict):
        """Insert a single row and return its generated row id.

        :param kvDict: mapping of column name to value.
        :returns: ``cursor.lastrowid`` on success; None when ``kvDict`` is
            empty or the insert fails (the failure is logged).
        """
        sql = None
        try:
            if not kvDict:
                appTrace.info('kvDict is None!')
                return None
            pairs = list(kvDict.items())
            columns = ','.join(k for k, _ in pairs)
            placeholders = ','.join(['%s'] * len(pairs))
            sql = 'insert into %s (%s) values (%s);' % (
                self.tableName, columns, placeholders)
            args = tuple(str(v) for _, v in pairs)
            try:
                self.cursor.execute(sql, args)
                self.connection.commit()
                return self.cursor.lastrowid
            finally:
                # Release resources on failure too, not only on success.
                self.cursor.close()
                self.connection.close()
        except Exception as e:
            appTrace.error('sql is executed failed. error:%s, sql is %s' % (str(e), sql))
            return None
if __name__ == "__main__":
    # Manual smoke test: look up one row of t_aw_info by id and print it.
    awInfoTable = connMysql("t_aw_info")
    matchingRows = awInfoTable.select({'id': 1076})
    print(matchingRows)
