Spaces:
Running
Running
File size: 6,345 Bytes
79899c0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
import time
from typing import Dict, List
from Bio import Entrez
import requests
from config.global_storage import get_model_config
from dto.bio_document import PubMedDocument
from service.pubmed_xml_parse import PubmedXmlParse
from utils.bio_logger import bio_logger as logger
PUBMED_ACCOUNT = [
{"email": "email1@gmail.com", "api_key": "60eb67add17f39aa588a43e30bb7fce98809"},
{"email": "email2@gmail.com", "api_key": "fd9bb5b827c95086b9c2d579df20beca2708"},
{"email": "email3@gmail.com", "api_key": "026586b79437a2b21d1e27d8c3f339230208"},
{"email": "email4@gmail.com", "api_key": "bca0489d8fe314bfdbb1f7bfe63fb5d76e09"},
]
class PubMedApi:
def __init__(self):
self.pubmed_xml_parse = PubmedXmlParse()
self.model_config = get_model_config()
def pubmed_search_function(
self, query: str, top_k: int, search_type: str
) -> List[PubMedDocument]:
try:
start_time = time.time()
logger.info(
f'Trying to search PubMed for "{query}", top_k={top_k}, search_type={search_type}'
)
id_list = self.search_database(query, retmax=top_k, search_type=search_type)
records = self.fetch_details(id_list, db="pubmed", rettype="abstract")
end_search_pubmed_time = time.time()
logger.info(
f'Finished searching PubMed for "{query}", took {end_search_pubmed_time - start_time:.2f} seconds, found {len(records)} results'
)
return [
PubMedDocument(
title=result["title"],
abstract=result["abstract"],
authors=self.process_authors(result["authors"]),
doi=result["doi"],
source="pubmed",
source_id=result["pmid"],
pub_date=result["pub_date"],
journal=result["journal"],
text=result["abstract"],
)
for result in records
]
except Exception as e:
logger.error(f"Error searching PubMed query: {query} error: {e}")
raise e
def process_authors(self, author_list: List[Dict]) -> str:
return ", ".join(
[f"{author['forename']} {author['lastname']}" for author in author_list]
)
# 搜索数据库(ESearch)
def search_database(
self, query: str, retmax: int, search_type: str = "keyword"
) -> List[str]:
"""
获取pubmed数据库中的记录id列表
:param search_type: 搜索类型,keyword或advanced
:param query: 查询字符串
:param retmax: 返回的最大结果数
"""
start_time = time.time()
db = "pubmed"
# 随机从pubmed账号池中选择一个
random_index = int((time.time() * 1000) % len(PUBMED_ACCOUNT))
random_pubmed_account = PUBMED_ACCOUNT[random_index]
Entrez.email = random_pubmed_account["email"]
Entrez.api_key = random_pubmed_account["api_key"]
if search_type == "keyword":
art_type_list = [
"Address",
"Bibliography",
"Biography",
"Books and Documents",
"Clinical Conference",
"Clinical Study",
"Collected Works",
"Comment",
"Congress",
"Consensus Development Conference",
"Consensus Development Conference, NIH",
"Dictionary",
"Directory",
"Duplicate Publication",
"Editorial",
"Festschrift",
"Government Document",
"Guideline",
"Interactive Tutorial",
"Interview",
"Lecture",
"Legal Case",
"Legislation",
"Letter",
"News",
"Newspaper Article",
"Patient Education Handout",
"Periodical Index",
"Personal Narrative",
"Practice Guideline",
"Published Erratum",
"Technical Report",
"Video-Audio Media",
"Webcast",
]
art_type = "(" + " OR ".join(f'"{j}"[Filter]' for j in art_type_list) + ")"
query = "( " + query + ")"
query += " AND (fha[Filter]) NOT " + art_type
handle = Entrez.esearch(
db=db, term=query, usehistory="y", sort="relevance", retmax=retmax
)
elif search_type == "advanced":
handle = Entrez.esearch(
db=db, term=query, usehistory="y", sort="relevance", retmax=retmax
)
else:
raise ValueError("search_type must be either 'keyword' or 'advanced'")
results = Entrez.read(handle)
handle.close()
id_list = results["IdList"]
logger.info(
f"Finished searching PubMed id, took {time.time() - start_time:.2f} seconds, found {len(id_list)} results,query: {query}"
)
logger.info(
f"Search type:{search_type} PubMed search query: {query}, id_list: {id_list}"
)
if len(id_list) == 0:
return []
return id_list
def fetch_details(self, id_list, db="pubmed", rettype="abstract"):
start_time = time.time()
try:
ids = ",".join(id_list)
server = "efetch"
random_index = int((time.time() * 1000) % len(PUBMED_ACCOUNT))
random_pubmed_account = PUBMED_ACCOUNT[random_index]
api_key = random_pubmed_account["api_key"]
url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi?db={db}&id={ids}&retmode=xml&api_key={api_key}&rettype={rettype}"
response = requests.get(url)
articles = self.pubmed_xml_parse.parse_pubmed_xml(response.text)
logger.info(
f"pubmed_async_http fetch detail, Time taken: {time.time() - start_time}"
)
return articles
except Exception as e:
logger.error(f"Error fetching details for id_list: {id_list}, error: {e}")
# pmid 精准匹配
return []
|