Python – 加强篇

Sherry 发布于 2025-03-27 44 次阅读 文章 最后更新于 2025-03-27 8545 字



tags:
- python
- 爬虫
- 编程
created: 2024-06-10


循环增强

zip多层赋值

for title,png in zip(titles,pngs):
    print(title,png)

列表推导式

# 列表推导式语法
[表达式 for 变量 in 可迭代对象 [if 条件]]

Python 列表推导式是一种简洁高效的创建列表的方法,结合了 for 循环和条件表达式。

优势

  • 简洁: 用一行代码完成循环和条件判断。
  • 高效: 生成列表比传统方法更快。
  • 易读: 代码更直观易懂。

示例

1. 生成数字列表

numbers = [i for i in range(1, 11)]  # [1, 2, 3, ..., 10]
even_numbers = [i for i in range(1, 11) if i % 2 == 0]  # [2, 4, 6, ..., 10]

2. 矩阵转置

matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
transposed_matrix = [[row[i] for row in matrix] for i in range(len(matrix[0]))]  # [[1, 4, 7], [2, 5, 8], [3, 6, 9]]

3. 字典转列表

d = {'a': 1, 'b': 2, 'c': 3}
keys = [k for k in d]  # ['a', 'b', 'c']
values = [v for v in d.values()]  # [1, 2, 3]
items = [(k, v) for k, v in d.items()]  # [('a', 1), ('b', 2), ('c', 3)]

注意事项

  • 列表推导式只适用于生成列表。
  • 避免在列表推导式中使用复杂表达式。
  • 对于大型数据集,可以考虑使用生成器表达式来节省内存。

使用类需要进行实例化操作(调用init方法)

class 类名:
    def__init__(self):  #初始化方法,类似构造器
        self.name=name

[!tip]

python中的类同样存在继承,封装,多态

继承

class 类名(要继承的类):   #继承
    方法

[!note]

以双下划线开头的不会被继承
例如:self._ _Sherry=name 类似于私有化(方法同理)

调用父类

super(Sherry,self)._ _init_ _(name)#特例
self.speak() #调用父类方法(当自身不存在这个方法时)
super(Sherry,self).speak() #当自身存在这个方法时

call方法

class Sherry:
    def __call__(self,*args,**kwargs):
        print('caLL被调用')

S=Sherry()#实例化
S()#将类当作方法调用

#call被调用

魔法方法

clss Foo(object)
    def __str__(self):
        return "魔法方法"

obj = Foo()
print(obj) # 输出对象时,如果想要定制显示的内容。

文件读写

模式 解释 使用情况
w 写入 如果文件不存在我就创建
r 读取 读取文件内容
a 追加 把内容写入文件末尾,不存在就创建,存在就直接写
b 二进制 写入视频,音频,图片之类的就需要加这个
写入
#f=open('文件名','模式',encoding='编码')
with open('文件名','模式',encoding='编码')as f:
    f.write() #写入

读取

with open('文件名','模式',encoding='编码')as f:
    content = f.read() #读取

导入工具包

random库

[!note]

import random

初始化随机数生成器(种子一样,随机数一样)

random.seed(种子)

生成随机数

random.randrange(起始,截止,步长)

libnum库

import libnum


#字符串转十进制
s="flag{123}"
print(libnum.s2n(s))

#十进制转字符串
j=1889377532526015427453
print(libnum.n2s(j))

#十六进制转字符串
n=0x666c61677b3132337d
print(libnum.n2s(n))

#字符串转十六进制
print(hex(libnum.s2n("flag{123}")))

#二进制转字符串
b='01100001011000110110001001100100'
print(libnum.b2s(b))

#字符串转二进制
s = 'acbd'
print(libnum.s2b(s))

#数字转二进制串
# n=0xfff
# print(s2b(n2s(n)))

#因数分解:返回n的所有素因子及每个素因子的个数。
print(libnum.factorize(1024))

#任意进制转为十进制
print(int('01111',2))
print(int('0x64',16))

gmpy2库

import gmpy2  

# 求模幂  
print gmpy2.powmod(2, 3, 5)  
print gmpy2.powmod(2, 4, 5)  
print gmpy2.powmod(2, 5, 5)  
print  

# 求逆元(常用于RSA)  
p = 3  
q = 7  
e = 7  
n = p * q  
d = gmpy2.invert(e, (p - 1) * (q - 1))  
print d  


# 判断素数  
print gmpy2.is_prime(3)  
print gmpy2.is_prime(6)  
print  

# 欧几里德算法,判断互素  
print gmpy2.gcd(2, 3)  
print gmpy2.gcd(4, 6)  
print  

# 扩展欧几里德算法  
print gmpy2.gcdext(2, 3)  
print gmpy2.gcdext(4, 6)  
print  

# 开n次方根  
print gmpy2.iroot(4, 2)  
print gmpy2.iroot(28, 3)

hashlib库

#md5加密
import hashlib

x = hashlib.md5()
x.update('I_love_python'.encode('utf-8'))
print("x1 = ", x.hexdigest())

# 如果数据量很大,可以分块多次调用 update(),最后计算的结果是一样的
x = hashlib.md5()
x.update('I_love_'.encode('utf-8'))
x.update('python'.encode('utf-8'))
print("x2 = ", x.hexdigest())

#sha1
x = hashlib.sha1()
x.update('I_love_python'.encode('utf-8'))
print("x1 = ", x.hexdigest())

#sha256
x = hashlib.sha256()
x.update('I_love_python'.encode('utf-8'))
print("x1 = ", x.hexdigest())

#sha512
x = hashlib.sha512()
x.update('I_love_python'.encode('utf-8'))
print("x1 = ", x.hexdigest())

CSV库

写入CSV文件

import csv

# 数据
data = [
    ['Name', 'Age', 'City'],
    ['Alice', 30, 'New York'],
    ['Bob', 25, 'Los Angeles']
]

# 写入 CSV 文件
with open('example.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)

高级写入

import csv

with open('example.csv', 'w', newline='') as file:
    fields = ['Name', 'Age', 'City']
    writer = csv.DictWriter(file, fieldnames=fields)

    writer.writeheader()
    writer.writerow({'Name': 'Alice', 'Age': 30, 'City': 'New York'})
    writer.writerow({'Name': 'Bob', 'Age': 25, 'City': 'Los Angeles'})

读取CSV

# 读取 CSV 文件 
with open('example.csv', 'r') as file: 
    reader = csv.reader(file) 
    for row in reader: 
        print(row)

高级读取

with open('example.csv', 'r') as file: 
    reader = csv.DictReader(file) 
    for row in reader: 
        print(row)

写入文件(字典写入)

import csv
datas = []
dic = {'WWWW': 'FFFF', 'MSS': '2', 'TTL': '40', 'WS': '3', 'S': 1, 'N': 1, 'D': 0, 'T': 8, 'F': 'S', 'LEN': '3C'}
header = list(dic.keys())
datas.append(dic)

with open('test.csv', 'a', newline='',encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=header)  # 提前预览列名,当下面代码写入数据时,会将其一一对应。
    writer.writeheader()  # 写入列名
    writer.writerows(datas)  # 写入数据


正常写入

import csv
rows = [
    ['Name', 'Age', 'City'],
    ['Alice', 30, 'New York'],
    ['Bob', 25, 'Los Angeles']
]

with open('stu_info.csv','w',newline='',encoding='gbk') as file
    f_csv = csv.writer(file)
    # 写入字段
    f_csv.writerows(headers)
    # 写入具体数据
    f_csv.writerows(rows)

写入字典

# 创建cSV写对象并声明表头信息
with open('stu_info_dict.csv','w',newline='') as file:
    f_csv = csv.DictWriter(file,['class_name','name','gender','phone','qq'])
    # 写入表头
    f_csv.writeheader()
    # 写入学生信息
    f_csv.writerows(rows)

urllib库

from urllib.parse import quote,unquote

#解码
unquote()

#编码
quote()

re库

import re

.   匹配换行符以外的字符
[]  里面是字符集合,匹配[]里任意一个字符
[0123456789]  匹配任意一个数字字符
[0-9]  匹配任意一个数字字符
[a-z]  匹配任意一个小写字符
[A-Za-z]  匹配任意一个英文字符
[A-Za-z0-9]   匹配任意一个数字或英文字符
[^abcd]  []里的^称为脱字符,只要不属于[]中任意一个字符
^[abcd]  以[]中内的某一个字符作为开头
\ d  匹配任意一个数字字符
\ D  匹配任意一个非数字字符
\ w  匹配字母、下划线、数字中的任意一个字符
\ W  匹配非字母、下划线、数字中的任意一个字符
\ s   匹配空白符(空格、换页、换行、回车、制表),相当于[\f\n\r\t]
\ S   匹配非空白符(空格、换页、换行、回车、制表),相当于[^ \f\n\r\t]
{n}   匹配前面一种规则n次 {m,n}m-n之间 {m,}至少m个
$    限制结尾,一般和^组合使用 匹配字符中必须完全匹配才算符合
?   可有可无 放在规则后面 例如 -?[1-9] 匹配正负数

    代表前面的0-多次 . ? 一般组合使用
.*    贪婪模式,匹配换行符以外的字符任意次     算一个整体
.* ?   非贪婪模式,匹配换行符以外的字符任意次         有一次算一次>

  •    匹配一次到多次
    .+?   匹配换行符以外的字符至少1次
    |     代表或
    ()    1、作为一个单元 2、作为子存储

扫描整个字符串,并返回第一个正则模式成功的匹配,flags标识位用于控制正则表达式的匹配方式

Re.Search(正则,字符串,flags=0)

获取正则匹配之后的内容

Re.Search(正则,字符串,flags=0).group()

如果有多个括号时 可以使用groups将值单独储存下来

Re.Search(正则,字符串,flags=0).groups()

只匹配一次 必须从第一位开始

Re.Match(正则,字符串,flags=0)

匹配多次

Re.Findall(正则,字符串,flags=0)

flag标志位

re.I         是匹配对大小写不敏感

re.M      多行匹配,影响到^和$

re.S        使.匹配包括换行符在内的所有字符

得到一个迭代器 返回一个列表

Re.Finditer(正则,字符串,flags=0)

创建正则语法(返回对象)

mytext = re.compile()

查找

results = mytext.findall(html.text)

os库

创建文件夹

if not os.path.exists('./content'):
    os.mkdir('./content')

String库

字符表

import string
print(string.printable)

#0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ \t\n\r\x0b\x0c

Z3库

[!tip]

z3中有3中类型的变量,分别是整型(Int),实型(Real)向量(BitVec)

Int-整数型

from z3 import *
a = Int('a')#声明单个整型变量
a,b = Ints('a b')#声明多个整型变量

Real-实数型

from z3 import *
a = Real('a')#声明单个实型变量
a,b = Reals('a b')#声明多个实型变量

BitVec-向量

from z3 import *
a = BitVec('a',8) #声明单个8位的变量
a, b = BitVec('a b',8)#声明多个8位的变量

常规使用

from z3 import *
a,b = Ints('a b')
solve(a>3,b<8,2*a+b==16)
#结果 [a = 5, b = 6]

[!tip]

约束求解器:
在解决问题是很多情况下是有多个约束条件的。这个时候可以先生成一个求解器对象(Solver()),然后就可以方便的向其中添加更多的约束条件。

#!/usr/bin/python3
#-*- coding=utf-8 -*-
from z3 import *
a,b = Ints('a b')
solver = Solver()#创建一个求解器对象
solver.add(a+b==10)#用add方法添加约束条件
solver.add(a-b==6)
if solver.check() == sat: #check()方法用来判断是否有解,sat(satisify)表示满足有解
    ans = solver.model() #model()方法得到解
    print(ans)
    #也可以用变量名作为下标得到解
    print(ans[a])
else:
    print("no ans!")

pillow 库

[!note]

生成图片

from PIL import Image
img = Image.new(name='RGB', size=(120,30), color=(255,255,255))

# 保存本地
with open('code.png','wb') as f:
    img.save(f,format='png')

[!note]

创建画笔绘画

from PIL import Image, ImageDraw, ImageFont

# 创建画布
img = Image.new('RGB', (120,30), color=(255,255,255))

# 创建画笔
draw = ImageDraw.Draw(img)

# 画线
draw.line((0,0, 120,30), fill=(255,0,0), width=2)

# 画点
draw.point((50,50), fill=(0,0,255))

# 画矩形
draw.rectangle((10,10, 110,20), outline=(0,255,0))

# 写文字
font = ImageFont.truetype('arial.ttf', size=16)
draw.text((10,10), "验证码", font=font, fill=(0,0,0))

# 保存图片
img.save('code.png')

[!note]

 图片处理操作

 from PIL import Image

 # 打开图片
 img = Image.open('code.png')

 # 调整大小
 img = img.resize((200, 100))

 # 旋转图片
 img = img.rotate(45)

 # 裁剪图片
 img = img.crop((0, 0, 100, 100))  # (left, top, right, bottom)

 # 图片格式转换
 img = img.convert('L')  # 转为灰度图

 # 保存修改后的图片
 img.save('new_code.png')

ctypes 库

from ctypes import *

'''
1. 基本数据类型对照表:
c_bool    -> bool
c_char    -> char
c_wchar   -> wchar
c_byte    -> int8
c_ubyte   -> uint8
c_short   -> int16
c_ushort  -> uint16
c_int     -> int32
c_uint    -> uint32
c_long    -> int32/64
c_ulong   -> uint32/64
c_float   -> float
c_double  -> double
c_char_p  -> char*
c_void_p  -> void*
'''

'''
CDLL加载库的几种方式:

1. 加载系统libc
libc = CDLL("libc.so.6")                  # Linux直接加载
libc = CDLL("msvcrt")                     # Windows加载msvcrt
libc = CDLL(None)                         # 加载主程序符号表

2. 使用find_library查找库路径
from ctypes.util import find_library
libc_path = find_library('c')             # 自动查找libc路径
libc = CDLL(libc_path)

3. 加载自定义动态库
mylib = CDLL("./mylib.so")                # 加载当前目录的库
mylib = CDLL("/absolute/path/mylib.so")   # 使用绝对路径加载

4. 加载带版本号的库
libpthread = CDLL("libpthread.so.0")      # 加载特定版本pthread

5. 设置加载模式
lib = CDLL("mylib.so", mode=RTLD_GLOBAL)  # 全局符号表可见
'''

# 示例1:加载动态链接库
libc = CDLL("libc.so.6")  # Linux
# libc = CDLL("msvcrt")   # Windows

# 示例2:调用C函数
libc.printf(b"Hello %s\n", b"World")

# 示例3:设置函数参数类型和返回值类型
libc.time.argtypes = [c_void_p]
libc.time.restype = c_long
current_time = libc.time(None)

# 示例4:创建数组
array = (c_int * 5)(1, 2, 3, 4, 5)

# 示例5:结构体定义
class Point(Structure):
    _fields_ = [
        ("x", c_int),
        ("y", c_int)
    ]

# 示例6:指针操作
ptr = pointer(c_int(42))
value = ptr.contents.value

# 示例7:内存分配
buffer = create_string_buffer(10)  # 分配10字节
mem = malloc(100)                  # 分配100字节
free(mem)                         # 释放内存

# 示例8:函数回调
CFUNCTYPE(c_int, c_int, c_int)    # 定义回调函数类型

常见使用场景:

  1. 系统调用
# 调用mmap
from ctypes import *
from ctypes.util import find_library

# 加载libc
libc = CDLL(find_library('c'))

# mmap 调用示例
mmap = libc.mmap
mmap.argtypes = [c_void_p, c_size_t, c_int, c_int, c_int, c_size_t]
mmap.restype = c_void_p

addr = mmap(0x1000, 0x1000, 7, 0x22, -1, 0)
  1. 读写内存
# 读写内存示例
def write_memory(addr, data):
    memmove(addr, data, len(data))

def read_memory(addr, size):
    buffer = create_string_buffer(size)
    memmove(buffer, addr, size)
    return buffer.raw
  1. shellcode 执行
# 执行shellcode示例
shellcode = b"\x31\xc0\x48\xbb\xd1\x9d..."
buffer = create_string_buffer(shellcode)
function = cast(buffer, CFUNCTYPE(c_void_p))
function()  
  1. 结构体操作
# 复杂结构体示例
class ComplexStruct(Structure):
    _pack_ = 1  # 设置对齐方式
    _fields_ = [
        ("id", c_int),
        ("name", c_char * 32),
        ("data", c_ubyte * 64)
    ]

# 使用结构体
obj = ComplexStruct()
obj.id = 1
obj.name = b"test"

常用方法

求余数

divmod(num)
# 返回:(商,余数)

使用c 中整形变量

import ctypes
v1 = 11119229123812344
v2 = 11119229123812344 << 5 
data = ctypes.c_int32(v2).value
print(data) #-1019511040

常用装饰器

[!note]

将类方法转化为类属性调用

class Sherry
    @property
    def Sherry_qq(self):
        return'3193871709'

S=Sherry()
S.Sherry_qq

[!note]

设置和删除

@Sherry_qq.setter  #设置装饰器
def Sherry_qq(self,value):
    self.qq=value
@Sherry_qq.deleter
def Sherry_qq(self):
    del self.qq

[!note]

类方法,表示属于类,不属于示例(可以直接使用)

@classmethod

[!note]

静态方法 我当前的方法不属于类是一个单独的函数

@staticmethod
def get_num():
    print(random.random())

自制装饰器

[!tip]

高阶函数----把函数作为参数
参数是函数返回内部函数内部函数里边调用函数(传递参数)

[!note]

计时装饰器---计算函数执行时间

def Timer(func):
    start=time.time()
    def inner(*args,**kwargs):
        print('传递的参数:',*args,**kwargs)
        func (*args,**kwargs)
        end=time.time()
        print('耗费时间:'+end-start)
    return inner

多线程

[!tip]

import threading

创建线程(非守护线程)

#函数不加括号
t1=threading.Thread(target=要执行的函数,args=(函数参数,))

启动线程

t1.start()

守护线程

[!note]

线程都是你主线程的附庸你结束我就结束了

t1.daemon  #判断是否时守护线程
t1.setDaemon(True) #在start之前,设置守护线程

打印线程名

threading.currentThread()
threading.main_thread() #返回主线程名

等待线程执行

t1.join()

多线程上锁

创建线程锁

Lock=threading.Lock()
#函数内部
Lock.acquire() #获取锁
Lock.release() #释放锁

多进程

[!tip]

import multiprocessing

计算cpu内核数

print(multiprocessing.cpu_count())

创建进程

p1=multiprocessing.Process(target=num,args=(i,))

启动进程

p1.start()

堵塞进程

p1.join()

进程池

我在一开始就创建好一个袋子 N个进程哪个空闲哪个就拿出来直接用

申明进程池

Pools=multiprocessing.Pool(进程数量)

同步安排进程(必须执行完一个任务,才进行下一个)

Pools.apply(函数,args=(i,))

异步安排进程(不必等待任务结束)

Pools.apply_async(函数,args=(i,))

进程池不在接收新的任务

Pools.close()

阻塞进程

Pools.join()

多进程代理数据共享

li=multiprocessing.Manager().list()
#上锁
Lock=multiprocessing.Manager().Lock()
#获取锁
lock.acquire()
#释放锁
lock.release()

协程异步(一般爬虫使用)

[!tip]

import asyncio

利用I0等待时间去执行其他任务
协程本身不会多个并发需要我们包装成任务

标记异步任务

async def num(name):
    print('{} start'.format(name))
    time.sleep(2)
    print('{} end'.format(name))

等待

await asyncio.sleep(等待事件秒)

运行协程对象

asyncio.run(num(1))

#一般先写一个开始的协程任务(await运行协程任务 等待 会一直阻塞到结果返回)

封装成任务(直接运行)

t1=asyncio.create_task(num(1))

#一般放入列表内(使用gather直接放入协程)
t1=[num(1),num(2),num(3)]
await asyncio.gather(*t1)

#wait要加
t1=[asyncio.create_task(num(1)),
    asyncio.create_task(num(2)),
    asyncio.create_task(num(3))]
await asyncio.wait(t1)

PYC文件

[!tip]

pyc文件:是由Python文件经过编译后所生成的文件,它是一种字节码 byte code,因此我们直接查看就是乱码的,也对源码起到一定的保护作用,但是这种字节码byte code是可以反编译的

[!note]
反编译

uncompyle6 C:\Users\Sherry\Desktop\ezpy.pyc>ezpy.py

python应用反编译

python pyinstxtractor.py python.exe

websocket 链接

from websocket import WebSocketApp

def on_open(ws):
    print("on_open", ws)

def on_message(ws, message):
    # 回调函数,直接接收到的消息信息
    print("on_message", message)

def on_error(ws, message):
    print("on_error", ws, message)

def on_close(ws, *args, **kwargs):
    pass

def run():
    # 向持有发送Web请求链接
    ws = WebSocketApp(
        url="...",
        header={},
        cookie="",
        on_open=on_open,
        on_message=on_message,    # 抖音只给我们发送数据
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()

if __name__ == '__main__':
    run()

爬虫

同步 异步 session同步 请求量异步(多)
requests>httpx httpx>requests httpx>requests aiohttp>httpx

请求头

关键参数

英文 中文释义
Cookie 身份信息
User-Agent 浏览器信息
Referer 防盗链

Request库

[!tip]

import requests

发起get请求

#返回对象 params是问号之后的参数(字典)
html = requests.get(url,headers,params) 

发起post请求

#返回对象 data为参数
html = requests.post(url,headers,data) 

获取状态码

html.status_code  #返回状态码

获取响应头

html.headers

获取文本形式

html.text

获取二进制形式

html.content

获取JSON形式

html.json()

设置编码形式

html.encoding='utf-8'

进阶

[!note]

session,公有容器,以它为基础发起请求共用里面的东西,参数隔离
每一个基于session得请求是独立的
如果覆盖了共有的信息,那就会更新,如果共有的没有,那就添加

声明session(等于有了公有的requests)

session = requests.Session()

设置公有属性

#例子
session.headers = {'common':'Sherry'}

html1=session.get('https://httpbin.org/headers',headers={'html1':"my headers"})   #参数隔离,头部多一个html1
print(html1.text)


html2=session.get('https://httpbin.org/headers')
print(html2.text)

过掉ssl证书验证

requests.get('https://kennethreitz.org', verify=False)

提交多个文件

url 'https://httpbin.org/post'
multiple_files = [
('images',('foo.png',open('foo.png','rb'),'image/png')),
('images',('bar.png',open('bar.png','rb'),'image/png'))]
r requests.post(url,files=multiple_files)
r.text

代理设置

proxies =
'http':'http://127.0.01:指定端口',  #本机代理
'https':'http://10.10.1.10:1080',  #当使用本机代理时两个写成一样的
}
requests.get('http://example.org',proxies=proxies)

httpx库

[!tip]
import httpx

完美支持HTTP/2. 可以写异步爬虫 操作和requests一样

发起get请求

r = httpx.get(url)

发起post请求

#返回对象 data为参数
r = requests.post(url,headers,data) 

获取状态码

r.status_code

获取响应头部

r.headers

设置代理

proxies={
"http://":"http://ocalhost:7890",
"https://":"http://Localhost:7890",}

html=httpx.get('https://www.pythonhttpx.org/',proxies=proxies)

发送http/2请求

client=httpx.Client(http2=True,proxies)
html=client.get('https://www.pythonhttpx.org/')

异步爬虫

import httpx  
import asyncio  


async def spider(num):  
    print('启动', num)  
    headers = {  
        'Referer':'https://cn.bing.com/search?',  
        'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'  
    }  
    async with httpx.AsyncClient(http2=True) as client:  # 建立异步  
        # await阻塞等待  
        html = await client.get('https://cn.bing.com/search?q=1231321',headers=headers)  
        print(html.text)  

async def main():  
    await asyncio.gather(*[spider(1), spider(2), spider(3)])  

if __name__ == '__main__':  
    asyncio.run(main())

aiohttp异步请求库

[!tip]

import aiohttp
import asyncio

import aiohttp  
import asyncio  

headers = {  
        'Referer': 'https://cn.bing.com/search?',  
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'  
}  
proxy='http://localhost:7890' #只支持http代理  

async def main():  
    async with aiohttp.ClientSession(headers=headers) as client:  # Session共有属性  
        html = await client.get('https://cn.bing.com/search?q=1231321',proxy)  
        print(await html.text())  
        html.close()  


asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())  
asyncio.run(main())  # 新版驱动需要设置成老板驱动 不然会报错  
# loop = asyncio.get_event_loop()  #老版本驱动  
# loop.run_until_complete(main())

bs4解析库

[!tip]

from bs4 import BeautifulSoup

解析html

soup = BeautifulSoup(html.text,'lxml')  #使用lxml解析器进行解析

使用css选择器(返回列表)

soup.select('选择器')

获取所有文本

result.text

获取标签属性(以字典的形式取出)

result['href']   #取出href属性的值

查询在文档中符合条件的所有标签

res_1 = soup.find_all('b')

查询参数

res = soup.find_all(class_='sister')

按照标签包含的文本信息进行匹配提取

res_8 = soup.find_all(string='Lacie')

xpath库

[!tip]

from lxml import etree

[!note]

// 从当前节点或者当前节点得子节点开始查找
/ 从当前节点开始查找

#例子  @属性  contains包含   start-with以什么开头
//div[@class="result c-container xpath-log new-pmd"] #严格匹配
//div[contains(@class,"c-container xpath-log new-pmd")]  #包含
//div[start-with(@class,"result")]  #以result开头

解析html(生成解析对象)

soup = etree.HTML(html.text)

xpath查找(返回对象)

soup.xpath('//ol[@id="b_results"]/li/h2/a')

获取当前节点文本

/text()

获取当前节点以及节点之下的节点中的文字

/string()

获取属性值(返回列表)

# 获取src
i.xpath("//img/@src") 

获取除外标签

li_list tree.xpath('//li[@class="item-1"][position()>1]')

获取最后一个页面

li_last tree.xpath('//li[@class="item"][last()]/a/text()')

将 element 对象转为字符串对象

html_str = etree.tostring(tree).decode()

jsonpath 库

[!tip]

from jsonpath import jsonpath

#例子  $根节点  []下标/过滤器  .子节点  ..任意层级子节点
$.store.book[0]  #获取第一本书
$..book[?(@.price<10)]  #获取价格小于10的书
$.store.book[(@.length-1)]  #获取最后一本书

解析 JSON(生成解析对象)

# 方式1:从字符串解析
data = json.loads(json_str)

# 方式2:从文件解析
with open('data.json') as f:
    data = json.load(f)

基本语法示例

# 获取所有书籍
result = jsonpath(data, '$.store.book[*]')

# 获取所有作者
authors = jsonpath(data, '$..author')

# 获取所有价格
prices = jsonpath(data, '$..price')

# 获取特定索引的书
book = jsonpath(data, '$.store.book[0]')

条件过滤

# 价格大于10的书
expensive_books = jsonpath(data, '$.store.book[?(@.price > 10)]')

# 包含特定属性的节点
has_isbn = jsonpath(data, '$.store.book[?(@.isbn)]')

通配符和切片

# 获取所有属性
all_properties = jsonpath(data, '$.store.book[*].*')

# 获取前两本书
first_two = jsonpath(data, '$.store.book[0:2]')

# 获取最后一本书
last_book = jsonpath(data, '$.store.book[-1:]')

多条件组合

# 价格大于10且为小说类型的书
result = jsonpath(data, '$.store.book[?(@.price>10 && @.category=="fiction")]')

获取节点长度

# 获取书籍数量
length = len(jsonpath(data, '$.store.book[*]'))

Selenium

[!note]

添加 cookie

# 导入 webdriver 用于浏览器自动化
from selenium import webdriver

# 初始化 Chrome 浏览器实例
browser = webdriver.Chrome()

# 访问百度首页
browser.get('https://www.baidu.com')

# 添加两个 cookie
# 添加用户名 cookie,值为"安娜"
browser.add_cookie({'name': 'username', 'value': '安娜'})
# 添加性别 cookie,值为"女"
browser.add_cookie({'name': 'gender', 'value': '女'})

# 获取所有 cookie 并存储在列表中
cookie_list = browser.get_cookies()

# 将 cookie 转换为字典格式,key 为 name,value 为对应的值
cookie_dict = {x["name"]: x["value"] for x in cookie_list}

# 打印所有 cookie
print(cookie_dict)

# 获取并打印指定的 cookie
print(browser.get_cookie('username'))  # 获取 username 对应的 cookie
print(browser.get_cookie('gender'))    # 获取 gender 对应的 cookie

# 关闭浏览器
browser.quit()

1. 浏览器操作

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options

# 初始化浏览器
driver = webdriver.Chrome()                      # 基础初始化
driver = webdriver.Chrome('./chromedriver.exe')  # 指定驱动路径

# 浏览器选项配置
chrome_options = Options()
chrome_options.add_argument('--headless')         # 无头模式
chrome_options.add_argument('--incognito')        # 无痕模式
chrome_options.add_argument('--window-size=1920,1080')  # 设置窗口大小

# 设置UA
user_agent ='abc'
options.add_argument(f'user-agent={user_agent}')

# 是否进制图片加载
prefs {"profile.managed_default_content_settings.images":2}
options.add_experimental_option(argument:'prefs',prefs)

#隐藏webdriver 
options.add_argument('--disable-blink-features=AutomationControlled') 

driver = webdriver.Chrome(options=chrome_options) 

# 基础操作
driver.get(url)                    # 访问网页
driver.refresh()                   # 刷新页面
driver.back()                      # 后退
driver.forward()                   # 前进
driver.maximize_window()           # 最大化窗口
driver.minimize_window()           # 最小化窗口
driver.close()                     # 关闭当前标签页
driver.quit()                      # 关闭浏览器

2. 元素定位

from selenium.webdriver.common.by import By

# 八种定位方式
driver.find_element(By.ID, "id")                     # ID定位
driver.find_element(By.NAME, "name")                 # name定位
driver.find_element(By.CLASS_NAME, "class")          # class定位
driver.find_element(By.TAG_NAME, "div")              # 标签定位
driver.find_element(By.LINK_TEXT, "链接文字")        # 链接文字定位
driver.find_element(By.PARTIAL_LINK_TEXT, "部分文字") # 部分链接文字
driver.find_element(By.CSS_SELECTOR, "#id .class")   # CSS选择器
driver.find_element(By.XPATH, "//div[@id='id']")     # XPATH定位

# 查找多个元素
elements = driver.find_elements(By.CLASS_NAME, "class")  # 返回列表

3. 元素操作

element = driver.find_element(By.ID, "id")

# 基础操作
element.click()                    # 点击
element.send_keys("文本")          # 输入文本
element.clear()                    # 清空输入
element.submit()                   # 提交表单

# 获取元素信息
element.text                       # 获取文本内容
element.get_attribute("href")      # 获取属性值
element.is_displayed()             # 元素是否可见
element.is_enabled()               # 元素是否可用
element.is_selected()              # 元素是否被选中
element.location                   # 元素位置
element.size                       # 元素大小

4. 等待机制

from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException

# 显式等待
wait = WebDriverWait(driver, timeout=10)
element = wait.until(
    EC.presence_of_element_located((By.ID, "id"))
)

# 常用的expected_conditions条件
EC.presence_of_element_located()           # 元素存在
EC.visibility_of_element_located()         # 元素可见
EC.element_to_be_clickable()              # 元素可点击
EC.text_to_be_present_in_element()        # 文本出现在元素中
EC.title_contains()                        # 标题包含特定文本

# 隐式等待
driver.implicitly_wait(10)                 # 设置全局等待时间

5. 框架切换

# iframe切换
driver.switch_to.frame("iframe_name")      # 通过name切换
driver.switch_to.frame(iframe_element)     # 通过元素切换
driver.switch_to.default_content()         # 返回主文档

# 窗口切换
driver.switch_to.window(handle)            # 切换窗口
handles = driver.window_handles            # 获取所有窗口句柄

6. JavaScript 执行

# 执行JavaScript
driver.execute_script("return document.title;")  # 执行JS并返回结果
#相对位置滚动
js_code 'window.scrollBy(0,700)'
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")  # 滚动到底部

7. 键盘和鼠标操作

from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains

# 键盘操作
element.send_keys(Keys.RETURN)             # 回车
element.send_keys(Keys.CONTROL + 'a')      # 全选
element.send_keys(Keys.ESCAPE)             # ESC键

# 鼠标操作
actions = ActionChains(driver)
actions.move_to_element(element)           # 鼠标悬停
actions.click_and_hold(element)            # 鼠标按住
actions.drag_and_drop(source, target)      # 拖拽
actions.double_click(element)              # 双击
actions.context_click(element)             # 右键
actions.perform()                          # 执行操作

8. 异常处理

from selenium.common.exceptions import *

try:
    element = driver.find_element(By.ID, "id")
except NoSuchElementException:
    print("元素未找到")
except TimeoutException:
    print("等待超时")
except ElementClickInterceptedException:
    print("元素无法点击")

9. 截图功能

# 截图
driver.save_screenshot("screenshot.png")    # 截取整个页面
element.screenshot("element.png")           # 截取特定元素

10. Cookie 操作

# Cookie操作
driver.add_cookie({"name": "foo", "value": "bar"})  # 添加Cookie
driver.get_cookies()                                # 获取所有Cookie
driver.delete_all_cookies()                         # 删除所有Cookie

11. 动作链接

# Selenium相关导入
from selenium import webdriver                        # 浏览器驱动
from selenium.webdriver.common.action_chains import ActionChains    # 动作链
from selenium.webdriver.common.keys import Keys       # 键盘按键
from selenium.webdriver.common.by import By          # 元素定位方式
from selenium.webdriver.support.ui import WebDriverWait    # 显式等待
from selenium.webdriver.support import expected_conditions as EC    # 预期条件
from selenium.common.exceptions import *             # 异常处理

# 系统相关导入
import time         # 时间处理
import random      # 随机数生成

"""
ActionChains 动作链完整示例
作用:模拟真实的用户操作,比如鼠标移动、点击、拖拽等复杂交互
"""

# 创建动作链对象
actions = ActionChains(driver)

# 1. 基础鼠标操作
actions.click(element)                     # 单击元素
actions.double_click(element)              # 双击元素
actions.context_click(element)             # 右键点击
actions.click_and_hold(element)            # 按住元素不放
actions.release()                          # 释放按住的元素

# 2. 鼠标移动
actions.move_to_element(element)           # 移动到元素上
actions.move_by_offset(x, y)               # 移动指定偏移量
actions.move_to_element_with_offset(element, x, y)  # 移动到元素的相对位置

# 3. 拖拽操作
actions.drag_and_drop(source, target)      # 从源元素拖到目标元素
actions.drag_and_drop_by_offset(element, x, y)  # 拖拽元素到指定偏移位置

# 4. 键盘操作
actions.key_down(Keys.CONTROL)             # 按下键盘按键
actions.key_up(Keys.CONTROL)               # 释放键盘按键
actions.send_keys('text')                  # 输入文本
actions.send_keys(Keys.ENTER)              # 发送特殊键

# 5. 常用组合键
# Ctrl + A (全选)
actions.key_down(Keys.CONTROL).send_keys('a').key_up(Keys.CONTROL)

# Ctrl + C (复制)
actions.key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL)

# Ctrl + V (粘贴)
actions.key_down(Keys.CONTROL).send_keys('v').key_up(Keys.CONTROL)

# Ctrl + X (剪切)
actions.key_down(Keys.CONTROL).send_keys('x').key_up(Keys.CONTROL)

# Ctrl + Z (撤销)
actions.key_down(Keys.CONTROL).send_keys('z').key_up(Keys.CONTROL)

# 6. 实用场景示例

# 示例1:下拉菜单操作
def handle_dropdown_menu(driver, menu_element, submenu_element):
    actions = ActionChains(driver)
    actions.move_to_element(menu_element)\
           .pause(1)\
           .click(submenu_element)\
           .perform()

# 示例2:拖拽排序
def drag_and_sort(driver, source_element, target_element):
    actions = ActionChains(driver)
    actions.click_and_hold(source_element)\
           .pause(0.5)\
           .move_to_element(target_element)\
           .pause(0.5)\
           .release()\
           .perform()

# 示例3:签名模拟
def simulate_signature(driver, canvas):
    actions = ActionChains(driver)
    actions.click_and_hold(canvas)\
           .move_by_offset(100, 0)\
           .move_by_offset(0, 100)\
           .move_by_offset(-100, 0)\
           .move_by_offset(0, -100)\
           .release()\
           .perform()

# 示例4:文件上传操作
def upload_file(driver, upload_button, file_input):
    actions = ActionChains(driver)
    actions.click(upload_button)\
           .pause(1)\
           .send_keys(file_input)\
           .send_keys(Keys.ENTER)\
           .perform()

# 7. 异常处理示例
def safe_action_chain(driver, action_func):
    try:
        # 创建显式等待对象
        wait = WebDriverWait(driver, 10)

        # 执行动作链
        action_func()

    except ElementNotInteractableException:
        print("元素不可交互")
    except MoveTargetOutOfBoundsException:
        print("目标元素超出边界")
    except Exception as e:
        print(f"发生异常: {str(e)}")
    finally:
        # 重置动作链
        actions.reset_actions()

# 使用建议:
# 1. 每个动作链操作后必须调用perform()
# 2. 复杂操作建议加入pause()等待
# 3. 操作前确保元素可见和可交互
# 4. 使用try-except处理可能的异常
# 5. 必要时使用显式等待确保元素就绪
# 6. 操作完成后记得重置动作链
# 7. 模拟人工操作时添加随机延迟

补充

[!note]

禁用跳转重定向

response = requests.get(url='http://www.baidu.com',headers=headers,allow_redirects=False)

[!note]

查询历史请求

print(response.history)

[!note]

忽略证书错误

response requests.get(url,verify=False)

[!note]

设置超时

response requests.get(url,timeout=3)

[!note]

设置重试

from retrying import retry
import random
import requests

# 示例1:基本重试装饰器
@retry
def basic_retry_function():
    """最基础的重试示例,会一直重试直到成功"""
    print("尝试执行...")
    random_num = random.random()
    if random_num < 0.7:  # 70%的概率失败
        print(f"执行失败 (随机数: {random_num})")
        raise ValueError("模拟随机失败")
    print(f"执行成功 (随机数: {random_num})")
    return "成功"

# 示例2:设置最大重试次数和重试间隔
@retry(stop_max_attempt_number=3, wait_fixed=2000)  # 最多重试3次,每次间隔2秒
def retry_with_params():
    """带参数的重试示例"""
    print("尝试执行...")
    if random.random() < 0.8:  # 80%的概率失败
        raise ConnectionError("模拟连接错误")
    return "成功"

# 示例3:自定义重试条件
def should_retry(exception):
    """自定义重试条件:只在特定异常时重试"""
    return isinstance(exception, requests.exceptions.RequestException)

@retry(retry_on_exception=should_retry, stop_max_attempt_number=3)
def fetch_url(url):
    """模拟网络请求的重试"""
    response = requests.get(url)
    response.raise_for_status()
    return response.text

# 示例4:结合等待时间和重试条件
@retry(
    wait_exponential_multiplier=1000,  # 指数级增长的等待时间(毫秒)
    wait_exponential_max=10000,        # 最大等待时间10秒
    stop_max_attempt_number=5          # 最多重试5次
)
def complex_operation():
    """复杂重试策略示例"""
    if random.random() < 0.6:
        raise TimeoutError("模拟超时")
    return "操作成功"

if __name__ == "__main__":
    # 测试基本重试
    print("\n测试基本重试:")
    try:
        result = basic_retry_function()
        print(f"最终结果: {result}")
    except Exception as e:
        print(f"最终失败: {e}")

    # 测试带参数的重试
    print("\n测试带参数的重试:")
    try:
        result = retry_with_params()
        print(f"最终结果: {result}")
    except Exception as e:
        print(f"最终失败: {e}")

    # 测试网络请求重试
    print("\n测试网络请求重试:")
    try:
        result = fetch_url("https://api.github.com")
        print("请求成功")
    except Exception as e:
        print(f"请求失败: {e}")

    # 测试复杂重试策略
    print("\n测试复杂重试策略:")
    try:
        result = complex_operation()
        print(f"最终结果: {result}")
    except Exception as e:
        print(f"最终失败: {e}")

[!note]

获取 cookies 对象

#返回的是RequestsCookieJar对象
print(response.cookies) # 对象
print(dict(response.cookies))
print(requests.utils.dict_from_cookiejar(response.cookies))

[!note]

json 格式展示

json_data = json.dumps(dict_data,indent=4,ensure_ascii=False)

redis

[!note]

数据存储

import redis
from typing import Any, Dict, List
import json

class RedisStorage:
    def __init__(self):
        # 创建Redis连接
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            db=0,
            decode_responses=True
        )

    def set_string(self, key: str, value: str, expire_seconds: int = None) -> bool:
        """存储字符串类型数据"""
        try:
            self.redis_client.set(key, value, ex=expire_seconds)
            return True
        except Exception as e:
            print(f"存储失败: {str(e)}")
            return False

    def set_hash(self, key: str, mapping: Dict[str, Any]) -> bool:
        """存储哈希类型数据"""
        try:
            self.redis_client.hset(key, mapping=mapping)
            return True
        except Exception as e:
            print(f"存储失败: {str(e)}")
            return False

    def set_list(self, key: str, values: List[Any]) -> bool:
        """存储列表类型数据"""
        try:
            self.redis_client.rpush(key, *values)
            return True
        except Exception as e:
            print(f"存储失败: {str(e)}")
            return False

    def get_string(self, key: str) -> str:
        """获取字符串类型数据"""
        return self.redis_client.get(key)

    def get_hash(self, key: str) -> dict:
        """获取哈希类型数据"""
        return self.redis_client.hgetall(key)

    def get_list(self, key: str) -> list:
        """获取列表类型数据"""
        return self.redis_client.lrange(key, 0, -1)

并发爬虫

asyncio 结合 requests 协程爬虫

import asyncio
import requests
from functools import partial
from bs4 import BeautifulSoup

url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}

loop = asyncio.get_event_loop()

async def get_movie_info(page):
    # 计算起始位置
    start = page * 25
    # 发送请求
    response = await loop.run_in_executor(
        None, 
        partial(requests.get, url.format(start), headers=headers)
    )

    # 解析内容
    soup = BeautifulSoup(response.text, 'html.parser')
    movie_list = soup.find_all('div', class_='item')

    movies = []
    for movie in movie_list:
        title = movie.find('span', class_='title').text
        rating = movie.find('span', class_='rating_num').text
        movies.append({
            'title': title,
            'rating': rating
        })

    return movies

async def main():
    tasks = [get_movie_info(page) for page in range(10)]  # 爬取10页
    results = await asyncio.gather(*tasks)

    # 打印结果
    for page_num, movies in enumerate(results):
        print(f"\n第 {page_num + 1} 页电影:")
        for movie in movies:
            print(f"电影: {movie['title']}, 评分: {movie['rating']}")

if __name__ == "__main__":
    asyncio.run(main())

aiohttp 异步请求

import asyncio

from aiohttp import ClientSession



url = 'https://www.baidu.com'

headers = {

    "User-Agent":"Mozilla/5.0 (Macintosh;Intel Mac OS X 10_15_7)I"

    "AppleWebKit/537.36 (KHTML,Like Gecko)Chrome/117.0.0.0 Safari/537.36"

}



# 异步请求

async def main():

    async with ClientSession() as session:

        async with session.get(url,headers=headers) as response:

            print(response.status)

            # 获取响应内容

            print(await response.text())



if __name__ == '__main__':
    Loop asyncio.get_event_Loop()
    Loop.run_until_complete(get_baidu())

代理

protobuf

syntax = "proto3";
package yuan;
message Person{
    string name = 1;
    int32 age = 2;
}
message Message{
    string method = 1;
    string payload = 2;
}
protoc --python_out=. douyin.proto
from p1_pb2 import Person,Message
p1 = Person()
p1.name = "yuan"
pl.age = 18

# 序列化
info = p1.SerializeToString()
print(info)

p2 Message
p2.method ="POST"
p2.payload="棒极了"
info = p2.SerializeToString()
print(info)

# 反序列化
obj = Message()
obj.ParseFromString (info)
print(obj.method)
print(obi payload)