## System Overview

A complete crawler management system for centrally managing, scheduling, and monitoring multiple crawler tasks, with support for task configuration, log viewing, and data preview.
## Technology Stack

| Layer | Choice | Notes |
|---|---|---|
| Frontend | Vue 3 + Element Plus | Lightweight, mature ecosystem, well suited to admin dashboards |
| Backend | Python FastAPI | Async and fast, integrates seamlessly with the crawler ecosystem |
| Database | PostgreSQL + Redis | Relational storage + task queue/cache |
| Task scheduling | Celery + Redis | Distributed task queue with scheduled and async execution |
| Crawler framework | Scrapy / requests | Chosen per task complexity |
## System Architecture

┌─────────────────────────────────────────────────────────────┐
│                      Frontend (Vue 3)                       │
│  ┌──────────┬──────────┬──────────┬──────────┬──────────┐   │
│  │  Tasks   │  Spider  │   Log    │   Data   │  System  │   │
│  │          │  config  │ monitor  │ preview  │ settings │   │
│  └──────────┴──────────┴──────────┴──────────┴──────────┘   │
└─────────────────────────────────────────────────────────────┘
                          │ HTTP/WebSocket
                          ▼
┌─────────────────────────────────────────────────────────────┐
│                      Backend (FastAPI)                      │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ API layer: task CRUD / spider control / log queries  │   │
│  │            / data export                             │   │
│  └──────────────────────────────────────────────────────┘   │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ Service layer: scheduler / spider engine / proxy pool│   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
        │                  │                  │
        ▼                  ▼                  ▼
 ┌────────────┐     ┌─────────────┐     ┌────────────┐
 │ PostgreSQL │     │    Redis    │     │   Celery   │
 │ (storage)  │     │(queue/cache)│     │ (workers)  │
 └────────────┘     └─────────────┘     └────────────┘

## Core Feature Modules
### 1. Task Management
- [ ] Create/edit/delete crawler tasks
- [ ] Task status management (pending, running, completed, failed)
- [ ] Scheduled task configuration (cron expressions)
- [ ] Task priority settings
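The cron-based scheduling above maps onto Celery beat. A minimal config sketch, assuming a `run_spider_task` Celery task exists at the module path shown (the task path and broker URL are placeholders):

```python
# Sketch: turning a task's cron_expression into a Celery beat entry
from celery import Celery
from celery.schedules import crontab

app = Celery("spider_manager", broker="redis://localhost:6379/0")

def cron_to_schedule(expr: str) -> crontab:
    """Convert a 5-field cron expression into a Celery crontab schedule."""
    minute, hour, day, month, weekday = expr.split()
    return crontab(minute=minute, hour=hour, day_of_month=day,
                   month_of_year=month, day_of_week=weekday)

# e.g. run task 42 every day at 03:30
app.conf.beat_schedule = {
    "spider-task-42": {
        "task": "app.services.scheduler.run_spider_task",  # placeholder task path
        "schedule": cron_to_schedule("30 3 * * *"),
        "args": (42,),
    },
}
```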
### 2. Crawler Configuration
- [ ] Target URL configuration
- [ ] Request header / cookie settings
- [ ] Data extraction rules (XPath / CSS selectors)
- [ ] Anti-bot strategy configuration (proxies, delays, User-Agent rotation)
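All of the per-task settings above can live in the `config` JSONB column of `spider_task`. One possible shape, shown as a plain Python dict (the field names are illustrative, not a fixed schema):

```python
# Illustrative shape for spider_task.config (stored as JSONB)
task_config = {
    "headers": {"Referer": "https://example.com/"},
    "cookies": {},
    "selectors": {                                  # extraction rules
        "title": "h1.title",                        # CSS selector
        "price": "//span[@class='price']/text()",   # XPath
    },
    "anti_spider": {
        "use_proxy": True,
        "delay_range": [1, 3],                      # seconds between requests
        "ua_rotation": "desktop",
    },
}
```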
### 3. Monitoring Center
- [ ] Real-time run logs (pushed over WebSocket)
- [ ] Task execution statistics (success rate, duration, item counts)
- [ ] Exception alerting
- [ ] Resource usage monitoring (CPU/memory)
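The real-time log push can be backed by a small in-process broadcaster that fans log lines out to per-task subscriber queues; the FastAPI `/ws/logs/{task_id}` endpoint would wrap this. A minimal sketch (the class name is an assumption):

```python
import asyncio
from collections import defaultdict

class LogBroadcaster:
    """Fan out log lines to every subscriber of a task."""

    def __init__(self):
        self._subscribers = defaultdict(set)  # task_id -> set of asyncio.Queue

    def subscribe(self, task_id: int) -> asyncio.Queue:
        q = asyncio.Queue()
        self._subscribers[task_id].add(q)
        return q

    def unsubscribe(self, task_id: int, q: asyncio.Queue):
        self._subscribers[task_id].discard(q)

    async def publish(self, task_id: int, line: str):
        # Each WebSocket handler drains its own queue and forwards to the client.
        for q in self._subscribers[task_id]:
            await q.put(line)

async def demo():
    bc = LogBroadcaster()
    q = bc.subscribe(1)
    await bc.publish(1, "spider started")
    return await q.get()

received = asyncio.run(demo())
```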
### 4. Data Management
- [ ] Data preview and search
- [ ] Data export (CSV/JSON/Excel)
- [ ] Data cleaning rule configuration
- [ ] Deduplication strategy
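A common dedup strategy hashes a normalized URL (or the extracted item) and checks membership before inserting. In production the seen-set would live in a Redis SET shared by all workers; the in-memory sketch below shows the same logic:

```python
import hashlib

class Deduplicator:
    """URL/item dedup via content hashing (in-memory stand-in for a Redis SET)."""

    def __init__(self):
        self._seen = set()

    @staticmethod
    def fingerprint(value: str) -> str:
        return hashlib.sha256(value.encode("utf-8")).hexdigest()

    def is_new(self, value: str) -> bool:
        """Return True the first time a value is seen, False afterwards."""
        fp = self.fingerprint(value)
        if fp in self._seen:
            return False
        self._seen.add(fp)
        return True

dedup = Deduplicator()
first = dedup.is_new("https://example.com/item/1")
second = dedup.is_new("https://example.com/item/1")
```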
## Database Design

### Core Tables

```sql
-- Crawler task table
CREATE TABLE spider_task (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    target_url TEXT NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',  -- pending/running/completed/failed
    priority INT DEFAULT 5,
    cron_expression VARCHAR(50),
    config JSONB,                          -- crawler configuration
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- Execution record table
CREATE TABLE task_execution (
    id SERIAL PRIMARY KEY,
    task_id INT REFERENCES spider_task(id),
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    status VARCHAR(20),
    items_scraped INT DEFAULT 0,
    error_message TEXT
);

-- Scraped data table
CREATE TABLE scraped_data (
    id SERIAL PRIMARY KEY,
    task_id INT REFERENCES spider_task(id),
    execution_id INT REFERENCES task_execution(id),
    data JSONB,
    url VARCHAR(500),
    created_at TIMESTAMP DEFAULT NOW()
);

-- Proxy pool table
CREATE TABLE proxy_pool (
    id SERIAL PRIMARY KEY,
    ip VARCHAR(50),
    port INT,
    protocol VARCHAR(10),
    is_active BOOLEAN DEFAULT TRUE,
    fail_count INT DEFAULT 0,
    last_check TIMESTAMP
);
```

## API Design
### Task Management

| Method | Path | Description |
|---|---|---|
| GET | /api/tasks | List tasks |
| POST | /api/tasks | Create a task |
| PUT | /api/tasks/{id} | Update a task |
| DELETE | /api/tasks/{id} | Delete a task |
| POST | /api/tasks/{id}/run | Run a task immediately |
| POST | /api/tasks/{id}/stop | Stop a task |
### Monitoring and Logs

| Method | Path | Description |
|---|---|---|
| GET | /api/tasks/{id}/logs | Fetch task logs |
| WS | /ws/logs/{task_id} | Real-time log push |
| GET | /api/stats/overview | System statistics overview |
### Data Management

| Method | Path | Description |
|---|---|---|
| GET | /api/data/{task_id} | Query scraped data |
| GET | /api/data/{task_id}/export | Export data |
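From a script, the task endpoints above can be driven with any HTTP client. A small sketch of a path builder (the base URL is a placeholder), with the actual call hedged as a comment:

```python
def task_endpoint(base, task_id=None, action=None):
    """Build a task API path, e.g. /api/tasks, /api/tasks/3, /api/tasks/3/run."""
    url = f"{base.rstrip('/')}/api/tasks"
    if task_id is not None:
        url += f"/{task_id}"
    if action is not None:
        url += f"/{action}"
    return url

run_url = task_endpoint("http://localhost:8000", 3, "run")
# With an HTTP client such as httpx (assumed installed):
#   httpx.post(run_url)
```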
## Frontend Page Plan

### Page Structure

├── Dashboard
│   ├── Task status summary cards
│   ├── Recent execution records
│   └── Data volume trend charts
│
├── Task Management
│   ├── Task list (search, filter, pagination)
│   ├── Create/edit task dialog
│   └── Task detail page
│
├── Monitoring Center
│   ├── Real-time log panel
│   ├── Task execution timeline
│   └── Error alert list
│
├── Data Center
│   ├── Data table (preview)
│   ├── Data search
│   └── Export
│
└── System Settings
    ├── Proxy pool management
    ├── Global configuration
    └── User management (optional)

## Backend Directory Layout
spider_manager/
├── app/
│   ├── api/
│   │   ├── __init__.py
│   │   ├── tasks.py          # task endpoints
│   │   ├── data.py           # data endpoints
│   │   └── monitor.py        # monitoring endpoints
│   ├── core/
│   │   ├── config.py         # configuration
│   │   ├── database.py       # database connection
│   │   └── security.py       # authentication/authorization
│   ├── models/
│   │   ├── task.py
│   │   ├── execution.py
│   │   └── data.py
│   ├── services/
│   │   ├── scheduler.py      # task scheduling
│   │   ├── spider_engine.py  # spider engine
│   │   └── proxy_pool.py     # proxy pool
│   ├── spiders/
│   │   ├── base_spider.py    # base spider class
│   │   └── templates/        # spider templates
│   └── main.py
├── celery_app.py             # Celery configuration
├── requirements.txt
└── docker-compose.yml        # containerized deployment

## Core Code Examples
### Backend: Task Model (FastAPI + SQLAlchemy)

```python
from sqlalchemy import Column, Integer, String, Text, DateTime, JSON
from sqlalchemy.sql import func
from app.core.database import Base

class SpiderTask(Base):
    __tablename__ = "spider_task"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    description = Column(Text)
    target_url = Column(Text, nullable=False)
    status = Column(String(20), default="pending")
    priority = Column(Integer, default=5)
    cron_expression = Column(String(50))
    config = Column(JSON)  # {headers, cookies, selectors, anti_spider...}
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
```

### Backend: Task API
```python
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.models.task import SpiderTask
from app.services.scheduler import schedule_task

router = APIRouter(prefix="/api/tasks", tags=["tasks"])

class TaskCreate(BaseModel):
    """Request schema for creating a task."""
    name: str
    target_url: str
    description: Optional[str] = None
    cron_expression: Optional[str] = None
    priority: int = 5
    config: Optional[dict] = None

@router.post("/")
async def create_task(task_data: TaskCreate, db: Session = Depends(get_db)):
    task = SpiderTask(**task_data.dict())
    db.add(task)
    db.commit()
    db.refresh(task)
    return task

@router.post("/{task_id}/run")
async def run_task(task_id: int, db: Session = Depends(get_db)):
    task = db.query(SpiderTask).filter(SpiderTask.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    # Push onto the Celery queue
    schedule_task.delay(task_id)
    return {"message": "Task scheduled", "task_id": task_id}
```

### Frontend: Task List Component (Vue 3)
```vue
<template>
  <div class="task-list">
    <el-card>
      <template #header>
        <div class="card-header">
          <span>Task List</span>
          <el-button type="primary" @click="showCreateDialog">
            New Task
          </el-button>
        </div>
      </template>
      <el-table :data="tasks" v-loading="loading">
        <el-table-column prop="name" label="Name" />
        <el-table-column prop="target_url" label="Target URL" show-overflow-tooltip />
        <el-table-column prop="status" label="Status">
          <template #default="{ row }">
            <el-tag :type="statusType(row.status)">{{ row.status }}</el-tag>
          </template>
        </el-table-column>
        <el-table-column label="Actions" width="200">
          <template #default="{ row }">
            <el-button size="small" @click="runTask(row.id)">Run</el-button>
            <el-button size="small" @click="editTask(row)">Edit</el-button>
            <el-button size="small" type="danger" @click="deleteTask(row.id)">
              Delete
            </el-button>
          </template>
        </el-table-column>
      </el-table>
    </el-card>
  </div>
</template>

<script setup>
import { ref, onMounted } from 'vue'
import { ElMessage } from 'element-plus'
import { getTasks, runTask as apiRunTask } from '@/api/tasks'

const tasks = ref([])
const loading = ref(false)

const fetchTasks = async () => {
  loading.value = true
  try {
    tasks.value = await getTasks()
  } finally {
    loading.value = false
  }
}

const statusType = (status) => (
  { pending: 'info', running: '', completed: 'success', failed: 'danger' }[status] || 'info'
)

const runTask = async (id) => {
  await apiRunTask(id)
  ElMessage.success('Task started')
  fetchTasks()
}

onMounted(fetchTasks)
</script>
```

## Deployment
### Docker Compose Configuration

```yaml
version: '3.8'

services:
  web:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - db
      - redis
    environment:
      - DATABASE_URL=postgresql://user:pass@db/spider_db
      - REDIS_URL=redis://redis:6379

  celery:
    build: .
    command: celery -A celery_app worker --loglevel=info
    depends_on:
      - redis

  celery-beat:
    build: .
    command: celery -A celery_app beat --loglevel=info
    depends_on:
      - redis

  db:
    image: postgres:14
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=spider_db
    volumes:
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

  frontend:
    build: ./frontend
    ports:
      - "80:80"

volumes:
  pgdata:
```

## Anti-Bot Module: Detailed Implementation
### Module Architecture

┌─────────────────────────────────────────────────────────┐
│                Anti-Bot Strategy Manager                │
│  ┌────────────┬────────────┬────────────┬────────────┐  │
│  │ Proxy pool │ UA rotator │Rate limiter│ Cookie pool│  │
│  └────────────┴────────────┴────────────┴────────────┘  │
│  ┌────────────┬────────────┬────────────┬────────────┐  │
│  │Fingerprint │  CAPTCHA   │   Retry    │  Fallback  │  │
│  │ simulator  │  handling  │   policy   │  strategy  │  │
│  └────────────┴────────────┴────────────┴────────────┘  │
└─────────────────────────────────────────────────────────┘

### 1. Proxy Pool Manager

```python
# app/services/proxy_pool.py
import random
import asyncio
import aiohttp
from typing import Optional
from sqlalchemy.orm import Session
from app.models.proxy import Proxy
import redis.asyncio as redis

class ProxyPool:
    """Proxy pool manager: fetches, validates, and scores proxies."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.pool_key = "proxy:pool"
        self.score_key = "proxy:scores"
        self.max_fail_count = 3
        self.check_interval = 300  # health-check every 5 minutes

    async def get_proxy(self, protocol: str = "http") -> Optional[str]:
        """Return an available proxy, chosen randomly among the top scorers."""
        proxies = await self.redis.zrevrangebyscore(
            self.score_key, "+inf", 60,  # only proxies scoring above 60
            start=0, num=10
        )
        if not proxies:
            return None
        proxy = random.choice(proxies)
        return proxy.decode() if proxy else None

    async def add_proxy(self, ip: str, port: int, protocol: str = "http"):
        """Add a proxy to the pool with an initial score of 100."""
        proxy_url = f"{protocol}://{ip}:{port}"
        await self.redis.zadd(self.score_key, {proxy_url: 100})
        await self.redis.sadd(self.pool_key, proxy_url)

    async def report_result(self, proxy: str, success: bool):
        """Report a usage result and adjust the proxy's score."""
        if success:
            # +5 on success, capped at 100
            await self.redis.zincrby(self.score_key, 5, proxy)
            score = await self.redis.zscore(self.score_key, proxy)
            if score and score > 100:
                await self.redis.zadd(self.score_key, {proxy: 100})
        else:
            # -20 on failure; drop the proxy once its score hits zero
            await self.redis.zincrby(self.score_key, -20, proxy)
            score = await self.redis.zscore(self.score_key, proxy)
            if score and score <= 0:
                await self.remove_proxy(proxy)

    async def remove_proxy(self, proxy: str):
        """Remove a dead proxy."""
        await self.redis.zrem(self.score_key, proxy)
        await self.redis.srem(self.pool_key, proxy)

    async def validate_proxy(self, proxy: str, timeout: int = 10) -> bool:
        """Check whether a proxy still works."""
        test_url = "https://httpbin.org/ip"
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    test_url,
                    proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=timeout)
                ) as resp:
                    return resp.status == 200
        except Exception:
            return False

    async def refresh_pool(self, db: Session):
        """Reload active proxies from the database."""
        proxies = db.query(Proxy).filter(Proxy.is_active == True).all()
        for p in proxies:
            await self.add_proxy(p.ip, p.port, p.protocol)

    async def health_check(self):
        """Periodic health-check loop."""
        while True:
            proxies = await self.redis.smembers(self.pool_key)
            for proxy in proxies:
                proxy_str = proxy.decode()
                is_valid = await self.validate_proxy(proxy_str)
                await self.report_result(proxy_str, is_valid)
            await asyncio.sleep(self.check_interval)
```

### 2. User-Agent Rotator

```python
# app/services/ua_rotator.py
import random
from typing import List

class UserAgentRotator:
    """User-Agent rotator."""

    # Common UAs, grouped by device type
    UA_POOL = {
        "desktop": [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
        ],
        "mobile": [
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (Linux; Android 14; SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.144 Mobile Safari/537.36",
            "Mozilla/5.0 (Linux; Android 14; Pixel 8 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.144 Mobile Safari/537.36",
        ]
    }

    def __init__(self, device_type: str = "desktop", custom_uas: List[str] = None):
        self.device_type = device_type
        self.custom_uas = custom_uas or []
        self._index = 0

    def get_random(self) -> str:
        """Pick a UA at random."""
        pool = self.custom_uas or self.UA_POOL.get(self.device_type, self.UA_POOL["desktop"])
        return random.choice(pool)

    def get_sequential(self) -> str:
        """Rotate through UAs in order."""
        pool = self.custom_uas or self.UA_POOL.get(self.device_type, self.UA_POOL["desktop"])
        ua = pool[self._index % len(pool)]
        self._index += 1
        return ua

    def get_with_fingerprint(self) -> dict:
        """Return a UA plus a matching set of request headers."""
        ua = self.get_random()
        # Build request headers consistent with the chosen UA
        headers = {
            "User-Agent": ua,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
        }
        # Chrome-specific client-hint headers
        if "Chrome" in ua:
            headers["sec-ch-ua"] = '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"'
            headers["sec-ch-ua-mobile"] = "?0" if "Mobile" not in ua else "?1"
            headers["sec-ch-ua-platform"] = '"Windows"' if "Windows" in ua else '"macOS"'
        return headers
```

### 3. Request Rate Limiter

```python
# app/services/rate_limiter.py
import asyncio
import time
import random
from typing import Dict
from collections import defaultdict
import redis.asyncio as redis

class RateLimiter:
    """Rate limiter with global and per-domain delays."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.default_delay = (1, 3)  # default delay range in seconds
        self.domain_delays: Dict[str, tuple] = {}  # per-domain delay config
        self._last_request: Dict[str, float] = defaultdict(float)

    def set_domain_delay(self, domain: str, min_delay: float, max_delay: float):
        """Set the delay range for a specific domain."""
        self.domain_delays[domain] = (min_delay, max_delay)

    async def wait(self, domain: str):
        """Sleep just long enough to honor the randomized per-domain delay."""
        delay_range = self.domain_delays.get(domain, self.default_delay)
        delay = random.uniform(*delay_range)
        # How long since the last request to this domain?
        elapsed = time.time() - self._last_request[domain]
        if elapsed < delay:
            await asyncio.sleep(delay - elapsed)
        self._last_request[domain] = time.time()

    async def acquire(self, domain: str, max_concurrent: int = 5) -> bool:
        """Acquire a concurrency permit (a simple Redis token counter with a TTL)."""
        key = f"ratelimit:{domain}:tokens"
        current = await self.redis.get(key)
        if current is None:
            await self.redis.setex(key, 60, max_concurrent - 1)
            return True
        if int(current) > 0:
            await self.redis.decr(key)
            return True
        return False

    async def release(self, domain: str):
        """Return a concurrency permit."""
        key = f"ratelimit:{domain}:tokens"
        await self.redis.incr(key)

class AdaptiveRateLimiter(RateLimiter):
    """Adaptive limiter that tunes delays from response feedback."""

    async def adjust_on_response(self, domain: str, status_code: int, response_time: float):
        """Adjust the delay range based on the response."""
        current = self.domain_delays.get(domain, self.default_delay)
        if status_code == 429:  # Too Many Requests: back off hard
            self.domain_delays[domain] = (current[0] * 2, current[1] * 2)
        elif status_code == 503:  # Service Unavailable: back off moderately
            self.domain_delays[domain] = (current[0] * 1.5, current[1] * 1.5)
        elif status_code == 200 and response_time < 1:
            # Healthy and fast: trim the delay slightly
            self.domain_delays[domain] = (max(0.5, current[0] * 0.9), max(1, current[1] * 0.9))
```

### 4. Cookie Pool

```python
# app/services/cookie_pool.py
import json
import random
from typing import Dict, Optional
from datetime import datetime
import redis.asyncio as redis

class CookiePool:
    """Cookie pool manager."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def add_cookie(self, domain: str, cookies: Dict, account_id: str = None):
        """Add cookies to the pool."""
        key = f"cookies:{domain}"
        cookie_data = {
            "cookies": cookies,
            "account_id": account_id,
            "added_at": datetime.now().isoformat(),
            "use_count": 0
        }
        await self.redis.hset(
            key,
            account_id or str(random.randint(1000, 9999)),
            json.dumps(cookie_data)
        )

    async def get_cookie(self, domain: str) -> Optional[Dict]:
        """Return the least-used cookies for a domain."""
        key = f"cookies:{domain}"
        all_cookies = await self.redis.hgetall(key)
        if not all_cookies:
            return None
        # Pick the entry with the lowest use_count
        min_use = float('inf')
        selected = None
        selected_key = None
        for k, v in all_cookies.items():
            data = json.loads(v)
            if data["use_count"] < min_use:
                min_use = data["use_count"]
                selected = data
                selected_key = k
        if selected:
            # Bump the usage counter
            selected["use_count"] += 1
            await self.redis.hset(key, selected_key, json.dumps(selected))
            return selected["cookies"]
        return None

    async def invalidate_cookie(self, domain: str, account_id: str):
        """Drop cookies that no longer work."""
        key = f"cookies:{domain}"
        await self.redis.hdel(key, account_id)

    async def refresh_cookie(self, domain: str, account_id: str, new_cookies: Dict):
        """Replace an account's cookies."""
        await self.add_cookie(domain, new_cookies, account_id)
```

### 5. Anti-Bot Strategy Manager (Integration)

```python
# app/services/anti_spider.py
import asyncio
from typing import Optional
from urllib.parse import urlparse
import aiohttp
from app.services.proxy_pool import ProxyPool
from app.services.ua_rotator import UserAgentRotator
from app.services.rate_limiter import AdaptiveRateLimiter
from app.services.cookie_pool import CookiePool

class AntiSpiderManager:
    """Single entry point that coordinates all anti-bot components."""

    def __init__(
        self,
        proxy_pool: ProxyPool,
        ua_rotator: UserAgentRotator,
        rate_limiter: AdaptiveRateLimiter,
        cookie_pool: CookiePool
    ):
        self.proxy_pool = proxy_pool
        self.ua_rotator = ua_rotator
        self.rate_limiter = rate_limiter
        self.cookie_pool = cookie_pool
        self.max_retries = 3

    async def make_request(
        self,
        url: str,
        method: str = "GET",
        use_proxy: bool = True,
        use_cookies: bool = False,
        **kwargs
    ) -> Optional[aiohttp.ClientResponse]:
        """Issue a request with the full anti-bot strategy applied."""
        domain = urlparse(url).netloc
        for attempt in range(self.max_retries):
            # 1. Honor rate limits
            await self.rate_limiter.wait(domain)
            # 2. Build headers (UA plus matching fingerprint headers)
            headers = self.ua_rotator.get_with_fingerprint()
            headers.update(kwargs.pop("headers", {}))
            # 3. Pick a proxy
            proxy = None
            if use_proxy:
                proxy = await self.proxy_pool.get_proxy()
            # 4. Pick cookies
            cookies = None
            if use_cookies:
                cookies = await self.cookie_pool.get_cookie(domain)
            try:
                async with aiohttp.ClientSession(cookies=cookies) as session:
                    start_time = asyncio.get_event_loop().time()
                    async with session.request(
                        method,
                        url,
                        headers=headers,
                        proxy=proxy,
                        timeout=aiohttp.ClientTimeout(total=30),
                        **kwargs
                    ) as response:
                        response_time = asyncio.get_event_loop().time() - start_time
                        # Report the outcome to the proxy pool
                        if proxy:
                            await self.proxy_pool.report_result(proxy, response.status == 200)
                        # Let the limiter adapt to the response
                        await self.rate_limiter.adjust_on_response(
                            domain, response.status, response_time
                        )
                        if response.status == 200:
                            await response.read()  # cache the body before the session closes
                            return response
                        elif response.status in (403, 429, 503):
                            # Retry with exponential backoff
                            await asyncio.sleep(2 ** attempt)
                            continue
                        else:
                            await response.read()
                            return response
            except Exception:
                if proxy:
                    await self.proxy_pool.report_result(proxy, False)
                if attempt == self.max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)
        return None
```

## Device Fingerprint Spoofing
### How Fingerprint Detection Works

Websites build a unique identifier from browser characteristics. The main detection dimensions:

| Category | Signal | Notes |
|---|---|---|
| Canvas | Canvas rendering fingerprint | Unique hash derived from drawing operations |
| WebGL | GPU renderer info | Graphics card model, driver version |
| Audio | AudioContext fingerprint | Audio processing characteristics |
| Fonts | Installed font list | Probed via font rendering |
| Hardware | CPU cores, memory, screen | navigator/screen properties |
| Behavior | Mouse trajectories, typing rhythm | Human/bot discrimination |
### Fingerprint Simulator Implementation

```python
# app/services/fingerprint.py
import random
import hashlib
import json
from typing import Dict, List
from dataclasses import dataclass, field

@dataclass
class BrowserFingerprint:
    """Browser fingerprint configuration."""
    # Basics
    user_agent: str = ""
    platform: str = "Win32"
    language: str = "zh-CN"
    languages: List[str] = field(default_factory=lambda: ["zh-CN", "zh", "en"])
    # Hardware
    hardware_concurrency: int = 8  # CPU cores
    device_memory: int = 8         # RAM in GB
    # Screen
    screen_width: int = 1920
    screen_height: int = 1080
    screen_color_depth: int = 24
    pixel_ratio: float = 1.0
    # Timezone
    timezone: str = "Asia/Shanghai"
    timezone_offset: int = -480
    # WebGL
    webgl_vendor: str = "Google Inc. (NVIDIA)"
    webgl_renderer: str = "ANGLE (NVIDIA, NVIDIA GeForce RTX 3080 Direct3D11 vs_5_0 ps_5_0, D3D11)"
    # Canvas seed (keeps canvas output consistent across sessions)
    canvas_seed: str = ""
    # Audio fingerprint offset
    audio_offset: float = 0.0

class FingerprintGenerator:
    """Generates realistic, reproducible browser fingerprints."""

    # Predefined real-device profiles
    DEVICE_PROFILES = [
        {
            "name": "Windows Chrome",
            "platform": "Win32",
            "ua_pattern": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version}.0.0.0 Safari/537.36",
            "webgl_vendor": "Google Inc. (NVIDIA)",
            "webgl_renderers": [
                "ANGLE (NVIDIA, NVIDIA GeForce RTX 3080 Direct3D11 vs_5_0 ps_5_0, D3D11)",
                "ANGLE (NVIDIA, NVIDIA GeForce RTX 4070 Direct3D11 vs_5_0 ps_5_0, D3D11)",
                "ANGLE (NVIDIA, NVIDIA GeForce GTX 1660 SUPER Direct3D11 vs_5_0 ps_5_0, D3D11)",
            ],
            "screens": [(1920, 1080), (2560, 1440), (1366, 768)],
            "memory": [8, 16, 32],
            "cores": [4, 6, 8, 12],
        },
        {
            "name": "Mac Chrome",
            "platform": "MacIntel",
            "ua_pattern": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version}.0.0.0 Safari/537.36",
            "webgl_vendor": "Google Inc. (Apple)",
            "webgl_renderers": [
                "ANGLE (Apple, Apple M1 Pro, OpenGL 4.1)",
                "ANGLE (Apple, Apple M2, OpenGL 4.1)",
                "ANGLE (Apple, AMD Radeon Pro 5500M, OpenGL 4.1)",
            ],
            "screens": [(2560, 1600), (1440, 900), (1680, 1050)],
            "memory": [8, 16],
            "cores": [8, 10],
            "pixel_ratio": 2.0,
        },
    ]

    def __init__(self):
        self._fingerprint_cache: Dict[str, BrowserFingerprint] = {}

    def generate(self, seed: str = None, profile: str = None) -> BrowserFingerprint:
        """Generate a fingerprint; the same seed always yields the same fingerprint."""
        if seed and seed in self._fingerprint_cache:
            return self._fingerprint_cache[seed]
        # Seed the RNG so the result is reproducible
        if seed:
            random.seed(hashlib.md5(seed.encode()).hexdigest())
        # Pick a device profile
        device = None
        if profile:
            device = next((d for d in self.DEVICE_PROFILES if d["name"] == profile), None)
        if not device:
            device = random.choice(self.DEVICE_PROFILES)
        # Chrome version
        chrome_version = random.randint(118, 122)
        # Screen resolution
        screen = random.choice(device["screens"])
        fp = BrowserFingerprint(
            user_agent=device["ua_pattern"].format(version=chrome_version),
            platform=device["platform"],
            hardware_concurrency=random.choice(device["cores"]),
            device_memory=random.choice(device["memory"]),
            screen_width=screen[0],
            screen_height=screen[1],
            pixel_ratio=device.get("pixel_ratio", 1.0),
            webgl_vendor=device["webgl_vendor"],
            webgl_renderer=random.choice(device["webgl_renderers"]),
            canvas_seed=seed or hashlib.md5(str(random.random()).encode()).hexdigest(),
            audio_offset=random.uniform(-0.0001, 0.0001),
        )
        # Restore the RNG and cache the result
        if seed:
            random.seed()
            self._fingerprint_cache[seed] = fp
        return fp

    def to_playwright_config(self, fp: BrowserFingerprint) -> dict:
        """Convert to a Playwright context configuration."""
        return {
            "user_agent": fp.user_agent,
            "viewport": {"width": fp.screen_width, "height": fp.screen_height},
            "device_scale_factor": fp.pixel_ratio,
            "locale": fp.language,
            "timezone_id": fp.timezone,
        }

    def to_inject_script(self, fp: BrowserFingerprint) -> str:
        """Build the JS snippet injected to spoof fingerprint surfaces."""
        return f'''
// Override navigator properties
Object.defineProperty(navigator, 'hardwareConcurrency', {{
    get: () => {fp.hardware_concurrency}
}});
Object.defineProperty(navigator, 'deviceMemory', {{
    get: () => {fp.device_memory}
}});
Object.defineProperty(navigator, 'platform', {{
    get: () => "{fp.platform}"
}});
Object.defineProperty(navigator, 'languages', {{
    get: () => {json.dumps(fp.languages)}
}});

// Override screen properties
Object.defineProperty(screen, 'width', {{ get: () => {fp.screen_width} }});
Object.defineProperty(screen, 'height', {{ get: () => {fp.screen_height} }});
Object.defineProperty(screen, 'colorDepth', {{ get: () => {fp.screen_color_depth} }});

// Spoof the WebGL fingerprint
const getParameterOrig = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = function(param) {{
    if (param === 37445) return "{fp.webgl_vendor}";
    if (param === 37446) return "{fp.webgl_renderer}";
    return getParameterOrig.call(this, param);
}};

// Add canvas fingerprint noise
const toDataURLOrig = HTMLCanvasElement.prototype.toDataURL;
HTMLCanvasElement.prototype.toDataURL = function(type) {{
    const ctx = this.getContext('2d');
    if (ctx) {{
        const imageData = ctx.getImageData(0, 0, this.width, this.height);
        for (let i = 0; i < imageData.data.length; i += 4)
            imageData.data[i] ^= 1; // tiny per-pixel noise
        ctx.putImageData(imageData, 0, 0);
    }}
    return toDataURLOrig.call(this, type);
}};

// Offset the AudioContext fingerprint
const audioOffsetValue = {fp.audio_offset};
const getChannelDataOrig = AudioBuffer.prototype.getChannelData;
AudioBuffer.prototype.getChannelData = function(channel) {{
    const data = getChannelDataOrig.call(this, channel);
    for (let i = 0; i < data.length; i++)
        data[i] += audioOffsetValue;
    return data;
}};
'''
```

## Browser Automation
### Tooling Comparison

| Tool | Pros | Cons | Best For |
|---|---|---|---|
| Playwright | Cross-browser, friendly API, native async | Detectable automation fingerprint | General automation |
| Puppeteer | Native Chrome support | Chromium only | Chrome-specific work |
| undetected-chromedriver | Bypasses detection automatically | Updates lag behind Chrome | Quick prototypes |
| DrissionPage | Hybrid mode, easy to use | Smaller community | Projects in China |
| Antidetect browsers | Professional anti-detection | Paid, resource-heavy | Heavily protected targets |
### Playwright Integration

```python
# app/services/browser_automation.py
import asyncio
import random
from typing import Dict, Any
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
from app.services.fingerprint import FingerprintGenerator, BrowserFingerprint

class BrowserPool:
    """Browser instance pool managing multiple browser instances."""

    def __init__(self, max_browsers: int = 5):
        self.max_browsers = max_browsers
        self._playwright = None
        self._browsers: list = []
        self._available: asyncio.Queue = None
        self._fingerprint_gen = FingerprintGenerator()

    async def start(self):
        """Initialize the pool."""
        self._playwright = await async_playwright().start()
        self._available = asyncio.Queue()
        for _ in range(self.max_browsers):
            browser = await self._create_browser()
            self._browsers.append(browser)
            await self._available.put(browser)

    async def _create_browser(self) -> Browser:
        """Launch a browser hardened against automation detection."""
        return await self._playwright.chromium.launch(
            headless=True,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--disable-features=IsolateOrigins,site-per-process',
                '--disable-site-isolation-trials',
                '--disable-web-security',
                '--no-first-run',
                '--no-default-browser-check',
                '--disable-infobars',
                '--window-size=1920,1080',
                '--start-maximized',
            ]
        )

    async def acquire(self, fingerprint_seed: str = None) -> 'StealthContext':
        """Check out a browser context."""
        browser = await self._available.get()
        fp = self._fingerprint_gen.generate(seed=fingerprint_seed)
        context = await self._create_stealth_context(browser, fp)
        return StealthContext(context, browser, self, fp)

    async def release(self, browser: Browser):
        """Return a browser to the pool."""
        await self._available.put(browser)

    async def _create_stealth_context(self, browser: Browser, fp: BrowserFingerprint) -> BrowserContext:
        """Create a stealth context with the fingerprint applied."""
        context = await browser.new_context(
            user_agent=fp.user_agent,
            viewport={'width': fp.screen_width, 'height': fp.screen_height},
            device_scale_factor=fp.pixel_ratio,
            locale=fp.language,
            timezone_id=fp.timezone,
        )
        # Inject the fingerprint-override script
        await context.add_init_script(self._fingerprint_gen.to_inject_script(fp))
        # Inject the anti-automation-detection script
        await context.add_init_script(self._get_stealth_script())
        return context

    def _get_stealth_script(self) -> str:
        """Stealth script that hides common automation markers."""
        return '''
// Remove the webdriver flag
Object.defineProperty(navigator, 'webdriver', {
    get: () => undefined
});

// Fake plugins
Object.defineProperty(navigator, 'plugins', {
    get: () => [
        { name: 'Chrome PDF Plugin', filename: 'internal-pdf-viewer' },
        { name: 'Chrome PDF Viewer', filename: 'mhjfbmdgcfjbbpaeojofohoefgiehjai' },
        { name: 'Native Client', filename: 'internal-nacl-plugin' }
    ]
});

// Fake the Chrome runtime
window.chrome = {
    runtime: {
        connect: () => {},
        sendMessage: () => {},
        onMessage: { addListener: () => {} }
    }
};

// Patch the permissions check
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => {
    if (parameters.name === 'notifications') {
        return Promise.resolve({ state: Notification.permission });
    }
    return originalQuery(parameters);
};

// Hide Playwright/Puppeteer markers
delete window.__playwright;
delete window.__puppeteer;
delete window.__selenium;
delete window.__webdriver;

// Patch iframe contentWindow
const elementDescriptor = Object.getOwnPropertyDescriptor(HTMLIFrameElement.prototype, 'contentWindow');
Object.defineProperty(HTMLIFrameElement.prototype, 'contentWindow', {
    get: function() {
        const frame = elementDescriptor.get.call(this);
        if (!frame) return frame;
        // A proxy object could be returned here to hide further markers
        return frame;
    }
});
'''

    async def close(self):
        """Close every browser in the pool."""
        for browser in self._browsers:
            await browser.close()
        if self._playwright:
            await self._playwright.stop()

class StealthContext:
    """Stealth browser context."""

    def __init__(self, context: BrowserContext, browser: Browser,
                 pool: BrowserPool, fingerprint: BrowserFingerprint):
        self.context = context
        self.browser = browser
        self.pool = pool
        self.fingerprint = fingerprint
        self._pages: list = []

    async def new_page(self) -> Page:
        """Open a new page."""
        page = await self.context.new_page()
        self._pages.append(page)
        return page

    async def goto(self, url: str, wait_until: str = "domcontentloaded") -> Page:
        """Navigate to a URL."""
        page = await self.new_page()
        await page.goto(url, wait_until=wait_until)
        return page

    async def get_content(self, url: str) -> str:
        """Fetch a page's HTML."""
        page = await self.goto(url)
        content = await page.content()
        await page.close()
        return content

    async def execute_js(self, page: Page, script: str) -> Any:
        """Run JavaScript on a page."""
        return await page.evaluate(script)

    async def wait_for_selector(self, page: Page, selector: str, timeout: int = 30000):
        """Wait for an element to appear."""
        await page.wait_for_selector(selector, timeout=timeout)

    async def simulate_human(self, page: Page):
        """Simulate human-like scrolling and mouse movement."""
        # Random scrolling
        for _ in range(random.randint(2, 5)):
            await page.mouse.wheel(0, random.randint(100, 300))
            await asyncio.sleep(random.uniform(0.5, 1.5))
        # Random mouse movement
        for _ in range(random.randint(3, 7)):
            x = random.randint(100, 800)
            y = random.randint(100, 600)
            await page.mouse.move(x, y)
            await asyncio.sleep(random.uniform(0.1, 0.3))

    async def close(self):
        """Close the context and release the browser."""
        for page in self._pages:
            if not page.is_closed():
                await page.close()
        await self.context.close()
        await self.pool.release(self.browser)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

class AutomationSpider:
    """Automation spider for pages that require browser rendering."""

    def __init__(self, browser_pool: BrowserPool):
        self.browser_pool = browser_pool

    async def scrape(self, url: str, selectors: Dict[str, str],
                     fingerprint_seed: str = None) -> Dict[str, Any]:
        """Scrape a page."""
        async with await self.browser_pool.acquire(fingerprint_seed) as ctx:
            page = await ctx.goto(url)
            # Behave like a human
            await ctx.simulate_human(page)
            # Give the page time to settle
            await asyncio.sleep(2)
            # Extract the requested fields
            result = {}
            for key, selector in selectors.items():
                try:
                    element = await page.query_selector(selector)
                    if element:
                        result[key] = await element.inner_text()
                except Exception:
                    result[key] = None
            return result

    async def scrape_with_scroll(self, url: str, item_selector: str,
                                 max_items: int = 100) -> list:
        """Scrape an infinite-scroll page."""
        async with await self.browser_pool.acquire() as ctx:
            page = await ctx.goto(url)
            items = []
            while len(items) < max_items:
                # Collect the items currently on the page
                elements = await page.query_selector_all(item_selector)
                for el in elements[len(items):]:
                    text = await el.inner_text()
                    items.append(text)
                    if len(items) >= max_items:
                        break
                # Scroll to load more
                prev_count = len(elements)
                await page.mouse.wheel(0, 500)
                await asyncio.sleep(2)
                # Stop when no new items appear
                new_elements = await page.query_selector_all(item_selector)
                if len(new_elements) == prev_count:
                    break
            return items
```

## RPC Remote Invocation
### RPC Architecture

┌───────────────────────────────────────────────────────────┐
│                     RPC Architecture                      │
│                                                           │
│  ┌─────────────┐       WebSocket      ┌────────────────┐  │
│  │   Python    │  ──────────────────▶ │  Real browser  │  │
│  │spider engine│   bidirectional link │(browser plugin)│  │
│  └─────────────┘  ◀────────────────── └────────────────┘  │
│        │                                     │            │
│        │ invoke JS functions                 │ return     │
│        ▼                                     ▼ results    │
│  ┌───────────────────────────────────────────┐            │
│  │            Target website page            │            │
│  │      window.encrypt() / window.sign()     │            │
│  │     (the site's encryption functions)     │            │
│  └───────────────────────────────────────────┘            │
└───────────────────────────────────────────────────────────┘

Use cases:
1. Call the site's encryption functions to generate request signatures
2. Fetch dynamically generated tokens
3. Run the site's anti-bot detection JS and return the result
4. Perform complex computations that need a browser environment

### RPC Server (Python)

```python
# app/services/rpc_server.py
import asyncio
import json
import uuid
from typing import Dict, Any, Callable, Optional
from fastapi import WebSocket
import redis.asyncio as redis
class RPCServer:
"""
RPC服务器 - 接收浏览器插件连接,提供远程调用能力
工作流程:
1. 浏览器插件连接WebSocket,保持长连接
2. 爬虫发起RPC调用请求
3. 服务器转发请求到浏览器插件
4. 插件执行JS并返回结果
5. 服务器将结果返回给爬虫
"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.clients: Dict[str, WebSocket] = {} # client_id -> websocket
self.pending_calls: Dict[str, asyncio.Future] = {} # call_id -> future
self.call_timeout = 30 # 超时时间(秒)
async def register_client(self, client_id: str, websocket: WebSocket):
"""注册浏览器客户端"""
await websocket.accept()
self.clients[client_id] = websocket
# 存储到Redis,支持分布式
await self.redis.sadd("rpc:clients", client_id)
await self.redis.setex(f"rpc:client:{client_id}:online", 60, "1")
async def unregister_client(self, client_id: str):
"""注销客户端"""
self.clients.pop(client_id, None)
await self.redis.srem("rpc:clients", client_id)
await self.redis.delete(f"rpc:client:{client_id}:online")
async def call(self, client_id: str, method: str, params: Dict = None) -> Any:
"""
发起RPC调用
Args:
client_id: 目标客户端 ID
method: 要调用的JS方法名
params: 方法参数
Returns:
JS执行结果
"""
if client_id not in self.clients:
raise Exception(f"Client {client_id} not connected")
websocket = self.clients[client_id]
call_id = str(uuid.uuid4())
# 创建Future等待结果
        future = asyncio.get_running_loop().create_future()
self.pending_calls[call_id] = future
# 发送调用请求
request = {
"type": "call",
"call_id": call_id,
"method": method,
"params": params or {}
}
await websocket.send_json(request)
# 等待结果
try:
result = await asyncio.wait_for(future, timeout=self.call_timeout)
return result
except asyncio.TimeoutError:
self.pending_calls.pop(call_id, None)
raise Exception(f"RPC call timeout: {method}")
async def handle_response(self, call_id: str, result: Any, error: str = None):
"""处理调用结果"""
future = self.pending_calls.pop(call_id, None)
if future:
if error:
future.set_exception(Exception(error))
else:
future.set_result(result)
async def get_available_client(self) -> Optional[str]:
"""获取一个可用客户端"""
if self.clients:
return next(iter(self.clients.keys()))
return None
async def heartbeat(self, client_id: str):
"""更新客户端心跳"""
await self.redis.setex(f"rpc:client:{client_id}:online", 60, "1")
class RPCClient:
"""
RPC客户端接口 - 供爬虫调用
"""
def __init__(self, rpc_server: RPCServer):
self.server = rpc_server
async def execute_js(self, script: str, client_id: str = None) -> Any:
"""
在浏览器中执行JS代码
Args:
script: 要执行的JS代码
client_id: 指定客户端,不指定则自动选择
Returns:
JS执行结果
"""
if not client_id:
client_id = await self.server.get_available_client()
if not client_id:
raise Exception("No available RPC client")
return await self.server.call(client_id, "eval", {"script": script})
async def call_function(self, func_name: str, *args, client_id: str = None) -> Any:
"""
调用浏览器中的函数
Args:
func_name: 函数名(如 window.encrypt)
args: 函数参数
client_id: 指定客户端
Returns:
函数执行结果
"""
if not client_id:
client_id = await self.server.get_available_client()
if not client_id:
raise Exception("No available RPC client")
return await self.server.call(client_id, "callFunction", {
"function": func_name,
"args": list(args)
})
async def get_cookies(self, domain: str, client_id: str = None) -> Dict:
"""获取指定域名的Cookie"""
return await self.server.call(
client_id or await self.server.get_available_client(),
"getCookies",
{"domain": domain}
)
async def get_local_storage(self, key: str, client_id: str = None) -> Any:
"""获取localStorage"""
return await self.server.call(
client_id or await self.server.get_available_client(),
"getLocalStorage",
{"key": key}
        )
RPC WebSocket 端点
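端点收到 `response` 消息后,会调用 `handle_response` 把结果写入等待中的 Future,从而唤醒阻塞在 `RPCServer.call` 里的调用方。这一配对机制可以脱离 WebSocket 单独演示(示意代码,不含网络部分,函数名为说明而取):

```python
import asyncio

# 最小复现 pending_calls 的 Future 配对机制
pending: dict[str, asyncio.Future] = {}

async def fake_call(call_id: str) -> str:
    """模拟 RPCServer.call: 注册 Future 并等待结果"""
    future = asyncio.get_running_loop().create_future()
    pending[call_id] = future
    return await asyncio.wait_for(future, timeout=1)

def fake_handle_response(call_id: str, result: str):
    """模拟 handle_response: 拿到结果后唤醒等待方"""
    future = pending.pop(call_id, None)
    if future and not future.done():
        future.set_result(result)

async def main():
    task = asyncio.create_task(fake_call("call-1"))
    await asyncio.sleep(0)          # 让 fake_call 先注册 Future
    fake_handle_response("call-1", "signed-token")
    print(await task)               # -> signed-token

asyncio.run(main())
```

超时路径与之对称:`wait_for` 抛出 `TimeoutError` 后从 `pending` 中清除对应条目,防止泄漏。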
python
# app/api/rpc_websocket.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.services.rpc_server import RPCServer
from app.core.dependencies import get_rpc_server
router = APIRouter()
@router.websocket("/ws/rpc/{client_id}")
async def rpc_websocket(websocket: WebSocket, client_id: str):
"""
RPC WebSocket端点 - 浏览器插件连接此端点
协议:
客户端 -> 服务器:
- {"type": "heartbeat"}
- {"type": "response", "call_id": "xxx", "result": ..., "error": null}
服务器 -> 客户端:
- {"type": "call", "call_id": "xxx", "method": "eval", "params": {...}}
"""
rpc_server: RPCServer = get_rpc_server()
await rpc_server.register_client(client_id, websocket)
try:
while True:
data = await websocket.receive_json()
msg_type = data.get("type")
if msg_type == "heartbeat":
await rpc_server.heartbeat(client_id)
await websocket.send_json({"type": "heartbeat_ack"})
elif msg_type == "response":
call_id = data.get("call_id")
result = data.get("result")
error = data.get("error")
await rpc_server.handle_response(call_id, result, error)
except WebSocketDisconnect:
pass
finally:
        await rpc_server.unregister_client(client_id)
浏览器插件客户端(JS)
javascript
// browser_extension/content.js
// 浏览器插件的内容脚本,注入到目标网页
class RPCClient {
constructor(serverUrl, clientId) {
this.serverUrl = serverUrl;
this.clientId = clientId;
this.ws = null;
this.reconnectInterval = 5000;
}
connect() {
this.ws = new WebSocket(`${this.serverUrl}/ws/rpc/${this.clientId}`);
this.ws.onopen = () => {
console.log('[RPC] Connected to server');
this.startHeartbeat();
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.handleMessage(data);
};
this.ws.onclose = () => {
console.log('[RPC] Disconnected, reconnecting...');
this.stopHeartbeat();
setTimeout(() => this.connect(), this.reconnectInterval);
};
this.ws.onerror = (error) => {
console.error('[RPC] WebSocket error:', error);
};
}
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'heartbeat' }));
}
}, 30000);
}
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
}
}
handleMessage(data) {
if (data.type === 'call') {
this.handleCall(data);
} else if (data.type === 'heartbeat_ack') {
// 心跳确认
}
}
async handleCall(data) {
const { call_id, method, params } = data;
let result = null;
let error = null;
try {
switch (method) {
case 'eval':
// 执行任意JS代码
result = eval(params.script);
break;
case 'callFunction':
// 调用指定函数
const func = this.getNestedProperty(window, params.function);
if (typeof func === 'function') {
result = await func.apply(null, params.args);
} else {
throw new Error(`Function ${params.function} not found`);
}
break;
case 'getCookies':
// 获取Cookie
result = this.parseCookies();
break;
case 'getLocalStorage':
// 获取localStorage
result = localStorage.getItem(params.key);
break;
case 'setLocalStorage':
localStorage.setItem(params.key, params.value);
result = true;
break;
default:
throw new Error(`Unknown method: ${method}`);
}
} catch (e) {
error = e.message;
}
// 发送响应
this.ws.send(JSON.stringify({
type: 'response',
call_id: call_id,
result: result,
error: error
}));
}
getNestedProperty(obj, path) {
return path.split('.').reduce((current, key) => current?.[key], obj);
}
parseCookies() {
const cookies = {};
document.cookie.split(';').forEach(cookie => {
const [name, value] = cookie.trim().split('=');
cookies[name] = value;
});
return cookies;
}
}
// 启动RPC客户端
const CLIENT_ID = 'browser_' + Math.random().toString(36).slice(2, 11);
const SERVER_URL = 'ws://localhost:8000'; // 从配置读取
const rpcClient = new RPCClient(SERVER_URL, CLIENT_ID);
rpcClient.connect();
爬虫中使用RPC
python
# 使用示例
import json
import time
import aiohttp
from app.services.rpc_server import RPCClient

async def crawl_with_rpc():
    """RPC爬虫示例 - 调用网站的加密函数"""
rpc = RPCClient(rpc_server) # 注入rpc_server实例
# 场景1: 调用网站的加密函数
# 假设网站有一个 window.encryptData() 函数
encrypted = await rpc.call_function(
"window.encryptData",
{"username": "test", "password": "123456"}
)
print(f"加密结果: {encrypted}")
# 场景2: 获取动态Token
token = await rpc.execute_js('''
// 执行网站的token生成逻辑
window.generateToken && window.generateToken()
''')
print(f"Token: {token}")
# 场景3: 获取Cookie
cookies = await rpc.get_cookies("example.com")
print(f"Cookies: {cookies}")
    # 场景4: 调用复杂的签名函数
    signature = await rpc.execute_js('''
        const params = {
            timestamp: Date.now(),
            nonce: Math.random().toString(36)
        };
        // 调用网站的签名函数
        window.sign(params)
    ''')
print(f"签名: {signature}")
class RPCEnabledSpider:
"""支持RPC的爬虫"""
def __init__(self, rpc_client: RPCClient, session: aiohttp.ClientSession):
self.rpc = rpc_client
self.session = session
async def request_with_signature(self, url: str, data: dict) -> dict:
"""发起带签名的请求"""
# 1. 通过RPC获取签名
timestamp = int(time.time() * 1000)
signature = await self.rpc.execute_js(f'''
window.sign({{
data: {json.dumps(data)},
timestamp: {timestamp}
}})
''')
# 2. 构建请求
headers = {
"X-Signature": signature,
"X-Timestamp": str(timestamp),
}
# 3. 发送请求
async with self.session.post(url, json=data, headers=headers) as resp:
            return await resp.json()
实时日志 WebSocket 方案
架构设计
┌──────────────┐ 发布日志 ┌──────────────┐ 订阅 ┌──────────────┐
│ Celery Worker │ ───────────▶ │ Redis │ ◀─────── │ FastAPI │
│ (爬虫执行) │ Pub/Sub │ (消息中间件) │ │ WebSocket │
└──────────────┘ └──────────────┘ └──────────────┘
│
│ 推送
▼
┌──────────────┐
│ 前端 Vue │
│ (日志面板) │
└──────────────┘
1. 后端:日志收集与发布
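日志在 Redis 中走两条路径:Pub/Sub 频道用于实时推送,List 用于保留历史。下面的小例子只演示消息结构与键名约定(与后文 `LogPublisher` 的实现一致,不连接真实 Redis):

```python
import json
from datetime import datetime

CHANNEL_PREFIX = "spider:logs:"            # 实时频道前缀
HISTORY_PREFIX = "spider:logs:history:"    # 历史列表前缀

def build_log(task_id: int, level: str, message: str, extra: dict = None) -> str:
    """按 LogPublisher 的字段结构序列化一条日志"""
    return json.dumps({
        "task_id": task_id,
        "level": level,
        "message": message,
        "timestamp": datetime.now().isoformat(),
        "extra": extra or {},
    }, ensure_ascii=False)

task_id = 42
channel = f"{CHANNEL_PREFIX}{task_id}"        # PUBLISH 到 spider:logs:42
history_key = f"{HISTORY_PREFIX}{task_id}"    # LPUSH 到 spider:logs:history:42
payload = build_log(task_id, "INFO", "开始爬取")
print(channel, history_key)
print(payload)
```

前端和 WebSocket 端点只依赖这个 JSON 结构,因此发布侧和消费侧可以独立演进。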
python
# app/services/log_publisher.py
import json
import logging
from datetime import datetime
from typing import Any, Dict
import redis.asyncio as redis
class LogPublisher:
"""日志发布器 - 将爬虫日志发布到Redis"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.channel_prefix = "spider:logs:"
async def publish(self, task_id: int, level: str, message: str, extra: Dict = None):
"""发布日志消息"""
log_data = {
"task_id": task_id,
"level": level,
"message": message,
"timestamp": datetime.now().isoformat(),
"extra": extra or {}
}
channel = f"{self.channel_prefix}{task_id}"
await self.redis.publish(channel, json.dumps(log_data, ensure_ascii=False))
# 同时存储到列表(保留最近1000条)
list_key = f"spider:logs:history:{task_id}"
await self.redis.lpush(list_key, json.dumps(log_data, ensure_ascii=False))
await self.redis.ltrim(list_key, 0, 999)
class SpiderLogHandler(logging.Handler):
"""爬虫日志Handler - 集成到Python logging"""
def __init__(self, publisher: LogPublisher, task_id: int):
super().__init__()
self.publisher = publisher
self.task_id = task_id
self._loop = None
def emit(self, record: logging.LogRecord):
import asyncio
log_entry = self.format(record)
# 在异步上下文中发布
try:
            loop = asyncio.get_running_loop()
if loop.is_running():
asyncio.create_task(
self.publisher.publish(
self.task_id,
record.levelname,
log_entry,
{"module": record.module, "line": record.lineno}
)
)
except RuntimeError:
            pass  # 没有事件循环时忽略
2. 后端:WebSocket 端点
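在接入 WebSocket 之前,可以先验证上一节 SpiderLogHandler 的挂载方式:思路相同,只是把 Redis 发布器换成往内存列表追加的简化版 Handler,便于脱离 Redis 单独测试(`CollectingHandler` 是为演示而造的桩,不在实现中):

```python
import logging

class CollectingHandler(logging.Handler):
    """SpiderLogHandler 的内存版:emit 时把日志追加到列表"""
    def __init__(self, task_id: int):
        super().__init__()
        self.task_id = task_id
        self.records = []

    def emit(self, record: logging.LogRecord):
        self.records.append({
            "task_id": self.task_id,
            "level": record.levelname,
            "message": self.format(record),
        })

logger = logging.getLogger("spider.task.demo")
logger.setLevel(logging.DEBUG)
handler = CollectingHandler(task_id=7)
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)

logger.info("开始爬取")
logger.error("请求失败")
print(handler.records)
logger.removeHandler(handler)   # 任务结束后移除,避免 Handler 泄漏
```

业务代码始终只依赖标准 `logging` 接口,替换 Handler 即可在"内存收集"和"Redis 发布"之间切换。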
python
# app/api/websocket.py
import json
import asyncio
from typing import Dict, Set
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
import redis.asyncio as redis
from app.core.config import settings
router = APIRouter()
class ConnectionManager:
"""WebSocket连接管理器"""
def __init__(self):
# task_id -> Set[WebSocket]
self.active_connections: Dict[int, Set[WebSocket]] = {}
async def connect(self, websocket: WebSocket, task_id: int):
await websocket.accept()
if task_id not in self.active_connections:
self.active_connections[task_id] = set()
self.active_connections[task_id].add(websocket)
def disconnect(self, websocket: WebSocket, task_id: int):
if task_id in self.active_connections:
self.active_connections[task_id].discard(websocket)
if not self.active_connections[task_id]:
del self.active_connections[task_id]
async def broadcast(self, task_id: int, message: str):
if task_id in self.active_connections:
dead_connections = set()
for connection in self.active_connections[task_id]:
try:
await connection.send_text(message)
except Exception:
dead_connections.add(connection)
# 清理断开的连接
for conn in dead_connections:
self.active_connections[task_id].discard(conn)
manager = ConnectionManager()
@router.websocket("/ws/logs/{task_id}")
async def websocket_logs(websocket: WebSocket, task_id: int):
"""实时日志WebSocket端点"""
await manager.connect(websocket, task_id)
# 创建Redis连接用于订阅
redis_client = redis.from_url(settings.REDIS_URL)
pubsub = redis_client.pubsub()
channel = f"spider:logs:{task_id}"
try:
# 先发送历史日志(最近50条)
history_key = f"spider:logs:history:{task_id}"
history = await redis_client.lrange(history_key, 0, 49)
for log in reversed(history):
await websocket.send_text(log.decode())
# 订阅实时日志
await pubsub.subscribe(channel)
# 创建两个协程:接收Redis消息 和 接收WebSocket消息
async def listen_redis():
async for message in pubsub.listen():
if message["type"] == "message":
await websocket.send_text(message["data"].decode())
async def listen_websocket():
while True:
# 接收客户端消息(如心跳)
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text(json.dumps({"type": "pong"}))
# 并发运行
await asyncio.gather(listen_redis(), listen_websocket())
except WebSocketDisconnect:
pass
finally:
manager.disconnect(websocket, task_id)
await pubsub.unsubscribe(channel)
await redis_client.close()
@router.websocket("/ws/stats")
async def websocket_stats(websocket: WebSocket):
"""系统统计实时推送"""
await websocket.accept()
try:
while True:
# 每5秒推送一次系统统计
stats = await get_system_stats()
await websocket.send_text(json.dumps(stats))
await asyncio.sleep(5)
except WebSocketDisconnect:
pass
async def get_system_stats() -> dict:
"""获取系统统计信息"""
# TODO: 实现实际的统计逻辑
return {
"running_tasks": 3,
"completed_today": 15,
"total_items_scraped": 12580,
"cpu_usage": 45.2,
"memory_usage": 62.8
    }
3. 前端:日志面板组件
<!-- components/LogPanel.vue -->
<template>
<div class="log-panel">
<el-card>
<template #header>
<div class="panel-header">
          <span>实时日志 - 任务 #{{ taskId }}</span>
<div class="controls">
<el-switch v-model="autoScroll" active-text="自动滚动" />
<el-button size="small" @click="clearLogs">清空</el-button>
<el-tag :type="connectionStatus === 'connected' ? 'success' : 'danger'">
            {{ connectionStatus }}
</el-tag>
</div>
</div>
</template>
<div ref="logContainer" class="log-container">
<div
v-for="(log, index) in logs"
:key="index"
:class="['log-entry', `level-${log.level.toLowerCase()}`]"
>
          <span class="timestamp">{{ formatTime(log.timestamp) }}</span>
          <el-tag :type="levelType(log.level)" size="small">{{ log.level }}</el-tag>
          <span class="message">{{ log.message }}</span>
</div>
<div v-if="logs.length === 0" class="empty-state">
等待日志...
</div>
</div>
</el-card>
</div>
</template>
<script setup>
import { ref, watch, onMounted, onUnmounted, nextTick } from 'vue'
const props = defineProps({
taskId: {
type: Number,
required: true
}
})
const logs = ref([])
const autoScroll = ref(true)
const connectionStatus = ref('disconnected')
const logContainer = ref(null)
let ws = null
let reconnectTimer = null
let heartbeatTimer = null
const connect = () => {
const wsUrl = `${import.meta.env.VITE_WS_URL}/ws/logs/${props.taskId}`
ws = new WebSocket(wsUrl)
ws.onopen = () => {
connectionStatus.value = 'connected'
startHeartbeat()
}
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data)
if (data.type === 'pong') return // 心跳响应
logs.value.push(data)
// 限制日志数量(保留最近500条)
if (logs.value.length > 500) {
logs.value = logs.value.slice(-500)
}
// 自动滚动
if (autoScroll.value) {
nextTick(() => scrollToBottom())
}
} catch (e) {
console.error('Parse log error:', e)
}
}
ws.onclose = () => {
connectionStatus.value = 'disconnected'
stopHeartbeat()
scheduleReconnect()
}
ws.onerror = (error) => {
console.error('WebSocket error:', error)
connectionStatus.value = 'error'
}
}
const startHeartbeat = () => {
heartbeatTimer = setInterval(() => {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send('ping')
}
}, 30000) // 30秒心跳
}
const stopHeartbeat = () => {
if (heartbeatTimer) {
clearInterval(heartbeatTimer)
heartbeatTimer = null
}
}
const scheduleReconnect = () => {
reconnectTimer = setTimeout(() => {
console.log('Attempting to reconnect...')
connect()
}, 3000) // 3秒后重连
}
const scrollToBottom = () => {
if (logContainer.value) {
logContainer.value.scrollTop = logContainer.value.scrollHeight
}
}
const clearLogs = () => {
logs.value = []
}
const formatTime = (timestamp) => {
return new Date(timestamp).toLocaleTimeString('zh-CN', {
hour: '2-digit',
minute: '2-digit',
second: '2-digit',
fractionalSecondDigits: 3
})
}
const levelType = (level) => {
const types = {
DEBUG: 'info',
INFO: 'success',
WARNING: 'warning',
ERROR: 'danger',
CRITICAL: 'danger'
}
return types[level] || 'info'
}
// 监听taskId变化,重新连接
watch(() => props.taskId, (newId) => {
if (ws) ws.close()
logs.value = []
connect()
})
onMounted(() => {
connect()
})
onUnmounted(() => {
if (ws) ws.close()
if (reconnectTimer) clearTimeout(reconnectTimer)
stopHeartbeat()
})
</script>
<style scoped>
.log-panel {
height: 100%;
}
.panel-header {
display: flex;
justify-content: space-between;
align-items: center;
}
.controls {
display: flex;
gap: 12px;
align-items: center;
}
.log-container {
height: 400px;
overflow-y: auto;
background: #1e1e1e;
border-radius: 4px;
padding: 12px;
font-family: 'JetBrains Mono', 'Fira Code', monospace;
font-size: 13px;
}
.log-entry {
padding: 4px 0;
border-bottom: 1px solid #333;
display: flex;
align-items: center;
gap: 8px;
}
.timestamp {
color: #888;
font-size: 12px;
min-width: 100px;
}
.message {
color: #d4d4d4;
flex: 1;
}
.level-debug .message { color: #9cdcfe; }
.level-info .message { color: #4ec9b0; }
.level-warning .message { color: #dcdcaa; }
.level-error .message { color: #f14c4c; }
.level-critical .message { color: #ff0000; font-weight: bold; }
.empty-state {
color: #666;
text-align: center;
padding: 40px;
}
</style>
4. 爬虫任务中使用日志
python
# app/services/spider_engine.py
import logging
import asyncio
from app.services.log_publisher import LogPublisher, SpiderLogHandler
from app.services.anti_spider import AntiSpiderManager
class SpiderEngine:
"""爬虫执行引擎"""
def __init__(self, log_publisher: LogPublisher, anti_spider: AntiSpiderManager):
self.log_publisher = log_publisher
self.anti_spider = anti_spider
async def run_task(self, task_id: int, config: dict):
"""执行爬虫任务"""
# 配置日志
logger = logging.getLogger(f"spider.task.{task_id}")
logger.setLevel(logging.DEBUG)
handler = SpiderLogHandler(self.log_publisher, task_id)
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
try:
logger.info(f"🚀 开始执行任务 #{task_id}")
logger.info(f"目标URL: {config['target_url']}")
urls = config.get('urls', [config['target_url']])
total = len(urls)
for i, url in enumerate(urls, 1):
logger.info(f"📥 正在爬取 [{i}/{total}]: {url}")
try:
response = await self.anti_spider.make_request(
url,
use_proxy=config.get('use_proxy', True),
use_cookies=config.get('use_cookies', False)
)
if response and response.status == 200:
logger.info(f"✅ 成功: {url}")
# 解析数据...
data = await self.parse_response(response, config)
logger.debug(f"提取到 {len(data)} 条数据")
else:
logger.warning(f"⚠️ 响应异常: {url}, 状态码: {response.status if response else 'None'}")
except Exception as e:
logger.error(f"❌ 爬取失败: {url}, 错误: {str(e)}")
logger.info(f"🎉 任务 #{task_id} 执行完成")
except Exception as e:
logger.critical(f"💥 任务崩溃: {str(e)}")
raise
finally:
logger.removeHandler(handler)
async def parse_response(self, response, config: dict) -> list:
"""解析响应数据"""
# TODO: 根据config中的选择器提取数据
        return []
扩展方向
- 分布式爬取:支持多节点部署,Scrapy + Scrapy-Redis
- 智能反爬:集成验证码识别服务、指纹浏览器
- 可视化配置:无代码配置爬虫规则
- 数据清洗管道:内置ETL流程
- API对接:提供OpenAPI供外部系统调用