asyncio

asyncio

asyncio 异步爬取vs requests同步爬取 性能对比

python爬虫李魔佛 发表了文章 • 0 个评论 • 26 次浏览 • 2020-11-25 11:21 • 来自相关话题

 首先是异步爬取:
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService

# Presumably a pause length in seconds; not referenced anywhere in the code shown here.
SLEEP = 2

# Browser-like request headers sent with every HTTP request of the crawler.
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}

# 'home_page' is the industry index URL; 'base' is the site root that the
# hrefs scraped from the index are appended to.
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}


class AsyncMongo():
    """Small async helper that writes crawl results into the ``db_stock`` database."""

    def __init__(self):
        self.DB = DBSelector()
        # async_type=True is forwarded to DBSelector.mongo; presumably it
        # selects an asyncio-capable client -- TODO confirm against DBSelector.
        self.client = self.DB.mongo(location_type='qq', async_type=True)
        self.db = self.client['db_stock']

    async def update(self, table, data):
        """Insert the documents in ``data`` into the collection named ``table``.

        Args:
            table: Collection name inside ``db_stock``.
            data: List of documents (dicts) to insert.

        An empty ``data`` is a no-op: ``insert_many`` rejects an empty
        document list, so calling it would raise instead of doing nothing.
        """
        # Kept as an attribute because the original exposed the last-used
        # collection as ``self.doc``.
        self.doc = self.db[table]
        if not data:
            return
        await self.doc.insert_many(data)


class Holdle(BaseService):
    """Crawl the holdle.com industry pages concurrently and store the tables in MongoDB.

    ``home_page`` is the entry point: it downloads the industry index and
    then fetches every industry page as its own asyncio task over one shared
    ``aiohttp`` session.
    """

    def __init__(self):
        super().__init__()
        self.data_processor = AsyncMongo()
        # Collection names; parse_detail pairs them by position with the
        # tables found on an industry page.
        self.tables_list = ['ROE', 'Cash_Ratio', 'Gross_Margin', 'Operation_Margin',
                            'Net_Profit_Ratio', 'Dividend_ratio']

    async def home_page(self):
        """Fetch the industry index, crawl every industry page and print the elapsed time."""
        start = time.time()
        async with aiohttp.ClientSession() as session:
            async with session.get(url=URL_MAP['home_page'], headers=headers) as response:
                # The coroutine is suspended here until the body has been read.
                html = await response.text()

            resp = Selector(text=html)
            industries = resp.xpath('//ul[@class="list-unstyled"]/a')
            task_list = []
            for industry in industries:
                industry_url = industry.xpath('.//@href').extract_first()
                industry_name = industry.xpath('.//li/text()').extract_first()
                if industry_url is None or industry_name is None:
                    # extract_first() returns None when nothing matches; such an
                    # anchor cannot be crawled (no URL) or labelled (no name).
                    continue
                industry_name = industry_name.replace('-', '').strip()
                json_data = {
                    'industry_url': industry_url,
                    'industry_name': industry_name,
                }
                task = asyncio.ensure_future(self.detail_list(session, industry_url, json_data))
                task_list.append(task)

            # Wait inside the session block so the session stays open while
            # the detail requests are in flight.
            await asyncio.gather(*task_list)
            end = time.time()

        print(f'time used {end - start}')

    async def detail_list(self, session, url, json_data):
        """Download one industry page (``url`` is appended to the site root) and parse it."""
        async with session.get(URL_MAP['base'] + url, headers=headers) as response:
            html = await response.text()
        await self.parse_detail(html, json_data)

    async def parse_detail(self, html, json_data=None):
        """Extract the ranking tables from ``html`` and insert their rows.

        Args:
            html: Markup of one industry page.
            json_data: Dict providing ``'industry_name'``. Despite the ``None``
                default it is required, because it is subscripted immediately.

        Raises:
            ValueError: The page does not contain one table per entry of
                ``self.tables_list``, or a cell value is not a valid float.
        """
        resp = Selector(text=html)
        industry = json_data['industry_name']
        tables = resp.xpath('//table[@class="table table-bordered"]')
        expected = len(self.tables_list)
        if len(tables) != expected:
            raise ValueError(
                f'expected {expected} tables for industry {industry!r}, found {len(tables)}'
            )

        for table, table_node in zip(self.tables_list, tables):
            rows = table_node.xpath('.//tr')
            result = []
            # rows[0] is skipped, as in the original (presumably the header row).
            for row in rows[1:]:
                stock_name = row.xpath('.//td[1]/text()').extract_first()
                value = row.xpath('.//td[2]/text()').extract_first()
                if value is None:
                    # No text in the second cell: float(None) would raise
                    # TypeError and abort the whole crawl.
                    continue
                result.append({
                    'industry': industry,
                    'name': stock_name,
                    'value': float(value),
                    'crawltime': datetime.datetime.now(),
                })
            if result:
                # insert_many rejects an empty list, so skip empty tables.
                await self.data_processor.update(table, result)


# Script entry point: build the crawler and drive home_page() to completion.
app = Holdle()
# NOTE(review): get_event_loop() without a running loop is deprecated in newer
# Python versions; asyncio.run() is the usual replacement -- confirm first that
# the Mongo client created in Holdle() tolerates a freshly created loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(app.home_page())
爬完并且入库,用时大约为35s
 
使用requests爬取
# -*- coding: utf-8 -*-
# @Time : 2020/11/24 21:42
# @File : sync_spider.py
# @Author : Rocky C@www.30daydo.com
import requests
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService

# Presumably a pause length in seconds; not referenced anywhere in the code shown here.
SLEEP = 2

# Browser-like request headers sent with every HTTP request of the crawler.
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}

# 'home_page' is the industry index URL; 'base' is the site root that the
# hrefs scraped from the index are appended to.
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}


class Holdle(BaseService):
    """Crawl the holdle.com industry pages one after another with ``requests``.

    Every request blocks until it completes, so the pages are downloaded
    strictly sequentially; ``run`` prints the total elapsed time.
    """

    # Seconds to wait for a connection/response. Without a timeout, requests
    # waits indefinitely and a stalled server would hang the whole crawl.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        super().__init__()

        self.DB = DBSelector()
        # The client is created but not used by any method of this class.
        self.client = self.DB.mongo(location_type='qq', async_type=True)
        self.session = requests.Session()

    def run(self):
        """Fetch the industry index, visit every industry page and print the elapsed time."""
        start = time.time()

        response = self.session.get(url=URL_MAP['home_page'], headers=headers,
                                    timeout=self.REQUEST_TIMEOUT)
        # The blocking network wait happened in session.get() above.
        html = response.text
        resp = Selector(text=html)
        industries = resp.xpath('//ul[@class="list-unstyled"]/a')
        for industry in industries:
            industry_url = industry.xpath('.//@href').extract_first()
            industry_name = industry.xpath('.//li/text()').extract_first()
            if industry_url is None:
                # extract_first() returns None for an anchor without href;
                # concatenating None to the base URL would raise TypeError.
                continue
            json_data = {
                'industry_url': industry_url,
                'industry_name': industry_name,
            }
            self.detail_list(industry_url, json_data)

        end = time.time()
        print(f'time used {end-start}')

    def detail_list(self, url, json_data):
        """Download one industry page (``url`` is appended to the site root) and parse it."""
        response = self.session.get(URL_MAP['base'] + url, headers=headers,
                                    timeout=self.REQUEST_TIMEOUT)
        self.parse_detail(response.text, json_data)

    def parse_detail(self, html, json_data=None):
        """Print the ``<title>`` text of ``html``; ``json_data`` is accepted but unused."""
        resp = Selector(text=html)
        title = resp.xpath('//title/text()').extract_first()
        print(title)


# Script entry point: run the synchronous crawl once.
app = Holdle()
app.run()
用时约160s,而且这里还省略了mongo入库的时间。上面异步爬取里面包含了异步存入mongo。
 
所以单从网络IO性能上来说,异步是比纯同步要快很多。
但是,async的生态做得不是太好,第三方的异步框架做得也不够完善。 
 
因为如果系统中引入了异步,很多耗时的地方也是需要使用异步的写法和框架,不然会导致系统的控制权没有被正确转移。
 
水文一篇。
完毕
  查看全部
 首先是异步爬取:
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService

# Presumably a pause length in seconds; not referenced anywhere in the code shown here.
SLEEP = 2

# Browser-like request headers sent with every HTTP request of the crawler.
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}

# 'home_page' is the industry index URL; 'base' is the site root that the
# hrefs scraped from the index are appended to.
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}


class AsyncMongo():
    """Small async helper that writes crawl results into the ``db_stock`` database."""

    def __init__(self):
        self.DB = DBSelector()
        # async_type=True is forwarded to DBSelector.mongo; presumably it
        # selects an asyncio-capable client -- TODO confirm against DBSelector.
        self.client = self.DB.mongo(location_type='qq', async_type=True)
        self.db = self.client['db_stock']

    async def update(self, table, data):
        """Insert the documents in ``data`` into the collection named ``table``.

        Args:
            table: Collection name inside ``db_stock``.
            data: List of documents (dicts) to insert.

        An empty ``data`` is a no-op: ``insert_many`` rejects an empty
        document list, so calling it would raise instead of doing nothing.
        """
        # Kept as an attribute because the original exposed the last-used
        # collection as ``self.doc``.
        self.doc = self.db[table]
        if not data:
            return
        await self.doc.insert_many(data)


class Holdle(BaseService):
    """Crawl the holdle.com industry pages concurrently and store the tables in MongoDB.

    ``home_page`` is the entry point: it downloads the industry index and
    then fetches every industry page as its own asyncio task over one shared
    ``aiohttp`` session.
    """

    def __init__(self):
        super().__init__()
        self.data_processor = AsyncMongo()
        # Collection names; parse_detail pairs them by position with the
        # tables found on an industry page.
        self.tables_list = ['ROE', 'Cash_Ratio', 'Gross_Margin', 'Operation_Margin',
                            'Net_Profit_Ratio', 'Dividend_ratio']

    async def home_page(self):
        """Fetch the industry index, crawl every industry page and print the elapsed time."""
        start = time.time()
        async with aiohttp.ClientSession() as session:
            async with session.get(url=URL_MAP['home_page'], headers=headers) as response:
                # The coroutine is suspended here until the body has been read.
                html = await response.text()

            resp = Selector(text=html)
            industries = resp.xpath('//ul[@class="list-unstyled"]/a')
            task_list = []
            for industry in industries:
                industry_url = industry.xpath('.//@href').extract_first()
                industry_name = industry.xpath('.//li/text()').extract_first()
                if industry_url is None or industry_name is None:
                    # extract_first() returns None when nothing matches; such an
                    # anchor cannot be crawled (no URL) or labelled (no name).
                    continue
                industry_name = industry_name.replace('-', '').strip()
                json_data = {
                    'industry_url': industry_url,
                    'industry_name': industry_name,
                }
                task = asyncio.ensure_future(self.detail_list(session, industry_url, json_data))
                task_list.append(task)

            # Wait inside the session block so the session stays open while
            # the detail requests are in flight.
            await asyncio.gather(*task_list)
            end = time.time()

        print(f'time used {end - start}')

    async def detail_list(self, session, url, json_data):
        """Download one industry page (``url`` is appended to the site root) and parse it."""
        async with session.get(URL_MAP['base'] + url, headers=headers) as response:
            html = await response.text()
        await self.parse_detail(html, json_data)

    async def parse_detail(self, html, json_data=None):
        """Extract the ranking tables from ``html`` and insert their rows.

        Args:
            html: Markup of one industry page.
            json_data: Dict providing ``'industry_name'``. Despite the ``None``
                default it is required, because it is subscripted immediately.

        Raises:
            ValueError: The page does not contain one table per entry of
                ``self.tables_list``, or a cell value is not a valid float.
        """
        resp = Selector(text=html)
        industry = json_data['industry_name']
        tables = resp.xpath('//table[@class="table table-bordered"]')
        expected = len(self.tables_list)
        if len(tables) != expected:
            raise ValueError(
                f'expected {expected} tables for industry {industry!r}, found {len(tables)}'
            )

        for table, table_node in zip(self.tables_list, tables):
            rows = table_node.xpath('.//tr')
            result = []
            # rows[0] is skipped, as in the original (presumably the header row).
            for row in rows[1:]:
                stock_name = row.xpath('.//td[1]/text()').extract_first()
                value = row.xpath('.//td[2]/text()').extract_first()
                if value is None:
                    # No text in the second cell: float(None) would raise
                    # TypeError and abort the whole crawl.
                    continue
                result.append({
                    'industry': industry,
                    'name': stock_name,
                    'value': float(value),
                    'crawltime': datetime.datetime.now(),
                })
            if result:
                # insert_many rejects an empty list, so skip empty tables.
                await self.data_processor.update(table, result)


# Script entry point: build the crawler and drive home_page() to completion.
app = Holdle()
# NOTE(review): get_event_loop() without a running loop is deprecated in newer
# Python versions; asyncio.run() is the usual replacement -- confirm first that
# the Mongo client created in Holdle() tolerates a freshly created loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(app.home_page())

爬完并且入库,用时大约为35s
 
使用requests爬取
# -*- coding: utf-8 -*-
# @Time : 2020/11/24 21:42
# @File : sync_spider.py
# @Author : Rocky C@www.30daydo.com
import requests
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService

# Presumably a pause length in seconds; not referenced anywhere in the code shown here.
SLEEP = 2

# Browser-like request headers sent with every HTTP request of the crawler.
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}

# 'home_page' is the industry index URL; 'base' is the site root that the
# hrefs scraped from the index are appended to.
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}


class Holdle(BaseService):
    """Crawl the holdle.com industry pages one after another with ``requests``.

    Every request blocks until it completes, so the pages are downloaded
    strictly sequentially; ``run`` prints the total elapsed time.
    """

    # Seconds to wait for a connection/response. Without a timeout, requests
    # waits indefinitely and a stalled server would hang the whole crawl.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        super().__init__()

        self.DB = DBSelector()
        # The client is created but not used by any method of this class.
        self.client = self.DB.mongo(location_type='qq', async_type=True)
        self.session = requests.Session()

    def run(self):
        """Fetch the industry index, visit every industry page and print the elapsed time."""
        start = time.time()

        response = self.session.get(url=URL_MAP['home_page'], headers=headers,
                                    timeout=self.REQUEST_TIMEOUT)
        # The blocking network wait happened in session.get() above.
        html = response.text
        resp = Selector(text=html)
        industries = resp.xpath('//ul[@class="list-unstyled"]/a')
        for industry in industries:
            industry_url = industry.xpath('.//@href').extract_first()
            industry_name = industry.xpath('.//li/text()').extract_first()
            if industry_url is None:
                # extract_first() returns None for an anchor without href;
                # concatenating None to the base URL would raise TypeError.
                continue
            json_data = {
                'industry_url': industry_url,
                'industry_name': industry_name,
            }
            self.detail_list(industry_url, json_data)

        end = time.time()
        print(f'time used {end-start}')

    def detail_list(self, url, json_data):
        """Download one industry page (``url`` is appended to the site root) and parse it."""
        response = self.session.get(URL_MAP['base'] + url, headers=headers,
                                    timeout=self.REQUEST_TIMEOUT)
        self.parse_detail(response.text, json_data)

    def parse_detail(self, html, json_data=None):
        """Print the ``<title>`` text of ``html``; ``json_data`` is accepted but unused."""
        resp = Selector(text=html)
        title = resp.xpath('//title/text()').extract_first()
        print(title)


# Script entry point: run the synchronous crawl once.
app = Holdle()
app.run()
用时约160s,而且这里还省略了mongo入库的时间。上面异步爬取里面包含了异步存入mongo。
 
所以单从网络IO性能上来说,异步是比纯同步要快很多。
但是,async的生态做得不是太好,第三方的异步框架做得也不够完善。 
 
因为如果系统中引入了异步,很多耗时的地方也是需要使用异步的写法和框架,不然会导致系统的控制权没有被正确转移。
 
水文一篇。
完毕
 

异步asyncio加锁 的正确用法

python李魔佛 发表了文章 • 0 个评论 • 81 次浏览 • 2020-11-15 10:19 • 来自相关话题

对于全局变量count进行统计加锁
import aiohttp
import asyncio
import execjs
import threading
# NOTE: ``global`` at module level has no effect; these two lines only signal
# that ``pages`` and ``count`` are module-wide names shared by fetch() and main().
global pages
global count

# Request headers sent with every call to the eastmoney endpoint.
headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "en-US,en;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Host": "dcfm.eastmoney.com",
"Pragma": "no-cache",
"Referer": "http://data.eastmoney.com/xg/xg/default.html",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/69.0.3497.81 Chrome/69.0.3497.81 Safari/537.36",
}

# URL template: ``{}`` receives the page number via str.format(); the doubled
# braces come out as literal ``{`` / ``}`` in the formatted URL.
home_url = 'http://dcfm.eastmoney.com/em_mutisvcexpandinterface/api/js/get?type=XGSG_LB&token=70f12f2f4f091e459a279469fe49eca5&st=purchasedate,securitycode&sr=-1&p={}&ps=50&js=var%20hsEnHLwG={{pages:(tp),data:(x)}}&rt=53512217'

# Module-level event loop: fetch() stops it, the end of the script runs it.
loop = asyncio.get_event_loop()
# The threading variant is left commented out; the asyncio lock is the one in use.
# lock = threading.Lock()
lock = asyncio.Lock()
def parse_json(content):
    """Evaluate the ``var hsEnHLwG=...`` JavaScript payload and return its value.

    A ``getV`` accessor is appended to the script so the variable can be read
    back through execjs.

    NOTE(review): this executes the downloaded response body as JavaScript;
    only acceptable while the endpoint is trusted.
    """
    script = content + ';function getV(){return hsEnHLwG;}'
    return execjs.compile(script).call('getV')


async def fetch(session, page):
    """Download and parse one result page, then count it as finished.

    Args:
        session: Open aiohttp client session used for the request.
        page: Page number substituted into ``home_url``.

    The shared ``count`` is incremented under ``lock``; when it reaches
    ``pages`` the module-level event loop is stopped. Counting happens in a
    ``finally`` block so a failed request still counts as finished --
    otherwise ``count`` could never reach ``pages`` and ``run_forever()``
    would hang.
    """
    global pages
    global count
    try:
        async with session.get(home_url.format(page), headers=headers) as resp:
            content = await resp.text()

        try:
            js_content = parse_json(content)
            for stock_info in js_content['data']:
                # Not used further; the lookup still checks each record has the key.
                securityshortname = stock_info['securityshortname']
        except Exception as e:
            # Best-effort parsing: report the problem and still count the page.
            print(e)
    finally:
        async with lock:
            count = count + 1

        print(f'count:{count}')
        if count == pages:
            print('End of loop')
            loop.stop()



async def main():
    """Read the page count from page 1, then fetch every page concurrently.

    Resets the shared ``count`` to 0 and sets the shared ``pages`` from the
    first response. Page 1 is requested again by its own fetch task, as in
    the original.
    """
    global pages
    global count
    count = 0
    async with aiohttp.ClientSession() as session:
        async with session.get(home_url.format(1), headers=headers) as resp:
            content = await resp.text()

        js_data = parse_json(content)
        pages = js_data['pages']
        print(f'pages: {pages}')

        tasks = [asyncio.ensure_future(fetch(session, page))
                 for page in range(1, pages + 1)]
        # Keep the session open until every fetch has finished. This replaces
        # the fixed one-second sleep, which let the session close while slower
        # requests were still in flight.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for page, outcome in enumerate(results, start=1):
            if isinstance(outcome, Exception):
                print(f'page {page} failed: {outcome!r}')


# Schedule main() and run the loop until fetch() calls loop.stop().
asyncio.ensure_future(main())
loop.run_forever()
1. 如果不加入锁,每次运行的结果可能不一样。
2. 不能用多线程的threading 锁,得到的每次运行结果也有可能不一样
3. 使用 asyncio 的锁时,要写成 async with lock:(即在 with 前加关键字 async)
  查看全部
对于全局变量count进行统计加锁
import aiohttp
import asyncio
import execjs
import threading
# NOTE: ``global`` at module level has no effect; these two lines only signal
# that ``pages`` and ``count`` are module-wide names shared by fetch() and main().
global pages
global count

# Request headers sent with every call to the eastmoney endpoint.
headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "en-US,en;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Host": "dcfm.eastmoney.com",
"Pragma": "no-cache",
"Referer": "http://data.eastmoney.com/xg/xg/default.html",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/69.0.3497.81 Chrome/69.0.3497.81 Safari/537.36",
}

# URL template: ``{}`` receives the page number via str.format(); the doubled
# braces come out as literal ``{`` / ``}`` in the formatted URL.
home_url = 'http://dcfm.eastmoney.com/em_mutisvcexpandinterface/api/js/get?type=XGSG_LB&token=70f12f2f4f091e459a279469fe49eca5&st=purchasedate,securitycode&sr=-1&p={}&ps=50&js=var%20hsEnHLwG={{pages:(tp),data:(x)}}&rt=53512217'

# Module-level event loop: fetch() stops it, the end of the script runs it.
loop = asyncio.get_event_loop()
# The threading variant is left commented out; the asyncio lock is the one in use.
# lock = threading.Lock()
lock = asyncio.Lock()
def parse_json(content):
    """Evaluate the ``var hsEnHLwG=...`` JavaScript payload and return its value.

    A ``getV`` accessor is appended to the script so the variable can be read
    back through execjs.

    NOTE(review): this executes the downloaded response body as JavaScript;
    only acceptable while the endpoint is trusted.
    """
    script = content + ';function getV(){return hsEnHLwG;}'
    return execjs.compile(script).call('getV')


async def fetch(session, page):
    """Download and parse one result page, then count it as finished.

    Args:
        session: Open aiohttp client session used for the request.
        page: Page number substituted into ``home_url``.

    The shared ``count`` is incremented under ``lock``; when it reaches
    ``pages`` the module-level event loop is stopped. Counting happens in a
    ``finally`` block so a failed request still counts as finished --
    otherwise ``count`` could never reach ``pages`` and ``run_forever()``
    would hang.
    """
    global pages
    global count
    try:
        async with session.get(home_url.format(page), headers=headers) as resp:
            content = await resp.text()

        try:
            js_content = parse_json(content)
            for stock_info in js_content['data']:
                # Not used further; the lookup still checks each record has the key.
                securityshortname = stock_info['securityshortname']
        except Exception as e:
            # Best-effort parsing: report the problem and still count the page.
            print(e)
    finally:
        async with lock:
            count = count + 1

        print(f'count:{count}')
        if count == pages:
            print('End of loop')
            loop.stop()



async def main():
    """Read the page count from page 1, then fetch every page concurrently.

    Resets the shared ``count`` to 0 and sets the shared ``pages`` from the
    first response. Page 1 is requested again by its own fetch task, as in
    the original.
    """
    global pages
    global count
    count = 0
    async with aiohttp.ClientSession() as session:
        async with session.get(home_url.format(1), headers=headers) as resp:
            content = await resp.text()

        js_data = parse_json(content)
        pages = js_data['pages']
        print(f'pages: {pages}')

        tasks = [asyncio.ensure_future(fetch(session, page))
                 for page in range(1, pages + 1)]
        # Keep the session open until every fetch has finished. This replaces
        # the fixed one-second sleep, which let the session close while slower
        # requests were still in flight.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for page, outcome in enumerate(results, start=1):
            if isinstance(outcome, Exception):
                print(f'page {page} failed: {outcome!r}')


# Schedule main() and run the loop until fetch() calls loop.stop().
asyncio.ensure_future(main())
loop.run_forever()

1. 如果不加入锁,每次运行的结果可能不一样。
2. 不能用多线程的threading 锁,得到的每次运行结果也有可能不一样
3. 使用 asyncio 的锁时,要写成 async with lock:(即在 with 前加关键字 async)
 

asyncio 异步爬取vs requests同步爬取 性能对比

python爬虫李魔佛 发表了文章 • 0 个评论 • 26 次浏览 • 2020-11-25 11:21 • 来自相关话题

 首先是异步爬取:
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService

# Presumably a pause length in seconds; not referenced anywhere in the code shown here.
SLEEP = 2

# Browser-like request headers sent with every HTTP request of the crawler.
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}

# 'home_page' is the industry index URL; 'base' is the site root that the
# hrefs scraped from the index are appended to.
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}


class AsyncMongo():
    """Small async helper that writes crawl results into the ``db_stock`` database."""

    def __init__(self):
        self.DB = DBSelector()
        # async_type=True is forwarded to DBSelector.mongo; presumably it
        # selects an asyncio-capable client -- TODO confirm against DBSelector.
        self.client = self.DB.mongo(location_type='qq', async_type=True)
        self.db = self.client['db_stock']

    async def update(self, table, data):
        """Insert the documents in ``data`` into the collection named ``table``.

        Args:
            table: Collection name inside ``db_stock``.
            data: List of documents (dicts) to insert.

        An empty ``data`` is a no-op: ``insert_many`` rejects an empty
        document list, so calling it would raise instead of doing nothing.
        """
        # Kept as an attribute because the original exposed the last-used
        # collection as ``self.doc``.
        self.doc = self.db[table]
        if not data:
            return
        await self.doc.insert_many(data)


class Holdle(BaseService):
    """Crawl the holdle.com industry pages concurrently and store the tables in MongoDB.

    ``home_page`` is the entry point: it downloads the industry index and
    then fetches every industry page as its own asyncio task over one shared
    ``aiohttp`` session.
    """

    def __init__(self):
        super().__init__()
        self.data_processor = AsyncMongo()
        # Collection names; parse_detail pairs them by position with the
        # tables found on an industry page.
        self.tables_list = ['ROE', 'Cash_Ratio', 'Gross_Margin', 'Operation_Margin',
                            'Net_Profit_Ratio', 'Dividend_ratio']

    async def home_page(self):
        """Fetch the industry index, crawl every industry page and print the elapsed time."""
        start = time.time()
        async with aiohttp.ClientSession() as session:
            async with session.get(url=URL_MAP['home_page'], headers=headers) as response:
                # The coroutine is suspended here until the body has been read.
                html = await response.text()

            resp = Selector(text=html)
            industries = resp.xpath('//ul[@class="list-unstyled"]/a')
            task_list = []
            for industry in industries:
                industry_url = industry.xpath('.//@href').extract_first()
                industry_name = industry.xpath('.//li/text()').extract_first()
                if industry_url is None or industry_name is None:
                    # extract_first() returns None when nothing matches; such an
                    # anchor cannot be crawled (no URL) or labelled (no name).
                    continue
                industry_name = industry_name.replace('-', '').strip()
                json_data = {
                    'industry_url': industry_url,
                    'industry_name': industry_name,
                }
                task = asyncio.ensure_future(self.detail_list(session, industry_url, json_data))
                task_list.append(task)

            # Wait inside the session block so the session stays open while
            # the detail requests are in flight.
            await asyncio.gather(*task_list)
            end = time.time()

        print(f'time used {end - start}')

    async def detail_list(self, session, url, json_data):
        """Download one industry page (``url`` is appended to the site root) and parse it."""
        async with session.get(URL_MAP['base'] + url, headers=headers) as response:
            html = await response.text()
        await self.parse_detail(html, json_data)

    async def parse_detail(self, html, json_data=None):
        """Extract the ranking tables from ``html`` and insert their rows.

        Args:
            html: Markup of one industry page.
            json_data: Dict providing ``'industry_name'``. Despite the ``None``
                default it is required, because it is subscripted immediately.

        Raises:
            ValueError: The page does not contain one table per entry of
                ``self.tables_list``, or a cell value is not a valid float.
        """
        resp = Selector(text=html)
        industry = json_data['industry_name']
        tables = resp.xpath('//table[@class="table table-bordered"]')
        expected = len(self.tables_list)
        if len(tables) != expected:
            raise ValueError(
                f'expected {expected} tables for industry {industry!r}, found {len(tables)}'
            )

        for table, table_node in zip(self.tables_list, tables):
            rows = table_node.xpath('.//tr')
            result = []
            # rows[0] is skipped, as in the original (presumably the header row).
            for row in rows[1:]:
                stock_name = row.xpath('.//td[1]/text()').extract_first()
                value = row.xpath('.//td[2]/text()').extract_first()
                if value is None:
                    # No text in the second cell: float(None) would raise
                    # TypeError and abort the whole crawl.
                    continue
                result.append({
                    'industry': industry,
                    'name': stock_name,
                    'value': float(value),
                    'crawltime': datetime.datetime.now(),
                })
            if result:
                # insert_many rejects an empty list, so skip empty tables.
                await self.data_processor.update(table, result)


# Script entry point: build the crawler and drive home_page() to completion.
app = Holdle()
# NOTE(review): get_event_loop() without a running loop is deprecated in newer
# Python versions; asyncio.run() is the usual replacement -- confirm first that
# the Mongo client created in Holdle() tolerates a freshly created loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(app.home_page())
爬完并且入库,用时大约为35s
 
使用requests爬取
# -*- coding: utf-8 -*-
# @Time : 2020/11/24 21:42
# @File : sync_spider.py
# @Author : Rocky C@www.30daydo.com
import requests
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService

# Presumably a pause length in seconds; not referenced anywhere in the code shown here.
SLEEP = 2

# Browser-like request headers sent with every HTTP request of the crawler.
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}

# 'home_page' is the industry index URL; 'base' is the site root that the
# hrefs scraped from the index are appended to.
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}


class Holdle(BaseService):
    """Crawl the holdle.com industry pages one after another with ``requests``.

    Every request blocks until it completes, so the pages are downloaded
    strictly sequentially; ``run`` prints the total elapsed time.
    """

    # Seconds to wait for a connection/response. Without a timeout, requests
    # waits indefinitely and a stalled server would hang the whole crawl.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        super().__init__()

        self.DB = DBSelector()
        # The client is created but not used by any method of this class.
        self.client = self.DB.mongo(location_type='qq', async_type=True)
        self.session = requests.Session()

    def run(self):
        """Fetch the industry index, visit every industry page and print the elapsed time."""
        start = time.time()

        response = self.session.get(url=URL_MAP['home_page'], headers=headers,
                                    timeout=self.REQUEST_TIMEOUT)
        # The blocking network wait happened in session.get() above.
        html = response.text
        resp = Selector(text=html)
        industries = resp.xpath('//ul[@class="list-unstyled"]/a')
        for industry in industries:
            industry_url = industry.xpath('.//@href').extract_first()
            industry_name = industry.xpath('.//li/text()').extract_first()
            if industry_url is None:
                # extract_first() returns None for an anchor without href;
                # concatenating None to the base URL would raise TypeError.
                continue
            json_data = {
                'industry_url': industry_url,
                'industry_name': industry_name,
            }
            self.detail_list(industry_url, json_data)

        end = time.time()
        print(f'time used {end-start}')

    def detail_list(self, url, json_data):
        """Download one industry page (``url`` is appended to the site root) and parse it."""
        response = self.session.get(URL_MAP['base'] + url, headers=headers,
                                    timeout=self.REQUEST_TIMEOUT)
        self.parse_detail(response.text, json_data)

    def parse_detail(self, html, json_data=None):
        """Print the ``<title>`` text of ``html``; ``json_data`` is accepted but unused."""
        resp = Selector(text=html)
        title = resp.xpath('//title/text()').extract_first()
        print(title)


# Script entry point: run the synchronous crawl once.
app = Holdle()
app.run()
用时约160s,而且这里还省略了mongo入库的时间。上面异步爬取里面包含了异步存入mongo。
 
所以单从网络IO性能上来说,异步是比纯同步要快很多。
但是,async的生态做得不是太好,第三方的异步框架做得也不够完善。 
 
因为如果系统中引入了异步,很多耗时的地方也是需要使用异步的写法和框架,不然会导致系统的控制权没有被正确转移。
 
水文一篇。
完毕
  查看全部
 首先是异步爬取:
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService

# Presumably a pause length in seconds; not referenced anywhere in the code shown here.
SLEEP = 2

# Browser-like request headers sent with every HTTP request of the crawler.
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}

# 'home_page' is the industry index URL; 'base' is the site root that the
# hrefs scraped from the index are appended to.
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}


class AsyncMongo():
    """Small async helper that writes crawl results into the ``db_stock`` database."""

    def __init__(self):
        self.DB = DBSelector()
        # async_type=True is forwarded to DBSelector.mongo; presumably it
        # selects an asyncio-capable client -- TODO confirm against DBSelector.
        self.client = self.DB.mongo(location_type='qq', async_type=True)
        self.db = self.client['db_stock']

    async def update(self, table, data):
        """Insert the documents in ``data`` into the collection named ``table``.

        Args:
            table: Collection name inside ``db_stock``.
            data: List of documents (dicts) to insert.

        An empty ``data`` is a no-op: ``insert_many`` rejects an empty
        document list, so calling it would raise instead of doing nothing.
        """
        # Kept as an attribute because the original exposed the last-used
        # collection as ``self.doc``.
        self.doc = self.db[table]
        if not data:
            return
        await self.doc.insert_many(data)


class Holdle(BaseService):
    """Crawl the holdle.com industry pages concurrently and store the tables in MongoDB.

    ``home_page`` is the entry point: it downloads the industry index and
    then fetches every industry page as its own asyncio task over one shared
    ``aiohttp`` session.
    """

    def __init__(self):
        super().__init__()
        self.data_processor = AsyncMongo()
        # Collection names; parse_detail pairs them by position with the
        # tables found on an industry page.
        self.tables_list = ['ROE', 'Cash_Ratio', 'Gross_Margin', 'Operation_Margin',
                            'Net_Profit_Ratio', 'Dividend_ratio']

    async def home_page(self):
        """Fetch the industry index, crawl every industry page and print the elapsed time."""
        start = time.time()
        async with aiohttp.ClientSession() as session:
            async with session.get(url=URL_MAP['home_page'], headers=headers) as response:
                # The coroutine is suspended here until the body has been read.
                html = await response.text()

            resp = Selector(text=html)
            industries = resp.xpath('//ul[@class="list-unstyled"]/a')
            task_list = []
            for industry in industries:
                industry_url = industry.xpath('.//@href').extract_first()
                industry_name = industry.xpath('.//li/text()').extract_first()
                if industry_url is None or industry_name is None:
                    # extract_first() returns None when nothing matches; such an
                    # anchor cannot be crawled (no URL) or labelled (no name).
                    continue
                industry_name = industry_name.replace('-', '').strip()
                json_data = {
                    'industry_url': industry_url,
                    'industry_name': industry_name,
                }
                task = asyncio.ensure_future(self.detail_list(session, industry_url, json_data))
                task_list.append(task)

            # Wait inside the session block so the session stays open while
            # the detail requests are in flight.
            await asyncio.gather(*task_list)
            end = time.time()

        print(f'time used {end - start}')

    async def detail_list(self, session, url, json_data):
        """Download one industry page (``url`` is appended to the site root) and parse it."""
        async with session.get(URL_MAP['base'] + url, headers=headers) as response:
            html = await response.text()
        await self.parse_detail(html, json_data)

    async def parse_detail(self, html, json_data=None):
        """Extract the ranking tables from ``html`` and insert their rows.

        Args:
            html: Markup of one industry page.
            json_data: Dict providing ``'industry_name'``. Despite the ``None``
                default it is required, because it is subscripted immediately.

        Raises:
            ValueError: The page does not contain one table per entry of
                ``self.tables_list``, or a cell value is not a valid float.
        """
        resp = Selector(text=html)
        industry = json_data['industry_name']
        tables = resp.xpath('//table[@class="table table-bordered"]')
        expected = len(self.tables_list)
        if len(tables) != expected:
            raise ValueError(
                f'expected {expected} tables for industry {industry!r}, found {len(tables)}'
            )

        for table, table_node in zip(self.tables_list, tables):
            rows = table_node.xpath('.//tr')
            result = []
            # rows[0] is skipped, as in the original (presumably the header row).
            for row in rows[1:]:
                stock_name = row.xpath('.//td[1]/text()').extract_first()
                value = row.xpath('.//td[2]/text()').extract_first()
                if value is None:
                    # No text in the second cell: float(None) would raise
                    # TypeError and abort the whole crawl.
                    continue
                result.append({
                    'industry': industry,
                    'name': stock_name,
                    'value': float(value),
                    'crawltime': datetime.datetime.now(),
                })
            if result:
                # insert_many rejects an empty list, so skip empty tables.
                await self.data_processor.update(table, result)


# Script entry point: build the crawler and drive home_page() to completion.
app = Holdle()
# NOTE(review): get_event_loop() without a running loop is deprecated in newer
# Python versions; asyncio.run() is the usual replacement -- confirm first that
# the Mongo client created in Holdle() tolerates a freshly created loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(app.home_page())

爬完并且入库,用时大约为35s
 
使用requests爬取
# -*- coding: utf-8 -*-
# @Time : 2020/11/24 21:42
# @File : sync_spider.py
# @Author : Rocky C@www.30daydo.com
import requests
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService

# Presumably a pause length in seconds; not referenced anywhere in the code shown here.
SLEEP = 2

# Browser-like request headers sent with every HTTP request of the crawler.
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}

# 'home_page' is the industry index URL; 'base' is the site root that the
# hrefs scraped from the index are appended to.
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}


class Holdle(BaseService):
    """Crawl the holdle.com industry pages one after another with ``requests``.

    Every request blocks until it completes, so the pages are downloaded
    strictly sequentially; ``run`` prints the total elapsed time.
    """

    # Seconds to wait for a connection/response. Without a timeout, requests
    # waits indefinitely and a stalled server would hang the whole crawl.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        super().__init__()

        self.DB = DBSelector()
        # The client is created but not used by any method of this class.
        self.client = self.DB.mongo(location_type='qq', async_type=True)
        self.session = requests.Session()

    def run(self):
        """Fetch the industry index, visit every industry page and print the elapsed time."""
        start = time.time()

        response = self.session.get(url=URL_MAP['home_page'], headers=headers,
                                    timeout=self.REQUEST_TIMEOUT)
        # The blocking network wait happened in session.get() above.
        html = response.text
        resp = Selector(text=html)
        industries = resp.xpath('//ul[@class="list-unstyled"]/a')
        for industry in industries:
            industry_url = industry.xpath('.//@href').extract_first()
            industry_name = industry.xpath('.//li/text()').extract_first()
            if industry_url is None:
                # extract_first() returns None for an anchor without href;
                # concatenating None to the base URL would raise TypeError.
                continue
            json_data = {
                'industry_url': industry_url,
                'industry_name': industry_name,
            }
            self.detail_list(industry_url, json_data)

        end = time.time()
        print(f'time used {end-start}')

    def detail_list(self, url, json_data):
        """Download one industry page (``url`` is appended to the site root) and parse it."""
        response = self.session.get(URL_MAP['base'] + url, headers=headers,
                                    timeout=self.REQUEST_TIMEOUT)
        self.parse_detail(response.text, json_data)

    def parse_detail(self, html, json_data=None):
        """Print the ``<title>`` text of ``html``; ``json_data`` is accepted but unused."""
        resp = Selector(text=html)
        title = resp.xpath('//title/text()').extract_first()
        print(title)


# Script entry point: run the synchronous crawl once.
app = Holdle()
app.run()
用时约160s,而且这里还省略了mongo入库的时间。上面异步爬取里面包含了异步存入mongo。
 
所以单从网络IO性能上来说,异步是比纯同步要快很多。
但是,async的生态做得不是太好,第三方的异步框架做得也不够完善。 
 
因为如果系统中引入了异步,很多耗时的地方也是需要使用异步的写法和框架,不然会导致系统的控制权没有被正确转移。
 
水文一篇。
完毕
 

异步asyncio加锁 的正确用法

python李魔佛 发表了文章 • 0 个评论 • 81 次浏览 • 2020-11-15 10:19 • 来自相关话题

对于全局变量count进行统计加锁
import aiohttp
import asyncio
import execjs
import threading
# NOTE: ``global`` at module level has no effect; these two lines only signal
# that ``pages`` and ``count`` are module-wide names shared by fetch() and main().
global pages
global count

# Request headers sent with every call to the eastmoney endpoint.
headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "en-US,en;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Host": "dcfm.eastmoney.com",
"Pragma": "no-cache",
"Referer": "http://data.eastmoney.com/xg/xg/default.html",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/69.0.3497.81 Chrome/69.0.3497.81 Safari/537.36",
}

# URL template: ``{}`` receives the page number via str.format(); the doubled
# braces come out as literal ``{`` / ``}`` in the formatted URL.
home_url = 'http://dcfm.eastmoney.com/em_mutisvcexpandinterface/api/js/get?type=XGSG_LB&token=70f12f2f4f091e459a279469fe49eca5&st=purchasedate,securitycode&sr=-1&p={}&ps=50&js=var%20hsEnHLwG={{pages:(tp),data:(x)}}&rt=53512217'

# Module-level event loop: fetch() stops it, the end of the script runs it.
loop = asyncio.get_event_loop()
# The threading variant is left commented out; the asyncio lock is the one in use.
# lock = threading.Lock()
lock = asyncio.Lock()
def parse_json(content):
    """Evaluate the ``var hsEnHLwG=...`` JavaScript payload and return its value.

    A ``getV`` accessor is appended to the script so the variable can be read
    back through execjs.

    NOTE(review): this executes the downloaded response body as JavaScript;
    only acceptable while the endpoint is trusted.
    """
    script = content + ';function getV(){return hsEnHLwG;}'
    return execjs.compile(script).call('getV')


async def fetch(session, page):
    """Download and parse one result page, then count it as finished.

    Args:
        session: Open aiohttp client session used for the request.
        page: Page number substituted into ``home_url``.

    The shared ``count`` is incremented under ``lock``; when it reaches
    ``pages`` the module-level event loop is stopped. Counting happens in a
    ``finally`` block so a failed request still counts as finished --
    otherwise ``count`` could never reach ``pages`` and ``run_forever()``
    would hang.
    """
    global pages
    global count
    try:
        async with session.get(home_url.format(page), headers=headers) as resp:
            content = await resp.text()

        try:
            js_content = parse_json(content)
            for stock_info in js_content['data']:
                # Not used further; the lookup still checks each record has the key.
                securityshortname = stock_info['securityshortname']
        except Exception as e:
            # Best-effort parsing: report the problem and still count the page.
            print(e)
    finally:
        async with lock:
            count = count + 1

        print(f'count:{count}')
        if count == pages:
            print('End of loop')
            loop.stop()



async def main():
    """Read the page count from page 1, then fetch every page concurrently.

    Resets the shared ``count`` to 0 and sets the shared ``pages`` from the
    first response. Page 1 is requested again by its own fetch task, as in
    the original.
    """
    global pages
    global count
    count = 0
    async with aiohttp.ClientSession() as session:
        async with session.get(home_url.format(1), headers=headers) as resp:
            content = await resp.text()

        js_data = parse_json(content)
        pages = js_data['pages']
        print(f'pages: {pages}')

        tasks = [asyncio.ensure_future(fetch(session, page))
                 for page in range(1, pages + 1)]
        # Keep the session open until every fetch has finished. This replaces
        # the fixed one-second sleep, which let the session close while slower
        # requests were still in flight.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for page, outcome in enumerate(results, start=1):
            if isinstance(outcome, Exception):
                print(f'page {page} failed: {outcome!r}')


# Schedule main() and run the loop until fetch() calls loop.stop().
asyncio.ensure_future(main())
loop.run_forever()
1. 如果不加入锁,每次运行的结果可能不一样。
2. 不能用多线程的threading 锁,得到的每次运行结果也有可能不一样
3. 使用 asyncio 的锁时,要写成 async with lock:(即在 with 前加关键字 async)
  查看全部
对于全局变量count进行统计加锁
import aiohttp
import asyncio
import execjs
import threading
# NOTE: ``global`` at module level has no effect; these two lines only signal
# that ``pages`` and ``count`` are module-wide names shared by fetch() and main().
global pages
global count

# Request headers sent with every call to the eastmoney endpoint.
headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "en-US,en;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Host": "dcfm.eastmoney.com",
"Pragma": "no-cache",
"Referer": "http://data.eastmoney.com/xg/xg/default.html",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/69.0.3497.81 Chrome/69.0.3497.81 Safari/537.36",
}

# URL template: ``{}`` receives the page number via str.format(); the doubled
# braces come out as literal ``{`` / ``}`` in the formatted URL.
home_url = 'http://dcfm.eastmoney.com/em_mutisvcexpandinterface/api/js/get?type=XGSG_LB&token=70f12f2f4f091e459a279469fe49eca5&st=purchasedate,securitycode&sr=-1&p={}&ps=50&js=var%20hsEnHLwG={{pages:(tp),data:(x)}}&rt=53512217'

# Module-level event loop: fetch() stops it, the end of the script runs it.
loop = asyncio.get_event_loop()
# The threading variant is left commented out; the asyncio lock is the one in use.
# lock = threading.Lock()
lock = asyncio.Lock()
def parse_json(content):
    """Evaluate the ``var hsEnHLwG=...`` JavaScript payload and return its value.

    A ``getV`` accessor is appended to the script so the variable can be read
    back through execjs.

    NOTE(review): this executes the downloaded response body as JavaScript;
    only acceptable while the endpoint is trusted.
    """
    script = content + ';function getV(){return hsEnHLwG;}'
    return execjs.compile(script).call('getV')


async def fetch(session, page):
    """Download and parse one result page, then count it as finished.

    Args:
        session: Open aiohttp client session used for the request.
        page: Page number substituted into ``home_url``.

    The shared ``count`` is incremented under ``lock``; when it reaches
    ``pages`` the module-level event loop is stopped. Counting happens in a
    ``finally`` block so a failed request still counts as finished --
    otherwise ``count`` could never reach ``pages`` and ``run_forever()``
    would hang.
    """
    global pages
    global count
    try:
        async with session.get(home_url.format(page), headers=headers) as resp:
            content = await resp.text()

        try:
            js_content = parse_json(content)
            for stock_info in js_content['data']:
                # Not used further; the lookup still checks each record has the key.
                securityshortname = stock_info['securityshortname']
        except Exception as e:
            # Best-effort parsing: report the problem and still count the page.
            print(e)
    finally:
        async with lock:
            count = count + 1

        print(f'count:{count}')
        if count == pages:
            print('End of loop')
            loop.stop()



async def main():
    """Read the page count from page 1, then fetch every page concurrently.

    Resets the shared ``count`` to 0 and sets the shared ``pages`` from the
    first response. Page 1 is requested again by its own fetch task, as in
    the original.
    """
    global pages
    global count
    count = 0
    async with aiohttp.ClientSession() as session:
        async with session.get(home_url.format(1), headers=headers) as resp:
            content = await resp.text()

        js_data = parse_json(content)
        pages = js_data['pages']
        print(f'pages: {pages}')

        tasks = [asyncio.ensure_future(fetch(session, page))
                 for page in range(1, pages + 1)]
        # Keep the session open until every fetch has finished. This replaces
        # the fixed one-second sleep, which let the session close while slower
        # requests were still in flight.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for page, outcome in enumerate(results, start=1):
            if isinstance(outcome, Exception):
                print(f'page {page} failed: {outcome!r}')


# Schedule main() and run the loop until fetch() calls loop.stop().
asyncio.ensure_future(main())
loop.run_forever()

1. 如果不加入锁,每次运行的结果可能不一样。
2. 不能用多线程的threading 锁,得到的每次运行结果也有可能不一样
3. 使用 asyncio 的锁时,要写成 async with lock:(即在 with 前加关键字 async)