"""Asynchronous client for the NCBI E-utilities PubMed endpoints (ESearch / EFetch)."""

import asyncio
import time
from typing import Any, Dict, List, Optional

import aiohttp

from config.global_storage import get_model_config
from dto.bio_document import PubMedDocument
from service.pubmed_xml_parse import PubmedXmlParse
from utils.bio_logger import bio_logger as logger

# NOTE(review): credentials are hard-coded in source. They should be loaded
# from configuration or a secret store and rotated if these values are real.
PUBMED_ACCOUNT = [
    {"email": "email1@gmail.com", "api_key": "60eb67add17f39aa588a43e30bb7fce98809"},
    {"email": "email2@gmail.com", "api_key": "fd9bb5b827c95086b9c2d579df20beca2708"},
    {"email": "email3@gmail.com", "api_key": "026586b79437a2b21d1e27d8c3f339230208"},
    {"email": "email4@gmail.com", "api_key": "bca0489d8fe314bfdbb1f7bfe63fb5d76e09"},
]

_EUTILS_BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"

# Number of HTTP attempts per request and the pause between two attempts.
_MAX_ATTEMPTS = 3
_RETRY_DELAY_SECONDS = 0.5

_SEARCH_TYPES = ("keyword", "advanced")

# Publication types excluded from "keyword" searches.
_EXCLUDED_ARTICLE_TYPES = (
    "Address",
    "Bibliography",
    "Biography",
    "Books and Documents",
    "Clinical Conference",
    "Clinical Study",
    "Collected Works",
    "Comment",
    "Congress",
    "Consensus Development Conference",
    "Consensus Development Conference, NIH",
    "Dictionary",
    "Directory",
    "Duplicate Publication",
    "Editorial",
    "Festschrift",
    "Government Document",
    "Guideline",
    "Interactive Tutorial",
    "Interview",
    "Lecture",
    "Legal Case",
    "Legislation",
    "Letter",
    "News",
    "Newspaper Article",
    "Patient Education Handout",
    "Periodical Index",
    "Personal Narrative",
    "Practice Guideline",
    "Published Erratum",
    "Technical Report",
    "Video-Audio Media",
    "Webcast",
)
_EXCLUDED_ARTICLE_FILTER = (
    "(" + " OR ".join(f'"{name}"[Filter]' for name in _EXCLUDED_ARTICLE_TYPES) + ")"
)


def _pick_account() -> Dict[str, str]:
    """Return one entry of ``PUBMED_ACCOUNT``, selected by the current millisecond clock."""
    index = int((time.time() * 1000) % len(PUBMED_ACCOUNT))
    return PUBMED_ACCOUNT[index]


class PubMedAsyncApi:
    """Search PubMed and fetch article details through the NCBI E-utilities HTTP API."""

    def __init__(self):
        self.pubmed_xml_parse = PubmedXmlParse()
        self.model_config = get_model_config()

    async def pubmed_search_function(
        self, query: str, top_k: int, search_type: str
    ) -> List[PubMedDocument]:
        """Search PubMed for *query* and return the matching articles.

        Args:
            query: Search expression sent to ESearch.
            top_k: Maximum number of ids requested from ESearch.
            search_type: ``"keyword"`` or ``"advanced"`` (see ``search_database``).

        Returns:
            One ``PubMedDocument`` per article returned by ``fetch_details``.

        Raises:
            Exception: Any error raised while searching or building the
                documents is logged and re-raised unchanged.
        """
        try:
            start_time = time.time()
            logger.info(
                f'Trying to search PubMed for "{query}", top_k={top_k}, search_type={search_type}'
            )
            id_list = await self.search_database(
                query, db="pubmed", retmax=top_k, search_type=search_type
            )
            articles = await self.fetch_details(
                id_list, db="pubmed", rettype="abstract"
            )
            end_search_pubmed_time = time.time()
            logger.info(
                f'Finished searching PubMed for "{query}", took {end_search_pubmed_time - start_time:.2f} seconds, found {len(articles)} results'
            )
            return [
                PubMedDocument(
                    title=result["title"],
                    abstract=result["abstract"],
                    authors=self.process_authors(result["authors"]),
                    doi=result["doi"],
                    source="pubmed",
                    source_id=result["pmid"],
                    pub_date=result["pub_date"],
                    journal=result["journal"],
                )
                for result in articles
            ]
        except Exception as e:
            logger.error(f"Error searching PubMed query: {query} error: {e}")
            # Bare raise keeps the original traceback intact.
            raise

    def process_authors(self, author_list: List[Dict]) -> str:
        """Join author entries into a ``"Forename Lastname, ..."`` string.

        Each entry is read through its ``"forename"`` and ``"lastname"`` keys.
        A missing or empty part is skipped instead of raising ``KeyError`` or
        rendering the text ``None``; an entry with neither part is dropped.
        """
        names = []
        for author in author_list or []:
            parts = (author.get("forename"), author.get("lastname"))
            full_name = " ".join(str(part) for part in parts if part)
            if full_name:
                names.append(full_name)
        return ", ".join(names)

    # Search the database (ESearch)
    async def search_database(
        self, query: str, db: str, retmax: int, search_type: str = "keyword"
    ) -> List[str]:
        """Run an ESearch query and return the matching record ids.

        Args:
            query: Search expression.
            db: Entrez database name, forwarded to ``esearch``.
            retmax: Maximum number of ids to return.
            search_type: ``"keyword"`` wraps the query in parentheses, requires
                ``fha[Filter]`` and excludes the publication types listed in
                ``_EXCLUDED_ARTICLE_TYPES``; ``"advanced"`` sends the query
                unchanged.

        Returns:
            The id list reported by ESearch (empty when nothing matched).

        Raises:
            ValueError: If *search_type* is not one of the supported values.
        """
        if search_type not in _SEARCH_TYPES:
            raise ValueError("search_type must be one of 'keyword' or 'advanced'")

        if search_type == "keyword":
            query = f"( {query}) AND (fha[Filter]) NOT {_EXCLUDED_ARTICLE_FILTER}"

        return await self.esearch(query=query, retmax=retmax, db=db)

    async def esearch(self, query=None, retmax=10, db: str = "pubmed"):
        """Call the ESearch endpoint and return the ``idlist`` of the JSON reply.

        Args:
            query: Search term; an empty or ``None`` query returns ``[]``
                without contacting the service.
            retmax: Maximum number of ids to return.
            db: Entrez database to search.

        Raises:
            KeyError: If the reply has no ``esearchresult.idlist`` entry.
            Exception: If the request keeps failing (see ``async_http_get``).
        """
        if not query:
            return []

        start_time = time.time()
        # Parameters go through ``params`` so that characters such as "&",
        # "#" or "+" inside the query are URL-encoded, and so that the API key
        # does not end up in the URL written to error logs.
        params = {
            "db": db,
            "term": query,
            "retmode": "json",
            "api_key": _pick_account()["api_key"],
            "sort": "relevance",
            "retmax": str(retmax),
        }
        response = await self.async_http_get(
            url=f"{_EUTILS_BASE_URL}/esearch.fcgi", params=params
        )
        id_list = response["esearchresult"]["idlist"]
        logger.info(
            f"pubmed_async_http get id_list, search Time taken: {time.time() - start_time}s"
        )
        return id_list

    async def _get_with_retry(
        self, url: str, params: Optional[Dict[str, Any]], as_json: bool
    ):
        """GET *url* up to ``_MAX_ATTEMPTS`` times and return the decoded body.

        A non-200 status or a transport error is logged and retried after
        ``_RETRY_DELAY_SECONDS``. A transport error on the final attempt is
        re-raised as is; a non-200 status on the final attempt raises
        ``Exception``.
        """
        async with aiohttp.ClientSession() as session:
            for attempt in range(1, _MAX_ATTEMPTS + 1):
                try:
                    async with session.get(url, params=params) as response:
                        if response.status == 200:
                            if as_json:
                                return await response.json()
                            return await response.text()
                        logger.error(
                            f"{url},try_time:{attempt},Error: {response.status}"
                        )
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    logger.error(f"{url},try_time:{attempt},Error: {e}")
                    if attempt == _MAX_ATTEMPTS:
                        raise
                # Sleep before retrying, but not after the last attempt.
                if attempt < _MAX_ATTEMPTS:
                    await asyncio.sleep(_RETRY_DELAY_SECONDS)
        raise Exception(
            f"Failed to fetch data from {url} after {_MAX_ATTEMPTS} attempts"
        )

    async def async_http_get(self, url: str, params=None):
        """GET *url* with retries and return the response parsed as JSON.

        Args:
            url: Target URL.
            params: Optional query parameters, URL-encoded by aiohttp.

        Raises:
            Exception: If no attempt returned HTTP 200.
        """
        return await self._get_with_retry(url, params, as_json=True)

    async def async_http_get_text(self, url: str, params=None):
        """GET *url* with retries and return the response body as text.

        Args:
            url: Target URL.
            params: Optional query parameters, URL-encoded by aiohttp.

        Raises:
            Exception: If no attempt returned HTTP 200.
        """
        return await self._get_with_retry(url, params, as_json=False)

    # Fetch detailed records (EFetch)
    async def fetch_details(self, id_list, db="pubmed", rettype="abstract"):
        """Fetch and parse the records for *id_list* through EFetch.

        Args:
            id_list: Record ids to fetch; an empty or ``None`` list returns
                ``[]`` without contacting the service.
            db: Entrez database to read from.
            rettype: EFetch ``rettype`` value.

        Returns:
            The articles produced by ``PubmedXmlParse.parse_pubmed_xml``, or
            ``[]`` when there is nothing to fetch or the fetch/parse failed
            (the failure is logged, not raised).
        """
        if not id_list:
            return []

        start_time = time.time()
        try:
            params = {
                "db": db,
                "id": ",".join(str(record_id) for record_id in id_list),
                "retmode": "xml",
                "api_key": _pick_account()["api_key"],
                "rettype": rettype,
            }
            response = await self.async_http_get_text(
                url=f"{_EUTILS_BASE_URL}/efetch.fcgi", params=params
            )
            articles = self.pubmed_xml_parse.parse_pubmed_xml(response)
            logger.info(
                f"pubmed_async_http fetch detail, Time taken: {time.time() - start_time}"
            )
            return articles
        except Exception as e:
            # Best effort: a failed fetch yields no articles instead of propagating.
            logger.error(f"Error fetching details for id_list: {id_list}, error: {e}")
            return []