国庆节 微信头像红旗 制作 附 python代码 和 红旗素材

我是一个新兵 发表了文章 • 0 个评论 • 40 次浏览 • 2022-09-23 10:31 • 来自相关话题

国庆节的时候,很多地方都会升挂国旗,庆祝祖国一年一度的节日。

给自己制作国旗头像,是一件很有意义的事。微信官方就曾经举办过活动。

制作国旗头像的方法有很多,本文给大家介绍用Python制作渐变的微信国旗头像。
 





 

渐变的国旗头像效果非常好看。
 
制作方式也很简单,下面介绍实现方法,可以复制本文的代码,制作属于自己的国旗头像。
 
 

1.准备国旗图片





 
声明:严禁非法使用国旗图片。在国旗图片下载页面下方有《国旗法》等相关规定。

 
本文使用1024像素的图片。
 

2.准备头像图片
登录自己的微信,打开个人信息,点击头像,点击右上角的三个点,将图片保存到手机,
 
然后将图片传到电脑上。

为了不失一般性,本文使用的图片是我从网络上获取的一张600*600像素的头像图片。

准备好后,将国旗图片和头像图片拷贝到代码同一个目录下。
 

代码实现

先安装Python中用于处理图片的pillow库。
 pip install pillow

安装完成后,使用pillow库来制作国旗头像。
只需要十几行代码就能实现。完整代码如下。
 
# coding=utf-8
from PIL import Image
import math

key = 3.2# 修改key值可以调整国旗的范围,推荐2~4之间的数字,支持小数
motherland_flag = Image.open('flag-1024.png')
head_picture = Image.open('mmexport1663893338571.png')
# 截图国旗上的五颗五角星
flag_width, flag_height = motherland_flag.size
crop_flag = motherland_flag.crop((66, 0, flag_height+66, flag_height))
# 将国旗截图处理成颜色渐变
for i in range(flag_height):
for j in range(flag_height):
color = crop_flag.getpixel((i, j))
distance = int(math.sqrt(i*i + j*j))
alpha = 255 - int(distance//key)
new_color = (*color[0:-1], alpha if alpha > 0 else 0)
crop_flag.putpixel((i, j), new_color)
# 修改渐变图片的尺寸,适应头像大小,粘贴到头像上
new_crop_flag = crop_flag.resize(head_picture.size)
head_picture.paste(new_crop_flag, (0, 0), new_crop_flag)
# 保存自己的国旗头像
head_picture.save('国旗头像.png')
 
到此为止,已经制作好了国旗头像了。
 
 
下面是代码的详细结束:

代码介绍:代码介绍:

导入需要使用的Python库,pillow库用于对图片进行截取、大小修改、粘贴等处理。math库用于计算像素点的距离。

使用Image.open()方法,读取准备好的国旗图片和头像图片到代码中。

对国旗图片进行截取,获取一张正方形的图片,截取时调整截取位置,保证5颗五角星完整展示在截图中。

crop()方法中传入的是一个(left, upper, right, lower)的元组,分别是截图的左、上、右、下像素位置。
将正方形国旗截图设置成透明度渐变的图片。国旗图片的模式默认是RGBA,本文需要的刚好是RGBA(red,green,blue,alpha)模式的图片,RGBA模式的图片颜色值是一个长度为4的元组,我们修改不同像素点的A值即可将图片设置成渐变。

本文是以国旗左上角为圆心,离圆心越远的像素点A值越小,像素点越透明。使用getpixel()和putpixel()两个方法来获取和重设像素点的颜色值,使用math.sqrt()计算像素点距离。

将渐变图片的大小转换成和头像的大小一样,然后粘贴到图片顶层。使用resize()方法重设图片大小,使用paste()方法粘贴图片。

保存图片,此时的微信头像图片上已经粘贴了透明渐变的国旗图片,微信国旗头像制作完成。

本文介绍了用Python制作微信国旗头像的方法。在本文的代码中,以左上角为圆心,离圆心越远越透明,可以修改key值,调整国旗透明变化的范围。

如果需要制作其他渐变类型,如以右上角为圆心逐渐透明、从左侧向右侧逐渐透明、从上到下逐渐透明等,可以修改国旗渐变部分的代码、翻转图片等。
 

 
 
 

  查看全部
国庆节的时候,很多地方都会升挂国旗,庆祝祖国一年一度的节日。

给自己制作国旗头像,是一件很有意义的事。微信官方就曾经举办过活动。

制作国旗头像的方法有很多,本文给大家介绍用Python制作渐变的微信国旗头像。
 

5EFB522509E5461D870CDA736ED7E6DF.png

 

渐变的国旗头像效果非常好看。
 
制作方式也很简单,下面介绍实现方法,可以复制本文的代码,制作属于自己的国旗头像。
 
 

1.准备国旗图片

flag-1024.png

 
声明:严禁非法使用国旗图片。在国旗图片下载页面下方有《国旗法》等相关规定。

 
本文使用1024像素的图片。
 

2.准备头像图片
登录自己的微信,打开个人信息,点击头像,点击右上角的三个点,将图片保存到手机,
 
然后将图片传到电脑上。

为了不失一般性,本文使用的图片是我从网络上获取的一张600*600像素的头像图片。

准备好后,将国旗图片和头像图片拷贝到代码同一个目录下。
 

代码实现

先安装Python中用于处理图片的pillow库。
 
pip install pillow


安装完成后,使用pillow库来制作国旗头像。
只需要十几行代码就能实现。完整代码如下。
 
# coding=utf-8
from PIL import Image
import math

key = 3.2# 修改key值可以调整国旗的范围,推荐2~4之间的数字,支持小数
motherland_flag = Image.open('flag-1024.png')
head_picture = Image.open('mmexport1663893338571.png')
# 截图国旗上的五颗五角星
flag_width, flag_height = motherland_flag.size
crop_flag = motherland_flag.crop((66, 0, flag_height+66, flag_height))
# 将国旗截图处理成颜色渐变
for i in range(flag_height):
for j in range(flag_height):
color = crop_flag.getpixel((i, j))
distance = int(math.sqrt(i*i + j*j))
alpha = 255 - int(distance//key)
new_color = (*color[0:-1], alpha if alpha > 0 else 0)
crop_flag.putpixel((i, j), new_color)
# 修改渐变图片的尺寸,适应头像大小,粘贴到头像上
new_crop_flag = crop_flag.resize(head_picture.size)
head_picture.paste(new_crop_flag, (0, 0), new_crop_flag)
# 保存自己的国旗头像
head_picture.save('国旗头像.png')

 
到此为止,已经制作好了国旗头像了。
 
 
下面是代码的详细结束:

代码介绍:
代码介绍:

导入需要使用的Python库,pillow库用于对图片进行截取、大小修改、粘贴等处理。math库用于计算像素点的距离。

使用Image.open()方法,读取准备好的国旗图片和头像图片到代码中。

对国旗图片进行截取,获取一张正方形的图片,截取时调整截取位置,保证5颗五角星完整展示在截图中。

crop()方法中传入的是一个(left, upper, right, lower)的元组,分别是截图的左、上、右、下像素位置。
将正方形国旗截图设置成透明度渐变的图片。国旗图片的模式默认是RGBA,本文需要的刚好是RGBA(red,green,blue,alpha)模式的图片,RGBA模式的图片颜色值是一个长度为4的元组,我们修改不同像素点的A值即可将图片设置成渐变。

本文是以国旗左上角为圆心,离圆心越远的像素点A值越小,像素点越透明。使用getpixel()和putpixel()两个方法来获取和重设像素点的颜色值,使用math.sqrt()计算像素点距离。

将渐变图片的大小转换成和头像的大小一样,然后粘贴到图片顶层。使用resize()方法重设图片大小,使用paste()方法粘贴图片。

保存图片,此时的微信头像图片上已经粘贴了透明渐变的国旗图片,微信国旗头像制作完成。


本文介绍了用Python制作微信国旗头像的方法。在本文的代码中,以左上角为圆心,离圆心越远越透明,可以修改key值,调整国旗透明变化的范围。

如果需要制作其他渐变类型,如以右上角为圆心逐渐透明、从左侧向右侧逐渐透明、从上到下逐渐透明等,可以修改国旗渐变部分的代码、翻转图片等。
 

 
 
 

 

ciso8601 性能对比 datetime 默认库

李魔佛 发表了文章 • 0 个评论 • 233 次浏览 • 2022-07-22 12:04 • 来自相关话题

In [1]: import datetime, aniso8601, iso8601, isodate, dateutil.parser, arrow, ciso8601

In [2]: ds = u'2014-01-09T21:48:00.921000'

In [3]: %timeit ciso8601.parse_datetime(ds)
1000000 loops, best of 3: 204 ns per loop

In [4]: %timeit datetime.datetime.strptime(ds, "%Y-%m-%dT%H:%M:%S.%f")
100000 loops, best of 3: 15 µs per loop

In [5]: %timeit dateutil.parser.parse(ds)
10000 loops, best of 3: 122 µs per loop

In [6]: %timeit aniso8601.parse_datetime(ds)
10000 loops, best of 3: 28.9 µs per loop

In [7]: %timeit iso8601.parse_date(ds)
10000 loops, best of 3: 42 µs per loop

In [8]: %timeit isodate.parse_datetime(ds)
10000 loops, best of 3: 69.4 µs per loop

In [9]: %timeit arrow.get(ds).datetime
10000 loops, best of 3: 87 µs per loopIn [1]: import datetime, aniso8601, iso8601, isodate, dateutil.parser, arrow, ciso8601

In [2]: ds = u'2014-01-09T21:48:00.921000+05:30'

In [3]: %timeit ciso8601.parse_datetime(ds)
1000000 loops, best of 3: 525 ns per loop

In [4]: %timeit dateutil.parser.parse(ds)
10000 loops, best of 3: 162 µs per loop

In [5]: %timeit aniso8601.parse_datetime(ds)
10000 loops, best of 3: 36.8 µs per loop

In [6]: %timeit iso8601.parse_date(ds)
10000 loops, best of 3: 53.5 µs per loop

In [7]: %timeit isodate.parse_datetime(ds)
10000 loops, best of 3: 82.6 µs per loop

In [8]: %timeit arrow.get(ds).datetime
10000 loops, best of 3: 104 µs per loop
 

Even with time zone information, ciso8601 is 70x as fast as aniso8601.

Tested on Python 2.7.10 on macOS 10.12.6 using the following modules:

 
 
 ciso8601 是纳秒级别的,如果要对上千万的数据操作,建议使用ciso这个C库。
  查看全部
In [1]: import datetime, aniso8601, iso8601, isodate, dateutil.parser, arrow, ciso8601

In [2]: ds = u'2014-01-09T21:48:00.921000'

In [3]: %timeit ciso8601.parse_datetime(ds)
1000000 loops, best of 3: 204 ns per loop

In [4]: %timeit datetime.datetime.strptime(ds, "%Y-%m-%dT%H:%M:%S.%f")
100000 loops, best of 3: 15 µs per loop

In [5]: %timeit dateutil.parser.parse(ds)
10000 loops, best of 3: 122 µs per loop

In [6]: %timeit aniso8601.parse_datetime(ds)
10000 loops, best of 3: 28.9 µs per loop

In [7]: %timeit iso8601.parse_date(ds)
10000 loops, best of 3: 42 µs per loop

In [8]: %timeit isodate.parse_datetime(ds)
10000 loops, best of 3: 69.4 µs per loop

In [9]: %timeit arrow.get(ds).datetime
10000 loops, best of 3: 87 µs per loop
In [1]: import datetime, aniso8601, iso8601, isodate, dateutil.parser, arrow, ciso8601

In [2]: ds = u'2014-01-09T21:48:00.921000+05:30'

In [3]: %timeit ciso8601.parse_datetime(ds)
1000000 loops, best of 3: 525 ns per loop

In [4]: %timeit dateutil.parser.parse(ds)
10000 loops, best of 3: 162 µs per loop

In [5]: %timeit aniso8601.parse_datetime(ds)
10000 loops, best of 3: 36.8 µs per loop

In [6]: %timeit iso8601.parse_date(ds)
10000 loops, best of 3: 53.5 µs per loop

In [7]: %timeit isodate.parse_datetime(ds)
10000 loops, best of 3: 82.6 µs per loop

In [8]: %timeit arrow.get(ds).datetime
10000 loops, best of 3: 104 µs per loop

 


Even with time zone information, ciso8601 is 70x as fast as aniso8601.

Tested on Python 2.7.10 on macOS 10.12.6 using the following modules:


 
 
 ciso8601 是纳秒级别的,如果要对上千万的数据操作,建议使用ciso这个C库。
 

python的 influxdb-client 和 influxdb这两个库有什么区别?

回复

李魔佛 回复了问题 • 1 人关注 • 1 个回复 • 319 次浏览 • 2022-07-20 23:48 • 来自相关话题

python sqlite3 多线程 批量写入 【代码】

李魔佛 发表了文章 • 0 个评论 • 522 次浏览 • 2022-07-09 18:55 • 来自相关话题

1. 随机生成一个数组数据
2. 在多线程里面批量插入数据
 
几个关注点:
sqlite3.connect(_type, check_same_thread=False) 要设置为False
 
批量写的时候,记得要加锁
 
 import datetime
import random
import sqlite3
import threading
import logging as log
import time

lock = threading.Lock()
class SQLiteDBCls:

def __init__(self, cache=True):
_type = ":memory:"

self.db = sqlite3.connect(_type, check_same_thread=False)

self.table_name = 'tick_data'

def create_index(self):

cmd = 'CREATE INDEX code_ix ON {} (current)'.format(self.table_name)
with lock:
try:

cursor = self.db.cursor()
cursor.execute(cmd)
except Exception as e:
log.info(e)
self.db.rollback()
else:
self.db.commit()

def create_table(self):
# cursor = self.db.cursor()
cmd = 'create table if not exists {} (id INTEGER PRIMARY KEY AUTOINCREMENT,code text,open double,current time)'.format(
self.table_name)
with lock:
try:
cursor = self.db.cursor()
cursor.execute(cmd)
except Exception as e:
log.info(e)
self.db.rollback()
else:
self.db.commit()

def add(self, code, price, t):
cmd = 'insert into {} (code,open,current) values (?,?,?);'.format(self.table_name)
with lock:
try:
cursor = self.db.cursor()
cursor.execute(cmd, (code, price, t))
except Exception as e:
log.info(e)
self.db.rollback()
else:
self.db.commit()

def batch_add(self, data):

# 批量加入
print('===========',threading.current_thread().getName())
# log.info(threading.current_thread().getName())
cmd = 'insert into {} (code,open,current) values (?,?,?)'.format(self.table_name)
with lock:
try:
cursor = self.db.cursor()
cursor.executemany(cmd, data)
except Exception as e:
log.info(e)
self.db.rollback()
else:
self.db.commit()

def result(self):
cmd = 'select count(*) from `{}`'.format(self.table_name)

with lock:

try:
cursor = self.db.cursor()
cursor.execute(cmd)
except Exception as e:
log.info(e)
self.db.rollback()
else:
return cursor.fetchone()


def data_gen():
minute = 6000
code = ['123011.SS','110010.SS','112111.SS']
for i in range(minute):
current = (datetime.datetime.now()+datetime.timedelta(minutes=i)).strftime('%H:%M:%D')
data_list =
for c in code:
price = 5+random.random()+120
data = (c,price,current)
data_list.append(data)
yield data_list
# time.sleep(0.5)

app = SQLiteDBCls(cache=True)
app.create_table()
app.create_index()

def data_validation():
print(app.result())
app.sync_up()

def multithread_mode():
total_count = 0
thread_list =
for d in data_gen():
print(d)
total_count+=len(d)
# app.batch_add(d)
t=threading.Thread(target=app.batch_add,args=(d,))
thread_list.append(t)
for t in thread_list:
t.start()

for t in thread_list:
t.join()

print(total_count)


if __name__=='__main__':
multithread_mode()
data_validation()



 假如不加锁会出错:
File "/home/xda/miniconda3/envs/cpy/lib/python3.9/threading.py", line 910, in run
self._target(*self._args, **self._kwargs)
File "/home/xda/github/stock_strategy/sqlite_issue_debug.py", line 77, in batch_add
self.db.commit()
Exception in thread Thread-3824:
Exception in thread Thread-3826:
Traceback (most recent call last):
File "/home/xda/miniconda3/envs/cpy/lib/python3.9/threading.py", line 973, in _bootstrap_inner
sqlite3.OperationalError: cannot commit - no transaction is activeTraceback (most recent call last):
File "/home/xda/github/stock_strategy/sqlite_issue_debug.py", line 72, in batch_add

cursor.executemany(cmd, data)
sqlite3.InterfaceError: Error binding parameter 0 - probably unsupported type.
  查看全部
1. 随机生成一个数组数据
2. 在多线程里面批量插入数据
 
几个关注点:
sqlite3.connect(_type, check_same_thread=False) 要设置为False
 
批量写的时候,记得要加锁
 
 
import datetime
import random
import sqlite3
import threading
import logging as log
import time

lock = threading.Lock()
class SQLiteDBCls:

def __init__(self, cache=True):
_type = ":memory:"

self.db = sqlite3.connect(_type, check_same_thread=False)

self.table_name = 'tick_data'

def create_index(self):

cmd = 'CREATE INDEX code_ix ON {} (current)'.format(self.table_name)
with lock:
try:

cursor = self.db.cursor()
cursor.execute(cmd)
except Exception as e:
log.info(e)
self.db.rollback()
else:
self.db.commit()

def create_table(self):
# cursor = self.db.cursor()
cmd = 'create table if not exists {} (id INTEGER PRIMARY KEY AUTOINCREMENT,code text,open double,current time)'.format(
self.table_name)
with lock:
try:
cursor = self.db.cursor()
cursor.execute(cmd)
except Exception as e:
log.info(e)
self.db.rollback()
else:
self.db.commit()

def add(self, code, price, t):
cmd = 'insert into {} (code,open,current) values (?,?,?);'.format(self.table_name)
with lock:
try:
cursor = self.db.cursor()
cursor.execute(cmd, (code, price, t))
except Exception as e:
log.info(e)
self.db.rollback()
else:
self.db.commit()

def batch_add(self, data):

# 批量加入
print('===========',threading.current_thread().getName())
# log.info(threading.current_thread().getName())
cmd = 'insert into {} (code,open,current) values (?,?,?)'.format(self.table_name)
with lock:
try:
cursor = self.db.cursor()
cursor.executemany(cmd, data)
except Exception as e:
log.info(e)
self.db.rollback()
else:
self.db.commit()

def result(self):
cmd = 'select count(*) from `{}`'.format(self.table_name)

with lock:

try:
cursor = self.db.cursor()
cursor.execute(cmd)
except Exception as e:
log.info(e)
self.db.rollback()
else:
return cursor.fetchone()


def data_gen():
minute = 6000
code = ['123011.SS','110010.SS','112111.SS']
for i in range(minute):
current = (datetime.datetime.now()+datetime.timedelta(minutes=i)).strftime('%H:%M:%D')
data_list =
for c in code:
price = 5+random.random()+120
data = (c,price,current)
data_list.append(data)
yield data_list
# time.sleep(0.5)

app = SQLiteDBCls(cache=True)
app.create_table()
app.create_index()

def data_validation():
print(app.result())
app.sync_up()

def multithread_mode():
total_count = 0
thread_list =
for d in data_gen():
print(d)
total_count+=len(d)
# app.batch_add(d)
t=threading.Thread(target=app.batch_add,args=(d,))
thread_list.append(t)
for t in thread_list:
t.start()

for t in thread_list:
t.join()

print(total_count)


if __name__=='__main__':
multithread_mode()
data_validation()



 假如不加锁会出错:
File "/home/xda/miniconda3/envs/cpy/lib/python3.9/threading.py", line 910, in run
self._target(*self._args, **self._kwargs)
File "/home/xda/github/stock_strategy/sqlite_issue_debug.py", line 77, in batch_add
self.db.commit()
Exception in thread Thread-3824:
Exception in thread Thread-3826:
Traceback (most recent call last):
File "/home/xda/miniconda3/envs/cpy/lib/python3.9/threading.py", line 973, in _bootstrap_inner
sqlite3.OperationalError: cannot commit - no transaction is activeTraceback (most recent call last):
File "/home/xda/github/stock_strategy/sqlite_issue_debug.py", line 72, in batch_add

cursor.executemany(cmd, data)
sqlite3.InterfaceError: Error binding parameter 0 - probably unsupported type.

 

控制pymysql的链接超时

李魔佛 发表了文章 • 0 个评论 • 443 次浏览 • 2022-06-14 23:39 • 来自相关话题

看到网上尤其csdn上,大部分的教程都是说加timeout参数
 


conn = pymysql.connect(host=host, port=port, user=user, password=password, db=db, charset='utf8',timeout=3)
 

结果运行的时候直接报错的。好家伙。
 
难道都是东家抄西家,西家抄东家?
 
直接点进去源码:





 
这里直接有一个connect_timeout 的参数,这个才是最新的常数名。 查看全部
看到网上尤其csdn上,大部分的教程都是说加timeout参数
 



conn = pymysql.connect(host=host, port=port, user=user, password=password, db=db, charset='utf8',timeout=3)
 


结果运行的时候直接报错的。好家伙。
 
难道都是东家抄西家,西家抄东家?
 
直接点进去源码:

20220614032.png

 
这里直接有一个connect_timeout 的参数,这个才是最新的常数名。

python安装demjson报错:error in setup command: use_2to3 is invalid.

李魔佛 发表了文章 • 0 个评论 • 443 次浏览 • 2022-06-06 19:23 • 来自相关话题

原因:在setuptools 58之后的版本已经废弃了use_2to3所以安装一个旧版本的setuptools就可以了
 
随便整一个
pip install setuptools==57.5.0
原因:在setuptools 58之后的版本已经废弃了use_2to3
所以安装一个旧版本的setuptools就可以了
 
随便整一个
pip install setuptools==57.5.0

星球文章 获取所有文章 爬虫

python爬虫李魔佛 发表了文章 • 0 个评论 • 355 次浏览 • 2022-06-03 13:49 • 来自相关话题

挺讽刺的,星球自身没有目录结构,所以浏览文章,只能通过时间线,一些一年前的文章,基本就没有会看到,而且,星球的搜索功能也是一堆bug,处于搜不到的状态。
 




群里没有人没有吐槽过这个搜索功能的。
 
所以只好自己写个程序把自己的文章抓下来,作为文章目录:










 
生成的markdown文件





每次只需要运行python main.py 就可以拿到最新的星球文章链接了。 
 
需要源码可以在公众号联系~
  查看全部
挺讽刺的,星球自身没有目录结构,所以浏览文章,只能通过时间线,一些一年前的文章,基本就没有会看到,而且,星球的搜索功能也是一堆bug,处于搜不到的状态。
 
Screenshot_2022_0603_133612.jpg

群里没有人没有吐槽过这个搜索功能的。
 
所以只好自己写个程序把自己的文章抓下来,作为文章目录:

20220603002.png


20220603001.png

 
生成的markdown文件
20220603004.png


每次只需要运行python main.py 就可以拿到最新的星球文章链接了。 
 
需要源码可以在公众号联系~
 

python seo 小工具 查询百度权重,备案信息

李魔佛 发表了文章 • 0 个评论 • 294 次浏览 • 2022-05-28 14:29 • 来自相关话题

平时主要比较频繁查询 站长之家这个网站:
 




还有百度的收录情况:





 
对于经常操作的朋友,需要使用程序查询,还可以批量查询,并保存到excel或者数据库。





 
上图为入库到mongodb的数据
 
源码实现:
main.py 入口函数:from baidu_collection import baidu_site_collect
from seo_info import crawl_info
from configure.settings import DBSelector
import datetime
import argparse

client = DBSelector().mongo('qq')
doc = client['db_parker']['seo']


def main():

parser = argparse.ArgumentParser()
'''
Command line options
'''
parser.add_argument(
'-n',
'--name', type=str,
help='input web domain'
)

parser.add_argument(
'-f',
'--file', type=str,
help='input web site domain file name'
)

FLAGS = parser.parse_args()
site_list=
if FLAGS.name:
print(FLAGS.name)
if '.' in FLAGS.name:
site_list.append(FLAGS.name)

elif FLAGS.file:
print(FLAGS.file)
with open(FLAGS.file,'r') as fp:
webs=fp.readlines()

site_list.extend(list(map(lambda x:x.strip(),webs)))

if site_list:

run(site_list=site_list)
else:
print("please input correct web domain")


def run(site_list):

# TODO: 改为命令行形式


for site in site_list:
count = baidu_site_collect(site)
info = crawl_info(site)
print(info)
print(count)
info['site'] = site
info['baidu_count'] = count
info['update_time'] = datetime.datetime.now()
doc.insert_one(info)


if __name__ == '__main__':
main()


其他具体实现的文件:
 
baidu_collection.py from parsel import Selector
import requests

def baidu_site_collect(site):
# 百度收录
headers = {'User-Agent': 'Chrome Google FireFox IE'}
url = 'https://www.baidu.com/s?wd=site:{}&rsv_spt=1&rsv_iqid=0xf8b7b7e50006c034&issp=1&f=8&rsv_bp=1&rsv_idx=2&ie=utf-8&tn=baiduhome_pg&rsv_enter=0&rsv_dl=ib&rsv_sug3=14&rsv_sug1=7&rsv_sug7=100&rsv_n=2&rsv_btype=i&inputT=8238&rsv_sug4=8238'.format(site)
resp = requests.get(
url=url,
headers=headers
)

resp.encoding='utf8'
html = resp.text
selector = Selector(text=html)

count = selector.xpath('//div[@class="op_site_domain c-row"]/div/p/span/b/text()').extract_first()
if count:
count=int(count.replace(',',''))
return count

if __name__=='__main__':
site='30daydo.com'
print(baidu_site_collect(site))




seo_info.pyimport argparse
from atexit import register
import sys
import requests
import re
from parsel import Selector

#参数自定义

# parser = argparse.ArgumentParser()
# parser.add_argument('-r', dest='read', help='path file')
# parser.add_argument('-u',dest='read',help='targetdomain')
# parser_args = parser.parse_args()
#爬虫模块查询

VERBOSE = True

def askurl(target_url):
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36'
}


#baidu权重
baidu_url=f"https://rank.chinaz.com/{target_url}"
baidu_txt=requests.get(url=baidu_url,headers=headers)
baidu_html=baidu_txt.content.decode('utf-8')
baidu_PC=re.findall('PC端</i><img src="//csstools.chinaz.com/tools/images/rankicons/baidu(.*?).png"></a></li>',baidu_html,re.S)
baidu_moblie=re.findall('移动端</i><img src="//csstools.chinaz.com/tools/images/rankicons/bd(.*?).png"></a></li>',baidu_html,re.S)
#分割线
print("*"*60)

#如果查询html中有正则出来到权重关键字就输出,否则将不输出
result={}

baidu_pc_weight = None
baidu_mobile_weight = None

if len(baidu_PC) > 0:
print('百度_PC:', baidu_PC[0])
baidu_pc_weight=baidu_PC[0]
if len(baidu_moblie) > 0:
print('百度_moblie:', baidu_moblie[0])
baidu_mobile_weight = baidu_moblie[0]
else:
print("百度无权重")

result['baidu_pc_weight']=baidu_pc_weight
result['baidu_mobile_weight']=baidu_mobile_weight

#360权重
url=f"https://rank.chinaz.com/sorank/{target_url}/"
text = requests.get(url=url,headers=headers)
html=text.content.decode('utf-8')
sorank360_PC=re.findall('PC端</i><img src="//csstools.chinaz.com/tools/images/rankicons/360(.*?).png"></a><',html,re.S)
sorank360_Mobile=re.findall('移动端</i><img src="//csstools.chinaz.com/tools/images/rankicons/360(.*?).png"',html,re.S)

_360_pc_weight=None
_360_mobile_weight=None

# 如果查询html中有正则出来到权重关键字就输出,否则将不输出
if len(sorank360_PC) > 0:
_360_pc_weight=sorank360_PC[0]
print("360_PC:", sorank360_PC[0])
if len(sorank360_Mobile) > 0:
_360_mobile_weight=sorank360_Mobile[0]
print("360_moblie:", sorank360_Mobile[0])
else:
print("360无权重")

result['360_pc_weight']=_360_pc_weight
result['360_mobile_weight']=_360_mobile_weight


#搜狗权重


sogou_pc_weight=None
sogou_mobile_weight=None

sogou_url = f"https://rank.chinaz.com/sogoupc/{target_url}"
sougou_txt = requests.get(url=sogou_url, headers=headers)
sougou_html = sougou_txt.content.decode('utf-8')
sougou_PC = re.findall('PC端</i><img src="//csstools.chinaz.com/tools/images/rankicons/sogou(.*?).png"></a></li>',sougou_html, re.S)
sougou_mobile = re.findall('移动端</i><img src="//csstools.chinaz.com/tools/images/rankicons/sogou(.*?).png"></a></li>',sougou_html, re.S)

# 如果查询html中有正则出来到权重关键字就输出,否则将不输出
if len(sougou_PC) > 0:
print('搜狗_PC:', sougou_PC[1])
sogou_pc_weight=sougou_PC[1]

if len(sougou_mobile) > 0 :
print('搜狗_moblie:', sougou_mobile[1])
sogou_mobile_weight=sougou_mobile[1]

else:
print('搜狗无权重')


result['sogou_pc_weight']=sogou_pc_weight
result['sogou_mobile_weight']=sogou_mobile_weight


#神马权重
shenma_pc_weight =None
shenma_url=f'https://rank.chinaz.com/smrank/{target_url}'
shenma_txt=requests.get(url=shenma_url,headers=headers)
shenma_html=shenma_txt.content.decode('utf-8')
shenma_PC=re.findall('class="tc mt5"><img src="//csstools.chinaz.com/tools/images/rankicons/shenma(.*?).png"></a></li>',shenma_html,re.S)

# 如果查询html中有正则出来到权重关键字就输出,否则将不输出
if len(shenma_PC) > 0:
print('神马权重为:', shenma_PC[1])
shenma_pc_weight=shenma_PC[1]
else:
print("神马无权重")


result['shenma_pc_weight']=shenma_pc_weight
# result['shenma_mobile_weight']=None


#头条权重

toutiao_pc_weight=None
toutiao_url=f'https://rank.chinaz.com/toutiao/{target_url}'
toutiao_txt=requests.get(url=toutiao_url,headers=headers)
toutiao_html=toutiao_txt.content.decode('utf-8')
toutiao_PC=re.findall('class="tc mt5"><img src="//csstools.chinaz.com/tools/images/rankicons/toutiao(.*?).png"></a></li>',toutiao_html,re.S)

# 如果查询html中有正则出来到权重关键字就输出,否则将不输出
if len(toutiao_PC) > 0:
print('头条权重为:', toutiao_PC[1])
toutiao_pc_weight=toutiao_PC[1]
else:
print("头条无权重")

result['toutiao_pc_weight']=toutiao_pc_weight
# result['toutiao_mobile_weight']=None


#备案信息、title、企业性质
beian_url=f"https://seo.chinaz.com/{target_url}"
beian_txt=requests.get(url=beian_url,headers=headers)
beian_html=beian_txt.content.decode('utf-8')

with open('beian_html.html','w') as fp:
fp.write(beian_html)

title,beian_no,name,ip,nature,register,years=parse_info(beian_html)

result['name']=name
result['title']=title
result['beian_no']=beian_no
result['ip']=ip
result['nature']=nature
result['register']=register
result['years']=years


try:
print("备案信息:",beian_no,"名称:",name,"网站首页Title:",title,"企业性质:",nature,"IP地址为:",ip)
print("*"*60)
except:
print("没有查询到有效信息!")

return result

strip_fun = lambda x:x.strip() if x is not None else ""

def parse_info(html):

resp = Selector(text=html)
title = strip_fun(resp.xpath('//div[@class="_chinaz-seo-t2l ellipsis"]/text()').extract_first())
table = resp.xpath('//table[@class="_chinaz-seo-newt"]/tbody')

if table[0].xpath('.//tr[4]/td[2]/span[1]/i'):
beian_num=strip_fun(table[0].xpath('.//tr[4]/td[2]/span[1]/i/a/text()').extract_first())
else:
beian_num=strip_fun(table[0].xpath('.//tr[4]/td[2]/span[1]/a/text()').extract_first())

name=strip_fun(table[0].xpath('.//tr[4]/td[2]/span[2]/i/text()').extract_first())
if not name:
print('---->',name)
name=strip_fun(table[0].xpath('.//tr[4]/td[2]/span[2]/i/a/text()').extract_first())

nature=strip_fun(table[0].xpath('.//tr[4]/td[2]/span[3]/i/text()').extract_first())
ip=strip_fun(table[0].xpath('.//tr[5]/td[2]/div/span[1]/i/a/text()').extract_first())
register=strip_fun(table[0].xpath('.//tr[3]/td[2]/div[1]/span[1]/i/text()').extract_first())
years=strip_fun(table[0].xpath('.//tr[3]/td[2]/div[2]/span[1]/i/text()').extract_first())



return title,beian_num,name,ip,nature,register,years


def crawl_info(site):
return askurl(site)

if __name__ == '__main__':
main()
 
运行效果:





需要完整代码,可关注公众号联系: 查看全部
平时主要比较频繁查询 站长之家这个网站:
 
20220528005.png

还有百度的收录情况:

20220528006.png

 
对于经常操作的朋友,需要使用程序查询,还可以批量查询,并保存到excel或者数据库。

202205280071.png

 
上图为入库到mongodb的数据
 
源码实现:
main.py 入口函数:
from baidu_collection import baidu_site_collect
from seo_info import crawl_info
from configure.settings import DBSelector
import datetime
import argparse

client = DBSelector().mongo('qq')
doc = client['db_parker']['seo']


def main():

parser = argparse.ArgumentParser()
'''
Command line options
'''
parser.add_argument(
'-n',
'--name', type=str,
help='input web domain'
)

parser.add_argument(
'-f',
'--file', type=str,
help='input web site domain file name'
)

FLAGS = parser.parse_args()
site_list=
if FLAGS.name:
print(FLAGS.name)
if '.' in FLAGS.name:
site_list.append(FLAGS.name)

elif FLAGS.file:
print(FLAGS.file)
with open(FLAGS.file,'r') as fp:
webs=fp.readlines()

site_list.extend(list(map(lambda x:x.strip(),webs)))

if site_list:

run(site_list=site_list)
else:
print("please input correct web domain")


def run(site_list):

# TODO: 改为命令行形式


for site in site_list:
count = baidu_site_collect(site)
info = crawl_info(site)
print(info)
print(count)
info['site'] = site
info['baidu_count'] = count
info['update_time'] = datetime.datetime.now()
doc.insert_one(info)


if __name__ == '__main__':
main()


其他具体实现的文件:
 
baidu_collection.py 
from parsel import Selector
import requests

def baidu_site_collect(site):
# 百度收录
headers = {'User-Agent': 'Chrome Google FireFox IE'}
url = 'https://www.baidu.com/s?wd=site:{}&rsv_spt=1&rsv_iqid=0xf8b7b7e50006c034&issp=1&f=8&rsv_bp=1&rsv_idx=2&ie=utf-8&tn=baiduhome_pg&rsv_enter=0&rsv_dl=ib&rsv_sug3=14&rsv_sug1=7&rsv_sug7=100&rsv_n=2&rsv_btype=i&inputT=8238&rsv_sug4=8238'.format(site)
resp = requests.get(
url=url,
headers=headers
)

resp.encoding='utf8'
html = resp.text
selector = Selector(text=html)

count = selector.xpath('//div[@class="op_site_domain c-row"]/div/p/span/b/text()').extract_first()
if count:
count=int(count.replace(',',''))
return count

if __name__=='__main__':
site='30daydo.com'
print(baidu_site_collect(site))




seo_info.py
import argparse
from atexit import register
import sys
import requests
import re
from parsel import Selector

#参数自定义

# parser = argparse.ArgumentParser()
# parser.add_argument('-r', dest='read', help='path file')
# parser.add_argument('-u',dest='read',help='targetdomain')
# parser_args = parser.parse_args()
#爬虫模块查询

VERBOSE = True

def askurl(target_url):
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36'
}


#baidu权重
baidu_url=f"https://rank.chinaz.com/{target_url}"
baidu_txt=requests.get(url=baidu_url,headers=headers)
baidu_html=baidu_txt.content.decode('utf-8')
baidu_PC=re.findall('PC端</i><img src="//csstools.chinaz.com/tools/images/rankicons/baidu(.*?).png"></a></li>',baidu_html,re.S)
baidu_moblie=re.findall('移动端</i><img src="//csstools.chinaz.com/tools/images/rankicons/bd(.*?).png"></a></li>',baidu_html,re.S)
#分割线
print("*"*60)

#如果查询html中有正则出来到权重关键字就输出,否则将不输出
result={}

baidu_pc_weight = None
baidu_mobile_weight = None

if len(baidu_PC) > 0:
print('百度_PC:', baidu_PC[0])
baidu_pc_weight=baidu_PC[0]
if len(baidu_moblie) > 0:
print('百度_moblie:', baidu_moblie[0])
baidu_mobile_weight = baidu_moblie[0]
else:
print("百度无权重")

result['baidu_pc_weight']=baidu_pc_weight
result['baidu_mobile_weight']=baidu_mobile_weight

#360权重
url=f"https://rank.chinaz.com/sorank/{target_url}/"
text = requests.get(url=url,headers=headers)
html=text.content.decode('utf-8')
sorank360_PC=re.findall('PC端</i><img src="//csstools.chinaz.com/tools/images/rankicons/360(.*?).png"></a><',html,re.S)
sorank360_Mobile=re.findall('移动端</i><img src="//csstools.chinaz.com/tools/images/rankicons/360(.*?).png"',html,re.S)

_360_pc_weight=None
_360_mobile_weight=None

# 如果查询html中有正则出来到权重关键字就输出,否则将不输出
if len(sorank360_PC) > 0:
_360_pc_weight=sorank360_PC[0]
print("360_PC:", sorank360_PC[0])
if len(sorank360_Mobile) > 0:
_360_mobile_weight=sorank360_Mobile[0]
print("360_moblie:", sorank360_Mobile[0])
else:
print("360无权重")

result['360_pc_weight']=_360_pc_weight
result['360_mobile_weight']=_360_mobile_weight


#搜狗权重


sogou_pc_weight=None
sogou_mobile_weight=None

sogou_url = f"https://rank.chinaz.com/sogoupc/{target_url}"
sougou_txt = requests.get(url=sogou_url, headers=headers)
sougou_html = sougou_txt.content.decode('utf-8')
sougou_PC = re.findall('PC端</i><img src="//csstools.chinaz.com/tools/images/rankicons/sogou(.*?).png"></a></li>',sougou_html, re.S)
sougou_mobile = re.findall('移动端</i><img src="//csstools.chinaz.com/tools/images/rankicons/sogou(.*?).png"></a></li>',sougou_html, re.S)

# 如果查询html中有正则出来到权重关键字就输出,否则将不输出
if len(sougou_PC) > 0:
print('搜狗_PC:', sougou_PC[1])
sogou_pc_weight=sougou_PC[1]

if len(sougou_mobile) > 0 :
print('搜狗_moblie:', sougou_mobile[1])
sogou_mobile_weight=sougou_mobile[1]

else:
print('搜狗无权重')


result['sogou_pc_weight']=sogou_pc_weight
result['sogou_mobile_weight']=sogou_mobile_weight


#神马权重
shenma_pc_weight =None
shenma_url=f'https://rank.chinaz.com/smrank/{target_url}'
shenma_txt=requests.get(url=shenma_url,headers=headers)
shenma_html=shenma_txt.content.decode('utf-8')
shenma_PC=re.findall('class="tc mt5"><img src="//csstools.chinaz.com/tools/images/rankicons/shenma(.*?).png"></a></li>',shenma_html,re.S)

# 如果查询html中有正则出来到权重关键字就输出,否则将不输出
if len(shenma_PC) > 0:
print('神马权重为:', shenma_PC[1])
shenma_pc_weight=shenma_PC[1]
else:
print("神马无权重")


result['shenma_pc_weight']=shenma_pc_weight
# result['shenma_mobile_weight']=None


#头条权重

toutiao_pc_weight=None
toutiao_url=f'https://rank.chinaz.com/toutiao/{target_url}'
toutiao_txt=requests.get(url=toutiao_url,headers=headers)
toutiao_html=toutiao_txt.content.decode('utf-8')
toutiao_PC=re.findall('class="tc mt5"><img src="//csstools.chinaz.com/tools/images/rankicons/toutiao(.*?).png"></a></li>',toutiao_html,re.S)

# 如果查询html中有正则出来到权重关键字就输出,否则将不输出
if len(toutiao_PC) > 0:
print('头条权重为:', toutiao_PC[1])
toutiao_pc_weight=toutiao_PC[1]
else:
print("头条无权重")

result['toutiao_pc_weight']=toutiao_pc_weight
# result['toutiao_mobile_weight']=None


#备案信息、title、企业性质
beian_url=f"https://seo.chinaz.com/{target_url}"
beian_txt=requests.get(url=beian_url,headers=headers)
beian_html=beian_txt.content.decode('utf-8')

with open('beian_html.html','w') as fp:
fp.write(beian_html)

title,beian_no,name,ip,nature,register,years=parse_info(beian_html)

result['name']=name
result['title']=title
result['beian_no']=beian_no
result['ip']=ip
result['nature']=nature
result['register']=register
result['years']=years


try:
print("备案信息:",beian_no,"名称:",name,"网站首页Title:",title,"企业性质:",nature,"IP地址为:",ip)
print("*"*60)
except:
print("没有查询到有效信息!")

return result

strip_fun = lambda x:x.strip() if x is not None else ""

def parse_info(html):

resp = Selector(text=html)
title = strip_fun(resp.xpath('//div[@class="_chinaz-seo-t2l ellipsis"]/text()').extract_first())
table = resp.xpath('//table[@class="_chinaz-seo-newt"]/tbody')

if table[0].xpath('.//tr[4]/td[2]/span[1]/i'):
beian_num=strip_fun(table[0].xpath('.//tr[4]/td[2]/span[1]/i/a/text()').extract_first())
else:
beian_num=strip_fun(table[0].xpath('.//tr[4]/td[2]/span[1]/a/text()').extract_first())

name=strip_fun(table[0].xpath('.//tr[4]/td[2]/span[2]/i/text()').extract_first())
if not name:
print('---->',name)
name=strip_fun(table[0].xpath('.//tr[4]/td[2]/span[2]/i/a/text()').extract_first())

nature=strip_fun(table[0].xpath('.//tr[4]/td[2]/span[3]/i/text()').extract_first())
ip=strip_fun(table[0].xpath('.//tr[5]/td[2]/div/span[1]/i/a/text()').extract_first())
register=strip_fun(table[0].xpath('.//tr[3]/td[2]/div[1]/span[1]/i/text()').extract_first())
years=strip_fun(table[0].xpath('.//tr[3]/td[2]/div[2]/span[1]/i/text()').extract_first())



return title,beian_num,name,ip,nature,register,years


def crawl_info(site):
return askurl(site)

if __name__ == '__main__':
main()

 
运行效果:

20220528072.png

需要完整代码,可关注公众号联系:

B站批量下载某个UP主的所有视频

李魔佛 发表了文章 • 0 个评论 • 482 次浏览 • 2022-05-21 18:48 • 来自相关话题

B站上不少优秀的学习资源,下载到本地观看,便于快进,多倍速。 也可以放到平板,手机,在没有网络,或者网络条件不佳的环境下观看。
 

 
使用python实现
https://github.com/Rockyzsu/bilibili
 
B站视频下载
自动批量下载B站一个系列的视频

下载某个UP主的所有视频

使用:

下载you-get库,git clone https://github.com/soimort/you-get.git 复制其本地路径,比如/root/you-get/you-get

初次运行,删除history.db 文件, 修改配置文件config.py

START=1 # 下载系列视频的 第一个
END=1 # 下载系列视频的最后一个 , 比如一个系列教程有30个视频, start=5 ,end = 20 下载从第5个到第20个
ID='BV1oK411L7au' # 视频的ID
YOU_GET_PATH='/home/xda/othergit/you-get/you-get' # 你的you-get路径
MINS=1 # 每次循环等待1分钟
user_id = '518973111' # UP主的ID
total_page = 3 # up主的视频的页数
执行 python downloader.py ,进行下载循环

python people.py ,把某个up主的视频链接加入到待下载队列

python add_data.py --id=BV1oK411L7au --start=4 --end=8 下载视频id为BV1oK411L7au的系列教程,从第4开始,到第8个结束,如果只有一个的话,start和end设为1即可。

可以不断地往队列里面添加下载链接。
主要代码:
# @Time : 2019/1/28 14:19
# @File : youtube_downloader.py

import logging
import os
import subprocess
import datetime
import sqlite3
import time
from config import YOU_GET_PATH,MINS
CMD = 'python {} {}'
filename = 'url.txt'


class SQLite():
def __init__(self):
self.conn = sqlite3.connect('history.db')
self.cursor = self.conn.cursor()
self.create_table()

def create_table(self):
create_sql = 'create table if not exists tb_download (url varchar(100),status tinyint,crawltime datetime)'
create_record_tb = 'create table if not exists tb_record (idx varchar(100) PRIMARY KEY,start tinyint,end tinyint,status tinyint)'
self.cursor.execute(create_record_tb)
self.conn.commit()
self.cursor.execute(create_sql)
self.conn.commit()

def exists(self,url):
querySet = 'select * from tb_download where url = ? and status = 1'
self.cursor.execute(querySet,(url,))
ret = self.cursor.fetchone()
return True if ret else False

def insert_history(self,url,status):
query = 'select * from tb_download where url=?'
self.cursor.execute(query,(url,))
ret = self.cursor.fetchone()
current = datetime.datetime.now()

if ret:
insert_sql='update tb_download set status=?,crawltime=? where url = ?'
args=(status,status,current,url)
else:
insert_sql = 'insert into tb_download values(?,?,?)'
args=(url,status,current)

try:
self.cursor.execute(insert_sql,args)
except:
self.conn.rollback()
return False
else:
self.conn.commit()
return True

def get(self):
sql = 'select idx,start,end from tb_record where status=0'
self.cursor.execute(sql)
ret= self.cursor.fetchone()
return ret

def set(self,idx):
print('set status =1')
sql='update tb_record set status=1 where idx=?'
self.cursor.execute(sql,(idx,))
self.conn.commit()

def llogger(filename):
logger = logging.getLogger(filename) # 不加名称设置root logger

logger.setLevel(logging.DEBUG) # 设置输出级别

formatter = logging.Formatter(
'[%(asctime)s][%(filename)s][line: %(lineno)d]\[%(levelname)s] ## %(message)s)',
datefmt='%Y-%m-%d %H:%M:%S')

# 使用FileHandler输出到文件
prefix = os.path.splitext(filename)[0]
fh = logging.FileHandler(prefix + '.log')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)

# 使用StreamHandler输出到屏幕
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)

# 添加两个Handler
logger.addHandler(ch)
logger.addHandler(fh)
return logger


logger = llogger('download.log')
sql_obj = SQLite()

def run():
while 1:
result = sql_obj.get()
print(result)
if result:
idx=result[0]
start=result[1]
end=result[2]
try:
download_bilibili(idx,start,end)
except:
pass
else:
sql_obj.set(idx)
else:
time.sleep(MINS*60)

def download_bilibili(id,start_page,total_page):
global doc

bilibili_url = 'https://www.bilibili.com/video/{}?p={}'
for i in range(start_page, total_page+1):

next_url = bilibili_url.format(id, i)
if sql_obj.exists(next_url):
print('have download')
continue

try:
command = CMD.format(YOU_GET_PATH, next_url)
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True)

output, error = p.communicate()

except Exception as e:
print('has execption')
sql_obj.insert_history(next_url,status=0)
logger.error(e)
continue
else:
output_str = output.decode()
if len(output_str) == 0:
sql_obj.insert_history(next_url,status=0)
logger.info('下载失败')
continue

logger.info('{} has been downloaded !'.format(next_url))
sql_obj.insert_history(next_url,status=1)

run()

  查看全部
B站上不少优秀的学习资源,下载到本地观看,便于快进,多倍速。 也可以放到平板,手机,在没有网络,或者网络条件不佳的环境下观看。
 

 
使用python实现
https://github.com/Rockyzsu/bilibili
 
B站视频下载
自动批量下载B站一个系列的视频

下载某个UP主的所有视频

使用:

下载you-get库,git clone https://github.com/soimort/you-get.git 复制其本地路径,比如/root/you-get/you-get

初次运行,删除history.db 文件, 修改配置文件config.py

START=1 # 下载系列视频的 第一个
END=1 # 下载系列视频的最后一个 , 比如一个系列教程有30个视频, start=5 ,end = 20 下载从第5个到第20个
ID='BV1oK411L7au' # 视频的ID
YOU_GET_PATH='/home/xda/othergit/you-get/you-get' # 你的you-get路径
MINS=1 # 每次循环等待1分钟
user_id = '518973111' # UP主的ID
total_page = 3 # up主的视频的页数
执行 python downloader.py ,进行下载循环

python people.py ,把某个up主的视频链接加入到待下载队列

python add_data.py --id=BV1oK411L7au --start=4 --end=8 下载视频id为BV1oK411L7au的系列教程,从第4开始,到第8个结束,如果只有一个的话,start和end设为1即可。

可以不断地往队列里面添加下载链接。

主要代码:
# @Time : 2019/1/28 14:19
# @File : youtube_downloader.py

import logging
import os
import subprocess
import datetime
import sqlite3
import time
from config import YOU_GET_PATH,MINS
CMD = 'python {} {}'
filename = 'url.txt'


class SQLite():
def __init__(self):
self.conn = sqlite3.connect('history.db')
self.cursor = self.conn.cursor()
self.create_table()

def create_table(self):
create_sql = 'create table if not exists tb_download (url varchar(100),status tinyint,crawltime datetime)'
create_record_tb = 'create table if not exists tb_record (idx varchar(100) PRIMARY KEY,start tinyint,end tinyint,status tinyint)'
self.cursor.execute(create_record_tb)
self.conn.commit()
self.cursor.execute(create_sql)
self.conn.commit()

def exists(self,url):
querySet = 'select * from tb_download where url = ? and status = 1'
self.cursor.execute(querySet,(url,))
ret = self.cursor.fetchone()
return True if ret else False

def insert_history(self,url,status):
query = 'select * from tb_download where url=?'
self.cursor.execute(query,(url,))
ret = self.cursor.fetchone()
current = datetime.datetime.now()

if ret:
insert_sql='update tb_download set status=?,crawltime=? where url = ?'
args=(status,status,current,url)
else:
insert_sql = 'insert into tb_download values(?,?,?)'
args=(url,status,current)

try:
self.cursor.execute(insert_sql,args)
except:
self.conn.rollback()
return False
else:
self.conn.commit()
return True

def get(self):
sql = 'select idx,start,end from tb_record where status=0'
self.cursor.execute(sql)
ret= self.cursor.fetchone()
return ret

def set(self,idx):
print('set status =1')
sql='update tb_record set status=1 where idx=?'
self.cursor.execute(sql,(idx,))
self.conn.commit()

def llogger(filename):
logger = logging.getLogger(filename) # 不加名称设置root logger

logger.setLevel(logging.DEBUG) # 设置输出级别

formatter = logging.Formatter(
'[%(asctime)s][%(filename)s][line: %(lineno)d]\[%(levelname)s] ## %(message)s)',
datefmt='%Y-%m-%d %H:%M:%S')

# 使用FileHandler输出到文件
prefix = os.path.splitext(filename)[0]
fh = logging.FileHandler(prefix + '.log')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)

# 使用StreamHandler输出到屏幕
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)

# 添加两个Handler
logger.addHandler(ch)
logger.addHandler(fh)
return logger


logger = llogger('download.log')
sql_obj = SQLite()

def run():
while 1:
result = sql_obj.get()
print(result)
if result:
idx=result[0]
start=result[1]
end=result[2]
try:
download_bilibili(idx,start,end)
except:
pass
else:
sql_obj.set(idx)
else:
time.sleep(MINS*60)

def download_bilibili(id,start_page,total_page):
global doc

bilibili_url = 'https://www.bilibili.com/video/{}?p={}'
for i in range(start_page, total_page+1):

next_url = bilibili_url.format(id, i)
if sql_obj.exists(next_url):
print('have download')
continue

try:
command = CMD.format(YOU_GET_PATH, next_url)
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True)

output, error = p.communicate()

except Exception as e:
print('has execption')
sql_obj.insert_history(next_url,status=0)
logger.error(e)
continue
else:
output_str = output.decode()
if len(output_str) == 0:
sql_obj.insert_history(next_url,status=0)
logger.info('下载失败')
continue

logger.info('{} has been downloaded !'.format(next_url))
sql_obj.insert_history(next_url,status=1)

run()

 

python3的map是迭代器,不用for循环或者next触发是不会执行的

李魔佛 发表了文章 • 0 个评论 • 395 次浏览 • 2022-05-21 17:59 • 来自相关话题

最近刚好有位群友咨询,他写的代码如下:
def update_data(id,start,end):
status=0
conn = sqlite3.connect('history.db')
cursor = conn.cursor()
insert_sql ='insert into tb_record values(?,?,?,?)'

try:
cursor.execute(insert_sql,(id,start,end,status))
except Exception as e:
print(e)
print('Error')
else:
conn.commit()
print("successfully insert") 
bv_list = []
for i in range(1, total_page + 1):
bv_list.extend(visit(i))
print(bv_list)
map(lambda x:update_data(x,1,1),bv_list)
作用很简单,就是拿到列表后用map放入到sqlite里面。
但是上面的代码并不起作用。
因为map只是定义了一个迭代器,并没有被触发。
 
可以加一个list(map(lambda x:update_data(x,1,1),bv_list))
这样就可以执行了。 查看全部
最近刚好有位群友咨询,他写的代码如下:
def update_data(id,start,end):
status=0
conn = sqlite3.connect('history.db')
cursor = conn.cursor()
insert_sql ='insert into tb_record values(?,?,?,?)'

try:
cursor.execute(insert_sql,(id,start,end,status))
except Exception as e:
print(e)
print('Error')
else:
conn.commit()
print("successfully insert")
 
bv_list = []
for i in range(1, total_page + 1):
bv_list.extend(visit(i))
print(bv_list)
map(lambda x:update_data(x,1,1),bv_list)

作用很简单,就是拿到列表后用map放入到sqlite里面。
但是上面的代码并不起作用。
因为map只是定义了一个迭代器,并没有被触发。
 
可以加一个list(map(lambda x:update_data(x,1,1),bv_list))
这样就可以执行了。

dataframe如何 遍历所有的列?

李魔佛 发表了文章 • 0 个评论 • 689 次浏览 • 2022-05-21 02:16 • 来自相关话题

如果遍历行,我们经常会使用df.iterrows(), 而列呢?
可以使用df.items()
 Python pandas.DataFrame.items用法及代码示例
用法:
DataFrame.items()
迭代(列名,系列)对。

遍历 DataFrame 列,返回一个包含列名和内容的元组作为一个系列。

生成(Yield):
label:对象
被迭代的 DataFrame 的列名。

content:Series
属于每个标签的列条目,作为一个系列。

例子:
>>> df = pd.DataFrame({'species':['bear', 'bear', 'marsupial'],
... 'population':[1864, 22000, 80000]},
... index=['panda', 'polar', 'koala'])
>>> df
species population
panda bear 1864
polar bear 22000
koala marsupial 80000
>>> for label, content in df.items():
... print(f'label:{label}')
... print(f'content:{content}', sep='\n')
...
label:species
content:
panda bear
polar bear
koala marsupial
Name:species, dtype:object
label:population
content:
panda 1864
polar 22000
koala 80000
Name:population, dtype:int64 查看全部
如果遍历行,我们经常会使用df.iterrows(), 而列呢?
可以使用df.items()
 
Python pandas.DataFrame.items用法及代码示例
用法:
DataFrame.items()
迭代(列名,系列)对。

遍历 DataFrame 列,返回一个包含列名和内容的元组作为一个系列。

生成(Yield):
label:对象
被迭代的 DataFrame 的列名。

content:Series
属于每个标签的列条目,作为一个系列。

例子:
>>> df = pd.DataFrame({'species':['bear', 'bear', 'marsupial'],
... 'population':[1864, 22000, 80000]},
... index=['panda', 'polar', 'koala'])
>>> df
species population
panda bear 1864
polar bear 22000
koala marsupial 80000
>>> for label, content in df.items():
... print(f'label:{label}')
... print(f'content:{content}', sep='\n')
...
label:species
content:
panda bear
polar bear
koala marsupial
Name:species, dtype:object
label:population
content:
panda 1864
polar 22000
koala 80000
Name:population, dtype:int64

python对视频添加水印 调整帧率

李魔佛 发表了文章 • 0 个评论 • 432 次浏览 • 2022-05-07 11:37 • 来自相关话题

Python调用ffmpeg开源视频处理库,来实现视频批量的处理:水印、背景音乐、剪辑、合并、帧率、速率、分辨率等操作

FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。采用LGPL或GPL许可证。 它提供了录制、转换以及流化音视频的完整解决方案。它包含了非常先进的音频/视频编解码库libavcodec,为了保证高可移植性和编解码质量,libavcodec里很多code都是从头开发的。 FFmpeg在Linux平台下开发,但它同样也可以在其它操作系统环境中编译运行,包括Windows、Mac OS X等。 这个项目最早由Fabrice Bellard发起,2004年至2015年间由Michael Niedermayer主要负责维护。 许多FFmpeg的开发人员都来自MPlayer项目,而且当前FFmpeg也是放在MPlayer项目组的服务器上。 项目的名称来自MPEG视频编码标准,前面的"FF"代表"Fast Forward"。
 
 # coding=utf-8
import os
import subprocess
import datetime
import json, pprint
import re, time
import threading
import random
import shutil


class FFmpeg:

def __init__(self, editvdo, addlogo=None, addmusic=None,
addvdohead=None, addvdotail=None):
self.editvdo = editvdo
self.addlogo = addlogo
self.addmusic = addmusic
self.addvdohead = addvdohead
self.addvdotail = addvdotail
self.vdo_time, self.vdo_width, self.vdo_height, self.attr_dict = self.get_attr()
self.editvdo_path = os.path.dirname(editvdo)
self.editvdo_name = os.path.basename(editvdo)

def get_attr(self):
"""
获取视频属性参数
:return:
"""
strcmd = r'ffprobe -print_format json -show_streams -i "{}"'.format(self.editvdo)
status, output = subprocess.getstatusoutput(strcmd)
agrs = eval(re.search('{.*}', output, re.S).group().replace("\n", "").replace(" ", ''))
streams = agrs.get('streams', )
agrs_dict = dict()
[agrs_dict.update(x) for x in streams]
vdo_time = agrs_dict.get('duration')
vdo_width = agrs_dict.get('width')
vdo_height = agrs_dict.get('height')
attr = (vdo_time, vdo_width, vdo_height, agrs_dict)
return attr

def edit_head(self, start_time, end_time, deposit=None):
"""
截取指定长度视频
:param second: 去除开始的多少秒
:param deposit: 另存为文件
:return: True/Flase
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_head'+self.editvdo_name
start = time.strftime('%H:%M:%S', time.gmtime(start_time))
end = time.strftime('%H:%M:%S', time.gmtime(end_time))
strcmd = 'ffmpeg -i "{}" -vcodec copy -acodec copy -ss {} -to {} "{}" -y'.format(
self.editvdo, start, end, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_logo(self, deposit=None):
"""
添加水印
:param deposit:添加水印后另存为路径,为空则覆盖
:return: True/False
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_logo'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -vf "movie=\'{}\' [watermark];[in] ' \
r'[watermark] overlay=main_w-overlay_w-10:10 [out]" "{}"'.format(
self.editvdo, self.addlogo, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_music(self, deposit=None):
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_music'+self.editvdo_name
strcmd = r'ffmpeg -y -i "{}" -i "{}" -filter_complex "[0:a] ' \
r'pan=stereo|c0=1*c0|c1=1*c1 [a1], [1:a] ' \
r'pan=stereo|c0=1*c0|c1=1*c1 [a2],[a1][a2]amix=duration=first,' \
r'pan=stereo|c0<c0+c1|c1<c2+c3,pan=mono|c0=c0+c1[a]" ' \
r'-map "[a]" -map 0:v -c:v libx264 -c:a aac ' \
r'-strict -2 -ac 2 "{}"'.format(self.editvdo, self.addmusic, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_rate(self, rete=30, deposit=None):
"""
改变帧率
:param rete: 修改大小帧率
:param deposit: 修改后保存路径
:return:
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_music'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -r {} "{}"' % (self.editvdo, rete, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_power(self, power='1280x720', deposit=None):
"""
修改分辨率
:param power: 分辨率
:param deposit: 修改后保存路径,为空则覆盖
:return:
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_power'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -s {} "{}"'.format(self.editvdo, power, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def rdit_marge(self, vdo_head, vdo_tail, deposit=None):
if None == deposit:
deposit = self.editvdo_path+'/'+'rdit_marge'+self.editvdo_name
with open(self.editvdo_path+'/'+'rdit_marge.txt', 'w', encoding='utf-8') as f:
f.write("file '{}' \nfile '{}' \nfile '{}'" .format(
vdo_head, self.editvdo, vdo_tail))
strcmd = r'ffmpeg -f concat -safe 0 -i "{}" -c copy "{}"'.format(
self.editvdo_path + '/' + 'rdit_marge.txt', deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False



# ffmpeg - i input.mkv - filter_complex "[0:v]setpts=0.5*PTS[v];[0:a]atempo=2.0[a]" - map"[v]" - map"[a]" output.mkv



test = FFmpeg(r"D:\vdio\4.mp4")


PS:需要电脑把ffmpeg的可执行文件放到环境变量中 查看全部
Python调用ffmpeg开源视频处理库,来实现视频批量的处理:水印、背景音乐、剪辑、合并、帧率、速率、分辨率等操作

FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。采用LGPL或GPL许可证。 它提供了录制、转换以及流化音视频的完整解决方案。它包含了非常先进的音频/视频编解码库libavcodec,为了保证高可移植性和编解码质量,libavcodec里很多code都是从头开发的。 FFmpeg在Linux平台下开发,但它同样也可以在其它操作系统环境中编译运行,包括Windows、Mac OS X等。 这个项目最早由Fabrice Bellard发起,2004年至2015年间由Michael Niedermayer主要负责维护。 许多FFmpeg的开发人员都来自MPlayer项目,而且当前FFmpeg也是放在MPlayer项目组的服务器上。 项目的名称来自MPEG视频编码标准,前面的"FF"代表"Fast Forward"。
 
 
# coding=utf-8
import os
import subprocess
import datetime
import json, pprint
import re, time
import threading
import random
import shutil


class FFmpeg:

def __init__(self, editvdo, addlogo=None, addmusic=None,
addvdohead=None, addvdotail=None):
self.editvdo = editvdo
self.addlogo = addlogo
self.addmusic = addmusic
self.addvdohead = addvdohead
self.addvdotail = addvdotail
self.vdo_time, self.vdo_width, self.vdo_height, self.attr_dict = self.get_attr()
self.editvdo_path = os.path.dirname(editvdo)
self.editvdo_name = os.path.basename(editvdo)

def get_attr(self):
"""
获取视频属性参数
:return:
"""
strcmd = r'ffprobe -print_format json -show_streams -i "{}"'.format(self.editvdo)
status, output = subprocess.getstatusoutput(strcmd)
agrs = eval(re.search('{.*}', output, re.S).group().replace("\n", "").replace(" ", ''))
streams = agrs.get('streams', )
agrs_dict = dict()
[agrs_dict.update(x) for x in streams]
vdo_time = agrs_dict.get('duration')
vdo_width = agrs_dict.get('width')
vdo_height = agrs_dict.get('height')
attr = (vdo_time, vdo_width, vdo_height, agrs_dict)
return attr

def edit_head(self, start_time, end_time, deposit=None):
"""
截取指定长度视频
:param second: 去除开始的多少秒
:param deposit: 另存为文件
:return: True/Flase
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_head'+self.editvdo_name
start = time.strftime('%H:%M:%S', time.gmtime(start_time))
end = time.strftime('%H:%M:%S', time.gmtime(end_time))
strcmd = 'ffmpeg -i "{}" -vcodec copy -acodec copy -ss {} -to {} "{}" -y'.format(
self.editvdo, start, end, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_logo(self, deposit=None):
"""
添加水印
:param deposit:添加水印后另存为路径,为空则覆盖
:return: True/False
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_logo'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -vf "movie=\'{}\' [watermark];[in] ' \
r'[watermark] overlay=main_w-overlay_w-10:10 [out]" "{}"'.format(
self.editvdo, self.addlogo, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_music(self, deposit=None):
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_music'+self.editvdo_name
strcmd = r'ffmpeg -y -i "{}" -i "{}" -filter_complex "[0:a] ' \
r'pan=stereo|c0=1*c0|c1=1*c1 [a1], [1:a] ' \
r'pan=stereo|c0=1*c0|c1=1*c1 [a2],[a1][a2]amix=duration=first,' \
r'pan=stereo|c0<c0+c1|c1<c2+c3,pan=mono|c0=c0+c1[a]" ' \
r'-map "[a]" -map 0:v -c:v libx264 -c:a aac ' \
r'-strict -2 -ac 2 "{}"'.format(self.editvdo, self.addmusic, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_rate(self, rete=30, deposit=None):
"""
改变帧率
:param rete: 修改大小帧率
:param deposit: 修改后保存路径
:return:
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_music'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -r {} "{}"' % (self.editvdo, rete, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_power(self, power='1280x720', deposit=None):
"""
修改分辨率
:param power: 分辨率
:param deposit: 修改后保存路径,为空则覆盖
:return:
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_power'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -s {} "{}"'.format(self.editvdo, power, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def rdit_marge(self, vdo_head, vdo_tail, deposit=None):
if None == deposit:
deposit = self.editvdo_path+'/'+'rdit_marge'+self.editvdo_name
with open(self.editvdo_path+'/'+'rdit_marge.txt', 'w', encoding='utf-8') as f:
f.write("file '{}' \nfile '{}' \nfile '{}'" .format(
vdo_head, self.editvdo, vdo_tail))
strcmd = r'ffmpeg -f concat -safe 0 -i "{}" -c copy "{}"'.format(
self.editvdo_path + '/' + 'rdit_marge.txt', deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False



# ffmpeg - i input.mkv - filter_complex "[0:v]setpts=0.5*PTS[v];[0:a]atempo=2.0[a]" - map"[v]" - map"[a]" output.mkv



test = FFmpeg(r"D:\vdio\4.mp4")


PS:需要电脑把ffmpeg的可执行文件放到环境变量中

格式工厂去除视频水印logo效果不好

李魔佛 发表了文章 • 0 个评论 • 577 次浏览 • 2022-05-07 10:53 • 来自相关话题

本来想用opencv处理的,发现格式工厂已经有类似的功能了。
 
试了一下,结果效果不理想,格式工厂只是把logo区域进行模糊处理,也就是logo区域变得不可再阅读。
 

 
  查看全部
本来想用opencv处理的,发现格式工厂已经有类似的功能了。
 
试了一下,结果效果不理想,格式工厂只是把logo区域进行模糊处理,也就是logo区域变得不可再阅读。
 

 
 

python3 安装demjson 报错 use_2to3 is invalid

李魔佛 发表了文章 • 0 个评论 • 1439 次浏览 • 2022-04-18 20:19 • 来自相关话题

ooking in indexes: https://pypi.douban.com/simple
Collecting demjson==2.2.4
Downloading https://pypi.doubanio.com/pack ... ar.gz (131 kB)
|████████████████████████████████| 131 kB 985 kB/s
ERROR: Command errored out with exit status 1:
command: /root/miniconda3/envs/py37/bin/python -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/setup.py'"'"'; __file__='"'"'/tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-7ve4tu87
cwd: /tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/
Complete output (3 lines):
/root/miniconda3/envs/py37/lib/python3.7/site-packages/setuptools/dist.py:760: UserWarning: Usage of dash-separated 'index-url' will not be supported in future versions. Please use the underscore name 'index_url' instead
% (opt, underscore_opt)
error in demjson setup command: use_2to3 is invalid.

 记录一下解决办法:
setuptools 降级:
 
pip install --upgrade setuptools==57.5.0
 
然后再pip install demjson 即可
 
如果担心setuptools 修改到系统的其他库,可以创建一个虚拟环境。
然后在虚拟环境里面对setuptools 降级,再安装demjson 
 
来个养眼图: 查看全部
ooking in indexes: https://pypi.douban.com/simple
Collecting demjson==2.2.4
Downloading https://pypi.doubanio.com/pack ... ar.gz (131 kB)
|████████████████████████████████| 131 kB 985 kB/s
ERROR: Command errored out with exit status 1:
command: /root/miniconda3/envs/py37/bin/python -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/setup.py'"'"'; __file__='"'"'/tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-7ve4tu87
cwd: /tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/
Complete output (3 lines):
/root/miniconda3/envs/py37/lib/python3.7/site-packages/setuptools/dist.py:760: UserWarning: Usage of dash-separated 'index-url' will not be supported in future versions. Please use the underscore name 'index_url' instead
% (opt, underscore_opt)
error in demjson setup command: use_2to3 is invalid.

 记录一下解决办法:
setuptools 降级:
 
pip install --upgrade setuptools==57.5.0
 
然后再pip install demjson 即可
 
如果担心setuptools 修改到系统的其他库,可以创建一个虚拟环境。
然后在虚拟环境里面对setuptools 降级,再安装demjson 
 
来个养眼图:

mongodb python同步两个数据库数据

李魔佛 发表了文章 • 0 个评论 • 574 次浏览 • 2022-04-07 02:44 • 来自相关话题

有时候需要做一些迁移工作,需要对mongodb进行迁移。默认的工具貌似也十分好用的。缺少像Navicat 之于mysql的这样神级的软件。
 
所以自己动手写代码完成:
 
# -*- coding: utf-8 -*-
# @Time : 2022/4/6 4:41
# @File : database_migrate.py
# @Author : Rocky C@www.30daydo.com
import time
from loguru import logger
import pymongo

ignore_db = ['admin', 'config', 'local',
] # 忽略更新的库

ignore_col = [('db_stock','dfcf_list_full')]

logger.add('mongo.log')

# 数据库同步
def get_client(user, password, host, port):
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
client = pymongo.MongoClient(connect_uri)
return client


def origin():
return get_client('admin', 'password', '127.0.0.1', '27017')


def target():
return get_client('root', 'password', '127.0.0.1', '27017')


def transfer():
origin_client = origin()
target_client = target()
dbs = get_db_name(origin_client)
for db in dbs:
for col in get_collection_name(origin_client, db):

if (db,col) in ignore_col:
continue
items = []
logger.info(f'正在更新{db} {col}')
for i in get_item(origin_client, db, col):
items.append(i)

insert_item(target_client, db, col, items)
logger.info(f'更新数据库 {db} {col}')
# time.sleep(0.5)

def get_item(client, db_name, col):
return client[db_name][col].find()



def insert_item(client, db_name, col, data):
batch = 1000
count = len(data)//batch + 1
for i in range(count):
item = data[i*batch:(i+1)*batch]

if len(item)==0:
continue

try:
client[db_name][col].insert_many(item)
except Exception as e:
logger.error(e)
logger.error(f'{db_name} {col} 插入出错')


def get_db_name(client):
db_name = client.list_database_names()
dbs = []
for db in db_name:
if db not in ignore_db:
dbs.append(db)
return dbs

def delete_col(client,db,col):
try:
client[db][col].delete_many({})
except Exception as e:
logger.error(e)
logger.error(db)
logger.error(col)
return False
else:
return True

def server_compare():
'''
比较2个数据库是否相同,只是单纯比较条数
'''
origin_client = origin()
target_client = target()
dbs = get_db_name(origin_client)

for db in dbs:
for col in get_collection_name(origin_client, db):
origin_count = origin_client[db][col].count_documents({})
target_count = target_client[db][col].count_documents({})
if origin_count!=target_count:
logger.info(f'collection {db} {col}有区别')
#
if delete_col(target_client,db,col):
items = []
logger.info(f'正在更新{db} {col}')
for i in get_item(origin_client, db, col):
items.append(i)

insert_item(target_client, db, col, items)
logger.info(f'更新数据库 {db} {col}')
time.sleep(1)




def get_collection_name(client, db_name):
collection_names = client[db_name].list_collection_names(session=None)
return collection_names

def main():
server_compare()

if __name__ == '__main__':
main()

 原理就是不断迭代,不同的数据库,里面的不同的collection。
对于同名collection,通过条数是否一致,来决定是否要把原数据复制过来。 
 
保存上面文件为main.py
 
执行 python main.py
 
就可以进行数据同步工作啦。 查看全部
有时候需要做一些迁移工作,需要对mongodb进行迁移。默认的工具貌似也十分好用的。缺少像Navicat 之于mysql的这样神级的软件。
 
所以自己动手写代码完成:
 
# -*- coding: utf-8 -*-
# @Time : 2022/4/6 4:41
# @File : database_migrate.py
# @Author : Rocky C@www.30daydo.com
import time
from loguru import logger
import pymongo

ignore_db = ['admin', 'config', 'local',
] # 忽略更新的库

ignore_col = [('db_stock','dfcf_list_full')]

logger.add('mongo.log')

# 数据库同步
def get_client(user, password, host, port):
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
client = pymongo.MongoClient(connect_uri)
return client


def origin():
return get_client('admin', 'password', '127.0.0.1', '27017')


def target():
return get_client('root', 'password', '127.0.0.1', '27017')


def transfer():
origin_client = origin()
target_client = target()
dbs = get_db_name(origin_client)
for db in dbs:
for col in get_collection_name(origin_client, db):

if (db,col) in ignore_col:
continue
items = []
logger.info(f'正在更新{db} {col}')
for i in get_item(origin_client, db, col):
items.append(i)

insert_item(target_client, db, col, items)
logger.info(f'更新数据库 {db} {col}')
# time.sleep(0.5)

def get_item(client, db_name, col):
return client[db_name][col].find()



def insert_item(client, db_name, col, data):
batch = 1000
count = len(data)//batch + 1
for i in range(count):
item = data[i*batch:(i+1)*batch]

if len(item)==0:
continue

try:
client[db_name][col].insert_many(item)
except Exception as e:
logger.error(e)
logger.error(f'{db_name} {col} 插入出错')


def get_db_name(client):
db_name = client.list_database_names()
dbs = []
for db in db_name:
if db not in ignore_db:
dbs.append(db)
return dbs

def delete_col(client,db,col):
try:
client[db][col].delete_many({})
except Exception as e:
logger.error(e)
logger.error(db)
logger.error(col)
return False
else:
return True

def server_compare():
'''
比较2个数据库是否相同,只是单纯比较条数
'''
origin_client = origin()
target_client = target()
dbs = get_db_name(origin_client)

for db in dbs:
for col in get_collection_name(origin_client, db):
origin_count = origin_client[db][col].count_documents({})
target_count = target_client[db][col].count_documents({})
if origin_count!=target_count:
logger.info(f'collection {db} {col}有区别')
#
if delete_col(target_client,db,col):
items = []
logger.info(f'正在更新{db} {col}')
for i in get_item(origin_client, db, col):
items.append(i)

insert_item(target_client, db, col, items)
logger.info(f'更新数据库 {db} {col}')
time.sleep(1)




def get_collection_name(client, db_name):
collection_names = client[db_name].list_collection_names(session=None)
return collection_names

def main():
server_compare()

if __name__ == '__main__':
main()

 原理就是不断迭代,不同的数据库,里面的不同的collection。
对于同名collection,通过条数是否一致,来决定是否要把原数据复制过来。 
 
保存上面文件为main.py
 
执行 python main.py
 
就可以进行数据同步工作啦。