程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

【Python工程師之高性能爬蟲】

編輯:Python

前言

如何在spiders中使用異步操作實現高性能的數據爬取
首先講解一下異步爬蟲的方式:

  1. 多線程、多進程(不建議):
    弊端:無法無限制的開啟多線程或者多進程
    優勢:可以為相關阻塞的方法類單獨開啟線程進程,從而實現異步執行腳本
  2. 線程池、進程池(適當的使用):
    弊端:線程池或進程池中的數量是有上限的。
    優勢:固定了線程和進程的數量,從而降低系統對進程或者線程創建和銷毀次數,可以很好地降低系統的開銷
  3. 單線程 + 異步協程(推薦):
    一些概念和兩個關鍵字:
    ①event_loop(事件循環):相當於一個無限循環,我們可以把一些函數注冊到這個事件循環上,當滿足某些條件時,函數就會被循環執行。
    ②coroutline(協程對象):我們可以將協程對象注冊到事件循環中,它會被事件循環調用。我們可以使用async關鍵字來定義一個方法,這個方法在調用時不會被立即被執行,而是返回一個協程對象。
    ③task(任務):,它是對協程對象的進一步封裝,包含了任務的各個狀態。
    ④future(任務):代表將來執行或還沒有執行的任務,實際上和task沒有本質區別。
    ⑤async(協程):定義一個協程。
    ⑥await(等待執行):用來掛起阻塞方法的執行

tips:

await

await語句後必須是一個 可等待對象 ,可等待對象主要有三種:Python協程,Task,Future。通常情況下沒有必要在應用層級的代碼中創建 Future 對象。

Coroutine

協程(Coroutine),又稱微線程,纖程。通常我們認為線程是輕量級的進程,因此我們也把協程理解為輕量級的線程即微線程。
協程的作用是在執行函數A時可以隨時中斷去執行函數B,然後中斷函數B繼續執行函數A(可以自由切換)。
這裡的中斷,不是函數的調用,而是有點類似CPU的中斷。這一整個過程看似像多線程,然而協程只有一個線程執行。
協程的優勢
執行效率極高,因為是子程序(函數)切換不是線程切換,由程序自身控制,沒有切換線程的開銷。所以與多線程相比,線程的數量越多,
協程的性能優勢越明顯。
不需要鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在控制共享資源時也不需要加鎖,只需要判斷狀態,因此執行效率高的多。
協程可以處理IO密集型程序的效率問題,但不適合處理CPU密集型程序,如要充分發揮CPU利用率應結合多進程+協程。

asyncio

asyncio是Python3.4引入的一個標准庫,直接內置了對異步IO的支持。asyncio模塊提供了使用協程構建並發應用的工具。它使用一種單線程
單進程的的方式實現並發,應用的各個部分彼此合作, 可以顯示的切換任務,一般會在程序阻塞I/O操作的時候發生上下文切換如等待讀寫文件,
或者請求網絡。同時asyncio也支持調度代碼在將來的某個特定事件運行,從而支持一個協程等待另一個協程完成,以處理系統信號和識別其
他一些事件。
在 asyncio 程序中使用同步代碼雖然並不會報錯,但是也失去了並發的意義,例如網絡請求,如果使用僅支持同步的 requests,
在發起一次請求後在收到響應結果之前不能發起其他請求,這樣要並發訪問多個網頁時,即使使用了 asyncio,在發送一次請求
後, 切換到其他協程還是會因為同步問題而阻塞,並不能有速度上的提升,這時候就需要其他支持異步操作的請求庫如 aiohttp.

單線程爬蟲

這裡使用requests 請求,requests是一個同步請求的類庫

import requests
headers = {

'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36'
}
def get_content(url):
response = requests.get(url=url, headers=headers)
if response.status_code == 200:
return response.content
def parse_content(content):
print('相應數據的長度為:', len(content))
if __name__ == "__main__":
urls = [
'https://item.jd.com/100030771664.html',
'https://item.jd.com/100030771664.html',
'https://item.jd.com/100030771664.html',
]
for url in urls:
content = get_content(url)
parse_content(content)

協程

asyncio 是 Python 中的異步IO庫,用來編寫並發協程,適用於IO阻塞且需要大量並發的場景,例如爬蟲、文件讀寫。

asyncio 在 Python3.4 被引入,經過幾個版本的迭代,特性、語法糖均有了不同程度的改進,這也使得不同版本的 Python 在 asyncio 的用法上各不相同,顯得有些雜亂,以前使用的時候也是本著能用就行的原則,在寫法上走了一些彎路,現在對 Python3.7+ 和 Python3.6 中 asyncio 的用法做一個梳理,以便以後能更好的使用

import asyncio
async def request(url):
return url
c = request('www.baidu.com')
def callback_func(task):
print(task.result())
# 綁定回調
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(c)
# 將回調函數綁定到任務對象中
task.add_done_callback(callback_func)
loop.run_until_complete(task)

單線程異步協程實現

在request的基礎上 使用異步IO庫的asyncio

import requests
import asyncio
import time
start = time.time()
urls = [
'http://127.0.0.1:5000/111',
'http://127.0.0.1:5000/222',
'http://127.0.0.1:5000/333',
]
async def get_page(url):
print('正在下載', url)
response = requests.get(url)
print('下載完畢', response.text)
tasks = []
for url in urls:
c = get_page(url)
task = asyncio.ensure_future(c)
tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('總耗時:', time.time()-start)

線程池爬數據

from multiprocessing.dummy import Pool as Pool
import time
def func(msg):
print('msg:', msg)
time.sleep(2)
print('end:')
# 三個線程
pool = Pool(processes=3)
for i in range(1, 5):
msg = 'hello %d' % (i)
# 非阻塞
pool.apply_async(func, (msg,))
# 阻塞,apply()源自內建函數,用於間接的調用函數,並且按位置把元祖或字典作為參數傳入。
# pool.apply(func,(msg,))
# 非阻塞, 注意與apply傳的參數的區別
# pool.imap(func,[msg,])
# 阻塞
# pool.map(func, [msg, ])
print('start~~~~~~~~~~~~~~~')
pool.close()
pool.join()
print('done~~~~~~~~~~~~~~~')

這裡演示一個aiohttp實現多任務異步協程

aiohttp是一個建立在asyncio上的,既支持http又支持websocket的一個庫。並且同時支持客戶端和服務端。

import asyncio
import logging
import time
import json
from threading import Thread
from aiohttp import ClientSession, ClientTimeout, TCPConnector, BasicAuth
import base64
from urllib.parse import unquote, quote
# 默認請求頭
HEADERS = {

'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
# "User-Agent": "curl/7.x/line",
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
}
# 默認超時時間
TIMEOUT = 15
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
class AioCrawl:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.proxyServer = None
# 啟動事件循環
self.event_loop = asyncio.new_event_loop()
self.t = Thread(target=start_loop, args=(self.event_loop,))
self.t.setDaemon(True)
self.t.start()
self.concurrent = 0 # 記錄並發數
async def fetch(self, url, method='GET', headers=None, timeout=TIMEOUT, cookies=None, data=None, proxy=None):
"""采集纖程 :param url: str :param method: 'GET' or 'POST' :param headers: dict() :param timeout: int :param cookies: :param data: dict() :param proxy: str :return: (status, content) """
method = 'POST' if method.upper() == 'POST' else 'GET'
headers = headers if headers else HEADERS
timeout = ClientTimeout(total=timeout)
cookies = cookies if cookies else None
data = data if data and isinstance(data, dict) else {
}
proxy = proxy if proxy else self.proxyServer
tcp_connector = TCPConnector(limit=64) # 禁用證書驗證
async with ClientSession(headers=headers, timeout=timeout, cookies=cookies, connector=tcp_connector) as session:
try:
if method == 'GET':
async with session.get(url, proxy=proxy) as response:
content = await response.read()
return response.status, content
else:
async with session.post(url, data=data, proxy=proxy) as response:
content = await response.read()
return response.status, content
except Exception as e:
raise e
def callback(self, future):
"""回調函數 1.處理並轉換成Result對象 2.寫數據庫 """
msg = str(future.exception()) if future.exception() else 'success'
code = 1 if msg == 'success' else 0
status = future.result()[0] if code == 1 else None
data = future.result()[1] if code == 1 else b'' # 空串
data_len = len(data) if data else 0
if code == 0 or (status is not None and status != 200): # 打印小異常
self.logger.warning('<url="{}", code={}, msg="{}", status={}, data(len):{}>'.format(
future.url, code, msg, status, data_len))
self.concurrent -= 1 # 並發數-1
return data
def add_tasks(self, tasks, method='GET', data=None, headers=None):
"""添加任務 :param tasks: list <class Task> :return: future """
resultList = []
for task in tasks:
headers = headers if headers else HEADERS
# asyncio.run_coroutine_threadsafe 接收一個協程對象和,事件循環對象
future = asyncio.run_coroutine_threadsafe(self.fetch(task, method=method, data=data, headers=headers), self.event_loop)
future.add_done_callback(self.callback) # 給future對象添加回調函數
self.concurrent += 1 # 並發數加 1
result = future.result()
# print(result)
resultList.append(str(result[1], encoding="utf-8"))
return resultList
def add_one_tasks(self, task, headers=None, method='GET', data=None, proxy=None):
"""添加任務 :param tasks: list <class Task> :return: future """
future = asyncio.run_coroutine_threadsafe(self.fetch(task, method=method, data=data, headers=headers, proxy=proxy), self.event_loop)
future.add_done_callback(self.callback) # 給future對象添加回調函數
result = future.result()
return [str(result[1], encoding="utf-8")]
def getProductParm(self, productguid):
base = '{"productguid":"%s","areacode":"","referer":"https://zc.plap.mil.cn/productdetail.html?productguid=%s"}' % (
productguid, productguid)
# 編碼
base_d = quote(base)
return str(base64.b64encode(base_d.encode("utf-8")), "utf-8")
if __name__ == '__main__':
a = AioCrawl()
headers = {

"Host": "api.erp.idodb.com",
"Accept": "application/json",
"Content-Type": "application/json;charset=UTF-8",
"token": "f62f837d0c9fda331fd6ce35d0017a16",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36"
}
data = {
"ware_name": "口罩", "ware_model": "", "ware_brand_name": "漢盾", "pagesize": 10, "pageindex": 2,
"sc_id": "4A6F7946-0704-41B2-8027-2CC13B6E96F2"}
result = a.add_one_tasks(
task='https://zc.plap.mil.cn/productdetail.html?productguid=118fc555-e384-11eb-89a9-fefcfe9556b7',
data=json.dumps(data),
headers=headers,
method="POST") # 模擬動態添加任務
print(result)

  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved