
System Overview

A complete crawler management system for centrally managing, scheduling, and monitoring multiple crawl tasks, with support for task configuration, log viewing, data preview, and more.


Technology Stack

| Layer | Choice | Rationale |
| --- | --- | --- |
| Frontend | Vue 3 + Element Plus | Lightweight, mature ecosystem; well suited to admin dashboards |
| Backend | Python FastAPI | Async and high-performance; integrates seamlessly with the crawling ecosystem |
| Database | PostgreSQL + Redis | Relational storage plus task queue/cache |
| Task scheduling | Celery + Redis | Distributed task queue with scheduled and async execution |
| Crawler framework | Scrapy / requests | Chosen per task complexity |

System Architecture

┌─────────────────────────────────────────────────────────────┐
│                      Frontend (Vue 3)                       │
│   ┌─────────┬─────────┬─────────┬─────────┬──────────┐      │
│   │  Tasks  │ Spider  │   Log   │  Data   │  System  │      │
│   │         │ Config  │ Monitor │ Preview │ Settings │      │
│   └─────────┴─────────┴─────────┴─────────┴──────────┘      │
└─────────────────────────────────────────────────────────────┘
                              │ HTTP/WebSocket

┌─────────────────────────────────────────────────────────────┐
│                      Backend (FastAPI)                      │
│   ┌──────────────────────────────────────────────────────┐  │
│   │  API layer: task CRUD / spider control / log query / │  │
│   │             data export                              │  │
│   └──────────────────────────────────────────────────────┘  │
│   ┌──────────────────────────────────────────────────────┐  │
│   │  Service layer: task scheduler / spider engine /     │  │
│   │                 proxy pool management                │  │
│   └──────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
          │                    │                    │
          ▼                    ▼                    ▼
   ┌────────────┐      ┌─────────────┐      ┌────────────┐
   │ PostgreSQL │      │    Redis    │      │   Celery   │
   │ (storage)  │      │(queue/cache)│      │ (workers)  │
   └────────────┘      └─────────────┘      └────────────┘

Core Feature Modules

1. Task Management

  • [ ] Create/edit/delete crawl tasks
  • [ ] Task state management (pending, running, completed, failed)
  • [ ] Scheduled runs via cron expressions
  • [ ] Task priority settings
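
Cron-based scheduling hinges on matching a five-field expression against the current time. A minimal stdlib-only matcher is sketched below; it handles only `*`, `*/n`, and comma lists, and note that cron counts Sunday as 0 while Python's `weekday()` counts Monday as 0 — production code should use a library such as croniter:

```python
from datetime import datetime

def _field_matches(field: str, value: int) -> bool:
    """Check one cron field ('*', '*/5', '1,15', '10') against a value."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            if value % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False

def cron_matches(expr: str, when: datetime) -> bool:
    """True if a 'min hour day month weekday' expression fires at `when`.

    Caveat: cron weekdays count Sunday as 0; Python's weekday() counts
    Monday as 0, so real schedulers must remap this field.
    """
    minute, hour, day, month, weekday = expr.split()
    return (_field_matches(minute, when.minute)
            and _field_matches(hour, when.hour)
            and _field_matches(day, when.day)
            and _field_matches(month, when.month)
            and _field_matches(weekday, when.weekday()))
```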

2. Spider Configuration

  • [ ] Target URL configuration
  • [ ] Request header/cookie settings
  • [ ] Extraction rules (XPath/CSS selectors)
  • [ ] Anti-bot settings (proxies, delays, User-Agent rotation)

3. Monitoring Center

  • [ ] Live run logs (pushed over WebSocket)
  • [ ] Execution statistics (success rate, duration, item counts)
  • [ ] Alert notifications on failures
  • [ ] Resource usage monitoring (CPU/memory)

4. Data Management

  • [ ] Data preview and search
  • [ ] Data export (CSV/JSON/Excel)
  • [ ] Data-cleaning rule configuration
  • [ ] Deduplication strategies
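
A common deduplication strategy is to hash each item's canonical JSON and keep a set of seen fingerprints; a sketch follows (in production the seen-set would live in Redis so it is shared across workers):

```python
import hashlib
import json

def item_fingerprint(item: dict) -> str:
    """Stable hash of an item: canonical JSON with sorted keys."""
    canonical = json.dumps(item, sort_keys=True, ensure_ascii=False)
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()

def dedupe(items):
    """Drop items whose fingerprint has been seen before."""
    seen, unique = set(), []
    for item in items:
        fp = item_fingerprint(item)
        if fp not in seen:
            seen.add(fp)
            unique.append(item)
    return unique
```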

Database Design

Core Tables

sql
-- Spider task table
CREATE TABLE spider_task (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    target_url TEXT NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',  -- pending/running/completed/failed
    priority INT DEFAULT 5,
    cron_expression VARCHAR(50),
    config JSONB,  -- spider configuration (headers, selectors, anti-bot settings)
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- Execution history table
CREATE TABLE task_execution (
    id SERIAL PRIMARY KEY,
    task_id INT REFERENCES spider_task(id),
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    status VARCHAR(20),
    items_scraped INT DEFAULT 0,
    error_message TEXT
);

-- Scraped data table
CREATE TABLE scraped_data (
    id SERIAL PRIMARY KEY,
    task_id INT REFERENCES spider_task(id),
    execution_id INT REFERENCES task_execution(id),
    data JSONB,
    url VARCHAR(500),
    created_at TIMESTAMP DEFAULT NOW()
);

-- Proxy pool table
CREATE TABLE proxy_pool (
    id SERIAL PRIMARY KEY,
    ip VARCHAR(50),
    port INT,
    protocol VARCHAR(10),
    is_active BOOLEAN DEFAULT TRUE,
    fail_count INT DEFAULT 0,
    last_check TIMESTAMP
);
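
The API section below filters executions and scraped data by task and time, so indexes on the foreign keys and timestamps are worth adding up front — a hedged suggestion, not part of the original schema:

```sql
-- Suggested supporting indexes (assumes the query patterns in the API section)
CREATE INDEX idx_task_execution_task_id ON task_execution (task_id);
CREATE INDEX idx_scraped_data_task_created ON scraped_data (task_id, created_at);
CREATE INDEX idx_spider_task_status ON spider_task (status);
```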

API Design

Task Management

| Method | Path | Description |
| --- | --- | --- |
| GET | /api/tasks | List tasks |
| POST | /api/tasks | Create a task |
| PUT | /api/tasks/{id} | Update a task |
| DELETE | /api/tasks/{id} | Delete a task |
| POST | /api/tasks/{id}/run | Run a task immediately |
| POST | /api/tasks/{id}/stop | Stop a task |

Monitoring & Logs

| Method | Path | Description |
| --- | --- | --- |
| GET | /api/tasks/{id}/logs | Fetch task logs |
| WS | /ws/logs/{task_id} | Live log stream |
| GET | /api/stats/overview | System statistics overview |

Data Management

| Method | Path | Description |
| --- | --- | --- |
| GET | /api/data/{task_id} | Query scraped data |
| GET | /api/data/{task_id}/export | Export data |

Frontend Page Plan

Page Structure

├── Dashboard
│   ├── Task status summary cards
│   ├── Recent execution records
│   └── Data volume trend charts

├── Task Management
│   ├── Task list (search, filter, pagination)
│   ├── Create/edit task dialog
│   └── Task detail page

├── Monitoring Center
│   ├── Live log panel
│   ├── Task execution timeline
│   └── Error alert list

├── Data Center
│   ├── Data table (preview)
│   ├── Data search
│   └── Export

└── System Settings
    ├── Proxy pool management
    ├── Global configuration
    └── User management (optional)

Backend Directory Layout

spider_manager/
├── app/
│   ├── api/
│   │   ├── __init__.py
│   │   ├── tasks.py          # task endpoints
│   │   ├── data.py           # data endpoints
│   │   └── monitor.py        # monitoring endpoints
│   ├── core/
│   │   ├── config.py         # settings management
│   │   ├── database.py       # database connection
│   │   └── security.py       # authentication/authorization
│   ├── models/
│   │   ├── task.py
│   │   ├── execution.py
│   │   └── data.py
│   ├── services/
│   │   ├── scheduler.py      # task scheduling
│   │   ├── spider_engine.py  # crawl engine
│   │   └── proxy_pool.py     # proxy pool
│   ├── spiders/
│   │   ├── base_spider.py    # base spider class
│   │   └── templates/        # spider templates
│   └── main.py
├── celery_app.py             # Celery configuration
├── requirements.txt
└── docker-compose.yml        # containerized deployment

Core Code Samples

Backend: Task Model (FastAPI + SQLAlchemy)

python
from sqlalchemy import Column, Integer, String, Text, DateTime, JSON
from sqlalchemy.sql import func
from app.core.database import Base

class SpiderTask(Base):
    __tablename__ = "spider_task"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    description = Column(Text)
    target_url = Column(Text, nullable=False)
    status = Column(String(20), default="pending")
    priority = Column(Integer, default=5)
    cron_expression = Column(String(50))
    config = Column(JSON)  # {headers, cookies, selectors, anti_spider...}
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

Backend: Task API

python
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.models.task import SpiderTask
from app.services.scheduler import schedule_task

router = APIRouter(prefix="/api/tasks", tags=["tasks"])

class TaskCreate(BaseModel):
    # request schema; fields mirror the SpiderTask columns
    name: str
    target_url: str
    description: str | None = None
    priority: int = 5
    cron_expression: str | None = None
    config: dict | None = None

@router.post("/")
async def create_task(task_data: TaskCreate, db: Session = Depends(get_db)):
    task = SpiderTask(**task_data.dict())
    db.add(task)
    db.commit()
    db.refresh(task)
    return task

@router.post("/{task_id}/run")
async def run_task(task_id: int, db: Session = Depends(get_db)):
    task = db.query(SpiderTask).filter(SpiderTask.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # hand off to the Celery queue
    schedule_task.delay(task_id)
    return {"message": "Task scheduled", "task_id": task_id}

Frontend: Task List Component (Vue 3)

<template>
  <div class="task-list">
    <el-card>
      <template #header>
        <div class="card-header">
          <span>Task List</span>
          <el-button type="primary" @click="showCreateDialog">
            New Task
          </el-button>
        </div>
      </template>

      <el-table :data="tasks" v-loading="loading">
        <el-table-column prop="name" label="Name" />
        <el-table-column prop="target_url" label="Target URL" show-overflow-tooltip />
        <el-table-column prop="status" label="Status">
          <template #default="{ row }">
            <el-tag :type="statusType(row.status)">{{ row.status }}</el-tag>
          </template>
        </el-table-column>
        <el-table-column label="Actions" width="200">
          <template #default="{ row }">
            <el-button size="small" @click="runTask(row.id)">Run</el-button>
            <el-button size="small" @click="editTask(row)">Edit</el-button>
            <el-button size="small" type="danger" @click="deleteTask(row.id)">
              Delete
            </el-button>
          </template>
        </el-table-column>
      </el-table>
    </el-card>
  </div>
</template>

<script setup>
import { ref, onMounted } from 'vue'
import { ElMessage } from 'element-plus'
import { getTasks, runTask as apiRunTask } from '@/api/tasks'

const tasks = ref([])
const loading = ref(false)

// map task status to an Element Plus tag type
const statusType = (status) => ({
  running: 'warning',
  completed: 'success',
  failed: 'danger'
}[status] || 'info')

const fetchTasks = async () => {
  loading.value = true
  try {
    tasks.value = await getTasks()
  } finally {
    loading.value = false
  }
}

const runTask = async (id) => {
  await apiRunTask(id)
  ElMessage.success('Task started')
  fetchTasks()
}

onMounted(fetchTasks)
</script>

Deployment

Docker Compose Configuration

yaml
version: '3.8'
services:
  web:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - db
      - redis
    environment:
      - DATABASE_URL=postgresql://user:pass@db/spider_db
      - REDIS_URL=redis://redis:6379

  celery:
    build: .
    command: celery -A celery_app worker --loglevel=info
    depends_on:
      - db
      - redis
    environment:
      - DATABASE_URL=postgresql://user:pass@db/spider_db
      - REDIS_URL=redis://redis:6379

  celery-beat:
    build: .
    command: celery -A celery_app beat --loglevel=info
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379

  db:
    image: postgres:14
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=spider_db
    volumes:
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

  frontend:
    build: ./frontend
    ports:
      - "80:80"

volumes:
  pgdata:


Anti-Bot Module: Detailed Design

Module Architecture

┌─────────────────────────────────────────────────────────┐
│               Anti-Bot Strategy Manager                 │
│  ┌────────────┬────────────┬────────────┬────────────┐  │
│  │ Proxy pool │ UA rotator │   Rate     │   Cookie   │  │
│  │            │            │  limiter   │    pool    │  │
│  └────────────┴────────────┴────────────┴────────────┘  │
│  ┌────────────┬────────────┬────────────┬────────────┐  │
│  │ Fingerprint│  CAPTCHA   │   Retry    │  Fallback  │  │
│  │  spoofing  │  handling  │   policy   │   policy   │  │
│  └────────────┴────────────┴────────────┴────────────┘  │
└─────────────────────────────────────────────────────────┘

1. Proxy Pool Manager

python
# app/services/proxy_pool.py
import random
import asyncio
import aiohttp
from typing import Optional
from sqlalchemy.orm import Session
from app.models.proxy import Proxy
import redis.asyncio as redis

class ProxyPool:
    """Proxy pool manager: fetches, validates, and scores proxies."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.pool_key = "proxy:pool"
        self.score_key = "proxy:scores"
        self.max_fail_count = 3
        self.check_interval = 300  # health-check every 5 minutes
    
    async def get_proxy(self, protocol: str = "http") -> Optional[str]:
        """Pick a usable proxy at random from the highest-scored candidates."""
        proxies = await self.redis.zrevrangebyscore(
            self.score_key, "+inf", 60,  # only proxies scoring above 60
            start=0, num=10
        )
        if not proxies:
            return None

        # random pick among the top-scored candidates
        proxy = random.choice(proxies)
        return proxy.decode() if proxy else None
    
    async def add_proxy(self, ip: str, port: int, protocol: str = "http"):
        """Add a proxy to the pool."""
        proxy_url = f"{protocol}://{ip}:{port}"
        # new proxies start with a score of 100
        await self.redis.zadd(self.score_key, {proxy_url: 100})
        await self.redis.sadd(self.pool_key, proxy_url)
    
    async def report_result(self, proxy: str, success: bool):
        """Report a usage result and adjust the proxy's score."""
        if success:
            # +5 on success, capped at 100
            await self.redis.zincrby(self.score_key, 5, proxy)
            score = await self.redis.zscore(self.score_key, proxy)
            if score and score > 100:
                await self.redis.zadd(self.score_key, {proxy: 100})
        else:
            # -20 on failure
            await self.redis.zincrby(self.score_key, -20, proxy)
            score = await self.redis.zscore(self.score_key, proxy)
            if score is not None and score <= 0:
                # score exhausted: drop the proxy
                await self.remove_proxy(proxy)
    
    async def remove_proxy(self, proxy: str):
        """Remove a dead proxy."""
        await self.redis.zrem(self.score_key, proxy)
        await self.redis.srem(self.pool_key, proxy)
    
    async def validate_proxy(self, proxy: str, timeout: int = 10) -> bool:
        """Check whether a proxy is alive."""
        test_url = "https://httpbin.org/ip"
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    test_url,
                    proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=timeout)
                ) as resp:
                    return resp.status == 200
        except Exception:
            return False
    
    async def refresh_pool(self, db: Session):
        """Reload active proxies from the database."""
        proxies = db.query(Proxy).filter(Proxy.is_active.is_(True)).all()
        for p in proxies:
            await self.add_proxy(p.ip, p.port, p.protocol)
    
    async def health_check(self):
        """Periodic health-check loop."""
        while True:
            proxies = await self.redis.smembers(self.pool_key)
            for proxy in proxies:
                proxy_str = proxy.decode()
                is_valid = await self.validate_proxy(proxy_str)
                await self.report_result(proxy_str, is_valid)
            await asyncio.sleep(self.check_interval)

2. User-Agent Rotator

python
# app/services/ua_rotator.py
import random
from typing import List

class UserAgentRotator:
    """User-Agent rotator."""

    # common UA strings, grouped by device type
    UA_POOL = {
        "desktop": [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
        ],
        "mobile": [
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (Linux; Android 14; SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.144 Mobile Safari/537.36",
            "Mozilla/5.0 (Linux; Android 14; Pixel 8 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.144 Mobile Safari/537.36",
        ]
    }
    
    def __init__(self, device_type: str = "desktop", custom_uas: List[str] = None):
        self.device_type = device_type
        self.custom_uas = custom_uas or []
        self._index = 0
    
    def get_random(self) -> str:
        """Return a random UA."""
        pool = self.custom_uas or self.UA_POOL.get(self.device_type, self.UA_POOL["desktop"])
        return random.choice(pool)
    
    def get_sequential(self) -> str:
        """Rotate through UAs in order."""
        pool = self.custom_uas or self.UA_POOL.get(self.device_type, self.UA_POOL["desktop"])
        ua = pool[self._index % len(pool)]
        self._index += 1
        return ua
    
    def get_with_fingerprint(self) -> dict:
        """Return a UA plus a matching set of request headers."""
        ua = self.get_random()

        # build request headers consistent with the chosen UA
        headers = {
            "User-Agent": ua,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
        }
        
        # Chrome-specific client-hint headers
        if "Chrome" in ua:
            headers["sec-ch-ua"] = '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"'
            headers["sec-ch-ua-mobile"] = "?0" if "Mobile" not in ua else "?1"
            headers["sec-ch-ua-platform"] = '"Windows"' if "Windows" in ua else '"macOS"'
        
        return headers

3. Request Rate Limiter

python
# app/services/rate_limiter.py
import asyncio
import time
import random
from typing import Dict
from collections import defaultdict
import redis.asyncio as redis

class RateLimiter:
    """Request rate limiter with global and per-domain limits."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.default_delay = (1, 3)  # default delay range in seconds
        self.domain_delays: Dict[str, tuple] = {}  # per-domain delay config
        self._last_request: Dict[str, float] = defaultdict(float)
    
    def set_domain_delay(self, domain: str, min_delay: float, max_delay: float):
        """Set the delay range for a specific domain."""
        self.domain_delays[domain] = (min_delay, max_delay)

    async def wait(self, domain: str):
        """Sleep until the domain's randomized delay has elapsed."""
        delay_range = self.domain_delays.get(domain, self.default_delay)
        delay = random.uniform(*delay_range)

        # account for time already elapsed since the last request
        elapsed = time.time() - self._last_request[domain]
        if elapsed < delay:
            await asyncio.sleep(delay - elapsed)

        self._last_request[domain] = time.time()
    
    async def acquire(self, domain: str, max_concurrent: int = 5) -> bool:
        """Acquire a concurrency permit (Redis counter, refreshed every 60 s)."""
        key = f"ratelimit:{domain}:tokens"

        # distributed permit counter in Redis
        current = await self.redis.get(key)
        if current is None:
            await self.redis.setex(key, 60, max_concurrent - 1)
            return True

        if int(current) > 0:
            await self.redis.decr(key)
            return True

        return False

    async def release(self, domain: str):
        """Release a concurrency permit."""
        key = f"ratelimit:{domain}:tokens"
        await self.redis.incr(key)

class AdaptiveRateLimiter(RateLimiter):
    """Adaptive limiter: tunes delays based on server responses."""

    async def adjust_on_response(self, domain: str, status_code: int, response_time: float):
        """Adjust the delay range based on the last response."""
        current = self.domain_delays.get(domain, self.default_delay)

        if status_code == 429:  # Too Many Requests
            # throttled: back off aggressively
            self.domain_delays[domain] = (current[0] * 2, current[1] * 2)
        elif status_code == 503:  # Service Unavailable
            # server under load: increase delay
            self.domain_delays[domain] = (current[0] * 1.5, current[1] * 1.5)
        elif status_code == 200 and response_time < 1:
            # healthy and fast: cautiously reduce delay
            self.domain_delays[domain] = (max(0.5, current[0] * 0.9), max(1, current[1] * 0.9))

4. Cookie Pool Manager

python
# app/services/cookie_pool.py
import json
import random
from typing import Dict, Optional
from datetime import datetime
import redis.asyncio as redis

class CookiePool:
    """Cookie pool manager."""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    async def add_cookie(self, domain: str, cookies: Dict, account_id: str = None):
        """Add cookies to the pool."""
        key = f"cookies:{domain}"
        cookie_data = {
            "cookies": cookies,
            "account_id": account_id,
            "added_at": datetime.now().isoformat(),
            "use_count": 0
        }
        await self.redis.hset(key, account_id or str(random.randint(1000, 9999)), 
                              json.dumps(cookie_data))
    
    async def get_cookie(self, domain: str) -> Optional[Dict]:
        """Return the least-used cookies for a domain."""
        key = f"cookies:{domain}"
        all_cookies = await self.redis.hgetall(key)
        
        if not all_cookies:
            return None
        
        # pick the entry with the lowest use_count
        min_use = float('inf')
        selected = None
        selected_key = None
        
        for k, v in all_cookies.items():
            data = json.loads(v)
            if data["use_count"] < min_use:
                min_use = data["use_count"]
                selected = data
                selected_key = k
        
        if selected:
            # bump the use counter
            selected["use_count"] += 1
            await self.redis.hset(key, selected_key, json.dumps(selected))
            return selected["cookies"]
        
        return None
    
    async def invalidate_cookie(self, domain: str, account_id: str):
        """Mark cookies as invalid and drop them."""
        key = f"cookies:{domain}"
        await self.redis.hdel(key, account_id)
    
    async def refresh_cookie(self, domain: str, account_id: str, new_cookies: Dict):
        """Replace cookies for an account."""
        await self.add_cookie(domain, new_cookies, account_id)
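
The least-used scan inside `get_cookie` reduces to a `min()` over the hash entries; pulling it out as a pure function (hypothetical helper, not part of the class above) makes the selection policy easy to unit-test without Redis:

```python
import json

def pick_least_used(cookie_map: dict) -> tuple:
    """Return (field, entry) for the entry with the smallest use_count.

    `cookie_map` maps Redis hash fields to JSON-encoded entries, in the
    shape written by CookiePool.add_cookie.
    """
    field, raw = min(cookie_map.items(),
                     key=lambda kv: json.loads(kv[1])["use_count"])
    return field, json.loads(raw)
```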

5. Anti-Bot Strategy Manager (Integration)

python
# app/services/anti_spider.py
import asyncio
from typing import Dict, Optional
from urllib.parse import urlparse
import aiohttp

from app.services.proxy_pool import ProxyPool
from app.services.ua_rotator import UserAgentRotator
from app.services.rate_limiter import AdaptiveRateLimiter
from app.services.cookie_pool import CookiePool

class AntiSpiderManager:
    """Anti-bot strategy manager: coordinates all anti-bot components."""
    
    def __init__(
        self,
        proxy_pool: ProxyPool,
        ua_rotator: UserAgentRotator,
        rate_limiter: AdaptiveRateLimiter,
        cookie_pool: CookiePool
    ):
        self.proxy_pool = proxy_pool
        self.ua_rotator = ua_rotator
        self.rate_limiter = rate_limiter
        self.cookie_pool = cookie_pool
        self.max_retries = 3
    
    async def make_request(
        self,
        url: str,
        method: str = "GET",
        use_proxy: bool = True,
        use_cookies: bool = False,
        **kwargs
    ) -> Optional[aiohttp.ClientResponse]:
        """Issue a request with the full anti-bot stack applied."""

        domain = urlparse(url).netloc

        for attempt in range(self.max_retries):
            # 1. honor the rate limit
            await self.rate_limiter.wait(domain)

            # 2. build headers (UA plus matching fingerprint headers)
            headers = self.ua_rotator.get_with_fingerprint()
            headers.update(kwargs.pop("headers", {}))

            # 3. pick a proxy
            proxy = None
            if use_proxy:
                proxy = await self.proxy_pool.get_proxy()

            # 4. pick cookies
            cookies = None
            if use_cookies:
                cookies = await self.cookie_pool.get_cookie(domain)
            
            try:
                async with aiohttp.ClientSession(cookies=cookies) as session:
                    start_time = asyncio.get_event_loop().time()

                    async with session.request(
                        method,
                        url,
                        headers=headers,
                        proxy=proxy,
                        timeout=aiohttp.ClientTimeout(total=30),
                        **kwargs
                    ) as response:
                        response_time = asyncio.get_event_loop().time() - start_time

                        # report the outcome for proxy scoring
                        if proxy:
                            await self.proxy_pool.report_result(proxy, response.status == 200)

                        # let the adaptive limiter react to the response
                        await self.rate_limiter.adjust_on_response(
                            domain, response.status, response_time
                        )

                        if response.status == 200:
                            await response.read()  # buffer the body before the session closes
                            return response
                        elif response.status in (403, 429, 503):
                            # blocked or throttled: retry with exponential backoff
                            await asyncio.sleep(2 ** attempt)
                            continue
                        else:
                            await response.read()  # buffer the body before the session closes
                            return response

            except Exception:
                if proxy:
                    await self.proxy_pool.report_result(proxy, False)
                if attempt == self.max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)

        return None
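
The retry loop backs off with a bare `2 ** attempt`; adding jitter avoids many workers retrying a blocked domain in lockstep. A sketch of the usual full-jitter variant (hypothetical helper):

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Full-jitter backoff: uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```

In `make_request`, `asyncio.sleep(2 ** attempt)` would then become `asyncio.sleep(backoff_delay(attempt))`.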

Device Fingerprint Spoofing

How Fingerprinting Works

Sites collect browser characteristics and derive a stable identifier from them. The main dimensions:

| Category | Probe | Notes |
| --- | --- | --- |
| Canvas | Canvas rendering fingerprint | A unique hash derived from drawing operations |
| WebGL | GPU renderer info | Graphics card model, driver version |
| Audio | AudioContext fingerprint | Audio-processing characteristics |
| Fonts | Installed font list | Probed via font rendering |
| Hardware | CPU cores, memory, screen | navigator/screen properties |
| Behavior | Mouse trajectories, typing rhythm | Human/bot discrimination |
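
Conceptually, a site canonicalizes these dimensions and hashes them into a stable ID. The toy sketch below (not any real tracker's algorithm) shows why changing a single attribute yields a brand-new identifier:

```python
import hashlib
import json

def fingerprint_id(attrs: dict) -> str:
    """Hash a dict of collected browser attributes into a short stable ID."""
    canonical = json.dumps(attrs, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]
```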

Fingerprint Spoofer Implementation

python
# app/services/fingerprint.py
import random
import hashlib
import json
from typing import Dict, List, Optional
from dataclasses import dataclass, field

@dataclass
class BrowserFingerprint:
    """Browser fingerprint configuration."""
    # basics
    user_agent: str = ""
    platform: str = "Win32"
    language: str = "zh-CN"
    languages: List[str] = field(default_factory=lambda: ["zh-CN", "zh", "en"])
    
    # hardware
    hardware_concurrency: int = 8  # CPU core count
    device_memory: int = 8  # memory in GB
    
    # screen
    screen_width: int = 1920
    screen_height: int = 1080
    screen_color_depth: int = 24
    pixel_ratio: float = 1.0
    
    # timezone
    timezone: str = "Asia/Shanghai"
    timezone_offset: int = -480
    
    # WebGL
    webgl_vendor: str = "Google Inc. (NVIDIA)"
    webgl_renderer: str = "ANGLE (NVIDIA, NVIDIA GeForce RTX 3080 Direct3D11 vs_5_0 ps_5_0, D3D11)"
    
    # canvas seed (keeps canvas output consistent per identity)
    canvas_seed: str = ""
    
    # audio fingerprint offset
    audio_offset: float = 0.0

class FingerprintGenerator:
    """Generates realistic, self-consistent browser fingerprints."""
    
    # predefined real-device profiles
    DEVICE_PROFILES = [
        {
            "name": "Windows Chrome",
            "platform": "Win32",
            "ua_pattern": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version}.0.0.0 Safari/537.36",
            "webgl_vendor": "Google Inc. (NVIDIA)",
            "webgl_renderers": [
                "ANGLE (NVIDIA, NVIDIA GeForce RTX 3080 Direct3D11 vs_5_0 ps_5_0, D3D11)",
                "ANGLE (NVIDIA, NVIDIA GeForce RTX 4070 Direct3D11 vs_5_0 ps_5_0, D3D11)",
                "ANGLE (NVIDIA, NVIDIA GeForce GTX 1660 SUPER Direct3D11 vs_5_0 ps_5_0, D3D11)",
            ],
            "screens": [(1920, 1080), (2560, 1440), (1366, 768)],
            "memory": [8, 16, 32],
            "cores": [4, 6, 8, 12],
        },
        {
            "name": "Mac Chrome",
            "platform": "MacIntel",
            "ua_pattern": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version}.0.0.0 Safari/537.36",
            "webgl_vendor": "Google Inc. (Apple)",
            "webgl_renderers": [
                "ANGLE (Apple, Apple M1 Pro, OpenGL 4.1)",
                "ANGLE (Apple, Apple M2, OpenGL 4.1)",
                "ANGLE (Apple, AMD Radeon Pro 5500M, OpenGL 4.1)",
            ],
            "screens": [(2560, 1600), (1440, 900), (1680, 1050)],
            "memory": [8, 16],
            "cores": [8, 10],
            "pixel_ratio": 2.0,
        },
    ]
    
    def __init__(self):
        self._fingerprint_cache: Dict[str, BrowserFingerprint] = {}
    
    def generate(self, seed: str = None, profile: str = None) -> BrowserFingerprint:
        """Generate a fingerprint; the same seed always yields the same result."""
        if seed and seed in self._fingerprint_cache:
            return self._fingerprint_cache[seed]
        
        # seed the RNG so repeated calls are deterministic
        if seed:
            random.seed(hashlib.md5(seed.encode()).hexdigest())
        
        # choose a device profile
        device = None
        if profile:
            device = next((d for d in self.DEVICE_PROFILES if d["name"] == profile), None)
        if not device:
            device = random.choice(self.DEVICE_PROFILES)
        
        # Chrome version
        chrome_version = random.randint(118, 122)
        
        # screen resolution
        screen = random.choice(device["screens"])
        
        fp = BrowserFingerprint(
            user_agent=device["ua_pattern"].format(version=chrome_version),
            platform=device["platform"],
            hardware_concurrency=random.choice(device["cores"]),
            device_memory=random.choice(device["memory"]),
            screen_width=screen[0],
            screen_height=screen[1],
            pixel_ratio=device.get("pixel_ratio", 1.0),
            webgl_vendor=device["webgl_vendor"],
            webgl_renderer=random.choice(device["webgl_renderers"]),
            canvas_seed=seed or hashlib.md5(str(random.random()).encode()).hexdigest(),
            audio_offset=random.uniform(-0.0001, 0.0001),
        )
        
        # re-randomize the global random state
        if seed:
            random.seed()
            self._fingerprint_cache[seed] = fp
        
        return fp
    
    def to_playwright_config(self, fp: BrowserFingerprint) -> dict:
        """Convert to a Playwright context configuration."""
        return {
            "user_agent": fp.user_agent,
            "viewport": {"width": fp.screen_width, "height": fp.screen_height},
            "device_scale_factor": fp.pixel_ratio,
            "locale": fp.language,
            "timezone_id": fp.timezone,
        }
    
    def to_inject_script(self, fp: BrowserFingerprint) -> str:
        """Build an init script that overrides the browser fingerprint."""
        return f'''
        // override navigator properties
        Object.defineProperty(navigator, 'hardwareConcurrency', {{
            get: () => {fp.hardware_concurrency}
        }});
        Object.defineProperty(navigator, 'deviceMemory', {{
            get: () => {fp.device_memory}
        }});
        Object.defineProperty(navigator, 'platform', {{
            get: () => "{fp.platform}"
        }});
        Object.defineProperty(navigator, 'languages', {{
            get: () => {json.dumps(fp.languages)}
        }});
        
        // override screen properties
        Object.defineProperty(screen, 'width', {{ get: () => {fp.screen_width} }});
        Object.defineProperty(screen, 'height', {{ get: () => {fp.screen_height} }});
        Object.defineProperty(screen, 'colorDepth', {{ get: () => {fp.screen_color_depth} }});
        
        // spoof the WebGL fingerprint
        const getParameterOrig = WebGLRenderingContext.prototype.getParameter;
        WebGLRenderingContext.prototype.getParameter = function(param) {{
            if (param === 37445) return "{fp.webgl_vendor}";
            if (param === 37446) return "{fp.webgl_renderer}";
            return getParameterOrig.call(this, param);
        }};
        
        // add noise to the canvas fingerprint
        const toDataURLOrig = HTMLCanvasElement.prototype.toDataURL;
        HTMLCanvasElement.prototype.toDataURL = function(type) {{
            const ctx = this.getContext('2d');
            if (ctx) {{
                const imageData = ctx.getImageData(0, 0, this.width, this.height);
                for (let i = 0; i < imageData.data.length; i += 4) 
                    imageData.data[i] ^= 1;  // tiny per-pixel noise
                
                ctx.putImageData(imageData, 0, 0);
            }}
            return toDataURLOrig.call(this, type);
        }};
        
        // perturb the AudioContext fingerprint
        const audioOffsetValue = {fp.audio_offset};
        const getChannelDataOrig = AudioBuffer.prototype.getChannelData;
        AudioBuffer.prototype.getChannelData = function(channel) {{
            const data = getChannelDataOrig.call(this, channel);
            for (let i = 0; i < data.length; i++) 
                data[i] += audioOffsetValue;
            
            return data;
        }};
        '''
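
Note that `generate` seeds the global `random` module and later calls `random.seed()` to re-randomize it, which can interfere with other users of `random` in the same process. An isolated `random.Random` instance (a sketch, not the class above) gives the same seed-to-fingerprint determinism without touching global state:

```python
import hashlib
import random

def pick_for_seed(seed: str, options: list):
    """Deterministic choice for a seed, without mutating global random state."""
    rng = random.Random(hashlib.md5(seed.encode()).hexdigest())
    return rng.choice(options)
```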

Browser Automation

Option Comparison

| Option | Pros | Cons | Best for |
| --- | --- | --- | --- |
| Playwright | Cross-browser, friendly API, async-native | Recognizable automation fingerprint | General automation |
| Puppeteer | First-class Chrome support | Chromium only | Chrome-specific work |
| undetected-chromedriver | Evades detection out of the box | Updates lag behind Chrome | Quick prototypes |
| DrissionPage | Hybrid mode, easy to use | Smaller community | China-focused projects |
| Fingerprint browsers | Purpose-built anti-detection | Paid, resource-heavy | Heavily protected targets |

Playwright Integration

python
# app/services/browser_automation.py
import asyncio
from typing import Optional, Dict, Any
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
from app.services.fingerprint import FingerprintGenerator, BrowserFingerprint

class BrowserPool:
    """Browser instance pool: manages multiple browser instances."""
    
    def __init__(self, max_browsers: int = 5):
        self.max_browsers = max_browsers
        self._playwright = None
        self._browsers: list = []
        self._available: asyncio.Queue = None
        self._fingerprint_gen = FingerprintGenerator()
    
    async def start(self):
        """Initialize the browser pool."""
        self._playwright = await async_playwright().start()
        self._available = asyncio.Queue()
        
        for i in range(self.max_browsers):
            browser = await self._create_browser()
            self._browsers.append(browser)
            await self._available.put(browser)
    
    async def _create_browser(self) -> Browser:
        """Launch a browser instance with anti-detection flags."""
        return await self._playwright.chromium.launch(
            headless=True,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--disable-features=IsolateOrigins,site-per-process',
                '--disable-site-isolation-trials',
                '--disable-web-security',
                '--no-first-run',
                '--no-default-browser-check',
                '--disable-infobars',
                '--window-size=1920,1080',
                '--start-maximized',
            ]
        )
    
    async def acquire(self, fingerprint_seed: Optional[str] = None) -> 'StealthContext':
        """Check a browser context out of the pool."""
        browser = await self._available.get()
        fp = self._fingerprint_gen.generate(seed=fingerprint_seed)
        context = await self._create_stealth_context(browser, fp)
        return StealthContext(context, browser, self, fp)
    
    async def release(self, browser: Browser):
        """Return a browser to the pool."""
        await self._available.put(browser)
    
    async def _create_stealth_context(self, browser: Browser, fp: BrowserFingerprint) -> BrowserContext:
        """Create a stealth context."""
        context = await browser.new_context(
            user_agent=fp.user_agent,
            viewport={'width': fp.screen_width, 'height': fp.screen_height},
            device_scale_factor=fp.pixel_ratio,
            locale=fp.language,
            timezone_id=fp.timezone,
        )
        
        # Inject the fingerprint-spoofing script
        await context.add_init_script(self._fingerprint_gen.to_inject_script(fp))
        
        # Inject the anti-automation-detection script
        await context.add_init_script(self._get_stealth_script())
        
        return context
    
    def _get_stealth_script(self) -> str:
        """Return the stealth script (bypasses automation checks)."""
        return '''
        // Remove the webdriver flag
        Object.defineProperty(navigator, 'webdriver', {
            get: () => undefined
        });
        
        // Fake plugins
        Object.defineProperty(navigator, 'plugins', {
            get: () => [
                { name: 'Chrome PDF Plugin', filename: 'internal-pdf-viewer' },
                { name: 'Chrome PDF Viewer', filename: 'mhjfbmdgcfjbbpaeojofohoefgiehjai' },
                { name: 'Native Client', filename: 'internal-nacl-plugin' }
            ]
        });
        
        // Fake the Chrome runtime
        window.chrome = {
            runtime: {
                connect: () => {},
                sendMessage: () => {},
                onMessage: { addListener: () => {} }
            }
        };
        
        // Patch the permissions check
        const originalQuery = window.navigator.permissions.query;
        window.navigator.permissions.query = (parameters) => {
            if (parameters.name === 'notifications') {
                return Promise.resolve({ state: Notification.permission });
            }
            return originalQuery.call(window.navigator.permissions, parameters);
        };
        
        // Hide Playwright/Puppeteer markers
        delete window.__playwright;
        delete window.__puppeteer;
        delete window.__selenium;
        delete window.__webdriver;
        
        // Patch iframe contentWindow (placeholder: a full implementation would
        // wrap the frame in a Proxy to hide automation markers)
        const elementDescriptor = Object.getOwnPropertyDescriptor(HTMLIFrameElement.prototype, 'contentWindow');
        Object.defineProperty(HTMLIFrameElement.prototype, 'contentWindow', {
            get: function() {
                const frame = elementDescriptor.get.call(this);
                return frame;
            }
        });
        '''
    
    async def close(self):
        """Shut down every browser in the pool."""
        for browser in self._browsers:
            await browser.close()
        if self._playwright:
            await self._playwright.stop()

class StealthContext:
    """Stealth browser context."""
    
    def __init__(self, context: BrowserContext, browser: Browser, 
                 pool: BrowserPool, fingerprint: BrowserFingerprint):
        self.context = context
        self.browser = browser
        self.pool = pool
        self.fingerprint = fingerprint
        self._pages: list = []
    
    async def new_page(self) -> Page:
        """Open a new page."""
        page = await self.context.new_page()
        self._pages.append(page)
        return page
    
    async def goto(self, url: str, wait_until: str = "domcontentloaded") -> Page:
        """Navigate to the given URL."""
        page = await self.new_page()
        await page.goto(url, wait_until=wait_until)
        return page
    
    async def get_content(self, url: str) -> str:
        """Fetch the page HTML."""
        page = await self.goto(url)
        content = await page.content()
        await page.close()
        return content
    
    async def execute_js(self, page: Page, script: str) -> Any:
        """Execute JavaScript on a page."""
        return await page.evaluate(script)
    
    async def wait_for_selector(self, page: Page, selector: str, timeout: int = 30000):
        """Wait for an element to appear."""
        await page.wait_for_selector(selector, timeout=timeout)
    
    async def simulate_human(self, page: Page):
        """Simulate human behavior."""
        import random
        
        # Random scrolling
        for _ in range(random.randint(2, 5)):
            await page.mouse.wheel(0, random.randint(100, 300))
            await asyncio.sleep(random.uniform(0.5, 1.5))
        
        # Random mouse movement
        for _ in range(random.randint(3, 7)):
            x = random.randint(100, 800)
            y = random.randint(100, 600)
            await page.mouse.move(x, y)
            await asyncio.sleep(random.uniform(0.1, 0.3))
    
    async def close(self):
        """Close the context and release the browser back to the pool."""
        for page in self._pages:
            if not page.is_closed():
                await page.close()
        await self.context.close()
        await self.pool.release(self.browser)
    
    async def __aenter__(self):
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

class AutomationSpider:
    """Automation spider - for pages that need browser rendering."""
    
    def __init__(self, browser_pool: BrowserPool):
        self.browser_pool = browser_pool
    
    async def scrape(self, url: str, selectors: Dict[str, str], 
                     fingerprint_seed: Optional[str] = None) -> Dict[str, Any]:
        """Scrape data from a page."""
        async with await self.browser_pool.acquire(fingerprint_seed) as ctx:
            page = await ctx.goto(url)
            
            # Simulate human behavior
            await ctx.simulate_human(page)
            
            # Give the page time to settle
            await asyncio.sleep(2)
            
            # Extract data
            result = {}
            for key, selector in selectors.items():
                try:
                    element = await page.query_selector(selector)
                    if element:
                        result[key] = await element.inner_text()
                except Exception:
                    result[key] = None
            
            return result
    
    async def scrape_with_scroll(self, url: str, item_selector: str,
                                  max_items: int = 100) -> list:
        """Scrape an infinite-scroll page."""
        async with await self.browser_pool.acquire() as ctx:
            page = await ctx.goto(url)
            items = []
            
            while len(items) < max_items:
                # Collect the items currently on the page
                elements = await page.query_selector_all(item_selector)
                for el in elements[len(items):]:
                    text = await el.inner_text()
                    items.append(text)
                    if len(items) >= max_items:
                        break
                
                # Scroll to load more
                prev_count = len(elements)
                await page.mouse.wheel(0, 500)
                await asyncio.sleep(2)
                
                # Stop once no new items appear
                new_elements = await page.query_selector_all(item_selector)
                if len(new_elements) == prev_count:
                    break
            
            return items

RPC Remote Invocation

RPC Architecture Design

┌──────────────────────────────────────────────────────────────┐
│                       RPC Architecture                       │
│                                                              │
│  ┌──────────────┐       WebSocket        ┌────────────────┐  │
│  │    Python    │ ─────────────────────▶ │  Real browser  │  │
│  │spider engine │   bidirectional link   │  (extension)   │  │
│  └──────────────┘ ◀───────────────────── └────────────────┘  │
│         │                                         │          │
│         │ call JS functions                       │ results  │
│         ▼                                         ▼          │
│  ┌────────────────────────────────────────────────────────┐  │
│  │                   Target site page                     │  │
│  │           window.encrypt() / window.sign()             │  │
│  │          (the site's own crypto functions)             │  │
│  └────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘

Use cases:
1. Call the site's encryption functions to produce signatures
2. Fetch dynamically generated tokens
3. Run the site's anti-bot detection JS and return the result
4. Perform complex computations that need a browser environment
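The framing between engine and extension is plain JSON. As a minimal stand-alone sketch (no WebSocket involved), the two message shapes used by this design can be built and round-tripped like this; `make_call`/`make_response` are illustrative helper names, not part of the system:

```python
import json
import uuid

def make_call(method: str, params: dict) -> str:
    """Frame an RPC request, as the server sends it to the browser extension."""
    return json.dumps({
        "type": "call",
        "call_id": str(uuid.uuid4()),  # correlates request and response
        "method": method,
        "params": params,
    })

def make_response(call_id: str, result, error: str = None) -> str:
    """Frame the extension's reply."""
    return json.dumps({
        "type": "response",
        "call_id": call_id,
        "result": result,
        "error": error,
    })

req = json.loads(make_call("callFunction", {"function": "window.sign", "args": ["abc"]}))
resp = json.loads(make_response(req["call_id"], "signed-abc"))
assert resp["call_id"] == req["call_id"]  # response matched back by call_id
```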

RPC Server (Python)

python
# app/services/rpc_server.py
import asyncio
import json
import uuid
from typing import Dict, Any, Callable, Optional
from fastapi import WebSocket
import redis.asyncio as redis

class RPCServer:
    """
    RPC server - accepts browser-extension connections and exposes remote calls.
    
    Workflow:
    1. The browser extension connects over WebSocket and keeps the connection open
    2. A spider issues an RPC call
    3. The server forwards the request to the extension
    4. The extension runs the JS and returns the result
    5. The server hands the result back to the spider
    """
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.clients: Dict[str, WebSocket] = {}  # client_id -> websocket
        self.pending_calls: Dict[str, asyncio.Future] = {}  # call_id -> future
        self.call_timeout = 30  # timeout in seconds
    
    async def register_client(self, client_id: str, websocket: WebSocket):
        """Register a browser client."""
        await websocket.accept()
        self.clients[client_id] = websocket
        
        # Record in Redis so the setup works across multiple server processes
        await self.redis.sadd("rpc:clients", client_id)
        await self.redis.setex(f"rpc:client:{client_id}:online", 60, "1")
    
    async def unregister_client(self, client_id: str):
        """Unregister a client."""
        self.clients.pop(client_id, None)
        await self.redis.srem("rpc:clients", client_id)
        await self.redis.delete(f"rpc:client:{client_id}:online")
    
    async def call(self, client_id: str, method: str, params: Optional[Dict] = None) -> Any:
        """
        Issue an RPC call.
        
        Args:
            client_id: target client ID
            method: name of the JS method to invoke
            params: method arguments
        
        Returns:
            The result of the JS execution.
        """
        if client_id not in self.clients:
            raise Exception(f"Client {client_id} not connected")
        
        websocket = self.clients[client_id]
        call_id = str(uuid.uuid4())
        
        # Create a Future to await the result on
        future = asyncio.get_running_loop().create_future()
        self.pending_calls[call_id] = future
        
        # Send the call request
        request = {
            "type": "call",
            "call_id": call_id,
            "method": method,
            "params": params or {}
        }
        await websocket.send_json(request)
        
        # Await the result
        try:
            result = await asyncio.wait_for(future, timeout=self.call_timeout)
            return result
        except asyncio.TimeoutError:
            self.pending_calls.pop(call_id, None)
            raise Exception(f"RPC call timeout: {method}")
    
    async def handle_response(self, call_id: str, result: Any, error: str = None):
        """Resolve a pending call with its result."""
        future = self.pending_calls.pop(call_id, None)
        if future:
            if error:
                future.set_exception(Exception(error))
            else:
                future.set_result(result)
    
    async def get_available_client(self) -> Optional[str]:
        """Pick any available client."""
        if self.clients:
            return next(iter(self.clients.keys()))
        return None
    
    async def heartbeat(self, client_id: str):
        """Refresh a client's heartbeat."""
        await self.redis.setex(f"rpc:client:{client_id}:online", 60, "1")

class RPCClient:
    """
    RPC client interface - used by spiders.
    """
    
    def __init__(self, rpc_server: RPCServer):
        self.server = rpc_server
    
    async def execute_js(self, script: str, client_id: Optional[str] = None) -> Any:
        """
        Execute JS code in the browser.
        
        Args:
            script: JS code to run
            client_id: target client; auto-selected when omitted
        
        Returns:
            The result of the JS execution.
        """
        if not client_id:
            client_id = await self.server.get_available_client()
            if not client_id:
                raise Exception("No available RPC client")
        
        return await self.server.call(client_id, "eval", {"script": script})
    
    async def call_function(self, func_name: str, *args, client_id: Optional[str] = None) -> Any:
        """
        Call a function inside the browser.
        
        Args:
            func_name: function name (e.g. window.encrypt)
            args: function arguments
            client_id: target client
        
        Returns:
            The function's return value.
        """
        if not client_id:
            client_id = await self.server.get_available_client()
            if not client_id:
                raise Exception("No available RPC client")
        
        return await self.server.call(client_id, "callFunction", {
            "function": func_name,
            "args": list(args)
        })
    
    async def get_cookies(self, domain: str, client_id: Optional[str] = None) -> Dict:
        """Get the cookies for the given domain."""
        return await self.server.call(
            client_id or await self.server.get_available_client(),
            "getCookies",
            {"domain": domain}
        )
    
    async def get_local_storage(self, key: str, client_id: Optional[str] = None) -> Any:
        """Read a localStorage key."""
        return await self.server.call(
            client_id or await self.server.get_available_client(),
            "getLocalStorage",
            {"key": key}
        )

RPC WebSocket Endpoint

python
# app/api/rpc_websocket.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.services.rpc_server import RPCServer
from app.core.dependencies import get_rpc_server

router = APIRouter()

@router.websocket("/ws/rpc/{client_id}")
async def rpc_websocket(websocket: WebSocket, client_id: str):
    """
    RPC WebSocket endpoint - browser extensions connect here.
    
    Protocol:
    client -> server:
    - {"type": "heartbeat"}
    - {"type": "response", "call_id": "xxx", "result": ..., "error": null}
    
    server -> client:
    - {"type": "call", "call_id": "xxx", "method": "eval", "params": {...}}
    """
    rpc_server: RPCServer = get_rpc_server()
    
    await rpc_server.register_client(client_id, websocket)
    
    try:
        while True:
            data = await websocket.receive_json()
            msg_type = data.get("type")
            
            if msg_type == "heartbeat":
                await rpc_server.heartbeat(client_id)
                await websocket.send_json({"type": "heartbeat_ack"})
            
            elif msg_type == "response":
                call_id = data.get("call_id")
                result = data.get("result")
                error = data.get("error")
                await rpc_server.handle_response(call_id, result, error)
    
    except WebSocketDisconnect:
        pass
    finally:
        await rpc_server.unregister_client(client_id)

Browser Extension Client (JS)

js
// browser_extension/content.js
// Content script for the browser extension, injected into the target page

class RPCClient {
    constructor(serverUrl, clientId) {
        this.serverUrl = serverUrl;
        this.clientId = clientId;
        this.ws = null;
        this.reconnectInterval = 5000;
    }
    
    connect() {
        this.ws = new WebSocket(`${this.serverUrl}/ws/rpc/${this.clientId}`);
        
        this.ws.onopen = () => {
            console.log('[RPC] Connected to server');
            this.startHeartbeat();
        };
        
        this.ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            this.handleMessage(data);
        };
        
        this.ws.onclose = () => {
            console.log('[RPC] Disconnected, reconnecting...');
            this.stopHeartbeat();
            setTimeout(() => this.connect(), this.reconnectInterval);
        };
        
        this.ws.onerror = (error) => {
            console.error('[RPC] WebSocket error:', error);
        };
    }
    
    startHeartbeat() {
        this.heartbeatTimer = setInterval(() => {
            if (this.ws.readyState === WebSocket.OPEN) {
                this.ws.send(JSON.stringify({ type: 'heartbeat' }));
            }
        }, 30000);
    }
    
    stopHeartbeat() {
        if (this.heartbeatTimer) {
            clearInterval(this.heartbeatTimer);
        }
    }
    
    handleMessage(data) {
        if (data.type === 'call') {
            this.handleCall(data);
        } else if (data.type === 'heartbeat_ack') {
            // heartbeat acknowledged
        }
    }
    
    async handleCall(data) {
        const { call_id, method, params } = data;
        let result = null;
        let error = null;
        
        try {
            switch (method) {
                case 'eval':
                    // Evaluate arbitrary JS code
                    result = eval(params.script);
                    break;
                    
                case 'callFunction': {
                    // Call a named function
                    const func = this.getNestedProperty(window, params.function);
                    if (typeof func === 'function') {
                        result = await func.apply(null, params.args);
                    } else {
                        throw new Error(`Function ${params.function} not found`);
                    }
                    break;
                }
                    
                case 'getCookies':
                    // Read cookies
                    result = this.parseCookies();
                    break;
                    
                case 'getLocalStorage':
                    // Read localStorage
                    result = localStorage.getItem(params.key);
                    break;
                    
                case 'setLocalStorage':
                    localStorage.setItem(params.key, params.value);
                    result = true;
                    break;
                    
                default:
                    throw new Error(`Unknown method: ${method}`);
            }
        } catch (e) {
            error = e.message;
        }
        
        // Send the response
        this.ws.send(JSON.stringify({
            type: 'response',
            call_id: call_id,
            result: result,
            error: error
        }));
    }
    
    getNestedProperty(obj, path) {
        return path.split('.').reduce((current, key) => current?.[key], obj);
    }
    
    parseCookies() {
        const cookies = {};
        document.cookie.split(';').forEach(cookie => {
            const [name, value] = cookie.trim().split('=');
            cookies[name] = value;
        });
        return cookies;
    }
}

// Start the RPC client
const CLIENT_ID = 'browser_' + Math.random().toString(36).slice(2, 11);
const SERVER_URL = 'ws://localhost:8000';  // read from config in production

const rpcClient = new RPCClient(SERVER_URL, CLIENT_ID);
rpcClient.connect();

Using RPC from a Spider

python
# Usage example
import json
import time

import aiohttp

from app.services.rpc_server import RPCClient

async def crawl_with_rpc(rpc_server):
    """RPC spider example - calling the site's encryption functions."""
    
    rpc = RPCClient(rpc_server)  # inject the rpc_server instance
    
    # Scenario 1: call the site's encryption function
    # Suppose the site exposes a window.encryptData() function
    encrypted = await rpc.call_function(
        "window.encryptData",
        {"username": "test", "password": "123456"}
    )
    print(f"Encrypted: {encrypted}")
    
    # Scenario 2: fetch a dynamic token
    token = await rpc.execute_js('''
        // run the site's token-generation logic
        window.generateToken && window.generateToken()
    ''')
    print(f"Token: {token}")
    
    # Scenario 3: read cookies
    cookies = await rpc.get_cookies("example.com")
    print(f"Cookies: {cookies}")
    
    # Scenario 4: call a complex signing function
    signature = await rpc.execute_js('''
        const params = {
            timestamp: Date.now(),
            nonce: Math.random().toString(36)
        };
        // call the site's signing function
        window.sign(params)
    ''')
    print(f"Signature: {signature}")

class RPCEnabledSpider:
    """Spider with RPC support."""
    
    def __init__(self, rpc_client: RPCClient, session: aiohttp.ClientSession):
        self.rpc = rpc_client
        self.session = session
    
    async def request_with_signature(self, url: str, data: dict) -> dict:
        """Send a signed request."""
        
        # 1. Obtain the signature via RPC
        timestamp = int(time.time() * 1000)
        signature = await self.rpc.execute_js(f'''
            window.sign({{
                data: {json.dumps(data)},
                timestamp: {timestamp}
            }})
        ''')
        
        # 2. Build the request
        headers = {
            "X-Signature": signature,
            "X-Timestamp": str(timestamp),
        }
        
        # 3. Send it
        async with self.session.post(url, json=data, headers=headers) as resp:
            return await resp.json()

Real-Time Logs over WebSocket

Architecture

┌───────────────┐  publish logs  ┌───────────────┐  subscribe  ┌──────────────┐
│ Celery Worker │ ─────────────▶ │     Redis     │ ◀────────── │   FastAPI    │
│ (spider runs) │    Pub/Sub     │ (message bus) │             │  WebSocket   │
└───────────────┘                └───────────────┘             └──────────────┘
                                                                      │ push
                                                                      ▼
                                                               ┌──────────────┐
                                                               │ Vue frontend │
                                                               │ (log panel)  │
                                                               └──────────────┘
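Without standing up Redis, the log-record format and the bounded-history behavior can be sketched in plain Python; the `deque(maxlen=1000)` mirrors the LPUSH + LTRIM(0, 999) pattern the publisher below uses (`make_log` is an illustrative helper, not part of the system):

```python
import json
from collections import deque
from datetime import datetime

# Bounded history: newest entries on the left, old ones dropped past 1000.
history = deque(maxlen=1000)

def make_log(task_id: int, level: str, message: str) -> str:
    """Serialize one log record in the same shape LogPublisher emits."""
    record = {
        "task_id": task_id,
        "level": level,
        "message": message,
        "timestamp": datetime.now().isoformat(),
        "extra": {},
    }
    return json.dumps(record, ensure_ascii=False)

for i in range(1500):
    history.appendleft(make_log(1, "INFO", f"item {i}"))

assert len(history) == 1000  # the cap held: oldest 500 entries were discarded
assert json.loads(history[0])["message"] == "item 1499"  # newest first
```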

1. Backend: Log Collection and Publishing

python
# app/services/log_publisher.py
import json
import logging
from datetime import datetime
from typing import Any, Dict
import redis.asyncio as redis

class LogPublisher:
    """Log publisher - pushes spider logs into Redis."""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.channel_prefix = "spider:logs:"
    
    async def publish(self, task_id: int, level: str, message: str, extra: Dict = None):
        """Publish a log message."""
        log_data = {
            "task_id": task_id,
            "level": level,
            "message": message,
            "timestamp": datetime.now().isoformat(),
            "extra": extra or {}
        }
        
        channel = f"{self.channel_prefix}{task_id}"
        await self.redis.publish(channel, json.dumps(log_data, ensure_ascii=False))
        
        # Also keep a history list (most recent 1000 entries)
        list_key = f"spider:logs:history:{task_id}"
        await self.redis.lpush(list_key, json.dumps(log_data, ensure_ascii=False))
        await self.redis.ltrim(list_key, 0, 999)

class SpiderLogHandler(logging.Handler):
    """Spider log handler - plugs into Python logging."""
    
    def __init__(self, publisher: LogPublisher, task_id: int):
        super().__init__()
        self.publisher = publisher
        self.task_id = task_id
        self._loop = None
    
    def emit(self, record: logging.LogRecord):
        import asyncio
        log_entry = self.format(record)
        
        # Publish from within the running event loop
        try:
            loop = asyncio.get_running_loop()
            loop.create_task(
                self.publisher.publish(
                    self.task_id,
                    record.levelname,
                    log_entry,
                    {"module": record.module, "line": record.lineno}
                )
            )
        except RuntimeError:
            pass  # no running event loop; drop the message

2. Backend: WebSocket Endpoints

python
# app/api/websocket.py
import json
import asyncio
from typing import Dict, Set
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
import redis.asyncio as redis
from app.core.config import settings

router = APIRouter()

class ConnectionManager:
    """WebSocket connection manager."""
    
    def __init__(self):
        # task_id -> Set[WebSocket]
        self.active_connections: Dict[int, Set[WebSocket]] = {}
    
    async def connect(self, websocket: WebSocket, task_id: int):
        await websocket.accept()
        if task_id not in self.active_connections:
            self.active_connections[task_id] = set()
        self.active_connections[task_id].add(websocket)
    
    def disconnect(self, websocket: WebSocket, task_id: int):
        if task_id in self.active_connections:
            self.active_connections[task_id].discard(websocket)
            if not self.active_connections[task_id]:
                del self.active_connections[task_id]
    
    async def broadcast(self, task_id: int, message: str):
        if task_id in self.active_connections:
            dead_connections = set()
            for connection in self.active_connections[task_id]:
                try:
                    await connection.send_text(message)
                except Exception:
                    dead_connections.add(connection)
            
            # Prune broken connections
            for conn in dead_connections:
                self.active_connections[task_id].discard(conn)

manager = ConnectionManager()

@router.websocket("/ws/logs/{task_id}")
async def websocket_logs(websocket: WebSocket, task_id: int):
    """Real-time log WebSocket endpoint."""
    await manager.connect(websocket, task_id)
    
    # Dedicated Redis connection for the subscription
    redis_client = redis.from_url(settings.REDIS_URL)
    pubsub = redis_client.pubsub()
    channel = f"spider:logs:{task_id}"
    
    try:
        # Send recent history first (last 50 entries)
        history_key = f"spider:logs:history:{task_id}"
        history = await redis_client.lrange(history_key, 0, 49)
        for log in reversed(history):
            await websocket.send_text(log.decode())
        
        # Subscribe to live logs
        await pubsub.subscribe(channel)
        
        # Two coroutines: one relays Redis messages, one reads client messages
        async def listen_redis():
            async for message in pubsub.listen():
                if message["type"] == "message":
                    await websocket.send_text(message["data"].decode())
        
        async def listen_websocket():
            while True:
                # Receive client messages (e.g. heartbeats)
                data = await websocket.receive_text()
                if data == "ping":
                    await websocket.send_text(json.dumps({"type": "pong"}))
        
        # Run both concurrently
        await asyncio.gather(listen_redis(), listen_websocket())
        
    except WebSocketDisconnect:
        pass
    finally:
        manager.disconnect(websocket, task_id)
        await pubsub.unsubscribe(channel)
        await redis_client.close()

@router.websocket("/ws/stats")
async def websocket_stats(websocket: WebSocket):
    """Push live system statistics."""
    await websocket.accept()
    
    try:
        while True:
            # Push system stats every 5 seconds
            stats = await get_system_stats()
            await websocket.send_text(json.dumps(stats))
            await asyncio.sleep(5)
    except WebSocketDisconnect:
        pass

async def get_system_stats() -> dict:
    """Collect system statistics."""
    # TODO: implement the real statistics logic
    return {
        "running_tasks": 3,
        "completed_today": 15,
        "total_items_scraped": 12580,
        "cpu_usage": 45.2,
        "memory_usage": 62.8
    }

3. Frontend: Log Panel Component

<!-- components/LogPanel.vue -->
<template>
  <div class="log-panel">
    <el-card>
      <template #header>
        <div class="panel-header">
          <span>Live logs - task #{{ taskId }}</span>
          <div class="controls">
            <el-switch v-model="autoScroll" active-text="Auto-scroll" />
            <el-button size="small" @click="clearLogs">Clear</el-button>
            <el-tag :type="connectionStatus === 'connected' ? 'success' : 'danger'">
              {{ connectionStatus }}
            </el-tag>
          </div>
        </div>
      </template>
      
      <div ref="logContainer" class="log-container">
        <div
          v-for="(log, index) in logs"
          :key="index"
          :class="['log-entry', `level-${log.level.toLowerCase()}`]"
        >
          <span class="timestamp">{{ formatTime(log.timestamp) }}</span>
          <el-tag :type="levelType(log.level)" size="small">{{ log.level }}</el-tag>
          <span class="message">{{ log.message }}</span>
        </div>
        
        <div v-if="logs.length === 0" class="empty-state">
          Waiting for logs...
        </div>
      </div>
    </el-card>
  </div>
</template>

<script setup>
import { ref, watch, onMounted, onUnmounted, nextTick } from 'vue'

const props = defineProps({
  taskId: {
    type: Number,
    required: true
  }
})

const logs = ref([])
const autoScroll = ref(true)
const connectionStatus = ref('disconnected')
const logContainer = ref(null)
let ws = null
let reconnectTimer = null
let heartbeatTimer = null

const connect = () => {
  const wsUrl = `${import.meta.env.VITE_WS_URL}/ws/logs/${props.taskId}`
  ws = new WebSocket(wsUrl)
  
  ws.onopen = () => {
    connectionStatus.value = 'connected'
    startHeartbeat()
  }
  
  ws.onmessage = (event) => {
    try {
      const data = JSON.parse(event.data)
      if (data.type === 'pong') return  // heartbeat reply
      
      logs.value.push(data)
      
      // Cap the buffer (keep the most recent 500 entries)
      if (logs.value.length > 500) {
        logs.value = logs.value.slice(-500)
      }
      
      // Auto-scroll
      if (autoScroll.value) {
        nextTick(() => scrollToBottom())
      }
    } catch (e) {
      console.error('Parse log error:', e)
    }
  }
  
  ws.onclose = () => {
    connectionStatus.value = 'disconnected'
    stopHeartbeat()
    scheduleReconnect()
  }
  
  ws.onerror = (error) => {
    console.error('WebSocket error:', error)
    connectionStatus.value = 'error'
  }
}

const startHeartbeat = () => {
  heartbeatTimer = setInterval(() => {
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send('ping')
    }
  }, 30000)  // 30-second heartbeat interval
}

const stopHeartbeat = () => {
  if (heartbeatTimer) {
    clearInterval(heartbeatTimer)
    heartbeatTimer = null
  }
}

const scheduleReconnect = () => {
  reconnectTimer = setTimeout(() => {
    console.log('Attempting to reconnect...')
    connect()
  }, 3000)  // reconnect after 3 seconds
}

const scrollToBottom = () => {
  if (logContainer.value) {
    logContainer.value.scrollTop = logContainer.value.scrollHeight
  }
}

const clearLogs = () => {
  logs.value = []
}

const formatTime = (timestamp) => {
  return new Date(timestamp).toLocaleTimeString('zh-CN', {
    hour: '2-digit',
    minute: '2-digit',
    second: '2-digit',
    fractionalSecondDigits: 3
  })
}

const levelType = (level) => {
  const types = {
    DEBUG: 'info',
    INFO: 'success',
    WARNING: 'warning',
    ERROR: 'danger',
    CRITICAL: 'danger'
  }
  return types[level] || 'info'
}

// Reconnect when taskId changes
watch(() => props.taskId, (newId) => {
  if (ws) ws.close()
  logs.value = []
  connect()
})

onMounted(() => {
  connect()
})

onUnmounted(() => {
  if (ws) ws.close()
  if (reconnectTimer) clearTimeout(reconnectTimer)
  stopHeartbeat()
})
</script>

<style scoped>
.log-panel {
  height: 100%;
}

.panel-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
}

.controls {
  display: flex;
  gap: 12px;
  align-items: center;
}

.log-container {
  height: 400px;
  overflow-y: auto;
  background: #1e1e1e;
  border-radius: 4px;
  padding: 12px;
  font-family: 'JetBrains Mono', 'Fira Code', monospace;
  font-size: 13px;
}

.log-entry {
  padding: 4px 0;
  border-bottom: 1px solid #333;
  display: flex;
  align-items: center;
  gap: 8px;
}

.timestamp {
  color: #888;
  font-size: 12px;
  min-width: 100px;
}

.message {
  color: #d4d4d4;
  flex: 1;
}

.level-debug .message { color: #9cdcfe; }
.level-info .message { color: #4ec9b0; }
.level-warning .message { color: #dcdcaa; }
.level-error .message { color: #f14c4c; }
.level-critical .message { color: #ff0000; font-weight: bold; }

.empty-state {
  color: #666;
  text-align: center;
  padding: 40px;
}
</style>

4. Using Logging in Spider Tasks

python
# app/services/spider_engine.py
import logging
import asyncio
from app.services.log_publisher import LogPublisher, SpiderLogHandler
from app.services.anti_spider import AntiSpiderManager

class SpiderEngine:
    """Spider execution engine."""
    
    def __init__(self, log_publisher: LogPublisher, anti_spider: AntiSpiderManager):
        self.log_publisher = log_publisher
        self.anti_spider = anti_spider
    
    async def run_task(self, task_id: int, config: dict):
        """Run a spider task."""
        
        # Configure logging
        logger = logging.getLogger(f"spider.task.{task_id}")
        logger.setLevel(logging.DEBUG)
        handler = SpiderLogHandler(self.log_publisher, task_id)
        handler.setFormatter(logging.Formatter('%(message)s'))
        logger.addHandler(handler)
        
        try:
            logger.info(f"🚀 Starting task #{task_id}")
            logger.info(f"Target URL: {config['target_url']}")
            
            urls = config.get('urls', [config['target_url']])
            total = len(urls)
            
            for i, url in enumerate(urls, 1):
                logger.info(f"📥 Crawling [{i}/{total}]: {url}")
                
                try:
                    response = await self.anti_spider.make_request(
                        url,
                        use_proxy=config.get('use_proxy', True),
                        use_cookies=config.get('use_cookies', False)
                    )
                    
                    if response and response.status == 200:
                        logger.info(f"✅ Success: {url}")
                        # Parse the data...
                        data = await self.parse_response(response, config)
                        logger.debug(f"Extracted {len(data)} records")
                    else:
                        logger.warning(f"⚠️ Unexpected response: {url}, status: {response.status if response else 'None'}")
                        
                except Exception as e:
                    logger.error(f"❌ Crawl failed: {url}, error: {str(e)}")
            
            logger.info(f"🎉 Task #{task_id} finished")
            
        except Exception as e:
            logger.critical(f"💥 Task crashed: {str(e)}")
            raise
        finally:
            logger.removeHandler(handler)
    
    async def parse_response(self, response, config: dict) -> list:
        """Parse the response data."""
        # TODO: extract data using the selectors in config
        return []

Future Directions

  1. Distributed crawling: multi-node deployment with Scrapy + Scrapy-Redis
  2. Smarter anti-bot handling: CAPTCHA-solving services, fingerprint browsers
  3. Visual configuration: no-code spider rule builder
  4. Data-cleaning pipeline: built-in ETL flow
  5. API integration: expose OpenAPI for external systems
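For direction 1, moving to Scrapy-Redis is mostly configuration: the scheduler and dedup filter are swapped for Redis-backed ones so multiple nodes share one request queue. A minimal settings sketch (the setting keys are scrapy-redis defaults; the Redis URL is an illustrative value):

```python
# settings.py - minimal Scrapy-Redis sketch
SCHEDULER = "scrapy_redis.scheduler.Scheduler"              # shared scheduler across nodes
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"  # Redis-backed request dedup
SCHEDULER_PERSIST = True                                    # keep the queue between runs
REDIS_URL = "redis://localhost:6379/0"                      # illustrative; point at your cluster
```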