AI应用开发指南

AI应用架构

1. 基础架构

python
import logging
from dataclasses import dataclass
from typing import List, Optional

# Module-wide logging setup for the examples in this guide.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class AIConfig:
    """Configuration bundle for an AI application."""
    model_name: str        # model identifier sent to the provider API
    api_key: str           # credential for the model provider
    max_tokens: int        # completion length cap
    temperature: float     # sampling temperature
    timeout: int = 30      # request timeout in seconds


class AIApplication:
    """Base application class: wires up model, database and cache on construction."""

    def __init__(self, config: AIConfig):
        self.config = config
        self.initialize()

    def initialize(self):
        """Bring up every subsystem in order; log and re-raise on any failure."""
        try:
            self.setup_model()
            self.setup_database()
            self.setup_cache()
        except Exception as e:
            logger.error(f"初始化失败: {str(e)}")
            raise

    def setup_model(self):
        """Hook: configure the AI model (no-op in the base class)."""
        pass

    def setup_database(self):
        """Hook: open database connections (no-op in the base class)."""
        pass

    def setup_cache(self):
        """Hook: configure caching (no-op in the base class)."""
        pass

2. 中间件

python
import time
from collections import deque
from functools import wraps


def rate_limit(max_requests: int, time_window: int):
    """Decorator factory: allow at most *max_requests* calls per *time_window* seconds.

    Sliding-window algorithm; the timestamp log is shared by every function
    wrapped by the same decorator instance.

    Raises:
        Exception: with message "Rate limit exceeded" when the window is full.
    """
    # deque gives O(1) eviction from the left; the original list.pop(0) was O(n).
    requests = deque()

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            # Drop timestamps that have fallen out of the window.
            while requests and now - requests[0] > time_window:
                requests.popleft()
            if len(requests) >= max_requests:
                raise Exception("Rate limit exceeded")
            requests.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator


def cache_result(ttl: int = 3600):
    """Decorator factory: cache results for *ttl* seconds, keyed on stringified args.

    NOTE: the key ``str(args) + str(kwargs)`` can collide for arguments whose
    ``str()`` is ambiguous; adequate for the simple arguments used in this guide.
    The cache is unbounded — entries expire logically but are never evicted.
    """
    cache = {}

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(kwargs)
            if key in cache:
                result, timestamp = cache[key]
                if time.time() - timestamp < ttl:
                    return result
            result = func(*args, **kwargs)
            cache[key] = (result, time.time())
            return result
        return wrapper
    return decorator

应用示例

1. 智能客服

python
from typing import Dict, List

import openai


class CustomerService(AIApplication):
    """Conversational customer-support assistant backed by the chat API."""

    def __init__(self, config: AIConfig):
        super().__init__(config)
        # Per-user transcripts: user_id -> list of {"role", "content"} dicts.
        self.conversation_history: Dict[str, List[Dict]] = {}

    @rate_limit(max_requests=60, time_window=60)
    async def handle_message(self, user_id: str, message: str) -> str:
        """Answer *message* for *user_id*, maintaining a rolling transcript.

        On any API failure the error is logged and a fixed apology string is
        returned instead of raising.
        """
        past_turns = self.conversation_history.get(user_id, [])
        try:
            completion = await openai.ChatCompletion.acreate(
                model=self.config.model_name,
                messages=[
                    {"role": "system", "content": "你是一个专业的客服代表"},
                    *past_turns,
                    {"role": "user", "content": message},
                ],
                max_tokens=self.config.max_tokens,
                temperature=self.config.temperature,
            )
            reply = completion.choices[0].message.content
            past_turns.extend([
                {"role": "user", "content": message},
                {"role": "assistant", "content": reply},
            ])
            # Keep only the 10 most recent transcript entries per user.
            self.conversation_history[user_id] = past_turns[-10:]
            return reply
        except Exception as e:
            logger.error(f"处理消息失败: {str(e)}")
            return "抱歉,我现在无法回答您的问题,请稍后再试。"

2. 文档分析器

python
import asyncio
from typing import Dict, List

import PyPDF2
import openai  # used by generate_summary; missing from the original snippet
import spacy


class DocumentAnalyzer(AIApplication):
    """Extracts text from PDFs and produces entity/sentence/summary analyses."""

    def __init__(self, config: AIConfig):
        super().__init__(config)
        # Chinese spaCy pipeline: tokenization, NER and sentence segmentation.
        self.nlp = spacy.load("zh_core_web_sm")

    def process_pdf(self, file_path: str) -> Dict:
        """Read every page of the PDF at *file_path* and analyze the combined text.

        Raises:
            Exception: re-raised after logging when reading or analysis fails.
        """
        try:
            with open(file_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                text = ""
                for page in reader.pages:
                    text += page.extract_text()
            return self.analyze_text(text)
        except Exception as e:
            logger.error(f"处理PDF失败: {str(e)}")
            raise

    @cache_result(ttl=3600)
    def analyze_text(self, text: str) -> Dict:
        """Return entities, sentences, a generated summary and the token count."""
        doc = self.nlp(text)
        entities = [(ent.text, ent.label_) for ent in doc.ents]
        sentences = [sent.text.strip() for sent in doc.sents]
        # BUG FIX: generate_summary is a coroutine function; the original called
        # it without awaiting, so "summary" was a coroutine object — and that
        # coroutine object got cached by @cache_result.
        # NOTE(review): asyncio.run raises if an event loop is already running
        # in this thread — call analyze_text from synchronous code only.
        summary = asyncio.run(self.generate_summary(text))
        return {
            "entities": entities,
            "sentences": sentences,
            "summary": summary,
            "word_count": len(doc),
        }

    async def generate_summary(self, text: str) -> str:
        """Ask the chat model for a short summary of *text*; fall back on error."""
        try:
            response = await openai.ChatCompletion.acreate(
                model=self.config.model_name,
                messages=[
                    {"role": "system", "content": "请生成以下文本的简要摘要"},
                    {"role": "user", "content": text}
                ],
                max_tokens=150,
                temperature=0.3
            )
            return response.choices[0].message.content
        except Exception as e:
            logger.error(f"生成摘要失败: {str(e)}")
            return "无法生成摘要"

3. 代码助手

python
import ast
from typing import Dict, List

import openai  # used by review_code; missing from the original snippet


class CodeAssistant(AIApplication):
    """LLM-backed code reviewer combined with simple local static metrics."""

    def __init__(self, config: AIConfig):
        super().__init__(config)
        self.language_specs = self.load_language_specs()

    def load_language_specs(self) -> Dict:
        """Return per-language syntax facts (extension, comment and docstring markers)."""
        return {
            "python": {
                "file_ext": ".py",
                "comment": "#",
                "docstring": '"""'
            },
            "javascript": {
                "file_ext": ".js",
                "comment": "//",
                "docstring": "/**"
            }
        }

    async def review_code(self, code: str, language: str) -> Dict:
        """Ask the model to review *code* and attach locally computed metrics.

        Raises:
            Exception: re-raised after logging when the API call fails.
        """
        try:
            response = await openai.ChatCompletion.acreate(
                model=self.config.model_name,
                messages=[
                    {"role": "system", "content": f"你是一个{language}专家,请审查以下代码"},
                    {"role": "user", "content": code}
                ],
                max_tokens=500,
                temperature=0.1
            )
            return {
                "review": response.choices[0].message.content,
                "metrics": self.analyze_code_metrics(code, language)
            }
        except Exception as e:
            logger.error(f"代码审查失败: {str(e)}")
            raise

    def analyze_code_metrics(self, code: str, language: str) -> Dict:
        """Dispatch metric computation by language; unknown languages yield {}."""
        if language == "python":
            return self._analyze_python_code(code)
        # TODO: add support for other languages.
        return {}

    def _analyze_python_code(self, code: str) -> Dict:
        """Count functions, classes, import statements and lines via the AST.

        Returns {} (after logging) when the code cannot be parsed.
        """
        try:
            tree = ast.parse(code)
            nodes = list(ast.walk(tree))
            metrics = {
                "num_functions": sum(isinstance(n, ast.FunctionDef) for n in nodes),
                "num_classes": sum(isinstance(n, ast.ClassDef) for n in nodes),
                # One isinstance with a tuple replaces the original "or" pair.
                "num_imports": sum(isinstance(n, (ast.Import, ast.ImportFrom)) for n in nodes),
                "num_lines": len(code.splitlines())
            }
            return metrics
        except Exception as e:
            logger.error(f"Python代码分析失败: {str(e)}")
            return {}

最佳实践

1. 错误处理

python
from functools import wraps  # used by the decorator; missing from the original snippet


class AIError(Exception):
    """Base class for all AI-application errors."""
    pass


class ModelError(AIError):
    """The underlying model/API returned an error."""
    pass


class RateLimitError(AIError):
    """Too many API calls in the allowed window."""
    pass


def handle_ai_error(func):
    """Decorator translating raw OpenAI / unknown errors into the AIError hierarchy.

    Wraps an async callable; re-raises domain exceptions with the original
    exception chained (``from e``) so tracebacks keep the root cause.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except openai.error.RateLimitError as e:
            raise RateLimitError("API调用次数超限") from e
        except openai.error.APIError as e:
            raise ModelError(f"API错误: {str(e)}") from e
        except Exception as e:
            logger.error(f"未知错误: {str(e)}")
            raise AIError(f"处理失败: {str(e)}") from e
    return wrapper

2. 性能优化

python
import asyncio  # used by _process_chunk; missing from the original snippet
from concurrent.futures import ThreadPoolExecutor
from typing import List  # used in annotations; missing from the original snippet


class PerformanceOptimizer:
    """Fan coroutine work out over fixed-size chunks of items.

    Fixes vs. the original snippet: ``asyncio`` and ``List`` were referenced
    but never imported (NameError at definition/run time), and the unused
    ``functools.partial`` import was dropped.
    """

    def __init__(self, max_workers: int = 4):
        # NOTE(review): the executor is created but not used by batch_process;
        # presumably intended for offloading CPU-bound work — confirm before
        # relying on it.
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def batch_process(self, func, items: List, chunk_size: int = 10):
        """Run coroutine function *func* over *items*, chunk_size items at a time.

        Chunks are processed sequentially; items within a chunk run concurrently.
        Returns results in input order.
        """
        results = []
        for start in range(0, len(items), chunk_size):
            chunk = items[start:start + chunk_size]
            results.extend(await self._process_chunk(func, chunk))
        return results

    async def _process_chunk(self, func, chunk: List):
        """Await *func* on every item of *chunk* concurrently."""
        return await asyncio.gather(*[func(item) for item in chunk])

3. 监控和日志

python
import time
from dataclasses import dataclass
from functools import wraps  # used by monitor; missing from the original snippet
from typing import Any, Dict, Optional  # Optional was missing from the original snippet


@dataclass
class Metrics:
    """One observation of a monitored call."""
    latency: float                              # wall-clock seconds for the call
    success: bool                               # True when the call returned normally
    error_type: Optional[str] = None            # exception class name on failure
    metadata: Optional[Dict[str, Any]] = None   # free-form context (e.g. function name)


class MetricsCollector:
    """Accumulates Metrics records and derives summary statistics."""

    def __init__(self):
        self.metrics = []

    def record(self, metric: Metrics):
        """Append one observation."""
        self.metrics.append(metric)

    def get_statistics(self) -> Dict:
        """Summarize recorded metrics; returns {} when nothing was recorded.

        Fix vs. the original snippet: avg_latency divided by zero when every
        recorded request had failed; it now reports 0.0 in that case.
        """
        if not self.metrics:
            return {}
        successful_requests = [m for m in self.metrics if m.success]
        avg_latency = (
            sum(m.latency for m in successful_requests) / len(successful_requests)
            if successful_requests else 0.0
        )
        return {
            "total_requests": len(self.metrics),
            "success_rate": len(successful_requests) / len(self.metrics),
            "avg_latency": avg_latency,
            "error_types": {
                m.error_type: len([x for x in self.metrics if x.error_type == m.error_type])
                for m in self.metrics if m.error_type
            }
        }


def monitor(collector: MetricsCollector):
    """Decorator factory: record latency and outcome of each call into *collector*.

    Wraps async callables; failures are recorded with the exception class name
    and then re-raised unchanged.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                collector.record(Metrics(
                    latency=time.time() - start_time,
                    success=True,
                    metadata={"function": func.__name__}
                ))
                return result
            except Exception as e:
                collector.record(Metrics(
                    latency=time.time() - start_time,
                    success=False,
                    error_type=type(e).__name__,
                    metadata={"function": func.__name__}
                ))
                raise
        return wrapper
    return decorator

部署和扩展

1. 容器化

dockerfile
# Dockerfile
# Minimal runtime image for the FastAPI service shown in the API section.
FROM python:3.9-slim

WORKDIR /app

# Install dependencies first so this layer is cached across source-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application source.
COPY . .

# Serve the FastAPI app with uvicorn on all interfaces, port 8000.
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

2. API设计

python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class Message(BaseModel):
    # Request body schema for POST /chat.
    user_id: str
    content: str


@app.post("/chat")
async def chat_endpoint(message: Message):
    """Relay a chat message to the customer-service app and return its reply.

    Maps domain errors to HTTP status codes: RateLimitError -> 429,
    any other AIError -> 500.
    """
    try:
        # NOTE(review): `customer_service` is assumed to be a module-level
        # CustomerService instance created elsewhere — confirm at wiring time.
        response = await customer_service.handle_message(
            message.user_id,
            message.content
        )
        return {"response": response}
    except RateLimitError:
        raise HTTPException(status_code=429, detail="Too many requests")
    except AIError as e:
        raise HTTPException(status_code=500, detail=str(e))

安全考虑

  1. API密钥管理
  2. 数据加密
  3. 输入验证
  4. 访问控制
  5. 审计日志

测试策略

  1. 单元测试
  2. 集成测试
  3. 性能测试
  4. 安全测试
  5. 用户验收测试