浏览器配置
[!note]
实例化浏览器
# 导入
from DrissionPage import Chromium
# 连接浏览器
browser = Chromium()
# 创建浏览器配置对象,指定浏览器路径
co = ChromiumOptions().set_browser_path(r'D:\chrome.exe')
[!note]
浏览器配置
# 创建浏览器启动配置对象
options = ChromiumOptions()
# 配置浏览器启动选项
options.set_browser_path(r'C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe') # 设置浏览器路径
options.ignore_certificate_errors() # 忽略证书错误
options.no_imgs() # 禁用图片
options.headless(False) # 无头模式设置为False
options.set_local_port(9696) # 设置浏览器debug端口
# 更多配置请参考 http://drissionpage.cn/browser_control/browser_options
# 连接浏览器并获取浏览器对象
browser = Chromium(options)
[!note]
指定端口或地址
# 接管9333端口的浏览器,如该端口空闲,启动一个浏览器
browser = Chromium(9333)
browser = Chromium('127.0.0.1:9333')
浏览器操作
标签页
[!note]
标签页操作
from DrissionPage import Chromium
browser = Chromium()
tab1 = browser.latest_tab # 获取最后激活的标签页对象
tab1.get('https://DrissionPage.cn') # 标签页访问一个网址
tab2 = browser.new_tab('https://www.baidu.com') # 新建一个标签页并访问网址
tab3 = browser.get_tab(title='DrissionPage') # 按条件获取标签页对象
browser.latest_tab.close() # 关闭最新标签页
[!note]
多线程执行标签页示例
from concurrent.futures import ThreadPoolExecutor
# 定义打开URL的函数
def open_url(browser, url):
browser.new_tab(url)
# 定义要访问的中国网站列表
chinese_websites = [
"https://www.taobao.com", # 淘宝
"https://www.tmall.com", # 天猫
"https://www.jd.com", # 京东
]
# 使用线程池并发执行
with ThreadPoolExecutor(max_workers=3) as executor:
for url in chinese_websites:
executor.submit(open_url, browser, url)
[!note]
多线程操作标签页
from DrissionPage import Chromium,ChromiumOptions
from pprint import pprint
import threading
import concurrent.futures
def fetch_and_save_chapter(chapter_title, chapter_url):
new_tab = browser.new_tab(chapter_url) # 打开章节链接
chapter_content = new_tab('#id=TextContent').text # 获取章节内容
print(chapter_content)
print(f"正在下载笑傲江湖 {chapter_title}")
tab_instance.wait(1)
# 将章节内容写入文件
with open(f'./笑傲江湖_{chapter_title}.txt', 'w', encoding='utf-8') as file:
file.write(chapter_content)
threading.Thread(target=new_tab.close).start() # 关闭新标签页
# new_tab.close()
def main():
# 创建一个线程池,最多允许 4 个线程同时运行
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# 提交多个任务到线程池
futures = [executor.submit(fetch_and_save_chapter, chapter_title, chapter_url)
for chapter_title, chapter_url in chapter_dict.items()]
[!note]
异步操作标签页
import asyncio
from DrissionPage import Chromium
from DataRecorder import Recorder # 用来收集数据
from loguru import logger
# 定义异步采集方法
async def collect(tab, recorder, title, page=1):
# 遍历所有标题元素
for i in tab.eles('.title project-namespace-path'):
# 获取某页所有库名称,记录到记录器
recorder.add_data((title, i.text, page))
logger.info((title, i.text, page))
# 获取下一页按钮
btn = tab('@rel=next', timeout=2)
# 如果有下一页,点击翻页并递归调用自身
if btn:
btn.click(by_js=True)
await asyncio.sleep(1) # 异步等待
# 增加页数并递归调用
await collect(tab, recorder, title, page + 1)
# 主函数
async def main():
# 新建浏览器对象
browser = Chromium()
# 第一个标签页访问网址
tab1 = browser.new_tab('https://gitee.com/explore/ai')
# 新建一个标签页并访问另一个网址,返回其对象
tab2 = browser.new_tab('https://gitee.com/explore/machine-learning')
# 新建记录器对象
recorder = Recorder('data.csv')
# 创建异步任务
task1 = asyncio.create_task(collect(tab1, recorder, 'ai'))
task2 = asyncio.create_task(collect(tab2, recorder, '机器学习'))
# 等待任务完成
await task1
await task2
print("所有任务完成")
recorder.record()
if __name__ == '__main__':
asyncio.run(main())
元素
[!note]
普通定位
# 点击文库按钮
wenku_button = tab.ele('文库') # 最简单
wenku_button.click()
# 在输入框中输入内容
input_box = tab.ele('@id=kw') # 最常用
input_box.input('1234')
# 点击搜索按钮
search_button = tab.ele('tag:input@@type=submit@@id=su@@value=百度一下') # 最精确
search_button.click()
[!note]
iframe 元素定位
# 获取iframe
iframe = tab.get_frame('t:iframe') # 最规范
# 在iframe中查找元素
ele = iframe.ele('网易首页')
# 打印元素
print(ele)
# 简洁
iframe = tab.ele('t:iframe') # 最规范
# 在iframe中查找元素
ele = iframe('网易首页')
# 打印元素
print(ele)
# 多个定位
iframe = tab.eles('t:iframe')[0] # 最规范
# 在iframe中查找元素
ele = iframe('网易首页')
# 打印元素
print(ele)
[!note]
ShadowRoot 元素定位【必须先定位父元素才能定位到】
# 查找ID为'cf-wait'的元素
未知名的p = tab.ele('@id=cf-wait')
# 获取该元素之后的第一个元素
未知名的div = 未知名的p.after(1)
# 打印该元素的HTML内容
print(未知名的div.html)
# 得到shadowroot下的元素
iframe_ele = 未知名的div.sr("t:iframe")
print(iframe_ele)
[!note]
xpath 定位【xpath 前要加上 x:】
# 定义XPath表达式
xpath = 'x://*[@id="root"]/div[1]/section[1]/div/div/div/div/div[2]/div[2]/div[2]/form/div[1]/div/div'
# 使用XPath查找元素
div元素 = tab.ele(xpath)
# 打印找到的元素
print(div元素)
[!note]
获取元素属性
# 获取元素的各种属性
print(AI助手元素.text) # 获取元素文本内容
print(AI助手元素.link) # 获取元素链接地址
print(AI助手元素.attr("class")) # 获取元素的class属性【统一获取对应属性】
print(AI助手元素.html) # 获取元素的HTML代码
# 获取元素的位置信息
print(AI助手元素.rect.viewport_midpoint) # 获取视口坐标
print(AI助手元素.rect.screen_location) # 获取屏幕坐标
# 保存图片元素
baidu_logo = tab.ele('@id=s_lg_img') # 获取百度logo元素
baidu_logo.save(name='baidu_logo') # 保存图片到当前目录下
鼠标拖动验证码
# 上下左右
tab.actions.up()
tab.actions.down()
tab.actions.left()
tab.actions.right()
# 鼠标移动至滑块元素上按住滑块移动鼠标,最后释放鼠标
tab.actions.move_to(滑块).hold(滑块).move(offset_x=平移距离,offset_y=4,duration=2.5).release()
数据监听(抓包)
[!note]
抓包
# 开始监听指定URL模式的网络请求
tab.listen.start('spa1.scrape.center/api/movie') # 开始监听,指定获取包含该文本的数据包。注意:要先开启监听再打开对应页面
# 访问目标网站
tab.get('https://spa1.scrape.center/') # 访问网址,这行产生的数据包不监听
# 遍历获取到的数据包
for packet in tab.listen.steps():
pprint(packet.response.body) # 打印数据包url
操纵浏览器插件
[!note]
操纵浏览器插件【当作网页】
from DrissionPage import Chromium, ChromiumOptions
# 配置Chrome选项并添加插件
options = ChromiumOptions().add_extension(r'C:\Users\Administrator\Desktop\插件\Proxy SwitchyOmega 2.5.21') # 插件下载 https://www.crxsoso.com/
# 连接浏览器并获取浏览器对象
browser = Chromium(options)
# 获取标签页对象并打开网址
tab = browser.new_tab('https://www.baidu.com/')
tab.wait(3)
插件url = 'chrome-extension://padekgcemlokbadohgkifijomclgjgif/options.html'
tab.get(插件url)
通过检查获取地址
线程池+消息队列使用
from DrissionPage import Chromium, ChromiumOptions
import threading
import concurrent.futures
import time
import queue
from loguru import logger
# 创建一个队列用于存储标签页对象
标签页队列 = queue.Queue()
# 启动浏览器并获取标签页对象
co = ChromiumOptions().use_system_user_path()
browser = Chromium(co)
tab_instance = browser.new_tab('https://www.bigee.cc/book/20233/')
# 获取所有章节链接
链接列表 = [i.link for i in tab_instance('t:div@@class=listmain').eles('t:a') if 'book' in i.link]
print(链接列表)
链接列表 = 链接列表 + [None for i in range(10)] # 添加结束标记
def 打开网页(url):
if url is None:
return
t = browser.new_tab(url)
标签页队列.put(t)
logger.info(f'{t.title} 该网页已经打开')
time.sleep(2)
def 采集网页数据():
while True:
t = 标签页队列.get()
if t is None:
break
logger.error(t.ele('#chaptercontent').text[:15])
threading.Thread(target=t.close).start()
logger.warning(f'{t.title} 已经完成抓取...')
print('数据采集完成')
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor1, \
concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor2:
# 提交任务到网页打开线程池
pool1 = [executor1.submit(打开网页, chapter_url) for chapter_url in 链接列表]
time.sleep(6)
# 提交任务到网页数据采集线程池
pool2 = [executor2.submit(采集网页数据) for i in range(3)]
监听 console 数据
from DrissionPage import Chromium
# 获取最新标签页
tab = Chromium().latest_tab
# 启动控制台
tab.console.start()
# 在控制台执行JavaScript代码
tab.run_js('console.log("DrissionPage");')
# 等待并获取控制台输出
data = tab.console.wait()
# 打印输出内容
print(data.text) # 输出: DrissionPage
# 所有控制台信息
for data in tab.console.messages:
logger.info(data.text)
# 立即调用的箭头函数形式
response_code = '''
(async ()=>{
var res = await fetch_code
let data = await res.text();
console.log(data);
})()
'''.replace('fetch_code', fetch_code)
# 执行JavaScript代码
tab.run_js(response_code)
# 等待3秒
tab.wait(3)
# 获取并记录控制台输出
logger.info(tab.console.wait().text)
弹窗处理
from DrissionPage import Chromium, ChromiumOptions
# 连接浏览器并获取浏览器对象
co = ChromiumOptions()
browser = Chromium(co)
# 自动点击确认按钮
tab = browser.new_tab('https://zh.javascript.info/alert-prompt-confirm')
# 确认
tab.eles('t:a@@title=运行')[-2].click()
tab.wait(3)
tab.handle_alert(accept=True)
# 取消
tab.eles('t:a@@title=运行')[-2].click()
tab.wait(3)
tab.handle_alert(accept=False)
# 提取信息
tab.eles('t:a@@title=运行')[-2].click()
tab.wait(3)
提示框信息 = tab.handle_alert(accept=False)
print(提示框信息)
input('Press any key to quit')
上传和下载
from DrissionPage import Chromium, ChromiumOptions
# 连接浏览器并获取浏览器对象
co = ChromiumOptions()
browser = Chromium(co)
# ----------------上传部分
tab = browser.new_tab('https://picui.cn/upload')
# 设置要上传的文件路径
upload_file_path = r'C:\Users\Administrator\Desktop\BingWallpaper.jpg'
# 点击选择图片按钮并上传文件
tab.ele('选择图片').click_to_upload(upload_file_path)
# 等待3秒
tab.wait(3)
# 点击上传按钮
tab.ele('@text():上传这张图片').click()
# 等待上传成功提示显示
tab.wait.ele_displayed('上传成功')
print('已上传')
tab.wait(3)
# 获取上传后的图片URL
上传图片url = tab.ele('@data-tab-url').text
print(上传图片url)
# ----------------下载部分
tab.download(上传图片url,rename='下载的图片.jpg',goal_path='tmp')#【文件夹不存在自动创建】
# ----------------按钮触发下载
tab2 = Chromium().latest_tab
# 访问网页
tab2.get('https://im.qq.com/pcqq/index.shtml')
# 获取按钮元素
ele = tab2('全新体验版下载')
# 等待按钮元素生成
ele.wait.has_rect()
# 点击按钮触发下载,并设置下载路径和文件名
mission = ele.click_to_download(save_path='tmp2', rename='QQ.exe')
# 等待下载任务完成
mission.wait()
浏览器隐藏
[!note]
无头模式
co.headless(True)
[!note]
设置位置
co.set_argument('--window-position=32000,32000') # 在屏幕外
[!note]
官方方法【创建新标签页还会显示出来】
browser.latest_tab.set.window.hide()
模拟人类鼠标轨迹
#!/usr/bin/env python
# -*- coding:utf-8 -*-#
import numpy as np
import math
import random
class bezierTrajectory:
def _bztsg(self, dataTrajectory):
lengthOfdata = len(dataTrajectory)
def staer(x):
t = ((x - dataTrajectory[0][0]) / (dataTrajectory[-1][0] - dataTrajectory[0][0]))
y = np.array([0, 0], dtype=np.float64)
for s in range(len(dataTrajectory)):
y += dataTrajectory[s] * ((math.factorial(lengthOfdata - 1) / (
math.factorial(s) * math.factorial(lengthOfdata - 1 - s))) * math.pow(t, s) * math.pow(
(1 - t), lengthOfdata - 1 - s))
return y[1]
return staer
def _type(self, type, x, numberList):
numberListre = []
pin = (x[1] - x[0]) / numberList
if type == 0:
for i in range(numberList):
numberListre.append(i * pin)
if pin >= 0:
numberListre = numberListre[::-1]
elif type == 1:
for i in range(numberList):
numberListre.append(1 * ((i * pin) ** 2))
numberListre = numberListre[::-1]
elif type == 2:
for i in range(numberList):
numberListre.append(1 * ((i * pin - x[1]) ** 2))
elif type == 3:
dataTrajectory = [np.array([0,0]), np.array([(x[1]-x[0])*0.8, (x[1]-x[0])*0.6]), np.array([x[1]-x[0], 0])]
fun = self._bztsg(dataTrajectory)
numberListre = [0]
for i in range(1,numberList):
numberListre.append(fun(i * pin) + numberListre[-1])
if pin >= 0:
numberListre = numberListre[::-1]
numberListre = np.abs(np.array(numberListre) - max(numberListre))
biaoNumberList = ((numberListre - numberListre[numberListre.argmin()]) / (
numberListre[numberListre.argmax()] - numberListre[numberListre.argmin()])) * (x[1] - x[0]) + x[0]
biaoNumberList[0] = x[0]
biaoNumberList[-1] = x[1]
return biaoNumberList
def getFun(self, s):
'''
:param s: 传入P点
:return: 返回公式
'''
dataTrajectory = []
for i in s:
dataTrajectory.append(np.array(i))
return self._bztsg(dataTrajectory)
def simulation(self, start, end, le=1, deviation=0, bias=0.5):
'''
:param start:开始点的坐标 如 start = [0, 0]
:param end:结束点的坐标 如 end = [100, 100]
:param le:几阶贝塞尔曲线,越大越复杂 如 le = 4
:param deviation:轨迹上下波动的范围 如 deviation = 10
:param bias:波动范围的分布位置 如 bias = 0.5
:return:返回一个字典equation对应该曲线的方程,P对应贝塞尔曲线的影响点
'''
start = np.array(start)
end = np.array(end)
cbb = []
if le != 1:
e = (1 - bias) / (le - 1)
cbb = [[bias + e * i, bias + e * (i + 1)] for i in range(le - 1)]
dataTrajectoryList = [start]
t = random.choice([-1, 1])
w = 0
for i in cbb:
px1 = start[0] + (end[0] - start[0]) * (random.random() * (i[1] - i[0]) + (i[0]))
p = np.array([px1, self._bztsg([start, end])(px1) + t * deviation])
dataTrajectoryList.append(p)
w += 1
if w >= 2:
w = 0
t = -1 * t
dataTrajectoryList.append(end)
return {"equation": self._bztsg(dataTrajectoryList), "P": np.array(dataTrajectoryList)}
def trackArray(self, start, end, numberList, le=1, deviation=10, bias=0.5, type=3, cbb=2, yhh=10):
'''
:param start:开始点的坐标 如 start = [0, 0]
:param end:结束点的坐标 如 end = [100, 100]
:param numberList:返回的数组的轨迹点的数量 numberList = 150
:param le:几阶贝塞尔曲线,越大越复杂 如 le = 4
:param deviation:轨迹上下波动的范围 如 deviation = 10
:param bias:波动范围的分布位置 如 bias = 0.5
:param type:0表示均速滑动,1表示先慢后快,2表示先快后慢,3表示先慢中间快后慢 如 type = 1
:param cbb:在终点来回摆动的次数
:param yhh:在终点来回摆动的范围
:return:返回一个字典trackArray对应轨迹数组,P对应贝塞尔曲线的影响点
'''
s = []
fun = self.simulation(start, end, le, deviation, bias)
w = fun['P']
fun = fun["equation"]
if cbb != 0:
numberListOfcbb = round(numberList*0.2/(cbb+1))
numberList -= (numberListOfcbb*(cbb+1))
xTrackArray = self._type(type, [start[0], end[0]], numberList)
for i in xTrackArray:
s.append([i, fun(i)])
dq = yhh/cbb
kg = 0
ends = np.copy(end)
for i in range(cbb):
if kg == 0:
d = np.array([end[0] + (yhh-dq*i), ((end[1]-start[1])/(end[0]-start[0]))*(end[0]+(yhh-dq*i)) +(end[1]-((end[1]-start[1])/(end[0]-start[0]))*end[0]) ])
kg = 1
else:
d = np.array([end[0] - (yhh - dq * i), ((end[1]-start[1])/(end[0]-start[0]))*(end[0]-(yhh-dq*i)) +(end[1]-((end[1]-start[1])/(end[0]-start[0]))*end[0]) ])
kg = 0
print(d)
y = self.trackArray(ends, d, numberListOfcbb, le=2, deviation=0, bias=0.5, type=0, cbb=0, yhh=10)
s += list(y['trackArray'])
ends = d
y = self.trackArray(ends, end, numberListOfcbb, le=2, deviation=0, bias=0.5, type=0, cbb=0, yhh=10)
s += list(y['trackArray'])
else:
xTrackArray = self._type(type, [start[0], end[0]], numberList)
for i in xTrackArray:
s.append([i, fun(i)])
return {"trackArray": np.array(s), "P": w}
class humanMouse:
def __init__(self):
self.aa = bezierTrajectory()
def getRandomTrackArray(self, 水平移动,垂直移动=10,分割=10):
randomTrackArray = self.aa.trackArray(start=[0,0],end=[水平移动,垂直移动],numberList =分割,deviation = 5,bias = 0.5,type = 3,le=3)
return randomTrackArray['trackArray']
def getRandomTrackSpacingArray(self, 水平移动:int=100,垂直移动:int=10,分割:int=10):
"""
生成一个随机轨迹间距数组。
参数:
- 水平移动 (int): 水平方向的移动距离。
- 垂直移动 (int, 可选): 垂直方向的移动距离,默认为10。
- 分割 (int, 可选): 分割的段数,即生成的数组长度,默认为10。
返回:
- list: 包含随机轨迹间距的数组。
"""
# 生成一个随机轨迹数组
randomTrackArray = self.aa.trackArray(start=[0,0],end=[水平移动,垂直移动],numberList =分割,deviation = 5,bias = 0.5,type = 3,le=3)
# 获取轨迹数组
bb= randomTrackArray['trackArray']
# 将第一个点的水平和垂直间距设置为0
cc=[[0,0]]
# 获取数组长度
length = len(bb)
# 遍历数组,计算每个点与前一个点的水平和垂直间距
for i in range(1,length):
x1,y1=bb[i-1]
x2,y2=bb[i]
cc.append([float(x2-x1),float(y2-y1)])
return cc
if __name__ == '__main__':
# aa=bezierTrajectory()
# 坐标数组=aa.trackArray(start=[0,10],end=[50,10],numberList =30,deviation = 5,bias = 0.5,type = 3,le=3)
# print('生产的轨迹坐标',坐标数组['trackArray'])
from pprint import pprint
# zuobiao=坐标数组['trackArray']
轨迹点坐标数组=humanMouse().getRandomTrackArray(147,10,20)
pprint(轨迹点坐标数组)
import matplotlib.pyplot as plt
# 假设这是你的坐标列表
coordinates = 轨迹点坐标数组
# 分别提取 x 和 y 坐标
x, y = zip(*coordinates)
# 创建一个散点图
plt.scatter(x, y)
# 设置图表标题和标签
plt.title('Scatter Plot of Coordinates')
plt.xlabel('X axis')
plt.ylabel('Y axis')
# 显示网格
plt.grid()
# 展示图形
plt.show()
自动化 hook
from DrissionPage import Chromium
from pathlib import Path
browser = Chromium(8976)
#创建空白页
tab=browser.new_tab()
def get_js_code(name:str):
# 获取当前脚本所在的目录路径
script_dir = Path(__file__).resolve().parent
# 构建js文件的完整路径
js_file = str(script_dir / name)
# 读取并获取js文件内容
with open(js_file, 'r', encoding='utf-8') as f:
js_content = f.read()
# print(js_content)
return js_content
#插入初始化脚本
js=get_js_code('hook_tool.js')
tab.add_init_js(js)
# tab.add_init_js('禁用alert()')
# tab.add_init_js('禁用debugger()')
# tab.add_init_js('重写fetch2()')
# tab.add_init_js('重写xhrSend(to_json=true)')
# tab.add_init_js('重写WebSocket原型()')
# input('已经运行注入hook脚本,输入任意键继续...')
tab.wait(4)
url2='https://www.runoob.com/try/try.php?filename=tryjs_alert'
url7='https://gitcode.com/'
yy='https://www.yy.com/54880976/54880976?tempId=16777217'
# url21='http://tool.pfan.cn/websocket'
# ws://124.222.224.186:8800
tab.get(yy)
# tab.wait(10).run_js('重写WebSocketOnMessage();')
input('输入任意键退出..')
Hook_js
// 定义禁用alert的函数
function 禁用alert() {
try {
// 保存原始的alert函数
const originalAlert = window.alert;
// 重写alert
window.alert = function (msg) {
console.log('[Hook] Alert被拦截:', msg);
// return originalAlert.call(window, msg); // 如果想显示弹窗,取消注释这行
return true;
};
console.log('[Hook] Alert已被禁用');
} catch (error) {
console.error('[Hook] 禁用Alert失败:', error);
}
}
// 保存其他原始的函数引用
const originalFetch = window.fetch;
const originalXHRSend = XMLHttpRequest.prototype.send;
const originalWebSocket = window.WebSocket;
// 重写fetch
window.fetch = async function (...args) {
console.log('[Hook] Fetch请求:', args);
const response = await originalFetch.apply(this, args);
const clone = response.clone();
clone.text().then(text => {
console.log('[Hook] Fetch响应:', text);
});
return response;
}
// 定义重写XHR的函数
function 重写xhrSend(to_json = false) {
try {
// 保存原始的send方法
const originalSend = XMLHttpRequest.prototype.send;
// 重写send方法
XMLHttpRequest.prototype.send = function (data) {
console.log('[Hook] XHR请求:', {
url: this._url,
method: this._method,
data: to_json ? JSON.parse(data) : data
});
// 监听响应
this.addEventListener('load', function () {
let response = this.responseText;
if (to_json) {
try {
response = JSON.parse(response);
} catch (e) {
console.log('[Hook] 响应数据不是有效的JSON');
}
}
console.log('[Hook] XHR响应:', {
url: this._url,
response: response
});
});
return originalSend.apply(this, arguments);
};
// 重写open方法以保存url和method
const originalOpen = XMLHttpRequest.prototype.open;
XMLHttpRequest.prototype.open = function (method, url) {
this._url = url;
this._method = method;
return originalOpen.apply(this, arguments);
};
console.log('[Hook] XHR已被重写');
} catch (error) {
console.error('[Hook] 重写XHR失败:', error);
}
}
// 重写WebSocket
window.WebSocket = function (url, protocols) {
console.log('[Hook] WebSocket连接:', url);
const ws = new originalWebSocket(url, protocols);
// 劫持send方法
const originalWSSend = ws.send;
ws.send = function (data) {
console.log('[Hook] WebSocket发送:', data);
return originalWSSend.call(this, data);
}
// 监听消息
ws.addEventListener('message', function (event) {
console.log('[Hook] WebSocket接收:', event.data);
});
return ws;
}
// 重写WebSocket onmessage处理
function 重写WebSocketOnMessage() {
WebSocket.prototype.onmessage = function (event) {
console.log('[Hook] WebSocket消息:', event.data);
}
}
console.log('[Hook] 注入完成');
完全模拟手机端
from DrissionPage import Chromium, ChromiumOptions
# 定义不同浏览器的 User Agent
user_agents = {
"Chrome_Windows": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
"Firefox_Windows": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:110.0) Gecko/20100101 Firefox/110.0",
"Safari_macOS": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Version/13.1 Safari/537.36",
"Edge_Windows": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edge/110.0.1587.57",
"Chrome_Android": "Mozilla/5.0 (Linux; Android 12; Pixel 5 Build/SP1A.210812.016) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Mobile Safari/537.36",
"Safari_iOS": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.1 Mobile/15E148 Safari/604.1",
"Opera_Windows": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 OPR/97.0.4692.71"
}
# 创建浏览器选项实例
co = ChromiumOptions()
# 设置 User Agent 为 Android Chrome
co.set_user_agent(user_agents["Chrome_Android"])
tab = Chromium(co).latest_tab
# 设置界面模式
zoom = {
"command": "Emulation.setDeviceMetricsOverride",
"parameters": {
"width": 360, # 设备的宽度(像素)
"height": 740, # 设备的高度(像素)
"deviceScaleFactor": 1, # 设置设备的缩放比例(相当于调整 DPI)
"mobile": True, # 设置是否为手机模拟(通常为 true 或 false)
"scale": 1 # 页面缩放比例(0.8 表示页面缩小到 80%)
}
}
tab.run_cdp(zoom["command"], **zoom["parameters"])
# 访问百度
tab.get("https://www.baidu.com/")
input('继续 ?')
监听 websocket
from DrissionPage import Chromium
import time
from datetime import datetime
from typing import Optional, Callable, Dict, Any
from dataclasses import dataclass
@dataclass
class WebSocketMonitorConfig:
"""WebSocket监听器配置"""
browser: Chromium
url: str
callback: Optional[Callable] = None
quit_on_q: bool = True
auto_reconnect: bool = False
reconnect_interval: int = 5
class WebSocketListener:
"""WebSocket监听器类"""
def __init__(self, config: WebSocketMonitorConfig):
self.config = config
self.browser = config.browser
self.is_running = False
def _default_message_handler(self, **kwargs: Dict[str, Any]) -> None:
"""默认的WebSocket消息处理函数"""
try:
timestamp = datetime.now().strftime('%H:%M:%S')
response = kwargs.get('response', {})
if 'payloadData' in response:
print(f'[{timestamp}] {response["payloadData"]}')
print('-' * 80)
except Exception as e:
print(f'处理消息出错: {str(e)}')
def start(self) -> None:
"""启动WebSocket监听"""
self.is_running = True
while self.is_running:
try:
self._run_monitor()
except Exception as e:
print(f'监听过程出错: {str(e)}')
if not self.config.auto_reconnect:
break
print(f'{self.config.reconnect_interval}秒后尝试重新连接...')
time.sleep(self.config.reconnect_interval)
def stop(self) -> None:
"""停止WebSocket监听"""
self.is_running = False
def _run_monitor(self) -> None:
"""运行监听器的核心逻辑"""
tab = self.browser.latest_tab
tab.run_cdp('Network.enable')
message_handler = self.config.callback if self.config.callback else self._default_message_handler
tab.driver.set_callback('Network.webSocketFrameReceived', message_handler)
tab.get(self.config.url)
print(f'开始监听 {self.config.url} 的WebSocket通信...')
if self.config.quit_on_q:
print('按q键退出监听...')
while self.is_running:
if input() == 'q':
self.stop()
break
time.sleep(0.1)
else:
while self.is_running:
time.sleep(1)
def create_websocket_listener(browser: Chromium,
url: str,
callback: Optional[Callable] = None,
quit_on_q: bool = True,
auto_reconnect: bool = False,
reconnect_interval: int = 5) -> WebSocketListener:
"""
创建WebSocket监听器的便捷函数
Args:
browser: Chromium浏览器实例
url: 要监听的网页URL
callback: 自定义的消息处理函数
quit_on_q: 是否启用按q退出功能
auto_reconnect: 是否在断开连接后自动重连
reconnect_interval: 重连间隔时间(秒)
Returns:
WebSocketListener实例
"""
config = WebSocketMonitorConfig(
browser=browser,
url=url,
callback=callback,
quit_on_q=quit_on_q,
auto_reconnect=auto_reconnect,
reconnect_interval=reconnect_interval
)
return WebSocketListener(config)
if __name__ == '__main__':
# 使用示例
url = 'https://toolin.cn/ws'
# 创建浏览器实例
browser = Chromium()
try:
# 方式1:使用便捷函数创建
listener = create_websocket_listener(
browser=browser,
url=url
)
listener.start()
# 方式2:直接使用配置类创建
# config = WebSocketMonitorConfig(browser=browser, url=url)
# listener = WebSocketListener(config)
# listener.start()
finally:
# 在主程序结束时关闭浏览器
browser.quit()
SaoDrisssionPage
[!tip]
引入
from SaoDrissionPage import SaoTab
[!note]
解析网页
# 解析网页
tab = SaoTab(tab)
[!note]
所见即所得
输入框 = tab.ele('<input id="kw" name="wd">')
[!note]
模仿人类行为
tab.human_actions.move_to(输入框).click().type('窗前明月光')
[!note]
切换移动端
tab.mobile_mode(True)
[!note]
一件解析表格
表格元素 = tab.ele("t:table",index=2)
data = tab.table_to_dict(表格元素,debug=True)
[!note]
监听 Json 数据
tab.startJsonListener()
Comments NOTHING