[Building high-performance crawlers in Python]

Editor: Python

Preface

How can asynchronous operations be used in a spider to achieve high-performance data crawling?
First, the ways to build an asynchronous crawler:

  1. Multithreading / multiprocessing (not recommended):
    Disadvantage: threads or processes cannot be created without limit.
    Advantage: a separate thread or process can be started for each blocking operation, so the blocking parts of the script run asynchronously.
  2. Thread pool / process pool (use in moderation):
    Disadvantage: the number of threads or processes in the pool has an upper limit.
    Advantage: the number of threads or processes is fixed, so the system creates and destroys them far less often, which reduces system overhead.
  3. Single thread + asynchronous coroutines (recommended):
    Some concepts and two keywords:
    ① event_loop (event loop): works like an infinite loop. Functions can be registered on it, and when certain conditions are met the loop executes them.
    ② coroutine (coroutine object): can be registered on the event loop, which will then call it. A method defined with the async keyword is not executed immediately when called; the call returns a coroutine object instead.
    ③ task: a further wrapper around a coroutine object that also holds the various states of the task.
    ④ future: represents a task that will be executed in the future or has not been executed yet. It is essentially the same as a task (in asyncio, Task is a subclass of Future).
    ⑤ async (keyword): defines a coroutine.
    ⑥ await (keyword): suspends the current coroutine while a blocking operation is in progress.
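
The concepts above can be seen together in a minimal sketch. The `fetch` function and the URL are hypothetical stand-ins (nothing is downloaded), and `asyncio.run` assumes Python 3.7+.

```python
import asyncio
import inspect


async def fetch(url):
    # Hypothetical stand-in for a download: yield to the event loop once.
    await asyncio.sleep(0)
    return f"page for {url}"


async def main():
    # Calling an async function does not run it; it only creates a coroutine object.
    coro = fetch("https://example.com")
    is_coroutine = inspect.iscoroutine(coro)
    # Wrapping the coroutine in a task schedules it on the running event loop.
    task = asyncio.ensure_future(coro)
    started_done = task.done()   # the task tracks state: it has not finished yet
    result = await task          # suspend main() until the task finishes
    return is_coroutine, started_done, task.done(), result


if __name__ == "__main__":
    print(asyncio.run(main()))
    # (True, False, True, 'page for https://example.com')
```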

tips:

await

An await expression must be followed by an awaitable object. There are three main kinds of awaitable: coroutines, Tasks, and Futures. Application-level code normally does not need to create Future objects.
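
As an illustration, the sketch below awaits each of the three kinds of awaitable in turn. The `double` coroutine is a made-up example; the Future is created by hand only to show that it can be awaited, which application code rarely needs to do.

```python
import asyncio


async def double(x):
    await asyncio.sleep(0)
    return x * 2


async def main():
    loop = asyncio.get_running_loop()

    # 1. Awaiting a coroutine runs it to completion.
    from_coroutine = await double(1)

    # 2. Awaiting a Task waits for an already scheduled coroutine.
    task = asyncio.create_task(double(2))
    from_task = await task

    # 3. Awaiting a Future waits until some other code sets its result.
    future = loop.create_future()
    loop.call_soon(future.set_result, 6)
    from_future = await future

    return from_coroutine, from_task, from_future


if __name__ == "__main__":
    print(asyncio.run(main()))  # (2, 4, 6)
```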

Coroutine

A coroutine is also called a micro-thread or fiber. Threads are often described as lightweight processes; in the same way, a coroutine can be understood as a lightweight thread, that is, a micro-thread.
A coroutine lets function A be suspended partway through so that function B can run, and then B be suspended so that A continues (execution can switch back and forth freely).
This suspension is not a function call; it is somewhat like a CPU interrupt. The whole process looks like multithreading, but only one thread is involved.
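
The switching between A and B can be made visible with a small sketch. It assumes asyncio as the scheduler and uses `await asyncio.sleep(0)` as the point where a worker hands control back; in asyncio a coroutine is only suspended at such await points, not at arbitrary instructions.

```python
import asyncio

log = []


async def worker(name, steps):
    for i in range(steps):
        log.append(f"{name}{i}")
        # Hand control back to the event loop so the other worker can run.
        await asyncio.sleep(0)


async def main():
    log.clear()
    await asyncio.gather(worker("A", 3), worker("B", 3))
    return list(log)


if __name__ == "__main__":
    print(asyncio.run(main()))
    # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
```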
Advantages of coroutines
Execution is very efficient: switching happens between subroutines (functions) rather than threads and is controlled by the program itself, so there is no thread-switching overhead. Compared with multithreading, the more threads there would be, the more obvious the performance advantage of coroutines.
No locking mechanism is needed: because there is only one thread, variables are never written simultaneously, so shared resources can be accessed without locks by simply checking state, which makes execution much more efficient.
Coroutines handle I/O-intensive programs efficiently, but they are not suited to CPU-intensive programs. To make full use of the CPU, combine multiple processes with coroutines.
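
The "no lock" point can be sketched as follows, assuming the shared update contains no await: many coroutines increment one counter, and because a switch can only happen at the explicit `await`, no update is lost. The names `add_many` and `state` are illustrative only.

```python
import asyncio


async def add_many(state, times):
    for _ in range(times):
        # Read-modify-write with no await in between: no other coroutine
        # can run in the middle of this statement, so no lock is needed.
        state["count"] += 1
        await asyncio.sleep(0)  # switching happens only here


async def main(workers=50, times=100):
    state = {"count": 0}
    await asyncio.gather(*(add_many(state, times) for _ in range(workers)))
    return state["count"]


if __name__ == "__main__":
    print(asyncio.run(main()))  # 5000
```

If an await were placed between reading and writing the shared value, updates could interleave again, so the guarantee only covers code between two await points.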

asyncio

asyncio is a standard library introduced in Python 3.4 that provides built-in support for asynchronous I/O. The asyncio module provides tools for building concurrent applications with coroutines. It achieves concurrency within a single thread and a single process: the parts of the application cooperate and switch tasks explicitly, usually at points where the program would otherwise block on I/O, such as waiting to read or write a file or waiting for a network request. asyncio also supports scheduling code to run at a specific time in the future, letting one coroutine wait for another to complete, handling system signals, and recognizing certain other events.
Using synchronous code in an asyncio program does not raise an error, but it defeats the purpose of concurrency. Take network requests: with the requests library, which only supports synchronous calls, no other request can be issued until the response to the current one has been received. So when you want to fetch several pages concurrently, even under asyncio, switching to another coroutine after sending a request does not help, because the synchronous call still blocks, and there is no speed-up. In that case you need a request library that supports asynchronous operation, such as aiohttp.
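
The effect can be reproduced without any network access. In this sketch, `time.sleep` stands in for a synchronous request and `asyncio.sleep` for an asynchronous one; `DELAY` and the function names are assumptions made for the illustration.

```python
import asyncio
import time

DELAY = 0.2  # assumed duration of one "request", in seconds


async def blocking_fetch(i):
    time.sleep(DELAY)           # synchronous: blocks the whole event loop
    return i


async def async_fetch(i):
    await asyncio.sleep(DELAY)  # asynchronous: yields to the loop while waiting
    return i


async def timed(fetch, n=3):
    start = time.perf_counter()
    results = await asyncio.gather(*(fetch(i) for i in range(n)))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    print("blocking:", asyncio.run(timed(blocking_fetch)))
    print("async:   ", asyncio.run(timed(async_fetch)))
```

The blocking version takes roughly `n * DELAY` because the coroutines run one after another, while the asynchronous version takes roughly `DELAY` because the waits overlap.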

Single-threaded crawler

This version uses requests, which is a library for synchronous requests.

import requests

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36'
}

def get_content(url):
    # Returns the response body, or None if the status is not 200
    response = requests.get(url=url, headers=headers)
    if response.status_code == 200:
        return response.content

def parse_content(content):
    print('Length of the response data:', len(content))

if __name__ == "__main__":
    urls = [
        'https://item.jd.com/100030771664.html',
        'https://item.jd.com/100030771664.html',
        'https://item.jd.com/100030771664.html',
    ]
    # The requests are issued one after another
    for url in urls:
        content = get_content(url)
        if content is not None:
            parse_content(content)

Coroutines

asyncio is Python's asynchronous I/O library for writing concurrent programs. It suits workloads that block on I/O and need a lot of concurrency, such as crawlers and file reads and writes.

asyncio was introduced in Python 3.4. Over several iterations its features and syntactic sugar have improved to varying degrees, which also means asyncio is used differently in different Python versions, and this can look a little messy. In the past I wrote code on the principle of "as long as it works" and took some detours, so here I sort out how asyncio is used in Python 3.7+ and Python 3.6 so that it can be used better in the future.

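import asyncio

async def request(url):
    return url

def callback_func(task):
    # The finished task is passed in; result() is the coroutine's return value
    print(task.result())

# Calling the async function only creates a coroutine object
c = request('www.baidu.com')
# Create the loop explicitly; relying on an implicit loop is deprecated in recent Python versions
loop = asyncio.new_event_loop()
# Wrap the coroutine in a task bound to this loop
task = asyncio.ensure_future(c, loop=loop)
# Bind the callback function to the task object
task.add_done_callback(callback_func)
loop.run_until_complete(task)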
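
For comparison, the same task-plus-callback pattern can be written in the Python 3.7+ style, where `asyncio.run` creates and closes the event loop and `asyncio.create_task` replaces `ensure_future`. This is a sketch of one possible form; the `results` list is added here only so the callback's effect can be inspected.

```python
import asyncio

results = []


async def request(url):
    return url


def callback_func(task):
    # Runs on the event loop once the task has finished.
    results.append(task.result())


async def main():
    task = asyncio.create_task(request("www.baidu.com"))
    task.add_done_callback(callback_func)
    await task


if __name__ == "__main__":
    asyncio.run(main())
    print(results)  # ['www.baidu.com']
```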

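Single-threaded asynchronous coroutine implementation

On top of requests, use the asynchronous I/O library asyncio. Because requests.get is a synchronous call, each coroutine still blocks the event loop while it downloads, so the requests run one after another.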

import asyncio
import time

import requests

start = time.time()
urls = [
    'http://127.0.0.1:5000/111',
    'http://127.0.0.1:5000/222',
    'http://127.0.0.1:5000/333',
]

async def get_page(url):
    print('Downloading', url)
    # requests.get is synchronous, so it blocks the event loop until the response arrives
    response = requests.get(url)
    print('Download finished:', response.text)

loop = asyncio.new_event_loop()
tasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c, loop=loop)
    tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print('Total time:', time.time() - start)
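
If a synchronous library has to stay, one common workaround is to push each blocking call into a thread with `loop.run_in_executor`, so the event loop itself is never blocked. The sketch below assumes this approach; `blocking_download` is a hypothetical stand-in for `requests.get` that sleeps instead of touching the network, and `DELAY` is an assumed request duration.

```python
import asyncio
import time

DELAY = 0.3  # assumed duration of one request, in seconds

URLS = [
    "http://127.0.0.1:5000/111",
    "http://127.0.0.1:5000/222",
    "http://127.0.0.1:5000/333",
]


def blocking_download(url):
    # Hypothetical stand-in for requests.get(url).text; no network access.
    time.sleep(DELAY)
    return f"body of {url}"


async def get_page(url):
    loop = asyncio.get_running_loop()
    # Run the blocking call in the loop's default thread pool and await it.
    return await loop.run_in_executor(None, blocking_download, url)


async def main(urls):
    start = time.perf_counter()
    pages = await asyncio.gather(*(get_page(u) for u in urls))
    return pages, time.perf_counter() - start


if __name__ == "__main__":
    pages, elapsed = asyncio.run(main(URLS))
    print(pages)
    print("Total time:", elapsed)
```

This keeps the calling code asynchronous but still uses threads underneath; a natively asynchronous client such as aiohttp avoids the threads altogether.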

Crawling data with a thread pool

from multiprocessing.dummy import Pool  # thread-based Pool with the multiprocessing API
import time

def func(msg):
    print('msg:', msg)
    time.sleep(2)
    print('end:')

# Three threads
pool = Pool(processes=3)
for i in range(1, 5):
    msg = 'hello %d' % (i)
    # Non-blocking
    pool.apply_async(func, (msg,))
    # Blocking: apply() waits for the call to finish; arguments are passed as a tuple (positional) or a dict (keyword)
    # pool.apply(func, (msg,))
    # Non-blocking; note that, unlike apply, it takes an iterable of arguments
    # pool.imap(func, [msg, ])
    # Blocking
    # pool.map(func, [msg, ])
print('start~~~~~~~~~~~~~~~')
pool.close()
pool.join()
print('done~~~~~~~~~~~~~~~')
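
The same idea can also be expressed with the standard library's `concurrent.futures.ThreadPoolExecutor`, shown here as an alternative sketch rather than as the article's method. The `func` body is a stand-in that sleeps briefly instead of downloading anything.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def func(msg):
    time.sleep(0.1)  # stand-in for a blocking download
    return f"done: {msg}"


def run_all(messages, workers=3):
    # The with-block waits for all submitted work before it exits.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() yields results in input order even though the calls overlap.
        return list(pool.map(func, messages))


if __name__ == "__main__":
    print(run_all([f"hello {i}" for i in range(1, 5)]))
    # ['done: hello 1', 'done: hello 2', 'done: hello 3', 'done: hello 4']
```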

The following demonstrates multi-task asynchronous coroutines with aiohttp

aiohttp is a library built on top of asyncio that supports both HTTP and WebSocket, on both the client side and the server side.

import asyncio
import logging
import time
import json
from threading import Thread
from aiohttp import ClientSession, ClientTimeout, TCPConnector, BasicAuth
import base64
from urllib.parse import unquote, quote
# Default request headers
HEADERS = {
    'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
    # "User-Agent": "curl/7.x/line",
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
}
# Default timeout in seconds
TIMEOUT = 15

def start_loop(loop):
    # Run the given event loop forever in the calling thread
    asyncio.set_event_loop(loop)
    loop.run_forever()

class AioCrawl:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.proxyServer = None
        # Start the event loop in a background daemon thread
        self.event_loop = asyncio.new_event_loop()
        self.t = Thread(target=start_loop, args=(self.event_loop,), daemon=True)
        self.t.start()
        self.concurrent = 0  # Number of requests in flight

    async def fetch(self, url, method='GET', headers=None, timeout=TIMEOUT, cookies=None, data=None, proxy=None):
        """Fetch coroutine.

        :param url: str
        :param method: 'GET' or 'POST'
        :param headers: dict()
        :param timeout: int
        :param cookies:
        :param data: dict()
        :param proxy: str
        :return: (status, content)
        """
        method = 'POST' if method.upper() == 'POST' else 'GET'
        headers = headers if headers else HEADERS
        timeout = ClientTimeout(total=timeout)
        cookies = cookies if cookies else None
        # Anything that is not a non-empty dict is replaced by an empty dict
        data = data if data and isinstance(data, dict) else {}
        proxy = proxy if proxy else self.proxyServer
        tcp_connector = TCPConnector(limit=64)  # At most 64 simultaneous connections
        async with ClientSession(headers=headers, timeout=timeout, cookies=cookies, connector=tcp_connector) as session:
            # Exceptions propagate to the caller through the future
            if method == 'GET':
                async with session.get(url, proxy=proxy) as response:
                    content = await response.read()
                    return response.status, content
            else:
                async with session.post(url, data=data, proxy=proxy) as response:
                    content = await response.read()
                    return response.status, content

    def callback(self, future):
        """Callback function.

        1. Process and convert to a Result object
        2. Write to the database
        """
        msg = str(future.exception()) if future.exception() else 'success'
        code = 1 if msg == 'success' else 0
        status = future.result()[0] if code == 1 else None
        data = future.result()[1] if code == 1 else b''  # Empty bytes on failure
        data_len = len(data) if data else 0
        if code == 0 or (status is not None and status != 200):  # Log failures and non-200 responses
            # A plain future has no url attribute, so fall back to None if none was attached
            self.logger.warning('<url="{}", code={}, msg="{}", status={}, data(len):{}>'.format(
                getattr(future, 'url', None), code, msg, status, data_len))
        self.concurrent -= 1  # One fewer request in flight
        return data

    def add_tasks(self, tasks, method='GET', data=None, headers=None):
        """Add tasks.

        :param tasks: list of URLs
        :return: list of response bodies decoded as UTF-8
        """
        headers = headers if headers else HEADERS
        futures = []
        for task in tasks:
            self.concurrent += 1  # One more request in flight
            # asyncio.run_coroutine_threadsafe takes a coroutine object and an event loop object
            future = asyncio.run_coroutine_threadsafe(
                self.fetch(task, method=method, data=data, headers=headers), self.event_loop)
            future.url = task  # Remember the URL so the callback can log it
            future.add_done_callback(self.callback)  # Add a callback function to the future object
            futures.append(future)
        # Wait for the results only after every request has been submitted, so the requests overlap
        return [str(future.result()[1], encoding="utf-8") for future in futures]

    def add_one_tasks(self, task, headers=None, method='GET', data=None, proxy=None):
        """Add a single task.

        :param task: URL
        :return: one-element list with the response body decoded as UTF-8
        """
        self.concurrent += 1  # One more request in flight
        future = asyncio.run_coroutine_threadsafe(
            self.fetch(task, method=method, data=data, headers=headers, proxy=proxy), self.event_loop)
        future.url = task  # Remember the URL so the callback can log it
        future.add_done_callback(self.callback)  # Add a callback function to the future object
        result = future.result()
        return [str(result[1], encoding="utf-8")]

    def getProductParm(self, productguid):
        base = '{"productguid":"%s","areacode":"","referer":"https://zc.plap.mil.cn/productdetail.html?productguid=%s"}' % (
            productguid, productguid)
        # URL-encode, then Base64-encode
        base_d = quote(base)
        return str(base64.b64encode(base_d.encode("utf-8")), "utf-8")


if __name__ == '__main__':
    a = AioCrawl()
    headers = {
        "Host": "api.erp.idodb.com",
        "Accept": "application/json",
        "Content-Type": "application/json;charset=UTF-8",
        "token": "f62f837d0c9fda331fd6ce35d0017a16",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36"
    }
    data = {
        "ware_name": " masks ", "ware_model": "", "ware_brand_name": " Han Dun ", "pagesize": 10, "pageindex": 2,
        "sc_id": "4A6F7946-0704-41B2-8027-2CC13B6E96F2"}
    result = a.add_one_tasks(
        task='https://zc.plap.mil.cn/productdetail.html?productguid=118fc555-e384-11eb-89a9-fefcfe9556b7',
        data=json.dumps(data),
        headers=headers,
        method="POST")  # Simulate dynamically adding a task
    print(result)
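
The core mechanism of the class above, an event loop running in a background thread that receives coroutines through `asyncio.run_coroutine_threadsafe`, can be isolated in a short sketch. `fake_fetch` and `crawl` are hypothetical names, and `fake_fetch` sleeps instead of using aiohttp so that the example runs without a network.

```python
import asyncio
import threading


async def fake_fetch(url, delay=0.1):
    # Hypothetical stand-in for an aiohttp request: no network access.
    await asyncio.sleep(delay)
    return 200, f"body of {url}".encode("utf-8")


def crawl(urls):
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        # Submit every coroutine first so that the requests overlap ...
        futures = [asyncio.run_coroutine_threadsafe(fake_fetch(u), loop) for u in urls]
        # ... then block on the results, returned in submission order.
        return [f.result(timeout=5) for f in futures]
    finally:
        # Stop the loop from the calling thread and release its resources.
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()


if __name__ == "__main__":
    print(crawl(["a", "b"]))
    # [(200, b'body of a'), (200, b'body of b')]
```

Calling `result()` only after all coroutines have been submitted is what lets the requests run concurrently; waiting inside the submission loop would process them one at a time.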
