python链接池和pymysql批量入库——从0实现大规模异步爬虫框架项目4_python 批量入库
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
我将这个链接池和批量入库封装了一个工具类上传了pypi,可以直接import使用
使用也较为简单导入PooledDBhelper的DBhelper调用DBhelper.PooledDBhelper()方法传入数据库链接信息创建一个链接池即可
pip install PooledDBhelper==1.0.0
-------------------------------
正文
众所周知不管是爬虫也好后端也好Python开发最常用的ORM就是sqlAlchemy他很完善很强大但是为了更快更轻不用学习新的语法而且可以理解一些sql概念而不是直接使用工具。
我们先来维护一个链接池然后做一个sqlhelper工具类实现更简单好用的数据存储入库。
数据库现在的数据库很多关系型数据库 MySQL(MariaDB), PostgreSQL 等NoSQL数据库还有NewSqL数据库。但MySQL(Mariadb)从易获取性、易使用性、稳定性、社区活跃性方面都有较大优势所以我们在够用的情况下都选择MySQL。
数据库客户端模块然后我们选择PyMySQL这个库它可以和Python 3的异步模块aysncio结合起来形成了aiomysql 模块后面我们写异步爬虫时就可以对数据库进行异步操作了。链接池模块: 我们考虑到创建和释放数据库连接是一个很耗时的操作所以通常创建一个连接池需要就获取用完则放回连接池。这个模块有主要有两个模块PooledDB和PersistentDB我们选择PooledDB
一个简单的链接池案例引入pymysql和PooledDB实例化PooledDB在参数中传入数据库链接配置creator参数选择pymysql,得到一个链接池使用链接池的connection()方法获取一个链接,使用链接的cursor()方法获得游标然后execute()执行sql,从cursor.fetchall()中获取结果即可。下面task函数就是简单的使用然后如果需要我们可以开线程去跑。
import pymysql
from dbutils.pooled_db import PooledDB
test_POOL=PooledDB(
creator=pymysql,
maxconnections=10,
mincached=2,
blocking=True,
host='127.0.0.1',
# sshtunnel='',
port=3306,
user='root',
password='root',
database='jxc',
charset='utf8'
)
def task(num):
sql = "SELECT * FROM duty"
conn=test_POOL.connection()
cursor=conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute(sql)
data = cursor.fetchall()
print(num, '-' * 8)
for i in data:
print(i)
conn.close()
from threading import Thread
for i in range(32):
t=Thread(target=task,args=(i,))
t.start()
查询的方法就像上面这样接下来我们写一个类
包含创建链接池和一些数据库操作,具体解释一下解释使用dbutils库 传入数据库链接信息创建一个链接池我这用了一个{}接受参数也方便后面做扩展。
import pymysql
from pymysql.cursors import DictCursor
from dbutils.pooled_db import PooledDB
class PooledDBhelper:
def __init__(self, dbconfig: {}):
'''
:param dbconfig: {
'host': '192.168.0.1',
'user': 'username',
'password': 'password',
'port': 3306,
'db': 'db_name'
}
'''
self.pool = self.connectionPool(dbconfig)
def connectionPool(self, dbconfig):
try:
pool = PooledDB(
creator=pymysql,
maxconnections=10, # 连接池允许的最大连接数0和None表示不限制连接数
mincached=2, # 初始化时链接池中至少创建的空闲的链接0表示不创建
# 连接池中如果没有可用连接后是否阻塞等待。True等待False不等待然后报错
blocking=True,
host=dbconfig['host'],
user=dbconfig['user'],
passwd=dbconfig['password'],
db=dbconfig['db'],
cursorclass=DictCursor
)
return pool
except Exception as e:
raise Exception("数据库链接失败(create connect failed):{}".format(e))
写完这个类我们就可以通过 pool=PooledDBhelper(dbconfig)获得一个链接池了这个池一般开局注册一个作为全局变量而不是每次数据库操作新注册一个池子。
if __name__ == "__main__":
pool=PooledDBhelper({
'host': '192.168.0.1',
'user': 'username',
'password': 'password',
'port': 3306,
'db': 'db_name'
})
然后接下来我们继续给这个类添加功能常用的两个执行一个sql语句获取一条结果或插入单条和执行一个sql语句获取多条结果这个直接就写了分别用了两种写法
def task(self, sql, *args):
'''
fetchall
:param sql:
:param args:
:return:
'''
conn = self.pool.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
try:
cursor.execute(sql, args)
data = cursor.fetchall()
except Exception as e:
raise ("SQL execution failure", e)
else:
return data
finally:
cursor.close()
conn.close()
def fetchone(self, sql):
with self.pool.connection() as connection:
connection.autocommit = True
with connection.cursor() as cursor:
'''
在创建连接的时候增加参数 autocommit = 1 当发生update等操作时会实时更新到数据库内。避免 conn.commit() 来提交到数据库
如果没有设置自动提交也没有手动提交当进行插入或更新等操作时只在本地客户端能看到更新在其他客户端或数据库内数据无变化。
适合实时操作随时少量、频繁的更新'''
row=cursor.execute(sql)
result = cursor.fetchone()
connection.commit()
return result
最后我们做最主要的一个功能因为爬虫的数据库操作大部分都是入库,我们做一个批量入库
因为大部分时候我们爬的数据都是一个[{"k":"v"}] 这样的形式所以我做了一个只需传入字典列表
自动获取字典的key作为字段value作为内容的入库
def insert_many(self, many_data, table_name):
'''
:param [{"k1":"v1","k2":"v2"},{"k1":"v3","k2":"v4"}]:
:param table_name:
:return: affected_rows
'''
values = [tuple(i.values()) for i in many_data]
keys = list(many_data[-1].keys())
sql_1 = "insert into `{}`(`{}`) values({})".format(table_name, '`,`'.join(many_data[-1].keys()),
','.join([''.join('%s') for _ in keys]))
try:
with self.pool.connection() as conn:
with conn.cursor() as cursor:
row_number = cursor.executemany(sql_1, values)
conn.commit()
return "Successful affected_rows: {}".format(row_number)
except Exception as e:
conn.rollback()
return "ERROR:{}".format(e)
好接下来做一些入库的操作看看好不好用
if __name__ == "__main__":
pool=PooledDBhelper({
'host': '192.168.0.1',
'user': 'username',
'password': 'password',
'port': 3306,
'db': 'db_name'
})
data_list= [{"name":'a', 'info':'1'}, {"name":'b', 'info':'2'},{"name":'none', 'info':'3'}]
rows=pool.insert_many(data_list,"cy_self_test")
print(rows)
result_list=pool.task("select * from cy_self_test")
print(result_list)
query = "insert into `cy_self_test`({}) values {}".format("`name`,`info`", ("cy","world"))
pool.fetchone(query)
query="select * from cy_self_test where id=1"
result=pool.fetchone(query)
print(result)
然后整个封装起来我们就获得了一个数据库工具类做数据库链接池支持批量插入和查询基于PooledDB代码封装,并且简化了开发人员的操作。
下一节讲怎么把一个代码包传到pypi