import asyncio
import time
from typing import Dict, List

import aiohttp

from config.global_storage import get_model_config
from dto.bio_document import PubMedDocument
from service.pubmed_xml_parse import PubmedXmlParse
from utils.bio_logger import bio_logger as logger
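
# Pool of NCBI E-utilities credentials. An api_key raises the per-key rate
# limit from 3 to 10 requests/second, so rotating across keys spreads the load.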
PUBMED_ACCOUNT = [
    {"email": "email1@gmail.com", "api_key": "60eb67add17f39aa588a43e30bb7fce98809"},
    {"email": "email2@gmail.com", "api_key": "fd9bb5b827c95086b9c2d579df20beca2708"},
    {"email": "email3@gmail.com", "api_key": "026586b79437a2b21d1e27d8c3f339230208"},
    {"email": "email4@gmail.com", "api_key": "bca0489d8fe314bfdbb1f7bfe63fb5d76e09"},
]


class PubMedAsyncApi:
    def __init__(self):
        self.pubmed_xml_parse = PubmedXmlParse()
        self.model_config = get_model_config()

    async def pubmed_search_function(
        self, query: str, top_k: int, search_type: str
    ) -> List[PubMedDocument]:
        try:
            start_time = time.time()
            logger.info(
                f'Trying to search PubMed for "{query}", top_k={top_k}, search_type={search_type}'
            )
            id_list = await self.search_database(
                query, db="pubmed", retmax=top_k, search_type=search_type
            )
            articles = await self.fetch_details(
                id_list, db="pubmed", rettype="abstract"
            )
            end_search_pubmed_time = time.time()
            logger.info(
                f'Finished searching PubMed for "{query}", took {end_search_pubmed_time - start_time:.2f} seconds, found {len(articles)} results'
            )
            return [
                PubMedDocument(
                    title=result["title"],
                    abstract=result["abstract"],
                    authors=self.process_authors(result["authors"]),
                    doi=result["doi"],
                    source="pubmed",
                    source_id=result["pmid"],
                    pub_date=result["pub_date"],
                    journal=result["journal"],
                )
                for result in articles
            ]
        except Exception as e:
            logger.error(f"Error searching PubMed query: {query} error: {e}")
            # Re-raise without wrapping to preserve the original traceback.
            raise

    def process_authors(self, author_list: List[Dict]) -> str:
        return ", ".join(
            f"{author['forename']} {author['lastname']}" for author in author_list
        )

    # Search the database (ESearch); returns a list of matching PMIDs.
    async def search_database(
        self, query: str, db: str, retmax: int, search_type: str = "keyword"
    ) -> List[str]:
        if search_type not in ["keyword", "advanced"]:
            raise ValueError("search_type must be one of 'keyword' or 'advanced'")
        if search_type == "keyword":
            art_type_list = [
                "Address",
                "Bibliography",
                "Biography",
                "Books and Documents",
                "Clinical Conference",
                "Clinical Study",
                "Collected Works",
                "Comment",
                "Congress",
                "Consensus Development Conference",
                "Consensus Development Conference, NIH",
                "Dictionary",
                "Directory",
                "Duplicate Publication",
                "Editorial",
                "Festschrift",
                "Government Document",
                "Guideline",
                "Interactive Tutorial",
                "Interview",
                "Lecture",
                "Legal Case",
                "Legislation",
                "Letter",
                "News",
                "Newspaper Article",
                "Patient Education Handout",
                "Periodical Index",
                "Personal Narrative",
                "Practice Guideline",
                "Published Erratum",
                "Technical Report",
                "Video-Audio Media",
                "Webcast",
            ]
art_type = "(" + " OR ".join(f'"{j}"[Filter]' for j in art_type_list) + ")" | |
query = "( " + query + ")" | |
query += " AND (fha[Filter]) NOT " + art_type | |
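            # Resulting ESearch term shape (illustrative):
            #   ( <query>) AND (fha[Filter]) NOT ("Address"[Filter] OR "Bibliography"[Filter] OR ...)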
        return await self.esearch(query=query, retmax=retmax)

    async def esearch(self, query=None, retmax=10):
        start_time = time.time()
        db = "pubmed"
        server = "esearch"
        # Pick an account pseudo-randomly from the millisecond clock so that
        # consecutive calls rotate across the available API keys.
        random_index = int((time.time() * 1000) % len(PUBMED_ACCOUNT))
        random_pubmed_account = PUBMED_ACCOUNT[random_index]
        api_key = random_pubmed_account["api_key"]
        url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi?db={db}&term={query}&retmode=json&api_key={api_key}&sort=relevance&retmax={retmax}"
        response = await self.async_http_get(url=url)
        id_list = response["esearchresult"]["idlist"]
        logger.info(
            f"pubmed_async_http get id_list, search time taken: {time.time() - start_time:.2f}s"
        )
        return id_list

    async def async_http_get(self, url: str):
        async with aiohttp.ClientSession() as session:
            try_time = 1
            while try_time < 4:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.json()
                    else:
                        logger.error(
                            f"{url}, try_time: {try_time}, error: {response.status}"
                        )
                        try_time += 1
                        # Sleep 0.5 seconds, then retry
                        await asyncio.sleep(0.5)
            raise Exception(f"Failed to fetch data from {url} after 3 attempts")

    # Same retry policy as async_http_get, but returns the raw response body
    # (EFetch returns XML, not JSON).
    async def async_http_get_text(self, url: str, params=None):
        async with aiohttp.ClientSession() as session:
            try_time = 1
            while try_time < 4:
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        return await response.text()
                    else:
                        logger.error(
                            f"{url}, try_time: {try_time}, error: {response.status}"
                        )
                        try_time += 1
                        # Sleep 0.5 seconds, then retry
                        await asyncio.sleep(0.5)
            raise Exception(f"Failed to fetch data from {url} after 3 attempts")

    # Fetch article details (EFetch)
    async def fetch_details(self, id_list, db="pubmed", rettype="abstract"):
        start_time = time.time()
        # Skip the request entirely when the search returned no PMIDs.
        if not id_list:
            return []
        try:
            ids = ",".join(id_list)
            server = "efetch"
            # Same time-based rotation across API keys as in esearch().
            random_index = int((time.time() * 1000) % len(PUBMED_ACCOUNT))
            random_pubmed_account = PUBMED_ACCOUNT[random_index]
            api_key = random_pubmed_account["api_key"]
            url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi?db={db}&id={ids}&retmode=xml&api_key={api_key}&rettype={rettype}"
            response = await self.async_http_get_text(url=url)
            articles = self.pubmed_xml_parse.parse_pubmed_xml(response)
            logger.info(
                f"pubmed_async_http fetch detail, time taken: {time.time() - start_time:.2f}s"
            )
            return articles
        except Exception as e:
            logger.error(f"Error fetching details for id_list: {id_list}, error: {e}")
            # pmid exact match
            return []
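

# Minimal usage sketch (illustrative, not part of the service). Assumes the
# config/, dto/, service/, and utils/ modules imported above are importable;
# the query string and top_k value are placeholders.
if __name__ == "__main__":

    async def _demo():
        api = PubMedAsyncApi()
        docs = await api.pubmed_search_function(
            query="glioblastoma immunotherapy", top_k=5, search_type="keyword"
        )
        for doc in docs:
            print(doc.source_id, doc.title)

    asyncio.run(_demo())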