B站批量下载某个UP主的所有视频

李魔佛 发表了文章 • 0 个评论 • 32 次浏览 • 2022-05-21 18:48 • 来自相关话题

B站上不少优秀的学习资源,下载到本地观看,便于快进,多倍速。 也可以放到平板,手机,在没有网络,或者网络条件不佳的环境下观看。
 

 
使用python实现
https://github.com/Rockyzsu/bilibili
 
B站视频下载
自动批量下载B站一个系列的视频

下载某个UP主的所有视频

使用:

下载you-get库,git clone https://github.com/soimort/you-get.git 复制其本地路径,比如/root/you-get/you-get

初次运行,删除history.db 文件, 修改配置文件config.py

START=1 # 下载系列视频的 第一个
END=1 # 下载系列视频的最后一个 , 比如一个系列教程有30个视频, start=5 ,end = 20 下载从第5个到第20个
ID='BV1oK411L7au' # 视频的ID
YOU_GET_PATH='/home/xda/othergit/you-get/you-get' # 你的you-get路径
MINS=1 # 每次循环等待1分钟
user_id = '518973111' # UP主的ID
total_page = 3 # up主的视频的页数
执行 python downloader.py ,进行下载循环

python people.py ,把某个up主的视频链接加入到待下载队列

python add_data.py --id=BV1oK411L7au --start=4 --end=8 下载视频id为BV1oK411L7au的系列教程,从第4开始,到第8个结束,如果只有一个的话,start和end设为1即可。

可以不断地往队列里面添加下载链接。
主要代码:
# @Time : 2019/1/28 14:19
# @File : youtube_downloader.py

import logging
import os
import subprocess
import datetime
import sqlite3
import time
from config import YOU_GET_PATH,MINS
CMD = 'python {} {}'
filename = 'url.txt'


class SQLite():
def __init__(self):
self.conn = sqlite3.connect('history.db')
self.cursor = self.conn.cursor()
self.create_table()

def create_table(self):
create_sql = 'create table if not exists tb_download (url varchar(100),status tinyint,crawltime datetime)'
create_record_tb = 'create table if not exists tb_record (idx varchar(100) PRIMARY KEY,start tinyint,end tinyint,status tinyint)'
self.cursor.execute(create_record_tb)
self.conn.commit()
self.cursor.execute(create_sql)
self.conn.commit()

def exists(self,url):
querySet = 'select * from tb_download where url = ? and status = 1'
self.cursor.execute(querySet,(url,))
ret = self.cursor.fetchone()
return True if ret else False

def insert_history(self,url,status):
query = 'select * from tb_download where url=?'
self.cursor.execute(query,(url,))
ret = self.cursor.fetchone()
current = datetime.datetime.now()

if ret:
insert_sql='update tb_download set status=?,crawltime=? where url = ?'
args=(status,status,current,url)
else:
insert_sql = 'insert into tb_download values(?,?,?)'
args=(url,status,current)

try:
self.cursor.execute(insert_sql,args)
except:
self.conn.rollback()
return False
else:
self.conn.commit()
return True

def get(self):
sql = 'select idx,start,end from tb_record where status=0'
self.cursor.execute(sql)
ret= self.cursor.fetchone()
return ret

def set(self,idx):
print('set status =1')
sql='update tb_record set status=1 where idx=?'
self.cursor.execute(sql,(idx,))
self.conn.commit()

def llogger(filename):
logger = logging.getLogger(filename) # 不加名称设置root logger

logger.setLevel(logging.DEBUG) # 设置输出级别

formatter = logging.Formatter(
'[%(asctime)s][%(filename)s][line: %(lineno)d]\[%(levelname)s] ## %(message)s)',
datefmt='%Y-%m-%d %H:%M:%S')

# 使用FileHandler输出到文件
prefix = os.path.splitext(filename)[0]
fh = logging.FileHandler(prefix + '.log')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)

# 使用StreamHandler输出到屏幕
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)

# 添加两个Handler
logger.addHandler(ch)
logger.addHandler(fh)
return logger


logger = llogger('download.log')
sql_obj = SQLite()

def run():
while 1:
result = sql_obj.get()
print(result)
if result:
idx=result[0]
start=result[1]
end=result[2]
try:
download_bilibili(idx,start,end)
except:
pass
else:
sql_obj.set(idx)
else:
time.sleep(MINS*60)

def download_bilibili(id,start_page,total_page):
global doc

bilibili_url = 'https://www.bilibili.com/video/{}?p={}'
for i in range(start_page, total_page+1):

next_url = bilibili_url.format(id, i)
if sql_obj.exists(next_url):
print('have download')
continue

try:
command = CMD.format(YOU_GET_PATH, next_url)
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True)

output, error = p.communicate()

except Exception as e:
print('has execption')
sql_obj.insert_history(next_url,status=0)
logger.error(e)
continue
else:
output_str = output.decode()
if len(output_str) == 0:
sql_obj.insert_history(next_url,status=0)
logger.info('下载失败')
continue

logger.info('{} has been downloaded !'.format(next_url))
sql_obj.insert_history(next_url,status=1)

run()

  查看全部
B站上不少优秀的学习资源,下载到本地观看,便于快进,多倍速。 也可以放到平板,手机,在没有网络,或者网络条件不佳的环境下观看。
 

 
使用python实现
https://github.com/Rockyzsu/bilibili
 
B站视频下载
自动批量下载B站一个系列的视频

下载某个UP主的所有视频

使用:

下载you-get库,git clone https://github.com/soimort/you-get.git 复制其本地路径,比如/root/you-get/you-get

初次运行,删除history.db 文件, 修改配置文件config.py

START=1 # 下载系列视频的 第一个
END=1 # 下载系列视频的最后一个 , 比如一个系列教程有30个视频, start=5 ,end = 20 下载从第5个到第20个
ID='BV1oK411L7au' # 视频的ID
YOU_GET_PATH='/home/xda/othergit/you-get/you-get' # 你的you-get路径
MINS=1 # 每次循环等待1分钟
user_id = '518973111' # UP主的ID
total_page = 3 # up主的视频的页数
执行 python downloader.py ,进行下载循环

python people.py ,把某个up主的视频链接加入到待下载队列

python add_data.py --id=BV1oK411L7au --start=4 --end=8 下载视频id为BV1oK411L7au的系列教程,从第4开始,到第8个结束,如果只有一个的话,start和end设为1即可。

可以不断地往队列里面添加下载链接。

主要代码:
# @Time : 2019/1/28 14:19
# @File : youtube_downloader.py

import logging
import os
import subprocess
import datetime
import sqlite3
import time
from config import YOU_GET_PATH,MINS
CMD = 'python {} {}'
filename = 'url.txt'


class SQLite():
def __init__(self):
self.conn = sqlite3.connect('history.db')
self.cursor = self.conn.cursor()
self.create_table()

def create_table(self):
create_sql = 'create table if not exists tb_download (url varchar(100),status tinyint,crawltime datetime)'
create_record_tb = 'create table if not exists tb_record (idx varchar(100) PRIMARY KEY,start tinyint,end tinyint,status tinyint)'
self.cursor.execute(create_record_tb)
self.conn.commit()
self.cursor.execute(create_sql)
self.conn.commit()

def exists(self,url):
querySet = 'select * from tb_download where url = ? and status = 1'
self.cursor.execute(querySet,(url,))
ret = self.cursor.fetchone()
return True if ret else False

def insert_history(self,url,status):
query = 'select * from tb_download where url=?'
self.cursor.execute(query,(url,))
ret = self.cursor.fetchone()
current = datetime.datetime.now()

if ret:
insert_sql='update tb_download set status=?,crawltime=? where url = ?'
args=(status,status,current,url)
else:
insert_sql = 'insert into tb_download values(?,?,?)'
args=(url,status,current)

try:
self.cursor.execute(insert_sql,args)
except:
self.conn.rollback()
return False
else:
self.conn.commit()
return True

def get(self):
sql = 'select idx,start,end from tb_record where status=0'
self.cursor.execute(sql)
ret= self.cursor.fetchone()
return ret

def set(self,idx):
print('set status =1')
sql='update tb_record set status=1 where idx=?'
self.cursor.execute(sql,(idx,))
self.conn.commit()

def llogger(filename):
logger = logging.getLogger(filename) # 不加名称设置root logger

logger.setLevel(logging.DEBUG) # 设置输出级别

formatter = logging.Formatter(
'[%(asctime)s][%(filename)s][line: %(lineno)d]\[%(levelname)s] ## %(message)s)',
datefmt='%Y-%m-%d %H:%M:%S')

# 使用FileHandler输出到文件
prefix = os.path.splitext(filename)[0]
fh = logging.FileHandler(prefix + '.log')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)

# 使用StreamHandler输出到屏幕
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)

# 添加两个Handler
logger.addHandler(ch)
logger.addHandler(fh)
return logger


logger = llogger('download.log')
sql_obj = SQLite()

def run():
while 1:
result = sql_obj.get()
print(result)
if result:
idx=result[0]
start=result[1]
end=result[2]
try:
download_bilibili(idx,start,end)
except:
pass
else:
sql_obj.set(idx)
else:
time.sleep(MINS*60)

def download_bilibili(id,start_page,total_page):
global doc

bilibili_url = 'https://www.bilibili.com/video/{}?p={}'
for i in range(start_page, total_page+1):

next_url = bilibili_url.format(id, i)
if sql_obj.exists(next_url):
print('have download')
continue

try:
command = CMD.format(YOU_GET_PATH, next_url)
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True)

output, error = p.communicate()

except Exception as e:
print('has execption')
sql_obj.insert_history(next_url,status=0)
logger.error(e)
continue
else:
output_str = output.decode()
if len(output_str) == 0:
sql_obj.insert_history(next_url,status=0)
logger.info('下载失败')
continue

logger.info('{} has been downloaded !'.format(next_url))
sql_obj.insert_history(next_url,status=1)

run()

 

python3的map是迭代器,不用for循环或者next触发是不会执行的

李魔佛 发表了文章 • 0 个评论 • 20 次浏览 • 2022-05-21 17:59 • 来自相关话题

最近刚好有位群友咨询,他写的代码如下:
def update_data(id,start,end):
status=0
conn = sqlite3.connect('history.db')
cursor = conn.cursor()
insert_sql ='insert into tb_record values(?,?,?,?)'

try:
cursor.execute(insert_sql,(id,start,end,status))
except Exception as e:
print(e)
print('Error')
else:
conn.commit()
print("successfully insert") 
bv_list = []
for i in range(1, total_page + 1):
bv_list.extend(visit(i))
print(bv_list)
map(lambda x:update_data(x,1,1),bv_list)
作用很简单,就是拿到列表后用map放入到sqlite里面。
但是上面的代码并不起作用。
因为map只是定义了一个迭代器,并没有被触发。
 
可以加一个list(map(lambda x:update_data(x,1,1),bv_list))
这样就可以执行了。 查看全部
最近刚好有位群友咨询,他写的代码如下:
def update_data(id,start,end):
status=0
conn = sqlite3.connect('history.db')
cursor = conn.cursor()
insert_sql ='insert into tb_record values(?,?,?,?)'

try:
cursor.execute(insert_sql,(id,start,end,status))
except Exception as e:
print(e)
print('Error')
else:
conn.commit()
print("successfully insert")
 
bv_list = []
for i in range(1, total_page + 1):
bv_list.extend(visit(i))
print(bv_list)
map(lambda x:update_data(x,1,1),bv_list)

作用很简单,就是拿到列表后用map放入到sqlite里面。
但是上面的代码并不起作用。
因为map只是定义了一个迭代器,并没有被触发。
 
可以加一个list(map(lambda x:update_data(x,1,1),bv_list))
这样就可以执行了。

dataframe如何 遍历所有的列?

李魔佛 发表了文章 • 0 个评论 • 25 次浏览 • 2022-05-21 02:16 • 来自相关话题

如果遍历行,我们经常会使用df.iterrows(), 而列呢?
可以使用df.items()
 Python pandas.DataFrame.items用法及代码示例
用法:
DataFrame.items()
迭代(列名,系列)对。

遍历 DataFrame 列,返回一个包含列名和内容的元组作为一个系列。

生成(Yield):
label:对象
被迭代的 DataFrame 的列名。

content:Series
属于每个标签的列条目,作为一个系列。

例子:
>>> df = pd.DataFrame({'species':['bear', 'bear', 'marsupial'],
... 'population':[1864, 22000, 80000]},
... index=['panda', 'polar', 'koala'])
>>> df
species population
panda bear 1864
polar bear 22000
koala marsupial 80000
>>> for label, content in df.items():
... print(f'label:{label}')
... print(f'content:{content}', sep='\n')
...
label:species
content:
panda bear
polar bear
koala marsupial
Name:species, dtype:object
label:population
content:
panda 1864
polar 22000
koala 80000
Name:population, dtype:int64 查看全部
如果遍历行,我们经常会使用df.iterrows(), 而列呢?
可以使用df.items()
 
Python pandas.DataFrame.items用法及代码示例
用法:
DataFrame.items()
迭代(列名,系列)对。

遍历 DataFrame 列,返回一个包含列名和内容的元组作为一个系列。

生成(Yield):
label:对象
被迭代的 DataFrame 的列名。

content:Series
属于每个标签的列条目,作为一个系列。

例子:
>>> df = pd.DataFrame({'species':['bear', 'bear', 'marsupial'],
... 'population':[1864, 22000, 80000]},
... index=['panda', 'polar', 'koala'])
>>> df
species population
panda bear 1864
polar bear 22000
koala marsupial 80000
>>> for label, content in df.items():
... print(f'label:{label}')
... print(f'content:{content}', sep='\n')
...
label:species
content:
panda bear
polar bear
koala marsupial
Name:species, dtype:object
label:population
content:
panda 1864
polar 22000
koala 80000
Name:population, dtype:int64

python对视频添加水印 调整帧率

李魔佛 发表了文章 • 0 个评论 • 77 次浏览 • 2022-05-07 11:37 • 来自相关话题

Python调用ffmpeg开源视频处理库,来实现视频批量的处理:水印、背景音乐、剪辑、合并、帧率、速率、分辨率等操作

FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。采用LGPL或GPL许可证。 它提供了录制、转换以及流化音视频的完整解决方案。它包含了非常先进的音频/视频编解码库libavcodec,为了保证高可移植性和编解码质量,libavcodec里很多code都是从头开发的。 FFmpeg在Linux平台下开发,但它同样也可以在其它操作系统环境中编译运行,包括Windows、Mac OS X等。 这个项目最早由Fabrice Bellard发起,2004年至2015年间由Michael Niedermayer主要负责维护。 许多FFmpeg的开发人员都来自MPlayer项目,而且当前FFmpeg也是放在MPlayer项目组的服务器上。 项目的名称来自MPEG视频编码标准,前面的"FF"代表"Fast Forward"。
 
 # coding=utf-8
import os
import subprocess
import datetime
import json, pprint
import re, time
import threading
import random
import shutil


class FFmpeg:

def __init__(self, editvdo, addlogo=None, addmusic=None,
addvdohead=None, addvdotail=None):
self.editvdo = editvdo
self.addlogo = addlogo
self.addmusic = addmusic
self.addvdohead = addvdohead
self.addvdotail = addvdotail
self.vdo_time, self.vdo_width, self.vdo_height, self.attr_dict = self.get_attr()
self.editvdo_path = os.path.dirname(editvdo)
self.editvdo_name = os.path.basename(editvdo)

def get_attr(self):
"""
获取视频属性参数
:return:
"""
strcmd = r'ffprobe -print_format json -show_streams -i "{}"'.format(self.editvdo)
status, output = subprocess.getstatusoutput(strcmd)
agrs = eval(re.search('{.*}', output, re.S).group().replace("\n", "").replace(" ", ''))
streams = agrs.get('streams', )
agrs_dict = dict()
[agrs_dict.update(x) for x in streams]
vdo_time = agrs_dict.get('duration')
vdo_width = agrs_dict.get('width')
vdo_height = agrs_dict.get('height')
attr = (vdo_time, vdo_width, vdo_height, agrs_dict)
return attr

def edit_head(self, start_time, end_time, deposit=None):
"""
截取指定长度视频
:param second: 去除开始的多少秒
:param deposit: 另存为文件
:return: True/Flase
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_head'+self.editvdo_name
start = time.strftime('%H:%M:%S', time.gmtime(start_time))
end = time.strftime('%H:%M:%S', time.gmtime(end_time))
strcmd = 'ffmpeg -i "{}" -vcodec copy -acodec copy -ss {} -to {} "{}" -y'.format(
self.editvdo, start, end, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_logo(self, deposit=None):
"""
添加水印
:param deposit:添加水印后另存为路径,为空则覆盖
:return: True/False
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_logo'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -vf "movie=\'{}\' [watermark];[in] ' \
r'[watermark] overlay=main_w-overlay_w-10:10 [out]" "{}"'.format(
self.editvdo, self.addlogo, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_music(self, deposit=None):
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_music'+self.editvdo_name
strcmd = r'ffmpeg -y -i "{}" -i "{}" -filter_complex "[0:a] ' \
r'pan=stereo|c0=1*c0|c1=1*c1 [a1], [1:a] ' \
r'pan=stereo|c0=1*c0|c1=1*c1 [a2],[a1][a2]amix=duration=first,' \
r'pan=stereo|c0<c0+c1|c1<c2+c3,pan=mono|c0=c0+c1[a]" ' \
r'-map "[a]" -map 0:v -c:v libx264 -c:a aac ' \
r'-strict -2 -ac 2 "{}"'.format(self.editvdo, self.addmusic, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_rate(self, rete=30, deposit=None):
"""
改变帧率
:param rete: 修改大小帧率
:param deposit: 修改后保存路径
:return:
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_music'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -r {} "{}"' % (self.editvdo, rete, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_power(self, power='1280x720', deposit=None):
"""
修改分辨率
:param power: 分辨率
:param deposit: 修改后保存路径,为空则覆盖
:return:
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_power'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -s {} "{}"'.format(self.editvdo, power, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def rdit_marge(self, vdo_head, vdo_tail, deposit=None):
if None == deposit:
deposit = self.editvdo_path+'/'+'rdit_marge'+self.editvdo_name
with open(self.editvdo_path+'/'+'rdit_marge.txt', 'w', encoding='utf-8') as f:
f.write("file '{}' \nfile '{}' \nfile '{}'" .format(
vdo_head, self.editvdo, vdo_tail))
strcmd = r'ffmpeg -f concat -safe 0 -i "{}" -c copy "{}"'.format(
self.editvdo_path + '/' + 'rdit_marge.txt', deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False



# ffmpeg - i input.mkv - filter_complex "[0:v]setpts=0.5*PTS[v];[0:a]atempo=2.0[a]" - map"[v]" - map"[a]" output.mkv



test = FFmpeg(r"D:\vdio\4.mp4")


PS:需要电脑把ffmpeg的可执行文件放到环境变量中 查看全部
Python调用ffmpeg开源视频处理库,来实现视频批量的处理:水印、背景音乐、剪辑、合并、帧率、速率、分辨率等操作

FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。采用LGPL或GPL许可证。 它提供了录制、转换以及流化音视频的完整解决方案。它包含了非常先进的音频/视频编解码库libavcodec,为了保证高可移植性和编解码质量,libavcodec里很多code都是从头开发的。 FFmpeg在Linux平台下开发,但它同样也可以在其它操作系统环境中编译运行,包括Windows、Mac OS X等。 这个项目最早由Fabrice Bellard发起,2004年至2015年间由Michael Niedermayer主要负责维护。 许多FFmpeg的开发人员都来自MPlayer项目,而且当前FFmpeg也是放在MPlayer项目组的服务器上。 项目的名称来自MPEG视频编码标准,前面的"FF"代表"Fast Forward"。
 
 
# coding=utf-8
import os
import subprocess
import datetime
import json, pprint
import re, time
import threading
import random
import shutil


class FFmpeg:

def __init__(self, editvdo, addlogo=None, addmusic=None,
addvdohead=None, addvdotail=None):
self.editvdo = editvdo
self.addlogo = addlogo
self.addmusic = addmusic
self.addvdohead = addvdohead
self.addvdotail = addvdotail
self.vdo_time, self.vdo_width, self.vdo_height, self.attr_dict = self.get_attr()
self.editvdo_path = os.path.dirname(editvdo)
self.editvdo_name = os.path.basename(editvdo)

def get_attr(self):
"""
获取视频属性参数
:return:
"""
strcmd = r'ffprobe -print_format json -show_streams -i "{}"'.format(self.editvdo)
status, output = subprocess.getstatusoutput(strcmd)
agrs = eval(re.search('{.*}', output, re.S).group().replace("\n", "").replace(" ", ''))
streams = agrs.get('streams', )
agrs_dict = dict()
[agrs_dict.update(x) for x in streams]
vdo_time = agrs_dict.get('duration')
vdo_width = agrs_dict.get('width')
vdo_height = agrs_dict.get('height')
attr = (vdo_time, vdo_width, vdo_height, agrs_dict)
return attr

def edit_head(self, start_time, end_time, deposit=None):
"""
截取指定长度视频
:param second: 去除开始的多少秒
:param deposit: 另存为文件
:return: True/Flase
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_head'+self.editvdo_name
start = time.strftime('%H:%M:%S', time.gmtime(start_time))
end = time.strftime('%H:%M:%S', time.gmtime(end_time))
strcmd = 'ffmpeg -i "{}" -vcodec copy -acodec copy -ss {} -to {} "{}" -y'.format(
self.editvdo, start, end, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_logo(self, deposit=None):
"""
添加水印
:param deposit:添加水印后另存为路径,为空则覆盖
:return: True/False
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_logo'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -vf "movie=\'{}\' [watermark];[in] ' \
r'[watermark] overlay=main_w-overlay_w-10:10 [out]" "{}"'.format(
self.editvdo, self.addlogo, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_music(self, deposit=None):
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_music'+self.editvdo_name
strcmd = r'ffmpeg -y -i "{}" -i "{}" -filter_complex "[0:a] ' \
r'pan=stereo|c0=1*c0|c1=1*c1 [a1], [1:a] ' \
r'pan=stereo|c0=1*c0|c1=1*c1 [a2],[a1][a2]amix=duration=first,' \
r'pan=stereo|c0<c0+c1|c1<c2+c3,pan=mono|c0=c0+c1[a]" ' \
r'-map "[a]" -map 0:v -c:v libx264 -c:a aac ' \
r'-strict -2 -ac 2 "{}"'.format(self.editvdo, self.addmusic, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_rate(self, rete=30, deposit=None):
"""
改变帧率
:param rete: 修改大小帧率
:param deposit: 修改后保存路径
:return:
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_music'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -r {} "{}"' % (self.editvdo, rete, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def edit_power(self, power='1280x720', deposit=None):
"""
修改分辨率
:param power: 分辨率
:param deposit: 修改后保存路径,为空则覆盖
:return:
"""
if None == deposit:
deposit = self.editvdo_path+'/'+'edit_power'+self.editvdo_name
strcmd = r'ffmpeg -i "{}" -s {} "{}"'.format(self.editvdo, power, deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False

def rdit_marge(self, vdo_head, vdo_tail, deposit=None):
if None == deposit:
deposit = self.editvdo_path+'/'+'rdit_marge'+self.editvdo_name
with open(self.editvdo_path+'/'+'rdit_marge.txt', 'w', encoding='utf-8') as f:
f.write("file '{}' \nfile '{}' \nfile '{}'" .format(
vdo_head, self.editvdo, vdo_tail))
strcmd = r'ffmpeg -f concat -safe 0 -i "{}" -c copy "{}"'.format(
self.editvdo_path + '/' + 'rdit_marge.txt', deposit)
result = subprocess.run(args=strcmd, stdout=subprocess.PIPE, shell=True)
if os.path.exists(deposit):
os.remove(self.editvdo)
os.rename(deposit, self.editvdo)
return True
else:
return False



# ffmpeg - i input.mkv - filter_complex "[0:v]setpts=0.5*PTS[v];[0:a]atempo=2.0[a]" - map"[v]" - map"[a]" output.mkv



test = FFmpeg(r"D:\vdio\4.mp4")


PS:需要电脑把ffmpeg的可执行文件放到环境变量中

格式工厂去除视频水印logo效果不好

李魔佛 发表了文章 • 0 个评论 • 83 次浏览 • 2022-05-07 10:53 • 来自相关话题

本来想用opencv处理的,发现格式工厂已经有类似的功能了。
 
试了一下,结果效果不理想,格式工厂只是把logo区域进行模糊处理,也就是logo区域变得不可再阅读。
 

 
  查看全部
本来想用opencv处理的,发现格式工厂已经有类似的功能了。
 
试了一下,结果效果不理想,格式工厂只是把logo区域进行模糊处理,也就是logo区域变得不可再阅读。
 

 
 

python3 安装demjson 报错 use_2to3 is invalid

李魔佛 发表了文章 • 0 个评论 • 191 次浏览 • 2022-04-18 20:19 • 来自相关话题

ooking in indexes: https://pypi.douban.com/simple
Collecting demjson==2.2.4
Downloading https://pypi.doubanio.com/pack ... ar.gz (131 kB)
|████████████████████████████████| 131 kB 985 kB/s
ERROR: Command errored out with exit status 1:
command: /root/miniconda3/envs/py37/bin/python -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/setup.py'"'"'; __file__='"'"'/tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-7ve4tu87
cwd: /tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/
Complete output (3 lines):
/root/miniconda3/envs/py37/lib/python3.7/site-packages/setuptools/dist.py:760: UserWarning: Usage of dash-separated 'index-url' will not be supported in future versions. Please use the underscore name 'index_url' instead
% (opt, underscore_opt)
error in demjson setup command: use_2to3 is invalid.

 记录一下解决办法:
setuptools 降级:
 
pip install --upgrade setuptools==57.5.0
 
然后再pip install demjson 即可
 
如果担心setuptools 修改到系统的其他库,可以创建一个虚拟环境。
然后在虚拟环境里面对setuptools 降级,再安装demjson 
 
来个养眼图: 查看全部
ooking in indexes: https://pypi.douban.com/simple
Collecting demjson==2.2.4
Downloading https://pypi.doubanio.com/pack ... ar.gz (131 kB)
|████████████████████████████████| 131 kB 985 kB/s
ERROR: Command errored out with exit status 1:
command: /root/miniconda3/envs/py37/bin/python -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/setup.py'"'"'; __file__='"'"'/tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-7ve4tu87
cwd: /tmp/pip-install-pj0ajcpl/demjson_86a1bab6643c4ed7a7b0c6bb6d3a43a2/
Complete output (3 lines):
/root/miniconda3/envs/py37/lib/python3.7/site-packages/setuptools/dist.py:760: UserWarning: Usage of dash-separated 'index-url' will not be supported in future versions. Please use the underscore name 'index_url' instead
% (opt, underscore_opt)
error in demjson setup command: use_2to3 is invalid.

 记录一下解决办法:
setuptools 降级:
 
pip install --upgrade setuptools==57.5.0
 
然后再pip install demjson 即可
 
如果担心setuptools 修改到系统的其他库,可以创建一个虚拟环境。
然后在虚拟环境里面对setuptools 降级,再安装demjson 
 
来个养眼图:

mongodb python同步两个数据库数据

李魔佛 发表了文章 • 0 个评论 • 179 次浏览 • 2022-04-07 02:44 • 来自相关话题

有时候需要做一些迁移工作,需要对mongodb进行迁移。默认的工具貌似也十分好用的。缺少像Navicat 之于mysql的这样神级的软件。
 
所以自己动手写代码完成:
 
# -*- coding: utf-8 -*-
# @Time : 2022/4/6 4:41
# @File : database_migrate.py
# @Author : Rocky C@www.30daydo.com
import time
from loguru import logger
import pymongo

ignore_db = ['admin', 'config', 'local',
] # 忽略更新的库

ignore_col = [('db_stock','dfcf_list_full')]

logger.add('mongo.log')

# 数据库同步
def get_client(user, password, host, port):
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
client = pymongo.MongoClient(connect_uri)
return client


def origin():
return get_client('admin', 'password', '127.0.0.1', '27017')


def target():
return get_client('root', 'password', '127.0.0.1', '27017')


def transfer():
origin_client = origin()
target_client = target()
dbs = get_db_name(origin_client)
for db in dbs:
for col in get_collection_name(origin_client, db):

if (db,col) in ignore_col:
continue
items = []
logger.info(f'正在更新{db} {col}')
for i in get_item(origin_client, db, col):
items.append(i)

insert_item(target_client, db, col, items)
logger.info(f'更新数据库 {db} {col}')
# time.sleep(0.5)

def get_item(client, db_name, col):
return client[db_name][col].find()



def insert_item(client, db_name, col, data):
batch = 1000
count = len(data)//batch + 1
for i in range(count):
item = data[i*batch:(i+1)*batch]

if len(item)==0:
continue

try:
client[db_name][col].insert_many(item)
except Exception as e:
logger.error(e)
logger.error(f'{db_name} {col} 插入出错')


def get_db_name(client):
db_name = client.list_database_names()
dbs = []
for db in db_name:
if db not in ignore_db:
dbs.append(db)
return dbs

def delete_col(client,db,col):
try:
client[db][col].delete_many({})
except Exception as e:
logger.error(e)
logger.error(db)
logger.error(col)
return False
else:
return True

def server_compare():
'''
比较2个数据库是否相同,只是单纯比较条数
'''
origin_client = origin()
target_client = target()
dbs = get_db_name(origin_client)

for db in dbs:
for col in get_collection_name(origin_client, db):
origin_count = origin_client[db][col].count_documents({})
target_count = target_client[db][col].count_documents({})
if origin_count!=target_count:
logger.info(f'collection {db} {col}有区别')
#
if delete_col(target_client,db,col):
items = []
logger.info(f'正在更新{db} {col}')
for i in get_item(origin_client, db, col):
items.append(i)

insert_item(target_client, db, col, items)
logger.info(f'更新数据库 {db} {col}')
time.sleep(1)




def get_collection_name(client, db_name):
collection_names = client[db_name].list_collection_names(session=None)
return collection_names

def main():
server_compare()

if __name__ == '__main__':
main()

 原理就是不断迭代,不同的数据库,里面的不同的collection。
对于同名collection,通过条数是否一致,来决定是否要把原数据复制过来。 
 
保存上面文件为main.py
 
执行 python main.py
 
就可以进行数据同步工作啦。 查看全部
有时候需要做一些迁移工作,需要对mongodb进行迁移。默认的工具貌似也十分好用的。缺少像Navicat 之于mysql的这样神级的软件。
 
所以自己动手写代码完成:
 
# -*- coding: utf-8 -*-
# @Time : 2022/4/6 4:41
# @File : database_migrate.py
# @Author : Rocky C@www.30daydo.com
import time
from loguru import logger
import pymongo

ignore_db = ['admin', 'config', 'local',
] # 忽略更新的库

ignore_col = [('db_stock','dfcf_list_full')]

logger.add('mongo.log')

# 数据库同步
def get_client(user, password, host, port):
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
client = pymongo.MongoClient(connect_uri)
return client


def origin():
return get_client('admin', 'password', '127.0.0.1', '27017')


def target():
return get_client('root', 'password', '127.0.0.1', '27017')


def transfer():
origin_client = origin()
target_client = target()
dbs = get_db_name(origin_client)
for db in dbs:
for col in get_collection_name(origin_client, db):

if (db,col) in ignore_col:
continue
items = []
logger.info(f'正在更新{db} {col}')
for i in get_item(origin_client, db, col):
items.append(i)

insert_item(target_client, db, col, items)
logger.info(f'更新数据库 {db} {col}')
# time.sleep(0.5)

def get_item(client, db_name, col):
return client[db_name][col].find()



def insert_item(client, db_name, col, data):
batch = 1000
count = len(data)//batch + 1
for i in range(count):
item = data[i*batch:(i+1)*batch]

if len(item)==0:
continue

try:
client[db_name][col].insert_many(item)
except Exception as e:
logger.error(e)
logger.error(f'{db_name} {col} 插入出错')


def get_db_name(client):
db_name = client.list_database_names()
dbs = []
for db in db_name:
if db not in ignore_db:
dbs.append(db)
return dbs

def delete_col(client,db,col):
try:
client[db][col].delete_many({})
except Exception as e:
logger.error(e)
logger.error(db)
logger.error(col)
return False
else:
return True

def server_compare():
'''
比较2个数据库是否相同,只是单纯比较条数
'''
origin_client = origin()
target_client = target()
dbs = get_db_name(origin_client)

for db in dbs:
for col in get_collection_name(origin_client, db):
origin_count = origin_client[db][col].count_documents({})
target_count = target_client[db][col].count_documents({})
if origin_count!=target_count:
logger.info(f'collection {db} {col}有区别')
#
if delete_col(target_client,db,col):
items = []
logger.info(f'正在更新{db} {col}')
for i in get_item(origin_client, db, col):
items.append(i)

insert_item(target_client, db, col, items)
logger.info(f'更新数据库 {db} {col}')
time.sleep(1)




def get_collection_name(client, db_name):
collection_names = client[db_name].list_collection_names(session=None)
return collection_names

def main():
server_compare()

if __name__ == '__main__':
main()

 原理就是不断迭代,不同的数据库,里面的不同的collection。
对于同名collection,通过条数是否一致,来决定是否要把原数据复制过来。 
 
保存上面文件为main.py
 
执行 python main.py
 
就可以进行数据同步工作啦。

知识星球获取文章链接与数据

python爬虫李魔佛 发表了文章 • 0 个评论 • 207 次浏览 • 2022-03-21 20:15 • 来自相关话题

 
既然官方不提供这个功能,只能自己使用爬虫手段获取了,额。

 
既然官方不提供这个功能,只能自己使用爬虫手段获取了,额。

python AES 加密 windows和linux平台的不同

李魔佛 发表了文章 • 0 个评论 • 193 次浏览 • 2022-03-19 11:18 • 来自相关话题

同样一段AES加密的代码,放到了ubuntu可以正常使用,而在windows却报错。
实际两个平台使用pip install安装的aes库不一样。
 
windows报错
File "C:\anaconda\lib\site-packages\Crypto\Cipher\__init__.py", line 77, in _create_cipher
raise TypeError("IV is not meaningful for the ECB mode")
TypeError: IV is not meaningful for the ECB mode

只需要把AES.new() 的参数里面的iv给去掉就可以了。
 

  查看全部
同样一段AES加密的代码,放到了ubuntu可以正常使用,而在windows却报错。
实际两个平台使用pip install安装的aes库不一样。
 
windows报错
  File "C:\anaconda\lib\site-packages\Crypto\Cipher\__init__.py", line 77, in _create_cipher
raise TypeError("IV is not meaningful for the ECB mode")
TypeError: IV is not meaningful for the ECB mode

只需要把AES.new() 的参数里面的iv给去掉就可以了。
 

 

ASGI 'lifespan' protocol appears unsupported

李魔佛 发表了文章 • 0 个评论 • 525 次浏览 • 2022-01-18 23:12 • 来自相关话题

ASGI 'lifespan' protocol appears unsupported
实际原因并不是真的不支持。
 
可能只是在lifespan部分的代码里面出现了错误而无法打印出来。
 
启动的时候加入:--lifespan on
 
uvicorn --host 0.0.0.0 asgi_lc:app --lifespan on
 
这样就知道你的代码那里出错了,只要把错误的地方修复了,那么这个提示就会消失了。 查看全部
ASGI 'lifespan' protocol appears unsupported
实际原因并不是真的不支持。
 
可能只是在lifespan部分的代码里面出现了错误而无法打印出来。
 
启动的时候加入:--lifespan on
 
uvicorn --host 0.0.0.0 asgi_lc:app --lifespan on
 
这样就知道你的代码那里出错了,只要把错误的地方修复了,那么这个提示就会消失了。

不是所有的bytes都可以转换为string

李魔佛 发表了文章 • 0 个评论 • 384 次浏览 • 2022-01-14 14:56 • 来自相关话题

byte转为string
b.decode('utf8')
 
如果报错:UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe4 in position 1: invalid continuation byte
说明字节无法字节转为string, 





 
上面的字节是可以正常decode为utf8
 
而改下字节数据





 
所以你试下decode下面的字节:c=b'\x1e\xe4\xd5\x97\x9a#\x99kC\xadD\x7f\x9a\xc2G\x92'
是无法解析的。
 
这个是没有办法的,如果要硬刚 。
可以加入参数errors = ‘replace’
 b.decode('utf8',errors='replace')
这样就不会报错。但是这也只是输出的乱码。
 
正确的姿势是要看看你的字节的最原始编码格式。如果是gbk,那么就应该使用b.decode('gbk')
 
  查看全部
byte转为string
b.decode('utf8')
 
如果报错:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe4 in position 1: invalid continuation byte

说明字节无法字节转为string, 

Shutter_2022-01-14-14:49:54.png

 
上面的字节是可以正常decode为utf8
 
而改下字节数据

Shutter_2022-01-14-14:53:40.png

 
所以你试下decode下面的字节:
c=b'\x1e\xe4\xd5\x97\x9a#\x99kC\xadD\x7f\x9a\xc2G\x92'

是无法解析的。
 
这个是没有办法的,如果要硬刚 。
可以加入参数errors = ‘replace’
 
b.decode('utf8',errors='replace')

这样就不会报错。但是这也只是输出的乱码。
 
正确的姿势是要看看你的字节的最原始编码格式。如果是gbk,那么就应该使用b.decode('gbk')
 
 

vs code 无法启动jupyter notebook 修复 亲测

李魔佛 发表了文章 • 0 个评论 • 1167 次浏览 • 2021-12-16 12:12 • 来自相关话题

错误信息:failed to start INotebook in kernel, UI Disabled = false s [Error]: Unable to start Kernel 'base (Python 3.8.3)' due to connection timeout. View Jupyter [log](command:jupyter.viewOutput) for further detail





之前一直运行得好好的。不知道安装了什么依赖库后就这样了。。
 
在国外网站找了一通后,找到了解决办法:
 pip install traitlets==4.3.3
不得不感慨,国外大神多,国内csdn乱ctrl cv.
 
  查看全部
错误信息:
failed to start INotebook in kernel, UI Disabled = false s [Error]: Unable to start Kernel 'base (Python 3.8.3)' due to connection timeout.  View Jupyter [log](command:jupyter.viewOutput) for further detail

NM3I8.png


之前一直运行得好好的。不知道安装了什么依赖库后就这样了。。
 
在国外网站找了一通后,找到了解决办法:
 
pip install traitlets==4.3.3

不得不感慨,国外大神多,国内csdn乱ctrl cv.
 
 

想用python爬虫批量下载数据,下载下来的数据是excel表格形式,但是源码下载的链接如下,请问这样可以爬吗?

python爬虫低调的哥哥 回复了问题 • 2 人关注 • 1 个回复 • 463 次浏览 • 2021-11-26 13:20 • 来自相关话题

怎么使用控制台将动态加载数据刷新出来啊????

回复

liwenyu 发起了问题 • 1 人关注 • 0 个回复 • 595 次浏览 • 2021-10-12 14:54 • 来自相关话题

如何使用控制台将动态加载数据刷新出来啊????

低调的哥哥 回复了问题 • 2 人关注 • 1 个回复 • 715 次浏览 • 2021-08-11 02:06 • 来自相关话题