当前位置:

小鹅通已购课程下载脚本 PYTHON脚本 202311版

Jack_Sri 2024-03-09 51 0

资源简介

资源目录

方法1
import re, sys
import time
import requests, json, base64
 
from rich.console import Console
from rich.table import Table
 
from hm3u8dl_cli import m3u8download
from hm3u8dl_cli import idm
 
 
class Xet:
    def __init__(self, APPID, cookie: str = ''):
        """ 一个解析模块
 
        :param APPID:  必填    如:appy6xyj9p87665
        :param cookie: 非必填
        """
        # url = 'https://appy6xyj9p87665.h5.xiaoeknow.com/p/course/video/v_62217c28e4b066e9608b63e8?product_id=p_621f4eb7e4b02b8258503f35'
        self.APPID = APPID
 
        self.COOKIE = cookie
        if self.COOKIE == '':
            self.COOKIE = self.get_cookie(self.APPID)
 
        self.HEADERS = {
            'referer': f'https://{self.APPID}.h5.xiaoeknow.com',
            'user-agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Mobile Safari/537.36 Edg/105.0.1343.33',
            'cookie': self.COOKIE
        }
 
    def get_cookie(self, APPID):
        try:
            playurl = f'https://{APPID}.h5.xiaoeknow.com'
 
            response = requests.get(playurl, allow_redirects=False)
 
            set_cookie = response.headers.get('set-cookie')
            cookie = re.findall('(anony_token=.+?);', set_cookie)[0]
            return cookie
        except:
            return None
 
    def get_cookie_from_pic(self,APPID):
        url = f'https://{APPID}.h5.xiaoeknow.com/p/decorate/personal_center'
        session = requests.session()
        response = session.get(url,allow_redirects=False).headers
        Location = response['Location']
        response2 = session.get(Location)
        wxurl = f'https://{APPID}.h5.xiaoeknow.com/xe.account-platform.account.auth.get_qr_code'
        data = {
            'app_id':APPID,
            'bizData[login_key]':0
        }
        response3 = session.post(wxurl,data=data).json()
        print(response3)
        code = response3['data']['code']
        qr_code_url = response3['data']['qr_code_url']
 
        pic = session.get(qr_code_url,allow_redirects=True)
        print(pic.headers)
 
 
        # 检测二维码状态###########################
        authorize_status_url = f'https://{APPID}.h5.xiaoeknow.com/xe.account-platform.account.auth.authorize_status'
        data_authorize_status = {
            'app_id':APPID,
            'bizData[code]':code,
            'bizData[login_key]':''
        }
        while True:
            response_authorize_status = session.post(authorize_status_url,data=data_authorize_status).json()
            print(response_authorize_status)
            time.sleep(1)
 
    def get_all_resourceId_from_productId(self,APPID,product_id):
        product_ids = []
        url = f'https://{APPID}.h5.xiaoeknow.com/get/topic_products'
        data = {
            'bizData[page_index]': 1,
            'bizData[page_size]': 100,
            'bizData[topic_id]': product_id
        }
        response = requests.post(url,headers=self.HEADERS,data=data).json()
        datas = response['data']
        all_resource_ids = []
        for data in datas:
            product_ids.append(data['id'])
            resource_ids = self.get_resourceId_from_productId(APPID,data['id'])
            all_resource_ids += resource_ids
 
        return all_resource_ids
 
 
 
 
    def get_resourceId_from_productId(self, APPID, product_id):
        resource_ids = []
        url = f'https://{APPID}.h5.xiaoeknow.com/xe.course.business.column.items.get/2.0.0'
 
        data = {
            'bizData[column_id]': product_id,
            'bizData[page_index]': 1,
            'bizData[page_size]': 100,
            'bizData[sort]': 'desc'
        }
        try:
            # print(self.HEADERS)
            response = requests.post(url=url, headers=self.HEADERS, data=data)
            response = response.json()
        except:
            raise ('解析失败')
 
        total = response['data']['total']
 
        Lists = response['data']['list']
        for List in Lists:
            resource_id = List['resource_id']
            resource_title = List['resource_title']
            playurl = f'https://{self.APPID}.h5.xiaoeknow.com/p/course/video/{resource_id}?product_id={product_id}'
            info = {
                'resource_title': resource_title,
                'resource_id': resource_id,
                'playurl': playurl
            }
            resource_ids.append(info)
        return resource_ids
 
    def get_resourceId_from_productId2(self, APPID, product_id):
        resource_ids = []
        url = f'https://{self.APPID}.h5.xiaoeknow.com/get/topic_products'
 
        data = {
            'bizData[topic_id]': product_id,
            'bizData[page_index]': 1,
            'bizData[page_size]': 100,
        }
        try:
            response = requests.post(url=url, headers=self.HEADERS, data=data).json()
            Lists = response['data'][0]['resource_list']
            for List in Lists:
                resource_id = List['id']
                resource_title = List['resource_name']
                playurl = f'https://{self.APPID}.h5.xiaoeknow.com/p/course/video/{resource_id}?product_id={product_id}'
                info = {
                    'resource_title': resource_title,
                    'resource_id': resource_id,
                    'playurl': playurl
                }
                resource_ids.append(info)
 
            return resource_ids
        except:
            print(response)
            sys.exit(0)
 
    def get_USERID_signature(self, APPID, resource_id):
        """ keyurl 后加上 uid=anonymous_USERID 可正常访问
 
        :return: USERID, nonceStr, share_title, signature
        """
        url = f'https://{APPID}.h5.xiaoeknow.com/xe.course.business.wechat.init'
        wechat_init_url = f'https://{APPID}.h5.xiaoeknow.com'
        data = {
            'bizData[resource_id]': resource_id,
            'bizData[wechat_init_url]': wechat_init_url
        }
        response = requests.post(url, headers=self.HEADERS, data=data).json()
        commonData = response['data']['commonData']
        USERID = commonData['USERID']
 
        wxData = response['data']['wxData']
        nonceStr = wxData['nonceStr']
        share_title = wxData['share_title']
        signature = wxData['signature']
 
        return USERID, nonceStr, share_title, signature
 
    def dec_m3u8(self, t):
        return base64.b64decode(
            t.replace('_ba', '').replace('@', '1').replace('#', '2').replace('$', '3').replace('%', '4')).decode()
 
    def get_detail_info(self, APPID, resource_id:str, product_id,title = None):
        # url_audio = f'https://{APPID}.h5.xiaoeknow.com/xe.course.business.audio.info.get/2.0.0'
        if resource_id.startswith('v_'):
            url = f'https://{APPID}.h5.xiaoeknow.com/xe.course.business.video.detail_info.get/2.0.0'
            data = {
                'bizData[resource_id]': resource_id,
                'bizData[product_id]': product_id,
                'bizData[opr_sys]': 'Win32'
            }
            response = requests.post(url, headers=self.HEADERS, data=data).json()
 
            try:
                if 'video_info' in response['data']:
 
                    video_info = response['data']['video_info']
 
                    video_length = video_info['video_length']
                    file_name = video_info['file_name']
                    video_urls = response['data']['video_urls']
                    video_urls = json.loads(self.dec_m3u8(video_urls))
                    # print(video_urls)
 
                    results = []
                    for video_url in video_urls:
                        host = video_url['ext']['host']
                        path = video_url['ext']['path']
                        baseuri = host + '/' + path + '/'
                        param = '?' + video_url['ext']['param']
                        m3u8url = video_url['url']
                        # print(baseuri, param)
                        info = {
                            'm3u8url': m3u8url,
                            'baseuri': baseuri,
                            'param': param,
                            'file_name': file_name
                        }
                        results.append(info)
                        # baseuri + content + param
 
                    return results
                elif 'audio_info' in response['data']:
 
                    audio_info = response['data']['audio_info']
                    title = audio_info['title']
                    audio_url = audio_info['audio_url']
                    print(title)
                    idm.download(audio_url, save_name=title + '.mp3')
                    return None
                else:
                    sys.exit()
 
            except:
                print(response['msg'])
                raise ''
        elif resource_id.startswith('l_'):
            url = f'https://{APPID}.h5.xiaoeknow.com/_alive/v3/get_lookback_url?alive_id={resource_id}'
            response = requests.get(url, headers=self.HEADERS).json()
            try:
                aliveVideoMp4Url = response['data']['aliveVideoMp4Url']
                aliveVideoUrl = response['data']['aliveVideoUrl']
                if aliveVideoMp4Url != '':
                    m3u8download(aliveVideoMp4Url,title=title)
                elif aliveVideoUrl != '':
                    m3u8download(aliveVideoUrl,title=title)
            except:
                print(response)
                raise ''
 
    def decrypt(self, m3u8url: str) -> str:
        """ xiaoetong 替换链接
 
        :param m3u8url: 传入m3u8/ts链接
        :return: 不加密的链接
        """
        replace_header = ['encrypt-k-vod.xet.tech']
        # true_header = '1252524126.vod2.myqcloud.com'
        true_header = 'live-video-tx.xiaoeknow.com'
        for i in replace_header:
            if i in m3u8url:
                m3u8url = m3u8url.replace(i, true_header)
                m3u8url = re.sub('_\d+', '', m3u8url).replace('.ts', '.m3u8').split('?')[0]
        return m3u8url
 
    def decrypt_key(self, USERID, key0):
        key = []
        uid = list(USERID.encode())
        key0 = list(base64.b64decode(key0))
        for i, j in enumerate(key0):
            key.append(key0[i] ^ uid[i])
        return base64.b64encode(bytes(key)).decode()
 
    def run(self, APPID, resource_id, product_id,title = None):
        results = self.get_detail_info(APPID, resource_id, product_id,title = title)
        if results is not None:
            USERID, nonceStr, share_title, signature = self.get_USERID_signature(APPID, resource_id)
            for result in results:
                m3u8url = result['m3u8url']
 
                baseuri = result['baseuri']
                param = result['param']
                file_name = result['file_name']
                # print(m3u8url, baseuri, param)
 
                response = requests.get(m3u8url).text
                # 解key 不用key也能下载
                keyurl = re.findall('URI="(.+?)"', response)[0] + f'&uid={USERID}'
 
                key0 = base64.b64encode(requests.get(keyurl).content).decode()
                key = self.decrypt_key(USERID, key0)
                # 找到一个ts链接去生成m3u8链接
                temp_url = re.findall('(v\..+?).ts', response)[0].replace("_0", "") + '.m3u8'
 
                # m3u8url = self.decrypt(baseuri + temp_url + param)
                m3u8url = baseuri + temp_url + param
                print(file_name, key, m3u8url)
 
                m3u8download(m3u8url=m3u8url, title=file_name,key=key, headers=self.HEADERS)
 
    def listSort(self, List1):
        table = Table()
        console = Console(color_system='256', style=None)
        List2 = []
        if List1 == []:
            raise ('列表获取错误')
 
        elif len(List1) == 1:
            return List1
        i = 1
        table.add_column(f'[red]sn')
        table.add_column(f'[red]title')
        table.add_column(f'[red]resource_id')
 
        for List in List1:
            table.add_row(
                str(i),
                List['resource_title'],
                List['resource_id'],
 
            )
            i = i + 1
        console.print(table)
 
        numbers = input('输入下载序列(① 5 ② 4-10 ③ 4 10):')
        if ' ' in numbers:
            for number in numbers.split(' '):
                List2.append(List1[int(number) - 1])
        elif '-' in numbers:
            number = re.findall('\d+', numbers)
            return List1[int(number[0]) - 1:int(number[1])]
        else:
            number = re.findall('\d+', numbers)
            List2.append(List1[int(number[0]) - 1])
            return List2
        return List2
 
    def single_parse(self, resource_id, product_id=None):
        """ 单个视频下载
 
        :param resource_id: 如:v_62217c28e4b066e9608b63e8
        :param product_id: 如:p_621f4eb7e4b02b8258503f35
        :return:
        """
        self.run(self.APPID, resource_id, product_id)
 
    def batch_parse(self, product_id):
        """ 批量解析下载
 
        :param product_id: 如:p_621f4eb7e4b02b8258503f35
        :return:
        """
        # infos = self.get_all_resourceId_from_productId(self.APPID, product_id)
        infos = self.get_resourceId_from_productId(self.APPID, product_id)
        if infos == []:
            infos = self.get_resourceId_from_productId2(self.APPID, product_id)
 
            # raise '空列表'
 
        infos = self.listSort(infos)
        for info in infos:
            self.run(self.APPID, info['resource_id'], product_id,title=info['resource_title'])
 
 
if __name__ == '__main__':
    # To reuse an existing session, pass a cookie such as
    # 'anony_token=b9235d170d35c7122fd60f04188e3bcc;'
    xet = Xet(APPID='appy6xyj9p87665', cookie='')

    # Batch mode: list the product's resources and download the chosen ones.
    xet.batch_parse(product_id='p_621f4eb7e4b02b8258503f35')
    # Single-resource mode:
    # xet.single_parse('v_62fa64a3e4b050af23a99220','')
方法2:直接使用这个下载器:https://github.com/videoDown123/xiaoeknow_download

资源下载

发表评论

  • 评论列表
还没有人评论,快来抢沙发吧~