AI应用开发指南
AI应用架构
1. 基础架构
python
import asyncio
import logging
from dataclasses import dataclass
from typing import List, Optional
# Configure root logging once at import time; per-module logger follows the
# stdlib `getLogger(__name__)` convention.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class AIConfig:
    """Configuration bundle for an AI application.

    Fields are read by the application classes below when calling the
    chat-completion API.
    """
    model_name: str     # model identifier passed as `model=` to the API
    api_key: str        # NOTE(review): plain attribute — keep out of logs/repr
    max_tokens: int     # forwarded as `max_tokens=` on chat calls
    temperature: float  # forwarded as `temperature=` on chat calls
    timeout: int = 30   # presumably seconds; not used by code visible here — TODO confirm
class AIApplication:
"""AI应用基础类"""
def __init__(self, config: AIConfig):
self.config = config
self.initialize()
def initialize(self):
"""初始化应用"""
try:
# 初始化AI模型
self.setup_model()
# 初始化数据库连接
self.setup_database()
# 初始化缓存
self.setup_cache()
except Exception as e:
logger.error(f"初始化失败: {str(e)}")
raise
def setup_model(self):
"""设置AI模型"""
pass
def setup_database(self):
"""设置数据库"""
pass
def setup_cache(self):
"""设置缓存"""
pass
2. 中间件
python
from functools import wraps
import time
def rate_limit(max_requests: int, time_window: int):
    """Sliding-window rate-limit decorator.

    Allows at most `max_requests` calls per `time_window` seconds; excess
    calls raise `Exception("Rate limit exceeded")`. All functions decorated
    by the SAME `rate_limit(...)` result share one window (the timestamp
    list lives in this closure).

    BUG FIX: uses `time.monotonic()` instead of `time.time()` — wall-clock
    time can jump (NTP/DST) and corrupt the window.
    """
    timestamps = []  # monotonic times of recent accepted calls, oldest first

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            # Drop entries that have aged out of the window.
            while timestamps and now - timestamps[0] > time_window:
                timestamps.pop(0)
            if len(timestamps) >= max_requests:
                raise Exception("Rate limit exceeded")
            timestamps.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator
def cache_result(ttl: int = 3600):
    """Memoization decorator with a per-entry time-to-live (seconds).

    The cache key is `str(args) + str(kwargs)`, kept from the original so
    unhashable arguments still work (NOTE: distinct objects with equal
    reprs will collide — acceptable for this tutorial code).

    BUG FIX: expired entries are now deleted when detected instead of
    lingering in the dict, and expiry uses `time.monotonic()` so wall-clock
    jumps cannot resurrect or prematurely kill entries.
    """
    cache = {}  # key -> (result, monotonic timestamp)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(kwargs)
            hit = cache.get(key)
            if hit is not None:
                result, stamp = hit
                if time.monotonic() - stamp < ttl:
                    return result
                del cache[key]  # evict the stale entry
            result = func(*args, **kwargs)
            cache[key] = (result, time.monotonic())
            return result
        return wrapper
    return decorator
应用示例
1. 智能客服
python
from typing import Dict, List
import openai
class CustomerService(AIApplication):
    """Chat-based customer-service assistant backed by the OpenAI chat API.

    Keeps a short in-memory conversation history per user (last 10 entries).
    NOTE(review): uses the pre-1.0 openai SDK interface
    (`openai.ChatCompletion.acreate`) — confirm the pinned SDK version.
    """

    def __init__(self, config: AIConfig):
        super().__init__(config)
        # user_id -> list of {"role": ..., "content": ...} message dicts
        self.conversation_history: Dict[str, List[Dict]] = {}

    @rate_limit(max_requests=60, time_window=60)
    async def handle_message(self, user_id: str, message: str) -> str:
        """Generate a reply to `message` for `user_id`.

        Returns the model's reply text, or a fixed apology string if the
        API call fails for any reason (error is logged, never raised).
        """
        # Fetch this user's history (empty list on first contact)
        history = self.conversation_history.get(user_id, [])
        try:
            # Build the prompt: system role + prior turns + the new message
            response = await openai.ChatCompletion.acreate(
                model=self.config.model_name,
                messages=[
                    {"role": "system", "content": "你是一个专业的客服代表"},
                    *history,
                    {"role": "user", "content": message}
                ],
                max_tokens=self.config.max_tokens,
                temperature=self.config.temperature
            )
            # Record the exchange, truncated to the 10 most recent entries
            history.extend([
                {"role": "user", "content": message},
                {"role": "assistant", "content": response.choices[0].message.content}
            ])
            self.conversation_history[user_id] = history[-10:]  # keep last 10 records
            return response.choices[0].message.content
        except Exception as e:
            logger.error(f"处理消息失败: {str(e)}")
            return "抱歉,我现在无法回答您的问题,请稍后再试。"
2. 文档分析器
python
import PyPDF2
from typing import List, Dict
import spacy
class DocumentAnalyzer(AIApplication):
    """Extracts text from PDFs and analyzes it with spaCy + an LLM summary."""

    def __init__(self, config: AIConfig):
        super().__init__(config)
        # Chinese spaCy pipeline; must be installed separately
        self.nlp = spacy.load("zh_core_web_sm")

    def process_pdf(self, file_path: str) -> Dict:
        """Read every page of the PDF at `file_path` and analyze the text.

        Raises (after logging) on any read/parse failure.
        """
        try:
            with open(file_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                text = "".join(page.extract_text() for page in reader.pages)
            return self.analyze_text(text)
        except Exception as e:
            logger.error(f"处理PDF失败: {str(e)}")
            raise

    @cache_result(ttl=3600)
    def analyze_text(self, text: str) -> Dict:
        """Return entities, sentences, an LLM summary and the token count.

        BUG FIX: the original called the async `generate_summary` without
        awaiting it, so "summary" held a coroutine object, never a string.
        `asyncio.run` drives the coroutine to completion here.
        NOTE(review): `asyncio.run` fails if called from inside a running
        event loop — confirm this method is only invoked from sync code.
        """
        doc = self.nlp(text)
        # Named entities and sentence segmentation from the spaCy pipeline
        entities = [(ent.text, ent.label_) for ent in doc.ents]
        sentences = [sent.text.strip() for sent in doc.sents]
        summary = asyncio.run(self.generate_summary(text))
        return {
            "entities": entities,
            "sentences": sentences,
            "summary": summary,
            "word_count": len(doc)
        }

    async def generate_summary(self, text: str) -> str:
        """Ask the model for a short summary; return a fallback string on error.

        NOTE(review): uses the pre-1.0 openai SDK interface — confirm version.
        """
        try:
            response = await openai.ChatCompletion.acreate(
                model=self.config.model_name,
                messages=[
                    {"role": "system", "content": "请生成以下文本的简要摘要"},
                    {"role": "user", "content": text}
                ],
                max_tokens=150,
                temperature=0.3
            )
            return response.choices[0].message.content
        except Exception as e:
            logger.error(f"生成摘要失败: {str(e)}")
            return "无法生成摘要"
3. 代码助手
python
import ast
from typing import List, Dict
class CodeAssistant(AIApplication):
    """AI-assisted code review combined with static metric extraction."""

    def __init__(self, config: AIConfig):
        super().__init__(config)
        self.language_specs = self.load_language_specs()

    def load_language_specs(self) -> Dict:
        """Return per-language syntax facts (file extension, comment markers)."""
        return {
            "python": {
                "file_ext": ".py",
                "comment": "#",
                "docstring": '"""'
            },
            "javascript": {
                "file_ext": ".js",
                "comment": "//",
                "docstring": "/**"
            }
        }

    async def review_code(self, code: str, language: str) -> Dict:
        """Ask the model to review `code`; bundle the reply with static metrics.

        Raises (after logging) on API failure.
        NOTE(review): uses the pre-1.0 openai SDK interface — confirm version.
        """
        try:
            response = await openai.ChatCompletion.acreate(
                model=self.config.model_name,
                messages=[
                    {"role": "system", "content": f"你是一个{language}专家,请审查以下代码"},
                    {"role": "user", "content": code}
                ],
                max_tokens=500,
                temperature=0.1
            )
            return {
                "review": response.choices[0].message.content,
                "metrics": self.analyze_code_metrics(code, language)
            }
        except Exception as e:
            logger.error(f"代码审查失败: {str(e)}")
            raise

    def analyze_code_metrics(self, code: str, language: str) -> Dict:
        """Dispatch to a language-specific analyzer; {} for unsupported languages."""
        if language == "python":
            return self._analyze_python_code(code)
        # TODO: add support for other languages
        return {}

    def _analyze_python_code(self, code: str) -> Dict:
        """Count functions, classes, imports and lines in Python source.

        Returns {} when the code does not parse (error is logged).
        IMPROVED: one `ast.walk` pass instead of three, and the two
        isinstance checks merged into a tuple. `ast.AsyncFunctionDef` is
        deliberately NOT counted, matching the original numbers.
        """
        try:
            tree = ast.parse(code)
        except Exception as e:
            logger.error(f"Python代码分析失败: {str(e)}")
            return {}
        counts = {"num_functions": 0, "num_classes": 0, "num_imports": 0}
        for node in ast.walk(tree):
            if isinstance(node, ast.FunctionDef):
                counts["num_functions"] += 1
            elif isinstance(node, ast.ClassDef):
                counts["num_classes"] += 1
            elif isinstance(node, (ast.Import, ast.ImportFrom)):
                counts["num_imports"] += 1
        counts["num_lines"] = len(code.splitlines())
        return counts
最佳实践
1. 错误处理
python
class AIError(Exception):
    """Root of the AI-application exception hierarchy."""


class ModelError(AIError):
    """Raised when the underlying model/API call fails."""


class RateLimitError(AIError):
    """Raised when an API rate limit has been exceeded."""
def handle_ai_error(func):
    """Decorator translating low-level API errors into the AIError hierarchy.

    Rate-limit errors become RateLimitError, other API errors become
    ModelError, and anything else is logged and re-raised as AIError.
    NOTE(review): `openai.error.*` is the pre-1.0 openai SDK namespace
    (removed in openai>=1.0) — confirm the pinned SDK version.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except openai.error.RateLimitError:
            raise RateLimitError("API调用次数超限")
        except openai.error.APIError as e:
            raise ModelError(f"API错误: {str(e)}")
        except Exception as e:
            logger.error(f"未知错误: {str(e)}")
            raise AIError(f"处理失败: {str(e)}")
    return wrapper
2. 性能优化
python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
class PerformanceOptimizer:
"""性能优化器"""
def __init__(self, max_workers: int = 4):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
async def batch_process(self, func, items: List, chunk_size: int = 10):
"""批量处理"""
results = []
for i in range(0, len(items), chunk_size):
chunk = items[i:i + chunk_size]
chunk_results = await self._process_chunk(func, chunk)
results.extend(chunk_results)
return results
async def _process_chunk(self, func, chunk: List):
"""处理数据块"""
return await asyncio.gather(*[func(item) for item in chunk])
3. 监控和日志
python
import time
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class Metrics:
    """One per-call performance measurement (recorded by the monitor decorator)."""
    latency: float                          # elapsed seconds for the call
    success: bool                           # False when the call raised
    error_type: Optional[str] = None        # exception class name when success is False
    metadata: Optional[Dict[str, Any]] = None  # e.g. {"function": name}; was mis-annotated as non-Optional
class MetricsCollector:
    """Accumulates Metrics records and derives aggregate statistics."""

    def __init__(self):
        self.metrics = []  # recorded Metrics, in arrival order

    def record(self, metric: "Metrics"):
        """Append one measurement."""
        self.metrics.append(metric)

    def get_statistics(self) -> Dict:
        """Return aggregate statistics; {} when nothing was recorded.

        BUG FIX: the original divided by len(successful_requests), raising
        ZeroDivisionError when every request failed; avg_latency is now 0.0
        in that case. Error counting is a single O(n) pass instead of the
        original O(n^2) per-error comprehension.
        """
        if not self.metrics:
            return {}
        successful = [m for m in self.metrics if m.success]
        error_counts: Dict[str, int] = {}
        for m in self.metrics:
            if m.error_type:
                error_counts[m.error_type] = error_counts.get(m.error_type, 0) + 1
        avg_latency = (
            sum(m.latency for m in successful) / len(successful)
            if successful else 0.0
        )
        return {
            "total_requests": len(self.metrics),
            "success_rate": len(successful) / len(self.metrics),
            "avg_latency": avg_latency,
            "error_types": error_counts,
        }
def monitor(collector: "MetricsCollector"):
    """Decorator factory: times each call of an async function and records
    a Metrics row (latency, success flag, error type, function name) into
    `collector`, re-raising any exception unchanged.

    BUG FIX: durations are measured with `time.perf_counter()` — the
    original used wall-clock `time.time()`, which can jump (NTP/DST) and
    yield negative or inflated latencies.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                collector.record(Metrics(
                    latency=time.perf_counter() - start,
                    success=False,
                    error_type=type(e).__name__,
                    metadata={"function": func.__name__}
                ))
                raise
            collector.record(Metrics(
                latency=time.perf_counter() - start,
                success=True,
                metadata={"function": func.__name__}
            ))
            return result
        return wrapper
    return decorator
部署和扩展
1. 容器化
dockerfile
# Dockerfile
# Minimal image for the FastAPI service below, served by uvicorn on port 8000.
FROM python:3.9-slim
WORKDIR /app
# Copy the dependency list first so the pip layer is cached when only code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
2. API设计
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# FastAPI application instance (referenced by the uvicorn CMD as main:app)
app = FastAPI()


class Message(BaseModel):
    """Request payload for the /chat endpoint."""
    user_id: str   # identifies the conversation owner (per-user history key)
    content: str   # the user's message text
@app.post("/chat")
async def chat_endpoint(message: Message):
    """POST /chat — forward a user message to the customer-service AI.

    Maps RateLimitError -> HTTP 429 and AIError -> HTTP 500.
    NOTE(review): `customer_service` is not defined anywhere in this file —
    presumably a module-level CustomerService instance created at startup;
    confirm the wiring before deploying.
    """
    try:
        response = await customer_service.handle_message(
            message.user_id,
            message.content
        )
        return {"response": response}
    except RateLimitError:
        raise HTTPException(status_code=429, detail="Too many requests")
    except AIError as e:
        raise HTTPException(status_code=500, detail=str(e))
安全考虑
- API密钥管理
- 数据加密
- 输入验证
- 访问控制
- 审计日志
测试策略
- 单元测试
- 集成测试
- 性能测试
- 安全测试
- 用户验收测试