"""AI 总结服务 — 基于 OpenCodeZen / OpenAI 兼容 API""" import json import time import requests from providers.base import BaseProvider def build_prompts(query, results): search_text = '' for i, r in enumerate(results, 1): search_text += f"[{i}] 标题: {r.get('title', '')}\n" search_text += f" 来源: {r.get('url', '')}\n" search_text += f" 内容: {r.get('content', '')}\n\n" system_prompt = ( '你是一个专业的搜索结果分析助手。' '请根据用户搜索词和搜索结果,生成一份结构化的中文总结报告。' '要求:\n' '1. 先概括本次搜索的核心主题\n' '2. 列出关键发现和重要信息点(分点说明)\n' '3. 指出不同信息来源之间的共识与分歧(如有)\n' '4. 给出综合结论\n' '使用简洁的 Markdown 格式,不要过于冗长。' ) user_prompt = f'## 搜索词\n{query}\n\n## 搜索结果\n{search_text}\n\n请根据以上搜索结果生成总结报告。' return system_prompt, user_prompt class AIProvider(BaseProvider): name = 'ai' display_name = 'AI 总结' needs_api_key = True enabled = False priority = 50 def __init__(self, config: dict): super().__init__(config) oc = config.get('opencodezen', {}) self.api_key = oc.get('api_key') self.base_url = oc.get('base_url', 'https://opencode.ai/zen/go/v1').rstrip('/') self.model = oc.get('model', 'deepseek-v4-flash') def is_available(self) -> bool: return bool(self.api_key) def search(self, query: str, max_results: int = 5) -> list: return [] def summarize(self, query: str, results: list) -> dict: if not self.is_available(): return {'error': 'AI 总结未配置', 'summary': ''} system_prompt, user_prompt = build_prompts(query, results) url = f'{self.base_url}/chat/completions' headers = { 'Authorization': f'Bearer {self.api_key}', 'Content-Type': 'application/json', } payload = { 'model': self.model, 'messages': [ {'role': 'system', 'content': system_prompt}, {'role': 'user', 'content': user_prompt}, ], 'max_tokens': 4096, 'temperature': 0.3, } start = time.time() try: resp = requests.post(url, json=payload, headers=headers, timeout=60) elapsed = round(time.time() - start, 2) if resp.status_code != 200: return {'error': f'AI API 返回 {resp.status_code}', 'summary': '', 'elapsed': elapsed} data = resp.json() return { 'summary': data['choices'][0]['message']['content'], 'model': self.model, 'elapsed': elapsed, 'usage': data.get('usage', {}), } except Exception as e: return {'error': str(e), 'summary': '', 'elapsed': round(time.time() - start, 2)} def summarize_stream(self, query: str, results: list): if not self.is_available(): yield f"event: error\ndata: {json.dumps({'error': 'AI 总结未配置'})}\n\n" return system_prompt, user_prompt = build_prompts(query, results) url = f'{self.base_url}/chat/completions' headers = { 'Authorization': f'Bearer {self.api_key}', 'Content-Type': 'application/json', } payload = { 'model': self.model, 'messages': [ {'role': 'system', 'content': system_prompt}, {'role': 'user', 'content': user_prompt}, ], 'max_tokens': 4096, 'temperature': 0.3, 'stream': True, } start = time.time() try: resp = requests.post(url, json=payload, headers=headers, stream=True, timeout=120) if resp.status_code != 200: yield f"event: error\ndata: {json.dumps({'error': f'AI API 返回 {resp.status_code}'})}\n\n" return for line in resp.iter_lines(): if not line: continue line_str = line.decode('utf-8', errors='replace') if not line_str.startswith('data: '): continue data_str = line_str[6:] if data_str.strip() == '[DONE]': break try: chunk = json.loads(data_str) delta = chunk.get('choices', [{}])[0].get('delta', {}) content = delta.get('content', '') if content: yield f"event: delta\ndata: {json.dumps({'content': content})}\n\n" except json.JSONDecodeError: continue elapsed = round(time.time() - start, 2) yield f"event: meta\ndata: {json.dumps({'model': self.model, 'elapsed': elapsed})}\n\n" except Exception as e: yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"