您当前的位置:首页 > 计算机 > 编程开发 > Python

Python爬取曾今的K歌

时间:04-23来源:作者:点击数:

前言

还记得我们童年唱过的歌吗,还记得曾今喜欢的人的声音吗,全民K歌作为曾今主流的唱歌软件,深受我的想念。每次联网去访问,万一哪天对方把歌删了,或变成私密了,那就可惜了。今天我在此制作一款全民K歌下载器,让你留住你与别人的曾今!【本程序融合了爬取企查查~这篇文章里的get_cookies.py文件代码,可以自动获取cookie,不过实测自动获取的cookie会比正常访问的cookie少一个参数,在获取单曲数量上可能会缺失(自己实测某全民K歌下载143首只下载了140首),直接复制cookie则没有这个问题。】

思路

目前全民K歌只显示前8首歌曲,而且底部的查看更多是没有用的,我一开始看到这现象是对爬这个内心感到绝望的,当然如果愿意一个个分享出来提取链接下载那也是可以的,但是人肯定不想那么麻烦,只想获取到一个人就把这个人所有的歌曲都下载了。经过我对网页的研究,使用BeautifulSoup等网页解析工具后发现个人界面上有个script标签里存放当前账号所能获取到的概览信息,有用的主要就是总歌曲数,如果是爬别人的,只会显示非私密歌曲数,这能让我们知道,我们要爬多少,有这个好头,我想之后处理起来应该好一点。

在我多次刷新观察XHR后,我灵机一动点了一次查看更多,发现他虽然没有用,但多出来了一个XHR请求kg_ugc_get_homepage,虽然我看到他并没有返回有用的数据,但我当我看到他所需的熟悉的参数,我就肯定这是获取歌曲的唯一方式!

在这里插入图片描述

这可把我激动坏了,拼接上startnumshare_uid后,果然,成功的响应了一个有歌曲信息的callback对象,经过简单分析后,得到了ugclist歌曲列表。经过多次试探,最多只能15首歌曲,罢了,多请求几次就多请求几次吧,反正总歌曲数据 get 到了。

我本以为接下来会很轻松,没想到音频链接很好找,不过更没想到…

在这里插入图片描述

到这里我要吐了,都是不知道从哪里冒出来的参数,要找到未免有点太复杂了吧…我感觉,至少有十几个甚至几十个JavaScript函数参与了这些参数的生成,更可能是在服务器端生成的,我果断放弃了这条路。

我看了下网页源代码,嘿,歌曲url在网页源代码的某个script标签中,果然我被幸运女神宠幸了两次。

接下来使用BeautifulSoup等网页解析工具就能解析出歌曲地址,这样就直接跳过了ftnrkeyvkeyfnameugcid的获取,嘿嘿。然后就可以正常的下载了。

下载完歌曲,我看了看我的全民K歌,好似还有一个专辑没有下载,我心想歌曲都下了,专辑也不能落下啊,我直接打开专辑标签,出现了一个fcg_user_album_list的XHR请求,看英文就知道是获取专辑列表,我就一个专辑所以显示一个。专辑详情界面实际参数挺少的,只要一个专辑id参数s,就能访问专辑界面,我现在对界面中的XHR绝望了,我毅然决然的再次分析网页源代码,发现我想要的信息依旧静静的躺在script标签里,不过我明明有11首单曲他只能获得10首,不知道是因为我其中一首删除了还是因为他只能获取前10首,就不管它了,专辑就差不多就完事了,如果你们有专辑内歌曲多的,可以试试获取几条~

接下来下载专辑歌曲和下载普通歌曲一样,获取到shareid就能进入歌曲详情页,分析出歌曲网址就能把他们下载下来了~

代码

# _*_ coding:utf-8 _*_
# Project: 
# FileName: qmkg_new.py
# UserName: 高俊佶
# ComputerUser:19305
# Day: 2021/10/24
# Time: 12:00
# IDE: PyCharm
# 女人,不要也罢!——来自2021-10-9日的灵魂伤感

import os
import sys
import json
import time
import base64
import getpass
import sqlite3

import urllib3
import requests
import webbrowser
import ctypes.wintypes
from bs4 import BeautifulSoup
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class DataBlob(ctypes.Structure):
    _fields_ = [('cbData', ctypes.wintypes.DWORD), ('pbData', ctypes.POINTER(ctypes.c_char))]


def dp_api_decrypt(encrypted):
    p = ctypes.create_string_buffer(encrypted, len(encrypted))
    blob_out = DataBlob()
    ret_val = ctypes.windll.crypt32.CryptUnprotectData(ctypes.byref(DataBlob(ctypes.sizeof(p), p)), None, None, None, None, 0, ctypes.byref(blob_out))
    if not ret_val:
        raise ctypes.WinError()
    result = ctypes.string_at(blob_out.pbData, blob_out.cbData)
    ctypes.windll.kernel32.LocalFree(blob_out.pbData)
    return result


def aes_decrypt(encrypted_txt):
    with open(f'C:\\Users\\{getpass.getuser()}\\AppData\\Local\\Google\\Chrome\\User Data\\Local State', encoding='utf-8', mode="r") as f:
        jsn = json.loads(str(f.readline()))
    encrypted_key = base64.b64decode(jsn["os_crypt"]["encrypted_key"].encode())
    encrypted_key = encrypted_key[5:]
    cipher = Cipher(algorithms.AES(dp_api_decrypt(encrypted_key)), None, backend=default_backend())
    cipher.mode = modes.GCM(encrypted_txt[3:15], tag=None, min_tag_length=16)
    return cipher.decryptor().update(encrypted_txt[15:])


def chrome_decrypt(encrypted_txt):
    if sys.platform == 'win32':
        try:
            if encrypted_txt[:4] == b'x01x00x00x00':
                return dp_api_decrypt(encrypted_txt).decode()
            elif encrypted_txt[:3] == b'v10':
                return aes_decrypt(encrypted_txt)[:-16].decode()
        except WindowsError:
            return None
    else:
        raise WindowsError


def get_cookies_from_chrome(d):
    con = sqlite3.connect(f'C:\\Users\\{getpass.getuser()}\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\Cookies')
    con.row_factory = sqlite3.Row
    cur = con.cursor()
    cur.execute(f'SELECT name, encrypted_value as value FROM cookies where host_key like "%{d}%"')
    cookies = ''
    for row in cur:
        if row['value'] is not None:
            value = chrome_decrypt(row['value'])
            if value is not None:
                cookies += row['name'] + '=' + value + ';'
    return cookies


def parse_cookies(cookies: str):
    cookies_dict = {}
    for c in cookies.replace(' ', '').split(';'):
        try:
            cookies_dict[c.split('=')[0]] = c.split('=')[1]
        except IndexError:
            cookies_dict[c.split('=')[0]] = ''
    if "" in cookies_dict:
        del cookies_dict[""]
    return cookies_dict


webbrowser.open('https://kg.qq.com/index-pc.html')
cookie = input('输入有效的cookie(XHR中选一),否则读取谷歌浏览器cookie(可能会缺失歌曲):')
if not cookie:
    cookie = get_cookies_from_chrome('qq.com') + get_cookies_from_chrome('kg.qq.com')

if not parse_cookies(cookie).get('muid', None):
    print('等待登录全民K歌网页...')
    n = 1
    while not parse_cookies(cookie).get('muid', None):
        cookie = get_cookies_from_chrome('qq.com') + get_cookies_from_chrome('kg.qq.com')
        print(f'检测登陆状态第【{n}】次...', end='\r')
        time.sleep(1)
        n += 1
uid = parse_cookies(cookie)['muid']
print(f'\n获取到用户uid:{uid}')
inp = input('需要查询的uid,否则获取用户自身:')
if len(inp) > 10:
    uid = inp

# 获取所有能得到的歌曲信息
total = 0  # 可以获取到的歌曲总数
ugc = []  # 全部歌曲数据
user_information = {}  # 用户基本信息
res = requests.get(f'https://kg.qq.com/node/personal?uid={uid}', cookies={"cookie": cookie})
if res.ok:
    for script in BeautifulSoup(res.text, 'lxml').find_all('script'):
        if "window.__DATA__" in script.text:
            user_information = json.loads(script.text[script.text.find('{'): script.text.rfind('};') + 1])["data"]
            total = user_information["ugc_total_count"]  # 没有cookies ==公开的歌曲 | 有cookies ==账户所有的歌曲 || 能够被获取到的歌曲数目
            print(f'总共歌曲数目:{total}')
            if not os.path.exists(f'{user_information["kgnick"]}_{uid}/media'):
                os.makedirs(f'{user_information["kgnick"]}_{uid}/media')
            num = 15  # 单次获取最大15首
            n = 1  # 页数
            while n:
                url = f'http://node.kg.qq.com/cgi/fcgi-bin/kg_ugc_get_homepage?type=get_uinfo&start={n}&num={num}&share_uid={uid}'
                res = requests.get(url, cookies={"cookie": cookie})
                if res.ok:
                    song_information = json.loads(res.text[res.text.find('{'): res.text.rfind('}') + 1])["data"]
                    if not song_information["ugclist"]:
                        break
                    ugc += song_information["ugclist"]
                    n += 1
            break
    else:
        print('未发现歌曲!')

if user_information:
    open(f'{user_information["kgnick"]}_{uid}/{user_information["kgnick"]}_{uid}.json', 'w', encoding='utf-8').write(json.dumps(ugc, indent=4, ensure_ascii=False))
    for i, song in enumerate(ugc):
        # 直接从字典获取歌曲链接(跳过 vkey 的麻烦获取)
        res = requests.get(f'https://node.kg.qq.com/play?s={song["shareid"]}', cookies={"cookie": cookie})
        if res.ok:
            for script in BeautifulSoup(res.text, 'lxml').find_all('script'):
                if "window.__DATA__" in script.text:
                    media_information = json.loads(script.text[script.text.find('{'): script.text.rfind('};') + 1])["detail"]
                    res = requests.get(media_information["playurl"], stream=True)
                    if res.ok:
                        print(f'\r正在下载:{user_information["kgnick"]}_{uid}/media/{song["title"]}_{song["shareid"]}.m4a\n【当前:{str(i + 1).zfill(len(str(total)))}/总共:{total}】', end='')
                        open(f'{user_information["kgnick"]}_{uid}/media/{song["title"]}_{song["shareid"]}.m4a', 'wb').write(res.content)
                    break
            else:
                print('未发现媒体链接!')
    print()

# 获取专辑
album_list = {}
res = requests.get(f'https://node.kg.qq.com/cgi/fcgi-bin/fcg_user_album_list?dest_uid={uid}', cookies={"cookie": cookie})
if res.ok:
    album_information = json.loads(res.text[res.text.find('{'): res.text.rfind('}') + 1])["data"]
    if "album_list" in album_information and album_information["album_list"]:
        for album in album_information["album_list"]:
            album_list[album["album_id"]] = {"album_name": album["album_name"], "album_list": []}
            res = requests.get(f'https://node.kg.qq.com/album?s={album["album_id"]}')
            if res.ok:
                for script in BeautifulSoup(res.text, 'lxml').find_all('script'):
                    if "window.__DATA__" in script.text:
                        album_list_information = json.loads(script.text[script.text.find('{'): script.text.rfind('};') + 1])["detail"]
                        if album_list_information["ugc_list"] and album["ugc_num"] and len(album_list_information["ugc_list"]) == album["ugc_num"]:
                            if not os.path.exists(f'{user_information["kgnick"]}_{uid}/{album["album_name"]}_{album["album_id"]}'):
                                os.makedirs(f'{user_information["kgnick"]}_{uid}/{album["album_name"]}_{album["album_id"]}')
                            if album["ugc_num"] != album_list_information["ugc_num"]:
                                print('获取专辑歌曲暂时只能10首,超出部分无法获得')
                            album_list[album["album_id"]]["album_list"] = album_list_information["ugc_list"]
                        else:
                            print('无专辑歌曲或未获取全部专辑歌曲')
                        break
                else:
                    print('专辑未发现歌曲!')

# 下载专辑
if album_list:
    open(f'{user_information["kgnick"]}_{uid}/album_list.json', 'w', encoding='utf-8').write(json.dumps(album_list, indent=4, ensure_ascii=False))
    for album in album_list:
        if album_list[album]:
            total = len(album_list[album]["album_list"])
            open(f'{user_information["kgnick"]}_{uid}/{album_list[album]["album_name"]}_{album}/{album_list[album]["album_name"]}_{album}.json', 'w', encoding='utf-8').write(json.dumps(album_list[album], indent=4, ensure_ascii=False))
            for i, song in enumerate(album_list[album]["album_list"]):
                # 直接从字典获取歌曲链接(跳过 vkey 的麻烦获取)
                res = requests.get(f'https://node.kg.qq.com/play?s={song["ugc_id"]}', cookies={"cookie": cookie})
                if res.ok:
                    for script in BeautifulSoup(res.text, 'lxml').find_all('script'):
                        if "window.__DATA__" in script.text:
                            media_information = json.loads(script.text[script.text.find('{'): script.text.rfind('};') + 1])["detail"]
                            res = requests.get(media_information["playurl"], stream=True)
                            if res.ok:
                                print(f'\r正在下载:{user_information["kgnick"]}_{uid}/{album_list[album]["album_name"]}_{album}/{song["song_name"]}_{song["ugc_id"]}.m4a\n【当前:{str(i + 1).zfill(len(str(total)))}/总共:{total}】', end='')
                                open(f'{user_information["kgnick"]}_{uid}/{album_list[album]["album_name"]}_{album}/{song["song_name"]}_{song["ugc_id"]}.m4a', 'wb').write(res.content)
                            break
                    else:
                        print('未发现媒体链接!')
            print()
input('下载完成!回车结束程序~')

结束语

本程序可以打包成程序:pyinstaller -F qmkg_new.py实现走到哪用到哪。

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门