资源简介
资源目录
方法2
import re, sys
import time
import requests, json, base64
from rich.console import Console
from rich.table import Table
from hm3u8dl_cli import m3u8download
from hm3u8dl_cli import idm
class Xet:
def __init__(self, APPID, cookie: str = ''):
""" 一个解析模块
:param APPID: 必填 如:appy6xyj9p87665
:param cookie: 非必填
"""
# url = 'https://appy6xyj9p87665.h5.xiaoeknow.com/p/course/video/v_62217c28e4b066e9608b63e8?product_id=p_621f4eb7e4b02b8258503f35'
self.APPID = APPID
self.COOKIE = cookie
if self.COOKIE == '':
self.COOKIE = self.get_cookie(self.APPID)
self.HEADERS = {
'referer': f'https://{self.APPID}.h5.xiaoeknow.com',
'user-agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Mobile Safari/537.36 Edg/105.0.1343.33',
'cookie': self.COOKIE
}
def get_cookie(self, APPID):
try:
playurl = f'https://{APPID}.h5.xiaoeknow.com'
response = requests.get(playurl, allow_redirects=False)
set_cookie = response.headers.get('set-cookie')
cookie = re.findall('(anony_token=.+?);', set_cookie)[0]
return cookie
except:
return None
def get_cookie_from_pic(self,APPID):
url = f'https://{APPID}.h5.xiaoeknow.com/p/decorate/personal_center'
session = requests.session()
response = session.get(url,allow_redirects=False).headers
Location = response['Location']
response2 = session.get(Location)
wxurl = f'https://{APPID}.h5.xiaoeknow.com/xe.account-platform.account.auth.get_qr_code'
data = {
'app_id':APPID,
'bizData[login_key]':0
}
response3 = session.post(wxurl,data=data).json()
print(response3)
code = response3['data']['code']
qr_code_url = response3['data']['qr_code_url']
pic = session.get(qr_code_url,allow_redirects=True)
print(pic.headers)
# 检测二维码状态###########################
authorize_status_url = f'https://{APPID}.h5.xiaoeknow.com/xe.account-platform.account.auth.authorize_status'
data_authorize_status = {
'app_id':APPID,
'bizData[code]':code,
'bizData[login_key]':''
}
while True:
response_authorize_status = session.post(authorize_status_url,data=data_authorize_status).json()
print(response_authorize_status)
time.sleep(1)
def get_all_resourceId_from_productId(self,APPID,product_id):
product_ids = []
url = f'https://{APPID}.h5.xiaoeknow.com/get/topic_products'
data = {
'bizData[page_index]': 1,
'bizData[page_size]': 100,
'bizData[topic_id]': product_id
}
response = requests.post(url,headers=self.HEADERS,data=data).json()
datas = response['data']
all_resource_ids = []
for data in datas:
product_ids.append(data['id'])
resource_ids = self.get_resourceId_from_productId(APPID,data['id'])
all_resource_ids += resource_ids
return all_resource_ids
def get_resourceId_from_productId(self, APPID, product_id):
resource_ids = []
url = f'https://{APPID}.h5.xiaoeknow.com/xe.course.business.column.items.get/2.0.0'
data = {
'bizData[column_id]': product_id,
'bizData[page_index]': 1,
'bizData[page_size]': 100,
'bizData[sort]': 'desc'
}
try:
# print(self.HEADERS)
response = requests.post(url=url, headers=self.HEADERS, data=data)
response = response.json()
except:
raise ('解析失败')
total = response['data']['total']
Lists = response['data']['list']
for List in Lists:
resource_id = List['resource_id']
resource_title = List['resource_title']
playurl = f'https://{self.APPID}.h5.xiaoeknow.com/p/course/video/{resource_id}?product_id={product_id}'
info = {
'resource_title': resource_title,
'resource_id': resource_id,
'playurl': playurl
}
resource_ids.append(info)
return resource_ids
def get_resourceId_from_productId2(self, APPID, product_id):
resource_ids = []
url = f'https://{self.APPID}.h5.xiaoeknow.com/get/topic_products'
data = {
'bizData[topic_id]': product_id,
'bizData[page_index]': 1,
'bizData[page_size]': 100,
}
try:
response = requests.post(url=url, headers=self.HEADERS, data=data).json()
Lists = response['data'][0]['resource_list']
for List in Lists:
resource_id = List['id']
resource_title = List['resource_name']
playurl = f'https://{self.APPID}.h5.xiaoeknow.com/p/course/video/{resource_id}?product_id={product_id}'
info = {
'resource_title': resource_title,
'resource_id': resource_id,
'playurl': playurl
}
resource_ids.append(info)
return resource_ids
except:
print(response)
sys.exit(0)
def get_USERID_signature(self, APPID, resource_id):
""" keyurl 后加上 uid=anonymous_USERID 可正常访问
:return: USERID, nonceStr, share_title, signature
"""
url = f'https://{APPID}.h5.xiaoeknow.com/xe.course.business.wechat.init'
wechat_init_url = f'https://{APPID}.h5.xiaoeknow.com'
data = {
'bizData[resource_id]': resource_id,
'bizData[wechat_init_url]': wechat_init_url
}
response = requests.post(url, headers=self.HEADERS, data=data).json()
commonData = response['data']['commonData']
USERID = commonData['USERID']
wxData = response['data']['wxData']
nonceStr = wxData['nonceStr']
share_title = wxData['share_title']
signature = wxData['signature']
return USERID, nonceStr, share_title, signature
def dec_m3u8(self, t):
return base64.b64decode(
t.replace('_ba', '').replace('@', '1').replace('#', '2').replace('$', '3').replace('%', '4')).decode()
def get_detail_info(self, APPID, resource_id:str, product_id,title = None):
# url_audio = f'https://{APPID}.h5.xiaoeknow.com/xe.course.business.audio.info.get/2.0.0'
if resource_id.startswith('v_'):
url = f'https://{APPID}.h5.xiaoeknow.com/xe.course.business.video.detail_info.get/2.0.0'
data = {
'bizData[resource_id]': resource_id,
'bizData[product_id]': product_id,
'bizData[opr_sys]': 'Win32'
}
response = requests.post(url, headers=self.HEADERS, data=data).json()
try:
if 'video_info' in response['data']:
video_info = response['data']['video_info']
video_length = video_info['video_length']
file_name = video_info['file_name']
video_urls = response['data']['video_urls']
video_urls = json.loads(self.dec_m3u8(video_urls))
# print(video_urls)
results = []
for video_url in video_urls:
host = video_url['ext']['host']
path = video_url['ext']['path']
baseuri = host + '/' + path + '/'
param = '?' + video_url['ext']['param']
m3u8url = video_url['url']
# print(baseuri, param)
info = {
'm3u8url': m3u8url,
'baseuri': baseuri,
'param': param,
'file_name': file_name
}
results.append(info)
# baseuri + content + param
return results
elif 'audio_info' in response['data']:
audio_info = response['data']['audio_info']
title = audio_info['title']
audio_url = audio_info['audio_url']
print(title)
idm.download(audio_url, save_name=title + '.mp3')
return None
else:
sys.exit()
except:
print(response['msg'])
raise ''
elif resource_id.startswith('l_'):
url = f'https://{APPID}.h5.xiaoeknow.com/_alive/v3/get_lookback_url?alive_id={resource_id}'
response = requests.get(url, headers=self.HEADERS).json()
try:
aliveVideoMp4Url = response['data']['aliveVideoMp4Url']
aliveVideoUrl = response['data']['aliveVideoUrl']
if aliveVideoMp4Url != '':
m3u8download(aliveVideoMp4Url,title=title)
elif aliveVideoUrl != '':
m3u8download(aliveVideoUrl,title=title)
except:
print(response)
raise ''
def decrypt(self, m3u8url: str) -> str:
""" xiaoetong 替换链接
:param m3u8url: 传入m3u8/ts链接
:return: 不加密的链接
"""
replace_header = ['encrypt-k-vod.xet.tech']
# true_header = '1252524126.vod2.myqcloud.com'
true_header = 'live-video-tx.xiaoeknow.com'
for i in replace_header:
if i in m3u8url:
m3u8url = m3u8url.replace(i, true_header)
m3u8url = re.sub('_\d+', '', m3u8url).replace('.ts', '.m3u8').split('?')[0]
return m3u8url
def decrypt_key(self, USERID, key0):
key = []
uid = list(USERID.encode())
key0 = list(base64.b64decode(key0))
for i, j in enumerate(key0):
key.append(key0[i] ^ uid[i])
return base64.b64encode(bytes(key)).decode()
def run(self, APPID, resource_id, product_id,title = None):
results = self.get_detail_info(APPID, resource_id, product_id,title = title)
if results is not None:
USERID, nonceStr, share_title, signature = self.get_USERID_signature(APPID, resource_id)
for result in results:
m3u8url = result['m3u8url']
baseuri = result['baseuri']
param = result['param']
file_name = result['file_name']
# print(m3u8url, baseuri, param)
response = requests.get(m3u8url).text
# 解key 不用key也能下载
keyurl = re.findall('URI="(.+?)"', response)[0] + f'&uid={USERID}'
key0 = base64.b64encode(requests.get(keyurl).content).decode()
key = self.decrypt_key(USERID, key0)
# 找到一个ts链接去生成m3u8链接
temp_url = re.findall('(v\..+?).ts', response)[0].replace("_0", "") + '.m3u8'
# m3u8url = self.decrypt(baseuri + temp_url + param)
m3u8url = baseuri + temp_url + param
print(file_name, key, m3u8url)
m3u8download(m3u8url=m3u8url, title=file_name,key=key, headers=self.HEADERS)
def listSort(self, List1):
table = Table()
console = Console(color_system='256', style=None)
List2 = []
if List1 == []:
raise ('列表获取错误')
elif len(List1) == 1:
return List1
i = 1
table.add_column(f'[red]sn')
table.add_column(f'[red]title')
table.add_column(f'[red]resource_id')
for List in List1:
table.add_row(
str(i),
List['resource_title'],
List['resource_id'],
)
i = i + 1
console.print(table)
numbers = input('输入下载序列(① 5 ② 4-10 ③ 4 10):')
if ' ' in numbers:
for number in numbers.split(' '):
List2.append(List1[int(number) - 1])
elif '-' in numbers:
number = re.findall('\d+', numbers)
return List1[int(number[0]) - 1:int(number[1])]
else:
number = re.findall('\d+', numbers)
List2.append(List1[int(number[0]) - 1])
return List2
return List2
def single_parse(self, resource_id, product_id=None):
""" 单个视频下载
:param resource_id: 如:v_62217c28e4b066e9608b63e8
:param product_id: 如:p_621f4eb7e4b02b8258503f35
:return:
"""
self.run(self.APPID, resource_id, product_id)
def batch_parse(self, product_id):
""" 批量解析下载
:param product_id: 如:p_621f4eb7e4b02b8258503f35
:return:
"""
# infos = self.get_all_resourceId_from_productId(self.APPID, product_id)
infos = self.get_resourceId_from_productId(self.APPID, product_id)
if infos == []:
infos = self.get_resourceId_from_productId2(self.APPID, product_id)
# raise '空列表'
infos = self.listSort(infos)
for info in infos:
self.run(self.APPID, info['resource_id'], product_id,title=info['resource_title'])
if __name__ == '__main__':
# cookie = 'anony_token=b9235d170d35c7122fd60f04188e3bcc;'
xet = Xet('appy6xyj9p87665',cookie='')
# 批量解析
xet.batch_parse('p_621f4eb7e4b02b8258503f35')
# 单个解析
# xet.single_parse('v_62fa64a3e4b050af23a99220','')
方法2:直接用这个下载器https://github.com/videoDown123/xiaoeknow_download
资源下载