DrissionPage

Sherry 发布于 2025-03-27 27 次阅读 文章 最后更新于 2025-03-27 5505 字


浏览器配置

[!note]

实例化浏览器

# 导入
from DrissionPage import Chromium

# 连接浏览器
browser = Chromium() 
# 创建浏览器配置对象,指定浏览器路径  
co = ChromiumOptions().set_browser_path(r'D:\chrome.exe')

[!note]

浏览器配置

# 创建浏览器启动配置对象
options = ChromiumOptions()

# 配置浏览器启动选项
options.set_browser_path(r'C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe')  # 设置浏览器路径
options.ignore_certificate_errors()  # 忽略证书错误
options.no_imgs()  # 禁用图片
options.headless(False)  # 无头模式设置为False
options.set_local_port(9696)  # 设置浏览器debug端口

# 更多配置请参考 http://drissionpage.cn/browser_control/browser_options

# 连接浏览器并获取浏览器对象
browser = Chromium(options)

[!note]

指定端口或地址

# 接管9333端口的浏览器,如该端口空闲,启动一个浏览器  
browser = Chromium(9333)  
browser = Chromium('127.0.0.1:9333')

浏览器操作

标签页

[!note]

标签页操作

from DrissionPage import Chromium  

browser = Chromium()  
tab1 = browser.latest_tab # 获取最后激活的标签页对象  
tab1.get('https://DrissionPage.cn') # 标签页访问一个网址  
tab2 = browser.new_tab('https://www.baidu.com') # 新建一个标签页并访问网址  
tab3 = browser.get_tab(title='DrissionPage') # 按条件获取标签页对象

browser.latest_tab.close() # 关闭最新标签页

[!note]

多线程执行标签页示例

from concurrent.futures import ThreadPoolExecutor

# 定义打开URL的函数
def open_url(browser, url):
    browser.new_tab(url)

# 定义要访问的中国网站列表
chinese_websites = [
    "https://www.taobao.com",    # 淘宝
    "https://www.tmall.com",     # 天猫
    "https://www.jd.com",        # 京东
]

# 使用线程池并发执行
with ThreadPoolExecutor(max_workers=3) as executor:
    for url in chinese_websites:
        executor.submit(open_url, browser, url)

[!note]

多线程操作标签页

from DrissionPage import Chromium,ChromiumOptions
from pprint import pprint
import threading
import concurrent.futures


def fetch_and_save_chapter(chapter_title, chapter_url):
    new_tab = browser.new_tab(chapter_url)  # 打开章节链接
    chapter_content = new_tab('#id=TextContent').text  # 获取章节内容
    print(chapter_content)

    print(f"正在下载笑傲江湖 {chapter_title}")
    tab_instance.wait(1)

    # 将章节内容写入文件
    with open(f'./笑傲江湖_{chapter_title}.txt', 'w', encoding='utf-8') as file:
        file.write(chapter_content)

    threading.Thread(target=new_tab.close).start()  # 关闭新标签页
    # new_tab.close()

def main():
    # 创建一个线程池,最多允许 4 个线程同时运行
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # 提交多个任务到线程池
        futures = [executor.submit(fetch_and_save_chapter, chapter_title, chapter_url) 
                  for chapter_title, chapter_url in chapter_dict.items()]

[!note]

异步操作标签页

import asyncio
from DrissionPage import Chromium
from DataRecorder import Recorder   # 用来收集数据
from loguru import logger

# 定义异步采集方法
async def collect(tab, recorder, title, page=1):
    # 遍历所有标题元素
    for i in tab.eles('.title project-namespace-path'):
        # 获取某页所有库名称,记录到记录器
        recorder.add_data((title, i.text, page))
        logger.info((title, i.text, page))

    # 获取下一页按钮
    btn = tab('@rel=next', timeout=2)
    # 如果有下一页,点击翻页并递归调用自身
    if btn:
        btn.click(by_js=True)
        await asyncio.sleep(1)  # 异步等待
        # 增加页数并递归调用
        await collect(tab, recorder, title, page + 1)

# 主函数
async def main():
    # 新建浏览器对象
    browser = Chromium()

    # 第一个标签页访问网址
    tab1 = browser.new_tab('https://gitee.com/explore/ai')
    # 新建一个标签页并访问另一个网址,返回其对象
    tab2 = browser.new_tab('https://gitee.com/explore/machine-learning')

    # 新建记录器对象
    recorder = Recorder('data.csv')

    # 创建异步任务
    task1 = asyncio.create_task(collect(tab1, recorder, 'ai'))
    task2 = asyncio.create_task(collect(tab2, recorder, '机器学习'))

    # 等待任务完成
    await task1
    await task2

    print("所有任务完成")
    recorder.record()

if __name__ == '__main__':
    asyncio.run(main())

元素

[!note]

普通定位

# 点击文库按钮
wenku_button = tab.ele('文库')  # 最简单
wenku_button.click()

# 在输入框中输入内容
input_box = tab.ele('@id=kw')  # 最常用
input_box.input('1234')

# 点击搜索按钮
search_button = tab.ele('tag:input@@type=submit@@id=su@@value=百度一下')  # 最精确
search_button.click()

[!note]

iframe 元素定位

# 获取iframe
iframe = tab.get_frame('t:iframe')  # 最规范
# 在iframe中查找元素
ele = iframe.ele('网易首页')
# 打印元素
print(ele)

# 简洁
iframe = tab.ele('t:iframe')  # 最规范
# 在iframe中查找元素
ele = iframe('网易首页')
# 打印元素
print(ele)

# 多个定位
iframe = tab.eles('t:iframe')[0]  # 最规范
# 在iframe中查找元素
ele = iframe('网易首页')
# 打印元素
print(ele)

[!note]

ShadowRoot 元素定位【必须先定位父元素才能定位到】

# 查找ID为'cf-wait'的元素
未知名的p = tab.ele('@id=cf-wait')
# 获取该元素之后的第一个元素
未知名的div = 未知名的p.after(1)
# 打印该元素的HTML内容
print(未知名的div.html)

# 得到shadowroot下的元素
iframe_ele = 未知名的div.sr("t:iframe")
print(iframe_ele)

[!note]

xpath 定位【xpath 前要加上 x:】

# 定义XPath表达式
xpath = 'x://*[@id="root"]/div[1]/section[1]/div/div/div/div/div[2]/div[2]/div[2]/form/div[1]/div/div'

# 使用XPath查找元素
div元素 = tab.ele(xpath)

# 打印找到的元素
print(div元素)

[!note]

获取元素属性

# 获取元素的各种属性
print(AI助手元素.text)              # 获取元素文本内容
print(AI助手元素.link)              # 获取元素链接地址
print(AI助手元素.attr("class"))     # 获取元素的class属性【统一获取对应属性】
print(AI助手元素.html)              # 获取元素的HTML代码

# 获取元素的位置信息
print(AI助手元素.rect.viewport_midpoint)  # 获取视口坐标
print(AI助手元素.rect.screen_location)    # 获取屏幕坐标

# 保存图片元素
baidu_logo = tab.ele('@id=s_lg_img')      # 获取百度logo元素
baidu_logo.save(name='baidu_logo')        # 保存图片到当前目录下

鼠标拖动验证码

# 上下左右
tab.actions.up()
tab.actions.down()
tab.actions.left()
tab.actions.right()
# 鼠标移动至滑块元素上按住滑块移动鼠标,最后释放鼠标
tab.actions.move_to(滑块).hold(滑块).move(offset_x=平移距离,offset_y=4,duration=2.5).release()

数据监听(抓包)

[!note]

抓包

# 开始监听指定URL模式的网络请求
tab.listen.start('spa1.scrape.center/api/movie')  # 开始监听,指定获取包含该文本的数据包。注意:要先开启监听再打开对应页面

# 访问目标网站
tab.get('https://spa1.scrape.center/')  # 访问网址,这行产生的数据包不监听

# 遍历获取到的数据包
for packet in tab.listen.steps():
    pprint(packet.response.body)  # 打印数据包url

操纵浏览器插件

[!note]

操纵浏览器插件【当作网页】

from DrissionPage import Chromium, ChromiumOptions

# 配置Chrome选项并添加插件
options = ChromiumOptions().add_extension(r'C:\Users\Administrator\Desktop\插件\Proxy SwitchyOmega 2.5.21')  # 插件下载 https://www.crxsoso.com/

# 连接浏览器并获取浏览器对象
browser = Chromium(options)

# 获取标签页对象并打开网址
tab = browser.new_tab('https://www.baidu.com/')

tab.wait(3)
插件url = 'chrome-extension://padekgcemlokbadohgkifijomclgjgif/options.html'
tab.get(插件url)


通过检查获取地址

线程池+消息队列使用

from DrissionPage import Chromium, ChromiumOptions
import threading
import concurrent.futures
import time
import queue
from loguru import logger

# 创建一个队列用于存储标签页对象
标签页队列 = queue.Queue()

# 启动浏览器并获取标签页对象
co = ChromiumOptions().use_system_user_path()
browser = Chromium(co)

tab_instance = browser.new_tab('https://www.bigee.cc/book/20233/')

# 获取所有章节链接
链接列表 = [i.link for i in tab_instance('t:div@@class=listmain').eles('t:a') if 'book' in i.link]
print(链接列表)
链接列表 = 链接列表 + [None for i in range(10)]  # 添加结束标记

def 打开网页(url):
    if url is None:
        return
    t = browser.new_tab(url)
    标签页队列.put(t)
    logger.info(f'{t.title} 该网页已经打开')
    time.sleep(2)

def 采集网页数据():
    while True:
        t = 标签页队列.get()
        if t is None:
            break

        logger.error(t.ele('#chaptercontent').text[:15])
        threading.Thread(target=t.close).start()

        logger.warning(f'{t.title} 已经完成抓取...')
    print('数据采集完成')


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor1, \
 concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor2:

# 提交任务到网页打开线程池
pool1 = [executor1.submit(打开网页, chapter_url) for chapter_url in 链接列表]
time.sleep(6)

# 提交任务到网页数据采集线程池
pool2 = [executor2.submit(采集网页数据) for i in range(3)]

监听 console 数据

from DrissionPage import Chromium

# 获取最新标签页
tab = Chromium().latest_tab

# 启动控制台
tab.console.start()

# 在控制台执行JavaScript代码
tab.run_js('console.log("DrissionPage");')

# 等待并获取控制台输出
data = tab.console.wait()

# 打印输出内容
print(data.text)  # 输出: DrissionPage

# 所有控制台信息
for data in tab.console.messages:
    logger.info(data.text)

# 立即调用的箭头函数形式
response_code = '''
(async ()=>{
    var res = await fetch_code
    let data = await res.text();
    console.log(data);
})()
'''.replace('fetch_code', fetch_code)

# 执行JavaScript代码
tab.run_js(response_code)

# 等待3秒
tab.wait(3)
# 获取并记录控制台输出
logger.info(tab.console.wait().text)

弹窗处理

from DrissionPage import Chromium, ChromiumOptions

# 连接浏览器并获取浏览器对象
co = ChromiumOptions()
browser = Chromium(co)

# 自动点击确认按钮
tab = browser.new_tab('https://zh.javascript.info/alert-prompt-confirm')

# 确认
tab.eles('t:a@@title=运行')[-2].click()
tab.wait(3)
tab.handle_alert(accept=True)

# 取消
tab.eles('t:a@@title=运行')[-2].click()
tab.wait(3)
tab.handle_alert(accept=False)

# 提取信息
tab.eles('t:a@@title=运行')[-2].click()
tab.wait(3)
提示框信息 = tab.handle_alert(accept=False)
print(提示框信息)

input('Press any key to quit')

上传和下载

from DrissionPage import Chromium, ChromiumOptions

# 连接浏览器并获取浏览器对象
co = ChromiumOptions()
browser = Chromium(co)

# ----------------上传部分
tab = browser.new_tab('https://picui.cn/upload')

# 设置要上传的文件路径
upload_file_path = r'C:\Users\Administrator\Desktop\BingWallpaper.jpg'

# 点击选择图片按钮并上传文件
tab.ele('选择图片').click_to_upload(upload_file_path)

# 等待3秒
tab.wait(3)
# 点击上传按钮
tab.ele('@text():上传这张图片').click()

# 等待上传成功提示显示
tab.wait.ele_displayed('上传成功')

print('已上传')
tab.wait(3)

# 获取上传后的图片URL
上传图片url = tab.ele('@data-tab-url').text

print(上传图片url)

# ----------------下载部分
tab.download(上传图片url,rename='下载的图片.jpg',goal_path='tmp')#【文件夹不存在自动创建】


# ----------------按钮触发下载
tab2 = Chromium().latest_tab
# 访问网页
tab2.get('https://im.qq.com/pcqq/index.shtml')
# 获取按钮元素
ele = tab2('全新体验版下载')
# 等待按钮元素生成
ele.wait.has_rect()
# 点击按钮触发下载,并设置下载路径和文件名
mission = ele.click_to_download(save_path='tmp2', rename='QQ.exe')
# 等待下载任务完成
mission.wait()

浏览器隐藏

[!note]

无头模式

co.headless(True)

[!note]
设置位置

co.set_argument('--window-position=32000,32000')    # 在屏幕外

[!note]

官方方法【创建新标签页还会显示出来】

browser.latest_tab.set.window.hide()

模拟人类鼠标轨迹

#!/usr/bin/env python
# -*- coding:utf-8 -*-#

import numpy as np
import math
import random


class bezierTrajectory:

    def _bztsg(self, dataTrajectory):
        lengthOfdata = len(dataTrajectory)

        def staer(x):
            t = ((x - dataTrajectory[0][0]) / (dataTrajectory[-1][0] - dataTrajectory[0][0]))
            y = np.array([0, 0], dtype=np.float64)
            for s in range(len(dataTrajectory)):
                y += dataTrajectory[s] * ((math.factorial(lengthOfdata - 1) / (
                            math.factorial(s) * math.factorial(lengthOfdata - 1 - s))) * math.pow(t, s) * math.pow(
                    (1 - t), lengthOfdata - 1 - s))
            return y[1]

        return staer

    def _type(self, type, x, numberList):
        numberListre = []
        pin = (x[1] - x[0]) / numberList
        if type == 0:
            for i in range(numberList):
                numberListre.append(i * pin)
            if pin >= 0:
                numberListre = numberListre[::-1]
        elif type == 1:
            for i in range(numberList):
                numberListre.append(1 * ((i * pin) ** 2))
            numberListre = numberListre[::-1]
        elif type == 2:
            for i in range(numberList):
                numberListre.append(1 * ((i * pin - x[1]) ** 2))

        elif type == 3:
            dataTrajectory = [np.array([0,0]), np.array([(x[1]-x[0])*0.8, (x[1]-x[0])*0.6]), np.array([x[1]-x[0], 0])]
            fun = self._bztsg(dataTrajectory)
            numberListre = [0]
            for i in range(1,numberList):
                numberListre.append(fun(i * pin) + numberListre[-1])
            if pin >= 0:
                numberListre = numberListre[::-1]
        numberListre = np.abs(np.array(numberListre) - max(numberListre))
        biaoNumberList = ((numberListre - numberListre[numberListre.argmin()]) / (
                    numberListre[numberListre.argmax()] - numberListre[numberListre.argmin()])) * (x[1] - x[0]) + x[0]
        biaoNumberList[0] = x[0]
        biaoNumberList[-1] = x[1]
        return biaoNumberList

    def getFun(self, s):
        '''

        :param s: 传入P点
        :return: 返回公式
        '''
        dataTrajectory = []
        for i in s:
            dataTrajectory.append(np.array(i))
        return self._bztsg(dataTrajectory)

    def simulation(self, start, end, le=1, deviation=0, bias=0.5):
        '''

        :param start:开始点的坐标 如 start = [0, 0]
        :param end:结束点的坐标 如 end = [100, 100]
        :param le:几阶贝塞尔曲线,越大越复杂 如 le = 4
        :param deviation:轨迹上下波动的范围 如 deviation = 10
        :param bias:波动范围的分布位置 如 bias = 0.5
        :return:返回一个字典equation对应该曲线的方程,P对应贝塞尔曲线的影响点
        '''
        start = np.array(start)
        end = np.array(end)
        cbb = []
        if le != 1:
            e = (1 - bias) / (le - 1)
            cbb = [[bias + e * i, bias + e * (i + 1)] for i in range(le - 1)]

        dataTrajectoryList = [start]

        t = random.choice([-1, 1])
        w = 0
        for i in cbb:
            px1 = start[0] + (end[0] - start[0]) * (random.random() * (i[1] - i[0]) + (i[0]))
            p = np.array([px1, self._bztsg([start, end])(px1) + t * deviation])
            dataTrajectoryList.append(p)
            w += 1
            if w >= 2:
                w = 0
                t = -1 * t

        dataTrajectoryList.append(end)
        return {"equation": self._bztsg(dataTrajectoryList), "P": np.array(dataTrajectoryList)}

    def trackArray(self, start, end, numberList, le=1, deviation=10, bias=0.5, type=3, cbb=2, yhh=10):
        '''

        :param start:开始点的坐标 如 start = [0, 0]
        :param end:结束点的坐标 如 end = [100, 100]
        :param numberList:返回的数组的轨迹点的数量 numberList = 150
        :param le:几阶贝塞尔曲线,越大越复杂 如 le = 4
        :param deviation:轨迹上下波动的范围 如 deviation = 10
        :param bias:波动范围的分布位置 如 bias = 0.5
        :param type:0表示均速滑动,1表示先慢后快,2表示先快后慢,3表示先慢中间快后慢 如 type = 1
        :param cbb:在终点来回摆动的次数
        :param yhh:在终点来回摆动的范围
        :return:返回一个字典trackArray对应轨迹数组,P对应贝塞尔曲线的影响点
        '''
        s = []
        fun = self.simulation(start, end, le, deviation, bias)
        w = fun['P']
        fun = fun["equation"]
        if cbb != 0:
            numberListOfcbb = round(numberList*0.2/(cbb+1))
            numberList -= (numberListOfcbb*(cbb+1))

            xTrackArray = self._type(type, [start[0], end[0]], numberList)
            for i in xTrackArray:
                s.append([i, fun(i)])
            dq = yhh/cbb
            kg = 0
            ends = np.copy(end)
            for i in range(cbb):
                if kg == 0:
                    d = np.array([end[0] + (yhh-dq*i), ((end[1]-start[1])/(end[0]-start[0]))*(end[0]+(yhh-dq*i)) +(end[1]-((end[1]-start[1])/(end[0]-start[0]))*end[0])   ])
                    kg = 1
                else:
                    d = np.array([end[0] - (yhh - dq * i), ((end[1]-start[1])/(end[0]-start[0]))*(end[0]-(yhh-dq*i)) +(end[1]-((end[1]-start[1])/(end[0]-start[0]))*end[0])  ])
                    kg = 0
                print(d)
                y = self.trackArray(ends, d, numberListOfcbb, le=2, deviation=0, bias=0.5, type=0, cbb=0, yhh=10)
                s += list(y['trackArray'])
                ends = d
            y = self.trackArray(ends, end, numberListOfcbb, le=2, deviation=0, bias=0.5, type=0, cbb=0, yhh=10)
            s += list(y['trackArray'])

        else:
            xTrackArray = self._type(type, [start[0], end[0]], numberList)
            for i in xTrackArray:
                s.append([i, fun(i)])
        return {"trackArray": np.array(s), "P": w}

class humanMouse:
    def __init__(self):
        self.aa = bezierTrajectory()

    def getRandomTrackArray(self, 水平移动,垂直移动=10,分割=10):
        randomTrackArray = self.aa.trackArray(start=[0,0],end=[水平移动,垂直移动],numberList =分割,deviation = 5,bias = 0.5,type = 3,le=3)
        return randomTrackArray['trackArray']
    def getRandomTrackSpacingArray(self, 水平移动:int=100,垂直移动:int=10,分割:int=10):
        """
        生成一个随机轨迹间距数组。

        参数:
        - 水平移动 (int): 水平方向的移动距离。
        - 垂直移动 (int, 可选): 垂直方向的移动距离,默认为10。
        - 分割 (int, 可选): 分割的段数,即生成的数组长度,默认为10。

        返回:
        - list: 包含随机轨迹间距的数组。
        """
        # 生成一个随机轨迹数组
        randomTrackArray = self.aa.trackArray(start=[0,0],end=[水平移动,垂直移动],numberList =分割,deviation = 5,bias = 0.5,type = 3,le=3)
        # 获取轨迹数组
        bb= randomTrackArray['trackArray']
        # 将第一个点的水平和垂直间距设置为0
        cc=[[0,0]]
        # 获取数组长度
        length = len(bb)
        # 遍历数组,计算每个点与前一个点的水平和垂直间距
        for i in range(1,length):
            x1,y1=bb[i-1]
            x2,y2=bb[i]
            cc.append([float(x2-x1),float(y2-y1)])

        return cc







if __name__ == '__main__':
    # aa=bezierTrajectory()
    # 坐标数组=aa.trackArray(start=[0,10],end=[50,10],numberList =30,deviation = 5,bias = 0.5,type = 3,le=3)
    # print('生产的轨迹坐标',坐标数组['trackArray'])
    from pprint import pprint



    # zuobiao=坐标数组['trackArray']
    轨迹点坐标数组=humanMouse().getRandomTrackArray(147,10,20)
    pprint(轨迹点坐标数组)



    import matplotlib.pyplot as plt

    # 假设这是你的坐标列表
    coordinates = 轨迹点坐标数组

    # 分别提取 x 和 y 坐标
    x, y = zip(*coordinates)

    # 创建一个散点图
    plt.scatter(x, y)

    # 设置图表标题和标签
    plt.title('Scatter Plot of Coordinates')
    plt.xlabel('X axis')
    plt.ylabel('Y axis')

    # 显示网格
    plt.grid()

    # 展示图形
    plt.show()

自动化 hook

from DrissionPage import Chromium
from pathlib import Path

browser = Chromium(8976)
#创建空白页
tab=browser.new_tab()

def get_js_code(name:str):
    # 获取当前脚本所在的目录路径
    script_dir = Path(__file__).resolve().parent
    # 构建js文件的完整路径
    js_file = str(script_dir / name)
    # 读取并获取js文件内容
    with open(js_file, 'r', encoding='utf-8') as f:
        js_content = f.read()
        # print(js_content)
        return js_content

#插入初始化脚本
js=get_js_code('hook_tool.js')
tab.add_init_js(js)

# tab.add_init_js('禁用alert()')
# tab.add_init_js('禁用debugger()')
# tab.add_init_js('重写fetch2()')
# tab.add_init_js('重写xhrSend(to_json=true)')
# tab.add_init_js('重写WebSocket原型()')

# input('已经运行注入hook脚本,输入任意键继续...')

tab.wait(4)
url2='https://www.runoob.com/try/try.php?filename=tryjs_alert'
url7='https://gitcode.com/'
yy='https://www.yy.com/54880976/54880976?tempId=16777217'
# url21='http://tool.pfan.cn/websocket'
# ws://124.222.224.186:8800

tab.get(yy)
# tab.wait(10).run_js('重写WebSocketOnMessage();')

input('输入任意键退出..')

Hook_js

// 定义禁用alert的函数
function 禁用alert() {
    try {
        // 保存原始的alert函数
        const originalAlert = window.alert;

        // 重写alert
        window.alert = function (msg) {
            console.log('[Hook] Alert被拦截:', msg);
            // return originalAlert.call(window, msg);  // 如果想显示弹窗,取消注释这行
            return true;
        };

        console.log('[Hook] Alert已被禁用');
    } catch (error) {
        console.error('[Hook] 禁用Alert失败:', error);
    }
}

// 保存其他原始的函数引用
const originalFetch = window.fetch;
const originalXHRSend = XMLHttpRequest.prototype.send;
const originalWebSocket = window.WebSocket;

// 重写fetch
window.fetch = async function (...args) {
    console.log('[Hook] Fetch请求:', args);
    const response = await originalFetch.apply(this, args);
    const clone = response.clone();
    clone.text().then(text => {
        console.log('[Hook] Fetch响应:', text);
    });
    return response;
}

// 定义重写XHR的函数
function 重写xhrSend(to_json = false) {
    try {
        // 保存原始的send方法
        const originalSend = XMLHttpRequest.prototype.send;

        // 重写send方法
        XMLHttpRequest.prototype.send = function (data) {
            console.log('[Hook] XHR请求:', {
                url: this._url,
                method: this._method,
                data: to_json ? JSON.parse(data) : data
            });

            // 监听响应
            this.addEventListener('load', function () {
                let response = this.responseText;
                if (to_json) {
                    try {
                        response = JSON.parse(response);
                    } catch (e) {
                        console.log('[Hook] 响应数据不是有效的JSON');
                    }
                }

                console.log('[Hook] XHR响应:', {
                    url: this._url,
                    response: response
                });
            });

            return originalSend.apply(this, arguments);
        };

        // 重写open方法以保存url和method
        const originalOpen = XMLHttpRequest.prototype.open;
        XMLHttpRequest.prototype.open = function (method, url) {
            this._url = url;
            this._method = method;
            return originalOpen.apply(this, arguments);
        };

        console.log('[Hook] XHR已被重写');
    } catch (error) {
        console.error('[Hook] 重写XHR失败:', error);
    }
}

// 重写WebSocket
window.WebSocket = function (url, protocols) {
    console.log('[Hook] WebSocket连接:', url);
    const ws = new originalWebSocket(url, protocols);

    // 劫持send方法
    const originalWSSend = ws.send;
    ws.send = function (data) {
        console.log('[Hook] WebSocket发送:', data);
        return originalWSSend.call(this, data);
    }

    // 监听消息
    ws.addEventListener('message', function (event) {
        console.log('[Hook] WebSocket接收:', event.data);
    });

    return ws;
}

// 重写WebSocket onmessage处理
function 重写WebSocketOnMessage() {
    WebSocket.prototype.onmessage = function (event) {
        console.log('[Hook] WebSocket消息:', event.data);
    }
}

console.log('[Hook] 注入完成');

完全模拟手机端

from DrissionPage import Chromium, ChromiumOptions

# 定义不同浏览器的 User Agent
user_agents = {
    "Chrome_Windows": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
    "Firefox_Windows": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:110.0) Gecko/20100101 Firefox/110.0",
    "Safari_macOS": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Version/13.1 Safari/537.36",
    "Edge_Windows": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edge/110.0.1587.57",
    "Chrome_Android": "Mozilla/5.0 (Linux; Android 12; Pixel 5 Build/SP1A.210812.016) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Mobile Safari/537.36",
    "Safari_iOS": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.1 Mobile/15E148 Safari/604.1",
    "Opera_Windows": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 OPR/97.0.4692.71"
}

# 创建浏览器选项实例
co = ChromiumOptions()
# 设置 User Agent 为 Android Chrome
co.set_user_agent(user_agents["Chrome_Android"])
tab = Chromium(co).latest_tab

# 设置界面模式
zoom = {
    "command": "Emulation.setDeviceMetricsOverride",
    "parameters": {
        "width": 360,          # 设备的宽度(像素)
        "height": 740,         # 设备的高度(像素)
        "deviceScaleFactor": 1, # 设置设备的缩放比例(相当于调整 DPI)
        "mobile": True,        # 设置是否为手机模拟(通常为 true 或 false)
        "scale": 1            # 页面缩放比例(0.8 表示页面缩小到 80%)
    }
}
tab.run_cdp(zoom["command"], **zoom["parameters"])

# 访问百度
tab.get("https://www.baidu.com/")
input('继续 ?')

监听 websocket

from DrissionPage import Chromium
import time
from datetime import datetime
from typing import Optional, Callable, Dict, Any
from dataclasses import dataclass

@dataclass
class WebSocketMonitorConfig:
    """WebSocket监听器配置"""
    browser: Chromium
    url: str
    callback: Optional[Callable] = None
    quit_on_q: bool = True
    auto_reconnect: bool = False
    reconnect_interval: int = 5

class WebSocketListener:
    """WebSocket监听器类"""
    def __init__(self, config: WebSocketMonitorConfig):
        self.config = config
        self.browser = config.browser
        self.is_running = False

    def _default_message_handler(self, **kwargs: Dict[str, Any]) -> None:
        """默认的WebSocket消息处理函数"""
        try:
            timestamp = datetime.now().strftime('%H:%M:%S')
            response = kwargs.get('response', {})
            if 'payloadData' in response:
                print(f'[{timestamp}] {response["payloadData"]}')
                print('-' * 80)
        except Exception as e:
            print(f'处理消息出错: {str(e)}')

    def start(self) -> None:
        """启动WebSocket监听"""
        self.is_running = True
        while self.is_running:
            try:
                self._run_monitor()
            except Exception as e:
                print(f'监听过程出错: {str(e)}')
                if not self.config.auto_reconnect:
                    break
                print(f'{self.config.reconnect_interval}秒后尝试重新连接...')
                time.sleep(self.config.reconnect_interval)

    def stop(self) -> None:
        """停止WebSocket监听"""
        self.is_running = False

    def _run_monitor(self) -> None:
        """运行监听器的核心逻辑"""
        tab = self.browser.latest_tab
        tab.run_cdp('Network.enable')
        message_handler = self.config.callback if self.config.callback else self._default_message_handler
        tab.driver.set_callback('Network.webSocketFrameReceived', message_handler)
        tab.get(self.config.url)
        print(f'开始监听 {self.config.url} 的WebSocket通信...')
        if self.config.quit_on_q:
            print('按q键退出监听...')
            while self.is_running:
                if input() == 'q':
                    self.stop()
                    break
                time.sleep(0.1)
        else:
            while self.is_running:
                time.sleep(1)

def create_websocket_listener(browser: Chromium,
                            url: str,
                            callback: Optional[Callable] = None,
                            quit_on_q: bool = True,
                            auto_reconnect: bool = False,
                            reconnect_interval: int = 5) -> WebSocketListener:
    """
    创建WebSocket监听器的便捷函数
    Args:
        browser: Chromium浏览器实例
        url: 要监听的网页URL
        callback: 自定义的消息处理函数
        quit_on_q: 是否启用按q退出功能
        auto_reconnect: 是否在断开连接后自动重连
        reconnect_interval: 重连间隔时间(秒)
    Returns:
        WebSocketListener实例
    """
    config = WebSocketMonitorConfig(
        browser=browser,
        url=url,
        callback=callback,
        quit_on_q=quit_on_q,
        auto_reconnect=auto_reconnect,
        reconnect_interval=reconnect_interval
    )
    return WebSocketListener(config)

if __name__ == '__main__':
    # 使用示例
    url = 'https://toolin.cn/ws'
    # 创建浏览器实例
    browser = Chromium()
    try:
        # 方式1:使用便捷函数创建
        listener = create_websocket_listener(
            browser=browser,
            url=url
        )
        listener.start()
        # 方式2:直接使用配置类创建
        # config = WebSocketMonitorConfig(browser=browser, url=url)
        # listener = WebSocketListener(config)
        # listener.start()
    finally:
        # 在主程序结束时关闭浏览器
        browser.quit()

SaoDrisssionPage

[!tip]
引入

from SaoDrissionPage import SaoTab

[!note]

解析网页

# 解析网页
tab = SaoTab(tab)

[!note]
所见即所得

输入框 = tab.ele('<input id="kw" name="wd">')

[!note]

模仿人类行为

tab.human_actions.move_to(输入框).click().type('窗前明月光')

[!note]

切换移动端

tab.mobile_mode(True)

[!note]

一件解析表格

表格元素 = tab.ele("t:table",index=2)
data = tab.table_to_dict(表格元素,debug=True)

[!note]

监听 Json 数据

tab.startJsonListener()