python解析windows日志文件,查询服务器是否被人攻击

李魔佛 发表了文章 • 0 个评论 • 84 次浏览 • 2021-01-17 23:49 • 来自相关话题

最近大致浏览了下windows server的日志记录,发现有不少的异地IP进行了登录尝试,而且有部分是登录成功的,但不确定是否本人自己登陆,所以借助python,对日志进行解析,并根据IP查询其远程物理地址。
 
最终效果:








【MD,老毛子就是天天在扫描,爆破密码,即使改了端口还是在枚举】
 
大致代码如下:import mmap
import contextlib
from Evtx.Evtx import FileHeader
from Evtx.Views import evtx_file_xml_view
from xml.dom import minidom
from ip_convertor import IP
import re

class WindowsLogger():

def __init__(self,path):
self.path = path
self.formator = 'IP:{:10}\tlocation:{:20}\tUser:{:15}\tProcess:{}'

def read_file(self):
with open(self.path,'r') as f:
with contextlib.closing(mmap.mmap(f.fileno(),0,access=mmap.ACCESS_READ)) as buf:
fh = FileHeader(buf,0)
return fh

return None

def parse_log_detail(self,filteID):
with open(self.path,'r') as f:
with contextlib.closing(mmap.mmap(f.fileno(),0,access=mmap.ACCESS_READ)) as buf:
fh = FileHeader(buf,0)
for xml, record in evtx_file_xml_view(fh):
#只输出事件ID为4624的内容
# InterestEvent(xml,4624)
for IpAddress,ip,targetUsername,ProcessName in self.filter_event(xml,filteID):
print(self.formator.format(IpAddress,ip,targetUsername,ProcessName))

# 过滤掉不需要的事件,输出感兴趣的事件
def filter_event(self,xml,EventID,use_filter=True):
xmldoc = minidom.parseString(xml)
# 获取EventID节点的事件ID
collections = xmldoc.documentElement
events=xmldoc.getElementsByTagName('Event')
for evt in events:
eventId = evt.getElementsByTagName('EventID')[0].childNodes[0].data
time_create = evt.getElementsByTagName('TimeCreated')[0].getAttribute('SystemTime')
eventData = evt.getElementsByTagName('EventData')[0]

for data in eventData.getElementsByTagName('Data'):
if data.getAttribute('Name')=='IpAddress':
IpAddress=data.childNodes[0].data

if data.getAttribute('Name')=='TargetUserName':
targetUsername = data.childNodes[0].data

if data.getAttribute('Name')=='ProcessName':
ProcessName = data.childNodes[0].data

if use_filter is True and eventId==EventID:
ip=''
if re.search('^\d+',IpAddress):
ip = IP(IpAddress).ip_address

yield IpAddress,ip,targetUsername,ProcessName

def main():
path=r'D:\share\1.evtx'
filter_id = '4624'
app = WindowsLogger(path)
app.parse_log_detail(filter_id)

if __name__ == '__main__':
main()
D:\share\1.evtx 为日志导出文件

原创文章,转载请注明出处:
http://30daydo.com/article/44130 
 
完整代码,可以通过公众号回复: windows日志解析获取
 

  查看全部
最近大致浏览了下windows server的日志记录,发现有不少的异地IP进行了登录尝试,而且有部分是登录成功的,但不确定是否本人自己登陆,所以借助python,对日志进行解析,并根据IP查询其远程物理地址。
 
最终效果:




cmd_vKiBIjQLpd.png

【MD,老毛子就是天天在扫描,爆破密码,即使改了端口还是在枚举】
 
大致代码如下:
import mmap
import contextlib
from Evtx.Evtx import FileHeader
from Evtx.Views import evtx_file_xml_view
from xml.dom import minidom
from ip_convertor import IP
import re

class WindowsLogger():

def __init__(self,path):
self.path = path
self.formator = 'IP:{:10}\tlocation:{:20}\tUser:{:15}\tProcess:{}'

def read_file(self):
with open(self.path,'r') as f:
with contextlib.closing(mmap.mmap(f.fileno(),0,access=mmap.ACCESS_READ)) as buf:
fh = FileHeader(buf,0)
return fh

return None

def parse_log_detail(self,filteID):
with open(self.path,'r') as f:
with contextlib.closing(mmap.mmap(f.fileno(),0,access=mmap.ACCESS_READ)) as buf:
fh = FileHeader(buf,0)
for xml, record in evtx_file_xml_view(fh):
#只输出事件ID为4624的内容
# InterestEvent(xml,4624)
for IpAddress,ip,targetUsername,ProcessName in self.filter_event(xml,filteID):
print(self.formator.format(IpAddress,ip,targetUsername,ProcessName))

# 过滤掉不需要的事件,输出感兴趣的事件
def filter_event(self,xml,EventID,use_filter=True):
xmldoc = minidom.parseString(xml)
# 获取EventID节点的事件ID
collections = xmldoc.documentElement
events=xmldoc.getElementsByTagName('Event')
for evt in events:
eventId = evt.getElementsByTagName('EventID')[0].childNodes[0].data
time_create = evt.getElementsByTagName('TimeCreated')[0].getAttribute('SystemTime')
eventData = evt.getElementsByTagName('EventData')[0]

for data in eventData.getElementsByTagName('Data'):
if data.getAttribute('Name')=='IpAddress':
IpAddress=data.childNodes[0].data

if data.getAttribute('Name')=='TargetUserName':
targetUsername = data.childNodes[0].data

if data.getAttribute('Name')=='ProcessName':
ProcessName = data.childNodes[0].data

if use_filter is True and eventId==EventID:
ip=''
if re.search('^\d+',IpAddress):
ip = IP(IpAddress).ip_address

yield IpAddress,ip,targetUsername,ProcessName

def main():
path=r'D:\share\1.evtx'
filter_id = '4624'
app = WindowsLogger(path)
app.parse_log_detail(filter_id)

if __name__ == '__main__':
main()

D:\share\1.evtx 为日志导出文件

原创文章,转载请注明出处:
http://30daydo.com/article/44130 
 
完整代码,可以通过公众号回复: windows日志解析获取
 

 

茅台抢购程序 京东 苏宁

李魔佛 发表了文章 • 0 个评论 • 1545 次浏览 • 2021-01-05 22:34 • 来自相关话题

最近掀起了茅台抢购风,所以分享一个python抢购脚本。
运行环境 windows,linux,mac,python3+
 
京东小白分查询:
https://plus.m.jd.com/rights/windControl
分太低的就不要参与了,毕竟概率会小很多
 
############ 2021-01-13 更新 ======
最新的用Go重写的,搞了几瓶










 
苏宁家的:





 


============= 2021-01-11 更新 ============

感觉苏宁的抢购是耍猴的,那个按钮基本处于不可点状态,所以就放弃了,感觉官方就是没放多少量,加上苏宁公司过往的尿性,所以洗洗睡了 


main.pyimport sys

from maotai.jd_spider_requests import ProdectPurchase


if __name__ == '__main__':
tip = """
功能列表:
1.预约商品
2.秒杀抢购商品
"""
print(tip)

product = ProdectPurchase()
choice_function = input('请选择:')
if choice_function == '1':
product.reserve()
elif choice_function == '2':
product.seckill_by_proc_pool()
else:
print('没有此功能')
sys.exit(1)







jd_spider_requests.pyimport random
import time
import requests
import functools
import json
import os
import pickle

from lxml import etree

from error.exception import SKException
from maotai.jd_logger import logger
from maotai.timer import Timer
from maotai.config import global_config
from concurrent.futures import ProcessPoolExecutor
from helper.jd_helper import (
parse_json,
send_wechat,
wait_some_time,
response_status,
save_image,
open_image
)


class SpiderSession:
"""
Session相关操作
"""

def __init__(self):
self.cookies_dir_path = "./cookies/"
self.user_agent = global_config.getRaw('config', 'DEFAULT_USER_AGENT')

self.session = self._init_session()

def _init_session(self):
session = requests.session()
session.headers = self.get_headers()
return session

def get_headers(self):
return {"User-Agent": self.user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;"
"q=0.9,image/webp,image/apng,*/*;"
"q=0.8,application/signed-exchange;"
"v=b3",
"Connection": "keep-alive"}

def get_user_agent(self):
return self.user_agent

def get_session(self):
"""
获取当前Session
:return:
"""
return self.session

def get_cookies(self):
"""
获取当前Cookies
:return:
"""
return self.get_session().cookies

def set_cookies(self, cookies):
self.session.cookies.update(cookies)

def load_cookies_from_local(self):
"""
从本地加载Cookie
:return:
"""
cookies_file = ''
if not os.path.exists(self.cookies_dir_path):
return False
for name in os.listdir(self.cookies_dir_path):
if name.endswith(".cookies"):
cookies_file = '{}{}'.format(self.cookies_dir_path, name)
break
if cookies_file == '':
return False
with open(cookies_file, 'rb') as f:
local_cookies = pickle.load(f)
self.set_cookies(local_cookies)

def save_cookies_to_local(self, cookie_file_name):
"""
保存Cookie到本地
:param cookie_file_name: 存放Cookie的文件名称
:return:
"""
cookies_file = '{}{}.cookies'.format(self.cookies_dir_path, cookie_file_name)
directory = os.path.dirname(cookies_file)
if not os.path.exists(directory):
os.makedirs(directory)
with open(cookies_file, 'wb') as f:
pickle.dump(self.get_cookies(), f)


class QrLogin:
"""
扫码登录
"""

def __init__(self, spider_session: SpiderSession):
"""
初始化扫码登录
大致流程:
1、访问登录二维码页面,获取Token
2、使用Token获取票据
3、校验票据
:param spider_session:
"""
self.qrcode_img_file = 'qr_code.png'

self.spider_session = spider_session
self.session = self.spider_session.get_session()

self.is_login = False
self.refresh_login_status()

def refresh_login_status(self):
"""
刷新是否登录状态
:return:
"""
self.is_login = self._validate_cookies()

def _validate_cookies(self):
"""
验证cookies是否有效(是否登陆)
通过访问用户订单列表页进行判断:若未登录,将会重定向到登陆页面。
:return: cookies是否有效 True/False
"""
url = 'https://order.jd.com/center/list.action'
payload = {
'rid': str(int(time.time() * 1000)),
}
try:
resp = self.session.get(url=url, params=payload, allow_redirects=False)
if resp.status_code == requests.codes.OK:
return True
except Exception as e:
logger.error("验证cookies是否有效发生异常", e)
return False

def _get_login_page(self):
"""
获取PC端登录页面
阻塞,更新cookies
:return:
"""
url = "https://passport.jd.com/new/login.aspx"
page = self.session.get(url, headers=self.spider_session.get_headers())
return page

def _get_qrcode(self):
"""
缓存并展示登录二维码
:return:
"""
url = 'https://qr.m.jd.com/show'
payload = {
'appid': 133,
'size': 147,
't': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)

if not response_status(resp):
logger.info('获取二维码失败')
return False

save_image(resp, self.qrcode_img_file)
logger.info('二维码获取成功,请打开京东APP扫描')
open_image(self.qrcode_img_file)
return True

def _get_qrcode_ticket(self):
"""
通过 token 获取票据 ticket
:return:
"""
url = 'https://qr.m.jd.com/check'
payload = {
'appid': '133',
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'token': self.session.cookies.get('wlfstk_smdl'), # 从cookies获取值
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)

if not response_status(resp):
logger.error('获取二维码扫描结果异常')
return False

resp_json = parse_json(resp.text)
if resp_json['code'] != 200:
logger.info('Code: %s, Message: %s', resp_json['code'], resp_json['msg'])
return None
else:
logger.info('已完成手机客户端确认')
return resp_json['ticket']

def _validate_qrcode_ticket(self, ticket):
"""
通过已获取的票据进行校验
:param ticket: 已获取的票据
:return:
"""
url = 'https://passport.jd.com/uc/qrCodeTicketValidation'
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/uc/login?ltype=logout',
}

resp = self.session.get(url=url, headers=headers, params={'t': ticket})
if not response_status(resp):
return False

resp_json = json.loads(resp.text)
if resp_json['returnCode'] == 0:
return True
else:
logger.info(resp_json)
return False

def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
self._get_login_page() # 更新cookies

# download QR code
if not self._get_qrcode():
raise SKException('二维码下载失败')

# get QR code ticket
ticket = None
retry_times = 85
for _ in range(retry_times):
# 重试 拿到ticket
ticket = self._get_qrcode_ticket()
if ticket:
break
time.sleep(2)
else:
raise SKException('二维码过期,请重新获取扫描')

# validate QR code ticket
if not self._validate_qrcode_ticket(ticket):
raise SKException('二维码信息校验失败')

self.refresh_login_status()

logger.info('二维码登录成功')


class ProdectPurchase(object):
def __init__(self):
self.spider_session = SpiderSession()
self.spider_session.load_cookies_from_local()
# 共享一个session

self.qrlogin = QrLogin(self.spider_session)

# 初始化信息
self.sku_id = global_config.getRaw('config', 'sku_id')
self.seckill_num = global_config.getRaw('config', 'seckill_num')
self.work_count = global_config.getRaw('config','process_num')
self.seckill_init_info = dict()
self.seckill_url = dict()
self.seckill_order_data = dict()
self.timers = Timer()

self.session = self.spider_session.get_session()
self.user_agent = self.spider_session.user_agent
self.nick_name = None

def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
if self.qrlogin.is_login:
logger.info('登录成功')
return

self.qrlogin.login_by_qrcode()

if self.qrlogin.is_login:
self.nick_name = self.get_username()
self.spider_session.save_cookies_to_local(self.nick_name)
else:
raise SKException("二维码登录失败!")

def check_login(func):
"""
用户登陆态校验装饰器。若用户未登陆,则调用扫码登陆
"""

@functools.wraps(func)
def new_func(self, *args, **kwargs):
if not self.qrlogin.is_login:
logger.info("{0} 需登陆后调用,开始扫码登陆".format(func.__name__))
self.login_by_qrcode()
return func(self, *args, **kwargs)

return new_func

@check_login
def reserve(self):
"""
预约
"""
self._reserve()

@check_login
def seckill(self):
"""
抢购
"""
self._seckill()

@check_login
def seckill_by_proc_pool(self):
"""
多进程进行抢购
work_count:进程数量
"""
with ProcessPoolExecutor() as pool:
for i in range(self.work_count):
pool.submit(self.seckill)

def _reserve(self):
"""
预约
"""
while True:
try:
self.make_reserve()
break
except Exception as e:
logger.info('预约发生异常!', e)
wait_some_time()

def _seckill(self):
"""
抢购
"""
while True:
try:
self.request_seckill_url()
while True:
self.request_seckill_checkout_page()
self.submit_seckill_order()
except Exception as e:
logger.info('抢购发生异常,稍后继续执行!', e)
wait_some_time()

def make_reserve(self):
"""商品预约"""
logger.info('商品名称:{}'.format(self.get_sku_title()))
url = 'https://yushou.jd.com/youshouinfo.action?'
payload = {
'callback': 'fetchJSON',
'sku': self.sku_id,
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
resp = self.session.get(url=url, params=payload, headers=headers)
resp_json = parse_json(resp.text)
reserve_url = resp_json.get('url')
# self.timers.start()
while True:
try:
self.session.get(url='https:' + reserve_url)
logger.info('预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约')
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约"
send_wechat(success_message)
break
except Exception as e:
logger.error('预约失败正在重试...')

def get_username(self):
"""获取用户信息"""
url = 'https://passport.jd.com/user/petName/getUserInfoForMiniJd.action'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://order.jd.com/center/list.action',
}

resp = self.session.get(url=url, params=payload, headers=headers)

try_count = 5
while not resp.text.startswith("jQuery"):
try_count = try_count - 1
if try_count > 0:
resp = self.session.get(url=url, params=payload, headers=headers)
else:
break
wait_some_time()
# 响应中包含了许多用户信息,现在在其中返回昵称
# jQuery2381773({"imgUrl":"//storage.360buyimg.com/i.imageUpload/xxx.jpg","lastLoginTime":"","nickName":"xxx","plusStatus":"0","realName":"xxx","userLevel":x,"userScoreVO":{"accountScore":xx,"activityScore":xx,"consumptionScore":xxxxx,"default":false,"financeScore":xxx,"pin":"xxx","riskScore":x,"totalScore":xxxxx}})
return parse_json(resp.text).get('nickName')

def get_sku_title(self):
"""获取商品名称"""
url = 'https://item.jd.com/{}.html'.format(global_config.getRaw('config', 'sku_id'))
resp = self.session.get(url).content
x_data = etree.HTML(resp)
sku_title = x_data.xpath('/html/head/title/text()')
return sku_title[0]

def get_seckill_url(self):
"""获取商品的抢购链接
点击"抢购"按钮后,会有两次302跳转,最后到达订单结算页面
这里返回第一次跳转后的页面url,作为商品的抢购链接
:return: 商品的抢购链接
"""
url = 'https://itemko.jd.com/itemShowBtn'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'skuId': self.sku_id,
'from': 'pc',
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Host': 'itemko.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
while True:
resp = self.session.get(url=url, headers=headers, params=payload)
resp_json = parse_json(resp.text)
if resp_json.get('url'):
# https://divide.jd.com/user_rou ... %3Dpc
router_url = 'https:' + resp_json.get('url')
# https://marathon.jd.com/captch ... %3Dpc
seckill_url = router_url.replace(
'divide', 'marathon').replace(
'user_routing', 'captcha.html')
logger.info("抢购链接获取成功: %s", seckill_url)
return seckill_url
else:
logger.info("抢购链接获取失败,稍后自动重试")
wait_some_time()

def request_seckill_url(self):
"""访问商品的抢购链接(用于设置cookie等"""
logger.info('用户:{}'.format(self.get_username()))
logger.info('商品名称:{}'.format(self.get_sku_title()))
self.timers.start() # 阻塞

self.seckill_url[self.sku_id] = self.get_seckill_url()
logger.info('访问商品的抢购连接...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(
url=self.seckill_url.get(
self.sku_id),
headers=headers,
allow_redirects=False)

def request_seckill_checkout_page(self):
"""访问抢购订单结算页面"""
logger.info('访问抢购订单结算页面...')
url = 'https://marathon.jd.com/seckill/seckill.action'
payload = {
'skuId': self.sku_id,
'num': self.seckill_num,
'rid': int(time.time())
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(url=url, params=payload, headers=headers, allow_redirects=False)

def _get_seckill_init_info(self):
"""获取秒杀初始化信息(包括:地址,发票,token)
:return: 初始化信息组成的dict
"""
logger.info('获取秒杀初始化信息...')
url = 'https://marathon.jd.com/seckillnew/orderService/pc/init.action'
data = {
'sku': self.sku_id,
'num': self.seckill_num,
'isModifyAddress': 'false',
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
}
resp = self.session.post(url=url, data=data, headers=headers)

resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception:
raise SKException('抢购失败,返回信息:{}'.format(resp.text[0: 128]))

return resp_json

def _get_seckill_order_data(self):
"""生成提交抢购订单所需的请求体参数
:return: 请求体参数组成的dict
"""
logger.info('生成提交抢购订单所需参数...')
# 获取用户秒杀初始化信息
self.seckill_init_info[self.sku_id] = self._get_seckill_init_info()
init_info = self.seckill_init_info.get(self.sku_id)
default_address = init_info['addressList'][0] # 默认地址dict
invoice_info = init_info.get('invoiceInfo', {}) # 默认发票信息dict, 有可能不返回
token = init_info['token']
data = {
'skuId': self.sku_id,
'num': self.seckill_num,
'addressId': default_address['id'],
'yuShou': 'true',
'isModifyAddress': 'false',
'name': default_address['name'],
'provinceId': default_address['provinceId'],
'cityId': default_address['cityId'],
'countyId': default_address['countyId'],
'townId': default_address['townId'],
'addressDetail': default_address['addressDetail'],
'mobile': default_address['mobile'],
'mobileKey': default_address['mobileKey'],
'email': default_address.get('email', ''),
'postCode': '',
'invoiceTitle': invoice_info.get('invoiceTitle', -1),
'invoiceCompanyName': '',
'invoiceContent': invoice_info.get('invoiceContentType', 1),
'invoiceTaxpayerNO': '',
'invoiceEmail': '',
'invoicePhone': invoice_info.get('invoicePhone', ''),
'invoicePhoneKey': invoice_info.get('invoicePhoneKey', ''),
'invoice': 'true' if invoice_info else 'false',
'password': global_config.get('account', 'payment_pwd'),
'codTimeType': 3,
'paymentType': 4,
'areaCode': '',
'overseas': 0,
'phone': '',
'eid': global_config.getRaw('config', 'eid'),
'fp': global_config.getRaw('config', 'fp'),
'token': token,
'pru': ''
}

return data

def submit_seckill_order(self):
"""提交抢购(秒杀)订单
:return: 抢购结果 True/False
"""
url = 'https://marathon.jd.com/seckillnew/orderService/pc/submitOrder.action'
payload = {
'skuId': self.sku_id,
}
try:
self.seckill_order_data[self.sku_id] = self._get_seckill_order_data()
except Exception as e:
logger.info('抢购失败,无法获取生成订单的基本信息,接口返回:【{}】'.format(str(e)))
return False

logger.info('提交抢购订单...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://marathon.jd.com/seckill/seckill.action?skuId={0}&num={1}&rid={2}'.format(
self.sku_id, self.seckill_num, int(time.time())),
}
resp = self.session.post(
url=url,
params=payload,
data=self.seckill_order_data.get(
self.sku_id),
headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception as e:
logger.info('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return False
# 返回信息
# 抢购失败:
# {'errorMessage': '很遗憾没有抢到,再接再厉哦。', 'orderId': 0, 'resultCode': 60074, 'skuId': 0, 'success': False}
# {'errorMessage': '抱歉,您提交过快,请稍后再提交订单!', 'orderId': 0, 'resultCode': 60017, 'skuId': 0, 'success': False}
# {'errorMessage': '系统正在开小差,请重试~~', 'orderId': 0, 'resultCode': 90013, 'skuId': 0, 'success': False}
# 抢购成功:
# {"appUrl":"xxxxx","orderId":820227xxxxx,"pcUrl":"xxxxx","resultCode":0,"skuId":0,"success":true,"totalMoney":"xxxxx"}
if resp_json.get('success'):
order_id = resp_json.get('orderId')
total_money = resp_json.get('totalMoney')
pay_url = 'https:' + resp_json.get('pcUrl')
logger.info('抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}'.format(order_id, total_money, pay_url))
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}".format(order_id, total_money, pay_url)
send_wechat(success_message)
return True
else:
logger.info('抢购失败,返回信息:{}'.format(resp_json))
if global_config.getRaw('messenger', 'enable') == 'true':
error_message = '抢购失败,返回信息:{}'.format(resp_json)
send_wechat(error_message)
return False





 
苏宁脚本目前在测试途中,需要继续调试。
原创文章,
转载请注明:http://30daydo.com/article/44129 
欢迎关注公众号:
可转债量化分析


  查看全部
最近掀起了茅台抢购风,所以分享一个python抢购脚本。
运行环境 windows,linux,mac,python3+
 
京东小白分查询:
https://plus.m.jd.com/rights/windControl
分太低的就不要参与了,毕竟概率会小很多
 
############ 2021-01-13 更新 ======
最新的用Go重写的,搞了几瓶

微信图片_20210113104908.jpg


photo_2021-01-11_10-07-41.jpg

 
苏宁家的:

photo_2021-01-13_10-51-53.jpg

 


============= 2021-01-11 更新 ============


感觉苏宁的抢购是耍猴的,那个按钮基本处于不可点状态,所以就放弃了,感觉官方就是没放多少量,加上苏宁公司过往的尿性,所以洗洗睡了 



main.py
import sys

from maotai.jd_spider_requests import ProdectPurchase


if __name__ == '__main__':
tip = """
功能列表:
1.预约商品
2.秒杀抢购商品
"""
print(tip)

product = ProdectPurchase()
choice_function = input('请选择:')
if choice_function == '1':
product.reserve()
elif choice_function == '2':
product.seckill_by_proc_pool()
else:
print('没有此功能')
sys.exit(1)







jd_spider_requests.py
import random
import time
import requests
import functools
import json
import os
import pickle

from lxml import etree

from error.exception import SKException
from maotai.jd_logger import logger
from maotai.timer import Timer
from maotai.config import global_config
from concurrent.futures import ProcessPoolExecutor
from helper.jd_helper import (
parse_json,
send_wechat,
wait_some_time,
response_status,
save_image,
open_image
)


class SpiderSession:
"""
Session相关操作
"""

def __init__(self):
self.cookies_dir_path = "./cookies/"
self.user_agent = global_config.getRaw('config', 'DEFAULT_USER_AGENT')

self.session = self._init_session()

def _init_session(self):
session = requests.session()
session.headers = self.get_headers()
return session

def get_headers(self):
return {"User-Agent": self.user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;"
"q=0.9,image/webp,image/apng,*/*;"
"q=0.8,application/signed-exchange;"
"v=b3",
"Connection": "keep-alive"}

def get_user_agent(self):
return self.user_agent

def get_session(self):
"""
获取当前Session
:return:
"""
return self.session

def get_cookies(self):
"""
获取当前Cookies
:return:
"""
return self.get_session().cookies

def set_cookies(self, cookies):
self.session.cookies.update(cookies)

def load_cookies_from_local(self):
"""
从本地加载Cookie
:return:
"""
cookies_file = ''
if not os.path.exists(self.cookies_dir_path):
return False
for name in os.listdir(self.cookies_dir_path):
if name.endswith(".cookies"):
cookies_file = '{}{}'.format(self.cookies_dir_path, name)
break
if cookies_file == '':
return False
with open(cookies_file, 'rb') as f:
local_cookies = pickle.load(f)
self.set_cookies(local_cookies)

def save_cookies_to_local(self, cookie_file_name):
"""
保存Cookie到本地
:param cookie_file_name: 存放Cookie的文件名称
:return:
"""
cookies_file = '{}{}.cookies'.format(self.cookies_dir_path, cookie_file_name)
directory = os.path.dirname(cookies_file)
if not os.path.exists(directory):
os.makedirs(directory)
with open(cookies_file, 'wb') as f:
pickle.dump(self.get_cookies(), f)


class QrLogin:
"""
扫码登录
"""

def __init__(self, spider_session: SpiderSession):
"""
初始化扫码登录
大致流程:
1、访问登录二维码页面,获取Token
2、使用Token获取票据
3、校验票据
:param spider_session:
"""
self.qrcode_img_file = 'qr_code.png'

self.spider_session = spider_session
self.session = self.spider_session.get_session()

self.is_login = False
self.refresh_login_status()

def refresh_login_status(self):
"""
刷新是否登录状态
:return:
"""
self.is_login = self._validate_cookies()

def _validate_cookies(self):
"""
验证cookies是否有效(是否登陆)
通过访问用户订单列表页进行判断:若未登录,将会重定向到登陆页面。
:return: cookies是否有效 True/False
"""
url = 'https://order.jd.com/center/list.action'
payload = {
'rid': str(int(time.time() * 1000)),
}
try:
resp = self.session.get(url=url, params=payload, allow_redirects=False)
if resp.status_code == requests.codes.OK:
return True
except Exception as e:
logger.error("验证cookies是否有效发生异常", e)
return False

def _get_login_page(self):
"""
获取PC端登录页面
阻塞,更新cookies
:return:
"""
url = "https://passport.jd.com/new/login.aspx"
page = self.session.get(url, headers=self.spider_session.get_headers())
return page

def _get_qrcode(self):
"""
缓存并展示登录二维码
:return:
"""
url = 'https://qr.m.jd.com/show'
payload = {
'appid': 133,
'size': 147,
't': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)

if not response_status(resp):
logger.info('获取二维码失败')
return False

save_image(resp, self.qrcode_img_file)
logger.info('二维码获取成功,请打开京东APP扫描')
open_image(self.qrcode_img_file)
return True

def _get_qrcode_ticket(self):
"""
通过 token 获取票据 ticket
:return:
"""
url = 'https://qr.m.jd.com/check'
payload = {
'appid': '133',
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'token': self.session.cookies.get('wlfstk_smdl'), # 从cookies获取值
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)

if not response_status(resp):
logger.error('获取二维码扫描结果异常')
return False

resp_json = parse_json(resp.text)
if resp_json['code'] != 200:
logger.info('Code: %s, Message: %s', resp_json['code'], resp_json['msg'])
return None
else:
logger.info('已完成手机客户端确认')
return resp_json['ticket']

def _validate_qrcode_ticket(self, ticket):
"""
通过已获取的票据进行校验
:param ticket: 已获取的票据
:return:
"""
url = 'https://passport.jd.com/uc/qrCodeTicketValidation'
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/uc/login?ltype=logout',
}

resp = self.session.get(url=url, headers=headers, params={'t': ticket})
if not response_status(resp):
return False

resp_json = json.loads(resp.text)
if resp_json['returnCode'] == 0:
return True
else:
logger.info(resp_json)
return False

def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
self._get_login_page() # 更新cookies

# download QR code
if not self._get_qrcode():
raise SKException('二维码下载失败')

# get QR code ticket
ticket = None
retry_times = 85
for _ in range(retry_times):
# 重试 拿到ticket
ticket = self._get_qrcode_ticket()
if ticket:
break
time.sleep(2)
else:
raise SKException('二维码过期,请重新获取扫描')

# validate QR code ticket
if not self._validate_qrcode_ticket(ticket):
raise SKException('二维码信息校验失败')

self.refresh_login_status()

logger.info('二维码登录成功')


class ProdectPurchase(object):
def __init__(self):
self.spider_session = SpiderSession()
self.spider_session.load_cookies_from_local()
# 共享一个session

self.qrlogin = QrLogin(self.spider_session)

# 初始化信息
self.sku_id = global_config.getRaw('config', 'sku_id')
self.seckill_num = global_config.getRaw('config', 'seckill_num')
self.work_count = global_config.getRaw('config','process_num')
self.seckill_init_info = dict()
self.seckill_url = dict()
self.seckill_order_data = dict()
self.timers = Timer()

self.session = self.spider_session.get_session()
self.user_agent = self.spider_session.user_agent
self.nick_name = None

def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
if self.qrlogin.is_login:
logger.info('登录成功')
return

self.qrlogin.login_by_qrcode()

if self.qrlogin.is_login:
self.nick_name = self.get_username()
self.spider_session.save_cookies_to_local(self.nick_name)
else:
raise SKException("二维码登录失败!")

def check_login(func):
"""
用户登陆态校验装饰器。若用户未登陆,则调用扫码登陆
"""

@functools.wraps(func)
def new_func(self, *args, **kwargs):
if not self.qrlogin.is_login:
logger.info("{0} 需登陆后调用,开始扫码登陆".format(func.__name__))
self.login_by_qrcode()
return func(self, *args, **kwargs)

return new_func

@check_login
def reserve(self):
"""
预约
"""
self._reserve()

@check_login
def seckill(self):
"""
抢购
"""
self._seckill()

@check_login
def seckill_by_proc_pool(self):
"""
多进程进行抢购
work_count:进程数量
"""
with ProcessPoolExecutor() as pool:
for i in range(self.work_count):
pool.submit(self.seckill)

def _reserve(self):
"""
预约
"""
while True:
try:
self.make_reserve()
break
except Exception as e:
logger.info('预约发生异常!', e)
wait_some_time()

def _seckill(self):
"""
抢购
"""
while True:
try:
self.request_seckill_url()
while True:
self.request_seckill_checkout_page()
self.submit_seckill_order()
except Exception as e:
logger.info('抢购发生异常,稍后继续执行!', e)
wait_some_time()

def make_reserve(self):
"""商品预约"""
logger.info('商品名称:{}'.format(self.get_sku_title()))
url = 'https://yushou.jd.com/youshouinfo.action?'
payload = {
'callback': 'fetchJSON',
'sku': self.sku_id,
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
resp = self.session.get(url=url, params=payload, headers=headers)
resp_json = parse_json(resp.text)
reserve_url = resp_json.get('url')
# self.timers.start()
while True:
try:
self.session.get(url='https:' + reserve_url)
logger.info('预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约')
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约"
send_wechat(success_message)
break
except Exception as e:
logger.error('预约失败正在重试...')

def get_username(self):
"""获取用户信息"""
url = 'https://passport.jd.com/user/petName/getUserInfoForMiniJd.action'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://order.jd.com/center/list.action',
}

resp = self.session.get(url=url, params=payload, headers=headers)

try_count = 5
while not resp.text.startswith("jQuery"):
try_count = try_count - 1
if try_count > 0:
resp = self.session.get(url=url, params=payload, headers=headers)
else:
break
wait_some_time()
# 响应中包含了许多用户信息,现在在其中返回昵称
# jQuery2381773({"imgUrl":"//storage.360buyimg.com/i.imageUpload/xxx.jpg","lastLoginTime":"","nickName":"xxx","plusStatus":"0","realName":"xxx","userLevel":x,"userScoreVO":{"accountScore":xx,"activityScore":xx,"consumptionScore":xxxxx,"default":false,"financeScore":xxx,"pin":"xxx","riskScore":x,"totalScore":xxxxx}})
return parse_json(resp.text).get('nickName')

def get_sku_title(self):
"""获取商品名称"""
url = 'https://item.jd.com/{}.html'.format(global_config.getRaw('config', 'sku_id'))
resp = self.session.get(url).content
x_data = etree.HTML(resp)
sku_title = x_data.xpath('/html/head/title/text()')
return sku_title[0]

def get_seckill_url(self):
"""获取商品的抢购链接
点击"抢购"按钮后,会有两次302跳转,最后到达订单结算页面
这里返回第一次跳转后的页面url,作为商品的抢购链接
:return: 商品的抢购链接
"""
url = 'https://itemko.jd.com/itemShowBtn'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'skuId': self.sku_id,
'from': 'pc',
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Host': 'itemko.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
while True:
resp = self.session.get(url=url, headers=headers, params=payload)
resp_json = parse_json(resp.text)
if resp_json.get('url'):
# https://divide.jd.com/user_rou ... %3Dpc
router_url = 'https:' + resp_json.get('url')
# https://marathon.jd.com/captch ... %3Dpc
seckill_url = router_url.replace(
'divide', 'marathon').replace(
'user_routing', 'captcha.html')
logger.info("抢购链接获取成功: %s", seckill_url)
return seckill_url
else:
logger.info("抢购链接获取失败,稍后自动重试")
wait_some_time()

def request_seckill_url(self):
"""访问商品的抢购链接(用于设置cookie等"""
logger.info('用户:{}'.format(self.get_username()))
logger.info('商品名称:{}'.format(self.get_sku_title()))
self.timers.start() # 阻塞

self.seckill_url[self.sku_id] = self.get_seckill_url()
logger.info('访问商品的抢购连接...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(
url=self.seckill_url.get(
self.sku_id),
headers=headers,
allow_redirects=False)

def request_seckill_checkout_page(self):
"""访问抢购订单结算页面"""
logger.info('访问抢购订单结算页面...')
url = 'https://marathon.jd.com/seckill/seckill.action'
payload = {
'skuId': self.sku_id,
'num': self.seckill_num,
'rid': int(time.time())
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(url=url, params=payload, headers=headers, allow_redirects=False)

def _get_seckill_init_info(self):
"""获取秒杀初始化信息(包括:地址,发票,token)
:return: 初始化信息组成的dict
"""
logger.info('获取秒杀初始化信息...')
url = 'https://marathon.jd.com/seckillnew/orderService/pc/init.action'
data = {
'sku': self.sku_id,
'num': self.seckill_num,
'isModifyAddress': 'false',
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
}
resp = self.session.post(url=url, data=data, headers=headers)

resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception:
raise SKException('抢购失败,返回信息:{}'.format(resp.text[0: 128]))

return resp_json

def _get_seckill_order_data(self):
"""生成提交抢购订单所需的请求体参数
:return: 请求体参数组成的dict
"""
logger.info('生成提交抢购订单所需参数...')
# 获取用户秒杀初始化信息
self.seckill_init_info[self.sku_id] = self._get_seckill_init_info()
init_info = self.seckill_init_info.get(self.sku_id)
default_address = init_info['addressList'][0] # 默认地址dict
invoice_info = init_info.get('invoiceInfo', {}) # 默认发票信息dict, 有可能不返回
token = init_info['token']
data = {
'skuId': self.sku_id,
'num': self.seckill_num,
'addressId': default_address['id'],
'yuShou': 'true',
'isModifyAddress': 'false',
'name': default_address['name'],
'provinceId': default_address['provinceId'],
'cityId': default_address['cityId'],
'countyId': default_address['countyId'],
'townId': default_address['townId'],
'addressDetail': default_address['addressDetail'],
'mobile': default_address['mobile'],
'mobileKey': default_address['mobileKey'],
'email': default_address.get('email', ''),
'postCode': '',
'invoiceTitle': invoice_info.get('invoiceTitle', -1),
'invoiceCompanyName': '',
'invoiceContent': invoice_info.get('invoiceContentType', 1),
'invoiceTaxpayerNO': '',
'invoiceEmail': '',
'invoicePhone': invoice_info.get('invoicePhone', ''),
'invoicePhoneKey': invoice_info.get('invoicePhoneKey', ''),
'invoice': 'true' if invoice_info else 'false',
'password': global_config.get('account', 'payment_pwd'),
'codTimeType': 3,
'paymentType': 4,
'areaCode': '',
'overseas': 0,
'phone': '',
'eid': global_config.getRaw('config', 'eid'),
'fp': global_config.getRaw('config', 'fp'),
'token': token,
'pru': ''
}

return data

def submit_seckill_order(self):
"""提交抢购(秒杀)订单
:return: 抢购结果 True/False
"""
url = 'https://marathon.jd.com/seckillnew/orderService/pc/submitOrder.action'
payload = {
'skuId': self.sku_id,
}
try:
self.seckill_order_data[self.sku_id] = self._get_seckill_order_data()
except Exception as e:
logger.info('抢购失败,无法获取生成订单的基本信息,接口返回:【{}】'.format(str(e)))
return False

logger.info('提交抢购订单...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://marathon.jd.com/seckill/seckill.action?skuId={0}&num={1}&rid={2}'.format(
self.sku_id, self.seckill_num, int(time.time())),
}
resp = self.session.post(
url=url,
params=payload,
data=self.seckill_order_data.get(
self.sku_id),
headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception as e:
logger.info('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return False
# 返回信息
# 抢购失败:
# {'errorMessage': '很遗憾没有抢到,再接再厉哦。', 'orderId': 0, 'resultCode': 60074, 'skuId': 0, 'success': False}
# {'errorMessage': '抱歉,您提交过快,请稍后再提交订单!', 'orderId': 0, 'resultCode': 60017, 'skuId': 0, 'success': False}
# {'errorMessage': '系统正在开小差,请重试~~', 'orderId': 0, 'resultCode': 90013, 'skuId': 0, 'success': False}
# 抢购成功:
# {"appUrl":"xxxxx","orderId":820227xxxxx,"pcUrl":"xxxxx","resultCode":0,"skuId":0,"success":true,"totalMoney":"xxxxx"}
if resp_json.get('success'):
order_id = resp_json.get('orderId')
total_money = resp_json.get('totalMoney')
pay_url = 'https:' + resp_json.get('pcUrl')
logger.info('抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}'.format(order_id, total_money, pay_url))
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}".format(order_id, total_money, pay_url)
send_wechat(success_message)
return True
else:
logger.info('抢购失败,返回信息:{}'.format(resp_json))
if global_config.getRaw('messenger', 'enable') == 'true':
error_message = '抢购失败,返回信息:{}'.format(resp_json)
send_wechat(error_message)
return False





 
苏宁脚本目前在测试途中,需要继续调试。
原创文章,
转载请注明:http://30daydo.com/article/44129 
欢迎关注公众号:
可转债量化分析


 

python函数调用后面可以有一个空格

李魔佛 发表了文章 • 0 个评论 • 303 次浏览 • 2020-12-13 11:13 • 来自相关话题

没想到居然可以这样。
print ('hello')
hello
def sayhi():
...: print('Done')
...:
sayhi () # 这里有一个空格
Done

不过如果平时这么写,会被人打的 查看全部
没想到居然可以这样。
print ('hello')
hello
def sayhi():
...: print('Done')
...:
sayhi () # 这里有一个空格
Done

不过如果平时这么写,会被人打的

导出python自带关键字 keyword

李魔佛 发表了文章 • 0 个评论 • 239 次浏览 • 2020-12-13 10:57 • 来自相关话题

居然还自带这个库
import keyword
keyword.kwlist
Out[3]:
['False',
'None',
'True',
'and',
'as',
'assert',
'async',
'await',
'break',
'class',
'continue',
'def',
'del',
'elif',
'else',
'except',
'finally',
'for',
'from',
'global',
'if',
'import',
'in',
'is',
'lambda',
'nonlocal',
'not',
'or',
'pass',
'raise',
'return',
'try',
'while',
'with',
'yield']
len(keyword.kwlist)
Out[4]: 35 查看全部
居然还自带这个库
import keyword
keyword.kwlist
Out[3]:
['False',
'None',
'True',
'and',
'as',
'assert',
'async',
'await',
'break',
'class',
'continue',
'def',
'del',
'elif',
'else',
'except',
'finally',
'for',
'from',
'global',
'if',
'import',
'in',
'is',
'lambda',
'nonlocal',
'not',
'or',
'pass',
'raise',
'return',
'try',
'while',
'with',
'yield']
len(keyword.kwlist)
Out[4]: 35

微信公众号后台的签名校验的官方教程在python3下不兼容

李魔佛 发表了文章 • 0 个评论 • 234 次浏览 • 2020-12-11 11:43 • 来自相关话题

感觉写这个文档的人是个菜鸡。 
首先文档用的python2代码写的,但文中没有标明。
 
 
python2旧就算了,而且那么多框架不用,还要用一个老掉牙的web.py来写,也是醉了。
 
django下的签名校验:token = '123456789'
def Services(request):
print(request.method)
if request.method=='GET':

signature = request.GET.get('signature')
echostr = request.GET.get('echostr')
timestamp = request.GET.get('timestamp')
nonce = request.GET.get('nonce')
list_ = [token, timestamp, nonce]
list_.sort()
list_str = ''.join(list_)

sha1 = hashlib.sha1(list_str.encode('utf8'))
hashcode = sha1.hexdigest()
if hashcode==signature:
return HttpResponse(echostr)
else:
return HttpResponse('')
原创文章,转载请注明出处http://30daydo.com/article/44121
 
 
  查看全部
感觉写这个文档的人是个菜鸡。 
首先文档用的python2代码写的,但文中没有标明。
 
 
python2旧就算了,而且那么多框架不用,还要用一个老掉牙的web.py来写,也是醉了。
 
django下的签名校验:
token = '123456789'
def Services(request):
print(request.method)
if request.method=='GET':

signature = request.GET.get('signature')
echostr = request.GET.get('echostr')
timestamp = request.GET.get('timestamp')
nonce = request.GET.get('nonce')
list_ = [token, timestamp, nonce]
list_.sort()
list_str = ''.join(list_)

sha1 = hashlib.sha1(list_str.encode('utf8'))
hashcode = sha1.hexdigest()
if hashcode==signature:
return HttpResponse(echostr)
else:
return HttpResponse('')

原创文章,转载请注明出处http://30daydo.com/article/44121
 
 
 

pycharm 条件断点

李魔佛 发表了文章 • 0 个评论 • 264 次浏览 • 2020-12-01 13:42 • 来自相关话题

如果在一个循环或者需要执行很多次的的递归里面,可以使用条件断点。
 
先在想要停下来的地方打一个断点,然后再点击一下断点,弹出的一个条件断点的窗口,在窗口输入一个条件即可。
 
比如:
def main():
for i in range(100):
print(i*i)


if __name__ == '__main__':
main()
在i*i的地方打一个断点,然后在条件断点那里输入 i=50, 那么在调试模式下只有在i=50的时候才会停下来。
  查看全部
如果在一个循环或者需要执行很多次的的递归里面,可以使用条件断点。
 
先在想要停下来的地方打一个断点,然后再点击一下断点,弹出的一个条件断点的窗口,在窗口输入一个条件即可。
 
比如:
def main():
for i in range(100):
print(i*i)


if __name__ == '__main__':
main()

在i*i的地方打一个断点,然后在条件断点那里输入 i=50, 那么在调试模式下只有在i=50的时候才会停下来。
 

python执行js语句,无函数返回值

李魔佛 发表了文章 • 0 个评论 • 255 次浏览 • 2020-11-30 16:11 • 来自相关话题

有时候在JS代码里面抠出部分语句,但是不是一个函数。
如下面的一段JSvar radra27radra27 = "D";
var ra72419ra91ra72419ra91 = "7.241.9" + ".";
var raurst500ra63raurst500ra63 = "urst=500";
var ravalidtora49ravalidtora49 = "validto=";
var raevphncdra57raevphncdra57 = "ev.ph" + "ncd";
var ra16067161ra17ra16067161ra17 = "16067161";
var radeos202ra16radeos202ra16 = "deos/20" + "2";
var ra1080p4ra73ra1080p4ra73 = "/1080P_4";
var ra209hashra72ra209hashra72 = "209&hash";
var ra2bmkdz7nra36ra2bmkdz7nra36 = "2BMKd" + "z7N";
var ra6708909ra29ra6708909ra29 = "6708909" + "&";
var ra00kip4ra41ra00kip4ra41 = "00k&i" + "p=4";
var ra006163ra73ra006163ra73 = "006/16/" + "3";
var raro7upu3ra66raro7upu3ra66 = "Ro7UPU%3";
var raroiu6qra26raroiu6qra26 = "=rOiU6q%";
var ra075351mra26ra075351mra26 = "075351." + "m";
var ramgdmctbvra11ramgdmctbvra11 = "MgdmCtbV";
var rap4validra25rap4validra25 = "p4?valid";
var ra09ratera79ra09ratera79 = "09&rate=";
var rancomvira35rancomvira35 = "n.com/vi";
var ra24075351ra94ra24075351ra94 = "24075" + "351";
var ra000k324ra70ra000k324ra70 = "000K_324";
var ra50000kbra49ra50000kbra49 = "50000k&" + "b";
var rahttpsra83rahttpsra83 = "https://";
var rafrom160ra56rafrom160ra56 = "from=16" + "0";
var quality_1080p =/* + radra27radra27 + */rahttpsra83rahttpsra83 + /* + rancomvira35rancomvira35 + */raevphncdra57raevphncdra57 + /* + radra27radra27 + */rancomvira35rancomvira35 + /* + ra006163ra73ra006163ra73 + */radeos202ra16radeos202ra16 + /* + ra09ratera79ra09ratera79 + */ra006163ra73ra006163ra73 + /* + ra1080p4ra73ra1080p4ra73 + */ra24075351ra94ra24075351ra94 + /* + raroiu6qra26raroiu6qra26 + */ra1080p4ra73ra1080p4ra73 + /* + ra000k324ra70ra000k324ra70 + */ra000k324ra70ra000k324ra70 + /* + rancomvira35rancomvira35 + */ra075351mra26ra075351mra26 + /* + ravalidtora49ravalidtora49 + */rap4validra25rap4validra25 + /* + ra209hashra72ra209hashra72 + */rafrom160ra56rafrom160ra56 + /* + ra1080p4ra73ra1080p4ra73 + */ra6708909ra29ra6708909ra29 + /* + ra209hashra72ra209hashra72 + */ravalidtora49ravalidtora49 + /* + ramgdmctbvra11ramgdmctbvra11 + */ra16067161ra17ra16067161ra17 + /* + ra24075351ra94ra24075351ra94 + */ra09ratera79ra09ratera79 + /* + ra50000kbra49ra50000kbra49 + */ra50000kbra49ra50000kbra49 + /* + ramgdmctbvra11ramgdmctbvra11 + */raurst500ra63raurst500ra63 + /* + ra209hashra72ra209hashra72 + */ra00kip4ra41ra00kip4ra41 + /* + raroiu6qra26raroiu6qra26 + */ra72419ra91ra72419ra91 + /* + ra09ratera79ra09ratera79 + */ra209hashra72ra209hashra72 + /* + raro7upu3ra66raro7upu3ra66 + */raroiu6qra26raroiu6qra26 + /* + ra075351mra26ra075351mra26 + */ra2bmkdz7nra36ra2bmkdz7nra36 + /* + ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;
做了很多的运算,掩人耳目,虽然看多几下,用python写也简单,或者把它放入一个function里面也可以。
比如实时网络上下载获取一段js,然后再在头部和尾部组装为function。
function getValue(){
xxxxxx
xxxxxx
return flashvars_324075351
}
 
然后执行这一段JS,call返回函数 getValue() 就可以拿到返回值了。
 
不过今天我们用其他的方法直接获取flashvars_324075351
 
使用jsp库即可。
 
把上面的JS语句var radra27radra27 = "D";
var ra72419ra91ra72419ra91 = "7.241.9" + ".";
var raurst500ra63raurst500ra63 = "urst=500";
var ravalidtora49ravalidtora49 = "validto=";
var raevphncdra57raevphncdra57 = "ev.ph" + "ncd";
var ra16067161ra17ra16067161ra17 = "16067161";
var radeos202ra16radeos202ra16 = "deos/20" + "2";
var ra1080p4ra73ra1080p4ra73 = "/1080P_4";
var ra209hashra72ra209hashra72 = "209&hash";
var ra2bmkdz7nra36ra2bmkdz7nra36 = "2BMKd" + "z7N";
var ra6708909ra29ra6708909ra29 = "6708909" + "&";
var ra00kip4ra41ra00kip4ra41 = "00k&i" + "p=4";
var ra006163ra73ra006163ra73 = "006/16/" + "3";
var raro7upu3ra66raro7upu3ra66 = "Ro7UPU%3";
var raroiu6qra26raroiu6qra26 = "=rOiU6q%";
var ra075351mra26ra075351mra26 = "075351." + "m";
var ramgdmctbvra11ramgdmctbvra11 = "MgdmCtbV";
var rap4validra25rap4validra25 = "p4?valid";
var ra09ratera79ra09ratera79 = "09&rate=";
var rancomvira35rancomvira35 = "n.com/vi";
var ra24075351ra94ra24075351ra94 = "24075" + "351";
var ra000k324ra70ra000k324ra70 = "000K_324";
var ra50000kbra49ra50000kbra49 = "50000k&" + "b";
var rahttpsra83rahttpsra83 = "https://";
var rafrom160ra56rafrom160ra56 = "from=16" + "0";
var quality_1080p =/* + radra27radra27 + */rahttpsra83rahttpsra83 + /* + rancomvira35rancomvira35 + */raevphncdra57raevphncdra57 + /* + radra27radra27 + */rancomvira35rancomvira35 + /* + ra006163ra73ra006163ra73 + */radeos202ra16radeos202ra16 + /* + ra09ratera79ra09ratera79 + */ra006163ra73ra006163ra73 + /* + ra1080p4ra73ra1080p4ra73 + */ra24075351ra94ra24075351ra94 + /* + raroiu6qra26raroiu6qra26 + */ra1080p4ra73ra1080p4ra73 + /* + ra000k324ra70ra000k324ra70 + */ra000k324ra70ra000k324ra70 + /* + rancomvira35rancomvira35 + */ra075351mra26ra075351mra26 + /* + ravalidtora49ravalidtora49 + */rap4validra25rap4validra25 + /* + ra209hashra72ra209hashra72 + */rafrom160ra56rafrom160ra56 + /* + ra1080p4ra73ra1080p4ra73 + */ra6708909ra29ra6708909ra29 + /* + ra209hashra72ra209hashra72 + */ravalidtora49ravalidtora49 + /* + ramgdmctbvra11ramgdmctbvra11 + */ra16067161ra17ra16067161ra17 + /* + ra24075351ra94ra24075351ra94 + */ra09ratera79ra09ratera79 + /* + ra50000kbra49ra50000kbra49 + */ra50000kbra49ra50000kbra49 + /* + ramgdmctbvra11ramgdmctbvra11 + */raurst500ra63raurst500ra63 + /* + ra209hashra72ra209hashra72 + */ra00kip4ra41ra00kip4ra41 + /* + raroiu6qra26raroiu6qra26 + */ra72419ra91ra72419ra91 + /* + ra09ratera79ra09ratera79 + */ra209hashra72ra209hashra72 + /* + raro7upu3ra66raro7upu3ra66 + */raroiu6qra26raroiu6qra26 + /* + ra075351mra26ra075351mra26 + */ra2bmkdz7nra36ra2bmkdz7nra36 + /* + ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;
后面加一个返回值,但不需要加return
比如....
ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;
flashvars_324075351;
然后直接调用jspy
res = js2py.eval_js(js)
 
执行后print(res) , 显示的值就是flashvars_324075351
原创文章,转载请注明出处
http://30daydo.com/article/44112
  查看全部
有时候在JS代码里面抠出部分语句,但是不是一个函数。
如下面的一段JS
var radra27radra27 = "D";
var ra72419ra91ra72419ra91 = "7.241.9" + ".";
var raurst500ra63raurst500ra63 = "urst=500";
var ravalidtora49ravalidtora49 = "validto=";
var raevphncdra57raevphncdra57 = "ev.ph" + "ncd";
var ra16067161ra17ra16067161ra17 = "16067161";
var radeos202ra16radeos202ra16 = "deos/20" + "2";
var ra1080p4ra73ra1080p4ra73 = "/1080P_4";
var ra209hashra72ra209hashra72 = "209&hash";
var ra2bmkdz7nra36ra2bmkdz7nra36 = "2BMKd" + "z7N";
var ra6708909ra29ra6708909ra29 = "6708909" + "&";
var ra00kip4ra41ra00kip4ra41 = "00k&i" + "p=4";
var ra006163ra73ra006163ra73 = "006/16/" + "3";
var raro7upu3ra66raro7upu3ra66 = "Ro7UPU%3";
var raroiu6qra26raroiu6qra26 = "=rOiU6q%";
var ra075351mra26ra075351mra26 = "075351." + "m";
var ramgdmctbvra11ramgdmctbvra11 = "MgdmCtbV";
var rap4validra25rap4validra25 = "p4?valid";
var ra09ratera79ra09ratera79 = "09&rate=";
var rancomvira35rancomvira35 = "n.com/vi";
var ra24075351ra94ra24075351ra94 = "24075" + "351";
var ra000k324ra70ra000k324ra70 = "000K_324";
var ra50000kbra49ra50000kbra49 = "50000k&" + "b";
var rahttpsra83rahttpsra83 = "https://";
var rafrom160ra56rafrom160ra56 = "from=16" + "0";
var quality_1080p =/* + radra27radra27 + */rahttpsra83rahttpsra83 + /* + rancomvira35rancomvira35 + */raevphncdra57raevphncdra57 + /* + radra27radra27 + */rancomvira35rancomvira35 + /* + ra006163ra73ra006163ra73 + */radeos202ra16radeos202ra16 + /* + ra09ratera79ra09ratera79 + */ra006163ra73ra006163ra73 + /* + ra1080p4ra73ra1080p4ra73 + */ra24075351ra94ra24075351ra94 + /* + raroiu6qra26raroiu6qra26 + */ra1080p4ra73ra1080p4ra73 + /* + ra000k324ra70ra000k324ra70 + */ra000k324ra70ra000k324ra70 + /* + rancomvira35rancomvira35 + */ra075351mra26ra075351mra26 + /* + ravalidtora49ravalidtora49 + */rap4validra25rap4validra25 + /* + ra209hashra72ra209hashra72 + */rafrom160ra56rafrom160ra56 + /* + ra1080p4ra73ra1080p4ra73 + */ra6708909ra29ra6708909ra29 + /* + ra209hashra72ra209hashra72 + */ravalidtora49ravalidtora49 + /* + ramgdmctbvra11ramgdmctbvra11 + */ra16067161ra17ra16067161ra17 + /* + ra24075351ra94ra24075351ra94 + */ra09ratera79ra09ratera79 + /* + ra50000kbra49ra50000kbra49 + */ra50000kbra49ra50000kbra49 + /* + ramgdmctbvra11ramgdmctbvra11 + */raurst500ra63raurst500ra63 + /* + ra209hashra72ra209hashra72 + */ra00kip4ra41ra00kip4ra41 + /* + raroiu6qra26raroiu6qra26 + */ra72419ra91ra72419ra91 + /* + ra09ratera79ra09ratera79 + */ra209hashra72ra209hashra72 + /* + raro7upu3ra66raro7upu3ra66 + */raroiu6qra26raroiu6qra26 + /* + ra075351mra26ra075351mra26 + */ra2bmkdz7nra36ra2bmkdz7nra36 + /* + ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;

做了很多的运算,掩人耳目,虽然看多几下,用python写也简单,或者把它放入一个function里面也可以。
比如实时网络上下载获取一段js,然后再在头部和尾部组装为function。
function getValue(){
xxxxxx
xxxxxx
return flashvars_324075351
}
 
然后执行这一段JS,call返回函数 getValue() 就可以拿到返回值了。
 
不过今天我们用其他的方法直接获取flashvars_324075351
 
使用jsp库即可。
 
把上面的JS语句
var radra27radra27 = "D";
var ra72419ra91ra72419ra91 = "7.241.9" + ".";
var raurst500ra63raurst500ra63 = "urst=500";
var ravalidtora49ravalidtora49 = "validto=";
var raevphncdra57raevphncdra57 = "ev.ph" + "ncd";
var ra16067161ra17ra16067161ra17 = "16067161";
var radeos202ra16radeos202ra16 = "deos/20" + "2";
var ra1080p4ra73ra1080p4ra73 = "/1080P_4";
var ra209hashra72ra209hashra72 = "209&hash";
var ra2bmkdz7nra36ra2bmkdz7nra36 = "2BMKd" + "z7N";
var ra6708909ra29ra6708909ra29 = "6708909" + "&";
var ra00kip4ra41ra00kip4ra41 = "00k&i" + "p=4";
var ra006163ra73ra006163ra73 = "006/16/" + "3";
var raro7upu3ra66raro7upu3ra66 = "Ro7UPU%3";
var raroiu6qra26raroiu6qra26 = "=rOiU6q%";
var ra075351mra26ra075351mra26 = "075351." + "m";
var ramgdmctbvra11ramgdmctbvra11 = "MgdmCtbV";
var rap4validra25rap4validra25 = "p4?valid";
var ra09ratera79ra09ratera79 = "09&rate=";
var rancomvira35rancomvira35 = "n.com/vi";
var ra24075351ra94ra24075351ra94 = "24075" + "351";
var ra000k324ra70ra000k324ra70 = "000K_324";
var ra50000kbra49ra50000kbra49 = "50000k&" + "b";
var rahttpsra83rahttpsra83 = "https://";
var rafrom160ra56rafrom160ra56 = "from=16" + "0";
var quality_1080p =/* + radra27radra27 + */rahttpsra83rahttpsra83 + /* + rancomvira35rancomvira35 + */raevphncdra57raevphncdra57 + /* + radra27radra27 + */rancomvira35rancomvira35 + /* + ra006163ra73ra006163ra73 + */radeos202ra16radeos202ra16 + /* + ra09ratera79ra09ratera79 + */ra006163ra73ra006163ra73 + /* + ra1080p4ra73ra1080p4ra73 + */ra24075351ra94ra24075351ra94 + /* + raroiu6qra26raroiu6qra26 + */ra1080p4ra73ra1080p4ra73 + /* + ra000k324ra70ra000k324ra70 + */ra000k324ra70ra000k324ra70 + /* + rancomvira35rancomvira35 + */ra075351mra26ra075351mra26 + /* + ravalidtora49ravalidtora49 + */rap4validra25rap4validra25 + /* + ra209hashra72ra209hashra72 + */rafrom160ra56rafrom160ra56 + /* + ra1080p4ra73ra1080p4ra73 + */ra6708909ra29ra6708909ra29 + /* + ra209hashra72ra209hashra72 + */ravalidtora49ravalidtora49 + /* + ramgdmctbvra11ramgdmctbvra11 + */ra16067161ra17ra16067161ra17 + /* + ra24075351ra94ra24075351ra94 + */ra09ratera79ra09ratera79 + /* + ra50000kbra49ra50000kbra49 + */ra50000kbra49ra50000kbra49 + /* + ramgdmctbvra11ramgdmctbvra11 + */raurst500ra63raurst500ra63 + /* + ra209hashra72ra209hashra72 + */ra00kip4ra41ra00kip4ra41 + /* + raroiu6qra26raroiu6qra26 + */ra72419ra91ra72419ra91 + /* + ra09ratera79ra09ratera79 + */ra209hashra72ra209hashra72 + /* + raro7upu3ra66raro7upu3ra66 + */raroiu6qra26raroiu6qra26 + /* + ra075351mra26ra075351mra26 + */ra2bmkdz7nra36ra2bmkdz7nra36 + /* + ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;

后面加一个返回值,但不需要加return
比如
....
ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;
flashvars_324075351;

然后直接调用jspy
res = js2py.eval_js(js)
 
执行后print(res) , 显示的值就是flashvars_324075351
原创文章,转载请注明出处
http://30daydo.com/article/44112
 

python多重继承的super调用父类的兄弟类

李魔佛 发表了文章 • 0 个评论 • 268 次浏览 • 2020-11-26 19:04 • 来自相关话题

先看一段代码:class A:
def __init__(self):
print('A init')
print(self)


class B:
def __init__(self):
print('B init')
print(self)


class C:
def __init__(self):
print('C init')
print(self)


class D(C, B, A):
def __init__(self):
super(A, self).__init__()
super(C, self).__init__()
super(B, self).__init__()
print('D ')


def main():
d = D()
看输出的是什么:B init
<__main__.D object at 0x00000000026365B0>
A init
<__main__.D object at 0x00000000026365B0>
D init-
为什么不输出C init ?
 
那么这个药从super的函数实现说起:def super(class, obj):
mro_list = obj.__class__.mro()
next_parent_class = mro_list[mro_list.index(class)+1]
return next_parent_class
super函数中,mro_list得到的是类的mro列表,mro列表就是类的继承有序关系图
比如上面代码中 
C, B, A 在D中的mro是下面这样的。[<class '__main__.D'>, <class '__main__.C'>, <class '__main__.B'>, <class '__main__.A'>, <class 'object'>]




你提供的类名class传入来后,比如是C,那么找到C所在的index,然后加1后的class,也就是B,所以super(C,self).__init__() 
实际调用的是B.__init__()
 
 
另外,如果在多重继承中,要调用父类的父类的父类。。。。,
可以直接用改类的类名就可以# 多重继承


class A:
def __init__(self):
print('A init')
print(self)

def fun(self):
print('A',self)

class B(A):
def __init__(self):
print('B init')
print(self)

def fun(self):
print('B', self)


class C(B):

def __init__(self):
print('C init')
print(self)

def fun(self):
print('C', self)

class X:

def fun(self):
print('X',self)

class D(C):
def __init__(self):
# super(B, self).__init__() # super(D) -> 指向了C
print('D init')

def fun(self):
C.fun(self)
B.fun(self)
A.fun(self)
X.fun(self)



def main():
d = D()
print(d.__class__.mro())
d.fun()

if __name__ == '__main__':
main()
 
结果:D init
[<class '__main__.D'>, <class '__main__.C'>, <class '__main__.B'>, <class '__main__.A'>, <class 'object'>]
C <__main__.D object at 0x00000000025F6700>
B <__main__.D object at 0x00000000025F6700>
A <__main__.D object at 0x00000000025F6700>
X <__main__.D object at 0x00000000025F6700>
其实super和父类子类没什么关系, super关系的是mro里面的顺序。

 原文链接:http://30daydo.com/article/44107 
转载请注明出处
  查看全部
先看一段代码:
class A:
def __init__(self):
print('A init')
print(self)


class B:
def __init__(self):
print('B init')
print(self)


class C:
def __init__(self):
print('C init')
print(self)


class D(C, B, A):
def __init__(self):
super(A, self).__init__()
super(C, self).__init__()
super(B, self).__init__()
print('D ')


def main():
d = D()

看输出的是什么:
B init
<__main__.D object at 0x00000000026365B0>
A init
<__main__.D object at 0x00000000026365B0>
D init-

为什么不输出C init ?
 
那么这个药从super的函数实现说起:
def super(class, obj):
mro_list = obj.__class__.mro()
next_parent_class = mro_list[mro_list.index(class)+1]
return next_parent_class

super函数中,mro_list得到的是类的mro列表,mro列表就是类的继承有序关系图
比如上面代码中 
C, B, A 在D中的mro是下面这样的。
[<class '__main__.D'>, <class '__main__.C'>, <class '__main__.B'>, <class '__main__.A'>, <class 'object'>]




你提供的类名class传入来后,比如是C,那么找到C所在的index,然后加1后的class,也就是B,所以super(C,self).__init__() 
实际调用的是B.__init__()
 
 
另外,如果在多重继承中,要调用父类的父类的父类。。。。,
可以直接用改类的类名就可以
# 多重继承


class A:
def __init__(self):
print('A init')
print(self)

def fun(self):
print('A',self)

class B(A):
def __init__(self):
print('B init')
print(self)

def fun(self):
print('B', self)


class C(B):

def __init__(self):
print('C init')
print(self)

def fun(self):
print('C', self)

class X:

def fun(self):
print('X',self)

class D(C):
def __init__(self):
# super(B, self).__init__() # super(D) -> 指向了C
print('D init')

def fun(self):
C.fun(self)
B.fun(self)
A.fun(self)
X.fun(self)



def main():
d = D()
print(d.__class__.mro())
d.fun()

if __name__ == '__main__':
main()

 
结果:
D init
[<class '__main__.D'>, <class '__main__.C'>, <class '__main__.B'>, <class '__main__.A'>, <class 'object'>]
C <__main__.D object at 0x00000000025F6700>
B <__main__.D object at 0x00000000025F6700>
A <__main__.D object at 0x00000000025F6700>
X <__main__.D object at 0x00000000025F6700>

其实super和父类子类没什么关系, super关系的是mro里面的顺序。

 原文链接:http://30daydo.com/article/44107 
转载请注明出处
 

python pathspec 库的作用

李魔佛 发表了文章 • 0 个评论 • 261 次浏览 • 2020-11-25 13:28 • 来自相关话题

作为路径匹配用的。
 
看以下实例:
def get_ignore_matches():
# 排除文件
global ignore_matches
ignore_file = os.path.join(os.path.abspath(os.curdir), '.gitignore')
if not os.path.exists(ignore_file):
return None
if ignore_matches is not None:
return ignore_matches
with open(ignore_file, 'r') as fh:
spec = pathspec.PathSpec.from_lines('gitwildmatch', fh)
ignore_matches = spec
return ignore_matches


def is_ignored(file_name: str) -> bool:
# 匹配就ignore
matches = get_ignore_matches()
if matches is None:
return False
return matches.match_file(file_name)

gitignore文件里面的内容就会被匹配到
.idea/
build/
dist/
venv/
*.pyc
__pycache__/
*.egg-info/
tmp/ 查看全部
作为路径匹配用的。
 
看以下实例:
def get_ignore_matches():
# 排除文件
global ignore_matches
ignore_file = os.path.join(os.path.abspath(os.curdir), '.gitignore')
if not os.path.exists(ignore_file):
return None
if ignore_matches is not None:
return ignore_matches
with open(ignore_file, 'r') as fh:
spec = pathspec.PathSpec.from_lines('gitwildmatch', fh)
ignore_matches = spec
return ignore_matches


def is_ignored(file_name: str) -> bool:
# 匹配就ignore
matches = get_ignore_matches()
if matches is None:
return False
return matches.match_file(file_name)

gitignore文件里面的内容就会被匹配到
.idea/
build/
dist/
venv/
*.pyc
__pycache__/
*.egg-info/
tmp/

夜深了,你们还在吗?

chenchen 发表了文章 • 0 个评论 • 175 次浏览 • 2020-11-20 22:34 • 来自相关话题

夜深了,你们还在吗???????????????????
夜深了,你们还在吗???????????????????

大家好啊,日常报道,关照关照

chenchen 发表了文章 • 0 个评论 • 184 次浏览 • 2020-11-20 17:11 • 来自相关话题

大家好啊,日常报道,关照关照。。。。。。。。。。。。
大家好啊,日常报道,关照关照。。。。。。。。。。。。

异步asyncio加锁 的正确用法

李魔佛 发表了文章 • 0 个评论 • 382 次浏览 • 2020-11-15 10:19 • 来自相关话题

对于全局变量count进行统计加锁
import aiohttp
import asyncio
import execjs
import threading
global pages
global count

headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "en-US,en;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Host": "dcfm.eastmoney.com",
"Pragma": "no-cache",
"Referer": "http://data.eastmoney.com/xg/xg/default.html",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/69.0.3497.81 Chrome/69.0.3497.81 Safari/537.36",
}

home_url = 'http://dcfm.eastmoney.com/em_mutisvcexpandinterface/api/js/get?type=XGSG_LB&token=70f12f2f4f091e459a279469fe49eca5&st=purchasedate,securitycode&sr=-1&p={}&ps=50&js=var%20hsEnHLwG={{pages:(tp),data:(x)}}&rt=53512217'

loop = asyncio.get_event_loop()
# lock = threading.Lock()
lock = asyncio.Lock()
def parse_json(content):
content += ';function getV(){return hsEnHLwG;}'
ctx = execjs.compile(content)
result = ctx.call('getV')
return result


async def fetch(session,page):
global pages
global count
async with session.get(home_url.format(page),headers=headers) as resp:
# print(f'here:: {page}')
content = await resp.text()

try:
js_content = parse_json(content)
for stock_info in js_content['data']:
securityshortname = stock_info['securityshortname']
# print(securityshortname)
except Exception as e:
print(e)

async with lock:
count=count+1

print(f'count:{count}')
if count == pages:
print('End of loop')
loop.stop()



async def main():
global pages
global count
count=0
async with aiohttp.ClientSession() as session:
async with session.get(home_url.format(1), headers=headers) as resp:

content = await resp.text()
js_data = parse_json(content)
pages = js_data['pages']
print(f'pages: {pages}')
for page in range(1,pages+1):
task = asyncio.ensure_future(fetch(session,page))

await asyncio.sleep(1)


asyncio.ensure_future(main())
loop.run_forever()
1. 如果不加入锁,每次运行的结果可能不一样。
2. 不能用多线程的threading 锁,得到的每次运行结果也有可能不一样
3. 用asyncio的锁要 加关键字 async
  查看全部
对于全局变量count进行统计加锁
import aiohttp
import asyncio
import execjs
import threading
global pages
global count

headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "en-US,en;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Host": "dcfm.eastmoney.com",
"Pragma": "no-cache",
"Referer": "http://data.eastmoney.com/xg/xg/default.html",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/69.0.3497.81 Chrome/69.0.3497.81 Safari/537.36",
}

home_url = 'http://dcfm.eastmoney.com/em_mutisvcexpandinterface/api/js/get?type=XGSG_LB&token=70f12f2f4f091e459a279469fe49eca5&st=purchasedate,securitycode&sr=-1&p={}&ps=50&js=var%20hsEnHLwG={{pages:(tp),data:(x)}}&rt=53512217'

loop = asyncio.get_event_loop()
# lock = threading.Lock()
lock = asyncio.Lock()
def parse_json(content):
content += ';function getV(){return hsEnHLwG;}'
ctx = execjs.compile(content)
result = ctx.call('getV')
return result


async def fetch(session,page):
global pages
global count
async with session.get(home_url.format(page),headers=headers) as resp:
# print(f'here:: {page}')
content = await resp.text()

try:
js_content = parse_json(content)
for stock_info in js_content['data']:
securityshortname = stock_info['securityshortname']
# print(securityshortname)
except Exception as e:
print(e)

async with lock:
count=count+1

print(f'count:{count}')
if count == pages:
print('End of loop')
loop.stop()



async def main():
global pages
global count
count=0
async with aiohttp.ClientSession() as session:
async with session.get(home_url.format(1), headers=headers) as resp:

content = await resp.text()
js_data = parse_json(content)
pages = js_data['pages']
print(f'pages: {pages}')
for page in range(1,pages+1):
task = asyncio.ensure_future(fetch(session,page))

await asyncio.sleep(1)


asyncio.ensure_future(main())
loop.run_forever()

1. 如果不加入锁,每次运行的结果可能不一样。
2. 不能用多线程的threading 锁,得到的每次运行结果也有可能不一样
3. 用asyncio的锁要 加关键字 async
 

attrs() got an unexpected keyword argument 'eq'

李魔佛 发表了文章 • 0 个评论 • 382 次浏览 • 2020-11-12 22:42 • 来自相关话题

xda@xda-dt:~$ pip install attrs --upgrade
Collecting attrs
  Downloading https://files.pythonhosted.org ... y.whl (49kB)
    100% |████████████████████████████████| 51kB 79kB/s 
Installing collected packages: attrs
  Found existing installation: attrs 18.2.0
    Uninstalling attrs-18.2.0:
      Successfully uninstalled attrs-18.2.0
Successfully installed attrs-20.3.0 查看全部
xda@xda-dt:~$ pip install attrs --upgrade
Collecting attrs
  Downloading https://files.pythonhosted.org ... y.whl (49kB)
    100% |████████████████████████████████| 51kB 79kB/s 
Installing collected packages: attrs
  Found existing installation: attrs 18.2.0
    Uninstalling attrs-18.2.0:
      Successfully uninstalled attrs-18.2.0
Successfully installed attrs-20.3.0

pyecharts绘图保存为图片 适用于ssh无头浏览器运行

李魔佛 发表了文章 • 0 个评论 • 332 次浏览 • 2020-11-04 22:27 • 来自相关话题

网上搜索到的答案是使用chrome driver实现的, 但是本人的程序是运行在centos下的,centos下折腾chrome driver比较蛋疼,所以看了下pyecharts.render的源码,其实这个也支持使用无头phantomjs进行截图的,当然这个不是一般的直接截取屏幕,是通过JS代码把html里面的渲染图像下载下来,清晰度比普通截图要高很多很多。
 

make_snapshot(snapshot, bar.render(), f"data/{today}_cb.png", driver=driver)
 
在最后一行传入一个driver既可以了,这个driver使用phantomjs的实例。
 
import os
from pyecharts.render import make_snapshot
from snapshot_selenium import snapshot
import pandas as pd
from pyecharts import options as opts
from pyecharts.charts import Bar
import sys
from selenium import webdriver
from pyecharts.commons.utils import JsCode

if sys.platform == 'win32':
SELENIUM_PATH = r'C:\OneDrive\Tool\phantomjs-2.1.1-windows\phantomjs-2.1.1-windows\bin\phantomjs.exe'
driver = None
else:
SELENIUM_PATH = './phantomjs'
driver = webdriver.PhantomJS(executable_path=SELENIUM_PATH)


bar = (
Bar()
.add_xaxis(list(result_dict .keys()))
.add_yaxis(f"{today}-可转债价格分布", y_list, category_gap=3)
.add_yaxis(f"{today}-正股价格分布", y_zg_list, category_gap=3)
.set_series_opts(
label_opts=opts.LabelOpts(is_show=True),
axispointer_opts=opts.AxisPointerOpts(is_show=True))
.set_global_opts(
title_opts=opts.TitleOpts(title="可转债价格分布"),
xaxis_opts=opts.AxisOpts(
name="涨跌幅",
is_show=True,
name_rotate=30,
),
graphic_opts=[
opts.GraphicGroup(
graphic_item=opts.GraphicItem(
left="70%",
top="20%",
),
children=[
opts.GraphicText(
graphic_item=opts.GraphicItem(
left="center",
top="middle",
z=100,
),
graphic_textstyle_opts=opts.GraphicTextStyleOpts(
text=JsCode(
f"['涨幅>=0:{bigger}',"
f"'涨幅<0:{smaller}',"
f"'平均涨幅:{avg}%',"
f"'波动方差:{std}',"
f"'',"
f"'最大:{max_name} {max_pct}%',"
f"'最小:{min_name} {min_pct}%',"
"''].join('\\n')"
),
font="14px Microsoft YaHei",
graphic_basicstyle_opts=opts.GraphicBasicStyleOpts(
fill="#333"
)
)
)
]
)
],
)

)

bar.render(os.path.join('data', f"{today}_cb.html"))
make_snapshot(snapshot, bar.render(), f"data/{today}_cb.png", driver=driver)



  查看全部
网上搜索到的答案是使用chrome driver实现的, 但是本人的程序是运行在centos下的,centos下折腾chrome driver比较蛋疼,所以看了下pyecharts.render的源码,其实这个也支持使用无头phantomjs进行截图的,当然这个不是一般的直接截取屏幕,是通过JS代码把html里面的渲染图像下载下来,清晰度比普通截图要高很多很多。
 

make_snapshot(snapshot, bar.render(), f"data/{today}_cb.png", driver=driver)
 
在最后一行传入一个driver既可以了,这个driver使用phantomjs的实例。
 
import os
from pyecharts.render import make_snapshot
from snapshot_selenium import snapshot
import pandas as pd
from pyecharts import options as opts
from pyecharts.charts import Bar
import sys
from selenium import webdriver
from pyecharts.commons.utils import JsCode

if sys.platform == 'win32':
SELENIUM_PATH = r'C:\OneDrive\Tool\phantomjs-2.1.1-windows\phantomjs-2.1.1-windows\bin\phantomjs.exe'
driver = None
else:
SELENIUM_PATH = './phantomjs'
driver = webdriver.PhantomJS(executable_path=SELENIUM_PATH)


bar = (
Bar()
.add_xaxis(list(result_dict .keys()))
.add_yaxis(f"{today}-可转债价格分布", y_list, category_gap=3)
.add_yaxis(f"{today}-正股价格分布", y_zg_list, category_gap=3)
.set_series_opts(
label_opts=opts.LabelOpts(is_show=True),
axispointer_opts=opts.AxisPointerOpts(is_show=True))
.set_global_opts(
title_opts=opts.TitleOpts(title="可转债价格分布"),
xaxis_opts=opts.AxisOpts(
name="涨跌幅",
is_show=True,
name_rotate=30,
),
graphic_opts=[
opts.GraphicGroup(
graphic_item=opts.GraphicItem(
left="70%",
top="20%",
),
children=[
opts.GraphicText(
graphic_item=opts.GraphicItem(
left="center",
top="middle",
z=100,
),
graphic_textstyle_opts=opts.GraphicTextStyleOpts(
text=JsCode(
f"['涨幅>=0:{bigger}',"
f"'涨幅<0:{smaller}',"
f"'平均涨幅:{avg}%',"
f"'波动方差:{std}',"
f"'',"
f"'最大:{max_name} {max_pct}%',"
f"'最小:{min_name} {min_pct}%',"
"''].join('\\n')"
),
font="14px Microsoft YaHei",
graphic_basicstyle_opts=opts.GraphicBasicStyleOpts(
fill="#333"
)
)
)
]
)
],
)

)

bar.render(os.path.join('data', f"{today}_cb.html"))
make_snapshot(snapshot, bar.render(), f"data/{today}_cb.png", driver=driver)



 

使用sshtunnel SSHTunnelForwarder 作为跳板连接mysql后一直卡住不退出

李魔佛 发表了文章 • 0 个评论 • 534 次浏览 • 2020-11-04 10:10 • 来自相关话题

代码如下:server = SSHTunnelForwarder(
ssh_address_or_host=host,
ssh_port=port,
ssh_username=user,
ssh_password=password,
local_bind_address=('127.0.0.1', local_port),
remote_bind_address=(host, mysql_port)
)

server.start()
conn = pymysql.connect(
host='127.0.0.1',
port=local_port,
user=user,
password=password,
db='db_stock'
)

cursor = conn.cursor()
cursor.execute('select count(*) from tb_cb_index')
ret = cursor.fetchall()
print(ret)
server.stop()
print('stop')

代码运行后并没有结束,或者没有答应stop的字符。 在程序里已经使用了server.stop()关闭ssh的连接。
 
后面发现日志里面,mysql的连接没有断开,导致server没有被关闭,所以在上面的代码中加一句:
 print(ret)
conn.close()
server.stop()
print('stop')
把mysql的连接关闭,然后就可以把ssh的连接关闭,然后打印stop字符了。
  查看全部
代码如下:
server = SSHTunnelForwarder(
ssh_address_or_host=host,
ssh_port=port,
ssh_username=user,
ssh_password=password,
local_bind_address=('127.0.0.1', local_port),
remote_bind_address=(host, mysql_port)
)

server.start()
conn = pymysql.connect(
host='127.0.0.1',
port=local_port,
user=user,
password=password,
db='db_stock'
)

cursor = conn.cursor()
cursor.execute('select count(*) from tb_cb_index')
ret = cursor.fetchall()
print(ret)
server.stop()
print('stop')

代码运行后并没有结束,或者没有答应stop的字符。 在程序里已经使用了server.stop()关闭ssh的连接。
 
后面发现日志里面,mysql的连接没有断开,导致server没有被关闭,所以在上面的代码中加一句:
 
print(ret)
conn.close()
server.stop()
print('stop')

把mysql的连接关闭,然后就可以把ssh的连接关闭,然后打印stop字符了。