"""DuckDuckGo 搜索源 — 免费,无需 API key""" import time import requests from providers.base import BaseProvider, SearchResult class DuckDuckGoProvider(BaseProvider): name = 'duckduckgo' display_name = 'DuckDuckGo' needs_api_key = False enabled = False # 国内网络不可用,默认关闭 priority = 30 def __init__(self, config: dict): super().__init__(config) def search(self, query: str, max_results: int = 10) -> list: results = [] # 1. 先尝试 Instant Answer API(获取摘要和主题) try: url = 'https://api.duckduckgo.com/' params = { 'q': query, 'format': 'json', 'no_html': 1, 'skip_disambig': 1, } resp = requests.get(url, params=params, timeout=8) if resp.status_code == 200: data = resp.json() # Abstract abstract = data.get('AbstractText', '') if abstract and data.get('AbstractURL'): results.append(SearchResult( title=data.get('Heading', 'DuckDuckGo 摘要'), url=data['AbstractURL'], content=abstract, score=0.9, source=self.name, )) # Related topics for topic in data.get('RelatedTopics', []): if 'Topics' in topic: for sub in topic['Topics'][:3]: if sub.get('Text'): results.append(SearchResult( title=sub.get('Text', '')[:80], url=sub.get('FirstURL', ''), content=sub.get('Text', ''), score=0.7, source=self.name, )) elif topic.get('Text'): results.append(SearchResult( title=topic.get('Text', '')[:80], url=topic.get('FirstURL', ''), content=topic.get('Text', ''), score=0.7, source=self.name, )) except requests.exceptions.RequestException: pass # 2. 如果结果不够,再抓取 HTML 版本获取更多结果 if len(results) < max_results: try: url = 'https://html.duckduckgo.com/html/' resp = requests.post(url, data={'q': query}, timeout=15, headers={'User-Agent': 'Mozilla/5.0'}) if resp.status_code == 200: html = resp.text more = self._parse_html_results(html, max_results - len(results)) results.extend(more) except requests.exceptions.RequestException: pass # 去重 seen_urls = set() unique = [] for r in results: if r.url and r.url not in seen_urls: seen_urls.add(r.url) unique.append(r) return unique[:max_results] def _parse_html_results(self, html: str, limit: int) -> list: """简单解析 DuckDuckGo HTML 搜索结果""" results = [] # 按 = limit: break try: # 提取 URL href_start = block.find('href="') if href_start == -1: continue href_start += 6 href_end = block.find('"', href_start) url = block[href_start:href_end] # 提取标题 (在 xxx) title_start = block.find('>', href_end) if title_start == -1: continue title_start += 1 title_end = block.find('', title_start) title = self._clean_text(block[title_start:title_end]) # 提取摘要(在 后的某个 或
中) snippet = '' for kw in ['class="result-snippet"', 'class="snippet"']: idx = block.find(kw) if idx != -1: tag_close = block.find('>', idx) + 1 next_tag = block.find('<', tag_close) if next_tag != -1: snippet = self._clean_text(block[tag_close:next_tag]) break if url and title: results.append(SearchResult( title=title, url=url, content=snippet or title, score=0.6, source=self.name, )) except Exception: continue return results @staticmethod def _clean_text(text: str) -> str: """清理 HTML 标签和多余空白""" import re text = re.sub(r'<[^>]+>', '', text) text = re.sub(r'\s+', ' ', text).strip() return text