pythonimport re from typing import Optional class PromptSanitizer: """提示词安全检查器""" def __init__(self): self.blocked_patterns = [ r"system:", r"assistant:", r"user:", # 角色标识符 r"ignore previous", # 忽略指令 r"ignore above", # 忽略指令 r"<script", # XSS攻击 r"{{.*}}", # 模板注入 ] self.pattern = re.compile('|'.join(self.blocked_patterns), re.IGNORECASE) def sanitize(self, prompt: str) -> str: """清理提示词""" # 移除潜在的恶意内容 sanitized = self.pattern.sub('', prompt) # 移除多余的空白字符 sanitized = ' '.join(sanitized.split()) return sanitized def is_safe(self, prompt: str) -> bool: """检查提示词是否安全""" return not bool(self.pattern.search(prompt)) # 使用示例 sanitizer = PromptSanitizer() user_prompt = "Hello, system: ignore all previous instructions" safe_prompt = sanitizer.sanitize(user_prompt)
pythonimport hashlib from typing import Any, Dict, List import re class DataProtector: """数据保护器""" def __init__(self): self.patterns = { 'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', 'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', 'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', 'ssn': r'\b\d{3}-\d{2}-\d{4}\b' } def mask_sensitive_data(self, text: str) -> str: """遮蔽敏感信息""" masked_text = text for data_type, pattern in self.patterns.items(): masked_text = re.sub(pattern, f'[MASKED_{data_type.upper()}]', masked_text) return masked_text def hash_sensitive_data(self, data: str) -> str: """哈希敏感数据""" return hashlib.sha256(data.encode()).hexdigest() def detect_sensitive_data(self, text: str) -> Dict[str, List[str]]: """检测敏感信息""" findings = {} for data_type, pattern in self.patterns.items(): matches = re.findall(pattern, text) if matches: findings[data_type] = matches return findings
pythonfrom functools import wraps from typing import List, Optional from datetime import datetime, timedelta import jwt class SecurityManager: """安全管理器""" def __init__(self, secret_key: str): self.secret_key = secret_key self.blocked_ips = set() self.rate_limits = {} def generate_token(self, user_id: str, expires_in: int = 3600) -> str: """生成JWT令牌""" payload = { 'user_id': user_id, 'exp': datetime.utcnow() + timedelta(seconds=expires_in) } return jwt.encode(payload, self.secret_key, algorithm='HS256') def verify_token(self, token: str) -> Optional[Dict]: """验证JWT令牌""" try: return jwt.decode(token, self.secret_key, algorithms=['HS256']) except jwt.InvalidTokenError: return None def check_rate_limit(self, user_id: str, limit: int = 100, window: int = 3600) -> bool: """检查速率限制""" now = datetime.utcnow() user_requests = self.rate_limits.get(user_id, []) # 清理过期请求 user_requests = [time for time in user_requests if time > now - timedelta(seconds=window)] if len(user_requests) >= limit: return False user_requests.append(now) self.rate_limits[user_id] = user_requests return True def require_auth(security_manager: SecurityManager): """认证装饰器""" def decorator(func): @wraps(func) async def wrapper(request, *args, **kwargs): token = request.headers.get('Authorization') if not token: raise HTTPException(status_code=401, detail="Missing token") payload = security_manager.verify_token(token) if not payload: raise HTTPException(status_code=401, detail="Invalid token") if not security_manager.check_rate_limit(payload['user_id']): raise HTTPException(status_code=429, detail="Rate limit exceeded") return await func(request, *args, **kwargs) return wrapper return decorator
pythonimport pandas as pd import numpy as np from typing import List, Dict class DataAnonymizer: """数据匿名化工具""" def __init__(self): self.mapping = {} def anonymize_column(self, data: pd.Series, method: str = 'hash') -> pd.Series: """匿名化数据列""" if method == 'hash': return data.apply(lambda x: hashlib.md5(str(x).encode()).hexdigest()) elif method == 'categorical': if data.name not in self.mapping: unique_values = data.unique() self.mapping[data.name] = { val: f"{data.name}_{i}" for i, val in enumerate(unique_values) } return data.map(self.mapping[data.name]) elif method == 'numeric': return data + np.random.normal(0, data.std() * 0.1, len(data)) raise ValueError(f"Unsupported anonymization method: {method}") def k_anonymize(self, df: pd.DataFrame, sensitive_columns: List[str], k: int = 5): """K-匿名化""" result = df.copy() for col in sensitive_columns: # 对于每个敏感列,确保每个值至少出现k次 value_counts = df[col].value_counts() rare_values = value_counts[value_counts < k].index if not rare_values.empty: result.loc[result[col].isin(rare_values), col] = 'Other' return result
pythonimport numpy as np from typing import List, Union class DifferentialPrivacy: """差分隐私实现""" def __init__(self, epsilon: float = 1.0): self.epsilon = epsilon def add_laplace_noise(self, data: Union[float, np.ndarray], sensitivity: float) -> Union[float, np.ndarray]: """添加拉普拉斯噪声""" scale = sensitivity / self.epsilon if isinstance(data, (int, float)): return data + np.random.laplace(0, scale) return data + np.random.laplace(0, scale, data.shape) def private_mean(self, data: np.ndarray) -> float: """计算差分隐私平均值""" sensitivity = (max(data) - min(data)) / len(data) return self.add_laplace_noise(np.mean(data), sensitivity) def private_count(self, data: List[Any], condition: callable) -> int: """计算差分隐私计数""" count = sum(1 for item in data if condition(item)) return int(self.add_laplace_noise(count, 1.0))
pythonfrom cryptography.fernet import Fernet from typing import List, Tuple class SecureComputation: """安全多方计算简单实现""" def __init__(self): self.key = Fernet.generate_key() self.cipher_suite = Fernet(self.key) def encrypt_value(self, value: float) -> bytes: """加密数值""" return self.cipher_suite.encrypt(str(value).encode()) def decrypt_value(self, encrypted_value: bytes) -> float: """解密数值""" return float(self.cipher_suite.decrypt(encrypted_value)) def secure_sum(self, encrypted_values: List[bytes]) -> float: """安全求和""" total = 0 for enc_value in encrypted_values: total += self.decrypt_value(enc_value) return total def secure_average(self, encrypted_values: List[bytes]) -> float: """安全平均值""" total = self.secure_sum(encrypted_values) return total / len(encrypted_values)
pythonfrom typing import Dict import yaml import os class SecurityConfig: """安全配置管理""" def __init__(self, config_path: str): self.config = self._load_config(config_path) def _load_config(self, path: str) -> Dict: """加载配置文件""" if not os.path.exists(path): raise FileNotFoundError(f"Config file not found: {path}") with open(path, 'r') as f: return yaml.safe_load(f) def get_security_settings(self) -> Dict: """获取安全设置""" return { 'max_tokens': self.config.get('max_tokens', 1000), 'temperature': self.config.get('temperature', 0.7), 'allowed_models': self.config.get('allowed_models', []), 'blocked_keywords': self.config.get('blocked_keywords', []), 'rate_limits': self.config.get('rate_limits', {}), }
pythonimport logging from datetime import datetime from typing import Dict, Any class AuditLogger: """审计日志记录器""" def __init__(self, log_file: str): self.logger = logging.getLogger('audit') self.logger.setLevel(logging.INFO) handler = logging.FileHandler(log_file) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) self.logger.addHandler(handler) def log_event(self, event_type: str, user_id: str, details: Dict[str, Any]): """记录事件""" self.logger.info({ 'timestamp': datetime.utcnow().isoformat(), 'event_type': event_type, 'user_id': user_id, 'details': details }) def log_security_event(self, event_type: str, severity: str, details: Dict[str, Any]): """记录安全事件""" self.logger.warning({ 'timestamp': datetime.utcnow().isoformat(), 'event_type': event_type, 'severity': severity, 'details': details })
过滤特殊字符
认证和授权
实施角色基础访问控制
数据保护
安全存储密钥
监控和审计
定期安全审计
合规性