import asyncio
import time
from typing import Dict, List
from urllib.parse import quote

import aiohttp
from config.global_storage import get_model_config
from dto.bio_document import PubMedDocument
from service.pubmed_xml_parse import PubmedXmlParse
from utils.bio_logger import bio_logger as logger
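# NCBI E-utilities accounts, rotated per request. Sending an api_key raises
# NCBI's documented rate limit from 3 to 10 requests per second per key.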
PUBMED_ACCOUNT = [
{"email": "email1@gmail.com", "api_key": "60eb67add17f39aa588a43e30bb7fce98809"},
{"email": "email2@gmail.com", "api_key": "fd9bb5b827c95086b9c2d579df20beca2708"},
{"email": "email3@gmail.com", "api_key": "026586b79437a2b21d1e27d8c3f339230208"},
{"email": "email4@gmail.com", "api_key": "bca0489d8fe314bfdbb1f7bfe63fb5d76e09"},
]
class PubMedAsyncApi:
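    """Async client for PubMed via the NCBI E-utilities (ESearch + EFetch)."""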
def __init__(self):
self.pubmed_xml_parse = PubmedXmlParse()
self.model_config = get_model_config()
async def pubmed_search_function(
self, query: str, top_k: int, search_type: str
) -> List[PubMedDocument]:
try:
start_time = time.time()
logger.info(
f'Trying to search PubMed for "{query}", top_k={top_k}, search_type={search_type}'
)
id_list = await self.search_database(
query, db="pubmed", retmax=top_k, search_type=search_type
)
articles = await self.fetch_details(
id_list, db="pubmed", rettype="abstract"
)
end_search_pubmed_time = time.time()
logger.info(
f'Finished searching PubMed for "{query}", took {end_search_pubmed_time - start_time:.2f} seconds, found {len(articles)} results'
)
return [
PubMedDocument(
title=result["title"],
abstract=result["abstract"],
authors=self.process_authors(result["authors"]),
doi=result["doi"],
source="pubmed",
source_id=result["pmid"],
pub_date=result["pub_date"],
journal=result["journal"],
)
for result in articles
]
        except Exception as e:
            logger.error(f"Error searching PubMed query: {query} error: {e}")
            raise
def process_authors(self, author_list: List[Dict]) -> str:
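        # e.g. [{"forename": "Jane", "lastname": "Doe"}, ...] -> "Jane Doe, ..."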
return ", ".join(
[f"{author['forename']} {author['lastname']}" for author in author_list]
)
    # Search the database (ESearch)
async def search_database(
self, query: str, db: str, retmax: int, search_type: str = "keyword"
) -> List[Dict]:
if search_type not in ["keyword", "advanced"]:
raise ValueError("search_type must be one of 'keyword' or 'advanced'")
if search_type == "keyword":
art_type_list = [
"Address",
"Bibliography",
"Biography",
"Books and Documents",
"Clinical Conference",
"Clinical Study",
"Collected Works",
"Comment",
"Congress",
"Consensus Development Conference",
"Consensus Development Conference, NIH",
"Dictionary",
"Directory",
"Duplicate Publication",
"Editorial",
"Festschrift",
"Government Document",
"Guideline",
"Interactive Tutorial",
"Interview",
"Lecture",
"Legal Case",
"Legislation",
"Letter",
"News",
"Newspaper Article",
"Patient Education Handout",
"Periodical Index",
"Personal Narrative",
"Practice Guideline",
"Published Erratum",
"Technical Report",
"Video-Audio Media",
"Webcast",
]
art_type = "(" + " OR ".join(f'"{j}"[Filter]' for j in art_type_list) + ")"
query = "( " + query + ")"
query += " AND (fha[Filter]) NOT " + art_type
        return await self.esearch(query=query, retmax=retmax)
    async def esearch(self, query=None, retmax=10):
        start_time = time.time()
        db = "pubmed"
        server = "esearch"
        # Rotate across accounts; the millisecond clock serves as a cheap pseudo-random index.
        random_index = int((time.time() * 1000) % len(PUBMED_ACCOUNT))
        api_key = PUBMED_ACCOUNT[random_index]["api_key"]
        # URL-encode the term: raw queries contain spaces, quotes, and brackets.
        url = (
            f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi"
            f"?db={db}&term={quote(query)}&retmode=json&api_key={api_key}"
            f"&sort=relevance&retmax={retmax}"
        )
        response = await self.async_http_get(url=url)
        id_list = response["esearchresult"]["idlist"]
        logger.info(
            f"pubmed_async_http got id_list, search time taken: {time.time() - start_time:.2f}s"
        )
        return id_list
    async def async_http_get(self, url: str):
        async with aiohttp.ClientSession() as session:
            try_time = 1
            while try_time < 4:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.json()
                    logger.error(
                        f"{url}, try_time: {try_time}, error: {response.status}"
                    )
                try_time += 1
                # Sleep 0.5s before retrying
                await asyncio.sleep(0.5)
        raise Exception(f"Failed to fetch data from {url} after 3 attempts")
    async def async_http_get_text(self, url: str, params=None):
        async with aiohttp.ClientSession() as session:
            try_time = 1
            while try_time < 4:
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        return await response.text()
                    logger.error(
                        f"{url}, try_time: {try_time}, error: {response.status}"
                    )
                try_time += 1
                # Sleep 0.5s before retrying
                await asyncio.sleep(0.5)
        raise Exception(f"Failed to fetch data from {url} after 3 attempts")
    # Fetch article details (EFetch)
    async def fetch_details(self, id_list, db="pubmed", rettype="abstract"):
        start_time = time.time()
        try:
            # An empty id list would produce an invalid EFetch request.
            if not id_list:
                return []
            ids = ",".join(id_list)
            server = "efetch"
            # Rotate across accounts; the millisecond clock serves as a cheap pseudo-random index.
            random_index = int((time.time() * 1000) % len(PUBMED_ACCOUNT))
            api_key = PUBMED_ACCOUNT[random_index]["api_key"]
            url = (
                f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi"
                f"?db={db}&id={ids}&retmode=xml&api_key={api_key}&rettype={rettype}"
            )
            response = await self.async_http_get_text(url=url)
            articles = self.pubmed_xml_parse.parse_pubmed_xml(response)
            logger.info(
                f"pubmed_async_http fetched details, time taken: {time.time() - start_time:.2f}s"
            )
            return articles
        except Exception as e:
            logger.error(f"Error fetching details for id_list: {id_list}, error: {e}")
            # pmid exact match
            return []
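

# Usage sketch: a minimal driver for this client. It assumes the config/service/dto
# modules imported above are available, that network access and valid NCBI API keys
# exist, and that PubMedDocument exposes the fields it is constructed with; the
# query string is purely illustrative.
if __name__ == "__main__":

    async def main():
        api = PubMedAsyncApi()
        docs = await api.pubmed_search_function(
            query="CRISPR base editing", top_k=5, search_type="keyword"
        )
        for doc in docs:
            print(doc.source_id, doc.title)

    asyncio.run(main())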