时间:2021-05-22
import requestsimport osimport datetimeimport threadingclass xiazai(): def __init__(self,url): self.url = url work_dir = os.getcwd() # print(work_dir) # 用来保存ts文件 file_dir = os.path.join(work_dir, 'file_tmp') if not os.path.exists(file_dir): os.mkdir(file_dir) self.headers ={ 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36' } self.savefile(self.url) def savefile(self, file_url): r = requests.get(file_url, headers=self.headers) # 合成带有hls的m3u8地址 if r.text.split('\n')[-1] == '': hls_mark = r.text.split('\n')[-2] # 以防\n结尾 else: hls_mark = r.text.split('\n')[-1] self.url_m3u8_hls = file_url.replace('index.m3u8', hls_mark) #file_m3u8 = url_m3u8_hls.split('/')[-1] self.duqu() #print(url_m3u8_hls) def duqu(self): r = requests.get(self.url_m3u8_hls, headers=self.headers).text text_bytes = r.split('\n') # 筛选以.ts结尾的行 # 有些情况下可能是以其他格式的文件,比如png,下载后修改后缀即可 # ts_name = [i for i in text_string if i.endswith('.ts')] self.ts_time = [i for i in text_bytes if i.startswith('#EXTINF')] #self.shijian(dm_time) #print(dm_time) self.ts_neirong = [i for i in text_bytes if not i.startswith('#')] self.ts_neirong.pop() self.threads = [] self.threads.append(threading.Thread(target=self.xiazai)) self.threads.append(threading.Thread(target=self.shijian)) for t in self.threads: # print(t) t.start() #self.xiazai(url_m3u8_hls) # print(ts_neirong) def shijian(self): self.dm_time = 0 for i in range(len(self.ts_time)): ts_time1 = self.ts_time[i].replace('#EXTINF:', '') ts_time2 = ts_time1.replace(',', '') self.dm_time = float(ts_time2) + self.dm_time shichang_time = str(datetime.timedelta(seconds=self.dm_time)) print('视频时长:%s' % shichang_time) def xiazai(self): liebiao=[] for i in range(len(self.ts_neirong)): hls_mark = self.url_m3u8_hls.split('/')[-1] url_xiazai = self.url_m3u8_hls.replace(hls_mark, self.ts_neirong[i]) liebiao.append(url_xiazai) #print(url_xiazai) # r = requests.get(url_xiazai, headers=self.headers) # with open('file_tmp/'+ts_neirong[i], 'wb') as f: # f.write(r.content) # f.close() x = self.bisector_list(liebiao, 10) self.xiancheng0=x[0] self.xiancheng1=x[1] self.xiancheng2=x[2] self.xiancheng3=x[3] self.xiancheng4=x[4] self.xiancheng5=x[5] self.xiancheng6=x[6] self.xiancheng7=x[7] self.xiancheng8=x[8] self.xiancheng9=x[9] self.threads2 = [] self.threads2.append(threading.Thread(target=self.xiancheng_xiazai1)) self.threads2.append(threading.Thread(target=self.xiancheng_xiazai2)) self.threads2.append(threading.Thread(target=self.xiancheng_xiazai3)) self.threads2.append(threading.Thread(target=self.xiancheng_xiazai4)) self.threads2.append(threading.Thread(target=self.xiancheng_xiazai5)) self.threads2.append(threading.Thread(target=self.xiancheng_xiazai6)) self.threads2.append(threading.Thread(target=self.xiancheng_xiazai7)) self.threads2.append(threading.Thread(target=self.xiancheng_xiazai8)) self.threads2.append(threading.Thread(target=self.xiancheng_xiazai9)) self.threads2.append(threading.Thread(target=self.xiancheng_xiazai10)) for t in self.threads2: # print(t) t.start() def xiancheng_xiazai1(self): #print(self.xiancheng0) for i in self.xiancheng0: #print(i) r = requests.get(i, headers=self.headers) mingzi = i.split('/')[-1] with open('file_tmp/'+mingzi, 'wb') as f: f.write(r.content) f.close() def xiancheng_xiazai2(self): #print(self.xiancheng1) for i in self.xiancheng1: #print(i) r = requests.get(i, headers=self.headers) mingzi= i.split('/')[-1] with open('file_tmp/'+mingzi, 'wb') as f: f.write(r.content) f.close() def xiancheng_xiazai3(self): #print(self.xiancheng2) for i in self.xiancheng2: #print(i) r = requests.get(i, headers=self.headers) mingzi = i.split('/')[-1] with open('file_tmp/'+mingzi, 'wb') as f: f.write(r.content) f.close() def xiancheng_xiazai4(self): #print(self.xiancheng3) for i in self.xiancheng3: #print(i) r = requests.get(i, headers=self.headers) mingzi = i.split('/')[-1] with open('file_tmp/'+mingzi, 'wb') as f: f.write(r.content) f.close() def xiancheng_xiazai5(self): #print(self.xiancheng4) for i in self.xiancheng4: #print(i) r = requests.get(i, headers=self.headers) mingzi = i.split('/')[-1] with open('file_tmp/'+mingzi, 'wb') as f: f.write(r.content) f.close() def xiancheng_xiazai6(self): #print(self.xiancheng5) for i in self.xiancheng5: #print(i) r = requests.get(i, headers=self.headers) mingzi = i.split('/')[-1] with open('file_tmp/'+mingzi, 'wb') as f: f.write(r.content) f.close() def xiancheng_xiazai7(self): #print(self.xiancheng6) for i in self.xiancheng6: #print(i) r = requests.get(i, headers=self.headers) mingzi = i.split('/')[-1] with open('file_tmp/'+mingzi, 'wb') as f: f.write(r.content) f.close() def xiancheng_xiazai8(self): #print(self.xiancheng7) for i in self.xiancheng7: #print(i) r = requests.get(i, headers=self.headers) mingzi = i.split('/')[-1] with open('file_tmp/'+mingzi, 'wb') as f: f.write(r.content) f.close() def xiancheng_xiazai9(self): #print(self.xiancheng8) for i in self.xiancheng8: #print(i) r = requests.get(i, headers=self.headers) mingzi = i.split('/')[-1] with open('file_tmp/'+mingzi, 'wb') as f: f.write(r.content) f.close() def xiancheng_xiazai10(self): #print(self.xiancheng9) for i in self.xiancheng9: #print(i) r = requests.get(i, headers=self.headers) mingzi = i.split('/')[-1] with open('file_tmp/'+mingzi, 'wb') as f: f.write(r.content) f.close() def bisector_list(self,tabulation: list, num: int): """ 将列表平均分成几份 :param tabulation: 列表 :param num: 份数 :return: 返回一个新的列表 """ new_list = [] '''列表长度大于等于份数''' if len(tabulation) >= num: '''remainder:列表长度除以份数,取余''' remainder = len(tabulation) % num if remainder == 0: '''merchant:列表长度除以分数''' merchant = int(len(tabulation) / num) '''将列表平均拆分''' for i in range(1, num + 1): if i == 1: new_list.append(tabulation[:merchant]) else: new_list.append(tabulation[(i - 1) * merchant:i * merchant]) return new_list else: '''merchant:列表长度除以分数 取商''' merchant = int(len(tabulation) // num) '''remainder:列表长度除以份数,取余''' remainder = int(len(tabulation) % num) '''将列表平均拆分''' for i in range(1, num + 1): if i == 1: new_list.append(tabulation[:merchant]) else: new_list.append(tabulation[(i - 1) * merchant:i * merchant]) '''将剩余数据的添加前面列表中''' if int(len(tabulation) - i * merchant) <= merchant: for j in tabulation[-remainder:]: new_list[tabulation[-remainder:].index(j)].append(j) return new_list else: '''如果列表长度小于份数''' for i in range(1, len(tabulation) + 1): tabulation_subset = [] tabulation_subset.append(tabulation[i - 1]) new_list.append(tabulation_subset) return new_listif __name__=='__main__': xiazai('http://iqiyi.cdn9-okzy.com/20200907/15137_ed25d8c5/index.m3u8')
速度很慢. 40m 5分钟 不加多线程 1小时。 可能我的m3u8不行,或者我电脑不行, 多线程是机械式的。 电脑好 可以多加几条。
以上就是python 下载m3u8视频的示例代码的详细内容,更多关于python 下载m3u8视频的资料请关注其它相关文章!
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
电影之类的长视频好像都用m3u8格式了,这就导致了多线程下载视频的意义不是很大,都是短视频,线不线程就没什么意义了嘛。我们知道,m3u8的链接会下载一个文档,相
本文为大家分享了python爬取m3u8连接的视频方法,供大家参考,具体内容如下要求:输入m3u8所在url,且ts视频与其在同一路径下#!/usr/bin/e
前言今天被这个关于m3u8视频播放不了搞了一下午,这个项目所有的视频流都是m3u8格式的,后台给我们返回的都是m3u8格式的视频流,解决了好长时间,看了好多博客
HLS协议由三部分组成:HTTP、M3U8、TS。这三部分中,HTTP是传输协议,M3U8是索引文件,TS是音视频的媒体信息。 HTTPLiveStreami
一、关于m3u8:m3u8是苹果公司推出一种视频播放标准,是m3u的一种,不过编码方式是utf-8,是一种文件检索格式,将视频切割成一小段一小段的ts格式的视频