通知设置 新通知
akshare获取reits数据,搞笑,数据源测试过没有呀,没有集思录会员还没获取?
李魔佛 发表了文章 • 0 个评论 • 1834 次浏览 • 2022-11-07 21:52
限量: 单次返回所有 REITs 的基本信息数据
估计是旧的,以前游客就可以获取到数据的了。现在就不行了。
然后另外一个获取东财的函数,获取了20个就完事了。
结果人间现在超过20家,获取的数据都不全的。
akshare,用的还是有点糟心。长期来看,还是自己写靠谱,有些坑,你第一次用就可以发现,但是有些坑,却是埋在那里,像个定时炸弹。
akshare的代码:
查看全部
星球文章 获取所有文章 爬虫
李魔佛 发表了文章 • 0 个评论 • 2242 次浏览 • 2022-06-03 13:49
群里没有人没有吐槽过这个搜索功能的。
所以只好自己写个程序把自己的文章抓下来,作为文章目录:
生成的markdown文件
每次只需要运行python main.py 就可以拿到最新的星球文章链接了。
需要源码可以在公众号联系~
查看全部
知识星球获取文章链接与数据
李魔佛 发表了文章 • 0 个评论 • 2176 次浏览 • 2022-03-21 20:15
既然官方不提供这个功能,只能自己使用爬虫手段获取了,额。
既然官方不提供这个功能,只能自己使用爬虫手段获取了,额。
想用python爬虫批量下载数据,下载下来的数据是excel表格形式,但是源码下载的链接如下,请问这样可以爬吗?
低调的哥哥 回复了问题 • 2 人关注 • 1 个回复 • 2637 次浏览 • 2021-11-26 13:20
为什么登录成功但是爬取不到其他数据
低调的哥哥 回复了问题 • 2 人关注 • 1 个回复 • 2705 次浏览 • 2021-08-04 01:17
韦世东 python3网络爬虫宝典 勘误
李魔佛 发表了文章 • 0 个评论 • 2696 次浏览 • 2021-05-21 20:06
1. 时间差是正数才是过期
2. 获取权限那里,permission = args[0].permission
不能后面再用get
P222:
写入mongodb后,原来的数据会被加入一个_id,值为OjectId,该值是无法被json dumps为string,
所以需要手工把ObjectId 转为str,或者del message['_id'] 将这个键去除。
查看全部
1. 时间差是正数才是过期
2. 获取权限那里,permission = args[0].permission
不能后面再用get
P222:
写入mongodb后,原来的数据会被加入一个_id,值为OjectId,该值是无法被json dumps为string,
所以需要手工把ObjectId 转为str,或者del message['_id'] 将这个键去除。
pyautogui无法再远程桌面最小化或者断线后进行截图
李魔佛 发表了文章 • 0 个评论 • 3346 次浏览 • 2021-04-29 17:19
知道的朋友可以私信下我。
国外的论坛也找不到答案,只能一直开着屏幕了。。。。
知道的朋友可以私信下我。
国外的论坛也找不到答案,只能一直开着屏幕了。。。。
pyppeteer 在AppData下的dev_profile 生成大量文件
李魔佛 发表了文章 • 0 个评论 • 3224 次浏览 • 2021-04-28 12:18
C:\Users\xda\AppData\Local\pyppeteer\pyppeteer\.dev_profile
运行次数多了,这个目录下积累了几十个G的文件。
因为每次启动pyppeteer后,如果不指定userData目录,会在dev_profile生成一个新的userData目录,每次大概30MB左右的打小,所以启动的次数,越多,这个文件夹的体积就越大。
其实可以直接删除,然后启动pyppeteer是加上一个参数:userDataDir
browser = await pyppeteer.launch(
{'headless': False,
'userDataDir': UserDataDir,
'defaultViewport': {'width': 1800, 'height': 1000},
# 'enable-automation':False,
# 'ignoreDefaultArgs':['--enable-automation'],
'ignoreDefaultArgs':True,
}userDataDir='D:\Temp'
这样每次pyppeteer都会用同一个配置文件,并且还可以把cookies,session文件存在同一个地方,如果登录过的网站,下次可以直接登录,不需要再次输入账号密码。
查看全部
C:\Users\xda\AppData\Local\pyppeteer\pyppeteer\.dev_profile
运行次数多了,这个目录下积累了几十个G的文件。
因为每次启动pyppeteer后,如果不指定userData目录,会在dev_profile生成一个新的userData目录,每次大概30MB左右的打小,所以启动的次数,越多,这个文件夹的体积就越大。
其实可以直接删除,然后启动pyppeteer是加上一个参数:userDataDir
browser = await pyppeteer.launch(userDataDir='D:\Temp'
{'headless': False,
'userDataDir': UserDataDir,
'defaultViewport': {'width': 1800, 'height': 1000},
# 'enable-automation':False,
# 'ignoreDefaultArgs':['--enable-automation'],
'ignoreDefaultArgs':True,
}
这样每次pyppeteer都会用同一个配置文件,并且还可以把cookies,session文件存在同一个地方,如果登录过的网站,下次可以直接登录,不需要再次输入账号密码。
pymongo update_one/update_many 返回更新数据的数目
李魔佛 发表了文章 • 0 个评论 • 5109 次浏览 • 2021-04-21 18:54
ret = self.doc.update_one({'announcementId':announcementId},{'$setOnInsert':item},upsert=True)
作用是如果某个id不存在时,则把数据插入文档,如果存在,则不进行任何操作。
那么ret是操作的返回结果。
我们可以对返回数据进行核对
ret.matched_count, ret.modified_count前者是匹配到文档的数目,而后者是修改了文档的个数。
查看全部
ret = self.doc.update_one({'announcementId':announcementId},{'$setOnInsert':item},upsert=True)
作用是如果某个id不存在时,则把数据插入文档,如果存在,则不进行任何操作。
那么ret是操作的返回结果。
我们可以对返回数据进行核对
ret.matched_count, ret.modified_count前者是匹配到文档的数目,而后者是修改了文档的个数。
pyppeteer-pdf not support latest version of pypeteer
李魔佛 发表了文章 • 0 个评论 • 2460 次浏览 • 2021-04-04 18:23
只好下载一个低版本的chrouium放到本地。
或者使用另一个库
https://github.com/shivanshs9/pdfgen-python
pyppeteer禁用自动化提示栏 --enable-automation参数关闭
李魔佛 发表了文章 • 0 个评论 • 5062 次浏览 • 2021-04-04 14:49
{'headless': False,
'userDataDir': UserDataDir,
'defaultViewport': {'width': 1800, 'height': 1000},
# 'enable-automation':False,
'ignoreDefaultArgs':['--enable-automation'],
}
)
忽略默认参数:
'ignoreDefaultArgs':['--enable-automation'],
即可,
如果需要去除更多的特征,在列表里面加入更多即可。
如果全部不要,那么 就把它设置为True即可。
查看全部
browser = await pyppeteer.launch(
{'headless': False,
'userDataDir': UserDataDir,
'defaultViewport': {'width': 1800, 'height': 1000},
# 'enable-automation':False,
'ignoreDefaultArgs':['--enable-automation'],
}
)
忽略默认参数:
'ignoreDefaultArgs':['--enable-automation'],
即可,
如果需要去除更多的特征,在列表里面加入更多即可。
如果全部不要,那么 就把它设置为True即可。
pyppeteer设置浏览器大小
李魔佛 发表了文章 • 0 个评论 • 3538 次浏览 • 2021-04-04 12:06
browser = await pyppeteer.launch(
{'headless': False,
'userDataDir': UserDataDir,
'defaultViewport': {'width': 1800, 'height': 1000}
}
)
更多参数可以查看pyppeteer的源码。
self.handleSIGINT = options.get('handleSIGINT', True)
self.handleSIGTERM = options.get('handleSIGTERM', True)
self.handleSIGHUP = options.get('handleSIGHUP', True)
self.ignoreHTTPSErrors = options.get('ignoreHTTPSErrors', False)
self.defaultViewport = options.get('defaultViewport', {'width': 800, 'height': 600}) # noqa: E501
self.slowMo = options.get('slowMo', 0)
self.timeout = options.get('timeout', 30000)
self.autoClose = options.get('autoClose', True)
查看全部
browser = await pyppeteer.launch(
{'headless': False,
'userDataDir': UserDataDir,
'defaultViewport': {'width': 1800, 'height': 1000}
}
)
更多参数可以查看pyppeteer的源码。
self.handleSIGINT = options.get('handleSIGINT', True)
self.handleSIGTERM = options.get('handleSIGTERM', True)
self.handleSIGHUP = options.get('handleSIGHUP', True)
self.ignoreHTTPSErrors = options.get('ignoreHTTPSErrors', False)
self.defaultViewport = options.get('defaultViewport', {'width': 800, 'height': 600}) # noqa: E501
self.slowMo = options.get('slowMo', 0)
self.timeout = options.get('timeout', 30000)
self.autoClose = options.get('autoClose', True)
pyppeteer下载chromedriver失败的解决办法
李魔佛 发表了文章 • 0 个评论 • 3876 次浏览 • 2021-04-01 00:05
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='storage.googleapis.com', port=443): Max retries exceeded with url: /chromium-browser-snapshots/Win_
x64/588429/chrome-win32.zip (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x00000000037D3880>: Failed to establish a new connect
ion: [Errno 11004] getaddrinfo failed'))
用浏览器看了,发现根本打不开。
然后谷歌了一下它的镜像,实际链接为:
https://commondatastorage.googleapis.com/chromium-browser-snapshots/Win_x64/575458/chrome-win32.zip
所以,哎,这个库真的有点烂。
直接下载上面的链接,然后解压到本地路径,然后在设置一个环境变量指向这个目录即可。
查看全部
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='storage.googleapis.com', port=443): Max retries exceeded with url: /chromium-browser-snapshots/Win_用浏览器看了,发现根本打不开。
x64/588429/chrome-win32.zip (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x00000000037D3880>: Failed to establish a new connect
ion: [Errno 11004] getaddrinfo failed'))
然后谷歌了一下它的镜像,实际链接为:
https://commondatastorage.googleapis.com/chromium-browser-snapshots/Win_x64/575458/chrome-win32.zip
所以,哎,这个库真的有点烂。
直接下载上面的链接,然后解压到本地路径,然后在设置一个环境变量指向这个目录即可。
requests使用断点续传时注意要加stream=True,不然你的硬盘可能会爆掉
李魔佛 发表了文章 • 0 个评论 • 4647 次浏览 • 2021-03-14 00:56
with open(dst, "ab") as f:
dp = Down_progress(file_size, first_size, dst)
dp.start()
chunk_size = 1024
for chunk in res.iter_content(chunk_size = chunk_size):
if chunk:
f.write(chunk)
dp.update(chunk_size)
如果不加stream=True,那么你的硬盘很可能就不停被写入,文件会变得无比巨大,最后磁盘空间不够死机。
不要问我为什么知道。 查看全部
res = requests.get(url, stream=True, headers=headers, verify=False)
with open(dst, "ab") as f:
dp = Down_progress(file_size, first_size, dst)
dp.start()
chunk_size = 1024
for chunk in res.iter_content(chunk_size = chunk_size):
if chunk:
f.write(chunk)
dp.update(chunk_size)
如果不加stream=True,那么你的硬盘很可能就不停被写入,文件会变得无比巨大,最后磁盘空间不够死机。
不要问我为什么知道。
不用一行代码 下载雪球嘉年华视频
李魔佛 发表了文章 • 0 个评论 • 3769 次浏览 • 2020-12-09 14:43
听说今年着重分享一些观念,抱着好奇心,就打算下载几部来看看。
雪球网站很简单,只要找到下载链接就可以下载了。
第一步。打开一个视频播放的页面,比如大金链的
11737544 粉丝主会场 | 巅峰对谈:金牛双子星主动VS量化
https://xueqiu.com/video/5285890810945319765
右键,查看源码,然后在源码里面试着查找 mp4,flv,webp等流媒体字样。
在这里找到一个了:
但是这个视频下载地址有很多转义字符:http:\u002F\u002F1256122120.vod2.myqcloud.com\u002F53ad1740vodtranscq1256122120\u002F17ebe5145285890810945319765\u002Fv.f20.mp4
直接在浏览器是无法直接打开的。
可以直接替换\u002f 为一个斜杠 \ 就可以了。
如果嫌麻烦,可以在浏览器里面,按下F12,在console页面里面输入上面的地址,前后加个双引号,然后回车,就可以得到完整的地址了。
原创文章,转载请注明出处
http://30daydo.com/article/44119
查看全部
听说今年着重分享一些观念,抱着好奇心,就打算下载几部来看看。
雪球网站很简单,只要找到下载链接就可以下载了。
第一步。打开一个视频播放的页面,比如大金链的
11737544 粉丝主会场 | 巅峰对谈:金牛双子星主动VS量化
https://xueqiu.com/video/5285890810945319765
右键,查看源码,然后在源码里面试着查找 mp4,flv,webp等流媒体字样。
在这里找到一个了:
但是这个视频下载地址有很多转义字符:
http:\u002F\u002F1256122120.vod2.myqcloud.com\u002F53ad1740vodtranscq1256122120\u002F17ebe5145285890810945319765\u002Fv.f20.mp4
直接在浏览器是无法直接打开的。
可以直接替换\u002f 为一个斜杠 \ 就可以了。
如果嫌麻烦,可以在浏览器里面,按下F12,在console页面里面输入上面的地址,前后加个双引号,然后回车,就可以得到完整的地址了。
原创文章,转载请注明出处
http://30daydo.com/article/44119
P站 视频下载 JS加密URL 【JS逆向】
李魔佛 发表了文章 • 0 个评论 • 29610 次浏览 • 2020-12-02 10:43
它的真实下载地址是隐藏在JS加密当中的。
接下来我们看看如何找到它的真实地址。
首先随便打开一个视频:
我就找一个python学习的视频(什么鬼,上面还有python学习? 是的有的,还有很多数学题目在上面讲解的呢)
然后按F12
找到一个疑似的下载地址
试下拷贝直接去打开,发现无法打开,显示403 fobiden。
但是看url,应该就是最终的下载url的格式,通过最后的随机数控制播放权限。
然后打开页面的源码
在里面找下有没有一些mp4等的字符, 然后发现有个720p,1080p不同分辨率格式的字段,我们把这个JavaScript的代码扣下来,然后保存为p_hub.js
用vs code 或者其他编辑器打开
看到了他们生成过程了吗?
然后尝试用nodejs运行一下。
什么都没有输出。
因为上面代码并没有任何输出语句,我们在最后加一个 console.log(quality_720p); 就会有输出的了。
看到了吗?
然后拿这个地址去试试,看能否播放和下载。
果然,浏览器里面出现了一个, 额, python学习的页面,一个单独的视频页面,所以可以直接右键,弹出一个保存视频的菜单,然后可以直接下载了。
用代码requesets.get(url) 保存text.content 写入文件,就可以把视频保存到本地了。
源码:#!/usr/bin/env python
# http://30daydo.com
import os
import re
import js2py
import requests
from lxml import etree
from clint.textui import progress
import fire
from loguru import logger
file='crawler'
logger.add(
"logs/%s.log" % file,
format="{time:MM-DD HH:mm:ss} {level} {message}",
)
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36",
}
proxies = {}
# 如果代理不稳定,不推荐使用
# local proxy service
# proxies example:
# proxies = {
# "http": "socks5://127.0.0.1:1080",
# "https": "socks5://127.0.0.1:1080",
# }
def list_page(url):
logger.info("crawling : %s" % url)
resp = requests.get(url, headers=headers, proxies=proxies, verify=False)
html = etree.HTML(resp.text)
buff = '//*[@class="phimage"]/a/'
names = html.xpath(f"{buff}@href")
urls = html.xpath(f"{buff}img/@data-mediabook")
for i in range(len(urls)):
try:
url = urls
[i] name = re.findall("=ph(\w+)", names[i])[-1]
logger.info(f"{url} {name}")
download(url, name, "webm")
except Exception as err:
logger.error(err)
def detail_page(url):
s = requests.Session()
resp = s.get(url, headers=headers, proxies=proxies, verify=False)
html = etree.HTML(resp.content)
title = "".join(html.xpath("//h1//text()")).strip()
logger.info(title)
js_temp = html.xpath("//script/text()")
for j in js_temp:
if "flashvars" in j:
videoUrl = exeJs(j)
download(videoUrl, title, "mp4")
continue
def exeJs(js):
flashvars = re.findall("flashvars_\d+", js)[0]
js = "\n\t".join(js.split("\n\t")[:-5]).strip()
js = js.replace("// var nextVideoObject = flashvars_['nextVideo'];",'')
js+=flashvars
res = js2py.eval_js(js)
if res.quality_720p:
return res.quality_720p
elif res.quality_480p:
return res.quality_480p
elif res.quality_240p:
return res.quality_240p
else:
logger.error("parse url error")
def download(url, name, filetype):
logger.info(f"{url} {name} {filetype}")
filepath = "%s/%s.%s" % (filetype, name, filetype)
if os.path.exists(filepath):
logger.info("this file had been downloaded :: %s" % filepath)
return
else:
response = requests.get(url, headers=headers, proxies=proxies, stream=True)
with open(filepath, "wb") as file:
total_length = int(response.headers.get("content-length"))
for ch in progress.bar(
response.iter_content(chunk_size=2391975),
expected_size=(total_length / 1024) + 1,
):
if ch:
file.write(ch)
logger.info("download success :: %s" % filepath)
def run(_arg=None):
paths = ["webm", "mp4"]
for path in paths:
if not os.path.exists(path):
os.mkdir(path)
if _arg == "webm":
# https://www.pornhub.com/categories
urls = [
# "https://www.pornhub.com/video?o=tr",
# "https://www.pornhub.com/video?o=ht",
# "https://www.pornhub.com/video?o=mv",
"https://www.pornhub.com/video",
]
for url in urls:
list_page(url)
elif _arg == "mp4":
with open("download.txt", "r") as file:
keys = list(set(file.readlines()))
logger.info(keys)
keys += [d.strip(".webm") for d in os.listdir("webm/")]
for key in keys:
if not key.strip():
continue
url = "https://www.pornhub.com/view_v ... ot%3B % key.strip()
logger.info("url: {}", url)
detail_page(url)
else:
_str = """
tips:
python crawler.py webm
- 下载热门页面的缩略图,路径为webm文件夹下
python crawler.py mp4
- 该命令会下载webm文件下对应的mp4文件
- 也可以将目标地址写入download.txt中
"""
logger.info(_str)
return
logger.info("finish !")
if __name__ == "__main__":
fire.Fire(run)
[/i][/i]
[i]原创文章,
转载请注明出处:
http://30daydo.com/article/44115
[/i]
[i]
[/i] 查看全部
它的真实下载地址是隐藏在JS加密当中的。
接下来我们看看如何找到它的真实地址。
首先随便打开一个视频:
我就找一个python学习的视频(什么鬼,上面还有python学习? 是的有的,还有很多数学题目在上面讲解的呢)
然后按F12
找到一个疑似的下载地址
试下拷贝直接去打开,发现无法打开,显示403 fobiden。
但是看url,应该就是最终的下载url的格式,通过最后的随机数控制播放权限。
然后打开页面的源码
在里面找下有没有一些mp4等的字符, 然后发现有个720p,1080p不同分辨率格式的字段,我们把这个JavaScript的代码扣下来,然后保存为p_hub.js
用vs code 或者其他编辑器打开
看到了他们生成过程了吗?
然后尝试用nodejs运行一下。
什么都没有输出。
因为上面代码并没有任何输出语句,我们在最后加一个 console.log(quality_720p); 就会有输出的了。
看到了吗?
然后拿这个地址去试试,看能否播放和下载。
果然,浏览器里面出现了一个, 额, python学习的页面,一个单独的视频页面,所以可以直接右键,弹出一个保存视频的菜单,然后可以直接下载了。
用代码requesets.get(url) 保存text.content 写入文件,就可以把视频保存到本地了。
源码:
#!/usr/bin/env python
# http://30daydo.com
import os
import re
import js2py
import requests
from lxml import etree
from clint.textui import progress
import fire
from loguru import logger
file='crawler'
logger.add(
"logs/%s.log" % file,
format="{time:MM-DD HH:mm:ss} {level} {message}",
)
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36",
}
proxies = {}
# 如果代理不稳定,不推荐使用
# local proxy service
# proxies example:
# proxies = {
# "http": "socks5://127.0.0.1:1080",
# "https": "socks5://127.0.0.1:1080",
# }
def list_page(url):
logger.info("crawling : %s" % url)
resp = requests.get(url, headers=headers, proxies=proxies, verify=False)
html = etree.HTML(resp.text)
buff = '//*[@class="phimage"]/a/'
names = html.xpath(f"{buff}@href")
urls = html.xpath(f"{buff}img/@data-mediabook")
for i in range(len(urls)):
try:
url = urls
[i] name = re.findall("=ph(\w+)", names[i])[-1]
logger.info(f"{url} {name}")
download(url, name, "webm")
except Exception as err:
logger.error(err)
def detail_page(url):
s = requests.Session()
resp = s.get(url, headers=headers, proxies=proxies, verify=False)
html = etree.HTML(resp.content)
title = "".join(html.xpath("//h1//text()")).strip()
logger.info(title)
js_temp = html.xpath("//script/text()")
for j in js_temp:
if "flashvars" in j:
videoUrl = exeJs(j)
download(videoUrl, title, "mp4")
continue
def exeJs(js):
flashvars = re.findall("flashvars_\d+", js)[0]
js = "\n\t".join(js.split("\n\t")[:-5]).strip()
js = js.replace("// var nextVideoObject = flashvars_['nextVideo'];",'')
js+=flashvars
res = js2py.eval_js(js)
if res.quality_720p:
return res.quality_720p
elif res.quality_480p:
return res.quality_480p
elif res.quality_240p:
return res.quality_240p
else:
logger.error("parse url error")
def download(url, name, filetype):
logger.info(f"{url} {name} {filetype}")
filepath = "%s/%s.%s" % (filetype, name, filetype)
if os.path.exists(filepath):
logger.info("this file had been downloaded :: %s" % filepath)
return
else:
response = requests.get(url, headers=headers, proxies=proxies, stream=True)
with open(filepath, "wb") as file:
total_length = int(response.headers.get("content-length"))
for ch in progress.bar(
response.iter_content(chunk_size=2391975),
expected_size=(total_length / 1024) + 1,
):
if ch:
file.write(ch)
logger.info("download success :: %s" % filepath)
def run(_arg=None):
paths = ["webm", "mp4"]
for path in paths:
if not os.path.exists(path):
os.mkdir(path)
if _arg == "webm":
# https://www.pornhub.com/categories
urls = [
# "https://www.pornhub.com/video?o=tr",
# "https://www.pornhub.com/video?o=ht",
# "https://www.pornhub.com/video?o=mv",
"https://www.pornhub.com/video",
]
for url in urls:
list_page(url)
elif _arg == "mp4":
with open("download.txt", "r") as file:
keys = list(set(file.readlines()))
logger.info(keys)
keys += [d.strip(".webm") for d in os.listdir("webm/")]
for key in keys:
if not key.strip():
continue
url = "https://www.pornhub.com/view_v ... ot%3B % key.strip()
logger.info("url: {}", url)
detail_page(url)
else:
_str = """
tips:
python crawler.py webm
- 下载热门页面的缩略图,路径为webm文件夹下
python crawler.py mp4
- 该命令会下载webm文件下对应的mp4文件
- 也可以将目标地址写入download.txt中
"""
logger.info(_str)
return
logger.info("finish !")
if __name__ == "__main__":
fire.Fire(run)
[/i][/i]
[i]原创文章,
转载请注明出处:
http://30daydo.com/article/44115
[/i]
[i] [/i]
简单快速下载知乎视频
李魔佛 发表了文章 • 0 个评论 • 5222 次浏览 • 2020-11-29 23:03
1. 打开视频前按F12
2. 播放视频
3. 查看F12的网络选项
4. 找到 https://vdn3.vzuu.com 的url
5. 对应的整个url链接就是视频的真实下载地址。把url复制到浏览器打开,然后右键另存为本地视频就可以了
查看全部
1. 打开视频前按F12
2. 播放视频
3. 查看F12的网络选项
4. 找到 https://vdn3.vzuu.com 的url
5. 对应的整个url链接就是视频的真实下载地址。把url复制到浏览器打开,然后右键另存为本地视频就可以了
集思录用户名密码JS加密流程解密 【JS加密破解教程一】
李魔佛 发表了文章 • 0 个评论 • 4926 次浏览 • 2020-11-27 17:26
而且提交的内容每次都固定,第一种最傻的方式就是每次提交就把加密的用户名和密码提交上去。
当然,对于有钻研的读者,可能想看看其具体的加密方式。
那么就按照流程,通过断点与搜索,找到其加密方法。
首先在上面的截图中很明显就知道,这个字符加密应该是aes,因为它的提交字段中aes:1
按F12,走完整个登录流程。
然后搜索password字样的地方。
在index.html首页中找到一处:
然后在该password的地方打个断点,然后跳转到jslencode 的地方。
跳转到的地方是这里。
然后我们浏览一下这个JS页面,尝试把整个JS代码抠出来。
放到我们的调试软件中,比较常用的是鬼鬼JS调试器。(有需要的可以关注wx公众号下载:回复 鬼鬼JS 即可)
这个调试器的好处是,可以很方便格式化JS代码,然后输入你要调试的字符,然后点击运行,可以当场拿到结果,等到结果ok了的话,就可以用python 的pyexecjs执行。
先点击代码格式化,然后在输入框里找到函数的入口:
jslencode(text, aes_key),好了,现在就把我们的密码 XXXXXX,和aes_key 放进去就可以了。
aes_key 在之前的index.html就能找到。var A397151C04723421F = '397151C04723421F';
function doLogin(){
var data = $('#login_form').serializeObjectToJson();
data['_post_type'] = 'ajax';
data['aes'] = 1;
data['user_name'] = jslencode(data['user_name'], A397151C04723421F);
data['password'] = jslencode(data['password'], A397151C04723421F);
$.post('/account/ajax/login_process/', data, function(rst){
on_login_error_processer(rst);
}, 'json');
var A397151C04723421F = '397151C04723421F';
jslencode(‘jisilupassword’, '397151C04723421F')
右下角有个系统引擎运行。
得到结果:
1d1bd2b22b8cc5c09ad8ce8f1e69b87f
对比一下第一张图里面的的post请求,发现是一样的。那么现在的JS解密就成功了一大半了。 接着我们就写python代码执行这段JS脚本。
尝试直接把JS放到一个文件,然后编写python代码# -*- coding: utf-8 -*-
# @Time : 2020/11/27 12:15
# @File : js_executor.py
# @Author : Rocky C@www.30daydo.com
import execjs
def main():
encode_user = ctx.call('jslencode', user, key)
encode_password = ctx.call('jslencode', password, key)
print(encode_user)
print(encode_password)
if __name__ == '__main__':
main()
然后发现报错: 说CryptoJS没有定义,那么我们看看代码。(function(root, factory) {
if (typeof exports === "object") {
module.exports = exports = factory()
} else {
if (typeof define === "function" && define.amd) {
define(, factory)
} else {
root.CryptoJS = factory()
}
}
}(this, function() {
var CryptoJS = CryptoJS || (function(Math, undefined) {
var create = Object.create || (function() {
发现这一行此时CryptoJS的定义
var CryptoJS = CryptoJS || (function(Math, undefined) { var create = Object.create || (function() {
那么我们把这一行上面的全部删除。最后的调试后,能够执行的JS代码如下,并保存为 集思录.js 文件,调用上面的python文件。
集思录.jsvar CryptoJS = CryptoJS || (function(Math, undefined) {
var create = Object.create || (function() {
function F() {}
return function(obj) {
var subtype;
F.prototype = obj;
subtype = new F();
F.prototype = null;
return subtype
}
}());
var C = {};
var C_lib = C.lib = {};
var Base = C_lib.Base = (function() {
下面的就跟上面的一模一样了:
源文件地址:https://www.jisilu.cn/static/js/crypto-js-3.3.0-min.js然后python运行后得到加密后的aes数据。
对比一下鬼鬼JS调试器的结果,一样的。
OK,手工。
这里汇聚了平时整理的JS破解工作流,大家可以参考参考。
https://github.com/Rockyzsu/JS-Reverse
原创文章,转载请注明出处:
http://30daydo.com/article/44109
喜欢的朋友可以加入星球探讨: 查看全部
而且提交的内容每次都固定,第一种最傻的方式就是每次提交就把加密的用户名和密码提交上去。
当然,对于有钻研的读者,可能想看看其具体的加密方式。
那么就按照流程,通过断点与搜索,找到其加密方法。
首先在上面的截图中很明显就知道,这个字符加密应该是aes,因为它的提交字段中aes:1
按F12,走完整个登录流程。
然后搜索password字样的地方。
在index.html首页中找到一处:
然后在该password的地方打个断点,然后跳转到jslencode 的地方。
跳转到的地方是这里。
然后我们浏览一下这个JS页面,尝试把整个JS代码抠出来。
放到我们的调试软件中,比较常用的是鬼鬼JS调试器。(有需要的可以关注wx公众号下载:回复 鬼鬼JS 即可)
这个调试器的好处是,可以很方便格式化JS代码,然后输入你要调试的字符,然后点击运行,可以当场拿到结果,等到结果ok了的话,就可以用python 的pyexecjs执行。
先点击代码格式化,然后在输入框里找到函数的入口:
jslencode(text, aes_key),好了,现在就把我们的密码 XXXXXX,和aes_key 放进去就可以了。
aes_key 在之前的index.html就能找到。
var A397151C04723421F = '397151C04723421F';
function doLogin(){
var data = $('#login_form').serializeObjectToJson();
data['_post_type'] = 'ajax';
data['aes'] = 1;
data['user_name'] = jslencode(data['user_name'], A397151C04723421F);
data['password'] = jslencode(data['password'], A397151C04723421F);
$.post('/account/ajax/login_process/', data, function(rst){
on_login_error_processer(rst);
}, 'json');
var A397151C04723421F = '397151C04723421F';
jslencode(‘jisilupassword’, '397151C04723421F')
右下角有个系统引擎运行。
得到结果:
1d1bd2b22b8cc5c09ad8ce8f1e69b87f
对比一下第一张图里面的的post请求,发现是一样的。那么现在的JS解密就成功了一大半了。 接着我们就写python代码执行这段JS脚本。
尝试直接把JS放到一个文件,然后编写python代码
# -*- coding: utf-8 -*-
# @Time : 2020/11/27 12:15
# @File : js_executor.py
# @Author : Rocky C@www.30daydo.com
import execjs
def main():
encode_user = ctx.call('jslencode', user, key)
encode_password = ctx.call('jslencode', password, key)
print(encode_user)
print(encode_password)
if __name__ == '__main__':
main()
然后发现报错: 说CryptoJS没有定义,那么我们看看代码。
(function(root, factory) {
if (typeof exports === "object") {
module.exports = exports = factory()
} else {
if (typeof define === "function" && define.amd) {
define(, factory)
} else {
root.CryptoJS = factory()
}
}
}(this, function() {
var CryptoJS = CryptoJS || (function(Math, undefined) {
var create = Object.create || (function() {
发现这一行此时CryptoJS的定义
var CryptoJS = CryptoJS || (function(Math, undefined) { var create = Object.create || (function() {
那么我们把这一行上面的全部删除。最后的调试后,能够执行的JS代码如下,并保存为 集思录.js 文件,调用上面的python文件。
集思录.js
var CryptoJS = CryptoJS || (function(Math, undefined) {下面的就跟上面的一模一样了:
var create = Object.create || (function() {
function F() {}
return function(obj) {
var subtype;
F.prototype = obj;
subtype = new F();
F.prototype = null;
return subtype
}
}());
var C = {};
var C_lib = C.lib = {};
var Base = C_lib.Base = (function() {
源文件地址:
https://www.jisilu.cn/static/js/crypto-js-3.3.0-min.js然后python运行后得到加密后的aes数据。
对比一下鬼鬼JS调试器的结果,一样的。
OK,手工。
这里汇聚了平时整理的JS破解工作流,大家可以参考参考。
https://github.com/Rockyzsu/JS-Reverse
原创文章,转载请注明出处:
http://30daydo.com/article/44109
喜欢的朋友可以加入星球探讨:
asyncio 异步爬取vs requests同步爬取 性能对比
李魔佛 发表了文章 • 0 个评论 • 3336 次浏览 • 2020-11-25 11:21
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService
SLEEP = 2
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}
class AsyncMongo():
def __init__(self):
self.DB = DBSelector()
self.client = self.DB.mongo(location_type='qq', async_type=True)
self.db = self.client['db_stock']
async def update(self, table,data):
self.doc= self.db[table]
await self.doc.insert_many(data)
class Holdle(BaseService):
def __init__(self):
super(Holdle, self).__init__()
self.data_processor = AsyncMongo()
self.tables_list =['ROE','Cash_Ratio','Gross_Margin','Operation_Margin','Net_Profit_Ratio','Dividend_ratio']
async def home_page(self):
start = time.time()
async with aiohttp.ClientSession() as session:
async with session.get(url=URL_MAP['home_page'], headers=headers) as response:
html = await response.text() # 这个阻塞
resp = Selector(text=html)
industries = resp.xpath('//ul[@class="list-unstyled"]/a')
task_list = []
for industry in industries:
json_data = {}
industry_url = industry.xpath('.//@href').extract_first()
industry_name = industry.xpath('.//li/text()').extract_first()
industry_name = industry_name.replace('-', '').strip()
json_data['industry_url'] = industry_url
json_data['industry_name'] = industry_name
task = asyncio.ensure_future(self.detail_list(session, industry_url, json_data))
task_list.append(task)
await asyncio.gather(*task_list)
end = time.time()
print(f'time used {end - start}')
async def detail_list(self, session, url, json_data):
async with session.get(URL_MAP['base'] + url, headers=headers) as response:
response = await response.text()
await self.parse_detail(response, json_data)
async def parse_detail(self, html, json_data=None):
resp = Selector(text=html)
industry=json_data['industry_name']
tables = resp.xpath('//table[@class="table table-bordered"]')
if len(tables)!=6:
raise ValueError
for index,table in enumerate(self.tables_list):
rows = tables[index].xpath('.//tr')
result = []
for row in rows[1:]:
stock_name = row.xpath('.//td[1]/text()').extract_first()
value = row.xpath('.//td[2]/text()').extract_first()
value = float(value)
d={'industry':industry,'name':stock_name,'value':value,'crawltime':datetime.datetime.now()}
result.append(d)
await self.data_processor.update(table,result)
app = Holdle()
loop = asyncio.get_event_loop()
loop.run_until_complete(app.home_page())
爬完并且入库,用时大约为35s
使用requests爬取
# -*- coding: utf-8 -*-
# @Time : 2020/11/24 21:42
# @File : sync_spider.py
# @Author : Rocky C@www.30daydo.com
import requests
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService
SLEEP = 2
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}
class Holdle(BaseService):
def __init__(self):
super(Holdle, self).__init__()
self.DB = DBSelector()
self.client = self.DB.mongo(location_type='qq', async_type=True)
self.session = requests.Session()
def run(self):
start = time.time()
response = self.session.get(url=URL_MAP['home_page'], headers=headers)
html = response.text # 这个阻塞
resp = Selector(text=html)
industries = resp.xpath('//ul[@class="list-unstyled"]/a')
for industry in industries:
json_data = {}
industry_url = industry.xpath('.//@href').extract_first()
industry_name = industry.xpath('.//li/text()').extract_first()
json_data['industry_url'] = industry_url
json_data['industry_name'] = industry_name
self.detail_list(industry_url, json_data)
end = time.time()
print(f'time used {end-start}')
def detail_list(self, url, json_data):
response = self.session.get(URL_MAP['base']+url, headers=headers)
response =response.text
self.parse_detail(response, json_data)
def parse_detail(self, html, json_data=None):
resp = Selector(text=html)
title =resp.xpath('//title/text()').extract_first()
print(title)
app = Holdle()
app.run()
用时约160s,而且这里还省略了mongo入库的时间。上面异步爬取里面包含了异步存入mongo。
所以单从网络IO性能上来说,异步是比纯同步要快很多。
但是,async的生态做得不是太好,第三方的异步框架做得也不够完善。
因为如果系统中引入了异步,很多耗时的地方也是需要使用异步的写法和框架,不然会导致系统的控制权没有被正确转移。
水文一篇。
完毕
查看全部
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService
SLEEP = 2
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}
class AsyncMongo():
def __init__(self):
self.DB = DBSelector()
self.client = self.DB.mongo(location_type='qq', async_type=True)
self.db = self.client['db_stock']
async def update(self, table,data):
self.doc= self.db[table]
await self.doc.insert_many(data)
class Holdle(BaseService):
def __init__(self):
super(Holdle, self).__init__()
self.data_processor = AsyncMongo()
self.tables_list =['ROE','Cash_Ratio','Gross_Margin','Operation_Margin','Net_Profit_Ratio','Dividend_ratio']
async def home_page(self):
start = time.time()
async with aiohttp.ClientSession() as session:
async with session.get(url=URL_MAP['home_page'], headers=headers) as response:
html = await response.text() # 这个阻塞
resp = Selector(text=html)
industries = resp.xpath('//ul[@class="list-unstyled"]/a')
task_list = []
for industry in industries:
json_data = {}
industry_url = industry.xpath('.//@href').extract_first()
industry_name = industry.xpath('.//li/text()').extract_first()
industry_name = industry_name.replace('-', '').strip()
json_data['industry_url'] = industry_url
json_data['industry_name'] = industry_name
task = asyncio.ensure_future(self.detail_list(session, industry_url, json_data))
task_list.append(task)
await asyncio.gather(*task_list)
end = time.time()
print(f'time used {end - start}')
async def detail_list(self, session, url, json_data):
async with session.get(URL_MAP['base'] + url, headers=headers) as response:
response = await response.text()
await self.parse_detail(response, json_data)
async def parse_detail(self, html, json_data=None):
resp = Selector(text=html)
industry=json_data['industry_name']
tables = resp.xpath('//table[@class="table table-bordered"]')
if len(tables)!=6:
raise ValueError
for index,table in enumerate(self.tables_list):
rows = tables[index].xpath('.//tr')
result = []
for row in rows[1:]:
stock_name = row.xpath('.//td[1]/text()').extract_first()
value = row.xpath('.//td[2]/text()').extract_first()
value = float(value)
d={'industry':industry,'name':stock_name,'value':value,'crawltime':datetime.datetime.now()}
result.append(d)
await self.data_processor.update(table,result)
app = Holdle()
loop = asyncio.get_event_loop()
loop.run_until_complete(app.home_page())
爬完并且入库,用时大约为35s
使用requests爬取
# -*- coding: utf-8 -*-用时约160s,而且这里还省略了mongo入库的时间。上面异步爬取里面包含了异步存入mongo。
# @Time : 2020/11/24 21:42
# @File : sync_spider.py
# @Author : Rocky C@www.30daydo.com
import requests
import sys
sys.path.append('..')
import asyncio
import datetime
import aiohttp
import re
import time
from parsel import Selector
from configure.settings import DBSelector
from common.BaseService import BaseService
SLEEP = 2
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}
URL_MAP = {'home_page': 'https://holdle.com/stocks/industry', 'base': 'https://holdle.com'}
class Holdle(BaseService):
def __init__(self):
super(Holdle, self).__init__()
self.DB = DBSelector()
self.client = self.DB.mongo(location_type='qq', async_type=True)
self.session = requests.Session()
def run(self):
start = time.time()
response = self.session.get(url=URL_MAP['home_page'], headers=headers)
html = response.text # 这个阻塞
resp = Selector(text=html)
industries = resp.xpath('//ul[@class="list-unstyled"]/a')
for industry in industries:
json_data = {}
industry_url = industry.xpath('.//@href').extract_first()
industry_name = industry.xpath('.//li/text()').extract_first()
json_data['industry_url'] = industry_url
json_data['industry_name'] = industry_name
self.detail_list(industry_url, json_data)
end = time.time()
print(f'time used {end-start}')
def detail_list(self, url, json_data):
response = self.session.get(URL_MAP['base']+url, headers=headers)
response =response.text
self.parse_detail(response, json_data)
def parse_detail(self, html, json_data=None):
resp = Selector(text=html)
title =resp.xpath('//title/text()').extract_first()
print(title)
app = Holdle()
app.run()
所以单从网络IO性能上来说,异步是比纯同步要快很多。
但是,async的生态做得不是太好,第三方的异步框架做得也不够完善。
因为如果系统中引入了异步,很多耗时的地方也是需要使用异步的写法和框架,不然会导致系统的控制权没有被正确转移。
水文一篇。
完毕
免费代理ip与收费的代理ip
wanbainip 发表了文章 • 0 个评论 • 3398 次浏览 • 2020-10-30 18:00
曾经有尝试过使用免费的代理ip来搭建代理池,可是免费的代理ip不仅资源少,而且可用率、高匿性、速度等都极差,每次使用都需要借助第三方软件进行检查是否可用,严重影响效率,根本满足不了任务的需求。
收费的代理ip与免费的代理ip差距非常大,不仅拥有海量的ip资源,可用率、高匿性、速度都是极好。操作简单工作效率既然提高上去了。经过多家的测试,最终选择了性价比最高的万变ip。高质量的优质代理ip才可以真正用来防止爬虫被封锁,如果使用普通代理,爬虫的真实IP还是会暴露。新获取一批新IP 查看全部
曾经有尝试过使用免费的代理ip来搭建代理池,可是免费的代理ip不仅资源少,而且可用率、高匿性、速度等都极差,每次使用都需要借助第三方软件进行检查是否可用,严重影响效率,根本满足不了任务的需求。
收费的代理ip与免费的代理ip差距非常大,不仅拥有海量的ip资源,可用率、高匿性、速度都是极好。操作简单工作效率既然提高上去了。经过多家的测试,最终选择了性价比最高的万变ip。高质量的优质代理ip才可以真正用来防止爬虫被封锁,如果使用普通代理,爬虫的真实IP还是会暴露。新获取一批新IP
Python爬虫学习者需要注意什么?
wanbainip 发表了文章 • 0 个评论 • 3197 次浏览 • 2020-10-28 17:14
最常见的解决方法就是使用大量的ip,就是借着代理ip保证IP被封时有替换IP可用,永远保持着续航能力。这里推荐51代理ip,作为一家提供代理IP的专业服务商,万变ip代理拥有强大的技术团队运营维护,全高匿系统所产生的高匿ip不仅安全稳定、而且速度快, 以及与爬虫用户多年来合作的宝贵经验,是Python爬虫首选代理IP。
Python是一种全栈计算机程序设计语言,全栈,顾名思义,应用范围广。你可能听说过很多编程语言,例如C语言,Java语言等,众所周知,这些语言都非常难学,更别说景桐使用了。而python不一样,比如完成一个Web服务,C语言要写1000行代码,Java要写100行,而python可能只要写20行。对!这就是差距!目前由于python“简单易懂”,已逐步成为网络爬虫主流语言。
在初学python爬虫时,很多程序员会被一些“小问题”阻碍脚步,为避免大家再次犯同样的错误,加快学习进程,在爬取网站信息时一定要使用大量代理IP。好用的代理IP服务商,
高效率的爬虫工作离不开ip代理的支持,这就是ip代理越来越受欢迎的原因!收藏举报投诉 查看全部
最常见的解决方法就是使用大量的ip,就是借着代理ip保证IP被封时有替换IP可用,永远保持着续航能力。这里推荐51代理ip,作为一家提供代理IP的专业服务商,万变ip代理拥有强大的技术团队运营维护,全高匿系统所产生的高匿ip不仅安全稳定、而且速度快, 以及与爬虫用户多年来合作的宝贵经验,是Python爬虫首选代理IP。
Python是一种全栈计算机程序设计语言,全栈,顾名思义,应用范围广。你可能听说过很多编程语言,例如C语言,Java语言等,众所周知,这些语言都非常难学,更别说景桐使用了。而python不一样,比如完成一个Web服务,C语言要写1000行代码,Java要写100行,而python可能只要写20行。对!这就是差距!目前由于python“简单易懂”,已逐步成为网络爬虫主流语言。
在初学python爬虫时,很多程序员会被一些“小问题”阻碍脚步,为避免大家再次犯同样的错误,加快学习进程,在爬取网站信息时一定要使用大量代理IP。好用的代理IP服务商,
高效率的爬虫工作离不开ip代理的支持,这就是ip代理越来越受欢迎的原因!收藏举报投诉
Python爬虫虎牙平台主播的图片代码
wanbainip 发表了文章 • 0 个评论 • 3176 次浏览 • 2020-10-27 17:55
代码如下:
import urllib.request
import re
import os
# 全局变量用来记录图片的编号
gl_z = 0
def down_img(url1):
"""下载图片"""
# 处理图片链接,拼接http:
url = "https:" + re.sub(r"\?", "", url1)
global gl_z
print(url)
# 请求链接
response = urllib.request.urlopen(url)
# 读取内容
data = response.read()
# 切片取出图片名称
file_name = url[url.rfind('/') + 1:]
# 生成列表
a = [x for x in range(10000)]
# 打开文件用以写入
file = open(os.path.join("photo3", "img" + file_name + str(a[gl_z]) + ".jpg"), "wb")
file.write(data)
# 关闭文件
file.close()
# 编号加1
gl_z += 1
if __name__ == '__main__':
# 要抓去信息的网址
home = """http://www.huya.com/g/xingxiu"""
# 模拟请求头
headers = {
"Host": "www.huya.com",
"User-Agent": "agent信息"
}
# 构造好请求对象 将请求提交到服务器 获取的响应就是到首页的html代码
request = urllib.request.Request(url=home, headers=headers)
response = urllib.request.urlopen(request)
# 读取抓到的内容并解码
html_data = response.read().decode()
"""huyaimg.msstatic.com/avatar/1054/db/6590aa9bcf98e12e5d809d371e46cc_180_135.jpg
"""
# 使用正则 从首页中 提取出所有的图片链接
img_list = re.findall(r"//huyaimg\.msstatic\.com.+\.jpg\?", html_data)
print(img_list)
# 取出每张图片进行下载
for img_url in img_list:
print(img_url)
down_img(img_url) 查看全部
代码如下:
import urllib.request
import re
import os
# 全局变量用来记录图片的编号
gl_z = 0
def down_img(url1):
"""下载图片"""
# 处理图片链接,拼接http:
url = "https:" + re.sub(r"\?", "", url1)
global gl_z
print(url)
# 请求链接
response = urllib.request.urlopen(url)
# 读取内容
data = response.read()
# 切片取出图片名称
file_name = url[url.rfind('/') + 1:]
# 生成列表
a = [x for x in range(10000)]
# 打开文件用以写入
file = open(os.path.join("photo3", "img" + file_name + str(a[gl_z]) + ".jpg"), "wb")
file.write(data)
# 关闭文件
file.close()
# 编号加1
gl_z += 1
if __name__ == '__main__':
# 要抓去信息的网址
home = """http://www.huya.com/g/xingxiu"""
# 模拟请求头
headers = {
"Host": "www.huya.com",
"User-Agent": "agent信息"
}
# 构造好请求对象 将请求提交到服务器 获取的响应就是到首页的html代码
request = urllib.request.Request(url=home, headers=headers)
response = urllib.request.urlopen(request)
# 读取抓到的内容并解码
html_data = response.read().decode()
"""huyaimg.msstatic.com/avatar/1054/db/6590aa9bcf98e12e5d809d371e46cc_180_135.jpg
"""
# 使用正则 从首页中 提取出所有的图片链接
img_list = re.findall(r"//huyaimg\.msstatic\.com.+\.jpg\?", html_data)
print(img_list)
# 取出每张图片进行下载
for img_url in img_list:
print(img_url)
down_img(img_url)
Python爬虫基本框架
wanbainip 发表了文章 • 0 个评论 • 2942 次浏览 • 2020-10-25 18:01
1. 爬虫调度器负责统筹其他四个模块协调工作。
2. URL管理器负责管理URL链接,包括已爬取的链接和未爬取的链接。
3. HTML下载器用于从URL管理器中获取未爬取的链接并下载其HTML网页。
4. HTML解析器用于解析HTML下载器下载的HTML网页,获取URL链接交给URL管理器,提取要获取的数据交给数据存储器。
5. 数据存储器用于将HTML解析器解析出来的数据存储到数据库或文件。 查看全部
1. 爬虫调度器负责统筹其他四个模块协调工作。
2. URL管理器负责管理URL链接,包括已爬取的链接和未爬取的链接。
3. HTML下载器用于从URL管理器中获取未爬取的链接并下载其HTML网页。
4. HTML解析器用于解析HTML下载器下载的HTML网页,获取URL链接交给URL管理器,提取要获取的数据交给数据存储器。
5. 数据存储器用于将HTML解析器解析出来的数据存储到数据库或文件。
Python爬虫如何防止ip被封?
wanbainip 发表了文章 • 0 个评论 • 3172 次浏览 • 2020-10-24 10:22
第一种:降低访问的速度,我们可以使用 time模块中的sleep,使程序每运行一次后就睡眠1s,这样可以很有效的降低ip被封机率,但是效率效果不是很高,一般是用于量小的采集任务。
第二种:使用类似万变ip代理这样的优质换ip软件,这也是爬虫工作者最常用的手段之一,通过代理ip来伪装我们的ip,隐藏本地真实的ip地址,让目标服务器无法识别是相同ip发出的请求,这样就很有效的防止ip被封。突破了ip的限制,采集数据的任务就会顺利,工作效率自然会提高!
查看全部
第一种:降低访问的速度,我们可以使用 time模块中的sleep,使程序每运行一次后就睡眠1s,这样可以很有效的降低ip被封机率,但是效率效果不是很高,一般是用于量小的采集任务。
第二种:使用类似万变ip代理这样的优质换ip软件,这也是爬虫工作者最常用的手段之一,通过代理ip来伪装我们的ip,隐藏本地真实的ip地址,让目标服务器无法识别是相同ip发出的请求,这样就很有效的防止ip被封。突破了ip的限制,采集数据的任务就会顺利,工作效率自然会提高!
愿意付费购买商超 商品条形码对应的商品图片,有哪位大神可以帮帮忙啊?+V15032219667
回复heikekang 发起了问题 • 2 人关注 • 0 个回复 • 3633 次浏览 • 2020-09-25 19:10
python爬取网页中的超链接地址,获取到的跟浏览器中显示的不一样
acker 回复了问题 • 2 人关注 • 2 个回复 • 4470 次浏览 • 2020-09-25 16:55
python asyncio aiohttp motor异步爬虫例子 定时抓取bilibili首页热度网红
李魔佛 发表了文章 • 0 个评论 • 3923 次浏览 • 2020-09-22 22:35
AsyncIOMotorClient(connect_uri)
motor连接带用户名和密码的方法和pymongo一致。
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2020/9/22 10:07
# @File : bilibili_hot_anchor.py
# 异步爬取首页与列表
import asyncio
import datetime
import aiohttp
import re
import time
from motor.motor_asyncio import AsyncIOMotorClient
from parsel import Selector
from settings import _json_data
SLEEP = 60 * 10
INFO = _json_data['mongo']['arm']
host = INFO['host']
port = INFO['port']
user = INFO['user']
password = INFO['password']
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
client = AsyncIOMotorClient(connect_uri)
db = client['db_parker']
home_url = 'https://www.bilibili.com/ranking'
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}
def convertor(number_str):
'''
将小数点的万变为整数
:param number_str:
:return:
'''
number = re.search('(\d+\.+\d+)', number_str)
if number:
number = float(number.group(1))
if re.search('万', number_str):
number = int(number * 10000)
else:
number = 0
return number
async def home_page():
async with aiohttp.ClientSession() as session:
while True:
start = time.time()
async with session.get(url=home_url, headers=headers) as response:
html = await response.text()
resp = Selector(text=html)
items = resp.xpath('//ul[@class="rank-list"]/li')
for item in items:
json_data = {}
number = item.xpath('.//div[@class="num"]/text()').extract_first()
info = item.xpath('.//div[@class="info"][1]')
title = info.xpath('.//a/text()').extract_first()
detail_url = info.xpath('.//a/@href').extract_first()
play_number = info.xpath('.//div[@class="detail"]/span[1]/text()').extract_first()
viewing_number = info.xpath('.//div[@class="detail"]/span[2]/text()').extract_first()
json_data['number'] = int(number)
json_data['title'] = title
json_data['play_number'] = convertor(play_number)
json_data['viewing_number'] = convertor(viewing_number)
json_data['url'] = detail_url
task = asyncio.create_task(detail_list(session, detail_url, json_data))
# await detail_url()
end = time.time()
print(f'time used {end-start}')
await asyncio.sleep(SLEEP) # 暂停10分钟
print(f'sleep for {SLEEP}')
async def detail_list(session, url, json_data):
async with session.get(url, headers=headers) as response:
response = await response.text()
await parse_detail(response, json_data)
async def parse_detail(html, json_data=None):
resp = Selector(text=html)
info = resp.xpath('//div[@id="v_desc"]/div[@class="info open"]/text()').extract_first()
if not info:
info = '这个家伙很懒'
json_data['info'] = info.strip()
current = datetime.datetime.now()
json_data['crawltime'] = current
await db['bilibili'].update_one({'url': json_data['url']}, {'$set': json_data}, True, True)
loop = asyncio.get_event_loop()
loop.run_until_complete(home_page())
爬取的数据图:
原创文章,转载请注明:http://30daydo.com/article/605
需要源码,可私信。
查看全部
AsyncIOMotorClient(connect_uri)
motor连接带用户名和密码的方法和pymongo一致。
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2020/9/22 10:07
# @File : bilibili_hot_anchor.py
# 异步爬取首页与列表
import asyncio
import datetime
import aiohttp
import re
import time
from motor.motor_asyncio import AsyncIOMotorClient
from parsel import Selector
from settings import _json_data
SLEEP = 60 * 10
INFO = _json_data['mongo']['arm']
host = INFO['host']
port = INFO['port']
user = INFO['user']
password = INFO['password']
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
client = AsyncIOMotorClient(connect_uri)
db = client['db_parker']
home_url = 'https://www.bilibili.com/ranking'
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}
def convertor(number_str):
'''
将小数点的万变为整数
:param number_str:
:return:
'''
number = re.search('(\d+\.+\d+)', number_str)
if number:
number = float(number.group(1))
if re.search('万', number_str):
number = int(number * 10000)
else:
number = 0
return number
async def home_page():
async with aiohttp.ClientSession() as session:
while True:
start = time.time()
async with session.get(url=home_url, headers=headers) as response:
html = await response.text()
resp = Selector(text=html)
items = resp.xpath('//ul[@class="rank-list"]/li')
for item in items:
json_data = {}
number = item.xpath('.//div[@class="num"]/text()').extract_first()
info = item.xpath('.//div[@class="info"][1]')
title = info.xpath('.//a/text()').extract_first()
detail_url = info.xpath('.//a/@href').extract_first()
play_number = info.xpath('.//div[@class="detail"]/span[1]/text()').extract_first()
viewing_number = info.xpath('.//div[@class="detail"]/span[2]/text()').extract_first()
json_data['number'] = int(number)
json_data['title'] = title
json_data['play_number'] = convertor(play_number)
json_data['viewing_number'] = convertor(viewing_number)
json_data['url'] = detail_url
task = asyncio.create_task(detail_list(session, detail_url, json_data))
# await detail_url()
end = time.time()
print(f'time used {end-start}')
await asyncio.sleep(SLEEP) # 暂停10分钟
print(f'sleep for {SLEEP}')
async def detail_list(session, url, json_data):
async with session.get(url, headers=headers) as response:
response = await response.text()
await parse_detail(response, json_data)
async def parse_detail(html, json_data=None):
resp = Selector(text=html)
info = resp.xpath('//div[@id="v_desc"]/div[@class="info open"]/text()').extract_first()
if not info:
info = '这个家伙很懒'
json_data['info'] = info.strip()
current = datetime.datetime.now()
json_data['crawltime'] = current
await db['bilibili'].update_one({'url': json_data['url']}, {'$set': json_data}, True, True)
loop = asyncio.get_event_loop()
loop.run_until_complete(home_page())
爬取的数据图:
原创文章,转载请注明:http://30daydo.com/article/605
需要源码,可私信。
模拟登录网易163失败
xiaoai 回复了问题 • 2 人关注 • 2 个回复 • 5712 次浏览 • 2020-06-28 14:25