tags:
- python
- 爬虫
- 编程
created: 2024-06-10
循环增强
zip多层赋值
for title,png in zip(titles,pngs):
print(title,png)
列表推导式
# 列表推导式语法
[表达式 for 变量 in 可迭代对象 [if 条件]]
Python 列表推导式是一种简洁高效的创建列表的方法,结合了 for 循环和条件表达式。
优势
- 简洁: 用一行代码完成循环和条件判断。
- 高效: 生成列表比传统方法更快。
- 易读: 代码更直观易懂。
示例
1. 生成数字列表
numbers = [i for i in range(1, 11)] # [1, 2, 3, ..., 10]
even_numbers = [i for i in range(1, 11) if i % 2 == 0] # [2, 4, 6, ..., 10]
2. 矩阵转置
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
transposed_matrix = [[row[i] for row in matrix] for i in range(len(matrix[0]))] # [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
3. 字典转列表
d = {'a': 1, 'b': 2, 'c': 3}
keys = [k for k in d] # ['a', 'b', 'c']
values = [v for v in d.values()] # [1, 2, 3]
items = [(k, v) for k, v in d.items()] # [('a', 1), ('b', 2), ('c', 3)]
注意事项
- 列表推导式只适用于生成列表。
- 避免在列表推导式中使用复杂表达式。
- 对于大型数据集,可以考虑使用生成器表达式来节省内存。
类
使用类需要进行实例化操作(调用init方法)
class 类名:
def__init__(self): #初始化方法,类似构造器
self.name=name
[!tip]
python中的类同样存在继承,封装,多态
继承
class 类名(要继承的类): #继承
方法
[!note]
以双下划线开头的不会被继承
例如:self._ _Sherry=name 类似于私有化(方法同理)
调用父类
super(Sherry,self)._ _init_ _(name)#特例
self.speak() #调用父类方法(当自身不存在这个方法时)
super(Sherry,self).speak() #当自身存在这个方法时
call方法
class Sherry:
def __call__(self,*args,**kwargs):
print('caLL被调用')
S=Sherry()#实例化
S()#将类当作方法调用
#call被调用
魔法方法
clss Foo(object)
def __str__(self):
return "魔法方法"
obj = Foo()
print(obj) # 输出对象时,如果想要定制显示的内容。
文件读写
模式 | 解释 | 使用情况 |
---|---|---|
w | 写入 | 如果文件不存在我就创建 |
r | 读取 | 读取文件内容 |
a | 追加 | 把内容写入文件末尾,不存在就创建,存在就直接写 |
b | 二进制 | 写入视频,音频,图片之类的就需要加这个 |
写入 |
#f=open('文件名','模式',encoding='编码')
with open('文件名','模式',encoding='编码')as f:
f.write() #写入
读取
with open('文件名','模式',encoding='编码')as f:
content = f.read() #读取
导入工具包
random库
[!note]
import random
初始化随机数生成器(种子一样,随机数一样)
random.seed(种子)
生成随机数
random.randrange(起始,截止,步长)
libnum库
import libnum
#字符串转十进制
s="flag{123}"
print(libnum.s2n(s))
#十进制转字符串
j=1889377532526015427453
print(libnum.n2s(j))
#十六进制转字符串
n=0x666c61677b3132337d
print(libnum.n2s(n))
#字符串转十六进制
print(hex(libnum.s2n("flag{123}")))
#二进制转字符串
b='01100001011000110110001001100100'
print(libnum.b2s(b))
#字符串转二进制
s = 'acbd'
print(libnum.s2b(s))
#数字转二进制串
# n=0xfff
# print(s2b(n2s(n)))
#因数分解:返回n的所有素因子及每个素因子的个数。
print(libnum.factorize(1024))
#任意进制转为十进制
print(int('01111',2))
print(int('0x64',16))
gmpy2库
import gmpy2
# 求模幂
print gmpy2.powmod(2, 3, 5)
print gmpy2.powmod(2, 4, 5)
print gmpy2.powmod(2, 5, 5)
print
# 求逆元(常用于RSA)
p = 3
q = 7
e = 7
n = p * q
d = gmpy2.invert(e, (p - 1) * (q - 1))
print d
# 判断素数
print gmpy2.is_prime(3)
print gmpy2.is_prime(6)
print
# 欧几里德算法,判断互素
print gmpy2.gcd(2, 3)
print gmpy2.gcd(4, 6)
print
# 扩展欧几里德算法
print gmpy2.gcdext(2, 3)
print gmpy2.gcdext(4, 6)
print
# 开n次方根
print gmpy2.iroot(4, 2)
print gmpy2.iroot(28, 3)
hashlib库
#md5加密
import hashlib
x = hashlib.md5()
x.update('I_love_python'.encode('utf-8'))
print("x1 = ", x.hexdigest())
# 如果数据量很大,可以分块多次调用 update(),最后计算的结果是一样的
x = hashlib.md5()
x.update('I_love_'.encode('utf-8'))
x.update('python'.encode('utf-8'))
print("x2 = ", x.hexdigest())
#sha1
x = hashlib.sha1()
x.update('I_love_python'.encode('utf-8'))
print("x1 = ", x.hexdigest())
#sha256
x = hashlib.sha256()
x.update('I_love_python'.encode('utf-8'))
print("x1 = ", x.hexdigest())
#sha512
x = hashlib.sha512()
x.update('I_love_python'.encode('utf-8'))
print("x1 = ", x.hexdigest())
CSV库
写入CSV文件
import csv
# 数据
data = [
['Name', 'Age', 'City'],
['Alice', 30, 'New York'],
['Bob', 25, 'Los Angeles']
]
# 写入 CSV 文件
with open('example.csv', 'w', newline='') as file:
writer = csv.writer(file)
writer.writerows(data)
高级写入
import csv
with open('example.csv', 'w', newline='') as file:
fields = ['Name', 'Age', 'City']
writer = csv.DictWriter(file, fieldnames=fields)
writer.writeheader()
writer.writerow({'Name': 'Alice', 'Age': 30, 'City': 'New York'})
writer.writerow({'Name': 'Bob', 'Age': 25, 'City': 'Los Angeles'})
读取CSV
# 读取 CSV 文件
with open('example.csv', 'r') as file:
reader = csv.reader(file)
for row in reader:
print(row)
高级读取
with open('example.csv', 'r') as file:
reader = csv.DictReader(file)
for row in reader:
print(row)
写入文件(字典写入)
import csv
datas = []
dic = {'WWWW': 'FFFF', 'MSS': '2', 'TTL': '40', 'WS': '3', 'S': 1, 'N': 1, 'D': 0, 'T': 8, 'F': 'S', 'LEN': '3C'}
header = list(dic.keys())
datas.append(dic)
with open('test.csv', 'a', newline='',encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=header) # 提前预览列名,当下面代码写入数据时,会将其一一对应。
writer.writeheader() # 写入列名
writer.writerows(datas) # 写入数据
正常写入
import csv
rows = [
['Name', 'Age', 'City'],
['Alice', 30, 'New York'],
['Bob', 25, 'Los Angeles']
]
with open('stu_info.csv','w',newline='',encoding='gbk') as file
f_csv = csv.writer(file)
# 写入字段
f_csv.writerows(headers)
# 写入具体数据
f_csv.writerows(rows)
写入字典
# 创建cSV写对象并声明表头信息
with open('stu_info_dict.csv','w',newline='') as file:
f_csv = csv.DictWriter(file,['class_name','name','gender','phone','qq'])
# 写入表头
f_csv.writeheader()
# 写入学生信息
f_csv.writerows(rows)
urllib库
from urllib.parse import quote,unquote
#解码
unquote()
#编码
quote()
re库
import re
. 匹配换行符以外的字符
[] 里面是字符集合,匹配[]里任意一个字符
[0123456789] 匹配任意一个数字字符
[0-9] 匹配任意一个数字字符
[a-z] 匹配任意一个小写字符
[A-Za-z] 匹配任意一个英文字符
[A-Za-z0-9] 匹配任意一个数字或英文字符
[^abcd] []里的^称为脱字符,只要不属于[]中任意一个字符
^[abcd] 以[]中内的某一个字符作为开头
\ d 匹配任意一个数字字符
\ D 匹配任意一个非数字字符
\ w 匹配字母、下划线、数字中的任意一个字符
\ W 匹配非字母、下划线、数字中的任意一个字符
\ s 匹配空白符(空格、换页、换行、回车、制表),相当于[\f\n\r\t]
\ S 匹配非空白符(空格、换页、换行、回车、制表),相当于[^ \f\n\r\t]
{n} 匹配前面一种规则n次 {m,n}m-n之间 {m,}至少m个
$ 限制结尾,一般和^组合使用 匹配字符中必须完全匹配才算符合
? 可有可无 放在规则后面 例如 -?[1-9] 匹配正负数代表前面的0-多次 . ? 一般组合使用
.* 贪婪模式,匹配换行符以外的字符任意次 算一个整体
.* ? 非贪婪模式,匹配换行符以外的字符任意次 有一次算一次>
- 匹配一次到多次
.+? 匹配换行符以外的字符至少1次
| 代表或
() 1、作为一个单元 2、作为子存储
扫描整个字符串,并返回第一个正则模式成功的匹配,flags标识位用于控制正则表达式的匹配方式
Re.Search(正则,字符串,flags=0)
获取正则匹配之后的内容
Re.Search(正则,字符串,flags=0).group()
如果有多个括号时 可以使用groups将值单独储存下来
Re.Search(正则,字符串,flags=0).groups()
只匹配一次 必须从第一位开始
Re.Match(正则,字符串,flags=0)
匹配多次
Re.Findall(正则,字符串,flags=0)
flag标志位
re.I 是匹配对大小写不敏感
re.M 多行匹配,影响到^和$
re.S 使.匹配包括换行符在内的所有字符
得到一个迭代器 返回一个列表
Re.Finditer(正则,字符串,flags=0)
创建正则语法(返回对象)
mytext = re.compile()
查找
results = mytext.findall(html.text)
os库
创建文件夹
if not os.path.exists('./content'):
os.mkdir('./content')
String库
字符表
import string
print(string.printable)
#0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ \t\n\r\x0b\x0c
Z3库
[!tip]
z3中有3中类型的变量,分别是整型(Int),实型(Real)和向量(BitVec)
Int-整数型
from z3 import *
a = Int('a')#声明单个整型变量
a,b = Ints('a b')#声明多个整型变量
Real-实数型
from z3 import *
a = Real('a')#声明单个实型变量
a,b = Reals('a b')#声明多个实型变量
BitVec-向量
from z3 import *
a = BitVec('a',8) #声明单个8位的变量
a, b = BitVec('a b',8)#声明多个8位的变量
常规使用
from z3 import *
a,b = Ints('a b')
solve(a>3,b<8,2*a+b==16)
#结果 [a = 5, b = 6]
[!tip]
约束求解器:
在解决问题是很多情况下是有多个约束条件的。这个时候可以先生成一个求解器对象(Solver()),然后就可以方便的向其中添加更多的约束条件。
#!/usr/bin/python3
#-*- coding=utf-8 -*-
from z3 import *
a,b = Ints('a b')
solver = Solver()#创建一个求解器对象
solver.add(a+b==10)#用add方法添加约束条件
solver.add(a-b==6)
if solver.check() == sat: #check()方法用来判断是否有解,sat(satisify)表示满足有解
ans = solver.model() #model()方法得到解
print(ans)
#也可以用变量名作为下标得到解
print(ans[a])
else:
print("no ans!")
pillow 库
[!note]
生成图片
from PIL import Image
img = Image.new(name='RGB', size=(120,30), color=(255,255,255))
# 保存本地
with open('code.png','wb') as f:
img.save(f,format='png')
[!note]
创建画笔绘画
from PIL import Image, ImageDraw, ImageFont
# 创建画布
img = Image.new('RGB', (120,30), color=(255,255,255))
# 创建画笔
draw = ImageDraw.Draw(img)
# 画线
draw.line((0,0, 120,30), fill=(255,0,0), width=2)
# 画点
draw.point((50,50), fill=(0,0,255))
# 画矩形
draw.rectangle((10,10, 110,20), outline=(0,255,0))
# 写文字
font = ImageFont.truetype('arial.ttf', size=16)
draw.text((10,10), "验证码", font=font, fill=(0,0,0))
# 保存图片
img.save('code.png')
[!note]
图片处理操作
from PIL import Image
# 打开图片
img = Image.open('code.png')
# 调整大小
img = img.resize((200, 100))
# 旋转图片
img = img.rotate(45)
# 裁剪图片
img = img.crop((0, 0, 100, 100)) # (left, top, right, bottom)
# 图片格式转换
img = img.convert('L') # 转为灰度图
# 保存修改后的图片
img.save('new_code.png')
ctypes 库
from ctypes import *
'''
1. 基本数据类型对照表:
c_bool -> bool
c_char -> char
c_wchar -> wchar
c_byte -> int8
c_ubyte -> uint8
c_short -> int16
c_ushort -> uint16
c_int -> int32
c_uint -> uint32
c_long -> int32/64
c_ulong -> uint32/64
c_float -> float
c_double -> double
c_char_p -> char*
c_void_p -> void*
'''
'''
CDLL加载库的几种方式:
1. 加载系统libc
libc = CDLL("libc.so.6") # Linux直接加载
libc = CDLL("msvcrt") # Windows加载msvcrt
libc = CDLL(None) # 加载主程序符号表
2. 使用find_library查找库路径
from ctypes.util import find_library
libc_path = find_library('c') # 自动查找libc路径
libc = CDLL(libc_path)
3. 加载自定义动态库
mylib = CDLL("./mylib.so") # 加载当前目录的库
mylib = CDLL("/absolute/path/mylib.so") # 使用绝对路径加载
4. 加载带版本号的库
libpthread = CDLL("libpthread.so.0") # 加载特定版本pthread
5. 设置加载模式
lib = CDLL("mylib.so", mode=RTLD_GLOBAL) # 全局符号表可见
'''
# 示例1:加载动态链接库
libc = CDLL("libc.so.6") # Linux
# libc = CDLL("msvcrt") # Windows
# 示例2:调用C函数
libc.printf(b"Hello %s\n", b"World")
# 示例3:设置函数参数类型和返回值类型
libc.time.argtypes = [c_void_p]
libc.time.restype = c_long
current_time = libc.time(None)
# 示例4:创建数组
array = (c_int * 5)(1, 2, 3, 4, 5)
# 示例5:结构体定义
class Point(Structure):
_fields_ = [
("x", c_int),
("y", c_int)
]
# 示例6:指针操作
ptr = pointer(c_int(42))
value = ptr.contents.value
# 示例7:内存分配
buffer = create_string_buffer(10) # 分配10字节
mem = malloc(100) # 分配100字节
free(mem) # 释放内存
# 示例8:函数回调
CFUNCTYPE(c_int, c_int, c_int) # 定义回调函数类型
常见使用场景:
- 系统调用
# 调用mmap
from ctypes import *
from ctypes.util import find_library
# 加载libc
libc = CDLL(find_library('c'))
# mmap 调用示例
mmap = libc.mmap
mmap.argtypes = [c_void_p, c_size_t, c_int, c_int, c_int, c_size_t]
mmap.restype = c_void_p
addr = mmap(0x1000, 0x1000, 7, 0x22, -1, 0)
- 读写内存
# 读写内存示例
def write_memory(addr, data):
memmove(addr, data, len(data))
def read_memory(addr, size):
buffer = create_string_buffer(size)
memmove(buffer, addr, size)
return buffer.raw
- shellcode 执行
# 执行shellcode示例
shellcode = b"\x31\xc0\x48\xbb\xd1\x9d..."
buffer = create_string_buffer(shellcode)
function = cast(buffer, CFUNCTYPE(c_void_p))
function()
- 结构体操作
# 复杂结构体示例
class ComplexStruct(Structure):
_pack_ = 1 # 设置对齐方式
_fields_ = [
("id", c_int),
("name", c_char * 32),
("data", c_ubyte * 64)
]
# 使用结构体
obj = ComplexStruct()
obj.id = 1
obj.name = b"test"
常用方法
求余数
divmod(num)
# 返回:(商,余数)
使用c 中整形变量
import ctypes
v1 = 11119229123812344
v2 = 11119229123812344 << 5
data = ctypes.c_int32(v2).value
print(data) #-1019511040
常用装饰器
[!note]
将类方法转化为类属性调用
class Sherry
@property
def Sherry_qq(self):
return'3193871709'
S=Sherry()
S.Sherry_qq
[!note]
设置和删除
@Sherry_qq.setter #设置装饰器
def Sherry_qq(self,value):
self.qq=value
@Sherry_qq.deleter
def Sherry_qq(self):
del self.qq
[!note]
类方法,表示属于类,不属于示例(可以直接使用)
@classmethod
[!note]
静态方法 我当前的方法不属于类是一个单独的函数
@staticmethod
def get_num():
print(random.random())
自制装饰器
[!tip]
高阶函数----把函数作为参数
参数是函数返回内部函数内部函数里边调用函数(传递参数)[!note]
计时装饰器---计算函数执行时间
def Timer(func):
start=time.time()
def inner(*args,**kwargs):
print('传递的参数:',*args,**kwargs)
func (*args,**kwargs)
end=time.time()
print('耗费时间:'+end-start)
return inner
多线程
[!tip]
import threading
创建线程(非守护线程)
#函数不加括号
t1=threading.Thread(target=要执行的函数,args=(函数参数,))
启动线程
t1.start()
守护线程
[!note]
线程都是你主线程的附庸你结束我就结束了
t1.daemon #判断是否时守护线程
t1.setDaemon(True) #在start之前,设置守护线程
打印线程名
threading.currentThread()
threading.main_thread() #返回主线程名
等待线程执行
t1.join()
多线程上锁
创建线程锁
Lock=threading.Lock()
#函数内部
Lock.acquire() #获取锁
Lock.release() #释放锁
多进程
[!tip]
import multiprocessing
计算cpu内核数
print(multiprocessing.cpu_count())
创建进程
p1=multiprocessing.Process(target=num,args=(i,))
启动进程
p1.start()
堵塞进程
p1.join()
进程池
我在一开始就创建好一个袋子 N个进程哪个空闲哪个就拿出来直接用
申明进程池
Pools=multiprocessing.Pool(进程数量)
同步安排进程(必须执行完一个任务,才进行下一个)
Pools.apply(函数,args=(i,))
异步安排进程(不必等待任务结束)
Pools.apply_async(函数,args=(i,))
进程池不在接收新的任务
Pools.close()
阻塞进程
Pools.join()
多进程代理数据共享
li=multiprocessing.Manager().list()
#上锁
Lock=multiprocessing.Manager().Lock()
#获取锁
lock.acquire()
#释放锁
lock.release()
协程异步(一般爬虫使用)
[!tip]
import asyncio
利用I0等待时间去执行其他任务
协程本身不会多个并发需要我们包装成任务
标记异步任务
async def num(name):
print('{} start'.format(name))
time.sleep(2)
print('{} end'.format(name))
等待
await asyncio.sleep(等待事件秒)
运行协程对象
asyncio.run(num(1))
#一般先写一个开始的协程任务(await运行协程任务 等待 会一直阻塞到结果返回)
封装成任务(直接运行)
t1=asyncio.create_task(num(1))
#一般放入列表内(使用gather直接放入协程)
t1=[num(1),num(2),num(3)]
await asyncio.gather(*t1)
#wait要加
t1=[asyncio.create_task(num(1)),
asyncio.create_task(num(2)),
asyncio.create_task(num(3))]
await asyncio.wait(t1)
PYC文件
[!tip]
pyc文件
:是由Python文件
经过编译后
所生成的文件,它是一种字节码 byte code
,因此我们直接查看就是乱码
的,也对源码起到一定的保护作用,但是这种字节码byte code
是可以反编译的
[!note]
反编译
uncompyle6 C:\Users\Sherry\Desktop\ezpy.pyc>ezpy.py
python应用反编译
python pyinstxtractor.py python.exe
websocket 链接
from websocket import WebSocketApp
def on_open(ws):
print("on_open", ws)
def on_message(ws, message):
# 回调函数,直接接收到的消息信息
print("on_message", message)
def on_error(ws, message):
print("on_error", ws, message)
def on_close(ws, *args, **kwargs):
pass
def run():
# 向持有发送Web请求链接
ws = WebSocketApp(
url="...",
header={},
cookie="",
on_open=on_open,
on_message=on_message, # 抖音只给我们发送数据
on_error=on_error,
on_close=on_close,
)
ws.run_forever()
if __name__ == '__main__':
run()
爬虫
同步 | 异步 | session同步 | 请求量异步(多) | |
---|---|---|---|---|
requests>httpx | httpx>requests | httpx>requests | aiohttp>httpx |
请求头
关键参数
英文 | 中文释义 |
---|---|
Cookie | 身份信息 |
User-Agent | 浏览器信息 |
Referer | 防盗链 |
Request库
[!tip]
import requests
发起get请求
#返回对象 params是问号之后的参数(字典)
html = requests.get(url,headers,params)
发起post请求
#返回对象 data为参数
html = requests.post(url,headers,data)
获取状态码
html.status_code #返回状态码
获取响应头
html.headers
获取文本形式
html.text
获取二进制形式
html.content
获取JSON形式
html.json()
设置编码形式
html.encoding='utf-8'
进阶
[!note]
session,公有容器,以它为基础发起请求共用里面的东西,参数隔离
每一个基于session得请求是独立的
如果覆盖了共有的信息,那就会更新,如果共有的没有,那就添加
声明session(等于有了公有的requests)
session = requests.Session()
设置公有属性
#例子
session.headers = {'common':'Sherry'}
html1=session.get('https://httpbin.org/headers',headers={'html1':"my headers"}) #参数隔离,头部多一个html1
print(html1.text)
html2=session.get('https://httpbin.org/headers')
print(html2.text)
过掉ssl证书验证
requests.get('https://kennethreitz.org', verify=False)
提交多个文件
url 'https://httpbin.org/post'
multiple_files = [
('images',('foo.png',open('foo.png','rb'),'image/png')),
('images',('bar.png',open('bar.png','rb'),'image/png'))]
r requests.post(url,files=multiple_files)
r.text
代理设置
proxies =
'http':'http://127.0.01:指定端口', #本机代理
'https':'http://10.10.1.10:1080', #当使用本机代理时两个写成一样的
}
requests.get('http://example.org',proxies=proxies)
httpx库
[!tip]
import httpx完美支持HTTP/2. 可以写异步爬虫 操作和requests一样
发起get请求
r = httpx.get(url)
发起post请求
#返回对象 data为参数
r = requests.post(url,headers,data)
获取状态码
r.status_code
获取响应头部
r.headers
设置代理
proxies={
"http://":"http://ocalhost:7890",
"https://":"http://Localhost:7890",}
html=httpx.get('https://www.pythonhttpx.org/',proxies=proxies)
发送http/2请求
client=httpx.Client(http2=True,proxies)
html=client.get('https://www.pythonhttpx.org/')
异步爬虫
import httpx
import asyncio
async def spider(num):
print('启动', num)
headers = {
'Referer':'https://cn.bing.com/search?',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
}
async with httpx.AsyncClient(http2=True) as client: # 建立异步
# await阻塞等待
html = await client.get('https://cn.bing.com/search?q=1231321',headers=headers)
print(html.text)
async def main():
await asyncio.gather(*[spider(1), spider(2), spider(3)])
if __name__ == '__main__':
asyncio.run(main())
aiohttp异步请求库
[!tip]
import aiohttp
import asyncio
import aiohttp
import asyncio
headers = {
'Referer': 'https://cn.bing.com/search?',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
}
proxy='http://localhost:7890' #只支持http代理
async def main():
async with aiohttp.ClientSession(headers=headers) as client: # Session共有属性
html = await client.get('https://cn.bing.com/search?q=1231321',proxy)
print(await html.text())
html.close()
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main()) # 新版驱动需要设置成老板驱动 不然会报错
# loop = asyncio.get_event_loop() #老版本驱动
# loop.run_until_complete(main())
bs4解析库
[!tip]
from bs4 import BeautifulSoup
解析html
soup = BeautifulSoup(html.text,'lxml') #使用lxml解析器进行解析
使用css选择器(返回列表)
soup.select('选择器')
获取所有文本
result.text
获取标签属性(以字典的形式取出)
result['href'] #取出href属性的值
查询在文档中符合条件的所有标签
res_1 = soup.find_all('b')
查询参数
res = soup.find_all(class_='sister')
按照标签包含的文本信息进行匹配提取
res_8 = soup.find_all(string='Lacie')
xpath库
[!tip]
from lxml import etree
[!note]
// 从当前节点或者当前节点得子节点开始查找
/ 从当前节点开始查找
#例子 @属性 contains包含 start-with以什么开头
//div[@class="result c-container xpath-log new-pmd"] #严格匹配
//div[contains(@class,"c-container xpath-log new-pmd")] #包含
//div[start-with(@class,"result")] #以result开头
解析html(生成解析对象)
soup = etree.HTML(html.text)
xpath查找(返回对象)
soup.xpath('//ol[@id="b_results"]/li/h2/a')
获取当前节点文本
/text()
获取当前节点以及节点之下的节点中的文字
/string()
获取属性值(返回列表)
# 获取src
i.xpath("//img/@src")
获取除外标签
li_list tree.xpath('//li[@class="item-1"][position()>1]')
获取最后一个页面
li_last tree.xpath('//li[@class="item"][last()]/a/text()')
将 element 对象转为字符串对象
html_str = etree.tostring(tree).decode()
jsonpath 库
[!tip]
from jsonpath import jsonpath
#例子 $根节点 []下标/过滤器 .子节点 ..任意层级子节点
$.store.book[0] #获取第一本书
$..book[?(@.price<10)] #获取价格小于10的书
$.store.book[(@.length-1)] #获取最后一本书
解析 JSON(生成解析对象)
# 方式1:从字符串解析
data = json.loads(json_str)
# 方式2:从文件解析
with open('data.json') as f:
data = json.load(f)
基本语法示例
# 获取所有书籍
result = jsonpath(data, '$.store.book[*]')
# 获取所有作者
authors = jsonpath(data, '$..author')
# 获取所有价格
prices = jsonpath(data, '$..price')
# 获取特定索引的书
book = jsonpath(data, '$.store.book[0]')
条件过滤
# 价格大于10的书
expensive_books = jsonpath(data, '$.store.book[?(@.price > 10)]')
# 包含特定属性的节点
has_isbn = jsonpath(data, '$.store.book[?(@.isbn)]')
通配符和切片
# 获取所有属性
all_properties = jsonpath(data, '$.store.book[*].*')
# 获取前两本书
first_two = jsonpath(data, '$.store.book[0:2]')
# 获取最后一本书
last_book = jsonpath(data, '$.store.book[-1:]')
多条件组合
# 价格大于10且为小说类型的书
result = jsonpath(data, '$.store.book[?(@.price>10 && @.category=="fiction")]')
获取节点长度
# 获取书籍数量
length = len(jsonpath(data, '$.store.book[*]'))
Selenium
[!note]
添加 cookie
# 导入 webdriver 用于浏览器自动化
from selenium import webdriver
# 初始化 Chrome 浏览器实例
browser = webdriver.Chrome()
# 访问百度首页
browser.get('https://www.baidu.com')
# 添加两个 cookie
# 添加用户名 cookie,值为"安娜"
browser.add_cookie({'name': 'username', 'value': '安娜'})
# 添加性别 cookie,值为"女"
browser.add_cookie({'name': 'gender', 'value': '女'})
# 获取所有 cookie 并存储在列表中
cookie_list = browser.get_cookies()
# 将 cookie 转换为字典格式,key 为 name,value 为对应的值
cookie_dict = {x["name"]: x["value"] for x in cookie_list}
# 打印所有 cookie
print(cookie_dict)
# 获取并打印指定的 cookie
print(browser.get_cookie('username')) # 获取 username 对应的 cookie
print(browser.get_cookie('gender')) # 获取 gender 对应的 cookie
# 关闭浏览器
browser.quit()
1. 浏览器操作
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
# 初始化浏览器
driver = webdriver.Chrome() # 基础初始化
driver = webdriver.Chrome('./chromedriver.exe') # 指定驱动路径
# 浏览器选项配置
chrome_options = Options()
chrome_options.add_argument('--headless') # 无头模式
chrome_options.add_argument('--incognito') # 无痕模式
chrome_options.add_argument('--window-size=1920,1080') # 设置窗口大小
# 设置UA
user_agent ='abc'
options.add_argument(f'user-agent={user_agent}')
# 是否进制图片加载
prefs {"profile.managed_default_content_settings.images":2}
options.add_experimental_option(argument:'prefs',prefs)
#隐藏webdriver
options.add_argument('--disable-blink-features=AutomationControlled')
driver = webdriver.Chrome(options=chrome_options)
# 基础操作
driver.get(url) # 访问网页
driver.refresh() # 刷新页面
driver.back() # 后退
driver.forward() # 前进
driver.maximize_window() # 最大化窗口
driver.minimize_window() # 最小化窗口
driver.close() # 关闭当前标签页
driver.quit() # 关闭浏览器
2. 元素定位
from selenium.webdriver.common.by import By
# 八种定位方式
driver.find_element(By.ID, "id") # ID定位
driver.find_element(By.NAME, "name") # name定位
driver.find_element(By.CLASS_NAME, "class") # class定位
driver.find_element(By.TAG_NAME, "div") # 标签定位
driver.find_element(By.LINK_TEXT, "链接文字") # 链接文字定位
driver.find_element(By.PARTIAL_LINK_TEXT, "部分文字") # 部分链接文字
driver.find_element(By.CSS_SELECTOR, "#id .class") # CSS选择器
driver.find_element(By.XPATH, "//div[@id='id']") # XPATH定位
# 查找多个元素
elements = driver.find_elements(By.CLASS_NAME, "class") # 返回列表
3. 元素操作
element = driver.find_element(By.ID, "id")
# 基础操作
element.click() # 点击
element.send_keys("文本") # 输入文本
element.clear() # 清空输入
element.submit() # 提交表单
# 获取元素信息
element.text # 获取文本内容
element.get_attribute("href") # 获取属性值
element.is_displayed() # 元素是否可见
element.is_enabled() # 元素是否可用
element.is_selected() # 元素是否被选中
element.location # 元素位置
element.size # 元素大小
4. 等待机制
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
# 显式等待
wait = WebDriverWait(driver, timeout=10)
element = wait.until(
EC.presence_of_element_located((By.ID, "id"))
)
# 常用的expected_conditions条件
EC.presence_of_element_located() # 元素存在
EC.visibility_of_element_located() # 元素可见
EC.element_to_be_clickable() # 元素可点击
EC.text_to_be_present_in_element() # 文本出现在元素中
EC.title_contains() # 标题包含特定文本
# 隐式等待
driver.implicitly_wait(10) # 设置全局等待时间
5. 框架切换
# iframe切换
driver.switch_to.frame("iframe_name") # 通过name切换
driver.switch_to.frame(iframe_element) # 通过元素切换
driver.switch_to.default_content() # 返回主文档
# 窗口切换
driver.switch_to.window(handle) # 切换窗口
handles = driver.window_handles # 获取所有窗口句柄
6. JavaScript 执行
# 执行JavaScript
driver.execute_script("return document.title;") # 执行JS并返回结果
#相对位置滚动
js_code 'window.scrollBy(0,700)'
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") # 滚动到底部
7. 键盘和鼠标操作
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
# 键盘操作
element.send_keys(Keys.RETURN) # 回车
element.send_keys(Keys.CONTROL + 'a') # 全选
element.send_keys(Keys.ESCAPE) # ESC键
# 鼠标操作
actions = ActionChains(driver)
actions.move_to_element(element) # 鼠标悬停
actions.click_and_hold(element) # 鼠标按住
actions.drag_and_drop(source, target) # 拖拽
actions.double_click(element) # 双击
actions.context_click(element) # 右键
actions.perform() # 执行操作
8. 异常处理
from selenium.common.exceptions import *
try:
element = driver.find_element(By.ID, "id")
except NoSuchElementException:
print("元素未找到")
except TimeoutException:
print("等待超时")
except ElementClickInterceptedException:
print("元素无法点击")
9. 截图功能
# 截图
driver.save_screenshot("screenshot.png") # 截取整个页面
element.screenshot("element.png") # 截取特定元素
10. Cookie 操作
# Cookie操作
driver.add_cookie({"name": "foo", "value": "bar"}) # 添加Cookie
driver.get_cookies() # 获取所有Cookie
driver.delete_all_cookies() # 删除所有Cookie
11. 动作链接
# Selenium相关导入
from selenium import webdriver # 浏览器驱动
from selenium.webdriver.common.action_chains import ActionChains # 动作链
from selenium.webdriver.common.keys import Keys # 键盘按键
from selenium.webdriver.common.by import By # 元素定位方式
from selenium.webdriver.support.ui import WebDriverWait # 显式等待
from selenium.webdriver.support import expected_conditions as EC # 预期条件
from selenium.common.exceptions import * # 异常处理
# 系统相关导入
import time # 时间处理
import random # 随机数生成
"""
ActionChains 动作链完整示例
作用:模拟真实的用户操作,比如鼠标移动、点击、拖拽等复杂交互
"""
# 创建动作链对象
actions = ActionChains(driver)
# 1. 基础鼠标操作
actions.click(element) # 单击元素
actions.double_click(element) # 双击元素
actions.context_click(element) # 右键点击
actions.click_and_hold(element) # 按住元素不放
actions.release() # 释放按住的元素
# 2. 鼠标移动
actions.move_to_element(element) # 移动到元素上
actions.move_by_offset(x, y) # 移动指定偏移量
actions.move_to_element_with_offset(element, x, y) # 移动到元素的相对位置
# 3. 拖拽操作
actions.drag_and_drop(source, target) # 从源元素拖到目标元素
actions.drag_and_drop_by_offset(element, x, y) # 拖拽元素到指定偏移位置
# 4. 键盘操作
actions.key_down(Keys.CONTROL) # 按下键盘按键
actions.key_up(Keys.CONTROL) # 释放键盘按键
actions.send_keys('text') # 输入文本
actions.send_keys(Keys.ENTER) # 发送特殊键
# 5. 常用组合键
# Ctrl + A (全选)
actions.key_down(Keys.CONTROL).send_keys('a').key_up(Keys.CONTROL)
# Ctrl + C (复制)
actions.key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL)
# Ctrl + V (粘贴)
actions.key_down(Keys.CONTROL).send_keys('v').key_up(Keys.CONTROL)
# Ctrl + X (剪切)
actions.key_down(Keys.CONTROL).send_keys('x').key_up(Keys.CONTROL)
# Ctrl + Z (撤销)
actions.key_down(Keys.CONTROL).send_keys('z').key_up(Keys.CONTROL)
# 6. 实用场景示例
# 示例1:下拉菜单操作
def handle_dropdown_menu(driver, menu_element, submenu_element):
actions = ActionChains(driver)
actions.move_to_element(menu_element)\
.pause(1)\
.click(submenu_element)\
.perform()
# 示例2:拖拽排序
def drag_and_sort(driver, source_element, target_element):
actions = ActionChains(driver)
actions.click_and_hold(source_element)\
.pause(0.5)\
.move_to_element(target_element)\
.pause(0.5)\
.release()\
.perform()
# 示例3:签名模拟
def simulate_signature(driver, canvas):
actions = ActionChains(driver)
actions.click_and_hold(canvas)\
.move_by_offset(100, 0)\
.move_by_offset(0, 100)\
.move_by_offset(-100, 0)\
.move_by_offset(0, -100)\
.release()\
.perform()
# 示例4:文件上传操作
def upload_file(driver, upload_button, file_input):
actions = ActionChains(driver)
actions.click(upload_button)\
.pause(1)\
.send_keys(file_input)\
.send_keys(Keys.ENTER)\
.perform()
# 7. 异常处理示例
def safe_action_chain(driver, action_func):
try:
# 创建显式等待对象
wait = WebDriverWait(driver, 10)
# 执行动作链
action_func()
except ElementNotInteractableException:
print("元素不可交互")
except MoveTargetOutOfBoundsException:
print("目标元素超出边界")
except Exception as e:
print(f"发生异常: {str(e)}")
finally:
# 重置动作链
actions.reset_actions()
# 使用建议:
# 1. 每个动作链操作后必须调用perform()
# 2. 复杂操作建议加入pause()等待
# 3. 操作前确保元素可见和可交互
# 4. 使用try-except处理可能的异常
# 5. 必要时使用显式等待确保元素就绪
# 6. 操作完成后记得重置动作链
# 7. 模拟人工操作时添加随机延迟
补充
[!note]
禁用跳转重定向
response = requests.get(url='http://www.baidu.com',headers=headers,allow_redirects=False)
[!note]
查询历史请求
print(response.history)
[!note]
忽略证书错误
response requests.get(url,verify=False)
[!note]
设置超时
response requests.get(url,timeout=3)
[!note]
设置重试
from retrying import retry
import random
import requests
# 示例1:基本重试装饰器
@retry
def basic_retry_function():
"""最基础的重试示例,会一直重试直到成功"""
print("尝试执行...")
random_num = random.random()
if random_num < 0.7: # 70%的概率失败
print(f"执行失败 (随机数: {random_num})")
raise ValueError("模拟随机失败")
print(f"执行成功 (随机数: {random_num})")
return "成功"
# 示例2:设置最大重试次数和重试间隔
@retry(stop_max_attempt_number=3, wait_fixed=2000) # 最多重试3次,每次间隔2秒
def retry_with_params():
"""带参数的重试示例"""
print("尝试执行...")
if random.random() < 0.8: # 80%的概率失败
raise ConnectionError("模拟连接错误")
return "成功"
# 示例3:自定义重试条件
def should_retry(exception):
"""自定义重试条件:只在特定异常时重试"""
return isinstance(exception, requests.exceptions.RequestException)
@retry(retry_on_exception=should_retry, stop_max_attempt_number=3)
def fetch_url(url):
"""模拟网络请求的重试"""
response = requests.get(url)
response.raise_for_status()
return response.text
# 示例4:结合等待时间和重试条件
@retry(
wait_exponential_multiplier=1000, # 指数级增长的等待时间(毫秒)
wait_exponential_max=10000, # 最大等待时间10秒
stop_max_attempt_number=5 # 最多重试5次
)
def complex_operation():
"""复杂重试策略示例"""
if random.random() < 0.6:
raise TimeoutError("模拟超时")
return "操作成功"
if __name__ == "__main__":
# 测试基本重试
print("\n测试基本重试:")
try:
result = basic_retry_function()
print(f"最终结果: {result}")
except Exception as e:
print(f"最终失败: {e}")
# 测试带参数的重试
print("\n测试带参数的重试:")
try:
result = retry_with_params()
print(f"最终结果: {result}")
except Exception as e:
print(f"最终失败: {e}")
# 测试网络请求重试
print("\n测试网络请求重试:")
try:
result = fetch_url("https://api.github.com")
print("请求成功")
except Exception as e:
print(f"请求失败: {e}")
# 测试复杂重试策略
print("\n测试复杂重试策略:")
try:
result = complex_operation()
print(f"最终结果: {result}")
except Exception as e:
print(f"最终失败: {e}")
[!note]
获取 cookies 对象
#返回的是RequestsCookieJar对象
print(response.cookies) # 对象
print(dict(response.cookies))
print(requests.utils.dict_from_cookiejar(response.cookies))
[!note]
json 格式展示
json_data = json.dumps(dict_data,indent=4,ensure_ascii=False)
redis
[!note]
数据存储
import redis
from typing import Any, Dict, List
import json
class RedisStorage:
def __init__(self):
# 创建Redis连接
self.redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
def set_string(self, key: str, value: str, expire_seconds: int = None) -> bool:
"""存储字符串类型数据"""
try:
self.redis_client.set(key, value, ex=expire_seconds)
return True
except Exception as e:
print(f"存储失败: {str(e)}")
return False
def set_hash(self, key: str, mapping: Dict[str, Any]) -> bool:
"""存储哈希类型数据"""
try:
self.redis_client.hset(key, mapping=mapping)
return True
except Exception as e:
print(f"存储失败: {str(e)}")
return False
def set_list(self, key: str, values: List[Any]) -> bool:
"""存储列表类型数据"""
try:
self.redis_client.rpush(key, *values)
return True
except Exception as e:
print(f"存储失败: {str(e)}")
return False
def get_string(self, key: str) -> str:
"""获取字符串类型数据"""
return self.redis_client.get(key)
def get_hash(self, key: str) -> dict:
"""获取哈希类型数据"""
return self.redis_client.hgetall(key)
def get_list(self, key: str) -> list:
"""获取列表类型数据"""
return self.redis_client.lrange(key, 0, -1)
并发爬虫
asyncio 结合 requests 协程爬虫
import asyncio
import requests
from functools import partial
from bs4 import BeautifulSoup
url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}
loop = asyncio.get_event_loop()
async def get_movie_info(page):
# 计算起始位置
start = page * 25
# 发送请求
response = await loop.run_in_executor(
None,
partial(requests.get, url.format(start), headers=headers)
)
# 解析内容
soup = BeautifulSoup(response.text, 'html.parser')
movie_list = soup.find_all('div', class_='item')
movies = []
for movie in movie_list:
title = movie.find('span', class_='title').text
rating = movie.find('span', class_='rating_num').text
movies.append({
'title': title,
'rating': rating
})
return movies
async def main():
tasks = [get_movie_info(page) for page in range(10)] # 爬取10页
results = await asyncio.gather(*tasks)
# 打印结果
for page_num, movies in enumerate(results):
print(f"\n第 {page_num + 1} 页电影:")
for movie in movies:
print(f"电影: {movie['title']}, 评分: {movie['rating']}")
if __name__ == "__main__":
asyncio.run(main())
aiohttp 异步请求
import asyncio
from aiohttp import ClientSession
url = 'https://www.baidu.com'
headers = {
"User-Agent":"Mozilla/5.0 (Macintosh;Intel Mac OS X 10_15_7)I"
"AppleWebKit/537.36 (KHTML,Like Gecko)Chrome/117.0.0.0 Safari/537.36"
}
# 异步请求
async def main():
async with ClientSession() as session:
async with session.get(url,headers=headers) as response:
print(response.status)
# 获取响应内容
print(await response.text())
if __name__ == '__main__':
Loop asyncio.get_event_Loop()
Loop.run_until_complete(get_baidu())
代理
protobuf
syntax = "proto3";
package yuan;
message Person{
string name = 1;
int32 age = 2;
}
message Message{
string method = 1;
string payload = 2;
}
protoc --python_out=. douyin.proto
from p1_pb2 import Person,Message
p1 = Person()
p1.name = "yuan"
pl.age = 18
# 序列化
info = p1.SerializeToString()
print(info)
p2 Message
p2.method ="POST"
p2.payload="棒极了"
info = p2.SerializeToString()
print(info)
# 反序列化
obj = Message()
obj.ParseFromString (info)
print(obj.method)
print(obi payload)
Comments NOTHING