Files
aihot/backend/app/ai/processor.py
2026-05-27 17:14:08 +08:00

198 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
import math
from datetime import datetime, date

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import contains_eager

from ..models.news import RawNews, ProcessedNews, LLMConfig, NewsSource, SystemLog
from ..crawler.rss_fetcher import fetch_rss
from .llm_client import LLMClient
logger = logging.getLogger(__name__)
# System prompt sent with every analysis request: casts the model as a senior
# pharmaceutical-industry analyst covering global drug policy, clinical research
# and industry news.
SYSTEM_PROMPT = "你是医药行业资深分析师,擅长解读全球医药政策、临床研究、行业动态。"
# Per-article user prompt, filled with str.format(title=..., content=..., language=...).
# The literal JSON braces are doubled ({{ }}) so str.format leaves them in place.
# It asks for a strict JSON object only: `category` must be one of four fixed values
# and `importance_score` an integer 1-100. The 85-point threshold stated at the end
# is the same eligibility cutoff applied in _select_top_10.
# NOTE(review): the Chinese text looks like it lost its full-width punctuation
# (e.g. "中文摘要100-150字客观陈述核心内容", "90-100重大监管决定"). Verify against the
# original file before relying on the exact wording.
ANALYSIS_PROMPT = """分析以下新闻,返回严格的 JSON 格式结果,不要包含任何其他文字。
新闻标题:{title}
新闻内容:{content}
新闻语言:{language}
返回格式:
{{
"is_medical_related": true,
"title_zh": "中文标题(英文原文请翻译成简洁中文)",
"summary": "中文摘要100-150字客观陈述核心内容",
"opinion": "核心观点或行业影响50-100字分析性语言点明实际意义",
"keywords": ["关键词1", "关键词2", "关键词3", "关键词4", "关键词5"],
"importance_score": 85,
"importance_reason": "评分理由30字内",
"category": "药品监管"
}}
category 只能是以下四个之一:药品监管 / 临床研究 / 行业动态 / 政策法规
importance_score 评分标准1-100整数
90-100重大监管决定 / 突破性研究 / 影响整个行业的政策
70-89 :行业重要动态,有明显商业或学术价值
50-69 :常规行业新闻,有一定参考价值
1-49 :普通资讯,信息价值有限
注意:只有 85 分及以上的新闻才有资格进入每日精选,请严格区分。
"""
async def _log(db: AsyncSession, level: str, event_type: str, message: str):
    """Write one SystemLog row and commit the session right away.

    Because this commits *db*, any other pending changes on the same session
    are committed too. NOTE(review): with SQLAlchemy's default
    expire_on_commit, loaded instances are also expired. Avoid calling this
    in the middle of loops that still read ORM attributes.
    """
    entry = SystemLog(level=level, event_type=event_type, message=message)
    db.add(entry)
    await db.commit()
async def _get_active_llm(db: AsyncSession) -> LLMConfig | None:
    """Return one LLMConfig whose ``is_active`` flag is set, or None if there is none.

    NOTE(review): the query has no ORDER BY. If several configs are active,
    which one is returned is up to the database.
    """
    stmt = (
        select(LLMConfig)
        .where(LLMConfig.is_active == True)  # noqa: E712 - SQLAlchemy column expression
        .limit(1)
    )
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def _analyze_article(client: LLMClient, title: str, content: str, language: str) -> dict | None:
    """Ask the LLM to analyse one article and return its JSON verdict as a dict.

    The article body is truncated to its first 2000 characters, and an empty
    body is replaced with a placeholder. ``language == "zh"`` is reported to
    the model as Chinese; anything else is reported as English.

    Returns:
        The parsed JSON object, or ``None`` when the LLM call fails or the
        reply contains no decodable JSON object. Failures are logged, not
        raised, so a single bad article cannot abort the pipeline.
    """
    prompt = ANALYSIS_PROMPT.format(
        title=title,
        content=content[:2000] if content else "(无正文)",
        language="中文" if language == "zh" else "英文",
    )
    try:
        reply = await client.complete(SYSTEM_PROMPT, prompt)
    except Exception as e:  # best-effort boundary: provider/network errors skip this article
        logger.warning("LLM request error: %s", e)
        return None
    analysis = _parse_llm_json(reply)
    if analysis is None:
        logger.warning("LLM parse error: no JSON object in reply %.200r", reply)
    return analysis


def _parse_llm_json(reply) -> dict | None:
    """Extract the JSON object from an LLM reply, or return None.

    Decoding starts at the first ``{`` using ``JSONDecoder.raw_decode``. This
    tolerates Markdown code fences (of any language tag or case), leading
    prose and trailing text after the object. Non-string input, undecodable
    text and non-object JSON all yield ``None``, so callers can safely use
    ``.get`` on the result.
    """
    if not isinstance(reply, str):
        return None
    start = reply.find("{")
    if start == -1:
        return None
    try:
        obj, _end = json.JSONDecoder().raw_decode(reply, start)
    except json.JSONDecodeError:
        return None
    return obj if isinstance(obj, dict) else None
async def _select_top_10(db: AsyncSession, target: date):
    """Reset featured flags for *target*'s news and elect a TOP 10 with category diversity.

    Only news with ``importance_score >= 85`` is eligible for the featured list.
    Eligible items are picked in two passes:

    1. the highest-scoring item of each of the four categories, when one exists,
       so every category with an eligible item is represented;
    2. the remaining slots, up to 10 items in total, filled strictly by score.

    ``featured_rank`` (starting at 1) is then assigned by descending score, so
    rank 1 is always the best-scored featured item, whichever pass picked it.

    Args:
        db: session used to load and update the ProcessedNews rows.
        target: the day whose rows (by ``processed_at`` date) are considered.

    Returns:
        The number of items marked as featured (at most 10).
    """
    result = await db.execute(
        select(ProcessedNews)
        .where(func.date(ProcessedNews.processed_at) == target)
        .order_by(ProcessedNews.importance_score.desc())
    )
    all_news = result.scalars().all()

    # Clear the whole day's flags first so re-running the selection is repeatable.
    for n in all_news:
        n.is_featured = False
        n.featured_rank = None

    def _score(n) -> float:
        # Treat a NULL score (if the column allows one) as ineligible instead of crashing.
        return n.importance_score if n.importance_score is not None else 0.0

    # Sort in Python (stable, so ties keep query order) rather than relying on
    # how the database orders NULLs under DESC.
    candidates = sorted((n for n in all_news if _score(n) >= 85), key=_score, reverse=True)
    categories = ["药品监管", "临床研究", "行业动态", "政策法规"]
    selected: list[ProcessedNews] = []

    # First pass: guarantee each category its top eligible item.
    for cat in categories:
        best = next((n for n in candidates if n.category == cat), None)
        if best is not None:
            selected.append(best)

    # Second pass: fill the remaining slots by score.
    for n in candidates:
        if len(selected) >= 10:
            break
        if n not in selected:
            selected.append(n)

    # Rank by score, not by the order in which the passes picked the items.
    selected.sort(key=_score, reverse=True)
    for rank, n in enumerate(selected, start=1):
        n.is_featured = True
        n.featured_rank = rank
    await db.commit()
    return len(selected)
async def run_daily_pipeline(db: AsyncSession):
    """Run the daily crawl -> AI analysis -> TOP 10 selection pipeline.

    Steps:
    1. Fetch the RSS feed of every active NewsSource and store unseen articles
       as RawNews rows, deduplicated by URL.
    2. Send up to 120 ``pending`` RawNews rows to the active LLM. Medical
       articles become ProcessedNews rows (status ``processed``); the rest are
       marked ``skipped``.
    3. Re-elect today's featured TOP 10.

    Progress and failures are recorded via ``_log``. The function returns early,
    after logging an error, when no LLM configuration is active.
    """
    await _log(db, "INFO", "pipeline_start", "每日流水线启动")
    llm_cfg = await _get_active_llm(db)
    if not llm_cfg:
        await _log(db, "ERROR", "pipeline_error", "未找到激活的 LLM 配置,请在管理后台配置")
        return
    client = LLMClient(
        provider=llm_cfg.provider,
        api_key=llm_cfg.api_key,
        base_url=llm_cfg.base_url,
        model=llm_cfg.model_name,
    )

    # ── 1. Crawl ─────────────────────────────────────────────────────────────
    raw_added, failed_sources = await _crawl_sources(db)
    if failed_sources:
        await _log(
            db,
            "ERROR",
            "crawl_error",
            f"{len(failed_sources)} 个新闻源抓取失败: {', '.join(failed_sources)}",
        )
    await _log(db, "INFO", "crawl_done", f"抓取完成,新增 {raw_added} 条原始新闻")

    # ── 2. AI analysis ───────────────────────────────────────────────────────
    processed_count, skipped_count = await _process_pending(db, client)
    await _log(db, "INFO", "process_done", f"AI 处理完成:{processed_count} 条入库,{skipped_count} 条跳过")

    # ── 3. Featured TOP 10 ───────────────────────────────────────────────────
    featured = await _select_top_10(db, date.today())
    await _log(db, "INFO", "pipeline_done", f"流水线完成,精选 {featured} 条入今日 TOP 10")


async def _crawl_sources(db: AsyncSession) -> tuple[int, list[str]]:
    """Fetch all active RSS sources and stage unseen articles as RawNews rows.

    A source whose fetch raises is logged and skipped, so the remaining
    sources are still crawled. Everything is committed once at the end.

    Returns:
        ``(rows_added, labels_of_failed_sources)``.
    """
    sources_result = await db.execute(
        select(NewsSource).where(NewsSource.is_active == True)  # noqa: E712
    )
    sources = sources_result.scalars().all()
    raw_added = 0
    failed: list[str] = []
    # URLs handled in this run. This catches an article that appears twice in one
    # feed, or in several feeds, before anything is committed, without relying on
    # session autoflush.
    seen_urls: set[str] = set()
    for src in sources:
        try:
            items = await fetch_rss(src.url)
        except Exception as e:  # best-effort per source
            # Not _log(): it commits, which (with the default expire_on_commit)
            # would expire the loaded sources mid-loop. Failures are reported
            # by the caller after this function's single commit.
            logger.warning("RSS fetch failed for %s: %s", src.url, e)
            failed.append(src.name or src.url)
            continue
        for item in items:
            url = item["url"]
            if url in seen_urls:
                continue
            seen_urls.add(url)
            # limit(1): pre-existing duplicate URLs must not make scalar_one_or_none() raise.
            exists = await db.execute(select(RawNews.id).where(RawNews.url == url).limit(1))
            if exists.scalar_one_or_none() is not None:
                continue
            db.add(RawNews(
                source_id=src.id,
                title=item["title"],
                url=url,
                raw_content=item["content"],
                image_url=item.get("image_url"),
                published_at=item["published_at"],
            ))
            raw_added += 1
    await db.commit()
    return raw_added, failed


async def _process_pending(db: AsyncSession, client: LLMClient, limit: int = 120) -> tuple[int, int]:
    """Analyse up to *limit* pending RawNews rows with the LLM.

    Articles the model flags as medical become ProcessedNews rows and are
    marked ``processed``. Unrelated articles, and articles whose analysis
    failed, are marked ``skipped``. The session is committed once at the end.

    Returns:
        ``(processed_count, skipped_count)``.
    """
    pending_result = await db.execute(
        select(RawNews)
        .join(RawNews.source)
        # Fill raw.source from the explicit join: an implicit lazy load of the
        # relationship is not allowed under AsyncSession. NOTE(review): this is
        # harmless if the model already configures eager loading.
        .options(contains_eager(RawNews.source))
        .where(RawNews.status == "pending")
        .limit(limit)
    )
    pending = pending_result.scalars().all()
    processed_count = 0
    skipped_count = 0
    for raw in pending:
        language = raw.source.language if raw.source else "zh"
        analysis = await _analyze_article(client, raw.title, raw.raw_content or "", language)
        # The isinstance guard stops a valid-JSON but non-object reply (e.g. a list)
        # from crashing on .get().
        if not isinstance(analysis, dict) or not analysis.get("is_medical_related"):
            raw.status = "skipped"
            skipped_count += 1
            continue
        db.add(_build_processed_news(raw, analysis))
        raw.status = "processed"
        processed_count += 1
    await db.commit()
    return processed_count, skipped_count


def _build_processed_news(raw: RawNews, analysis: dict) -> ProcessedNews:
    """Map one RawNews row plus its LLM analysis dict to a new ProcessedNews row.

    LLM fields are sanitised rather than trusted:
    - a null or empty ``title_zh`` falls back to the raw title;
    - a null ``summary`` becomes "";
    - non-list ``keywords`` become [];
    - a category outside the four allowed values becomes "行业动态";
    - the score is coerced by ``_coerce_score``.
    """
    category = analysis.get("category")
    if category not in ("药品监管", "临床研究", "行业动态", "政策法规"):
        category = "行业动态"
    keywords = analysis.get("keywords")
    if not isinstance(keywords, list):
        keywords = []
    return ProcessedNews(
        raw_news_id=raw.id,
        title_zh=analysis.get("title_zh") or raw.title,
        summary=analysis.get("summary") or "",
        opinion=analysis.get("opinion"),
        keywords=keywords,
        importance_score=_coerce_score(analysis.get("importance_score")),
        importance_reason=analysis.get("importance_reason"),
        category=category,
        source_name=raw.source.name if raw.source else "",
        source_url=raw.url,
        image_url=raw.image_url,
        published_at=raw.published_at,
    )


def _coerce_score(value, default: float = 50.0) -> float:
    """Convert the LLM's ``importance_score`` to a float clamped to [1, 100].

    Missing, null, non-numeric or NaN values fall back to *default* instead of
    raising. One malformed reply must not abort the single-commit processing
    loop and discard every article analysed before it.
    """
    try:
        score = float(value)
    except (TypeError, ValueError):
        return default
    if math.isnan(score):
        return default
    return min(max(score, 1.0), 100.0)