python爬虫
知识星球获取文章链接与数据
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 2017 次浏览 • 2022-03-21 20:15
既然官方不提供这个功能,只能自己使用爬虫手段获取了,额。
既然官方不提供这个功能,只能自己使用爬虫手段获取了,额。
如何使用控制台将动态加载数据刷新出来啊????
python • 低调的哥哥 回复了问题 • 2 人关注 • 1 个回复 • 2450 次浏览 • 2021-08-11 02:06
为什么登录成功但是爬取不到其他数据
python爬虫 • 低调的哥哥 回复了问题 • 2 人关注 • 1 个回复 • 2540 次浏览 • 2021-08-04 01:17
韦世东 python3网络爬虫宝典 勘误
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 2526 次浏览 • 2021-05-21 20:06
1. 时间差是正数才是过期
2. 获取权限那里,permission = args[0].permission
不能后面再用get
P222:
写入mongodb后,原来的数据会被加入一个_id,值为OjectId,该值是无法被json dumps为string,
所以需要手工把ObjectId 转为str,或者del message['_id'] 将这个键去除。
查看全部
1. 时间差是正数才是过期
2. 获取权限那里,permission = args[0].permission
不能后面再用get
P222:
写入mongodb后,原来的数据会被加入一个_id,值为OjectId,该值是无法被json dumps为string,
所以需要手工把ObjectId 转为str,或者del message['_id'] 将这个键去除。
茅台抢购程序 京东 苏宁
python • 李魔佛 发表了文章 • 0 个评论 • 11640 次浏览 • 2021-01-05 22:34
运行环境 windows,linux,mac,python3+
京东小白分查询:
https://plus.m.jd.com/rights/windControl
分太低的就不要参与了,毕竟概率会小很多
############ 2021-01-13 更新 ======
最新的用Go重写的,搞了几瓶
苏宁家的:
============= 2021-01-11 更新 ============
感觉苏宁的抢购是耍猴的,那个按钮基本处于不可点状态,所以就放弃了,感觉官方就是没放多少量,加上苏宁公司过往的尿性,所以洗洗睡了
main.pyimport sys
from maotai.jd_spider_requests import ProdectPurchase
if __name__ == '__main__':
tip = """
功能列表:
1.预约商品
2.秒杀抢购商品
"""
print(tip)
product = ProdectPurchase()
choice_function = input('请选择:')
if choice_function == '1':
product.reserve()
elif choice_function == '2':
product.seckill_by_proc_pool()
else:
print('没有此功能')
sys.exit(1)
jd_spider_requests.pyimport random
import time
import requests
import functools
import json
import os
import pickle
from lxml import etree
from error.exception import SKException
from maotai.jd_logger import logger
from maotai.timer import Timer
from maotai.config import global_config
from concurrent.futures import ProcessPoolExecutor
from helper.jd_helper import (
parse_json,
send_wechat,
wait_some_time,
response_status,
save_image,
open_image
)
class SpiderSession:
"""
Session相关操作
"""
def __init__(self):
self.cookies_dir_path = "./cookies/"
self.user_agent = global_config.getRaw('config', 'DEFAULT_USER_AGENT')
self.session = self._init_session()
def _init_session(self):
session = requests.session()
session.headers = self.get_headers()
return session
def get_headers(self):
return {"User-Agent": self.user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;"
"q=0.9,image/webp,image/apng,*/*;"
"q=0.8,application/signed-exchange;"
"v=b3",
"Connection": "keep-alive"}
def get_user_agent(self):
return self.user_agent
def get_session(self):
"""
获取当前Session
:return:
"""
return self.session
def get_cookies(self):
"""
获取当前Cookies
:return:
"""
return self.get_session().cookies
def set_cookies(self, cookies):
self.session.cookies.update(cookies)
def load_cookies_from_local(self):
"""
从本地加载Cookie
:return:
"""
cookies_file = ''
if not os.path.exists(self.cookies_dir_path):
return False
for name in os.listdir(self.cookies_dir_path):
if name.endswith(".cookies"):
cookies_file = '{}{}'.format(self.cookies_dir_path, name)
break
if cookies_file == '':
return False
with open(cookies_file, 'rb') as f:
local_cookies = pickle.load(f)
self.set_cookies(local_cookies)
def save_cookies_to_local(self, cookie_file_name):
"""
保存Cookie到本地
:param cookie_file_name: 存放Cookie的文件名称
:return:
"""
cookies_file = '{}{}.cookies'.format(self.cookies_dir_path, cookie_file_name)
directory = os.path.dirname(cookies_file)
if not os.path.exists(directory):
os.makedirs(directory)
with open(cookies_file, 'wb') as f:
pickle.dump(self.get_cookies(), f)
class QrLogin:
"""
扫码登录
"""
def __init__(self, spider_session: SpiderSession):
"""
初始化扫码登录
大致流程:
1、访问登录二维码页面,获取Token
2、使用Token获取票据
3、校验票据
:param spider_session:
"""
self.qrcode_img_file = 'qr_code.png'
self.spider_session = spider_session
self.session = self.spider_session.get_session()
self.is_login = False
self.refresh_login_status()
def refresh_login_status(self):
"""
刷新是否登录状态
:return:
"""
self.is_login = self._validate_cookies()
def _validate_cookies(self):
"""
验证cookies是否有效(是否登陆)
通过访问用户订单列表页进行判断:若未登录,将会重定向到登陆页面。
:return: cookies是否有效 True/False
"""
url = 'https://order.jd.com/center/list.action'
payload = {
'rid': str(int(time.time() * 1000)),
}
try:
resp = self.session.get(url=url, params=payload, allow_redirects=False)
if resp.status_code == requests.codes.OK:
return True
except Exception as e:
logger.error("验证cookies是否有效发生异常", e)
return False
def _get_login_page(self):
"""
获取PC端登录页面
阻塞,更新cookies
:return:
"""
url = "https://passport.jd.com/new/login.aspx"
page = self.session.get(url, headers=self.spider_session.get_headers())
return page
def _get_qrcode(self):
"""
缓存并展示登录二维码
:return:
"""
url = 'https://qr.m.jd.com/show'
payload = {
'appid': 133,
'size': 147,
't': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.info('获取二维码失败')
return False
save_image(resp, self.qrcode_img_file)
logger.info('二维码获取成功,请打开京东APP扫描')
open_image(self.qrcode_img_file)
return True
def _get_qrcode_ticket(self):
"""
通过 token 获取票据 ticket
:return:
"""
url = 'https://qr.m.jd.com/check'
payload = {
'appid': '133',
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'token': self.session.cookies.get('wlfstk_smdl'), # 从cookies获取值
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.error('获取二维码扫描结果异常')
return False
resp_json = parse_json(resp.text)
if resp_json['code'] != 200:
logger.info('Code: %s, Message: %s', resp_json['code'], resp_json['msg'])
return None
else:
logger.info('已完成手机客户端确认')
return resp_json['ticket']
def _validate_qrcode_ticket(self, ticket):
"""
通过已获取的票据进行校验
:param ticket: 已获取的票据
:return:
"""
url = 'https://passport.jd.com/uc/qrCodeTicketValidation'
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/uc/login?ltype=logout',
}
resp = self.session.get(url=url, headers=headers, params={'t': ticket})
if not response_status(resp):
return False
resp_json = json.loads(resp.text)
if resp_json['returnCode'] == 0:
return True
else:
logger.info(resp_json)
return False
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
self._get_login_page() # 更新cookies
# download QR code
if not self._get_qrcode():
raise SKException('二维码下载失败')
# get QR code ticket
ticket = None
retry_times = 85
for _ in range(retry_times):
# 重试 拿到ticket
ticket = self._get_qrcode_ticket()
if ticket:
break
time.sleep(2)
else:
raise SKException('二维码过期,请重新获取扫描')
# validate QR code ticket
if not self._validate_qrcode_ticket(ticket):
raise SKException('二维码信息校验失败')
self.refresh_login_status()
logger.info('二维码登录成功')
class ProdectPurchase(object):
def __init__(self):
self.spider_session = SpiderSession()
self.spider_session.load_cookies_from_local()
# 共享一个session
self.qrlogin = QrLogin(self.spider_session)
# 初始化信息
self.sku_id = global_config.getRaw('config', 'sku_id')
self.seckill_num = global_config.getRaw('config', 'seckill_num')
self.work_count = global_config.getRaw('config','process_num')
self.seckill_init_info = dict()
self.seckill_url = dict()
self.seckill_order_data = dict()
self.timers = Timer()
self.session = self.spider_session.get_session()
self.user_agent = self.spider_session.user_agent
self.nick_name = None
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
if self.qrlogin.is_login:
logger.info('登录成功')
return
self.qrlogin.login_by_qrcode()
if self.qrlogin.is_login:
self.nick_name = self.get_username()
self.spider_session.save_cookies_to_local(self.nick_name)
else:
raise SKException("二维码登录失败!")
def check_login(func):
"""
用户登陆态校验装饰器。若用户未登陆,则调用扫码登陆
"""
@functools.wraps(func)
def new_func(self, *args, **kwargs):
if not self.qrlogin.is_login:
logger.info("{0} 需登陆后调用,开始扫码登陆".format(func.__name__))
self.login_by_qrcode()
return func(self, *args, **kwargs)
return new_func
@check_login
def reserve(self):
"""
预约
"""
self._reserve()
@check_login
def seckill(self):
"""
抢购
"""
self._seckill()
@check_login
def seckill_by_proc_pool(self):
"""
多进程进行抢购
work_count:进程数量
"""
with ProcessPoolExecutor() as pool:
for i in range(self.work_count):
pool.submit(self.seckill)
def _reserve(self):
"""
预约
"""
while True:
try:
self.make_reserve()
break
except Exception as e:
logger.info('预约发生异常!', e)
wait_some_time()
def _seckill(self):
"""
抢购
"""
while True:
try:
self.request_seckill_url()
while True:
self.request_seckill_checkout_page()
self.submit_seckill_order()
except Exception as e:
logger.info('抢购发生异常,稍后继续执行!', e)
wait_some_time()
def make_reserve(self):
"""商品预约"""
logger.info('商品名称:{}'.format(self.get_sku_title()))
url = 'https://yushou.jd.com/youshouinfo.action?'
payload = {
'callback': 'fetchJSON',
'sku': self.sku_id,
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
resp = self.session.get(url=url, params=payload, headers=headers)
resp_json = parse_json(resp.text)
reserve_url = resp_json.get('url')
# self.timers.start()
while True:
try:
self.session.get(url='https:' + reserve_url)
logger.info('预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约')
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约"
send_wechat(success_message)
break
except Exception as e:
logger.error('预约失败正在重试...')
def get_username(self):
"""获取用户信息"""
url = 'https://passport.jd.com/user/petName/getUserInfoForMiniJd.action'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://order.jd.com/center/list.action',
}
resp = self.session.get(url=url, params=payload, headers=headers)
try_count = 5
while not resp.text.startswith("jQuery"):
try_count = try_count - 1
if try_count > 0:
resp = self.session.get(url=url, params=payload, headers=headers)
else:
break
wait_some_time()
# 响应中包含了许多用户信息,现在在其中返回昵称
# jQuery2381773({"imgUrl":"//storage.360buyimg.com/i.imageUpload/xxx.jpg","lastLoginTime":"","nickName":"xxx","plusStatus":"0","realName":"xxx","userLevel":x,"userScoreVO":{"accountScore":xx,"activityScore":xx,"consumptionScore":xxxxx,"default":false,"financeScore":xxx,"pin":"xxx","riskScore":x,"totalScore":xxxxx}})
return parse_json(resp.text).get('nickName')
def get_sku_title(self):
"""获取商品名称"""
url = 'https://item.jd.com/{}.html'.format(global_config.getRaw('config', 'sku_id'))
resp = self.session.get(url).content
x_data = etree.HTML(resp)
sku_title = x_data.xpath('/html/head/title/text()')
return sku_title[0]
def get_seckill_url(self):
"""获取商品的抢购链接
点击"抢购"按钮后,会有两次302跳转,最后到达订单结算页面
这里返回第一次跳转后的页面url,作为商品的抢购链接
:return: 商品的抢购链接
"""
url = 'https://itemko.jd.com/itemShowBtn'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'skuId': self.sku_id,
'from': 'pc',
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Host': 'itemko.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
while True:
resp = self.session.get(url=url, headers=headers, params=payload)
resp_json = parse_json(resp.text)
if resp_json.get('url'):
# https://divide.jd.com/user_rou ... %3Dpc
router_url = 'https:' + resp_json.get('url')
# https://marathon.jd.com/captch ... %3Dpc
seckill_url = router_url.replace(
'divide', 'marathon').replace(
'user_routing', 'captcha.html')
logger.info("抢购链接获取成功: %s", seckill_url)
return seckill_url
else:
logger.info("抢购链接获取失败,稍后自动重试")
wait_some_time()
def request_seckill_url(self):
"""访问商品的抢购链接(用于设置cookie等"""
logger.info('用户:{}'.format(self.get_username()))
logger.info('商品名称:{}'.format(self.get_sku_title()))
self.timers.start() # 阻塞
self.seckill_url[self.sku_id] = self.get_seckill_url()
logger.info('访问商品的抢购连接...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(
url=self.seckill_url.get(
self.sku_id),
headers=headers,
allow_redirects=False)
def request_seckill_checkout_page(self):
"""访问抢购订单结算页面"""
logger.info('访问抢购订单结算页面...')
url = 'https://marathon.jd.com/seckill/seckill.action'
payload = {
'skuId': self.sku_id,
'num': self.seckill_num,
'rid': int(time.time())
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(url=url, params=payload, headers=headers, allow_redirects=False)
def _get_seckill_init_info(self):
"""获取秒杀初始化信息(包括:地址,发票,token)
:return: 初始化信息组成的dict
"""
logger.info('获取秒杀初始化信息...')
url = 'https://marathon.jd.com/seckillnew/orderService/pc/init.action'
data = {
'sku': self.sku_id,
'num': self.seckill_num,
'isModifyAddress': 'false',
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
}
resp = self.session.post(url=url, data=data, headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception:
raise SKException('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return resp_json
def _get_seckill_order_data(self):
"""生成提交抢购订单所需的请求体参数
:return: 请求体参数组成的dict
"""
logger.info('生成提交抢购订单所需参数...')
# 获取用户秒杀初始化信息
self.seckill_init_info[self.sku_id] = self._get_seckill_init_info()
init_info = self.seckill_init_info.get(self.sku_id)
default_address = init_info['addressList'][0] # 默认地址dict
invoice_info = init_info.get('invoiceInfo', {}) # 默认发票信息dict, 有可能不返回
token = init_info['token']
data = {
'skuId': self.sku_id,
'num': self.seckill_num,
'addressId': default_address['id'],
'yuShou': 'true',
'isModifyAddress': 'false',
'name': default_address['name'],
'provinceId': default_address['provinceId'],
'cityId': default_address['cityId'],
'countyId': default_address['countyId'],
'townId': default_address['townId'],
'addressDetail': default_address['addressDetail'],
'mobile': default_address['mobile'],
'mobileKey': default_address['mobileKey'],
'email': default_address.get('email', ''),
'postCode': '',
'invoiceTitle': invoice_info.get('invoiceTitle', -1),
'invoiceCompanyName': '',
'invoiceContent': invoice_info.get('invoiceContentType', 1),
'invoiceTaxpayerNO': '',
'invoiceEmail': '',
'invoicePhone': invoice_info.get('invoicePhone', ''),
'invoicePhoneKey': invoice_info.get('invoicePhoneKey', ''),
'invoice': 'true' if invoice_info else 'false',
'password': global_config.get('account', 'payment_pwd'),
'codTimeType': 3,
'paymentType': 4,
'areaCode': '',
'overseas': 0,
'phone': '',
'eid': global_config.getRaw('config', 'eid'),
'fp': global_config.getRaw('config', 'fp'),
'token': token,
'pru': ''
}
return data
def submit_seckill_order(self):
"""提交抢购(秒杀)订单
:return: 抢购结果 True/False
"""
url = 'https://marathon.jd.com/seckillnew/orderService/pc/submitOrder.action'
payload = {
'skuId': self.sku_id,
}
try:
self.seckill_order_data[self.sku_id] = self._get_seckill_order_data()
except Exception as e:
logger.info('抢购失败,无法获取生成订单的基本信息,接口返回:【{}】'.format(str(e)))
return False
logger.info('提交抢购订单...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://marathon.jd.com/seckill/seckill.action?skuId={0}&num={1}&rid={2}'.format(
self.sku_id, self.seckill_num, int(time.time())),
}
resp = self.session.post(
url=url,
params=payload,
data=self.seckill_order_data.get(
self.sku_id),
headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception as e:
logger.info('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return False
# 返回信息
# 抢购失败:
# {'errorMessage': '很遗憾没有抢到,再接再厉哦。', 'orderId': 0, 'resultCode': 60074, 'skuId': 0, 'success': False}
# {'errorMessage': '抱歉,您提交过快,请稍后再提交订单!', 'orderId': 0, 'resultCode': 60017, 'skuId': 0, 'success': False}
# {'errorMessage': '系统正在开小差,请重试~~', 'orderId': 0, 'resultCode': 90013, 'skuId': 0, 'success': False}
# 抢购成功:
# {"appUrl":"xxxxx","orderId":820227xxxxx,"pcUrl":"xxxxx","resultCode":0,"skuId":0,"success":true,"totalMoney":"xxxxx"}
if resp_json.get('success'):
order_id = resp_json.get('orderId')
total_money = resp_json.get('totalMoney')
pay_url = 'https:' + resp_json.get('pcUrl')
logger.info('抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}'.format(order_id, total_money, pay_url))
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}".format(order_id, total_money, pay_url)
send_wechat(success_message)
return True
else:
logger.info('抢购失败,返回信息:{}'.format(resp_json))
if global_config.getRaw('messenger', 'enable') == 'true':
error_message = '抢购失败,返回信息:{}'.format(resp_json)
send_wechat(error_message)
return False
苏宁脚本目前在测试途中,需要继续调试。
原创文章,
转载请注明:http://30daydo.com/article/44129
欢迎关注公众号:
可转债量化分析
查看全部
运行环境 windows,linux,mac,python3+
京东小白分查询:
https://plus.m.jd.com/rights/windControl
分太低的就不要参与了,毕竟概率会小很多
############ 2021-01-13 更新 ======
最新的用Go重写的,搞了几瓶
苏宁家的:
============= 2021-01-11 更新 ============
感觉苏宁的抢购是耍猴的,那个按钮基本处于不可点状态,所以就放弃了,感觉官方就是没放多少量,加上苏宁公司过往的尿性,所以洗洗睡了
main.py
import sys
from maotai.jd_spider_requests import ProdectPurchase
if __name__ == '__main__':
tip = """
功能列表:
1.预约商品
2.秒杀抢购商品
"""
print(tip)
product = ProdectPurchase()
choice_function = input('请选择:')
if choice_function == '1':
product.reserve()
elif choice_function == '2':
product.seckill_by_proc_pool()
else:
print('没有此功能')
sys.exit(1)
jd_spider_requests.py
import random
import time
import requests
import functools
import json
import os
import pickle
from lxml import etree
from error.exception import SKException
from maotai.jd_logger import logger
from maotai.timer import Timer
from maotai.config import global_config
from concurrent.futures import ProcessPoolExecutor
from helper.jd_helper import (
parse_json,
send_wechat,
wait_some_time,
response_status,
save_image,
open_image
)
class SpiderSession:
"""
Session相关操作
"""
def __init__(self):
self.cookies_dir_path = "./cookies/"
self.user_agent = global_config.getRaw('config', 'DEFAULT_USER_AGENT')
self.session = self._init_session()
def _init_session(self):
session = requests.session()
session.headers = self.get_headers()
return session
def get_headers(self):
return {"User-Agent": self.user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;"
"q=0.9,image/webp,image/apng,*/*;"
"q=0.8,application/signed-exchange;"
"v=b3",
"Connection": "keep-alive"}
def get_user_agent(self):
return self.user_agent
def get_session(self):
"""
获取当前Session
:return:
"""
return self.session
def get_cookies(self):
"""
获取当前Cookies
:return:
"""
return self.get_session().cookies
def set_cookies(self, cookies):
self.session.cookies.update(cookies)
def load_cookies_from_local(self):
"""
从本地加载Cookie
:return:
"""
cookies_file = ''
if not os.path.exists(self.cookies_dir_path):
return False
for name in os.listdir(self.cookies_dir_path):
if name.endswith(".cookies"):
cookies_file = '{}{}'.format(self.cookies_dir_path, name)
break
if cookies_file == '':
return False
with open(cookies_file, 'rb') as f:
local_cookies = pickle.load(f)
self.set_cookies(local_cookies)
def save_cookies_to_local(self, cookie_file_name):
"""
保存Cookie到本地
:param cookie_file_name: 存放Cookie的文件名称
:return:
"""
cookies_file = '{}{}.cookies'.format(self.cookies_dir_path, cookie_file_name)
directory = os.path.dirname(cookies_file)
if not os.path.exists(directory):
os.makedirs(directory)
with open(cookies_file, 'wb') as f:
pickle.dump(self.get_cookies(), f)
class QrLogin:
"""
扫码登录
"""
def __init__(self, spider_session: SpiderSession):
"""
初始化扫码登录
大致流程:
1、访问登录二维码页面,获取Token
2、使用Token获取票据
3、校验票据
:param spider_session:
"""
self.qrcode_img_file = 'qr_code.png'
self.spider_session = spider_session
self.session = self.spider_session.get_session()
self.is_login = False
self.refresh_login_status()
def refresh_login_status(self):
"""
刷新是否登录状态
:return:
"""
self.is_login = self._validate_cookies()
def _validate_cookies(self):
"""
验证cookies是否有效(是否登陆)
通过访问用户订单列表页进行判断:若未登录,将会重定向到登陆页面。
:return: cookies是否有效 True/False
"""
url = 'https://order.jd.com/center/list.action'
payload = {
'rid': str(int(time.time() * 1000)),
}
try:
resp = self.session.get(url=url, params=payload, allow_redirects=False)
if resp.status_code == requests.codes.OK:
return True
except Exception as e:
logger.error("验证cookies是否有效发生异常", e)
return False
def _get_login_page(self):
"""
获取PC端登录页面
阻塞,更新cookies
:return:
"""
url = "https://passport.jd.com/new/login.aspx"
page = self.session.get(url, headers=self.spider_session.get_headers())
return page
def _get_qrcode(self):
"""
缓存并展示登录二维码
:return:
"""
url = 'https://qr.m.jd.com/show'
payload = {
'appid': 133,
'size': 147,
't': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.info('获取二维码失败')
return False
save_image(resp, self.qrcode_img_file)
logger.info('二维码获取成功,请打开京东APP扫描')
open_image(self.qrcode_img_file)
return True
def _get_qrcode_ticket(self):
"""
通过 token 获取票据 ticket
:return:
"""
url = 'https://qr.m.jd.com/check'
payload = {
'appid': '133',
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'token': self.session.cookies.get('wlfstk_smdl'), # 从cookies获取值
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.error('获取二维码扫描结果异常')
return False
resp_json = parse_json(resp.text)
if resp_json['code'] != 200:
logger.info('Code: %s, Message: %s', resp_json['code'], resp_json['msg'])
return None
else:
logger.info('已完成手机客户端确认')
return resp_json['ticket']
def _validate_qrcode_ticket(self, ticket):
"""
通过已获取的票据进行校验
:param ticket: 已获取的票据
:return:
"""
url = 'https://passport.jd.com/uc/qrCodeTicketValidation'
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/uc/login?ltype=logout',
}
resp = self.session.get(url=url, headers=headers, params={'t': ticket})
if not response_status(resp):
return False
resp_json = json.loads(resp.text)
if resp_json['returnCode'] == 0:
return True
else:
logger.info(resp_json)
return False
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
self._get_login_page() # 更新cookies
# download QR code
if not self._get_qrcode():
raise SKException('二维码下载失败')
# get QR code ticket
ticket = None
retry_times = 85
for _ in range(retry_times):
# 重试 拿到ticket
ticket = self._get_qrcode_ticket()
if ticket:
break
time.sleep(2)
else:
raise SKException('二维码过期,请重新获取扫描')
# validate QR code ticket
if not self._validate_qrcode_ticket(ticket):
raise SKException('二维码信息校验失败')
self.refresh_login_status()
logger.info('二维码登录成功')
class ProdectPurchase(object):
def __init__(self):
self.spider_session = SpiderSession()
self.spider_session.load_cookies_from_local()
# 共享一个session
self.qrlogin = QrLogin(self.spider_session)
# 初始化信息
self.sku_id = global_config.getRaw('config', 'sku_id')
self.seckill_num = global_config.getRaw('config', 'seckill_num')
self.work_count = global_config.getRaw('config','process_num')
self.seckill_init_info = dict()
self.seckill_url = dict()
self.seckill_order_data = dict()
self.timers = Timer()
self.session = self.spider_session.get_session()
self.user_agent = self.spider_session.user_agent
self.nick_name = None
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
if self.qrlogin.is_login:
logger.info('登录成功')
return
self.qrlogin.login_by_qrcode()
if self.qrlogin.is_login:
self.nick_name = self.get_username()
self.spider_session.save_cookies_to_local(self.nick_name)
else:
raise SKException("二维码登录失败!")
def check_login(func):
"""
用户登陆态校验装饰器。若用户未登陆,则调用扫码登陆
"""
@functools.wraps(func)
def new_func(self, *args, **kwargs):
if not self.qrlogin.is_login:
logger.info("{0} 需登陆后调用,开始扫码登陆".format(func.__name__))
self.login_by_qrcode()
return func(self, *args, **kwargs)
return new_func
@check_login
def reserve(self):
"""
预约
"""
self._reserve()
@check_login
def seckill(self):
"""
抢购
"""
self._seckill()
@check_login
def seckill_by_proc_pool(self):
"""
多进程进行抢购
work_count:进程数量
"""
with ProcessPoolExecutor() as pool:
for i in range(self.work_count):
pool.submit(self.seckill)
def _reserve(self):
"""
预约
"""
while True:
try:
self.make_reserve()
break
except Exception as e:
logger.info('预约发生异常!', e)
wait_some_time()
def _seckill(self):
"""
抢购
"""
while True:
try:
self.request_seckill_url()
while True:
self.request_seckill_checkout_page()
self.submit_seckill_order()
except Exception as e:
logger.info('抢购发生异常,稍后继续执行!', e)
wait_some_time()
def make_reserve(self):
"""商品预约"""
logger.info('商品名称:{}'.format(self.get_sku_title()))
url = 'https://yushou.jd.com/youshouinfo.action?'
payload = {
'callback': 'fetchJSON',
'sku': self.sku_id,
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
resp = self.session.get(url=url, params=payload, headers=headers)
resp_json = parse_json(resp.text)
reserve_url = resp_json.get('url')
# self.timers.start()
while True:
try:
self.session.get(url='https:' + reserve_url)
logger.info('预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约')
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约"
send_wechat(success_message)
break
except Exception as e:
logger.error('预约失败正在重试...')
def get_username(self):
"""获取用户信息"""
url = 'https://passport.jd.com/user/petName/getUserInfoForMiniJd.action'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://order.jd.com/center/list.action',
}
resp = self.session.get(url=url, params=payload, headers=headers)
try_count = 5
while not resp.text.startswith("jQuery"):
try_count = try_count - 1
if try_count > 0:
resp = self.session.get(url=url, params=payload, headers=headers)
else:
break
wait_some_time()
# 响应中包含了许多用户信息,现在在其中返回昵称
# jQuery2381773({"imgUrl":"//storage.360buyimg.com/i.imageUpload/xxx.jpg","lastLoginTime":"","nickName":"xxx","plusStatus":"0","realName":"xxx","userLevel":x,"userScoreVO":{"accountScore":xx,"activityScore":xx,"consumptionScore":xxxxx,"default":false,"financeScore":xxx,"pin":"xxx","riskScore":x,"totalScore":xxxxx}})
return parse_json(resp.text).get('nickName')
def get_sku_title(self):
"""获取商品名称"""
url = 'https://item.jd.com/{}.html'.format(global_config.getRaw('config', 'sku_id'))
resp = self.session.get(url).content
x_data = etree.HTML(resp)
sku_title = x_data.xpath('/html/head/title/text()')
return sku_title[0]
def get_seckill_url(self):
"""获取商品的抢购链接
点击"抢购"按钮后,会有两次302跳转,最后到达订单结算页面
这里返回第一次跳转后的页面url,作为商品的抢购链接
:return: 商品的抢购链接
"""
url = 'https://itemko.jd.com/itemShowBtn'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'skuId': self.sku_id,
'from': 'pc',
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Host': 'itemko.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
while True:
resp = self.session.get(url=url, headers=headers, params=payload)
resp_json = parse_json(resp.text)
if resp_json.get('url'):
# https://divide.jd.com/user_rou ... %3Dpc
router_url = 'https:' + resp_json.get('url')
# https://marathon.jd.com/captch ... %3Dpc
seckill_url = router_url.replace(
'divide', 'marathon').replace(
'user_routing', 'captcha.html')
logger.info("抢购链接获取成功: %s", seckill_url)
return seckill_url
else:
logger.info("抢购链接获取失败,稍后自动重试")
wait_some_time()
def request_seckill_url(self):
"""访问商品的抢购链接(用于设置cookie等"""
logger.info('用户:{}'.format(self.get_username()))
logger.info('商品名称:{}'.format(self.get_sku_title()))
self.timers.start() # 阻塞
self.seckill_url[self.sku_id] = self.get_seckill_url()
logger.info('访问商品的抢购连接...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(
url=self.seckill_url.get(
self.sku_id),
headers=headers,
allow_redirects=False)
def request_seckill_checkout_page(self):
"""访问抢购订单结算页面"""
logger.info('访问抢购订单结算页面...')
url = 'https://marathon.jd.com/seckill/seckill.action'
payload = {
'skuId': self.sku_id,
'num': self.seckill_num,
'rid': int(time.time())
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(url=url, params=payload, headers=headers, allow_redirects=False)
def _get_seckill_init_info(self):
"""获取秒杀初始化信息(包括:地址,发票,token)
:return: 初始化信息组成的dict
"""
logger.info('获取秒杀初始化信息...')
url = 'https://marathon.jd.com/seckillnew/orderService/pc/init.action'
data = {
'sku': self.sku_id,
'num': self.seckill_num,
'isModifyAddress': 'false',
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
}
resp = self.session.post(url=url, data=data, headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception:
raise SKException('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return resp_json
def _get_seckill_order_data(self):
"""生成提交抢购订单所需的请求体参数
:return: 请求体参数组成的dict
"""
logger.info('生成提交抢购订单所需参数...')
# 获取用户秒杀初始化信息
self.seckill_init_info[self.sku_id] = self._get_seckill_init_info()
init_info = self.seckill_init_info.get(self.sku_id)
default_address = init_info['addressList'][0] # 默认地址dict
invoice_info = init_info.get('invoiceInfo', {}) # 默认发票信息dict, 有可能不返回
token = init_info['token']
data = {
'skuId': self.sku_id,
'num': self.seckill_num,
'addressId': default_address['id'],
'yuShou': 'true',
'isModifyAddress': 'false',
'name': default_address['name'],
'provinceId': default_address['provinceId'],
'cityId': default_address['cityId'],
'countyId': default_address['countyId'],
'townId': default_address['townId'],
'addressDetail': default_address['addressDetail'],
'mobile': default_address['mobile'],
'mobileKey': default_address['mobileKey'],
'email': default_address.get('email', ''),
'postCode': '',
'invoiceTitle': invoice_info.get('invoiceTitle', -1),
'invoiceCompanyName': '',
'invoiceContent': invoice_info.get('invoiceContentType', 1),
'invoiceTaxpayerNO': '',
'invoiceEmail': '',
'invoicePhone': invoice_info.get('invoicePhone', ''),
'invoicePhoneKey': invoice_info.get('invoicePhoneKey', ''),
'invoice': 'true' if invoice_info else 'false',
'password': global_config.get('account', 'payment_pwd'),
'codTimeType': 3,
'paymentType': 4,
'areaCode': '',
'overseas': 0,
'phone': '',
'eid': global_config.getRaw('config', 'eid'),
'fp': global_config.getRaw('config', 'fp'),
'token': token,
'pru': ''
}
return data
def submit_seckill_order(self):
"""提交抢购(秒杀)订单
:return: 抢购结果 True/False
"""
url = 'https://marathon.jd.com/seckillnew/orderService/pc/submitOrder.action'
payload = {
'skuId': self.sku_id,
}
try:
self.seckill_order_data[self.sku_id] = self._get_seckill_order_data()
except Exception as e:
logger.info('抢购失败,无法获取生成订单的基本信息,接口返回:【{}】'.format(str(e)))
return False
logger.info('提交抢购订单...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://marathon.jd.com/seckill/seckill.action?skuId={0}&num={1}&rid={2}'.format(
self.sku_id, self.seckill_num, int(time.time())),
}
resp = self.session.post(
url=url,
params=payload,
data=self.seckill_order_data.get(
self.sku_id),
headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception as e:
logger.info('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return False
# 返回信息
# 抢购失败:
# {'errorMessage': '很遗憾没有抢到,再接再厉哦。', 'orderId': 0, 'resultCode': 60074, 'skuId': 0, 'success': False}
# {'errorMessage': '抱歉,您提交过快,请稍后再提交订单!', 'orderId': 0, 'resultCode': 60017, 'skuId': 0, 'success': False}
# {'errorMessage': '系统正在开小差,请重试~~', 'orderId': 0, 'resultCode': 90013, 'skuId': 0, 'success': False}
# 抢购成功:
# {"appUrl":"xxxxx","orderId":820227xxxxx,"pcUrl":"xxxxx","resultCode":0,"skuId":0,"success":true,"totalMoney":"xxxxx"}
if resp_json.get('success'):
order_id = resp_json.get('orderId')
total_money = resp_json.get('totalMoney')
pay_url = 'https:' + resp_json.get('pcUrl')
logger.info('抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}'.format(order_id, total_money, pay_url))
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}".format(order_id, total_money, pay_url)
send_wechat(success_message)
return True
else:
logger.info('抢购失败,返回信息:{}'.format(resp_json))
if global_config.getRaw('messenger', 'enable') == 'true':
error_message = '抢购失败,返回信息:{}'.format(resp_json)
send_wechat(error_message)
return False
苏宁脚本目前在测试途中,需要继续调试。
原创文章,
转载请注明:http://30daydo.com/article/44129
欢迎关注公众号:
可转债量化分析
不用一行代码 下载雪球嘉年华视频
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 3573 次浏览 • 2020-12-09 14:43
听说今年着重分享一些观念,抱着好奇心,就打算下载几部来看看。
雪球网站很简单,只要找到下载链接就可以下载了。
第一步。打开一个视频播放的页面,比如大金链的
11737544 粉丝主会场 | 巅峰对谈:金牛双子星主动VS量化
https://xueqiu.com/video/5285890810945319765
右键,查看源码,然后在源码里面试着查找 mp4,flv,webp等流媒体字样。
在这里找到一个了:
但是这个视频下载地址有很多转义字符:http:\u002F\u002F1256122120.vod2.myqcloud.com\u002F53ad1740vodtranscq1256122120\u002F17ebe5145285890810945319765\u002Fv.f20.mp4
直接在浏览器是无法直接打开的。
可以直接替换\u002f 为一个斜杠 \ 就可以了。
如果嫌麻烦,可以在浏览器里面,按下F12,在console页面里面输入上面的地址,前后加个双引号,然后回车,就可以得到完整的地址了。
原创文章,转载请注明出处
http://30daydo.com/article/44119
查看全部
听说今年着重分享一些观念,抱着好奇心,就打算下载几部来看看。
雪球网站很简单,只要找到下载链接就可以下载了。
第一步。打开一个视频播放的页面,比如大金链的
11737544 粉丝主会场 | 巅峰对谈:金牛双子星主动VS量化
https://xueqiu.com/video/5285890810945319765
右键,查看源码,然后在源码里面试着查找 mp4,flv,webp等流媒体字样。
在这里找到一个了:
但是这个视频下载地址有很多转义字符:
http:\u002F\u002F1256122120.vod2.myqcloud.com\u002F53ad1740vodtranscq1256122120\u002F17ebe5145285890810945319765\u002Fv.f20.mp4
直接在浏览器是无法直接打开的。
可以直接替换\u002f 为一个斜杠 \ 就可以了。
如果嫌麻烦,可以在浏览器里面,按下F12,在console页面里面输入上面的地址,前后加个双引号,然后回车,就可以得到完整的地址了。
原创文章,转载请注明出处
http://30daydo.com/article/44119
监控公众号取消关注的粉丝
闲聊 • 李魔佛 发表了文章 • 0 个评论 • 2710 次浏览 • 2020-12-08 23:48
上面是获取到全部粉丝的数据,原理是直接利用mitmproxy抓包,监听到指定url数据后,把数据导出到文件就可以了。
如果需要教程和代码,可以私聊。(哦,网站关闭了注册了,那就公众号留言吧) 查看全部
简单快速下载知乎视频
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 5015 次浏览 • 2020-11-29 23:03
1. 打开视频前按F12
2. 播放视频
3. 查看F12的网络选项
4. 找到 https://vdn3.vzuu.com 的url
5. 对应的整个url链接就是视频的真实下载地址。把url复制到浏览器打开,然后右键另存为本地视频就可以了
查看全部
1. 打开视频前按F12
2. 播放视频
3. 查看F12的网络选项
4. 找到 https://vdn3.vzuu.com 的url
5. 对应的整个url链接就是视频的真实下载地址。把url复制到浏览器打开,然后右键另存为本地视频就可以了
集思录用户名密码JS加密流程解密 【JS加密破解教程一】
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 4697 次浏览 • 2020-11-27 17:26
而且提交的内容每次都固定,第一种最傻的方式就是每次提交就把加密的用户名和密码提交上去。
当然,对于有钻研的读者,可能想看看其具体的加密方式。
那么就按照流程,通过断点与搜索,找到其加密方法。
首先在上面的截图中很明显就知道,这个字符加密应该是aes,因为它的提交字段中aes:1
按F12,走完整个登录流程。
然后搜索password字样的地方。
在index.html首页中找到一处:
然后在该password的地方打个断点,然后跳转到jslencode 的地方。
跳转到的地方是这里。
然后我们浏览一下这个JS页面,尝试把整个JS代码抠出来。
放到我们的调试软件中,比较常用的是鬼鬼JS调试器。(有需要的可以关注wx公众号下载:回复 鬼鬼JS 即可)
这个调试器的好处是,可以很方便格式化JS代码,然后输入你要调试的字符,然后点击运行,可以当场拿到结果,等到结果ok了的话,就可以用python 的pyexecjs执行。
先点击代码格式化,然后在输入框里找到函数的入口:
jslencode(text, aes_key),好了,现在就把我们的密码 XXXXXX,和aes_key 放进去就可以了。
aes_key 在之前的index.html就能找到。var A397151C04723421F = '397151C04723421F';
function doLogin(){
var data = $('#login_form').serializeObjectToJson();
data['_post_type'] = 'ajax';
data['aes'] = 1;
data['user_name'] = jslencode(data['user_name'], A397151C04723421F);
data['password'] = jslencode(data['password'], A397151C04723421F);
$.post('/account/ajax/login_process/', data, function(rst){
on_login_error_processer(rst);
}, 'json');
var A397151C04723421F = '397151C04723421F';
jslencode(‘jisilupassword’, '397151C04723421F')
右下角有个系统引擎运行。
得到结果:
1d1bd2b22b8cc5c09ad8ce8f1e69b87f
对比一下第一张图里面的的post请求,发现是一样的。那么现在的JS解密就成功了一大半了。 接着我们就写python代码执行这段JS脚本。
尝试直接把JS放到一个文件,然后编写python代码# -*- coding: utf-8 -*-
# @Time : 2020/11/27 12:15
# @File : js_executor.py
# @Author : Rocky C@www.30daydo.com
import execjs
def main():
encode_user = ctx.call('jslencode', user, key)
encode_password = ctx.call('jslencode', password, key)
print(encode_user)
print(encode_password)
if __name__ == '__main__':
main()
然后发现报错: 说CryptoJS没有定义,那么我们看看代码。(function(root, factory) {
if (typeof exports === "object") {
module.exports = exports = factory()
} else {
if (typeof define === "function" && define.amd) {
define(, factory)
} else {
root.CryptoJS = factory()
}
}
}(this, function() {
var CryptoJS = CryptoJS || (function(Math, undefined) {
var create = Object.create || (function() {
发现这一行此时CryptoJS的定义
var CryptoJS = CryptoJS || (function(Math, undefined) { var create = Object.create || (function() {
那么我们把这一行上面的全部删除。最后的调试后,能够执行的JS代码如下,并保存为 集思录.js 文件,调用上面的python文件。
集思录.jsvar CryptoJS = CryptoJS || (function(Math, undefined) {
var create = Object.create || (function() {
function F() {}
return function(obj) {
var subtype;
F.prototype = obj;
subtype = new F();
F.prototype = null;
return subtype
}
}());
var C = {};
var C_lib = C.lib = {};
var Base = C_lib.Base = (function() {
下面的就跟上面的一模一样了:
源文件地址:https://www.jisilu.cn/static/js/crypto-js-3.3.0-min.js然后python运行后得到加密后的aes数据。
对比一下鬼鬼JS调试器的结果,一样的。
OK,手工。
这里汇聚了平时整理的JS破解工作流,大家可以参考参考。
https://github.com/Rockyzsu/JS-Reverse
原创文章,转载请注明出处:
http://30daydo.com/article/44109
喜欢的朋友可以加入星球探讨: 查看全部
而且提交的内容每次都固定,第一种最傻的方式就是每次提交就把加密的用户名和密码提交上去。
当然,对于有钻研的读者,可能想看看其具体的加密方式。
那么就按照流程,通过断点与搜索,找到其加密方法。
首先在上面的截图中很明显就知道,这个字符加密应该是aes,因为它的提交字段中aes:1
按F12,走完整个登录流程。
然后搜索password字样的地方。
在index.html首页中找到一处:
然后在该password的地方打个断点,然后跳转到jslencode 的地方。
跳转到的地方是这里。
然后我们浏览一下这个JS页面,尝试把整个JS代码抠出来。
放到我们的调试软件中,比较常用的是鬼鬼JS调试器。(有需要的可以关注wx公众号下载:回复 鬼鬼JS 即可)
这个调试器的好处是,可以很方便格式化JS代码,然后输入你要调试的字符,然后点击运行,可以当场拿到结果,等到结果ok了的话,就可以用python 的pyexecjs执行。
先点击代码格式化,然后在输入框里找到函数的入口:
jslencode(text, aes_key),好了,现在就把我们的密码 XXXXXX,和aes_key 放进去就可以了。
aes_key 在之前的index.html就能找到。
var A397151C04723421F = '397151C04723421F';
function doLogin(){
var data = $('#login_form').serializeObjectToJson();
data['_post_type'] = 'ajax';
data['aes'] = 1;
data['user_name'] = jslencode(data['user_name'], A397151C04723421F);
data['password'] = jslencode(data['password'], A397151C04723421F);
$.post('/account/ajax/login_process/', data, function(rst){
on_login_error_processer(rst);
}, 'json');
var A397151C04723421F = '397151C04723421F';
jslencode(‘jisilupassword’, '397151C04723421F')
右下角有个系统引擎运行。
得到结果:
1d1bd2b22b8cc5c09ad8ce8f1e69b87f
对比一下第一张图里面的的post请求,发现是一样的。那么现在的JS解密就成功了一大半了。 接着我们就写python代码执行这段JS脚本。
尝试直接把JS放到一个文件,然后编写python代码
# -*- coding: utf-8 -*-
# @Time : 2020/11/27 12:15
# @File : js_executor.py
# @Author : Rocky C@www.30daydo.com
import execjs
def main():
encode_user = ctx.call('jslencode', user, key)
encode_password = ctx.call('jslencode', password, key)
print(encode_user)
print(encode_password)
if __name__ == '__main__':
main()
然后发现报错: 说CryptoJS没有定义,那么我们看看代码。
(function(root, factory) {
if (typeof exports === "object") {
module.exports = exports = factory()
} else {
if (typeof define === "function" && define.amd) {
define(, factory)
} else {
root.CryptoJS = factory()
}
}
}(this, function() {
var CryptoJS = CryptoJS || (function(Math, undefined) {
var create = Object.create || (function() {
发现这一行此时CryptoJS的定义
var CryptoJS = CryptoJS || (function(Math, undefined) { var create = Object.create || (function() {
那么我们把这一行上面的全部删除。最后的调试后,能够执行的JS代码如下,并保存为 集思录.js 文件,调用上面的python文件。
集思录.js
var CryptoJS = CryptoJS || (function(Math, undefined) {下面的就跟上面的一模一样了:
var create = Object.create || (function() {
function F() {}
return function(obj) {
var subtype;
F.prototype = obj;
subtype = new F();
F.prototype = null;
return subtype
}
}());
var C = {};
var C_lib = C.lib = {};
var Base = C_lib.Base = (function() {
源文件地址:
https://www.jisilu.cn/static/js/crypto-js-3.3.0-min.js然后python运行后得到加密后的aes数据。
对比一下鬼鬼JS调试器的结果,一样的。
OK,手工。
这里汇聚了平时整理的JS破解工作流,大家可以参考参考。
https://github.com/Rockyzsu/JS-Reverse
原创文章,转载请注明出处:
http://30daydo.com/article/44109
喜欢的朋友可以加入星球探讨:
免费代理ip与收费的代理ip
python爬虫 • wanbainip 发表了文章 • 0 个评论 • 3198 次浏览 • 2020-10-30 18:00
曾经有尝试过使用免费的代理ip来搭建代理池,可是免费的代理ip不仅资源少,而且可用率、高匿性、速度等都极差,每次使用都需要借助第三方软件进行检查是否可用,严重影响效率,根本满足不了任务的需求。
收费的代理ip与免费的代理ip差距非常大,不仅拥有海量的ip资源,可用率、高匿性、速度都是极好。操作简单工作效率既然提高上去了。经过多家的测试,最终选择了性价比最高的万变ip。高质量的优质代理ip才可以真正用来防止爬虫被封锁,如果使用普通代理,爬虫的真实IP还是会暴露。新获取一批新IP 查看全部
曾经有尝试过使用免费的代理ip来搭建代理池,可是免费的代理ip不仅资源少,而且可用率、高匿性、速度等都极差,每次使用都需要借助第三方软件进行检查是否可用,严重影响效率,根本满足不了任务的需求。
收费的代理ip与免费的代理ip差距非常大,不仅拥有海量的ip资源,可用率、高匿性、速度都是极好。操作简单工作效率既然提高上去了。经过多家的测试,最终选择了性价比最高的万变ip。高质量的优质代理ip才可以真正用来防止爬虫被封锁,如果使用普通代理,爬虫的真实IP还是会暴露。新获取一批新IP
Python爬虫学习者需要注意什么?
python爬虫 • wanbainip 发表了文章 • 0 个评论 • 3021 次浏览 • 2020-10-28 17:14
最常见的解决方法就是使用大量的ip,就是借着代理ip保证IP被封时有替换IP可用,永远保持着续航能力。这里推荐51代理ip,作为一家提供代理IP的专业服务商,万变ip代理拥有强大的技术团队运营维护,全高匿系统所产生的高匿ip不仅安全稳定、而且速度快, 以及与爬虫用户多年来合作的宝贵经验,是Python爬虫首选代理IP。
Python是一种全栈计算机程序设计语言,全栈,顾名思义,应用范围广。你可能听说过很多编程语言,例如C语言,Java语言等,众所周知,这些语言都非常难学,更别说景桐使用了。而python不一样,比如完成一个Web服务,C语言要写1000行代码,Java要写100行,而python可能只要写20行。对!这就是差距!目前由于python“简单易懂”,已逐步成为网络爬虫主流语言。
在初学python爬虫时,很多程序员会被一些“小问题”阻碍脚步,为避免大家再次犯同样的错误,加快学习进程,在爬取网站信息时一定要使用大量代理IP。好用的代理IP服务商,
高效率的爬虫工作离不开ip代理的支持,这就是ip代理越来越受欢迎的原因!收藏举报投诉 查看全部
最常见的解决方法就是使用大量的ip,就是借着代理ip保证IP被封时有替换IP可用,永远保持着续航能力。这里推荐51代理ip,作为一家提供代理IP的专业服务商,万变ip代理拥有强大的技术团队运营维护,全高匿系统所产生的高匿ip不仅安全稳定、而且速度快, 以及与爬虫用户多年来合作的宝贵经验,是Python爬虫首选代理IP。
Python是一种全栈计算机程序设计语言,全栈,顾名思义,应用范围广。你可能听说过很多编程语言,例如C语言,Java语言等,众所周知,这些语言都非常难学,更别说景桐使用了。而python不一样,比如完成一个Web服务,C语言要写1000行代码,Java要写100行,而python可能只要写20行。对!这就是差距!目前由于python“简单易懂”,已逐步成为网络爬虫主流语言。
在初学python爬虫时,很多程序员会被一些“小问题”阻碍脚步,为避免大家再次犯同样的错误,加快学习进程,在爬取网站信息时一定要使用大量代理IP。好用的代理IP服务商,
高效率的爬虫工作离不开ip代理的支持,这就是ip代理越来越受欢迎的原因!收藏举报投诉
Python爬虫虎牙平台主播的图片代码
python爬虫 • wanbainip 发表了文章 • 0 个评论 • 3020 次浏览 • 2020-10-27 17:55
代码如下:
import urllib.request
import re
import os
# 全局变量用来记录图片的编号
gl_z = 0
def down_img(url1):
"""下载图片"""
# 处理图片链接,拼接http:
url = "https:" + re.sub(r"\?", "", url1)
global gl_z
print(url)
# 请求链接
response = urllib.request.urlopen(url)
# 读取内容
data = response.read()
# 切片取出图片名称
file_name = url[url.rfind('/') + 1:]
# 生成列表
a = [x for x in range(10000)]
# 打开文件用以写入
file = open(os.path.join("photo3", "img" + file_name + str(a[gl_z]) + ".jpg"), "wb")
file.write(data)
# 关闭文件
file.close()
# 编号加1
gl_z += 1
if __name__ == '__main__':
# 要抓去信息的网址
home = """http://www.huya.com/g/xingxiu"""
# 模拟请求头
headers = {
"Host": "www.huya.com",
"User-Agent": "agent信息"
}
# 构造好请求对象 将请求提交到服务器 获取的响应就是到首页的html代码
request = urllib.request.Request(url=home, headers=headers)
response = urllib.request.urlopen(request)
# 读取抓到的内容并解码
html_data = response.read().decode()
"""huyaimg.msstatic.com/avatar/1054/db/6590aa9bcf98e12e5d809d371e46cc_180_135.jpg
"""
# 使用正则 从首页中 提取出所有的图片链接
img_list = re.findall(r"//huyaimg\.msstatic\.com.+\.jpg\?", html_data)
print(img_list)
# 取出每张图片进行下载
for img_url in img_list:
print(img_url)
down_img(img_url) 查看全部
代码如下:
import urllib.request
import re
import os
# 全局变量用来记录图片的编号
gl_z = 0
def down_img(url1):
"""下载图片"""
# 处理图片链接,拼接http:
url = "https:" + re.sub(r"\?", "", url1)
global gl_z
print(url)
# 请求链接
response = urllib.request.urlopen(url)
# 读取内容
data = response.read()
# 切片取出图片名称
file_name = url[url.rfind('/') + 1:]
# 生成列表
a = [x for x in range(10000)]
# 打开文件用以写入
file = open(os.path.join("photo3", "img" + file_name + str(a[gl_z]) + ".jpg"), "wb")
file.write(data)
# 关闭文件
file.close()
# 编号加1
gl_z += 1
if __name__ == '__main__':
# 要抓去信息的网址
home = """http://www.huya.com/g/xingxiu"""
# 模拟请求头
headers = {
"Host": "www.huya.com",
"User-Agent": "agent信息"
}
# 构造好请求对象 将请求提交到服务器 获取的响应就是到首页的html代码
request = urllib.request.Request(url=home, headers=headers)
response = urllib.request.urlopen(request)
# 读取抓到的内容并解码
html_data = response.read().decode()
"""huyaimg.msstatic.com/avatar/1054/db/6590aa9bcf98e12e5d809d371e46cc_180_135.jpg
"""
# 使用正则 从首页中 提取出所有的图片链接
img_list = re.findall(r"//huyaimg\.msstatic\.com.+\.jpg\?", html_data)
print(img_list)
# 取出每张图片进行下载
for img_url in img_list:
print(img_url)
down_img(img_url)
python打造3D撞球小游戏
python • wanbainip 发表了文章 • 0 个评论 • 3147 次浏览 • 2020-10-26 15:52
我们需要三组刚体(当您在Blender的对象上打开一个刚体的属性时,Blender将模拟与其它刚体的碰撞):
1.平面
第2行代码创建了一个简单的平面,立方体将放置在该平面上。为了防止它因重力而坠落,我们将其设为“受体”第4行代码。
2. 圆环
第11-12行将第一个圆环的"Enabled"属性设置为false,防止由于重力而坠落。这样它就固定在那牵住整个链条。
3. 立方体
因为z循环第13行嵌套在x循环[第5行]中,我们将得到一个18X10的立方体组成的墙。 查看全部
Python爬虫基本框架
python爬虫 • wanbainip 发表了文章 • 0 个评论 • 2759 次浏览 • 2020-10-25 18:01
1. 爬虫调度器负责统筹其他四个模块协调工作。
2. URL管理器负责管理URL链接,包括已爬取的链接和未爬取的链接。
3. HTML下载器用于从URL管理器中获取未爬取的链接并下载其HTML网页。
4. HTML解析器用于解析HTML下载器下载的HTML网页,获取URL链接交给URL管理器,提取要获取的数据交给数据存储器。
5. 数据存储器用于将HTML解析器解析出来的数据存储到数据库或文件。 查看全部
1. 爬虫调度器负责统筹其他四个模块协调工作。
2. URL管理器负责管理URL链接,包括已爬取的链接和未爬取的链接。
3. HTML下载器用于从URL管理器中获取未爬取的链接并下载其HTML网页。
4. HTML解析器用于解析HTML下载器下载的HTML网页,获取URL链接交给URL管理器,提取要获取的数据交给数据存储器。
5. 数据存储器用于将HTML解析器解析出来的数据存储到数据库或文件。
Python爬虫如何防止ip被封?
python爬虫 • wanbainip 发表了文章 • 0 个评论 • 3018 次浏览 • 2020-10-24 10:22
第一种:降低访问的速度,我们可以使用 time模块中的sleep,使程序每运行一次后就睡眠1s,这样可以很有效的降低ip被封机率,但是效率效果不是很高,一般是用于量小的采集任务。
第二种:使用类似万变ip代理这样的优质换ip软件,这也是爬虫工作者最常用的手段之一,通过代理ip来伪装我们的ip,隐藏本地真实的ip地址,让目标服务器无法识别是相同ip发出的请求,这样就很有效的防止ip被封。突破了ip的限制,采集数据的任务就会顺利,工作效率自然会提高!
查看全部
第一种:降低访问的速度,我们可以使用 time模块中的sleep,使程序每运行一次后就睡眠1s,这样可以很有效的降低ip被封机率,但是效率效果不是很高,一般是用于量小的采集任务。
第二种:使用类似万变ip代理这样的优质换ip软件,这也是爬虫工作者最常用的手段之一,通过代理ip来伪装我们的ip,隐藏本地真实的ip地址,让目标服务器无法识别是相同ip发出的请求,这样就很有效的防止ip被封。突破了ip的限制,采集数据的任务就会顺利,工作效率自然会提高!
python爬取网页中的超链接地址,获取到的跟浏览器中显示的不一样
python爬虫 • acker 回复了问题 • 2 人关注 • 2 个回复 • 4303 次浏览 • 2020-09-25 16:55
python asyncio aiohttp motor异步爬虫例子 定时抓取bilibili首页热度网红
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 3752 次浏览 • 2020-09-22 22:35
AsyncIOMotorClient(connect_uri)
motor连接带用户名和密码的方法和pymongo一致。
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2020/9/22 10:07
# @File : bilibili_hot_anchor.py
# 异步爬取首页与列表
import asyncio
import datetime
import aiohttp
import re
import time
from motor.motor_asyncio import AsyncIOMotorClient
from parsel import Selector
from settings import _json_data
SLEEP = 60 * 10
INFO = _json_data['mongo']['arm']
host = INFO['host']
port = INFO['port']
user = INFO['user']
password = INFO['password']
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
client = AsyncIOMotorClient(connect_uri)
db = client['db_parker']
home_url = 'https://www.bilibili.com/ranking'
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}
def convertor(number_str):
'''
将小数点的万变为整数
:param number_str:
:return:
'''
number = re.search('(\d+\.+\d+)', number_str)
if number:
number = float(number.group(1))
if re.search('万', number_str):
number = int(number * 10000)
else:
number = 0
return number
async def home_page():
async with aiohttp.ClientSession() as session:
while True:
start = time.time()
async with session.get(url=home_url, headers=headers) as response:
html = await response.text()
resp = Selector(text=html)
items = resp.xpath('//ul[@class="rank-list"]/li')
for item in items:
json_data = {}
number = item.xpath('.//div[@class="num"]/text()').extract_first()
info = item.xpath('.//div[@class="info"][1]')
title = info.xpath('.//a/text()').extract_first()
detail_url = info.xpath('.//a/@href').extract_first()
play_number = info.xpath('.//div[@class="detail"]/span[1]/text()').extract_first()
viewing_number = info.xpath('.//div[@class="detail"]/span[2]/text()').extract_first()
json_data['number'] = int(number)
json_data['title'] = title
json_data['play_number'] = convertor(play_number)
json_data['viewing_number'] = convertor(viewing_number)
json_data['url'] = detail_url
task = asyncio.create_task(detail_list(session, detail_url, json_data))
# await detail_url()
end = time.time()
print(f'time used {end-start}')
await asyncio.sleep(SLEEP) # 暂停10分钟
print(f'sleep for {SLEEP}')
async def detail_list(session, url, json_data):
async with session.get(url, headers=headers) as response:
response = await response.text()
await parse_detail(response, json_data)
async def parse_detail(html, json_data=None):
resp = Selector(text=html)
info = resp.xpath('//div[@id="v_desc"]/div[@class="info open"]/text()').extract_first()
if not info:
info = '这个家伙很懒'
json_data['info'] = info.strip()
current = datetime.datetime.now()
json_data['crawltime'] = current
await db['bilibili'].update_one({'url': json_data['url']}, {'$set': json_data}, True, True)
loop = asyncio.get_event_loop()
loop.run_until_complete(home_page())
爬取的数据图:
原创文章,转载请注明:http://30daydo.com/article/605
需要源码,可私信。
查看全部
AsyncIOMotorClient(connect_uri)
motor连接带用户名和密码的方法和pymongo一致。
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2020/9/22 10:07
# @File : bilibili_hot_anchor.py
# 异步爬取首页与列表
import asyncio
import datetime
import aiohttp
import re
import time
from motor.motor_asyncio import AsyncIOMotorClient
from parsel import Selector
from settings import _json_data
SLEEP = 60 * 10
INFO = _json_data['mongo']['arm']
host = INFO['host']
port = INFO['port']
user = INFO['user']
password = INFO['password']
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
client = AsyncIOMotorClient(connect_uri)
db = client['db_parker']
home_url = 'https://www.bilibili.com/ranking'
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}
def convertor(number_str):
'''
将小数点的万变为整数
:param number_str:
:return:
'''
number = re.search('(\d+\.+\d+)', number_str)
if number:
number = float(number.group(1))
if re.search('万', number_str):
number = int(number * 10000)
else:
number = 0
return number
async def home_page():
async with aiohttp.ClientSession() as session:
while True:
start = time.time()
async with session.get(url=home_url, headers=headers) as response:
html = await response.text()
resp = Selector(text=html)
items = resp.xpath('//ul[@class="rank-list"]/li')
for item in items:
json_data = {}
number = item.xpath('.//div[@class="num"]/text()').extract_first()
info = item.xpath('.//div[@class="info"][1]')
title = info.xpath('.//a/text()').extract_first()
detail_url = info.xpath('.//a/@href').extract_first()
play_number = info.xpath('.//div[@class="detail"]/span[1]/text()').extract_first()
viewing_number = info.xpath('.//div[@class="detail"]/span[2]/text()').extract_first()
json_data['number'] = int(number)
json_data['title'] = title
json_data['play_number'] = convertor(play_number)
json_data['viewing_number'] = convertor(viewing_number)
json_data['url'] = detail_url
task = asyncio.create_task(detail_list(session, detail_url, json_data))
# await detail_url()
end = time.time()
print(f'time used {end-start}')
await asyncio.sleep(SLEEP) # 暂停10分钟
print(f'sleep for {SLEEP}')
async def detail_list(session, url, json_data):
async with session.get(url, headers=headers) as response:
response = await response.text()
await parse_detail(response, json_data)
async def parse_detail(html, json_data=None):
resp = Selector(text=html)
info = resp.xpath('//div[@id="v_desc"]/div[@class="info open"]/text()').extract_first()
if not info:
info = '这个家伙很懒'
json_data['info'] = info.strip()
current = datetime.datetime.now()
json_data['crawltime'] = current
await db['bilibili'].update_one({'url': json_data['url']}, {'$set': json_data}, True, True)
loop = asyncio.get_event_loop()
loop.run_until_complete(home_page())
爬取的数据图:
原创文章,转载请注明:http://30daydo.com/article/605
需要源码,可私信。
请问各位用scrapy和redis方法爬取不到数据的问题(可悬赏),求大佬看下,感激不尽
python爬虫 • 李魔佛 回复了问题 • 2 人关注 • 1 个回复 • 8350 次浏览 • 2020-04-16 22:16
requests请求返回的json格式为bytes乱码
python爬虫 • 李魔佛 回复了问题 • 2 人关注 • 1 个回复 • 5252 次浏览 • 2020-02-16 23:35
scrapy在settings中定义变量不能包含小写!
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 3043 次浏览 • 2019-11-16 16:39
比如定义了一个叫 Redis_host = '192.168.1.1',的值
然后在spider中,如果你调用self.settings.get('Redis_host')
那么返回值是 None。
如果用REDIS_HOST定义,那么就可以正确返回它的值。
如果你一定要用小写,也有其他方法可正常调用。
先导入settings文件
fromt xxxx import setttings # xxx为项目名
host = settings.Redis_host # 直接导入一个文件的形式来调用是可以的 查看全部
比如定义了一个叫 Redis_host = '192.168.1.1',的值
然后在spider中,如果你调用self.settings.get('Redis_host')
那么返回值是 None。
如果用REDIS_HOST定义,那么就可以正确返回它的值。
如果你一定要用小写,也有其他方法可正常调用。
先导入settings文件
fromt xxxx import setttings # xxx为项目名
host = settings.Redis_host # 直接导入一个文件的形式来调用是可以的
etree.strip_tags的用法
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 4280 次浏览 • 2019-10-24 11:24
它把参数中的标签从源htmlelement中删除,并且把里面的标签文本给合并进来。
举个例子:from lxml.html import etree
from lxml.html import fromstring, HtmlElement
test_html = '''<p><span>hello</span><span>world</span></p>'''
test_element = fromstring(test_html)
etree.strip_tags(test_element,'span') # 清除span标签
etree.tostring(test_element)
因为上述操作直接应用于test_element上的,所以test_element的值已经被修改了。
所以现在test_element 的值是
b'<p>helloworld</p>'
原创文章,转载请注明出处
http://30daydo.com/article/553
查看全部
它把参数中的标签从源htmlelement中删除,并且把里面的标签文本给合并进来。
举个例子:
from lxml.html import etree
from lxml.html import fromstring, HtmlElement
test_html = '''<p><span>hello</span><span>world</span></p>'''
test_element = fromstring(test_html)
etree.strip_tags(test_element,'span') # 清除span标签
etree.tostring(test_element)
因为上述操作直接应用于test_element上的,所以test_element的值已经被修改了。
所以现在test_element 的值是
b'<p>helloworld</p>'
原创文章,转载请注明出处
http://30daydo.com/article/553
aiohttp异步下载图片
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 4889 次浏览 • 2019-09-16 17:14
headers={'User-Agent':'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
async def getPage(num):
async with aiohttp.ClientSession() as session:
async with session.get(url.format(num),headers=headers) as resp:
if resp.status==200:
f= await aiofiles.open('{}.jpg'.format(num),mode='wb')
await f.write(await resp.read())
await f.close()
loop = asyncio.get_event_loop()
tasks = [getPage(i) for i in range(5)]
loop.run_until_complete(asyncio.wait(tasks))
原创文章,
转载请注明出处:
http://30daydo.com/article/537
查看全部
url = 'http://xyhz.huizhou.gov.cn/static/js/common/jigsaw/images/{}.jpg'
headers={'User-Agent':'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
async def getPage(num):
async with aiohttp.ClientSession() as session:
async with session.get(url.format(num),headers=headers) as resp:
if resp.status==200:
f= await aiofiles.open('{}.jpg'.format(num),mode='wb')
await f.write(await resp.read())
await f.close()
loop = asyncio.get_event_loop()
tasks = [getPage(i) for i in range(5)]
loop.run_until_complete(asyncio.wait(tasks))
原创文章,
转载请注明出处:
http://30daydo.com/article/537
scrapy源码分析<一>:入口函数以及是如何运行
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 6076 次浏览 • 2019-08-31 10:47
下面我们从源码分析一下scrapy执行的流程:
执行scrapy crawl 命令时,调用的是Command类class Command(ScrapyCommand):
requires_project = True
def syntax(self):
return '[options]'
def short_desc(self):
return 'Runs all of the spiders - My Defined'
def run(self,args,opts):
print('==================')
print(type(self.crawler_process))
spider_list = self.crawler_process.spiders.list() # 找到爬虫类
for name in spider_list:
print('=================')
print(name)
self.crawler_process.crawl(name,**opts.__dict__)
self.crawler_process.start()
然后我们去看看crawler_process,这个是来自ScrapyCommand,而ScrapyCommand又是CrawlerProcess的子类,而CrawlerProcess又是CrawlerRunner的子类
在CrawlerRunner构造函数里面主要作用就是这个 def __init__(self, settings=None):
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
self.settings = settings
self.spider_loader = _get_spider_loader(settings) # 构造爬虫
self._crawlers = set()
self._active = set()
self.bootstrap_failed = False
1. 加载配置文件def _get_spider_loader(settings):
cls_path = settings.get('SPIDER_LOADER_CLASS')
# settings文件没有定义SPIDER_LOADER_CLASS,所以这里获取到的是系统的默认配置文件,
# 默认配置文件在接下来的代码块A
# SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader'
loader_cls = load_object(cls_path)
# 这个函数就是根据路径转为类对象,也就是上面crapy.spiderloader.SpiderLoader 这个
# 字符串变成一个类对象
# 具体的load_object 对象代码见下面代码块B
return loader_cls.from_settings(settings.frozencopy())
默认配置文件defautl_settting.py# 代码块A
#......省略若干
SCHEDULER = 'scrapy.core.scheduler.Scheduler'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader' 就是这个值
SPIDER_LOADER_WARN_ONLY = False
SPIDER_MIDDLEWARES = {}
load_object的实现# 代码块B 为了方便,我把异常处理的去除
from importlib import import_module #导入第三方库
def load_object(path):
dot = path.rindex('.')
module, name = path[:dot], path[dot+1:]
# 上面把路径分为基本路径+模块名
mod = import_module(module)
obj = getattr(mod, name)
# 获取模块里面那个值
return obj
测试代码:In [33]: mod = import_module(module)
In [34]: mod
Out[34]: <module 'scrapy.spiderloader' from '/home/xda/anaconda3/lib/python3.7/site-packages/scrapy/spiderloader.py'>
In [35]: getattr(mod,name)
Out[35]: scrapy.spiderloader.SpiderLoader
In [36]: obj = getattr(mod,name)
In [37]: obj
Out[37]: scrapy.spiderloader.SpiderLoader
In [38]: type(obj)
Out[38]: type
在代码块A中,loader_cls是SpiderLoader,最后返回的的是SpiderLoader.from_settings(settings.frozencopy())
接下来看看SpiderLoader.from_settings, def from_settings(cls, settings):
return cls(settings)
返回类对象自己,所以直接看__init__函数即可class SpiderLoader(object):
"""
SpiderLoader is a class which locates and loads spiders
in a Scrapy project.
"""
def __init__(self, settings):
self.spider_modules = settings.getlist('SPIDER_MODULES')
# 获得settting中的模块名字,创建scrapy的时候就默认帮你生成了
# 你可以看看你的settings文件里面的内容就可以找到这个值,是一个list
self.warn_only = settings.getbool('SPIDER_LOADER_WARN_ONLY')
self._spiders = {}
self._found = defaultdict(list)
self._load_all_spiders() # 加载所有爬虫
核心就是这个_load_all_spiders:
走起:def _load_all_spiders(self):
for name in self.spider_modules:
for module in walk_modules(name): # 这个遍历文件夹里面的文件,然后再转化为类对象,
# 保存到字典:self._spiders = {}
self._load_spiders(module) # 模块变成spider
self._check_name_duplicates() # 去重,如果名字一样就异常
接下来看看_load_spiders
核心就是下面的。def iter_spider_classes(module):
from scrapy.spiders import Spider
for obj in six.itervalues(vars(module)): # 找到模块里面的变量,然后迭代出来
if inspect.isclass(obj) and \
issubclass(obj, Spider) and \
obj.__module__ == module.__name__ and \
getattr(obj, 'name', None): # 有name属性,继承于Spider
yield obj
这个obj就是我们平时写的spider类了。
原来分析了这么多,才找到了我们平时写的爬虫类
待续。。。。
原创文章
转载请注明出处
http://30daydo.com/article/530
查看全部
下面我们从源码分析一下scrapy执行的流程:
执行scrapy crawl 命令时,调用的是Command类
class Command(ScrapyCommand):
requires_project = True
def syntax(self):
return '[options]'
def short_desc(self):
return 'Runs all of the spiders - My Defined'
def run(self,args,opts):
print('==================')
print(type(self.crawler_process))
spider_list = self.crawler_process.spiders.list() # 找到爬虫类
for name in spider_list:
print('=================')
print(name)
self.crawler_process.crawl(name,**opts.__dict__)
self.crawler_process.start()
然后我们去看看crawler_process,这个是来自ScrapyCommand,而ScrapyCommand又是CrawlerProcess的子类,而CrawlerProcess又是CrawlerRunner的子类
在CrawlerRunner构造函数里面主要作用就是这个
def __init__(self, settings=None):
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
self.settings = settings
self.spider_loader = _get_spider_loader(settings) # 构造爬虫
self._crawlers = set()
self._active = set()
self.bootstrap_failed = False
1. 加载配置文件
def _get_spider_loader(settings):
cls_path = settings.get('SPIDER_LOADER_CLASS')
# settings文件没有定义SPIDER_LOADER_CLASS,所以这里获取到的是系统的默认配置文件,
# 默认配置文件在接下来的代码块A
# SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader'
loader_cls = load_object(cls_path)
# 这个函数就是根据路径转为类对象,也就是上面crapy.spiderloader.SpiderLoader 这个
# 字符串变成一个类对象
# 具体的load_object 对象代码见下面代码块B
return loader_cls.from_settings(settings.frozencopy())
默认配置文件defautl_settting.py
# 代码块A
#......省略若干
SCHEDULER = 'scrapy.core.scheduler.Scheduler'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader' 就是这个值
SPIDER_LOADER_WARN_ONLY = False
SPIDER_MIDDLEWARES = {}
load_object的实现
# 代码块B 为了方便,我把异常处理的去除
from importlib import import_module #导入第三方库
def load_object(path):
dot = path.rindex('.')
module, name = path[:dot], path[dot+1:]
# 上面把路径分为基本路径+模块名
mod = import_module(module)
obj = getattr(mod, name)
# 获取模块里面那个值
return obj
测试代码:
In [33]: mod = import_module(module)
In [34]: mod
Out[34]: <module 'scrapy.spiderloader' from '/home/xda/anaconda3/lib/python3.7/site-packages/scrapy/spiderloader.py'>
In [35]: getattr(mod,name)
Out[35]: scrapy.spiderloader.SpiderLoader
In [36]: obj = getattr(mod,name)
In [37]: obj
Out[37]: scrapy.spiderloader.SpiderLoader
In [38]: type(obj)
Out[38]: type
在代码块A中,loader_cls是SpiderLoader,最后返回的的是SpiderLoader.from_settings(settings.frozencopy())
接下来看看SpiderLoader.from_settings,
def from_settings(cls, settings):
return cls(settings)
返回类对象自己,所以直接看__init__函数即可
class SpiderLoader(object):
"""
SpiderLoader is a class which locates and loads spiders
in a Scrapy project.
"""
def __init__(self, settings):
self.spider_modules = settings.getlist('SPIDER_MODULES')
# 获得settting中的模块名字,创建scrapy的时候就默认帮你生成了
# 你可以看看你的settings文件里面的内容就可以找到这个值,是一个list
self.warn_only = settings.getbool('SPIDER_LOADER_WARN_ONLY')
self._spiders = {}
self._found = defaultdict(list)
self._load_all_spiders() # 加载所有爬虫
核心就是这个_load_all_spiders:
走起:
def _load_all_spiders(self):
for name in self.spider_modules:
for module in walk_modules(name): # 这个遍历文件夹里面的文件,然后再转化为类对象,
# 保存到字典:self._spiders = {}
self._load_spiders(module) # 模块变成spider
self._check_name_duplicates() # 去重,如果名字一样就异常
接下来看看_load_spiders
核心就是下面的。
def iter_spider_classes(module):
from scrapy.spiders import Spider
for obj in six.itervalues(vars(module)): # 找到模块里面的变量,然后迭代出来
if inspect.isclass(obj) and \
issubclass(obj, Spider) and \
obj.__module__ == module.__name__ and \
getattr(obj, 'name', None): # 有name属性,继承于Spider
yield obj
这个obj就是我们平时写的spider类了。
原来分析了这么多,才找到了我们平时写的爬虫类
待续。。。。
原创文章
转载请注明出处
http://30daydo.com/article/530
scrapy-rabbitmq 不支持python3 [修改源码使它支持]
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 3396 次浏览 • 2019-07-17 17:24
在python3上运行的收会报错。
需要修改以下地方:
待续。。
在python3上运行的收会报错。
需要修改以下地方:
待续。。
scrapy rabbitmq 分布式爬虫
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 6171 次浏览 • 2019-07-17 16:59
rabbitmq是个不错的消息队列服务,可以配合scrapy作为消息队列.
下面是一个简单的demo:import re
import requests
import scrapy
from scrapy import Request
from rabbit_spider import settings
from scrapy.log import logger
import json
from rabbit_spider.items import RabbitSpiderItem
import datetime
from scrapy.selector import Selector
import pika
# from scrapy_rabbitmq.spiders import RabbitMQMixin
# from scrapy.contrib.spiders import CrawlSpider
class Website(scrapy.Spider):
name = "rabbit"
def start_requests(self):
headers = {'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
'Host': '36kr.com',
'Referer': 'https://36kr.com/information/web_news',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36'
}
url = 'https://36kr.com/information/web_news'
yield Request(url=url,
headers=headers)
def parse(self, response):
credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101', 5672, '/', credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_log', exchange_type='direct')
result = channel.queue_declare(exclusive=True, queue='')
queue_name = result.method.queue
# print(queue_name)
# infos = sys.argv[1:] if len(sys.argv)>1 else ['info']
info = 'info'
# 绑定多个值
channel.queue_bind(
exchange='direct_log',
routing_key=info,
queue=queue_name
)
print('start to receive [{}]'.format(info))
channel.basic_consume(
on_message_callback=self.callback_func,
queue=queue_name,
auto_ack=True,
)
channel.start_consuming()
def callback_func(self, ch, method, properties, body):
print(body)
启动spider:from scrapy import cmdline
cmdline.execute('scrapy crawl rabbit'.split())
然后往rabbitmq里面推送数据:import pika
import settings
credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_log',exchange_type='direct') # fanout 就是组播
routing_key = 'info'
message='https://36kr.com/pp/api/aggregation-entity?type=web_latest_article&b_id=59499&per_page=30'
channel.basic_publish(
exchange='direct_log',
routing_key=routing_key,
body=message
)
print('sending message {}'.format(message))
connection.close()
推送数据后,scrapy会马上接受到队里里面的数据。
注意不能在start_requests里面写等待队列的命令,因为start_requests函数需要返回一个生成器,否则程序会报错。
待续。。。
###### 2019-08-29 更新 ###################
发现一个坑,就是rabbitMQ在接受到数据后,无法在回调函数里面使用yield生成器。
查看全部
rabbitmq是个不错的消息队列服务,可以配合scrapy作为消息队列.
下面是一个简单的demo:
import re
import requests
import scrapy
from scrapy import Request
from rabbit_spider import settings
from scrapy.log import logger
import json
from rabbit_spider.items import RabbitSpiderItem
import datetime
from scrapy.selector import Selector
import pika
# from scrapy_rabbitmq.spiders import RabbitMQMixin
# from scrapy.contrib.spiders import CrawlSpider
class Website(scrapy.Spider):
name = "rabbit"
def start_requests(self):
headers = {'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
'Host': '36kr.com',
'Referer': 'https://36kr.com/information/web_news',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36'
}
url = 'https://36kr.com/information/web_news'
yield Request(url=url,
headers=headers)
def parse(self, response):
credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101', 5672, '/', credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_log', exchange_type='direct')
result = channel.queue_declare(exclusive=True, queue='')
queue_name = result.method.queue
# print(queue_name)
# infos = sys.argv[1:] if len(sys.argv)>1 else ['info']
info = 'info'
# 绑定多个值
channel.queue_bind(
exchange='direct_log',
routing_key=info,
queue=queue_name
)
print('start to receive [{}]'.format(info))
channel.basic_consume(
on_message_callback=self.callback_func,
queue=queue_name,
auto_ack=True,
)
channel.start_consuming()
def callback_func(self, ch, method, properties, body):
print(body)
启动spider:
from scrapy import cmdline
cmdline.execute('scrapy crawl rabbit'.split())
然后往rabbitmq里面推送数据:
import pika
import settings
credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_log',exchange_type='direct') # fanout 就是组播
routing_key = 'info'
message='https://36kr.com/pp/api/aggregation-entity?type=web_latest_article&b_id=59499&per_page=30'
channel.basic_publish(
exchange='direct_log',
routing_key=routing_key,
body=message
)
print('sending message {}'.format(message))
connection.close()
推送数据后,scrapy会马上接受到队里里面的数据。
注意不能在start_requests里面写等待队列的命令,因为start_requests函数需要返回一个生成器,否则程序会报错。
待续。。。
###### 2019-08-29 更新 ###################
发现一个坑,就是rabbitMQ在接受到数据后,无法在回调函数里面使用yield生成器。
Win10下PhantomJS无法运行 【版本兼容问题】
python • 李魔佛 发表了文章 • 0 个评论 • 5639 次浏览 • 2019-07-04 09:07
在win10下就报错:
selenium.common.exceptions.WebDriverException: Message: Service C:\Tool\phantomjs-2.5.0-beta2-windows\phantomjs-2.5.0-beta2-windows\bin\phantomjs.exe unexpectedly exited. Status code was: 4294967295
后来替换了一个旧的版本,发现问题就这么解决了。
旧版本:phantomjs-2.1.1-windows
原创文章
转载请注明出处
http://30daydo.com/article/505
查看全部
在win10下就报错:
selenium.common.exceptions.WebDriverException: Message: Service C:\Tool\phantomjs-2.5.0-beta2-windows\phantomjs-2.5.0-beta2-windows\bin\phantomjs.exe unexpectedly exited. Status code was: 4294967295
后来替换了一个旧的版本,发现问题就这么解决了。
旧版本:phantomjs-2.1.1-windows
原创文章
转载请注明出处
http://30daydo.com/article/505
喜马拉雅app 爬取音频文件
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 6218 次浏览 • 2019-06-30 12:24
因为喜马拉雅的源码格式改了,所以爬虫代码也更新了一波
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/6/30 12:03
# @File : main.py
import requests
import re
import os
url = 'http://180.153.255.6/mobile/v1/album/track/ts-1571294887744?albumId=23057324&device=android&isAsc=true&isQueryInvitationBrand=true&pageId={}&pageSize=20&pre_page=0'
headers = {'User-Agent': 'Xiaomi'}
def download():
for i in range(1, 3):
r = requests.get(url=url.format(i), headers=headers)
js_data = r.json()
data_list = js_data.get('data', {}).get('list', [])
for item in data_list:
trackName = item.get('title')
trackName = re.sub('[\/\\\:\*\?\"\<\>\|]', '_', trackName)
# trackName=re.sub(':','',trackName)
src_url = item.get('playUrl64')
filename = '{}.mp3'.format(trackName)
if not os.path.exists(filename):
try:
r0 = requests.get(src_url, headers=headers)
except Exception as e:
print(e)
print(trackName)
r0 = requests.get(src_url, headers=headers)
else:
with open(filename, 'wb') as f:
f.write(r0.content)
print('{} downloaded'.format(trackName))
else:
print(f'{filename}已经下载过了')
import shutil
def rename_():
for i in range(1, 3):
r = requests.get(url=url.format(i), headers=headers)
js_data = r.json()
data_list = js_data.get('data', {}).get('list', [])
for item in data_list:
trackName = item.get('title')
trackName = re.sub('[\/\\\:\*\?\"\<\>\|]', '_', trackName)
src_url = item.get('playUrl64')
orderNo=item.get('orderNo')
filename = '{}.mp3'.format(trackName)
try:
if os.path.exists(filename):
new_file='{}_{}.mp3'.format(orderNo,trackName)
shutil.move(filename,new_file)
except Exception as e:
print(e)
if __name__=='__main__':
rename_()
音频文件也更新了,详情见百度网盘。
======== 2018-10=============
爬取喜马拉雅app上 杨继东的投资之道 的音频文件
运行环境:python3# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/6/30 12:03
# @File : main.py
import requests
import re
url = 'https://www.ximalaya.com/revision/play/album?albumId=23057324&pageNum=1&sort=1&pageSize=60'
headers={'User-Agent':'Xiaomi'}
r = requests.get(url=url,headers=headers)
js_data = r.json()
data_list = js_data.get('data',{}).get('tracksAudioPlay',)
for item in data_list:
trackName=item.get('trackName')
trackName=re.sub(':','',trackName)
src_url = item.get('src')
try:
r0=requests.get(src_url,headers=headers)
except Exception as e:
print(e)
print(trackName)
else:
with open('{}.m4a'.format(trackName),'wb') as f:
f.write(r0.content)
print('{} downloaded'.format(trackName))
保存为main.py
然后运行 python main.py
稍微等几分钟就自动下载好了。
附下载好的音频文件:
链接:https://pan.baidu.com/s/1t_vJhTvSJSeFdI1IaDS6fA
提取码:e3zb
原创文章
转载请注明出处
http://30daydo.com/article/503 查看全部
因为喜马拉雅的源码格式改了,所以爬虫代码也更新了一波
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/6/30 12:03
# @File : main.py
import requests
import re
import os
url = 'http://180.153.255.6/mobile/v1/album/track/ts-1571294887744?albumId=23057324&device=android&isAsc=true&isQueryInvitationBrand=true&pageId={}&pageSize=20&pre_page=0'
headers = {'User-Agent': 'Xiaomi'}
def download():
for i in range(1, 3):
r = requests.get(url=url.format(i), headers=headers)
js_data = r.json()
data_list = js_data.get('data', {}).get('list', [])
for item in data_list:
trackName = item.get('title')
trackName = re.sub('[\/\\\:\*\?\"\<\>\|]', '_', trackName)
# trackName=re.sub(':','',trackName)
src_url = item.get('playUrl64')
filename = '{}.mp3'.format(trackName)
if not os.path.exists(filename):
try:
r0 = requests.get(src_url, headers=headers)
except Exception as e:
print(e)
print(trackName)
r0 = requests.get(src_url, headers=headers)
else:
with open(filename, 'wb') as f:
f.write(r0.content)
print('{} downloaded'.format(trackName))
else:
print(f'{filename}已经下载过了')
import shutil
def rename_():
for i in range(1, 3):
r = requests.get(url=url.format(i), headers=headers)
js_data = r.json()
data_list = js_data.get('data', {}).get('list', [])
for item in data_list:
trackName = item.get('title')
trackName = re.sub('[\/\\\:\*\?\"\<\>\|]', '_', trackName)
src_url = item.get('playUrl64')
orderNo=item.get('orderNo')
filename = '{}.mp3'.format(trackName)
try:
if os.path.exists(filename):
new_file='{}_{}.mp3'.format(orderNo,trackName)
shutil.move(filename,new_file)
except Exception as e:
print(e)
if __name__=='__main__':
rename_()
音频文件也更新了,详情见百度网盘。
======== 2018-10=============
爬取喜马拉雅app上 杨继东的投资之道 的音频文件
运行环境:python3
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/6/30 12:03
# @File : main.py
import requests
import re
url = 'https://www.ximalaya.com/revision/play/album?albumId=23057324&pageNum=1&sort=1&pageSize=60'
headers={'User-Agent':'Xiaomi'}
r = requests.get(url=url,headers=headers)
js_data = r.json()
data_list = js_data.get('data',{}).get('tracksAudioPlay',)
for item in data_list:
trackName=item.get('trackName')
trackName=re.sub(':','',trackName)
src_url = item.get('src')
try:
r0=requests.get(src_url,headers=headers)
except Exception as e:
print(e)
print(trackName)
else:
with open('{}.m4a'.format(trackName),'wb') as f:
f.write(r0.content)
print('{} downloaded'.format(trackName))
保存为main.py
然后运行 python main.py
稍微等几分钟就自动下载好了。
附下载好的音频文件:
链接:https://pan.baidu.com/s/1t_vJhTvSJSeFdI1IaDS6fA
提取码:e3zb
原创文章
转载请注明出处
http://30daydo.com/article/503
scrapy源码分析<一>:入口函数以及是如何运行
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 6076 次浏览 • 2019-08-31 10:47
下面我们从源码分析一下scrapy执行的流程:
执行scrapy crawl 命令时,调用的是Command类class Command(ScrapyCommand):
requires_project = True
def syntax(self):
return '[options]'
def short_desc(self):
return 'Runs all of the spiders - My Defined'
def run(self,args,opts):
print('==================')
print(type(self.crawler_process))
spider_list = self.crawler_process.spiders.list() # 找到爬虫类
for name in spider_list:
print('=================')
print(name)
self.crawler_process.crawl(name,**opts.__dict__)
self.crawler_process.start()
然后我们去看看crawler_process,这个是来自ScrapyCommand,而ScrapyCommand又是CrawlerProcess的子类,而CrawlerProcess又是CrawlerRunner的子类
在CrawlerRunner构造函数里面主要作用就是这个 def __init__(self, settings=None):
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
self.settings = settings
self.spider_loader = _get_spider_loader(settings) # 构造爬虫
self._crawlers = set()
self._active = set()
self.bootstrap_failed = False
1. 加载配置文件def _get_spider_loader(settings):
cls_path = settings.get('SPIDER_LOADER_CLASS')
# settings文件没有定义SPIDER_LOADER_CLASS,所以这里获取到的是系统的默认配置文件,
# 默认配置文件在接下来的代码块A
# SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader'
loader_cls = load_object(cls_path)
# 这个函数就是根据路径转为类对象,也就是上面crapy.spiderloader.SpiderLoader 这个
# 字符串变成一个类对象
# 具体的load_object 对象代码见下面代码块B
return loader_cls.from_settings(settings.frozencopy())
默认配置文件defautl_settting.py# 代码块A
#......省略若干
SCHEDULER = 'scrapy.core.scheduler.Scheduler'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader' 就是这个值
SPIDER_LOADER_WARN_ONLY = False
SPIDER_MIDDLEWARES = {}
load_object的实现# 代码块B 为了方便,我把异常处理的去除
from importlib import import_module #导入第三方库
def load_object(path):
dot = path.rindex('.')
module, name = path[:dot], path[dot+1:]
# 上面把路径分为基本路径+模块名
mod = import_module(module)
obj = getattr(mod, name)
# 获取模块里面那个值
return obj
测试代码:In [33]: mod = import_module(module)
In [34]: mod
Out[34]: <module 'scrapy.spiderloader' from '/home/xda/anaconda3/lib/python3.7/site-packages/scrapy/spiderloader.py'>
In [35]: getattr(mod,name)
Out[35]: scrapy.spiderloader.SpiderLoader
In [36]: obj = getattr(mod,name)
In [37]: obj
Out[37]: scrapy.spiderloader.SpiderLoader
In [38]: type(obj)
Out[38]: type
在代码块A中,loader_cls是SpiderLoader,最后返回的的是SpiderLoader.from_settings(settings.frozencopy())
接下来看看SpiderLoader.from_settings, def from_settings(cls, settings):
return cls(settings)
返回类对象自己,所以直接看__init__函数即可class SpiderLoader(object):
"""
SpiderLoader is a class which locates and loads spiders
in a Scrapy project.
"""
def __init__(self, settings):
self.spider_modules = settings.getlist('SPIDER_MODULES')
# 获得settting中的模块名字,创建scrapy的时候就默认帮你生成了
# 你可以看看你的settings文件里面的内容就可以找到这个值,是一个list
self.warn_only = settings.getbool('SPIDER_LOADER_WARN_ONLY')
self._spiders = {}
self._found = defaultdict(list)
self._load_all_spiders() # 加载所有爬虫
核心就是这个_load_all_spiders:
走起:def _load_all_spiders(self):
for name in self.spider_modules:
for module in walk_modules(name): # 这个遍历文件夹里面的文件,然后再转化为类对象,
# 保存到字典:self._spiders = {}
self._load_spiders(module) # 模块变成spider
self._check_name_duplicates() # 去重,如果名字一样就异常
接下来看看_load_spiders
核心就是下面的。def iter_spider_classes(module):
from scrapy.spiders import Spider
for obj in six.itervalues(vars(module)): # 找到模块里面的变量,然后迭代出来
if inspect.isclass(obj) and \
issubclass(obj, Spider) and \
obj.__module__ == module.__name__ and \
getattr(obj, 'name', None): # 有name属性,继承于Spider
yield obj
这个obj就是我们平时写的spider类了。
原来分析了这么多,才找到了我们平时写的爬虫类
待续。。。。
原创文章
转载请注明出处
http://30daydo.com/article/530
查看全部
下面我们从源码分析一下scrapy执行的流程:
执行scrapy crawl 命令时,调用的是Command类
class Command(ScrapyCommand):
requires_project = True
def syntax(self):
return '[options]'
def short_desc(self):
return 'Runs all of the spiders - My Defined'
def run(self,args,opts):
print('==================')
print(type(self.crawler_process))
spider_list = self.crawler_process.spiders.list() # 找到爬虫类
for name in spider_list:
print('=================')
print(name)
self.crawler_process.crawl(name,**opts.__dict__)
self.crawler_process.start()
然后我们去看看crawler_process,这个是来自ScrapyCommand,而ScrapyCommand又是CrawlerProcess的子类,而CrawlerProcess又是CrawlerRunner的子类
在CrawlerRunner构造函数里面主要作用就是这个
def __init__(self, settings=None):
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
self.settings = settings
self.spider_loader = _get_spider_loader(settings) # 构造爬虫
self._crawlers = set()
self._active = set()
self.bootstrap_failed = False
1. 加载配置文件
def _get_spider_loader(settings):
cls_path = settings.get('SPIDER_LOADER_CLASS')
# settings文件没有定义SPIDER_LOADER_CLASS,所以这里获取到的是系统的默认配置文件,
# 默认配置文件在接下来的代码块A
# SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader'
loader_cls = load_object(cls_path)
# 这个函数就是根据路径转为类对象,也就是上面crapy.spiderloader.SpiderLoader 这个
# 字符串变成一个类对象
# 具体的load_object 对象代码见下面代码块B
return loader_cls.from_settings(settings.frozencopy())
默认配置文件defautl_settting.py
# 代码块A
#......省略若干
SCHEDULER = 'scrapy.core.scheduler.Scheduler'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader' 就是这个值
SPIDER_LOADER_WARN_ONLY = False
SPIDER_MIDDLEWARES = {}
load_object的实现
# 代码块B 为了方便,我把异常处理的去除
from importlib import import_module #导入第三方库
def load_object(path):
dot = path.rindex('.')
module, name = path[:dot], path[dot+1:]
# 上面把路径分为基本路径+模块名
mod = import_module(module)
obj = getattr(mod, name)
# 获取模块里面那个值
return obj
测试代码:
In [33]: mod = import_module(module)
In [34]: mod
Out[34]: <module 'scrapy.spiderloader' from '/home/xda/anaconda3/lib/python3.7/site-packages/scrapy/spiderloader.py'>
In [35]: getattr(mod,name)
Out[35]: scrapy.spiderloader.SpiderLoader
In [36]: obj = getattr(mod,name)
In [37]: obj
Out[37]: scrapy.spiderloader.SpiderLoader
In [38]: type(obj)
Out[38]: type
在代码块A中,loader_cls是SpiderLoader,最后返回的的是SpiderLoader.from_settings(settings.frozencopy())
接下来看看SpiderLoader.from_settings,
def from_settings(cls, settings):
return cls(settings)
返回类对象自己,所以直接看__init__函数即可
class SpiderLoader(object):
"""
SpiderLoader is a class which locates and loads spiders
in a Scrapy project.
"""
def __init__(self, settings):
self.spider_modules = settings.getlist('SPIDER_MODULES')
# 获得settting中的模块名字,创建scrapy的时候就默认帮你生成了
# 你可以看看你的settings文件里面的内容就可以找到这个值,是一个list
self.warn_only = settings.getbool('SPIDER_LOADER_WARN_ONLY')
self._spiders = {}
self._found = defaultdict(list)
self._load_all_spiders() # 加载所有爬虫
核心就是这个_load_all_spiders:
走起:
def _load_all_spiders(self):
for name in self.spider_modules:
for module in walk_modules(name): # 这个遍历文件夹里面的文件,然后再转化为类对象,
# 保存到字典:self._spiders = {}
self._load_spiders(module) # 模块变成spider
self._check_name_duplicates() # 去重,如果名字一样就异常
接下来看看_load_spiders
核心就是下面的。
def iter_spider_classes(module):
from scrapy.spiders import Spider
for obj in six.itervalues(vars(module)): # 找到模块里面的变量,然后迭代出来
if inspect.isclass(obj) and \
issubclass(obj, Spider) and \
obj.__module__ == module.__name__ and \
getattr(obj, 'name', None): # 有name属性,继承于Spider
yield obj
这个obj就是我们平时写的spider类了。
原来分析了这么多,才找到了我们平时写的爬虫类
待续。。。。
原创文章
转载请注明出处
http://30daydo.com/article/530
Linux下自制有道词典 - python 解密有道词典JS加密
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 4877 次浏览 • 2019-02-23 20:17
平时在linux下开发,鉴于没有什么好用翻译软件,打开网易也占用系统资源,所以写了个在控制台的翻译软件接口。
使用python爬虫,查看网页的JS加密方法,一步一步地分析,就能够得到最后的加密方法啦。
直接给出代码:
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/2/23 19:34
# @File : youdao.py
# 解密有道词典的JS
import hashlib
import random
import requests
import time
def md5_(word):
s = bytes(word, encoding='utf8')
m = hashlib.md5()
m.update(s)
ret = m.hexdigest()
return ret
def get_sign(word, salt):
ret = md5_('fanyideskweb' + word + salt + 'p09@Bn{h02_BIEe]$P^nG')
return ret
def youdao(word):
url = 'http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule'
headers = {
'Host': 'fanyi.youdao.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:47.0) Gecko/20100101 Firefox/47.0',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With': 'XMLHttpRequest',
'Referer': 'http://fanyi.youdao.com/',
'Content-Length': '252',
'Cookie': 'YOUDAO_MOBILE_ACCESS_TYPE=1; OUTFOX_SEARCH_USER_ID=1672542763@10.169.0.83; JSESSIONID=aaaWzxpjeDu1gbhopLzKw; ___rl__test__cookies=1550913722828; OUTFOX_SEARCH_USER_ID_NCOO=372126049.6326876',
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
}
ts = str(int(time.time()*1000))
salt=ts+str(random.randint(0,10))
bv = md5_("5.0 (Windows)")
sign= get_sign(word,salt)
post_data = {
'i': word,
'from': 'AUTO', 'to': 'AUTO', 'smartresult': 'dict', 'client': 'fanyideskweb', 'salt': salt,
'sign': sign, 'ts': ts, 'bv': bv, 'doctype': 'json', 'version': '2.1',
'keyfrom': 'fanyi.web', 'action': 'FY_BY_REALTIME', 'typoResult': 'false'
}
r = requests.post(
url=url,
headers=headers,
data=post_data
)
for item in r.json().get('smartResult',{}).get('entries'):
print(item)
word='student'
youdao(word)
得到结果:
Github:
https://github.com/Rockyzsu/CrawlMan/tree/master/youdao_dictionary
原创文章,转载请注明出处
http://30daydo.com/article/416 查看全部
平时在linux下开发,鉴于没有什么好用翻译软件,打开网易也占用系统资源,所以写了个在控制台的翻译软件接口。
使用python爬虫,查看网页的JS加密方法,一步一步地分析,就能够得到最后的加密方法啦。
直接给出代码:
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/2/23 19:34
# @File : youdao.py
# 解密有道词典的JS
import hashlib
import random
import requests
import time
def md5_(word):
s = bytes(word, encoding='utf8')
m = hashlib.md5()
m.update(s)
ret = m.hexdigest()
return ret
def get_sign(word, salt):
ret = md5_('fanyideskweb' + word + salt + 'p09@Bn{h02_BIEe]$P^nG')
return ret
def youdao(word):
url = 'http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule'
headers = {
'Host': 'fanyi.youdao.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:47.0) Gecko/20100101 Firefox/47.0',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With': 'XMLHttpRequest',
'Referer': 'http://fanyi.youdao.com/',
'Content-Length': '252',
'Cookie': 'YOUDAO_MOBILE_ACCESS_TYPE=1; OUTFOX_SEARCH_USER_ID=1672542763@10.169.0.83; JSESSIONID=aaaWzxpjeDu1gbhopLzKw; ___rl__test__cookies=1550913722828; OUTFOX_SEARCH_USER_ID_NCOO=372126049.6326876',
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
}
ts = str(int(time.time()*1000))
salt=ts+str(random.randint(0,10))
bv = md5_("5.0 (Windows)")
sign= get_sign(word,salt)
post_data = {
'i': word,
'from': 'AUTO', 'to': 'AUTO', 'smartresult': 'dict', 'client': 'fanyideskweb', 'salt': salt,
'sign': sign, 'ts': ts, 'bv': bv, 'doctype': 'json', 'version': '2.1',
'keyfrom': 'fanyi.web', 'action': 'FY_BY_REALTIME', 'typoResult': 'false'
}
r = requests.post(
url=url,
headers=headers,
data=post_data
)
for item in r.json().get('smartResult',{}).get('entries'):
print(item)
word='student'
youdao(word)
得到结果:
Github:
https://github.com/Rockyzsu/CrawlMan/tree/master/youdao_dictionary
原创文章,转载请注明出处
http://30daydo.com/article/416
python 获取 中国证券网 的公告
python爬虫 • 李魔佛 发表了文章 • 11 个评论 • 21420 次浏览 • 2016-06-30 15:45
这个网站的公告会比同花顺东方财富的早一点,而且还出现过早上中国证券网已经发了公告,而东财却拿去做午间公告,以至于可以提前获取公告提前埋伏。
现在程序自动把抓取的公告存入本网站中:http://30daydo.com/news.php
每天早上8:30更新一次。
生成的公告保存在stock/文件夹下,以日期命名。 下面脚本是循坏检测,如果有新的公告就会继续生成。
默认保存前3页的公告。(一次过太多页会被网站暂时屏蔽几分钟)。 代码以及使用了切换header来躲避网站的封杀。
修改
getInfo(3) 里面的数字就可以抓取前面某页数据
__author__ = 'rocchen'
# working v1.0
from bs4 import BeautifulSoup
import urllib2, datetime, time, codecs, cookielib, random, threading
import os,sys
def getInfo(max_index_user=5):
stock_news_site =
"http://ggjd.cnstock.com/gglist/search/ggkx/"
my_userAgent = [
'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0',
'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)',
'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:2.0.1) Gecko/20100101 Firefox/4.0.1',
'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1',
'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
'Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SE 2.X MetaSr 1.0; SE 2.X MetaSr 1.0; .NET CLR 2.0.50727; SE 2.X MetaSr 1.0)']
index = 0
max_index = max_index_user
num = 1
temp_time = time.strftime("[%Y-%m-%d]-[%H-%M]", time.localtime())
store_filename = "StockNews-%s.log" % temp_time
fOpen = codecs.open(store_filename, 'w', 'utf-8')
while index < max_index:
user_agent = random.choice(my_userAgent)
# print user_agent
company_news_site = stock_news_site + str(index)
# content = urllib2.urlopen(company_news_site)
headers = {'User-Agent': user_agent, 'Host': "ggjd.cnstock.com", 'DNT': '1',
'Accept': 'text/html, application/xhtml+xml, */*', }
req = urllib2.Request(url=company_news_site, headers=headers)
resp = None
raw_content = ""
try:
resp = urllib2.urlopen(req, timeout=30)
except urllib2.HTTPError as e:
e.fp.read()
except urllib2.URLError as e:
if hasattr(e, 'code'):
print "error code %d" % e.code
elif hasattr(e, 'reason'):
print "error reason %s " % e.reason
finally:
if resp:
raw_content = resp.read()
time.sleep(2)
resp.close()
soup = BeautifulSoup(raw_content, "html.parser")
all_content = soup.find_all("span", "time")
for i in all_content:
news_time = i.string
node = i.next_sibling
str_temp = "No.%s \n%s\t%s\n---> %s \n\n" % (str(num), news_time, node['title'], node['href'])
#print "inside %d" %num
#print str_temp
fOpen.write(str_temp)
num = num + 1
#print "index %d" %index
index = index + 1
fOpen.close()
def execute_task(n=60):
period = int(n)
while True:
print datetime.datetime.now()
getInfo(3)
time.sleep(60 * period)
if __name__ == "__main__":
sub_folder = os.path.join(os.getcwd(), "stock")
if not os.path.exists(sub_folder):
os.mkdir(sub_folder)
os.chdir(sub_folder)
start_time = time.time() # user can change the max index number getInfo(10), by default is getInfo(5)
if len(sys.argv) <2:
n = raw_input("Input Period : ? mins to download every cycle")
else:
n=int(sys.argv[1])
execute_task(n)
end_time = time.time()
print "Total time: %s s." % str(round((end_time - start_time), 4))
github:https://github.com/Rockyzsu/cnstock
查看全部
这个网站的公告会比同花顺东方财富的早一点,而且还出现过早上中国证券网已经发了公告,而东财却拿去做午间公告,以至于可以提前获取公告提前埋伏。
现在程序自动把抓取的公告存入本网站中:http://30daydo.com/news.php
每天早上8:30更新一次。
生成的公告保存在stock/文件夹下,以日期命名。 下面脚本是循坏检测,如果有新的公告就会继续生成。
默认保存前3页的公告。(一次过太多页会被网站暂时屏蔽几分钟)。 代码以及使用了切换header来躲避网站的封杀。
修改
getInfo(3) 里面的数字就可以抓取前面某页数据
__author__ = 'rocchen'
# working v1.0
from bs4 import BeautifulSoup
import urllib2, datetime, time, codecs, cookielib, random, threading
import os,sys
def getInfo(max_index_user=5):
stock_news_site =
"http://ggjd.cnstock.com/gglist/search/ggkx/"
my_userAgent = [
'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0',
'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)',
'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:2.0.1) Gecko/20100101 Firefox/4.0.1',
'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1',
'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
'Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SE 2.X MetaSr 1.0; SE 2.X MetaSr 1.0; .NET CLR 2.0.50727; SE 2.X MetaSr 1.0)']
index = 0
max_index = max_index_user
num = 1
temp_time = time.strftime("[%Y-%m-%d]-[%H-%M]", time.localtime())
store_filename = "StockNews-%s.log" % temp_time
fOpen = codecs.open(store_filename, 'w', 'utf-8')
while index < max_index:
user_agent = random.choice(my_userAgent)
# print user_agent
company_news_site = stock_news_site + str(index)
# content = urllib2.urlopen(company_news_site)
headers = {'User-Agent': user_agent, 'Host': "ggjd.cnstock.com", 'DNT': '1',
'Accept': 'text/html, application/xhtml+xml, */*', }
req = urllib2.Request(url=company_news_site, headers=headers)
resp = None
raw_content = ""
try:
resp = urllib2.urlopen(req, timeout=30)
except urllib2.HTTPError as e:
e.fp.read()
except urllib2.URLError as e:
if hasattr(e, 'code'):
print "error code %d" % e.code
elif hasattr(e, 'reason'):
print "error reason %s " % e.reason
finally:
if resp:
raw_content = resp.read()
time.sleep(2)
resp.close()
soup = BeautifulSoup(raw_content, "html.parser")
all_content = soup.find_all("span", "time")
for i in all_content:
news_time = i.string
node = i.next_sibling
str_temp = "No.%s \n%s\t%s\n---> %s \n\n" % (str(num), news_time, node['title'], node['href'])
#print "inside %d" %num
#print str_temp
fOpen.write(str_temp)
num = num + 1
#print "index %d" %index
index = index + 1
fOpen.close()
def execute_task(n=60):
period = int(n)
while True:
print datetime.datetime.now()
getInfo(3)
time.sleep(60 * period)
if __name__ == "__main__":
sub_folder = os.path.join(os.getcwd(), "stock")
if not os.path.exists(sub_folder):
os.mkdir(sub_folder)
os.chdir(sub_folder)
start_time = time.time() # user can change the max index number getInfo(10), by default is getInfo(5)
if len(sys.argv) <2:
n = raw_input("Input Period : ? mins to download every cycle")
else:
n=int(sys.argv[1])
execute_task(n)
end_time = time.time()
print "Total time: %s s." % str(round((end_time - start_time), 4))
github:https://github.com/Rockyzsu/cnstock
python 批量获取色影无忌 获奖图片
python爬虫 • 李魔佛 发表了文章 • 6 个评论 • 16570 次浏览 • 2016-06-29 16:41
不多说,直接来代码:#-*-coding=utf-8-*-
__author__ = 'rocky chen'
from bs4 import BeautifulSoup
import urllib2,sys,StringIO,gzip,time,random,re,urllib,os
reload(sys)
sys.setdefaultencoding('utf-8')
class Xitek():
def __init__(self):
self.url="http://photo.xitek.com/"
user_agent="Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)"
self.headers={"User-Agent":user_agent}
self.last_page=self.__get_last_page()
def __get_last_page(self):
html=self.__getContentAuto(self.url)
bs=BeautifulSoup(html,"html.parser")
page=bs.find_all('a',class_="blast")
last_page=page[0]['href'].split('/')[-1]
return int(last_page)
def __getContentAuto(self,url):
req=urllib2.Request(url,headers=self.headers)
resp=urllib2.urlopen(req)
#time.sleep(2*random.random())
content=resp.read()
info=resp.info().get("Content-Encoding")
if info==None:
return content
else:
t=StringIO.StringIO(content)
gziper=gzip.GzipFile(fileobj=t)
html = gziper.read()
return html
#def __getFileName(self,stream):
def __download(self,url):
p=re.compile(r'href="(/photoid/\d+)"')
#html=self.__getContentNoZip(url)
html=self.__getContentAuto(url)
content = p.findall(html)
for i in content:
print i
photoid=self.__getContentAuto(self.url+i)
bs=BeautifulSoup(photoid,"html.parser")
final_link=bs.find('img',class_="mimg")['src']
print final_link
#pic_stream=self.__getContentAuto(final_link)
title=bs.title.string.strip()
filename = re.sub('[\/:*?"<>|]', '-', title)
filename=filename+'.jpg'
urllib.urlretrieve(final_link,filename)
#f=open(filename,'w')
#f.write(pic_stream)
#f.close()
#print html
#bs=BeautifulSoup(html,"html.parser")
#content=bs.find_all(p)
#for i in content:
# print i
'''
print bs.title
element_link=bs.find_all('div',class_="element")
print len(element_link)
k=1
for href in element_link:
#print type(href)
#print href.tag
'''
'''
if href.children[0]:
print href.children[0]
'''
'''
t=0
for i in href.children:
#if i.a:
if t==0:
#print k
if i['href']
print link
if p.findall(link):
full_path=self.url[0:len(self.url)-1]+link
sub_html=self.__getContent(full_path)
bs=BeautifulSoup(sub_html,"html.parser")
final_link=bs.find('img',class_="mimg")['src']
#time.sleep(2*random.random())
print final_link
#k=k+1
#print type(i)
#print i.tag
#if hasattr(i,"href"):
#print i['href']
#print i.tag
t=t+1
#print "*"
'''
'''
if href:
if href.children:
print href.children[0]
'''
#print "one element link"
def getPhoto(self):
start=0
#use style/0
photo_url="http://photo.xitek.com/style/0/p/"
for i in range(start,self.last_page+1):
url=photo_url+str(i)
print url
#time.sleep(1)
self.__download(url)
'''
url="http://photo.xitek.com/style/0/p/10"
self.__download(url)
'''
#url="http://photo.xitek.com/style/0/p/0"
#html=self.__getContent(url)
#url="http://photo.xitek.com/"
#html=self.__getContentNoZip(url)
#print html
#'''
def main():
sub_folder = os.path.join(os.getcwd(), "content")
if not os.path.exists(sub_folder):
os.mkdir(sub_folder)
os.chdir(sub_folder)
obj=Xitek()
obj.getPhoto()
if __name__=="__main__":
main()
下载后在content文件夹下会自动抓取所有图片。 (色影无忌的服务器没有做任何的屏蔽处理,所以脚本不能跑那么快,可以适当调用sleep函数,不要让服务器压力那么大)
已经下载好的图片:
github: https://github.com/Rockyzsu/fetchXitek (欢迎前来star) 查看全部
不多说,直接来代码:
#-*-coding=utf-8-*-
__author__ = 'rocky chen'
from bs4 import BeautifulSoup
import urllib2,sys,StringIO,gzip,time,random,re,urllib,os
reload(sys)
sys.setdefaultencoding('utf-8')
class Xitek():
def __init__(self):
self.url="http://photo.xitek.com/"
user_agent="Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)"
self.headers={"User-Agent":user_agent}
self.last_page=self.__get_last_page()
def __get_last_page(self):
html=self.__getContentAuto(self.url)
bs=BeautifulSoup(html,"html.parser")
page=bs.find_all('a',class_="blast")
last_page=page[0]['href'].split('/')[-1]
return int(last_page)
def __getContentAuto(self,url):
req=urllib2.Request(url,headers=self.headers)
resp=urllib2.urlopen(req)
#time.sleep(2*random.random())
content=resp.read()
info=resp.info().get("Content-Encoding")
if info==None:
return content
else:
t=StringIO.StringIO(content)
gziper=gzip.GzipFile(fileobj=t)
html = gziper.read()
return html
#def __getFileName(self,stream):
def __download(self,url):
p=re.compile(r'href="(/photoid/\d+)"')
#html=self.__getContentNoZip(url)
html=self.__getContentAuto(url)
content = p.findall(html)
for i in content:
print i
photoid=self.__getContentAuto(self.url+i)
bs=BeautifulSoup(photoid,"html.parser")
final_link=bs.find('img',class_="mimg")['src']
print final_link
#pic_stream=self.__getContentAuto(final_link)
title=bs.title.string.strip()
filename = re.sub('[\/:*?"<>|]', '-', title)
filename=filename+'.jpg'
urllib.urlretrieve(final_link,filename)
#f=open(filename,'w')
#f.write(pic_stream)
#f.close()
#print html
#bs=BeautifulSoup(html,"html.parser")
#content=bs.find_all(p)
#for i in content:
# print i
'''
print bs.title
element_link=bs.find_all('div',class_="element")
print len(element_link)
k=1
for href in element_link:
#print type(href)
#print href.tag
'''
'''
if href.children[0]:
print href.children[0]
'''
'''
t=0
for i in href.children:
#if i.a:
if t==0:
#print k
if i['href']
print link
if p.findall(link):
full_path=self.url[0:len(self.url)-1]+link
sub_html=self.__getContent(full_path)
bs=BeautifulSoup(sub_html,"html.parser")
final_link=bs.find('img',class_="mimg")['src']
#time.sleep(2*random.random())
print final_link
#k=k+1
#print type(i)
#print i.tag
#if hasattr(i,"href"):
#print i['href']
#print i.tag
t=t+1
#print "*"
'''
'''
if href:
if href.children:
print href.children[0]
'''
#print "one element link"
def getPhoto(self):
start=0
#use style/0
photo_url="http://photo.xitek.com/style/0/p/"
for i in range(start,self.last_page+1):
url=photo_url+str(i)
print url
#time.sleep(1)
self.__download(url)
'''
url="http://photo.xitek.com/style/0/p/10"
self.__download(url)
'''
#url="http://photo.xitek.com/style/0/p/0"
#html=self.__getContent(url)
#url="http://photo.xitek.com/"
#html=self.__getContentNoZip(url)
#print html
#'''
def main():
sub_folder = os.path.join(os.getcwd(), "content")
if not os.path.exists(sub_folder):
os.mkdir(sub_folder)
os.chdir(sub_folder)
obj=Xitek()
obj.getPhoto()
if __name__=="__main__":
main()
下载后在content文件夹下会自动抓取所有图片。 (色影无忌的服务器没有做任何的屏蔽处理,所以脚本不能跑那么快,可以适当调用sleep函数,不要让服务器压力那么大)
已经下载好的图片:
github: https://github.com/Rockyzsu/fetchXitek (欢迎前来star)
抓取 知乎日报 中的 大误 系类文章,生成电子书推送到kindle
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 9463 次浏览 • 2016-06-12 08:52
所以写了下面的python脚本,一劳永逸。 脚本抓取大误从开始到现在的所有文章,并推送到你自己的kindle账号。
# -*- coding=utf-8 -*-
__author__ = 'rocky @ www.30daydo.com'
import urllib2, re, os, codecs,sys,datetime
from bs4 import BeautifulSoup
# example https://zhhrb.sinaapp.com/index.php?date=20160610
from mail_template import MailAtt
reload(sys)
sys.setdefaultencoding('utf-8')
def save2file(filename, content):
filename = filename + ".txt"
f = codecs.open(filename, 'a', encoding='utf-8')
f.write(content)
f.close()
def getPost(date_time, filter_p):
url = 'https://zhhrb.sinaapp.com/index.php?date=' + date_time
user_agent = "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)"
header = {"User-Agent": user_agent}
req = urllib2.Request(url, headers=header)
resp = urllib2.urlopen(req)
content = resp.read()
p = re.compile('<h2 class="question-title">(.*)</h2></br></a>')
result = re.findall(p, content)
count = -1
row = -1
for i in result:
#print i
return_content = re.findall(filter_p, i)
if return_content:
row = count
break
#print return_content[0]
count = count + 1
#print row
if row == -1:
return 0
link_p = re.compile('<a href="(.*)" target="_blank" rel="nofollow">')
link_result = re.findall(link_p, content)[row + 1]
print link_result
result_req = urllib2.Request(link_result, headers=header)
result_resp = urllib2.urlopen(result_req)
#result_content= result_resp.read()
#print result_content
bs = BeautifulSoup(result_resp, "html.parser")
title = bs.title.string.strip()
#print title
filename = re.sub('[\/:*?"<>|]', '-', title)
print filename
print date_time
save2file(filename, title)
save2file(filename, "\n\n\n\n--------------------%s Detail----------------------\n\n" %date_time)
detail_content = bs.find_all('div', class_='content')
for i in detail_content:
#print i
save2file(filename,"\n\n-------------------------answer -------------------------\n\n")
for j in i.strings:
save2file(filename, j)
smtp_server = 'smtp.126.com'
from_mail = sys.argv[1]
password = sys.argv[2]
to_mail = 'xxxxx@kindle.cn'
send_kindle = MailAtt(smtp_server, from_mail, password, to_mail)
send_kindle.send_txt(filename)
def main():
sub_folder = os.path.join(os.getcwd(), "content")
if not os.path.exists(sub_folder):
os.mkdir(sub_folder)
os.chdir(sub_folder)
date_time = '20160611'
filter_p = re.compile('大误.*')
ori_day=datetime.date(datetime.date.today().year,01,01)
t=datetime.date(datetime.date.today().year,datetime.date.today().month,datetime.date.today().day)
delta=(t-ori_day).days
print delta
for i in range(delta):
day=datetime.date(datetime.date.today().year,01,01)+datetime.timedelta(i)
getPost(day.strftime("%Y%m%d"),filter_p)
#getPost(date_time, filter_p)
if __name__ == "__main__":
main()
github: https://github.com/Rockyzsu/zhihu_daily__kindle
上面的代码可以稍作修改,就可以抓取瞎扯或者深夜食堂的系列文章。
附福利:
http://pan.baidu.com/s/1kVewz59
所有的知乎日报的大误文章。(截止2016/6/12日) 查看全部
所以写了下面的python脚本,一劳永逸。 脚本抓取大误从开始到现在的所有文章,并推送到你自己的kindle账号。
# -*- coding=utf-8 -*-
__author__ = 'rocky @ www.30daydo.com'
import urllib2, re, os, codecs,sys,datetime
from bs4 import BeautifulSoup
# example https://zhhrb.sinaapp.com/index.php?date=20160610
from mail_template import MailAtt
reload(sys)
sys.setdefaultencoding('utf-8')
def save2file(filename, content):
filename = filename + ".txt"
f = codecs.open(filename, 'a', encoding='utf-8')
f.write(content)
f.close()
def getPost(date_time, filter_p):
url = 'https://zhhrb.sinaapp.com/index.php?date=' + date_time
user_agent = "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)"
header = {"User-Agent": user_agent}
req = urllib2.Request(url, headers=header)
resp = urllib2.urlopen(req)
content = resp.read()
p = re.compile('<h2 class="question-title">(.*)</h2></br></a>')
result = re.findall(p, content)
count = -1
row = -1
for i in result:
#print i
return_content = re.findall(filter_p, i)
if return_content:
row = count
break
#print return_content[0]
count = count + 1
#print row
if row == -1:
return 0
link_p = re.compile('<a href="(.*)" target="_blank" rel="nofollow">')
link_result = re.findall(link_p, content)[row + 1]
print link_result
result_req = urllib2.Request(link_result, headers=header)
result_resp = urllib2.urlopen(result_req)
#result_content= result_resp.read()
#print result_content
bs = BeautifulSoup(result_resp, "html.parser")
title = bs.title.string.strip()
#print title
filename = re.sub('[\/:*?"<>|]', '-', title)
print filename
print date_time
save2file(filename, title)
save2file(filename, "\n\n\n\n--------------------%s Detail----------------------\n\n" %date_time)
detail_content = bs.find_all('div', class_='content')
for i in detail_content:
#print i
save2file(filename,"\n\n-------------------------answer -------------------------\n\n")
for j in i.strings:
save2file(filename, j)
smtp_server = 'smtp.126.com'
from_mail = sys.argv[1]
password = sys.argv[2]
to_mail = 'xxxxx@kindle.cn'
send_kindle = MailAtt(smtp_server, from_mail, password, to_mail)
send_kindle.send_txt(filename)
def main():
sub_folder = os.path.join(os.getcwd(), "content")
if not os.path.exists(sub_folder):
os.mkdir(sub_folder)
os.chdir(sub_folder)
date_time = '20160611'
filter_p = re.compile('大误.*')
ori_day=datetime.date(datetime.date.today().year,01,01)
t=datetime.date(datetime.date.today().year,datetime.date.today().month,datetime.date.today().day)
delta=(t-ori_day).days
print delta
for i in range(delta):
day=datetime.date(datetime.date.today().year,01,01)+datetime.timedelta(i)
getPost(day.strftime("%Y%m%d"),filter_p)
#getPost(date_time, filter_p)
if __name__ == "__main__":
main()
github: https://github.com/Rockyzsu/zhihu_daily__kindle
上面的代码可以稍作修改,就可以抓取瞎扯或者深夜食堂的系列文章。
附福利:
http://pan.baidu.com/s/1kVewz59
所有的知乎日报的大误文章。(截止2016/6/12日)
python雪球爬虫 抓取雪球 大V的所有文章 推送到kindle
python爬虫 • 李魔佛 发表了文章 • 3 个评论 • 21158 次浏览 • 2016-05-29 00:06
因为雪球上喷子很多,不少大V都不堪忍受,被喷的删帖离开。 比如 易碎品,小小辛巴。
所以利用python可以有效便捷的抓取想要的大V发言内容,并保存到本地。也方便自己检索,考证(有些伪大V喜欢频繁删帖,比如今天预测明天大盘大涨,明天暴跌后就把昨天的预测给删掉,给后来者造成的错觉改大V每次都能精准预测)。
下面以 抓取狂龙的帖子为例(狂龙最近老是掀人家庄家的老底,哈)
https://xueqiu.com/4742988362
2017年2月20日更新:
爬取雪球上我的收藏的文章,并生成电子书。
(PS:收藏夹中一些文章已经被作者删掉了 - -|, 这速度也蛮快了呀。估计是以前写的现在怕被放出来打脸)
# -*-coding=utf-8-*-
#抓取雪球的收藏文章
__author__ = 'Rocky'
import requests,cookielib,re,json,time
from toolkit import Toolkit
from lxml import etree
url='https://xueqiu.com/snowman/login'
session = requests.session()
session.cookies = cookielib.LWPCookieJar(filename="cookies")
try:
session.cookies.load(ignore_discard=True)
except:
print "Cookie can't load"
agent = 'Mozilla/5.0 (Windows NT 5.1; rv:33.0) Gecko/20100101 Firefox/33.0'
headers = {'Host': 'xueqiu.com',
'Referer': 'https://xueqiu.com/',
'Origin':'https://xueqiu.com',
'User-Agent': agent}
account=Toolkit.getUserData('data.cfg')
print account['snowball_user']
print account['snowball_password']
data={'username':account['snowball_user'],'password':account['snowball_password']}
s=session.post(url,data=data,headers=headers)
print s.status_code
#print s.text
session.cookies.save()
fav_temp='https://xueqiu.com/favs?page=1'
collection=session.get(fav_temp,headers=headers)
fav_content= collection.text
p=re.compile('"maxPage":(\d+)')
maxPage=p.findall(fav_content)[0]
print maxPage
print type(maxPage)
maxPage=int(maxPage)
print type(maxPage)
for i in range(1,maxPage+1):
fav='https://xueqiu.com/favs?page=%d' %i
collection=session.get(fav,headers=headers)
fav_content= collection.text
#print fav_content
p=re.compile('var favs = {(.*?)};',re.S|re.M)
result=p.findall(fav_content)[0].strip()
new_result='{'+result+'}'
#print type(new_result)
#print new_result
data=json.loads(new_result)
use_data= data['list']
host='https://xueqiu.com'
for i in use_data:
url=host+ i['target']
print url
txt_content=session.get(url,headers=headers).text
#print txt_content.text
tree=etree.HTML(txt_content)
title=tree.xpath('//title/text()')[0]
filename = re.sub('[\/:*?"<>|]', '-', title)
print filename
content=tree.xpath('//div[@class="detail"]')
for i in content:
Toolkit.save2filecn(filename, i.xpath('string(.)'))
#print content
#Toolkit.save2file(filename,)
time.sleep(10)
用法:
1. snowball.py -- 抓取雪球上我的收藏的文章
使用: 创建一个data.cfg的文件,里面格式如下:
snowball_user=xxxxx@xx.com
snowball_password=密码
然后运行python snowball.py ,会自动登录雪球,然后 在当前目录生产txt文件。
github代码:https://github.com/Rockyzsu/xueqiu 查看全部
因为雪球上喷子很多,不少大V都不堪忍受,被喷的删帖离开。 比如 易碎品,小小辛巴。
所以利用python可以有效便捷的抓取想要的大V发言内容,并保存到本地。也方便自己检索,考证(有些伪大V喜欢频繁删帖,比如今天预测明天大盘大涨,明天暴跌后就把昨天的预测给删掉,给后来者造成的错觉改大V每次都能精准预测)。
下面以 抓取狂龙的帖子为例(狂龙最近老是掀人家庄家的老底,哈)
https://xueqiu.com/4742988362
2017年2月20日更新:
爬取雪球上我的收藏的文章,并生成电子书。
(PS:收藏夹中一些文章已经被作者删掉了 - -|, 这速度也蛮快了呀。估计是以前写的现在怕被放出来打脸)
# -*-coding=utf-8-*-
#抓取雪球的收藏文章
__author__ = 'Rocky'
import requests,cookielib,re,json,time
from toolkit import Toolkit
from lxml import etree
url='https://xueqiu.com/snowman/login'
session = requests.session()
session.cookies = cookielib.LWPCookieJar(filename="cookies")
try:
session.cookies.load(ignore_discard=True)
except:
print "Cookie can't load"
agent = 'Mozilla/5.0 (Windows NT 5.1; rv:33.0) Gecko/20100101 Firefox/33.0'
headers = {'Host': 'xueqiu.com',
'Referer': 'https://xueqiu.com/',
'Origin':'https://xueqiu.com',
'User-Agent': agent}
account=Toolkit.getUserData('data.cfg')
print account['snowball_user']
print account['snowball_password']
data={'username':account['snowball_user'],'password':account['snowball_password']}
s=session.post(url,data=data,headers=headers)
print s.status_code
#print s.text
session.cookies.save()
fav_temp='https://xueqiu.com/favs?page=1'
collection=session.get(fav_temp,headers=headers)
fav_content= collection.text
p=re.compile('"maxPage":(\d+)')
maxPage=p.findall(fav_content)[0]
print maxPage
print type(maxPage)
maxPage=int(maxPage)
print type(maxPage)
for i in range(1,maxPage+1):
fav='https://xueqiu.com/favs?page=%d' %i
collection=session.get(fav,headers=headers)
fav_content= collection.text
#print fav_content
p=re.compile('var favs = {(.*?)};',re.S|re.M)
result=p.findall(fav_content)[0].strip()
new_result='{'+result+'}'
#print type(new_result)
#print new_result
data=json.loads(new_result)
use_data= data['list']
host='https://xueqiu.com'
for i in use_data:
url=host+ i['target']
print url
txt_content=session.get(url,headers=headers).text
#print txt_content.text
tree=etree.HTML(txt_content)
title=tree.xpath('//title/text()')[0]
filename = re.sub('[\/:*?"<>|]', '-', title)
print filename
content=tree.xpath('//div[@class="detail"]')
for i in content:
Toolkit.save2filecn(filename, i.xpath('string(.)'))
#print content
#Toolkit.save2file(filename,)
time.sleep(10)
用法:
1. snowball.py -- 抓取雪球上我的收藏的文章
使用: 创建一个data.cfg的文件,里面格式如下:
snowball_user=xxxxx@xx.com
snowball_password=密码
然后运行python snowball.py ,会自动登录雪球,然后 在当前目录生产txt文件。
github代码:https://github.com/Rockyzsu/xueqiu
python爬虫 模拟登陆知乎 推送知乎文章到kindle电子书 获取自己的关注问题
python爬虫 • 低调的哥哥 发表了文章 • 0 个评论 • 38603 次浏览 • 2016-05-12 17:53
#2016-08-19更新:
添加了模拟登陆知乎的模块,自动获取自己的关注的问题id,然后把这些问题的所有答案抓取下来推送到kindle
# -*-coding=utf-8-*-
__author__ = 'Rocky'
# -*-coding=utf-8-*-
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
from email import Encoders, Utils
import urllib2
import time
import re
import sys
import os
from bs4 import BeautifulSoup
from email.Header import Header
reload(sys)
sys.setdefaultencoding('utf-8')
class GetContent():
def __init__(self, id):
# 给出的第一个参数 就是你要下载的问题的id
# 比如 想要下载的问题链接是 https://www.zhihu.com/question/29372574
# 那么 就输入 python zhihu.py 29372574
id_link = "/question/" + id
self.getAnswer(id_link)
def save2file(self, filename, content):
# 保存为电子书文件
filename = filename + ".txt"
f = open(filename, 'a')
f.write(content)
f.close()
def getAnswer(self, answerID):
host = "http://www.zhihu.com"
url = host + answerID
print url
user_agent = "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)"
# 构造header 伪装一下
header = {"User-Agent": user_agent}
req = urllib2.Request(url, headers=header)
try:
resp = urllib2.urlopen(req)
except:
print "Time out. Retry"
time.sleep(30)
# try to switch with proxy ip
resp = urllib2.urlopen(req)
# 这里已经获取了 网页的代码,接下来就是提取你想要的内容。 使用beautifulSoup 来处理,很方便
try:
bs = BeautifulSoup(resp)
except:
print "Beautifulsoup error"
return None
title = bs.title
# 获取的标题
filename_old = title.string.strip()
print filename_old
filename = re.sub('[\/:*?"<>|]', '-', filename_old)
# 用来保存内容的文件名,因为文件名不能有一些特殊符号,所以使用正则表达式过滤掉
self.save2file(filename, title.string)
detail = bs.find("div", class_="zm-editable-content")
self.save2file(filename, "\n\n\n\n--------------------Detail----------------------\n\n")
# 获取问题的补充内容
if detail is not None:
for i in detail.strings:
self.save2file(filename, unicode(i))
answer = bs.find_all("div", class_="zm-editable-content clearfix")
k = 0
index = 0
for each_answer in answer:
self.save2file(filename, "\n\n-------------------------answer %s via -------------------------\n\n" % k)
for a in each_answer.strings:
# 循环获取每一个答案的内容,然后保存到文件中
self.save2file(filename, unicode(a))
k += 1
index = index + 1
smtp_server = 'smtp.126.com'
from_mail = 'your@126.com'
password = 'yourpassword'
to_mail = 'yourname@kindle.cn'
# send_kindle=MailAtt(smtp_server,from_mail,password,to_mail)
# send_kindle.send_txt(filename)
# 调用发送邮件函数,把电子书发送到你的kindle用户的邮箱账号,这样你的kindle就可以收到电子书啦
print filename
class MailAtt():
def __init__(self, smtp_server, from_mail, password, to_mail):
self.server = smtp_server
self.username = from_mail.split("@")[0]
self.from_mail = from_mail
self.password = password
self.to_mail = to_mail
# 初始化邮箱设置
def send_txt(self, filename):
# 这里发送附件尤其要注意字符编码,当时调试了挺久的,因为收到的文件总是乱码
self.smtp = smtplib.SMTP()
self.smtp.connect(self.server)
self.smtp.login(self.username, self.password)
self.msg = MIMEMultipart()
self.msg['to'] = self.to_mail
self.msg['from'] = self.from_mail
self.msg['Subject'] = "Convert"
self.filename = filename + ".txt"
self.msg['Date'] = Utils.formatdate(localtime=1)
content = open(self.filename.decode('utf-8'), 'rb').read()
# print content
self.att = MIMEText(content, 'base64', 'utf-8')
self.att['Content-Type'] = 'application/octet-stream'
# self.att["Content-Disposition"] = "attachment;filename=\"%s\"" %(self.filename.encode('gb2312'))
self.att["Content-Disposition"] = "attachment;filename=\"%s\"" % Header(self.filename, 'gb2312')
# print self.att["Content-Disposition"]
self.msg.attach(self.att)
self.smtp.sendmail(self.msg['from'], self.msg['to'], self.msg.as_string())
self.smtp.quit()
if __name__ == "__main__":
sub_folder = os.path.join(os.getcwd(), "content")
# 专门用于存放下载的电子书的目录
if not os.path.exists(sub_folder):
os.mkdir(sub_folder)
os.chdir(sub_folder)
id = sys.argv[1]
# 给出的第一个参数 就是你要下载的问题的id
# 比如 想要下载的问题链接是 https://www.zhihu.com/question/29372574
# 那么 就输入 python zhihu.py 29372574
# id_link="/question/"+id
obj = GetContent(id)
# obj.getAnswer(id_link)
# 调用获取函数
print "Done"
#######################################
2016.8.19 更新
添加了新功能,模拟知乎登陆,自动获取自己关注的答案,制作成电子书并且发送到kindle
# -*-coding=utf-8-*-
__author__ = 'Rocky'
import requests
import cookielib
import re
import json
import time
import os
from getContent import GetContent
agent='Mozilla/5.0 (Windows NT 5.1; rv:33.0) Gecko/20100101 Firefox/33.0'
headers={'Host':'www.zhihu.com',
'Referer':'https://www.zhihu.com',
'User-Agent':agent}
#全局变量
session=requests.session()
session.cookies=cookielib.LWPCookieJar(filename="cookies")
try:
session.cookies.load(ignore_discard=True)
except:
print "Cookie can't load"
def isLogin():
url='https://www.zhihu.com/settings/profile'
login_code=session.get(url,headers=headers,allow_redirects=False).status_code
print login_code
if login_code == 200:
return True
else:
return False
def get_xsrf():
url='http://www.zhihu.com'
r=session.get(url,headers=headers,allow_redirects=False)
txt=r.text
result=re.findall(r'<input type=\"hidden\" name=\"_xsrf\" value=\"(\w+)\"/>',txt)[0]
return result
def getCaptcha():
#r=1471341285051
r=(time.time()*1000)
url='http://www.zhihu.com/captcha.gif?r='+str(r)+'&type=login'
image=session.get(url,headers=headers)
f=open("photo.jpg",'wb')
f.write(image.content)
f.close()
def Login():
xsrf=get_xsrf()
print xsrf
print len(xsrf)
login_url='http://www.zhihu.com/login/email'
data={
'_xsrf':xsrf,
'password':'*',
'remember_me':'true',
'email':'*'
}
try:
content=session.post(login_url,data=data,headers=headers)
login_code=content.text
print content.status_code
#this line important ! if no status, if will fail and execute the except part
#print content.status
if content.status_code != requests.codes.ok:
print "Need to verification code !"
getCaptcha()
#print "Please input the code of the captcha"
code=raw_input("Please input the code of the captcha")
data['captcha']=code
content=session.post(login_url,data=data,headers=headers)
print content.status_code
if content.status_code==requests.codes.ok:
print "Login successful"
session.cookies.save()
#print login_code
else:
session.cookies.save()
except:
print "Error in login"
return False
def focus_question():
focus_id=
url='https://www.zhihu.com/question/following'
content=session.get(url,headers=headers)
print content
p=re.compile(r'<a class="question_link" href="/question/(\d+)" target="_blank" data-id')
id_list=p.findall(content.text)
pattern=re.compile(r'<input type=\"hidden\" name=\"_xsrf\" value=\"(\w+)\"/>')
result=re.findall(pattern,content.text)[0]
print result
for i in id_list:
print i
focus_id.append(i)
url_next='https://www.zhihu.com/node/ProfileFollowedQuestionsV2'
page=20
offset=20
end_page=500
xsrf=re.findall(r'<input type=\"hidden\" name=\"_xsrf\" value=\"(\w+)\"',content.text)[0]
while offset < end_page:
#para='{"offset":20}'
#print para
print "page: %d" %offset
params={"offset":offset}
params_json=json.dumps(params)
data={
'method':'next',
'params':params_json,
'_xsrf':xsrf
}
#注意上面那里 post的data需要一个xsrf的字段,不然会返回403 的错误,这个在抓包的过程中一直都没有看到提交到xsrf,所以自己摸索出来的
offset=offset+page
headers_l={
'Host':'www.zhihu.com',
'Referer':'https://www.zhihu.com/question/following',
'User-Agent':agent,
'Origin':'https://www.zhihu.com',
'X-Requested-With':'XMLHttpRequest'
}
try:
s=session.post(url_next,data=data,headers=headers_l)
#print s.status_code
#print s.text
msgs=json.loads(s.text)
msg=msgs['msg']
for i in msg:
id_sub=re.findall(p,i)
for j in id_sub:
print j
id_list.append(j)
except:
print "Getting Error "
return id_list
def main():
if isLogin():
print "Has login"
else:
print "Need to login"
Login()
list_id=focus_question()
for i in list_id:
print i
obj=GetContent(i)
#getCaptcha()
if __name__=='__main__':
sub_folder=os.path.join(os.getcwd(),"content")
#专门用于存放下载的电子书的目录
if not os.path.exists(sub_folder):
os.mkdir(sub_folder)
os.chdir(sub_folder)
main()
完整代码请猛击这里:
github: https://github.com/Rockyzsu/zhihuToKindle
查看全部
#2016-08-19更新:
添加了模拟登陆知乎的模块,自动获取自己的关注的问题id,然后把这些问题的所有答案抓取下来推送到kindle
# -*-coding=utf-8-*-
__author__ = 'Rocky'
# -*-coding=utf-8-*-
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
from email import Encoders, Utils
import urllib2
import time
import re
import sys
import os
from bs4 import BeautifulSoup
from email.Header import Header
reload(sys)
sys.setdefaultencoding('utf-8')
class GetContent():
def __init__(self, id):
# 给出的第一个参数 就是你要下载的问题的id
# 比如 想要下载的问题链接是 https://www.zhihu.com/question/29372574
# 那么 就输入 python zhihu.py 29372574
id_link = "/question/" + id
self.getAnswer(id_link)
def save2file(self, filename, content):
# 保存为电子书文件
filename = filename + ".txt"
f = open(filename, 'a')
f.write(content)
f.close()
def getAnswer(self, answerID):
host = "http://www.zhihu.com"
url = host + answerID
print url
user_agent = "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)"
# 构造header 伪装一下
header = {"User-Agent": user_agent}
req = urllib2.Request(url, headers=header)
try:
resp = urllib2.urlopen(req)
except:
print "Time out. Retry"
time.sleep(30)
# try to switch with proxy ip
resp = urllib2.urlopen(req)
# 这里已经获取了 网页的代码,接下来就是提取你想要的内容。 使用beautifulSoup 来处理,很方便
try:
bs = BeautifulSoup(resp)
except:
print "Beautifulsoup error"
return None
title = bs.title
# 获取的标题
filename_old = title.string.strip()
print filename_old
filename = re.sub('[\/:*?"<>|]', '-', filename_old)
# 用来保存内容的文件名,因为文件名不能有一些特殊符号,所以使用正则表达式过滤掉
self.save2file(filename, title.string)
detail = bs.find("div", class_="zm-editable-content")
self.save2file(filename, "\n\n\n\n--------------------Detail----------------------\n\n")
# 获取问题的补充内容
if detail is not None:
for i in detail.strings:
self.save2file(filename, unicode(i))
answer = bs.find_all("div", class_="zm-editable-content clearfix")
k = 0
index = 0
for each_answer in answer:
self.save2file(filename, "\n\n-------------------------answer %s via -------------------------\n\n" % k)
for a in each_answer.strings:
# 循环获取每一个答案的内容,然后保存到文件中
self.save2file(filename, unicode(a))
k += 1
index = index + 1
smtp_server = 'smtp.126.com'
from_mail = 'your@126.com'
password = 'yourpassword'
to_mail = 'yourname@kindle.cn'
# send_kindle=MailAtt(smtp_server,from_mail,password,to_mail)
# send_kindle.send_txt(filename)
# 调用发送邮件函数,把电子书发送到你的kindle用户的邮箱账号,这样你的kindle就可以收到电子书啦
print filename
class MailAtt():
def __init__(self, smtp_server, from_mail, password, to_mail):
self.server = smtp_server
self.username = from_mail.split("@")[0]
self.from_mail = from_mail
self.password = password
self.to_mail = to_mail
# 初始化邮箱设置
def send_txt(self, filename):
# 这里发送附件尤其要注意字符编码,当时调试了挺久的,因为收到的文件总是乱码
self.smtp = smtplib.SMTP()
self.smtp.connect(self.server)
self.smtp.login(self.username, self.password)
self.msg = MIMEMultipart()
self.msg['to'] = self.to_mail
self.msg['from'] = self.from_mail
self.msg['Subject'] = "Convert"
self.filename = filename + ".txt"
self.msg['Date'] = Utils.formatdate(localtime=1)
content = open(self.filename.decode('utf-8'), 'rb').read()
# print content
self.att = MIMEText(content, 'base64', 'utf-8')
self.att['Content-Type'] = 'application/octet-stream'
# self.att["Content-Disposition"] = "attachment;filename=\"%s\"" %(self.filename.encode('gb2312'))
self.att["Content-Disposition"] = "attachment;filename=\"%s\"" % Header(self.filename, 'gb2312')
# print self.att["Content-Disposition"]
self.msg.attach(self.att)
self.smtp.sendmail(self.msg['from'], self.msg['to'], self.msg.as_string())
self.smtp.quit()
if __name__ == "__main__":
sub_folder = os.path.join(os.getcwd(), "content")
# 专门用于存放下载的电子书的目录
if not os.path.exists(sub_folder):
os.mkdir(sub_folder)
os.chdir(sub_folder)
id = sys.argv[1]
# 给出的第一个参数 就是你要下载的问题的id
# 比如 想要下载的问题链接是 https://www.zhihu.com/question/29372574
# 那么 就输入 python zhihu.py 29372574
# id_link="/question/"+id
obj = GetContent(id)
# obj.getAnswer(id_link)
# 调用获取函数
print "Done"
#######################################
2016.8.19 更新
添加了新功能,模拟知乎登陆,自动获取自己关注的答案,制作成电子书并且发送到kindle
# -*-coding=utf-8-*-
__author__ = 'Rocky'
import requests
import cookielib
import re
import json
import time
import os
from getContent import GetContent
agent='Mozilla/5.0 (Windows NT 5.1; rv:33.0) Gecko/20100101 Firefox/33.0'
headers={'Host':'www.zhihu.com',
'Referer':'https://www.zhihu.com',
'User-Agent':agent}
#全局变量
session=requests.session()
session.cookies=cookielib.LWPCookieJar(filename="cookies")
try:
session.cookies.load(ignore_discard=True)
except:
print "Cookie can't load"
def isLogin():
url='https://www.zhihu.com/settings/profile'
login_code=session.get(url,headers=headers,allow_redirects=False).status_code
print login_code
if login_code == 200:
return True
else:
return False
def get_xsrf():
url='http://www.zhihu.com'
r=session.get(url,headers=headers,allow_redirects=False)
txt=r.text
result=re.findall(r'<input type=\"hidden\" name=\"_xsrf\" value=\"(\w+)\"/>',txt)[0]
return result
def getCaptcha():
#r=1471341285051
r=(time.time()*1000)
url='http://www.zhihu.com/captcha.gif?r='+str(r)+'&type=login'
image=session.get(url,headers=headers)
f=open("photo.jpg",'wb')
f.write(image.content)
f.close()
def Login():
xsrf=get_xsrf()
print xsrf
print len(xsrf)
login_url='http://www.zhihu.com/login/email'
data={
'_xsrf':xsrf,
'password':'*',
'remember_me':'true',
'email':'*'
}
try:
content=session.post(login_url,data=data,headers=headers)
login_code=content.text
print content.status_code
#this line important ! if no status, if will fail and execute the except part
#print content.status
if content.status_code != requests.codes.ok:
print "Need to verification code !"
getCaptcha()
#print "Please input the code of the captcha"
code=raw_input("Please input the code of the captcha")
data['captcha']=code
content=session.post(login_url,data=data,headers=headers)
print content.status_code
if content.status_code==requests.codes.ok:
print "Login successful"
session.cookies.save()
#print login_code
else:
session.cookies.save()
except:
print "Error in login"
return False
def focus_question():
focus_id=
url='https://www.zhihu.com/question/following'
content=session.get(url,headers=headers)
print content
p=re.compile(r'<a class="question_link" href="/question/(\d+)" target="_blank" data-id')
id_list=p.findall(content.text)
pattern=re.compile(r'<input type=\"hidden\" name=\"_xsrf\" value=\"(\w+)\"/>')
result=re.findall(pattern,content.text)[0]
print result
for i in id_list:
print i
focus_id.append(i)
url_next='https://www.zhihu.com/node/ProfileFollowedQuestionsV2'
page=20
offset=20
end_page=500
xsrf=re.findall(r'<input type=\"hidden\" name=\"_xsrf\" value=\"(\w+)\"',content.text)[0]
while offset < end_page:
#para='{"offset":20}'
#print para
print "page: %d" %offset
params={"offset":offset}
params_json=json.dumps(params)
data={
'method':'next',
'params':params_json,
'_xsrf':xsrf
}
#注意上面那里 post的data需要一个xsrf的字段,不然会返回403 的错误,这个在抓包的过程中一直都没有看到提交到xsrf,所以自己摸索出来的
offset=offset+page
headers_l={
'Host':'www.zhihu.com',
'Referer':'https://www.zhihu.com/question/following',
'User-Agent':agent,
'Origin':'https://www.zhihu.com',
'X-Requested-With':'XMLHttpRequest'
}
try:
s=session.post(url_next,data=data,headers=headers_l)
#print s.status_code
#print s.text
msgs=json.loads(s.text)
msg=msgs['msg']
for i in msg:
id_sub=re.findall(p,i)
for j in id_sub:
print j
id_list.append(j)
except:
print "Getting Error "
return id_list
def main():
if isLogin():
print "Has login"
else:
print "Need to login"
Login()
list_id=focus_question()
for i in list_id:
print i
obj=GetContent(i)
#getCaptcha()
if __name__=='__main__':
sub_folder=os.path.join(os.getcwd(),"content")
#专门用于存放下载的电子书的目录
if not os.path.exists(sub_folder):
os.mkdir(sub_folder)
os.chdir(sub_folder)
main()
完整代码请猛击这里:
github: https://github.com/Rockyzsu/zhihuToKindle
python爬取网页中的超链接地址,获取到的跟浏览器中显示的不一样
回复python爬虫 • acker 回复了问题 • 2 人关注 • 2 个回复 • 4303 次浏览 • 2020-09-25 16:55
请问各位用scrapy和redis方法爬取不到数据的问题(可悬赏),求大佬看下,感激不尽
回复python爬虫 • 李魔佛 回复了问题 • 2 人关注 • 1 个回复 • 8350 次浏览 • 2020-04-16 22:16
scrapy 爬虫执行之前 如何运行自定义的函数来初始化一些数据?
回复python • 低调的哥哥 回复了问题 • 2 人关注 • 1 个回复 • 9704 次浏览 • 2016-06-20 18:25
知识星球获取文章链接与数据
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 2017 次浏览 • 2022-03-21 20:15
既然官方不提供这个功能,只能自己使用爬虫手段获取了,额。
既然官方不提供这个功能,只能自己使用爬虫手段获取了,额。
韦世东 python3网络爬虫宝典 勘误
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 2526 次浏览 • 2021-05-21 20:06
1. 时间差是正数才是过期
2. 获取权限那里,permission = args[0].permission
不能后面再用get
P222:
写入mongodb后,原来的数据会被加入一个_id,值为OjectId,该值是无法被json dumps为string,
所以需要手工把ObjectId 转为str,或者del message['_id'] 将这个键去除。
查看全部
1. 时间差是正数才是过期
2. 获取权限那里,permission = args[0].permission
不能后面再用get
P222:
写入mongodb后,原来的数据会被加入一个_id,值为OjectId,该值是无法被json dumps为string,
所以需要手工把ObjectId 转为str,或者del message['_id'] 将这个键去除。
茅台抢购程序 京东 苏宁
python • 李魔佛 发表了文章 • 0 个评论 • 11640 次浏览 • 2021-01-05 22:34
运行环境 windows,linux,mac,python3+
京东小白分查询:
https://plus.m.jd.com/rights/windControl
分太低的就不要参与了,毕竟概率会小很多
############ 2021-01-13 更新 ======
最新的用Go重写的,搞了几瓶
苏宁家的:
============= 2021-01-11 更新 ============
感觉苏宁的抢购是耍猴的,那个按钮基本处于不可点状态,所以就放弃了,感觉官方就是没放多少量,加上苏宁公司过往的尿性,所以洗洗睡了
main.pyimport sys
from maotai.jd_spider_requests import ProdectPurchase
if __name__ == '__main__':
tip = """
功能列表:
1.预约商品
2.秒杀抢购商品
"""
print(tip)
product = ProdectPurchase()
choice_function = input('请选择:')
if choice_function == '1':
product.reserve()
elif choice_function == '2':
product.seckill_by_proc_pool()
else:
print('没有此功能')
sys.exit(1)
jd_spider_requests.pyimport random
import time
import requests
import functools
import json
import os
import pickle
from lxml import etree
from error.exception import SKException
from maotai.jd_logger import logger
from maotai.timer import Timer
from maotai.config import global_config
from concurrent.futures import ProcessPoolExecutor
from helper.jd_helper import (
parse_json,
send_wechat,
wait_some_time,
response_status,
save_image,
open_image
)
class SpiderSession:
"""
Session相关操作
"""
def __init__(self):
self.cookies_dir_path = "./cookies/"
self.user_agent = global_config.getRaw('config', 'DEFAULT_USER_AGENT')
self.session = self._init_session()
def _init_session(self):
session = requests.session()
session.headers = self.get_headers()
return session
def get_headers(self):
return {"User-Agent": self.user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;"
"q=0.9,image/webp,image/apng,*/*;"
"q=0.8,application/signed-exchange;"
"v=b3",
"Connection": "keep-alive"}
def get_user_agent(self):
return self.user_agent
def get_session(self):
"""
获取当前Session
:return:
"""
return self.session
def get_cookies(self):
"""
获取当前Cookies
:return:
"""
return self.get_session().cookies
def set_cookies(self, cookies):
self.session.cookies.update(cookies)
def load_cookies_from_local(self):
"""
从本地加载Cookie
:return:
"""
cookies_file = ''
if not os.path.exists(self.cookies_dir_path):
return False
for name in os.listdir(self.cookies_dir_path):
if name.endswith(".cookies"):
cookies_file = '{}{}'.format(self.cookies_dir_path, name)
break
if cookies_file == '':
return False
with open(cookies_file, 'rb') as f:
local_cookies = pickle.load(f)
self.set_cookies(local_cookies)
def save_cookies_to_local(self, cookie_file_name):
"""
保存Cookie到本地
:param cookie_file_name: 存放Cookie的文件名称
:return:
"""
cookies_file = '{}{}.cookies'.format(self.cookies_dir_path, cookie_file_name)
directory = os.path.dirname(cookies_file)
if not os.path.exists(directory):
os.makedirs(directory)
with open(cookies_file, 'wb') as f:
pickle.dump(self.get_cookies(), f)
class QrLogin:
"""
扫码登录
"""
def __init__(self, spider_session: SpiderSession):
"""
初始化扫码登录
大致流程:
1、访问登录二维码页面,获取Token
2、使用Token获取票据
3、校验票据
:param spider_session:
"""
self.qrcode_img_file = 'qr_code.png'
self.spider_session = spider_session
self.session = self.spider_session.get_session()
self.is_login = False
self.refresh_login_status()
def refresh_login_status(self):
"""
刷新是否登录状态
:return:
"""
self.is_login = self._validate_cookies()
def _validate_cookies(self):
"""
验证cookies是否有效(是否登陆)
通过访问用户订单列表页进行判断:若未登录,将会重定向到登陆页面。
:return: cookies是否有效 True/False
"""
url = 'https://order.jd.com/center/list.action'
payload = {
'rid': str(int(time.time() * 1000)),
}
try:
resp = self.session.get(url=url, params=payload, allow_redirects=False)
if resp.status_code == requests.codes.OK:
return True
except Exception as e:
logger.error("验证cookies是否有效发生异常", e)
return False
def _get_login_page(self):
"""
获取PC端登录页面
阻塞,更新cookies
:return:
"""
url = "https://passport.jd.com/new/login.aspx"
page = self.session.get(url, headers=self.spider_session.get_headers())
return page
def _get_qrcode(self):
"""
缓存并展示登录二维码
:return:
"""
url = 'https://qr.m.jd.com/show'
payload = {
'appid': 133,
'size': 147,
't': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.info('获取二维码失败')
return False
save_image(resp, self.qrcode_img_file)
logger.info('二维码获取成功,请打开京东APP扫描')
open_image(self.qrcode_img_file)
return True
def _get_qrcode_ticket(self):
"""
通过 token 获取票据 ticket
:return:
"""
url = 'https://qr.m.jd.com/check'
payload = {
'appid': '133',
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'token': self.session.cookies.get('wlfstk_smdl'), # 从cookies获取值
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.error('获取二维码扫描结果异常')
return False
resp_json = parse_json(resp.text)
if resp_json['code'] != 200:
logger.info('Code: %s, Message: %s', resp_json['code'], resp_json['msg'])
return None
else:
logger.info('已完成手机客户端确认')
return resp_json['ticket']
def _validate_qrcode_ticket(self, ticket):
"""
通过已获取的票据进行校验
:param ticket: 已获取的票据
:return:
"""
url = 'https://passport.jd.com/uc/qrCodeTicketValidation'
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/uc/login?ltype=logout',
}
resp = self.session.get(url=url, headers=headers, params={'t': ticket})
if not response_status(resp):
return False
resp_json = json.loads(resp.text)
if resp_json['returnCode'] == 0:
return True
else:
logger.info(resp_json)
return False
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
self._get_login_page() # 更新cookies
# download QR code
if not self._get_qrcode():
raise SKException('二维码下载失败')
# get QR code ticket
ticket = None
retry_times = 85
for _ in range(retry_times):
# 重试 拿到ticket
ticket = self._get_qrcode_ticket()
if ticket:
break
time.sleep(2)
else:
raise SKException('二维码过期,请重新获取扫描')
# validate QR code ticket
if not self._validate_qrcode_ticket(ticket):
raise SKException('二维码信息校验失败')
self.refresh_login_status()
logger.info('二维码登录成功')
class ProdectPurchase(object):
def __init__(self):
self.spider_session = SpiderSession()
self.spider_session.load_cookies_from_local()
# 共享一个session
self.qrlogin = QrLogin(self.spider_session)
# 初始化信息
self.sku_id = global_config.getRaw('config', 'sku_id')
self.seckill_num = global_config.getRaw('config', 'seckill_num')
self.work_count = global_config.getRaw('config','process_num')
self.seckill_init_info = dict()
self.seckill_url = dict()
self.seckill_order_data = dict()
self.timers = Timer()
self.session = self.spider_session.get_session()
self.user_agent = self.spider_session.user_agent
self.nick_name = None
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
if self.qrlogin.is_login:
logger.info('登录成功')
return
self.qrlogin.login_by_qrcode()
if self.qrlogin.is_login:
self.nick_name = self.get_username()
self.spider_session.save_cookies_to_local(self.nick_name)
else:
raise SKException("二维码登录失败!")
def check_login(func):
"""
用户登陆态校验装饰器。若用户未登陆,则调用扫码登陆
"""
@functools.wraps(func)
def new_func(self, *args, **kwargs):
if not self.qrlogin.is_login:
logger.info("{0} 需登陆后调用,开始扫码登陆".format(func.__name__))
self.login_by_qrcode()
return func(self, *args, **kwargs)
return new_func
@check_login
def reserve(self):
"""
预约
"""
self._reserve()
@check_login
def seckill(self):
"""
抢购
"""
self._seckill()
@check_login
def seckill_by_proc_pool(self):
"""
多进程进行抢购
work_count:进程数量
"""
with ProcessPoolExecutor() as pool:
for i in range(self.work_count):
pool.submit(self.seckill)
def _reserve(self):
"""
预约
"""
while True:
try:
self.make_reserve()
break
except Exception as e:
logger.info('预约发生异常!', e)
wait_some_time()
def _seckill(self):
"""
抢购
"""
while True:
try:
self.request_seckill_url()
while True:
self.request_seckill_checkout_page()
self.submit_seckill_order()
except Exception as e:
logger.info('抢购发生异常,稍后继续执行!', e)
wait_some_time()
def make_reserve(self):
"""商品预约"""
logger.info('商品名称:{}'.format(self.get_sku_title()))
url = 'https://yushou.jd.com/youshouinfo.action?'
payload = {
'callback': 'fetchJSON',
'sku': self.sku_id,
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
resp = self.session.get(url=url, params=payload, headers=headers)
resp_json = parse_json(resp.text)
reserve_url = resp_json.get('url')
# self.timers.start()
while True:
try:
self.session.get(url='https:' + reserve_url)
logger.info('预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约')
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约"
send_wechat(success_message)
break
except Exception as e:
logger.error('预约失败正在重试...')
def get_username(self):
"""获取用户信息"""
url = 'https://passport.jd.com/user/petName/getUserInfoForMiniJd.action'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://order.jd.com/center/list.action',
}
resp = self.session.get(url=url, params=payload, headers=headers)
try_count = 5
while not resp.text.startswith("jQuery"):
try_count = try_count - 1
if try_count > 0:
resp = self.session.get(url=url, params=payload, headers=headers)
else:
break
wait_some_time()
# 响应中包含了许多用户信息,现在在其中返回昵称
# jQuery2381773({"imgUrl":"//storage.360buyimg.com/i.imageUpload/xxx.jpg","lastLoginTime":"","nickName":"xxx","plusStatus":"0","realName":"xxx","userLevel":x,"userScoreVO":{"accountScore":xx,"activityScore":xx,"consumptionScore":xxxxx,"default":false,"financeScore":xxx,"pin":"xxx","riskScore":x,"totalScore":xxxxx}})
return parse_json(resp.text).get('nickName')
def get_sku_title(self):
"""获取商品名称"""
url = 'https://item.jd.com/{}.html'.format(global_config.getRaw('config', 'sku_id'))
resp = self.session.get(url).content
x_data = etree.HTML(resp)
sku_title = x_data.xpath('/html/head/title/text()')
return sku_title[0]
def get_seckill_url(self):
"""获取商品的抢购链接
点击"抢购"按钮后,会有两次302跳转,最后到达订单结算页面
这里返回第一次跳转后的页面url,作为商品的抢购链接
:return: 商品的抢购链接
"""
url = 'https://itemko.jd.com/itemShowBtn'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'skuId': self.sku_id,
'from': 'pc',
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Host': 'itemko.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
while True:
resp = self.session.get(url=url, headers=headers, params=payload)
resp_json = parse_json(resp.text)
if resp_json.get('url'):
# https://divide.jd.com/user_rou ... %3Dpc
router_url = 'https:' + resp_json.get('url')
# https://marathon.jd.com/captch ... %3Dpc
seckill_url = router_url.replace(
'divide', 'marathon').replace(
'user_routing', 'captcha.html')
logger.info("抢购链接获取成功: %s", seckill_url)
return seckill_url
else:
logger.info("抢购链接获取失败,稍后自动重试")
wait_some_time()
def request_seckill_url(self):
"""访问商品的抢购链接(用于设置cookie等"""
logger.info('用户:{}'.format(self.get_username()))
logger.info('商品名称:{}'.format(self.get_sku_title()))
self.timers.start() # 阻塞
self.seckill_url[self.sku_id] = self.get_seckill_url()
logger.info('访问商品的抢购连接...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(
url=self.seckill_url.get(
self.sku_id),
headers=headers,
allow_redirects=False)
def request_seckill_checkout_page(self):
"""访问抢购订单结算页面"""
logger.info('访问抢购订单结算页面...')
url = 'https://marathon.jd.com/seckill/seckill.action'
payload = {
'skuId': self.sku_id,
'num': self.seckill_num,
'rid': int(time.time())
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(url=url, params=payload, headers=headers, allow_redirects=False)
def _get_seckill_init_info(self):
"""获取秒杀初始化信息(包括:地址,发票,token)
:return: 初始化信息组成的dict
"""
logger.info('获取秒杀初始化信息...')
url = 'https://marathon.jd.com/seckillnew/orderService/pc/init.action'
data = {
'sku': self.sku_id,
'num': self.seckill_num,
'isModifyAddress': 'false',
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
}
resp = self.session.post(url=url, data=data, headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception:
raise SKException('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return resp_json
def _get_seckill_order_data(self):
"""生成提交抢购订单所需的请求体参数
:return: 请求体参数组成的dict
"""
logger.info('生成提交抢购订单所需参数...')
# 获取用户秒杀初始化信息
self.seckill_init_info[self.sku_id] = self._get_seckill_init_info()
init_info = self.seckill_init_info.get(self.sku_id)
default_address = init_info['addressList'][0] # 默认地址dict
invoice_info = init_info.get('invoiceInfo', {}) # 默认发票信息dict, 有可能不返回
token = init_info['token']
data = {
'skuId': self.sku_id,
'num': self.seckill_num,
'addressId': default_address['id'],
'yuShou': 'true',
'isModifyAddress': 'false',
'name': default_address['name'],
'provinceId': default_address['provinceId'],
'cityId': default_address['cityId'],
'countyId': default_address['countyId'],
'townId': default_address['townId'],
'addressDetail': default_address['addressDetail'],
'mobile': default_address['mobile'],
'mobileKey': default_address['mobileKey'],
'email': default_address.get('email', ''),
'postCode': '',
'invoiceTitle': invoice_info.get('invoiceTitle', -1),
'invoiceCompanyName': '',
'invoiceContent': invoice_info.get('invoiceContentType', 1),
'invoiceTaxpayerNO': '',
'invoiceEmail': '',
'invoicePhone': invoice_info.get('invoicePhone', ''),
'invoicePhoneKey': invoice_info.get('invoicePhoneKey', ''),
'invoice': 'true' if invoice_info else 'false',
'password': global_config.get('account', 'payment_pwd'),
'codTimeType': 3,
'paymentType': 4,
'areaCode': '',
'overseas': 0,
'phone': '',
'eid': global_config.getRaw('config', 'eid'),
'fp': global_config.getRaw('config', 'fp'),
'token': token,
'pru': ''
}
return data
def submit_seckill_order(self):
"""提交抢购(秒杀)订单
:return: 抢购结果 True/False
"""
url = 'https://marathon.jd.com/seckillnew/orderService/pc/submitOrder.action'
payload = {
'skuId': self.sku_id,
}
try:
self.seckill_order_data[self.sku_id] = self._get_seckill_order_data()
except Exception as e:
logger.info('抢购失败,无法获取生成订单的基本信息,接口返回:【{}】'.format(str(e)))
return False
logger.info('提交抢购订单...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://marathon.jd.com/seckill/seckill.action?skuId={0}&num={1}&rid={2}'.format(
self.sku_id, self.seckill_num, int(time.time())),
}
resp = self.session.post(
url=url,
params=payload,
data=self.seckill_order_data.get(
self.sku_id),
headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception as e:
logger.info('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return False
# 返回信息
# 抢购失败:
# {'errorMessage': '很遗憾没有抢到,再接再厉哦。', 'orderId': 0, 'resultCode': 60074, 'skuId': 0, 'success': False}
# {'errorMessage': '抱歉,您提交过快,请稍后再提交订单!', 'orderId': 0, 'resultCode': 60017, 'skuId': 0, 'success': False}
# {'errorMessage': '系统正在开小差,请重试~~', 'orderId': 0, 'resultCode': 90013, 'skuId': 0, 'success': False}
# 抢购成功:
# {"appUrl":"xxxxx","orderId":820227xxxxx,"pcUrl":"xxxxx","resultCode":0,"skuId":0,"success":true,"totalMoney":"xxxxx"}
if resp_json.get('success'):
order_id = resp_json.get('orderId')
total_money = resp_json.get('totalMoney')
pay_url = 'https:' + resp_json.get('pcUrl')
logger.info('抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}'.format(order_id, total_money, pay_url))
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}".format(order_id, total_money, pay_url)
send_wechat(success_message)
return True
else:
logger.info('抢购失败,返回信息:{}'.format(resp_json))
if global_config.getRaw('messenger', 'enable') == 'true':
error_message = '抢购失败,返回信息:{}'.format(resp_json)
send_wechat(error_message)
return False
苏宁脚本目前在测试途中,需要继续调试。
原创文章,
转载请注明:http://30daydo.com/article/44129
欢迎关注公众号:
可转债量化分析
查看全部
运行环境 windows,linux,mac,python3+
京东小白分查询:
https://plus.m.jd.com/rights/windControl
分太低的就不要参与了,毕竟概率会小很多
############ 2021-01-13 更新 ======
最新的用Go重写的,搞了几瓶
苏宁家的:
============= 2021-01-11 更新 ============
感觉苏宁的抢购是耍猴的,那个按钮基本处于不可点状态,所以就放弃了,感觉官方就是没放多少量,加上苏宁公司过往的尿性,所以洗洗睡了
main.py
import sys
from maotai.jd_spider_requests import ProdectPurchase
if __name__ == '__main__':
tip = """
功能列表:
1.预约商品
2.秒杀抢购商品
"""
print(tip)
product = ProdectPurchase()
choice_function = input('请选择:')
if choice_function == '1':
product.reserve()
elif choice_function == '2':
product.seckill_by_proc_pool()
else:
print('没有此功能')
sys.exit(1)
jd_spider_requests.py
import random
import time
import requests
import functools
import json
import os
import pickle
from lxml import etree
from error.exception import SKException
from maotai.jd_logger import logger
from maotai.timer import Timer
from maotai.config import global_config
from concurrent.futures import ProcessPoolExecutor
from helper.jd_helper import (
parse_json,
send_wechat,
wait_some_time,
response_status,
save_image,
open_image
)
class SpiderSession:
"""
Session相关操作
"""
def __init__(self):
self.cookies_dir_path = "./cookies/"
self.user_agent = global_config.getRaw('config', 'DEFAULT_USER_AGENT')
self.session = self._init_session()
def _init_session(self):
session = requests.session()
session.headers = self.get_headers()
return session
def get_headers(self):
return {"User-Agent": self.user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;"
"q=0.9,image/webp,image/apng,*/*;"
"q=0.8,application/signed-exchange;"
"v=b3",
"Connection": "keep-alive"}
def get_user_agent(self):
return self.user_agent
def get_session(self):
"""
获取当前Session
:return:
"""
return self.session
def get_cookies(self):
"""
获取当前Cookies
:return:
"""
return self.get_session().cookies
def set_cookies(self, cookies):
self.session.cookies.update(cookies)
def load_cookies_from_local(self):
"""
从本地加载Cookie
:return:
"""
cookies_file = ''
if not os.path.exists(self.cookies_dir_path):
return False
for name in os.listdir(self.cookies_dir_path):
if name.endswith(".cookies"):
cookies_file = '{}{}'.format(self.cookies_dir_path, name)
break
if cookies_file == '':
return False
with open(cookies_file, 'rb') as f:
local_cookies = pickle.load(f)
self.set_cookies(local_cookies)
def save_cookies_to_local(self, cookie_file_name):
"""
保存Cookie到本地
:param cookie_file_name: 存放Cookie的文件名称
:return:
"""
cookies_file = '{}{}.cookies'.format(self.cookies_dir_path, cookie_file_name)
directory = os.path.dirname(cookies_file)
if not os.path.exists(directory):
os.makedirs(directory)
with open(cookies_file, 'wb') as f:
pickle.dump(self.get_cookies(), f)
class QrLogin:
"""
扫码登录
"""
def __init__(self, spider_session: SpiderSession):
"""
初始化扫码登录
大致流程:
1、访问登录二维码页面,获取Token
2、使用Token获取票据
3、校验票据
:param spider_session:
"""
self.qrcode_img_file = 'qr_code.png'
self.spider_session = spider_session
self.session = self.spider_session.get_session()
self.is_login = False
self.refresh_login_status()
def refresh_login_status(self):
"""
刷新是否登录状态
:return:
"""
self.is_login = self._validate_cookies()
def _validate_cookies(self):
"""
验证cookies是否有效(是否登陆)
通过访问用户订单列表页进行判断:若未登录,将会重定向到登陆页面。
:return: cookies是否有效 True/False
"""
url = 'https://order.jd.com/center/list.action'
payload = {
'rid': str(int(time.time() * 1000)),
}
try:
resp = self.session.get(url=url, params=payload, allow_redirects=False)
if resp.status_code == requests.codes.OK:
return True
except Exception as e:
logger.error("验证cookies是否有效发生异常", e)
return False
def _get_login_page(self):
"""
获取PC端登录页面
阻塞,更新cookies
:return:
"""
url = "https://passport.jd.com/new/login.aspx"
page = self.session.get(url, headers=self.spider_session.get_headers())
return page
def _get_qrcode(self):
"""
缓存并展示登录二维码
:return:
"""
url = 'https://qr.m.jd.com/show'
payload = {
'appid': 133,
'size': 147,
't': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.info('获取二维码失败')
return False
save_image(resp, self.qrcode_img_file)
logger.info('二维码获取成功,请打开京东APP扫描')
open_image(self.qrcode_img_file)
return True
def _get_qrcode_ticket(self):
"""
通过 token 获取票据 ticket
:return:
"""
url = 'https://qr.m.jd.com/check'
payload = {
'appid': '133',
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'token': self.session.cookies.get('wlfstk_smdl'), # 从cookies获取值
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.error('获取二维码扫描结果异常')
return False
resp_json = parse_json(resp.text)
if resp_json['code'] != 200:
logger.info('Code: %s, Message: %s', resp_json['code'], resp_json['msg'])
return None
else:
logger.info('已完成手机客户端确认')
return resp_json['ticket']
def _validate_qrcode_ticket(self, ticket):
"""
通过已获取的票据进行校验
:param ticket: 已获取的票据
:return:
"""
url = 'https://passport.jd.com/uc/qrCodeTicketValidation'
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/uc/login?ltype=logout',
}
resp = self.session.get(url=url, headers=headers, params={'t': ticket})
if not response_status(resp):
return False
resp_json = json.loads(resp.text)
if resp_json['returnCode'] == 0:
return True
else:
logger.info(resp_json)
return False
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
self._get_login_page() # 更新cookies
# download QR code
if not self._get_qrcode():
raise SKException('二维码下载失败')
# get QR code ticket
ticket = None
retry_times = 85
for _ in range(retry_times):
# 重试 拿到ticket
ticket = self._get_qrcode_ticket()
if ticket:
break
time.sleep(2)
else:
raise SKException('二维码过期,请重新获取扫描')
# validate QR code ticket
if not self._validate_qrcode_ticket(ticket):
raise SKException('二维码信息校验失败')
self.refresh_login_status()
logger.info('二维码登录成功')
class ProdectPurchase(object):
def __init__(self):
self.spider_session = SpiderSession()
self.spider_session.load_cookies_from_local()
# 共享一个session
self.qrlogin = QrLogin(self.spider_session)
# 初始化信息
self.sku_id = global_config.getRaw('config', 'sku_id')
self.seckill_num = global_config.getRaw('config', 'seckill_num')
self.work_count = global_config.getRaw('config','process_num')
self.seckill_init_info = dict()
self.seckill_url = dict()
self.seckill_order_data = dict()
self.timers = Timer()
self.session = self.spider_session.get_session()
self.user_agent = self.spider_session.user_agent
self.nick_name = None
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
if self.qrlogin.is_login:
logger.info('登录成功')
return
self.qrlogin.login_by_qrcode()
if self.qrlogin.is_login:
self.nick_name = self.get_username()
self.spider_session.save_cookies_to_local(self.nick_name)
else:
raise SKException("二维码登录失败!")
def check_login(func):
"""
用户登陆态校验装饰器。若用户未登陆,则调用扫码登陆
"""
@functools.wraps(func)
def new_func(self, *args, **kwargs):
if not self.qrlogin.is_login:
logger.info("{0} 需登陆后调用,开始扫码登陆".format(func.__name__))
self.login_by_qrcode()
return func(self, *args, **kwargs)
return new_func
@check_login
def reserve(self):
"""
预约
"""
self._reserve()
@check_login
def seckill(self):
"""
抢购
"""
self._seckill()
@check_login
def seckill_by_proc_pool(self):
"""
多进程进行抢购
work_count:进程数量
"""
with ProcessPoolExecutor() as pool:
for i in range(self.work_count):
pool.submit(self.seckill)
def _reserve(self):
"""
预约
"""
while True:
try:
self.make_reserve()
break
except Exception as e:
logger.info('预约发生异常!', e)
wait_some_time()
def _seckill(self):
"""
抢购
"""
while True:
try:
self.request_seckill_url()
while True:
self.request_seckill_checkout_page()
self.submit_seckill_order()
except Exception as e:
logger.info('抢购发生异常,稍后继续执行!', e)
wait_some_time()
def make_reserve(self):
"""商品预约"""
logger.info('商品名称:{}'.format(self.get_sku_title()))
url = 'https://yushou.jd.com/youshouinfo.action?'
payload = {
'callback': 'fetchJSON',
'sku': self.sku_id,
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
resp = self.session.get(url=url, params=payload, headers=headers)
resp_json = parse_json(resp.text)
reserve_url = resp_json.get('url')
# self.timers.start()
while True:
try:
self.session.get(url='https:' + reserve_url)
logger.info('预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约')
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约"
send_wechat(success_message)
break
except Exception as e:
logger.error('预约失败正在重试...')
def get_username(self):
"""获取用户信息"""
url = 'https://passport.jd.com/user/petName/getUserInfoForMiniJd.action'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://order.jd.com/center/list.action',
}
resp = self.session.get(url=url, params=payload, headers=headers)
try_count = 5
while not resp.text.startswith("jQuery"):
try_count = try_count - 1
if try_count > 0:
resp = self.session.get(url=url, params=payload, headers=headers)
else:
break
wait_some_time()
# 响应中包含了许多用户信息,现在在其中返回昵称
# jQuery2381773({"imgUrl":"//storage.360buyimg.com/i.imageUpload/xxx.jpg","lastLoginTime":"","nickName":"xxx","plusStatus":"0","realName":"xxx","userLevel":x,"userScoreVO":{"accountScore":xx,"activityScore":xx,"consumptionScore":xxxxx,"default":false,"financeScore":xxx,"pin":"xxx","riskScore":x,"totalScore":xxxxx}})
return parse_json(resp.text).get('nickName')
def get_sku_title(self):
"""获取商品名称"""
url = 'https://item.jd.com/{}.html'.format(global_config.getRaw('config', 'sku_id'))
resp = self.session.get(url).content
x_data = etree.HTML(resp)
sku_title = x_data.xpath('/html/head/title/text()')
return sku_title[0]
def get_seckill_url(self):
"""获取商品的抢购链接
点击"抢购"按钮后,会有两次302跳转,最后到达订单结算页面
这里返回第一次跳转后的页面url,作为商品的抢购链接
:return: 商品的抢购链接
"""
url = 'https://itemko.jd.com/itemShowBtn'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'skuId': self.sku_id,
'from': 'pc',
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Host': 'itemko.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
while True:
resp = self.session.get(url=url, headers=headers, params=payload)
resp_json = parse_json(resp.text)
if resp_json.get('url'):
# https://divide.jd.com/user_rou ... %3Dpc
router_url = 'https:' + resp_json.get('url')
# https://marathon.jd.com/captch ... %3Dpc
seckill_url = router_url.replace(
'divide', 'marathon').replace(
'user_routing', 'captcha.html')
logger.info("抢购链接获取成功: %s", seckill_url)
return seckill_url
else:
logger.info("抢购链接获取失败,稍后自动重试")
wait_some_time()
def request_seckill_url(self):
"""访问商品的抢购链接(用于设置cookie等"""
logger.info('用户:{}'.format(self.get_username()))
logger.info('商品名称:{}'.format(self.get_sku_title()))
self.timers.start() # 阻塞
self.seckill_url[self.sku_id] = self.get_seckill_url()
logger.info('访问商品的抢购连接...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(
url=self.seckill_url.get(
self.sku_id),
headers=headers,
allow_redirects=False)
def request_seckill_checkout_page(self):
"""访问抢购订单结算页面"""
logger.info('访问抢购订单结算页面...')
url = 'https://marathon.jd.com/seckill/seckill.action'
payload = {
'skuId': self.sku_id,
'num': self.seckill_num,
'rid': int(time.time())
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(url=url, params=payload, headers=headers, allow_redirects=False)
def _get_seckill_init_info(self):
"""获取秒杀初始化信息(包括:地址,发票,token)
:return: 初始化信息组成的dict
"""
logger.info('获取秒杀初始化信息...')
url = 'https://marathon.jd.com/seckillnew/orderService/pc/init.action'
data = {
'sku': self.sku_id,
'num': self.seckill_num,
'isModifyAddress': 'false',
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
}
resp = self.session.post(url=url, data=data, headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception:
raise SKException('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return resp_json
def _get_seckill_order_data(self):
"""生成提交抢购订单所需的请求体参数
:return: 请求体参数组成的dict
"""
logger.info('生成提交抢购订单所需参数...')
# 获取用户秒杀初始化信息
self.seckill_init_info[self.sku_id] = self._get_seckill_init_info()
init_info = self.seckill_init_info.get(self.sku_id)
default_address = init_info['addressList'][0] # 默认地址dict
invoice_info = init_info.get('invoiceInfo', {}) # 默认发票信息dict, 有可能不返回
token = init_info['token']
data = {
'skuId': self.sku_id,
'num': self.seckill_num,
'addressId': default_address['id'],
'yuShou': 'true',
'isModifyAddress': 'false',
'name': default_address['name'],
'provinceId': default_address['provinceId'],
'cityId': default_address['cityId'],
'countyId': default_address['countyId'],
'townId': default_address['townId'],
'addressDetail': default_address['addressDetail'],
'mobile': default_address['mobile'],
'mobileKey': default_address['mobileKey'],
'email': default_address.get('email', ''),
'postCode': '',
'invoiceTitle': invoice_info.get('invoiceTitle', -1),
'invoiceCompanyName': '',
'invoiceContent': invoice_info.get('invoiceContentType', 1),
'invoiceTaxpayerNO': '',
'invoiceEmail': '',
'invoicePhone': invoice_info.get('invoicePhone', ''),
'invoicePhoneKey': invoice_info.get('invoicePhoneKey', ''),
'invoice': 'true' if invoice_info else 'false',
'password': global_config.get('account', 'payment_pwd'),
'codTimeType': 3,
'paymentType': 4,
'areaCode': '',
'overseas': 0,
'phone': '',
'eid': global_config.getRaw('config', 'eid'),
'fp': global_config.getRaw('config', 'fp'),
'token': token,
'pru': ''
}
return data
def submit_seckill_order(self):
"""提交抢购(秒杀)订单
:return: 抢购结果 True/False
"""
url = 'https://marathon.jd.com/seckillnew/orderService/pc/submitOrder.action'
payload = {
'skuId': self.sku_id,
}
try:
self.seckill_order_data[self.sku_id] = self._get_seckill_order_data()
except Exception as e:
logger.info('抢购失败,无法获取生成订单的基本信息,接口返回:【{}】'.format(str(e)))
return False
logger.info('提交抢购订单...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://marathon.jd.com/seckill/seckill.action?skuId={0}&num={1}&rid={2}'.format(
self.sku_id, self.seckill_num, int(time.time())),
}
resp = self.session.post(
url=url,
params=payload,
data=self.seckill_order_data.get(
self.sku_id),
headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception as e:
logger.info('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return False
# 返回信息
# 抢购失败:
# {'errorMessage': '很遗憾没有抢到,再接再厉哦。', 'orderId': 0, 'resultCode': 60074, 'skuId': 0, 'success': False}
# {'errorMessage': '抱歉,您提交过快,请稍后再提交订单!', 'orderId': 0, 'resultCode': 60017, 'skuId': 0, 'success': False}
# {'errorMessage': '系统正在开小差,请重试~~', 'orderId': 0, 'resultCode': 90013, 'skuId': 0, 'success': False}
# 抢购成功:
# {"appUrl":"xxxxx","orderId":820227xxxxx,"pcUrl":"xxxxx","resultCode":0,"skuId":0,"success":true,"totalMoney":"xxxxx"}
if resp_json.get('success'):
order_id = resp_json.get('orderId')
total_money = resp_json.get('totalMoney')
pay_url = 'https:' + resp_json.get('pcUrl')
logger.info('抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}'.format(order_id, total_money, pay_url))
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}".format(order_id, total_money, pay_url)
send_wechat(success_message)
return True
else:
logger.info('抢购失败,返回信息:{}'.format(resp_json))
if global_config.getRaw('messenger', 'enable') == 'true':
error_message = '抢购失败,返回信息:{}'.format(resp_json)
send_wechat(error_message)
return False
苏宁脚本目前在测试途中,需要继续调试。
原创文章,
转载请注明:http://30daydo.com/article/44129
欢迎关注公众号:
可转债量化分析
不用一行代码 下载雪球嘉年华视频
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 3573 次浏览 • 2020-12-09 14:43
听说今年着重分享一些观念,抱着好奇心,就打算下载几部来看看。
雪球网站很简单,只要找到下载链接就可以下载了。
第一步。打开一个视频播放的页面,比如大金链的
11737544 粉丝主会场 | 巅峰对谈:金牛双子星主动VS量化
https://xueqiu.com/video/5285890810945319765
右键,查看源码,然后在源码里面试着查找 mp4,flv,webp等流媒体字样。
在这里找到一个了:
但是这个视频下载地址有很多转义字符:http:\u002F\u002F1256122120.vod2.myqcloud.com\u002F53ad1740vodtranscq1256122120\u002F17ebe5145285890810945319765\u002Fv.f20.mp4
直接在浏览器是无法直接打开的。
可以直接替换\u002f 为一个斜杠 \ 就可以了。
如果嫌麻烦,可以在浏览器里面,按下F12,在console页面里面输入上面的地址,前后加个双引号,然后回车,就可以得到完整的地址了。
原创文章,转载请注明出处
http://30daydo.com/article/44119
查看全部
听说今年着重分享一些观念,抱着好奇心,就打算下载几部来看看。
雪球网站很简单,只要找到下载链接就可以下载了。
第一步。打开一个视频播放的页面,比如大金链的
11737544 粉丝主会场 | 巅峰对谈:金牛双子星主动VS量化
https://xueqiu.com/video/5285890810945319765
右键,查看源码,然后在源码里面试着查找 mp4,flv,webp等流媒体字样。
在这里找到一个了:
但是这个视频下载地址有很多转义字符:
http:\u002F\u002F1256122120.vod2.myqcloud.com\u002F53ad1740vodtranscq1256122120\u002F17ebe5145285890810945319765\u002Fv.f20.mp4
直接在浏览器是无法直接打开的。
可以直接替换\u002f 为一个斜杠 \ 就可以了。
如果嫌麻烦,可以在浏览器里面,按下F12,在console页面里面输入上面的地址,前后加个双引号,然后回车,就可以得到完整的地址了。
原创文章,转载请注明出处
http://30daydo.com/article/44119
监控公众号取消关注的粉丝
闲聊 • 李魔佛 发表了文章 • 0 个评论 • 2710 次浏览 • 2020-12-08 23:48
上面是获取到全部粉丝的数据,原理是直接利用mitmproxy抓包,监听到指定url数据后,把数据导出到文件就可以了。
如果需要教程和代码,可以私聊。(哦,网站关闭了注册了,那就公众号留言吧) 查看全部
简单快速下载知乎视频
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 5015 次浏览 • 2020-11-29 23:03
1. 打开视频前按F12
2. 播放视频
3. 查看F12的网络选项
4. 找到 https://vdn3.vzuu.com 的url
5. 对应的整个url链接就是视频的真实下载地址。把url复制到浏览器打开,然后右键另存为本地视频就可以了
查看全部
1. 打开视频前按F12
2. 播放视频
3. 查看F12的网络选项
4. 找到 https://vdn3.vzuu.com 的url
5. 对应的整个url链接就是视频的真实下载地址。把url复制到浏览器打开,然后右键另存为本地视频就可以了
集思录用户名密码JS加密流程解密 【JS加密破解教程一】
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 4697 次浏览 • 2020-11-27 17:26
而且提交的内容每次都固定,第一种最傻的方式就是每次提交就把加密的用户名和密码提交上去。
当然,对于有钻研的读者,可能想看看其具体的加密方式。
那么就按照流程,通过断点与搜索,找到其加密方法。
首先在上面的截图中很明显就知道,这个字符加密应该是aes,因为它的提交字段中aes:1
按F12,走完整个登录流程。
然后搜索password字样的地方。
在index.html首页中找到一处:
然后在该password的地方打个断点,然后跳转到jslencode 的地方。
跳转到的地方是这里。
然后我们浏览一下这个JS页面,尝试把整个JS代码抠出来。
放到我们的调试软件中,比较常用的是鬼鬼JS调试器。(有需要的可以关注wx公众号下载:回复 鬼鬼JS 即可)
这个调试器的好处是,可以很方便格式化JS代码,然后输入你要调试的字符,然后点击运行,可以当场拿到结果,等到结果ok了的话,就可以用python 的pyexecjs执行。
先点击代码格式化,然后在输入框里找到函数的入口:
jslencode(text, aes_key),好了,现在就把我们的密码 XXXXXX,和aes_key 放进去就可以了。
aes_key 在之前的index.html就能找到。var A397151C04723421F = '397151C04723421F';
function doLogin(){
var data = $('#login_form').serializeObjectToJson();
data['_post_type'] = 'ajax';
data['aes'] = 1;
data['user_name'] = jslencode(data['user_name'], A397151C04723421F);
data['password'] = jslencode(data['password'], A397151C04723421F);
$.post('/account/ajax/login_process/', data, function(rst){
on_login_error_processer(rst);
}, 'json');
var A397151C04723421F = '397151C04723421F';
jslencode(‘jisilupassword’, '397151C04723421F')
右下角有个系统引擎运行。
得到结果:
1d1bd2b22b8cc5c09ad8ce8f1e69b87f
对比一下第一张图里面的的post请求,发现是一样的。那么现在的JS解密就成功了一大半了。 接着我们就写python代码执行这段JS脚本。
尝试直接把JS放到一个文件,然后编写python代码# -*- coding: utf-8 -*-
# @Time : 2020/11/27 12:15
# @File : js_executor.py
# @Author : Rocky C@www.30daydo.com
import execjs
def main():
encode_user = ctx.call('jslencode', user, key)
encode_password = ctx.call('jslencode', password, key)
print(encode_user)
print(encode_password)
if __name__ == '__main__':
main()
然后发现报错: 说CryptoJS没有定义,那么我们看看代码。(function(root, factory) {
if (typeof exports === "object") {
module.exports = exports = factory()
} else {
if (typeof define === "function" && define.amd) {
define(, factory)
} else {
root.CryptoJS = factory()
}
}
}(this, function() {
var CryptoJS = CryptoJS || (function(Math, undefined) {
var create = Object.create || (function() {
发现这一行此时CryptoJS的定义
var CryptoJS = CryptoJS || (function(Math, undefined) { var create = Object.create || (function() {
那么我们把这一行上面的全部删除。最后的调试后,能够执行的JS代码如下,并保存为 集思录.js 文件,调用上面的python文件。
集思录.jsvar CryptoJS = CryptoJS || (function(Math, undefined) {
var create = Object.create || (function() {
function F() {}
return function(obj) {
var subtype;
F.prototype = obj;
subtype = new F();
F.prototype = null;
return subtype
}
}());
var C = {};
var C_lib = C.lib = {};
var Base = C_lib.Base = (function() {
下面的就跟上面的一模一样了:
源文件地址:https://www.jisilu.cn/static/js/crypto-js-3.3.0-min.js然后python运行后得到加密后的aes数据。
对比一下鬼鬼JS调试器的结果,一样的。
OK,手工。
这里汇聚了平时整理的JS破解工作流,大家可以参考参考。
https://github.com/Rockyzsu/JS-Reverse
原创文章,转载请注明出处:
http://30daydo.com/article/44109
喜欢的朋友可以加入星球探讨: 查看全部
而且提交的内容每次都固定,第一种最傻的方式就是每次提交就把加密的用户名和密码提交上去。
当然,对于有钻研的读者,可能想看看其具体的加密方式。
那么就按照流程,通过断点与搜索,找到其加密方法。
首先在上面的截图中很明显就知道,这个字符加密应该是aes,因为它的提交字段中aes:1
按F12,走完整个登录流程。
然后搜索password字样的地方。
在index.html首页中找到一处:
然后在该password的地方打个断点,然后跳转到jslencode 的地方。
跳转到的地方是这里。
然后我们浏览一下这个JS页面,尝试把整个JS代码抠出来。
放到我们的调试软件中,比较常用的是鬼鬼JS调试器。(有需要的可以关注wx公众号下载:回复 鬼鬼JS 即可)
这个调试器的好处是,可以很方便格式化JS代码,然后输入你要调试的字符,然后点击运行,可以当场拿到结果,等到结果ok了的话,就可以用python 的pyexecjs执行。
先点击代码格式化,然后在输入框里找到函数的入口:
jslencode(text, aes_key),好了,现在就把我们的密码 XXXXXX,和aes_key 放进去就可以了。
aes_key 在之前的index.html就能找到。
var A397151C04723421F = '397151C04723421F';
function doLogin(){
var data = $('#login_form').serializeObjectToJson();
data['_post_type'] = 'ajax';
data['aes'] = 1;
data['user_name'] = jslencode(data['user_name'], A397151C04723421F);
data['password'] = jslencode(data['password'], A397151C04723421F);
$.post('/account/ajax/login_process/', data, function(rst){
on_login_error_processer(rst);
}, 'json');
var A397151C04723421F = '397151C04723421F';
jslencode(‘jisilupassword’, '397151C04723421F')
右下角有个系统引擎运行。
得到结果:
1d1bd2b22b8cc5c09ad8ce8f1e69b87f
对比一下第一张图里面的的post请求,发现是一样的。那么现在的JS解密就成功了一大半了。 接着我们就写python代码执行这段JS脚本。
尝试直接把JS放到一个文件,然后编写python代码
# -*- coding: utf-8 -*-
# @Time : 2020/11/27 12:15
# @File : js_executor.py
# @Author : Rocky C@www.30daydo.com
import execjs
def main():
encode_user = ctx.call('jslencode', user, key)
encode_password = ctx.call('jslencode', password, key)
print(encode_user)
print(encode_password)
if __name__ == '__main__':
main()
然后发现报错: 说CryptoJS没有定义,那么我们看看代码。
(function(root, factory) {
if (typeof exports === "object") {
module.exports = exports = factory()
} else {
if (typeof define === "function" && define.amd) {
define(, factory)
} else {
root.CryptoJS = factory()
}
}
}(this, function() {
var CryptoJS = CryptoJS || (function(Math, undefined) {
var create = Object.create || (function() {
发现这一行此时CryptoJS的定义
var CryptoJS = CryptoJS || (function(Math, undefined) { var create = Object.create || (function() {
那么我们把这一行上面的全部删除。最后的调试后,能够执行的JS代码如下,并保存为 集思录.js 文件,调用上面的python文件。
集思录.js
var CryptoJS = CryptoJS || (function(Math, undefined) {下面的就跟上面的一模一样了:
var create = Object.create || (function() {
function F() {}
return function(obj) {
var subtype;
F.prototype = obj;
subtype = new F();
F.prototype = null;
return subtype
}
}());
var C = {};
var C_lib = C.lib = {};
var Base = C_lib.Base = (function() {
源文件地址:
https://www.jisilu.cn/static/js/crypto-js-3.3.0-min.js然后python运行后得到加密后的aes数据。
对比一下鬼鬼JS调试器的结果,一样的。
OK,手工。
这里汇聚了平时整理的JS破解工作流,大家可以参考参考。
https://github.com/Rockyzsu/JS-Reverse
原创文章,转载请注明出处:
http://30daydo.com/article/44109
喜欢的朋友可以加入星球探讨:
免费代理ip与收费的代理ip
python爬虫 • wanbainip 发表了文章 • 0 个评论 • 3198 次浏览 • 2020-10-30 18:00
曾经有尝试过使用免费的代理ip来搭建代理池,可是免费的代理ip不仅资源少,而且可用率、高匿性、速度等都极差,每次使用都需要借助第三方软件进行检查是否可用,严重影响效率,根本满足不了任务的需求。
收费的代理ip与免费的代理ip差距非常大,不仅拥有海量的ip资源,可用率、高匿性、速度都是极好。操作简单工作效率既然提高上去了。经过多家的测试,最终选择了性价比最高的万变ip。高质量的优质代理ip才可以真正用来防止爬虫被封锁,如果使用普通代理,爬虫的真实IP还是会暴露。新获取一批新IP 查看全部
曾经有尝试过使用免费的代理ip来搭建代理池,可是免费的代理ip不仅资源少,而且可用率、高匿性、速度等都极差,每次使用都需要借助第三方软件进行检查是否可用,严重影响效率,根本满足不了任务的需求。
收费的代理ip与免费的代理ip差距非常大,不仅拥有海量的ip资源,可用率、高匿性、速度都是极好。操作简单工作效率既然提高上去了。经过多家的测试,最终选择了性价比最高的万变ip。高质量的优质代理ip才可以真正用来防止爬虫被封锁,如果使用普通代理,爬虫的真实IP还是会暴露。新获取一批新IP
Python爬虫学习者需要注意什么?
python爬虫 • wanbainip 发表了文章 • 0 个评论 • 3021 次浏览 • 2020-10-28 17:14
最常见的解决方法就是使用大量的ip,就是借着代理ip保证IP被封时有替换IP可用,永远保持着续航能力。这里推荐51代理ip,作为一家提供代理IP的专业服务商,万变ip代理拥有强大的技术团队运营维护,全高匿系统所产生的高匿ip不仅安全稳定、而且速度快, 以及与爬虫用户多年来合作的宝贵经验,是Python爬虫首选代理IP。
Python是一种全栈计算机程序设计语言,全栈,顾名思义,应用范围广。你可能听说过很多编程语言,例如C语言,Java语言等,众所周知,这些语言都非常难学,更别说景桐使用了。而python不一样,比如完成一个Web服务,C语言要写1000行代码,Java要写100行,而python可能只要写20行。对!这就是差距!目前由于python“简单易懂”,已逐步成为网络爬虫主流语言。
在初学python爬虫时,很多程序员会被一些“小问题”阻碍脚步,为避免大家再次犯同样的错误,加快学习进程,在爬取网站信息时一定要使用大量代理IP。好用的代理IP服务商,
高效率的爬虫工作离不开ip代理的支持,这就是ip代理越来越受欢迎的原因!收藏举报投诉 查看全部
最常见的解决方法就是使用大量的ip,就是借着代理ip保证IP被封时有替换IP可用,永远保持着续航能力。这里推荐51代理ip,作为一家提供代理IP的专业服务商,万变ip代理拥有强大的技术团队运营维护,全高匿系统所产生的高匿ip不仅安全稳定、而且速度快, 以及与爬虫用户多年来合作的宝贵经验,是Python爬虫首选代理IP。
Python是一种全栈计算机程序设计语言,全栈,顾名思义,应用范围广。你可能听说过很多编程语言,例如C语言,Java语言等,众所周知,这些语言都非常难学,更别说景桐使用了。而python不一样,比如完成一个Web服务,C语言要写1000行代码,Java要写100行,而python可能只要写20行。对!这就是差距!目前由于python“简单易懂”,已逐步成为网络爬虫主流语言。
在初学python爬虫时,很多程序员会被一些“小问题”阻碍脚步,为避免大家再次犯同样的错误,加快学习进程,在爬取网站信息时一定要使用大量代理IP。好用的代理IP服务商,
高效率的爬虫工作离不开ip代理的支持,这就是ip代理越来越受欢迎的原因!收藏举报投诉
Python爬虫虎牙平台主播的图片代码
python爬虫 • wanbainip 发表了文章 • 0 个评论 • 3020 次浏览 • 2020-10-27 17:55
代码如下:
import urllib.request
import re
import os
# 全局变量用来记录图片的编号
gl_z = 0
def down_img(url1):
"""下载图片"""
# 处理图片链接,拼接http:
url = "https:" + re.sub(r"\?", "", url1)
global gl_z
print(url)
# 请求链接
response = urllib.request.urlopen(url)
# 读取内容
data = response.read()
# 切片取出图片名称
file_name = url[url.rfind('/') + 1:]
# 生成列表
a = [x for x in range(10000)]
# 打开文件用以写入
file = open(os.path.join("photo3", "img" + file_name + str(a[gl_z]) + ".jpg"), "wb")
file.write(data)
# 关闭文件
file.close()
# 编号加1
gl_z += 1
if __name__ == '__main__':
# 要抓去信息的网址
home = """http://www.huya.com/g/xingxiu"""
# 模拟请求头
headers = {
"Host": "www.huya.com",
"User-Agent": "agent信息"
}
# 构造好请求对象 将请求提交到服务器 获取的响应就是到首页的html代码
request = urllib.request.Request(url=home, headers=headers)
response = urllib.request.urlopen(request)
# 读取抓到的内容并解码
html_data = response.read().decode()
"""huyaimg.msstatic.com/avatar/1054/db/6590aa9bcf98e12e5d809d371e46cc_180_135.jpg
"""
# 使用正则 从首页中 提取出所有的图片链接
img_list = re.findall(r"//huyaimg\.msstatic\.com.+\.jpg\?", html_data)
print(img_list)
# 取出每张图片进行下载
for img_url in img_list:
print(img_url)
down_img(img_url) 查看全部
代码如下:
import urllib.request
import re
import os
# 全局变量用来记录图片的编号
gl_z = 0
def down_img(url1):
"""下载图片"""
# 处理图片链接,拼接http:
url = "https:" + re.sub(r"\?", "", url1)
global gl_z
print(url)
# 请求链接
response = urllib.request.urlopen(url)
# 读取内容
data = response.read()
# 切片取出图片名称
file_name = url[url.rfind('/') + 1:]
# 生成列表
a = [x for x in range(10000)]
# 打开文件用以写入
file = open(os.path.join("photo3", "img" + file_name + str(a[gl_z]) + ".jpg"), "wb")
file.write(data)
# 关闭文件
file.close()
# 编号加1
gl_z += 1
if __name__ == '__main__':
# 要抓去信息的网址
home = """http://www.huya.com/g/xingxiu"""
# 模拟请求头
headers = {
"Host": "www.huya.com",
"User-Agent": "agent信息"
}
# 构造好请求对象 将请求提交到服务器 获取的响应就是到首页的html代码
request = urllib.request.Request(url=home, headers=headers)
response = urllib.request.urlopen(request)
# 读取抓到的内容并解码
html_data = response.read().decode()
"""huyaimg.msstatic.com/avatar/1054/db/6590aa9bcf98e12e5d809d371e46cc_180_135.jpg
"""
# 使用正则 从首页中 提取出所有的图片链接
img_list = re.findall(r"//huyaimg\.msstatic\.com.+\.jpg\?", html_data)
print(img_list)
# 取出每张图片进行下载
for img_url in img_list:
print(img_url)
down_img(img_url)
python打造3D撞球小游戏
python • wanbainip 发表了文章 • 0 个评论 • 3147 次浏览 • 2020-10-26 15:52
我们需要三组刚体(当您在Blender的对象上打开一个刚体的属性时,Blender将模拟与其它刚体的碰撞):
1.平面
第2行代码创建了一个简单的平面,立方体将放置在该平面上。为了防止它因重力而坠落,我们将其设为“受体”第4行代码。
2. 圆环
第11-12行将第一个圆环的"Enabled"属性设置为false,防止由于重力而坠落。这样它就固定在那牵住整个链条。
3. 立方体
因为z循环第13行嵌套在x循环[第5行]中,我们将得到一个18X10的立方体组成的墙。 查看全部
Python爬虫基本框架
python爬虫 • wanbainip 发表了文章 • 0 个评论 • 2759 次浏览 • 2020-10-25 18:01
1. 爬虫调度器负责统筹其他四个模块协调工作。
2. URL管理器负责管理URL链接,包括已爬取的链接和未爬取的链接。
3. HTML下载器用于从URL管理器中获取未爬取的链接并下载其HTML网页。
4. HTML解析器用于解析HTML下载器下载的HTML网页,获取URL链接交给URL管理器,提取要获取的数据交给数据存储器。
5. 数据存储器用于将HTML解析器解析出来的数据存储到数据库或文件。 查看全部
1. 爬虫调度器负责统筹其他四个模块协调工作。
2. URL管理器负责管理URL链接,包括已爬取的链接和未爬取的链接。
3. HTML下载器用于从URL管理器中获取未爬取的链接并下载其HTML网页。
4. HTML解析器用于解析HTML下载器下载的HTML网页,获取URL链接交给URL管理器,提取要获取的数据交给数据存储器。
5. 数据存储器用于将HTML解析器解析出来的数据存储到数据库或文件。
Python爬虫如何防止ip被封?
python爬虫 • wanbainip 发表了文章 • 0 个评论 • 3018 次浏览 • 2020-10-24 10:22
第一种:降低访问的速度,我们可以使用 time模块中的sleep,使程序每运行一次后就睡眠1s,这样可以很有效的降低ip被封机率,但是效率效果不是很高,一般是用于量小的采集任务。
第二种:使用类似万变ip代理这样的优质换ip软件,这也是爬虫工作者最常用的手段之一,通过代理ip来伪装我们的ip,隐藏本地真实的ip地址,让目标服务器无法识别是相同ip发出的请求,这样就很有效的防止ip被封。突破了ip的限制,采集数据的任务就会顺利,工作效率自然会提高!
查看全部
第一种:降低访问的速度,我们可以使用 time模块中的sleep,使程序每运行一次后就睡眠1s,这样可以很有效的降低ip被封机率,但是效率效果不是很高,一般是用于量小的采集任务。
第二种:使用类似万变ip代理这样的优质换ip软件,这也是爬虫工作者最常用的手段之一,通过代理ip来伪装我们的ip,隐藏本地真实的ip地址,让目标服务器无法识别是相同ip发出的请求,这样就很有效的防止ip被封。突破了ip的限制,采集数据的任务就会顺利,工作效率自然会提高!
python asyncio aiohttp motor异步爬虫例子 定时抓取bilibili首页热度网红
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 3752 次浏览 • 2020-09-22 22:35
AsyncIOMotorClient(connect_uri)
motor连接带用户名和密码的方法和pymongo一致。
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2020/9/22 10:07
# @File : bilibili_hot_anchor.py
# 异步爬取首页与列表
import asyncio
import datetime
import aiohttp
import re
import time
from motor.motor_asyncio import AsyncIOMotorClient
from parsel import Selector
from settings import _json_data
SLEEP = 60 * 10
INFO = _json_data['mongo']['arm']
host = INFO['host']
port = INFO['port']
user = INFO['user']
password = INFO['password']
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
client = AsyncIOMotorClient(connect_uri)
db = client['db_parker']
home_url = 'https://www.bilibili.com/ranking'
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}
def convertor(number_str):
'''
将小数点的万变为整数
:param number_str:
:return:
'''
number = re.search('(\d+\.+\d+)', number_str)
if number:
number = float(number.group(1))
if re.search('万', number_str):
number = int(number * 10000)
else:
number = 0
return number
async def home_page():
async with aiohttp.ClientSession() as session:
while True:
start = time.time()
async with session.get(url=home_url, headers=headers) as response:
html = await response.text()
resp = Selector(text=html)
items = resp.xpath('//ul[@class="rank-list"]/li')
for item in items:
json_data = {}
number = item.xpath('.//div[@class="num"]/text()').extract_first()
info = item.xpath('.//div[@class="info"][1]')
title = info.xpath('.//a/text()').extract_first()
detail_url = info.xpath('.//a/@href').extract_first()
play_number = info.xpath('.//div[@class="detail"]/span[1]/text()').extract_first()
viewing_number = info.xpath('.//div[@class="detail"]/span[2]/text()').extract_first()
json_data['number'] = int(number)
json_data['title'] = title
json_data['play_number'] = convertor(play_number)
json_data['viewing_number'] = convertor(viewing_number)
json_data['url'] = detail_url
task = asyncio.create_task(detail_list(session, detail_url, json_data))
# await detail_url()
end = time.time()
print(f'time used {end-start}')
await asyncio.sleep(SLEEP) # 暂停10分钟
print(f'sleep for {SLEEP}')
async def detail_list(session, url, json_data):
async with session.get(url, headers=headers) as response:
response = await response.text()
await parse_detail(response, json_data)
async def parse_detail(html, json_data=None):
resp = Selector(text=html)
info = resp.xpath('//div[@id="v_desc"]/div[@class="info open"]/text()').extract_first()
if not info:
info = '这个家伙很懒'
json_data['info'] = info.strip()
current = datetime.datetime.now()
json_data['crawltime'] = current
await db['bilibili'].update_one({'url': json_data['url']}, {'$set': json_data}, True, True)
loop = asyncio.get_event_loop()
loop.run_until_complete(home_page())
爬取的数据图:
原创文章,转载请注明:http://30daydo.com/article/605
需要源码,可私信。
查看全部
AsyncIOMotorClient(connect_uri)
motor连接带用户名和密码的方法和pymongo一致。
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2020/9/22 10:07
# @File : bilibili_hot_anchor.py
# 异步爬取首页与列表
import asyncio
import datetime
import aiohttp
import re
import time
from motor.motor_asyncio import AsyncIOMotorClient
from parsel import Selector
from settings import _json_data
SLEEP = 60 * 10
INFO = _json_data['mongo']['arm']
host = INFO['host']
port = INFO['port']
user = INFO['user']
password = INFO['password']
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
client = AsyncIOMotorClient(connect_uri)
db = client['db_parker']
home_url = 'https://www.bilibili.com/ranking'
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}
def convertor(number_str):
'''
将小数点的万变为整数
:param number_str:
:return:
'''
number = re.search('(\d+\.+\d+)', number_str)
if number:
number = float(number.group(1))
if re.search('万', number_str):
number = int(number * 10000)
else:
number = 0
return number
async def home_page():
async with aiohttp.ClientSession() as session:
while True:
start = time.time()
async with session.get(url=home_url, headers=headers) as response:
html = await response.text()
resp = Selector(text=html)
items = resp.xpath('//ul[@class="rank-list"]/li')
for item in items:
json_data = {}
number = item.xpath('.//div[@class="num"]/text()').extract_first()
info = item.xpath('.//div[@class="info"][1]')
title = info.xpath('.//a/text()').extract_first()
detail_url = info.xpath('.//a/@href').extract_first()
play_number = info.xpath('.//div[@class="detail"]/span[1]/text()').extract_first()
viewing_number = info.xpath('.//div[@class="detail"]/span[2]/text()').extract_first()
json_data['number'] = int(number)
json_data['title'] = title
json_data['play_number'] = convertor(play_number)
json_data['viewing_number'] = convertor(viewing_number)
json_data['url'] = detail_url
task = asyncio.create_task(detail_list(session, detail_url, json_data))
# await detail_url()
end = time.time()
print(f'time used {end-start}')
await asyncio.sleep(SLEEP) # 暂停10分钟
print(f'sleep for {SLEEP}')
async def detail_list(session, url, json_data):
async with session.get(url, headers=headers) as response:
response = await response.text()
await parse_detail(response, json_data)
async def parse_detail(html, json_data=None):
resp = Selector(text=html)
info = resp.xpath('//div[@id="v_desc"]/div[@class="info open"]/text()').extract_first()
if not info:
info = '这个家伙很懒'
json_data['info'] = info.strip()
current = datetime.datetime.now()
json_data['crawltime'] = current
await db['bilibili'].update_one({'url': json_data['url']}, {'$set': json_data}, True, True)
loop = asyncio.get_event_loop()
loop.run_until_complete(home_page())
爬取的数据图:
原创文章,转载请注明:http://30daydo.com/article/605
需要源码,可私信。
scrapy在settings中定义变量不能包含小写!
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 3043 次浏览 • 2019-11-16 16:39
比如定义了一个叫 Redis_host = '192.168.1.1',的值
然后在spider中,如果你调用self.settings.get('Redis_host')
那么返回值是 None。
如果用REDIS_HOST定义,那么就可以正确返回它的值。
如果你一定要用小写,也有其他方法可正常调用。
先导入settings文件
fromt xxxx import setttings # xxx为项目名
host = settings.Redis_host # 直接导入一个文件的形式来调用是可以的 查看全部
比如定义了一个叫 Redis_host = '192.168.1.1',的值
然后在spider中,如果你调用self.settings.get('Redis_host')
那么返回值是 None。
如果用REDIS_HOST定义,那么就可以正确返回它的值。
如果你一定要用小写,也有其他方法可正常调用。
先导入settings文件
fromt xxxx import setttings # xxx为项目名
host = settings.Redis_host # 直接导入一个文件的形式来调用是可以的
etree.strip_tags的用法
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 4280 次浏览 • 2019-10-24 11:24
它把参数中的标签从源htmlelement中删除,并且把里面的标签文本给合并进来。
举个例子:from lxml.html import etree
from lxml.html import fromstring, HtmlElement
test_html = '''<p><span>hello</span><span>world</span></p>'''
test_element = fromstring(test_html)
etree.strip_tags(test_element,'span') # 清除span标签
etree.tostring(test_element)
因为上述操作直接应用于test_element上的,所以test_element的值已经被修改了。
所以现在test_element 的值是
b'<p>helloworld</p>'
原创文章,转载请注明出处
http://30daydo.com/article/553
查看全部
它把参数中的标签从源htmlelement中删除,并且把里面的标签文本给合并进来。
举个例子:
from lxml.html import etree
from lxml.html import fromstring, HtmlElement
test_html = '''<p><span>hello</span><span>world</span></p>'''
test_element = fromstring(test_html)
etree.strip_tags(test_element,'span') # 清除span标签
etree.tostring(test_element)
因为上述操作直接应用于test_element上的,所以test_element的值已经被修改了。
所以现在test_element 的值是
b'<p>helloworld</p>'
原创文章,转载请注明出处
http://30daydo.com/article/553
aiohttp异步下载图片
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 4889 次浏览 • 2019-09-16 17:14
headers={'User-Agent':'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
async def getPage(num):
async with aiohttp.ClientSession() as session:
async with session.get(url.format(num),headers=headers) as resp:
if resp.status==200:
f= await aiofiles.open('{}.jpg'.format(num),mode='wb')
await f.write(await resp.read())
await f.close()
loop = asyncio.get_event_loop()
tasks = [getPage(i) for i in range(5)]
loop.run_until_complete(asyncio.wait(tasks))
原创文章,
转载请注明出处:
http://30daydo.com/article/537
查看全部
url = 'http://xyhz.huizhou.gov.cn/static/js/common/jigsaw/images/{}.jpg'
headers={'User-Agent':'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
async def getPage(num):
async with aiohttp.ClientSession() as session:
async with session.get(url.format(num),headers=headers) as resp:
if resp.status==200:
f= await aiofiles.open('{}.jpg'.format(num),mode='wb')
await f.write(await resp.read())
await f.close()
loop = asyncio.get_event_loop()
tasks = [getPage(i) for i in range(5)]
loop.run_until_complete(asyncio.wait(tasks))
原创文章,
转载请注明出处:
http://30daydo.com/article/537
scrapy源码分析<一>:入口函数以及是如何运行
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 6076 次浏览 • 2019-08-31 10:47
下面我们从源码分析一下scrapy执行的流程:
执行scrapy crawl 命令时,调用的是Command类class Command(ScrapyCommand):
requires_project = True
def syntax(self):
return '[options]'
def short_desc(self):
return 'Runs all of the spiders - My Defined'
def run(self,args,opts):
print('==================')
print(type(self.crawler_process))
spider_list = self.crawler_process.spiders.list() # 找到爬虫类
for name in spider_list:
print('=================')
print(name)
self.crawler_process.crawl(name,**opts.__dict__)
self.crawler_process.start()
然后我们去看看crawler_process,这个是来自ScrapyCommand,而ScrapyCommand又是CrawlerProcess的子类,而CrawlerProcess又是CrawlerRunner的子类
在CrawlerRunner构造函数里面主要作用就是这个 def __init__(self, settings=None):
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
self.settings = settings
self.spider_loader = _get_spider_loader(settings) # 构造爬虫
self._crawlers = set()
self._active = set()
self.bootstrap_failed = False
1. 加载配置文件def _get_spider_loader(settings):
cls_path = settings.get('SPIDER_LOADER_CLASS')
# settings文件没有定义SPIDER_LOADER_CLASS,所以这里获取到的是系统的默认配置文件,
# 默认配置文件在接下来的代码块A
# SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader'
loader_cls = load_object(cls_path)
# 这个函数就是根据路径转为类对象,也就是上面crapy.spiderloader.SpiderLoader 这个
# 字符串变成一个类对象
# 具体的load_object 对象代码见下面代码块B
return loader_cls.from_settings(settings.frozencopy())
默认配置文件defautl_settting.py# 代码块A
#......省略若干
SCHEDULER = 'scrapy.core.scheduler.Scheduler'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader' 就是这个值
SPIDER_LOADER_WARN_ONLY = False
SPIDER_MIDDLEWARES = {}
load_object的实现# 代码块B 为了方便,我把异常处理的去除
from importlib import import_module #导入第三方库
def load_object(path):
dot = path.rindex('.')
module, name = path[:dot], path[dot+1:]
# 上面把路径分为基本路径+模块名
mod = import_module(module)
obj = getattr(mod, name)
# 获取模块里面那个值
return obj
测试代码:In [33]: mod = import_module(module)
In [34]: mod
Out[34]: <module 'scrapy.spiderloader' from '/home/xda/anaconda3/lib/python3.7/site-packages/scrapy/spiderloader.py'>
In [35]: getattr(mod,name)
Out[35]: scrapy.spiderloader.SpiderLoader
In [36]: obj = getattr(mod,name)
In [37]: obj
Out[37]: scrapy.spiderloader.SpiderLoader
In [38]: type(obj)
Out[38]: type
在代码块A中,loader_cls是SpiderLoader,最后返回的的是SpiderLoader.from_settings(settings.frozencopy())
接下来看看SpiderLoader.from_settings, def from_settings(cls, settings):
return cls(settings)
返回类对象自己,所以直接看__init__函数即可class SpiderLoader(object):
"""
SpiderLoader is a class which locates and loads spiders
in a Scrapy project.
"""
def __init__(self, settings):
self.spider_modules = settings.getlist('SPIDER_MODULES')
# 获得settting中的模块名字,创建scrapy的时候就默认帮你生成了
# 你可以看看你的settings文件里面的内容就可以找到这个值,是一个list
self.warn_only = settings.getbool('SPIDER_LOADER_WARN_ONLY')
self._spiders = {}
self._found = defaultdict(list)
self._load_all_spiders() # 加载所有爬虫
核心就是这个_load_all_spiders:
走起:def _load_all_spiders(self):
for name in self.spider_modules:
for module in walk_modules(name): # 这个遍历文件夹里面的文件,然后再转化为类对象,
# 保存到字典:self._spiders = {}
self._load_spiders(module) # 模块变成spider
self._check_name_duplicates() # 去重,如果名字一样就异常
接下来看看_load_spiders
核心就是下面的。def iter_spider_classes(module):
from scrapy.spiders import Spider
for obj in six.itervalues(vars(module)): # 找到模块里面的变量,然后迭代出来
if inspect.isclass(obj) and \
issubclass(obj, Spider) and \
obj.__module__ == module.__name__ and \
getattr(obj, 'name', None): # 有name属性,继承于Spider
yield obj
这个obj就是我们平时写的spider类了。
原来分析了这么多,才找到了我们平时写的爬虫类
待续。。。。
原创文章
转载请注明出处
http://30daydo.com/article/530
查看全部
下面我们从源码分析一下scrapy执行的流程:
执行scrapy crawl 命令时,调用的是Command类
class Command(ScrapyCommand):
requires_project = True
def syntax(self):
return '[options]'
def short_desc(self):
return 'Runs all of the spiders - My Defined'
def run(self,args,opts):
print('==================')
print(type(self.crawler_process))
spider_list = self.crawler_process.spiders.list() # 找到爬虫类
for name in spider_list:
print('=================')
print(name)
self.crawler_process.crawl(name,**opts.__dict__)
self.crawler_process.start()
然后我们去看看crawler_process,这个是来自ScrapyCommand,而ScrapyCommand又是CrawlerProcess的子类,而CrawlerProcess又是CrawlerRunner的子类
在CrawlerRunner构造函数里面主要作用就是这个
def __init__(self, settings=None):
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
self.settings = settings
self.spider_loader = _get_spider_loader(settings) # 构造爬虫
self._crawlers = set()
self._active = set()
self.bootstrap_failed = False
1. 加载配置文件
def _get_spider_loader(settings):
cls_path = settings.get('SPIDER_LOADER_CLASS')
# settings文件没有定义SPIDER_LOADER_CLASS,所以这里获取到的是系统的默认配置文件,
# 默认配置文件在接下来的代码块A
# SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader'
loader_cls = load_object(cls_path)
# 这个函数就是根据路径转为类对象,也就是上面crapy.spiderloader.SpiderLoader 这个
# 字符串变成一个类对象
# 具体的load_object 对象代码见下面代码块B
return loader_cls.from_settings(settings.frozencopy())
默认配置文件defautl_settting.py
# 代码块A
#......省略若干
SCHEDULER = 'scrapy.core.scheduler.Scheduler'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
SPIDER_LOADER_CLASS = 'scrapy.spiderloader.SpiderLoader' 就是这个值
SPIDER_LOADER_WARN_ONLY = False
SPIDER_MIDDLEWARES = {}
load_object的实现
# 代码块B 为了方便,我把异常处理的去除
from importlib import import_module #导入第三方库
def load_object(path):
dot = path.rindex('.')
module, name = path[:dot], path[dot+1:]
# 上面把路径分为基本路径+模块名
mod = import_module(module)
obj = getattr(mod, name)
# 获取模块里面那个值
return obj
测试代码:
In [33]: mod = import_module(module)
In [34]: mod
Out[34]: <module 'scrapy.spiderloader' from '/home/xda/anaconda3/lib/python3.7/site-packages/scrapy/spiderloader.py'>
In [35]: getattr(mod,name)
Out[35]: scrapy.spiderloader.SpiderLoader
In [36]: obj = getattr(mod,name)
In [37]: obj
Out[37]: scrapy.spiderloader.SpiderLoader
In [38]: type(obj)
Out[38]: type
在代码块A中,loader_cls是SpiderLoader,最后返回的的是SpiderLoader.from_settings(settings.frozencopy())
接下来看看SpiderLoader.from_settings,
def from_settings(cls, settings):
return cls(settings)
返回类对象自己,所以直接看__init__函数即可
class SpiderLoader(object):
"""
SpiderLoader is a class which locates and loads spiders
in a Scrapy project.
"""
def __init__(self, settings):
self.spider_modules = settings.getlist('SPIDER_MODULES')
# 获得settting中的模块名字,创建scrapy的时候就默认帮你生成了
# 你可以看看你的settings文件里面的内容就可以找到这个值,是一个list
self.warn_only = settings.getbool('SPIDER_LOADER_WARN_ONLY')
self._spiders = {}
self._found = defaultdict(list)
self._load_all_spiders() # 加载所有爬虫
核心就是这个_load_all_spiders:
走起:
def _load_all_spiders(self):
for name in self.spider_modules:
for module in walk_modules(name): # 这个遍历文件夹里面的文件,然后再转化为类对象,
# 保存到字典:self._spiders = {}
self._load_spiders(module) # 模块变成spider
self._check_name_duplicates() # 去重,如果名字一样就异常
接下来看看_load_spiders
核心就是下面的。
def iter_spider_classes(module):
from scrapy.spiders import Spider
for obj in six.itervalues(vars(module)): # 找到模块里面的变量,然后迭代出来
if inspect.isclass(obj) and \
issubclass(obj, Spider) and \
obj.__module__ == module.__name__ and \
getattr(obj, 'name', None): # 有name属性,继承于Spider
yield obj
这个obj就是我们平时写的spider类了。
原来分析了这么多,才找到了我们平时写的爬虫类
待续。。。。
原创文章
转载请注明出处
http://30daydo.com/article/530
scrapy-rabbitmq 不支持python3 [修改源码使它支持]
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 3396 次浏览 • 2019-07-17 17:24
在python3上运行的收会报错。
需要修改以下地方:
待续。。
在python3上运行的收会报错。
需要修改以下地方:
待续。。
scrapy rabbitmq 分布式爬虫
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 6171 次浏览 • 2019-07-17 16:59
rabbitmq是个不错的消息队列服务,可以配合scrapy作为消息队列.
下面是一个简单的demo:import re
import requests
import scrapy
from scrapy import Request
from rabbit_spider import settings
from scrapy.log import logger
import json
from rabbit_spider.items import RabbitSpiderItem
import datetime
from scrapy.selector import Selector
import pika
# from scrapy_rabbitmq.spiders import RabbitMQMixin
# from scrapy.contrib.spiders import CrawlSpider
class Website(scrapy.Spider):
name = "rabbit"
def start_requests(self):
headers = {'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
'Host': '36kr.com',
'Referer': 'https://36kr.com/information/web_news',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36'
}
url = 'https://36kr.com/information/web_news'
yield Request(url=url,
headers=headers)
def parse(self, response):
credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101', 5672, '/', credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_log', exchange_type='direct')
result = channel.queue_declare(exclusive=True, queue='')
queue_name = result.method.queue
# print(queue_name)
# infos = sys.argv[1:] if len(sys.argv)>1 else ['info']
info = 'info'
# 绑定多个值
channel.queue_bind(
exchange='direct_log',
routing_key=info,
queue=queue_name
)
print('start to receive [{}]'.format(info))
channel.basic_consume(
on_message_callback=self.callback_func,
queue=queue_name,
auto_ack=True,
)
channel.start_consuming()
def callback_func(self, ch, method, properties, body):
print(body)
启动spider:from scrapy import cmdline
cmdline.execute('scrapy crawl rabbit'.split())
然后往rabbitmq里面推送数据:import pika
import settings
credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_log',exchange_type='direct') # fanout 就是组播
routing_key = 'info'
message='https://36kr.com/pp/api/aggregation-entity?type=web_latest_article&b_id=59499&per_page=30'
channel.basic_publish(
exchange='direct_log',
routing_key=routing_key,
body=message
)
print('sending message {}'.format(message))
connection.close()
推送数据后,scrapy会马上接受到队里里面的数据。
注意不能在start_requests里面写等待队列的命令,因为start_requests函数需要返回一个生成器,否则程序会报错。
待续。。。
###### 2019-08-29 更新 ###################
发现一个坑,就是rabbitMQ在接受到数据后,无法在回调函数里面使用yield生成器。
查看全部
rabbitmq是个不错的消息队列服务,可以配合scrapy作为消息队列.
下面是一个简单的demo:
import re
import requests
import scrapy
from scrapy import Request
from rabbit_spider import settings
from scrapy.log import logger
import json
from rabbit_spider.items import RabbitSpiderItem
import datetime
from scrapy.selector import Selector
import pika
# from scrapy_rabbitmq.spiders import RabbitMQMixin
# from scrapy.contrib.spiders import CrawlSpider
class Website(scrapy.Spider):
name = "rabbit"
def start_requests(self):
headers = {'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
'Host': '36kr.com',
'Referer': 'https://36kr.com/information/web_news',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36'
}
url = 'https://36kr.com/information/web_news'
yield Request(url=url,
headers=headers)
def parse(self, response):
credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101', 5672, '/', credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_log', exchange_type='direct')
result = channel.queue_declare(exclusive=True, queue='')
queue_name = result.method.queue
# print(queue_name)
# infos = sys.argv[1:] if len(sys.argv)>1 else ['info']
info = 'info'
# 绑定多个值
channel.queue_bind(
exchange='direct_log',
routing_key=info,
queue=queue_name
)
print('start to receive [{}]'.format(info))
channel.basic_consume(
on_message_callback=self.callback_func,
queue=queue_name,
auto_ack=True,
)
channel.start_consuming()
def callback_func(self, ch, method, properties, body):
print(body)
启动spider:
from scrapy import cmdline
cmdline.execute('scrapy crawl rabbit'.split())
然后往rabbitmq里面推送数据:
import pika
import settings
credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_log',exchange_type='direct') # fanout 就是组播
routing_key = 'info'
message='https://36kr.com/pp/api/aggregation-entity?type=web_latest_article&b_id=59499&per_page=30'
channel.basic_publish(
exchange='direct_log',
routing_key=routing_key,
body=message
)
print('sending message {}'.format(message))
connection.close()
推送数据后,scrapy会马上接受到队里里面的数据。
注意不能在start_requests里面写等待队列的命令,因为start_requests函数需要返回一个生成器,否则程序会报错。
待续。。。
###### 2019-08-29 更新 ###################
发现一个坑,就是rabbitMQ在接受到数据后,无法在回调函数里面使用yield生成器。
Win10下PhantomJS无法运行 【版本兼容问题】
python • 李魔佛 发表了文章 • 0 个评论 • 5639 次浏览 • 2019-07-04 09:07
在win10下就报错:
selenium.common.exceptions.WebDriverException: Message: Service C:\Tool\phantomjs-2.5.0-beta2-windows\phantomjs-2.5.0-beta2-windows\bin\phantomjs.exe unexpectedly exited. Status code was: 4294967295
后来替换了一个旧的版本,发现问题就这么解决了。
旧版本:phantomjs-2.1.1-windows
原创文章
转载请注明出处
http://30daydo.com/article/505
查看全部
在win10下就报错:
selenium.common.exceptions.WebDriverException: Message: Service C:\Tool\phantomjs-2.5.0-beta2-windows\phantomjs-2.5.0-beta2-windows\bin\phantomjs.exe unexpectedly exited. Status code was: 4294967295
后来替换了一个旧的版本,发现问题就这么解决了。
旧版本:phantomjs-2.1.1-windows
原创文章
转载请注明出处
http://30daydo.com/article/505
喜马拉雅app 爬取音频文件
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 6218 次浏览 • 2019-06-30 12:24
因为喜马拉雅的源码格式改了,所以爬虫代码也更新了一波
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/6/30 12:03
# @File : main.py
import requests
import re
import os
url = 'http://180.153.255.6/mobile/v1/album/track/ts-1571294887744?albumId=23057324&device=android&isAsc=true&isQueryInvitationBrand=true&pageId={}&pageSize=20&pre_page=0'
headers = {'User-Agent': 'Xiaomi'}
def download():
for i in range(1, 3):
r = requests.get(url=url.format(i), headers=headers)
js_data = r.json()
data_list = js_data.get('data', {}).get('list', [])
for item in data_list:
trackName = item.get('title')
trackName = re.sub('[\/\\\:\*\?\"\<\>\|]', '_', trackName)
# trackName=re.sub(':','',trackName)
src_url = item.get('playUrl64')
filename = '{}.mp3'.format(trackName)
if not os.path.exists(filename):
try:
r0 = requests.get(src_url, headers=headers)
except Exception as e:
print(e)
print(trackName)
r0 = requests.get(src_url, headers=headers)
else:
with open(filename, 'wb') as f:
f.write(r0.content)
print('{} downloaded'.format(trackName))
else:
print(f'{filename}已经下载过了')
import shutil
def rename_():
for i in range(1, 3):
r = requests.get(url=url.format(i), headers=headers)
js_data = r.json()
data_list = js_data.get('data', {}).get('list', [])
for item in data_list:
trackName = item.get('title')
trackName = re.sub('[\/\\\:\*\?\"\<\>\|]', '_', trackName)
src_url = item.get('playUrl64')
orderNo=item.get('orderNo')
filename = '{}.mp3'.format(trackName)
try:
if os.path.exists(filename):
new_file='{}_{}.mp3'.format(orderNo,trackName)
shutil.move(filename,new_file)
except Exception as e:
print(e)
if __name__=='__main__':
rename_()
音频文件也更新了,详情见百度网盘。
======== 2018-10=============
爬取喜马拉雅app上 杨继东的投资之道 的音频文件
运行环境:python3# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/6/30 12:03
# @File : main.py
import requests
import re
url = 'https://www.ximalaya.com/revision/play/album?albumId=23057324&pageNum=1&sort=1&pageSize=60'
headers={'User-Agent':'Xiaomi'}
r = requests.get(url=url,headers=headers)
js_data = r.json()
data_list = js_data.get('data',{}).get('tracksAudioPlay',)
for item in data_list:
trackName=item.get('trackName')
trackName=re.sub(':','',trackName)
src_url = item.get('src')
try:
r0=requests.get(src_url,headers=headers)
except Exception as e:
print(e)
print(trackName)
else:
with open('{}.m4a'.format(trackName),'wb') as f:
f.write(r0.content)
print('{} downloaded'.format(trackName))
保存为main.py
然后运行 python main.py
稍微等几分钟就自动下载好了。
附下载好的音频文件:
链接:https://pan.baidu.com/s/1t_vJhTvSJSeFdI1IaDS6fA
提取码:e3zb
原创文章
转载请注明出处
http://30daydo.com/article/503 查看全部
因为喜马拉雅的源码格式改了,所以爬虫代码也更新了一波
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/6/30 12:03
# @File : main.py
import requests
import re
import os
url = 'http://180.153.255.6/mobile/v1/album/track/ts-1571294887744?albumId=23057324&device=android&isAsc=true&isQueryInvitationBrand=true&pageId={}&pageSize=20&pre_page=0'
headers = {'User-Agent': 'Xiaomi'}
def download():
for i in range(1, 3):
r = requests.get(url=url.format(i), headers=headers)
js_data = r.json()
data_list = js_data.get('data', {}).get('list', [])
for item in data_list:
trackName = item.get('title')
trackName = re.sub('[\/\\\:\*\?\"\<\>\|]', '_', trackName)
# trackName=re.sub(':','',trackName)
src_url = item.get('playUrl64')
filename = '{}.mp3'.format(trackName)
if not os.path.exists(filename):
try:
r0 = requests.get(src_url, headers=headers)
except Exception as e:
print(e)
print(trackName)
r0 = requests.get(src_url, headers=headers)
else:
with open(filename, 'wb') as f:
f.write(r0.content)
print('{} downloaded'.format(trackName))
else:
print(f'{filename}已经下载过了')
import shutil
def rename_():
for i in range(1, 3):
r = requests.get(url=url.format(i), headers=headers)
js_data = r.json()
data_list = js_data.get('data', {}).get('list', [])
for item in data_list:
trackName = item.get('title')
trackName = re.sub('[\/\\\:\*\?\"\<\>\|]', '_', trackName)
src_url = item.get('playUrl64')
orderNo=item.get('orderNo')
filename = '{}.mp3'.format(trackName)
try:
if os.path.exists(filename):
new_file='{}_{}.mp3'.format(orderNo,trackName)
shutil.move(filename,new_file)
except Exception as e:
print(e)
if __name__=='__main__':
rename_()
音频文件也更新了,详情见百度网盘。
======== 2018-10=============
爬取喜马拉雅app上 杨继东的投资之道 的音频文件
运行环境:python3
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/6/30 12:03
# @File : main.py
import requests
import re
url = 'https://www.ximalaya.com/revision/play/album?albumId=23057324&pageNum=1&sort=1&pageSize=60'
headers={'User-Agent':'Xiaomi'}
r = requests.get(url=url,headers=headers)
js_data = r.json()
data_list = js_data.get('data',{}).get('tracksAudioPlay',)
for item in data_list:
trackName=item.get('trackName')
trackName=re.sub(':','',trackName)
src_url = item.get('src')
try:
r0=requests.get(src_url,headers=headers)
except Exception as e:
print(e)
print(trackName)
else:
with open('{}.m4a'.format(trackName),'wb') as f:
f.write(r0.content)
print('{} downloaded'.format(trackName))
保存为main.py
然后运行 python main.py
稍微等几分钟就自动下载好了。
附下载好的音频文件:
链接:https://pan.baidu.com/s/1t_vJhTvSJSeFdI1IaDS6fA
提取码:e3zb
原创文章
转载请注明出处
http://30daydo.com/article/503
scrapy-redis使用redis集群进行分布式爬取
python爬虫 • 李魔佛 发表了文章 • 2 个评论 • 7732 次浏览 • 2019-04-03 17:05
使用redis集群可以增加redis集体内存,防止出现上面的情况。
scrapy redis-cluster很简单,只需要按照以下步骤:
1. 按照库
pip install scrapy-redis-cluster
2. 修改settings文件
# Redis集群地址
REDIS_MASTER_NODES = [
{"host": "192.168.10.233", "port": "30001"},
{"host": "192.168.10.234", "port": "30002"},
{"host": "192.168.10.235", "port": "30003"},
]
# 使用的哈希函数数,默认为6
BLOOMFILTER_HASH_NUMBER = 6
# Bloomfilter使用的Redis内存位,30表示2 ^ 30 = 128MB,默认为22 (1MB 可去重130W URL)
BLOOMFILTER_BIT = 22
# 不清空redis队列
SCHEDULER_PERSIST = True
# 调度队列
SCHEDULER = "scrapy_redis_cluster.scheduler.Scheduler"
# 去重
DUPEFILTER_CLASS = "scrapy_redis_cluster.dupefilter.RFPDupeFilter"
# queue
SCHEDULER_QUEUE_CLASS = 'scrapy_redis_cluster.queue.PriorityQueue'
然后就可以运行啦。 查看全部
使用redis集群可以增加redis集体内存,防止出现上面的情况。
scrapy redis-cluster很简单,只需要按照以下步骤:
1. 按照库
pip install scrapy-redis-cluster
2. 修改settings文件
# Redis集群地址
REDIS_MASTER_NODES = [
{"host": "192.168.10.233", "port": "30001"},
{"host": "192.168.10.234", "port": "30002"},
{"host": "192.168.10.235", "port": "30003"},
]
# 使用的哈希函数数,默认为6
BLOOMFILTER_HASH_NUMBER = 6
# Bloomfilter使用的Redis内存位,30表示2 ^ 30 = 128MB,默认为22 (1MB 可去重130W URL)
BLOOMFILTER_BIT = 22
# 不清空redis队列
SCHEDULER_PERSIST = True
# 调度队列
SCHEDULER = "scrapy_redis_cluster.scheduler.Scheduler"
# 去重
DUPEFILTER_CLASS = "scrapy_redis_cluster.dupefilter.RFPDupeFilter"
# queue
SCHEDULER_QUEUE_CLASS = 'scrapy_redis_cluster.queue.PriorityQueue'
然后就可以运行啦。
scrapy命令行执行传递多个参数给spider 动态传参
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 7368 次浏览 • 2019-03-28 11:24
那么需要在spider中定义一个构造函数
def __init__(self,page=None,*args, **kwargs):
super(Gaode,self).__init__(*args, **kwargs)
self.page=page
def start_requests(self):
XXXXXX 调用self.page 即可
yield Request(XXXX)
然后在启动scrapy的时候赋予参数的值:
scrapy crawl spider -a page=10
就可以动态传入参数
原创文章
转载请注明出处:http://30daydo.com/article/436
查看全部
那么需要在spider中定义一个构造函数
def __init__(self,page=None,*args, **kwargs):
super(Gaode,self).__init__(*args, **kwargs)
self.page=page
def start_requests(self):
XXXXXX 调用self.page 即可
yield Request(XXXX)
然后在启动scrapy的时候赋予参数的值:
scrapy crawl spider -a page=10
就可以动态传入参数
原创文章
转载请注明出处:http://30daydo.com/article/436
scrapyd 日志文件中文乱码 解决方案
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 4761 次浏览 • 2019-03-27 17:13
网上一般的解决方法是修改scrapyd的源码,增加一个utf8的编码页面,需要重新写一个html的页面框架,对于一般只是看看日志的朋友来说,没必要这么大刀阔斧的。
可以直接使用postman来打开日志文件,里面的中文是正常的。
查看全部
Linux下自制有道词典 - python 解密有道词典JS加密
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 4877 次浏览 • 2019-02-23 20:17
平时在linux下开发,鉴于没有什么好用翻译软件,打开网易也占用系统资源,所以写了个在控制台的翻译软件接口。
使用python爬虫,查看网页的JS加密方法,一步一步地分析,就能够得到最后的加密方法啦。
直接给出代码:
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/2/23 19:34
# @File : youdao.py
# 解密有道词典的JS
import hashlib
import random
import requests
import time
def md5_(word):
s = bytes(word, encoding='utf8')
m = hashlib.md5()
m.update(s)
ret = m.hexdigest()
return ret
def get_sign(word, salt):
ret = md5_('fanyideskweb' + word + salt + 'p09@Bn{h02_BIEe]$P^nG')
return ret
def youdao(word):
url = 'http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule'
headers = {
'Host': 'fanyi.youdao.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:47.0) Gecko/20100101 Firefox/47.0',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With': 'XMLHttpRequest',
'Referer': 'http://fanyi.youdao.com/',
'Content-Length': '252',
'Cookie': 'YOUDAO_MOBILE_ACCESS_TYPE=1; OUTFOX_SEARCH_USER_ID=1672542763@10.169.0.83; JSESSIONID=aaaWzxpjeDu1gbhopLzKw; ___rl__test__cookies=1550913722828; OUTFOX_SEARCH_USER_ID_NCOO=372126049.6326876',
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
}
ts = str(int(time.time()*1000))
salt=ts+str(random.randint(0,10))
bv = md5_("5.0 (Windows)")
sign= get_sign(word,salt)
post_data = {
'i': word,
'from': 'AUTO', 'to': 'AUTO', 'smartresult': 'dict', 'client': 'fanyideskweb', 'salt': salt,
'sign': sign, 'ts': ts, 'bv': bv, 'doctype': 'json', 'version': '2.1',
'keyfrom': 'fanyi.web', 'action': 'FY_BY_REALTIME', 'typoResult': 'false'
}
r = requests.post(
url=url,
headers=headers,
data=post_data
)
for item in r.json().get('smartResult',{}).get('entries'):
print(item)
word='student'
youdao(word)
得到结果:
Github:
https://github.com/Rockyzsu/CrawlMan/tree/master/youdao_dictionary
原创文章,转载请注明出处
http://30daydo.com/article/416 查看全部
平时在linux下开发,鉴于没有什么好用翻译软件,打开网易也占用系统资源,所以写了个在控制台的翻译软件接口。
使用python爬虫,查看网页的JS加密方法,一步一步地分析,就能够得到最后的加密方法啦。
直接给出代码:
# -*- coding: utf-8 -*-
# website: http://30daydo.com
# @Time : 2019/2/23 19:34
# @File : youdao.py
# 解密有道词典的JS
import hashlib
import random
import requests
import time
def md5_(word):
s = bytes(word, encoding='utf8')
m = hashlib.md5()
m.update(s)
ret = m.hexdigest()
return ret
def get_sign(word, salt):
ret = md5_('fanyideskweb' + word + salt + 'p09@Bn{h02_BIEe]$P^nG')
return ret
def youdao(word):
url = 'http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule'
headers = {
'Host': 'fanyi.youdao.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:47.0) Gecko/20100101 Firefox/47.0',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With': 'XMLHttpRequest',
'Referer': 'http://fanyi.youdao.com/',
'Content-Length': '252',
'Cookie': 'YOUDAO_MOBILE_ACCESS_TYPE=1; OUTFOX_SEARCH_USER_ID=1672542763@10.169.0.83; JSESSIONID=aaaWzxpjeDu1gbhopLzKw; ___rl__test__cookies=1550913722828; OUTFOX_SEARCH_USER_ID_NCOO=372126049.6326876',
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
}
ts = str(int(time.time()*1000))
salt=ts+str(random.randint(0,10))
bv = md5_("5.0 (Windows)")
sign= get_sign(word,salt)
post_data = {
'i': word,
'from': 'AUTO', 'to': 'AUTO', 'smartresult': 'dict', 'client': 'fanyideskweb', 'salt': salt,
'sign': sign, 'ts': ts, 'bv': bv, 'doctype': 'json', 'version': '2.1',
'keyfrom': 'fanyi.web', 'action': 'FY_BY_REALTIME', 'typoResult': 'false'
}
r = requests.post(
url=url,
headers=headers,
data=post_data
)
for item in r.json().get('smartResult',{}).get('entries'):
print(item)
word='student'
youdao(word)
得到结果:
Github:
https://github.com/Rockyzsu/CrawlMan/tree/master/youdao_dictionary
原创文章,转载请注明出处
http://30daydo.com/article/416
拉勾网的反爬策略
python爬虫 • 李魔佛 发表了文章 • 0 个评论 • 4116 次浏览 • 2019-01-23 10:18
(请注意日期,因为不保证往后的日子里面反爬策略还有效)
1. 封IP,这个没的说,肯定要使用代理IP
2. scrapy里面的需要添加headers,而headers中一定要加上Cookies的数据。 之前要做Request中的cookies参数添加cookies,现在发现失效了,只能在headers中添加cookies数据。
headers = {'Accept': 'application/json,text/javascript,*/*;q=0.01', 'Accept-Encoding':
'gzip,deflate,br',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Cache-Control': 'no-cache',
# 'Connection': 'keep-alive',
'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
'Cookie': 'JSESSIONID=ABAAABAABEEAAJAACF8F22F99AFA35F9EEC28F2D0E46A41;_ga=GA1.2.331323650.1548204973;_gat=1;Hm_lvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1548204973;user_trace_token=20190123085612-adf35b62-1ea9-11e9-b744-5254005c3644;LGSID=20190123085612-adf35c69-1ea9-11e9-b744-5254005c3644;PRE_UTM=;PRE_HOST=;PRE_SITE=;PRE_LAND=https%3A%2F%2Fwww.lagou.com%2F;LGUID=20190123085612-adf35ed5-1ea9-11e9-b744-5254005c3644;_gid=GA1.2.1809874038.1548204973;index_location_city=%E6%B7%B1%E5%9C%B3;TG-TRACK-CODE=index_search;SEARCH_ID=169bf76c08b548f8830967a1968d10ca;Hm_lpvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1548204985;LGRID=20190123085624-b52a0555-1ea9-11e9-b744-5254005c3644',
'Host': 'www.lagou.com', 'Origin': 'https://www.lagou.com', 'Pragma': 'no-cache',
'Referer': 'https://www.lagou.com/jobs/list_%E7%88%AC%E8%99%AB?labelWords=&fromSearch=true&suginput=',
'User-Agent': 'Mozilla/5.0(WindowsNT6.3;WOW64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/71.0.3578.98Safari/537.36',
'X-Anit-Forge-Code': '0',
'X-Anit-Forge-Token': 'None',
'X-Requested-With': 'XMLHttpRequest'
} 查看全部
(请注意日期,因为不保证往后的日子里面反爬策略还有效)
1. 封IP,这个没的说,肯定要使用代理IP
2. scrapy里面的需要添加headers,而headers中一定要加上Cookies的数据。 之前要做Request中的cookies参数添加cookies,现在发现失效了,只能在headers中添加cookies数据。
headers = {'Accept': 'application/json,text/javascript,*/*;q=0.01', 'Accept-Encoding':
'gzip,deflate,br',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Cache-Control': 'no-cache',
# 'Connection': 'keep-alive',
'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
'Cookie': 'JSESSIONID=ABAAABAABEEAAJAACF8F22F99AFA35F9EEC28F2D0E46A41;_ga=GA1.2.331323650.1548204973;_gat=1;Hm_lvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1548204973;user_trace_token=20190123085612-adf35b62-1ea9-11e9-b744-5254005c3644;LGSID=20190123085612-adf35c69-1ea9-11e9-b744-5254005c3644;PRE_UTM=;PRE_HOST=;PRE_SITE=;PRE_LAND=https%3A%2F%2Fwww.lagou.com%2F;LGUID=20190123085612-adf35ed5-1ea9-11e9-b744-5254005c3644;_gid=GA1.2.1809874038.1548204973;index_location_city=%E6%B7%B1%E5%9C%B3;TG-TRACK-CODE=index_search;SEARCH_ID=169bf76c08b548f8830967a1968d10ca;Hm_lpvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1548204985;LGRID=20190123085624-b52a0555-1ea9-11e9-b744-5254005c3644',
'Host': 'www.lagou.com', 'Origin': 'https://www.lagou.com', 'Pragma': 'no-cache',
'Referer': 'https://www.lagou.com/jobs/list_%E7%88%AC%E8%99%AB?labelWords=&fromSearch=true&suginput=',
'User-Agent': 'Mozilla/5.0(WindowsNT6.3;WOW64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/71.0.3578.98Safari/537.36',
'X-Anit-Forge-Code': '0',
'X-Anit-Forge-Token': 'None',
'X-Requested-With': 'XMLHttpRequest'
}
python数据爬取技术与实战手册 读后感
书籍 • 李魔佛 发表了文章 • 0 个评论 • 3345 次浏览 • 2019-01-14 08:01
实话实说,质量不行。 最基本的代码排版都有很大问题(缩进,注释,命名规范等等),代码明显没有按照PEP8规范。
比如这里的第16行代码 ,注释和代码不相符。
这里根本是2行代码,然后排版没有换行。
而爬虫的代码也过于简单,缺乏注释,不推荐大家去看。 查看全部