协程应用——aiohttp异步爬虫实战

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

aiohttp异步爬虫实战

1. 案例介绍

  • 本例要爬取的网站是https://spa5.scrape.center/,数据量相对大所以用到了异步爬虫主要学习这种方法是如何提高效率的。网站界面如图。
    在这里插入图片描述
  • 居然有500多页老师辛苦了。
  • 这是一个图书网站里面包含数千条图书信息网站数据是JavaScript渲染而得。数据可以通过Ajax接口获取并且为了学习用接口没有设置任何措施和加密参数。也是因为这个网站数据量大才更适合做异步爬取。
  • 本节我们要完成的目标
    • 🍏 使用aiohttp爬取全栈的图书数据。
    • 🍏 将数据通过异步的方式保存到MongoDB中。

2. 准备工作

  • 开始完成本节任务前要有做如下准备。
    • 安装好了Python3.7以上。
    • 了解Ajax爬取的知识。
    • 了解异步爬虫的知识和asyncio库的用法。
    • 了解aiohttp库的用法。
    • 安装并运行了MongoDB数据库并安装了异步爬虫库motor。

3. 页面分析

  • 前面学习了Ajax的方法按照同样的逻辑我们分析本站的情况。

  • 列表页的Ajax请求地址格式为https://spa5.scrape.center/api/book/?limit=18&offset={offset}。其中limit的值是每页包含的书的数量offset的值是换页之后的增量值。计算公式为offset = limit*page-1。如下图。
    在这里插入图片描述

  • 在列表页Ajax接口返回的数据里results字段包含当前页里18本图书的信息。
    在这里插入图片描述

  • 其中每本书的数据里面包含一个id字段这个id字段就是图书本身的ID可以用来进一步请求详情页。如下图。在这里插入图片描述

  • 因此详情页的Ajax请求地址格式为https://spa5.scrape.center/api/book/{id}。其中的id即为详情页对应图书的ID可以从列表页Ajax接口的返回结果中获取内容。

4. 实现思路

  • 一个完善的异步爬虫应该能够充分利用等待的时间进行最大并发量的爬取处理。
  • 这里我们相实现的相对简单分为两部分1爬取列表页2爬取详情页。具体如下
  • 第一步异步爬取所有列表页再把这些列表页作为列表形成一个task再进行异步爬取。
  • 第二步把上面所有列表页的内容解析找到所有的详情页的信息再把这些详情页的信息作为列表形成一个task再进行异步爬取并把结果放入MongoDB数据库中。

5. 基本配置

  • 首先先配置一些基本的变量并引入一些必需的库代码如下
import asyncio
import aiohttp
import logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s:%(message)s')

INDEX_URL = 'https://spa5.scrape.center/api/book/?limit=18&offset={offset}'
DETAIL_URL = 'https://spa5.scrape.center/api/book/{id}'
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5
  • 这里我们导入了asyncio、aiohttp、logging三个库然后定义了logging在基本配置接着定义了URL、当前页面中的元素PAGE_SIZE 、数量爬取页码数量PAGE_NUMBER 、并发量CONCURRENCY 等信息。

6. 爬取列表页

  • 第一阶段先爬取列表页。
  • 先定义一个通用的爬取方法。代码如下
""" 爬取列表页"""

# 设定一个并发限制信号量为5
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

# 定义一个函数引入信号量
async def scrape_api(url):
    async with semaphore:
        try:
            logging.info('scraping %s', url)
            # 调用session的get方法返回json格式的数据
            async with session.get(url) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.error('error occurred while scraping %s',
                          url, exc_info=True)
  • 这里声明一个并发量用来控制最大并发量。
  • 接着定义了scrape_api方法接收一个参数url。该方法首先使用async with语句引入信号量并发量作为上下文接着调用session的get方法请求这个url然后返回响应的JSON格式的结果。另外这里还进行了异常处理捕获了ClientError如果出现错误就会输出异常信息。
  • 然后爬取列表页实现代码如下
# 定义爬取列表页的函数
async def scrape_index(page):
    url = INDEX_URL.format(offset=PAGE_SIZE * (page-1))
    return await scrape_api(url)
  • 显然这里定义的scrape_index方法接收一个参数page用于爬取列表页。先构造完整的url再传给上面的scrape_api函数。这里面再次用async定义为异步方法而且返回时调用scrape_api方法前面再加await是因为scrape_api调用之后会返回一个协程对象它的作用就是暂时等待。最后的结果JSON格式也是我们想要的。
  • 接下来我们定义main方法。
async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [asyncio.ensure_future(
        scrape_index(page)) for page in range(1, PAGE_NUMBER + 1)]
    results = await asyncio.gather(*scrape_index_tasks)
    logging.info('results %s', json.dumps(
        results, ensure_ascii=False, indent=2))

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
  • 这里定义的scrape_index_tasks是用于爬取列表页的所有task完成的JSON结果集组成的列表。然后调用asyncio的gather(聚集)方法将这个task列表作为其参数将结果赋值给results那么这个results最后再由主入口程序中的loop对象的run_until_complete方法注册到事件循环启动完成。
  • main方法中用全局变量声明了session变量方便使用。
  • 把上面代码全部放入一个py文件中执行在VSCode中发现一个问题。如图。
    在这里插入图片描述
  • 搜索了下百度找到了答案。说是VScode调试Runtime error报错是因为解释器里面已经有loop循环事件。所以把上面
import nest_asyncio
nest_asyncio.apply()
  • 这两行加入就解决了。
  • 运行结果如下
PS D:\Programs\PythonProject\Practice\threading> & D:/Programs/Python/Python310/python.exe d:/Programs/PythonProject/Practice/threading/asyncio_aiohttp.py
2023-01-17 22:54:38,037 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=0
2023-01-17 22:54:38,063 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=18
2023-01-17 22:54:38,063 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=36
2023-01-17 22:54:38,064 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=54
2023-01-17 22:54:38,064 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=72
2023-01-17 22:54:38,537 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=90
2023-01-17 22:54:39,161 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=108
2023-01-17 22:54:39,177 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=126
2023-01-17 22:54:39,472 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=144
2023-01-17 22:54:40,067 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=162
2023-01-17 22:54:40,077 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=180
2023-01-17 22:54:40,668 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=198
2023-01-17 22:54:40,676 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=216
2023-01-17 22:54:41,173 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=234
2023-01-17 22:54:41,672 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=252
2023-01-17 22:54:42,064 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=270
2023-01-17 22:54:42,073 - INFO:scraping https://spa5.scrape.center/api/book/?limit=18&offset=288
………………

7. 爬取详情页

  • 第二阶段再爬取详情页并保存数据。
  • 在main方法里面加入解析功能的代码。
    ids = []
    for index_data in results:
        if not index_data:continue
        # 因为都是json数据所以以键取值
        for item in index_data.get('results'):
            ids.append(item.get('id'))
  • 得到所有的id以后就可以构造详情页的地址从而形成对应的task再进行异步爬取。
  • 这里再定义两个方法用于爬取详情页。如下
from motor.motor_asyncio import AsyncIOMotorClient

MONGO_CONNECTION_STRING = 'MongoDB://localhost:27017'
MONGO_DB_NAME = 'scrape_book'
MONGO_COLLECTION_NAME = 'books'

client = AsyncIOMotorClient(MONGO_CONNECTION_STRING)
db = client(MONGO_DB_NAME)
collection = db[MONGO_COLLECTION_NAME]

async def save_data(data):
    logging.info('saving data %s', data)
    if data:
        return await collection.update_one({
            'id' : data.get('id')
        },{
            '$set' : data
        }, upsert=True)

async def scrape_detail(id):
    url = DETAIL_URL.format(id=id)
    data = await scrape_api(url)
    await save_data(data)
  • 这里定义了scrape_detail方法用于爬取详情页数据并调用save_data方保存到MongoDB里面。
  • 这里用到了支持异步的MongoDB存储库motor。motor的声明和pymongo类似不过方法是异步方法。
  • 接着再在main方法里面加入对scrape_detail的调用完成详情页的爬取。代码如下 。
    # 加入对scrape_detail方法的调用用来爬取详情页并保存到数据库
    scrape_detail_tasks = [asyncio.ensure_future(scrape_detail(id)) for id in ids]
    await asyncio.wait(scrape_detail_tasks)
    await session.close()

8. 总结

  • 结论就是在爬取数据量大的网站时优先使用异步爬虫。
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6