File size: 6,345 Bytes
79899c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import time
from typing import Dict, List
from Bio import Entrez
import requests
from config.global_storage import get_model_config
from dto.bio_document import PubMedDocument
from service.pubmed_xml_parse import PubmedXmlParse
from utils.bio_logger import bio_logger as logger

# Pool of NCBI E-utilities credentials; request paths pick one pseudo-randomly
# (millisecond clock modulo pool size) to spread request load across API keys.
# NOTE(review): these look like live api_keys hardcoded in source — presumably
# they should be loaded from config/env instead; confirm before publishing.
PUBMED_ACCOUNT = [
    {"email": "email1@gmail.com", "api_key": "60eb67add17f39aa588a43e30bb7fce98809"},
    {"email": "email2@gmail.com", "api_key": "fd9bb5b827c95086b9c2d579df20beca2708"},
    {"email": "email3@gmail.com", "api_key": "026586b79437a2b21d1e27d8c3f339230208"},
    {"email": "email4@gmail.com", "api_key": "bca0489d8fe314bfdbb1f7bfe63fb5d76e09"},
]


class PubMedApi:
    """Thin PubMed client built on NCBI E-utilities: ESearch for ids, EFetch for records."""

    def __init__(self):
        self.pubmed_xml_parse = PubmedXmlParse()
        self.model_config = get_model_config()

    @staticmethod
    def _pick_account() -> Dict:
        """Pick one account from the pool (millisecond clock modulo pool size).

        Rotating credentials spreads request load across the available API keys.
        """
        return PUBMED_ACCOUNT[int((time.time() * 1000) % len(PUBMED_ACCOUNT))]

    def pubmed_search_function(
        self, query: str, top_k: int, search_type: str
    ) -> List[PubMedDocument]:
        """Search PubMed and return fetched article details as PubMedDocument objects.

        :param query: query string (plain keywords or advanced PubMed syntax)
        :param top_k: maximum number of ids requested from ESearch
        :param search_type: "keyword" or "advanced" (see search_database)
        :raises Exception: re-raises any search/fetch failure after logging it
        """
        try:
            start_time = time.time()
            logger.info(
                f'Trying to search PubMed for "{query}", top_k={top_k}, search_type={search_type}'
            )
            id_list = self.search_database(query, retmax=top_k, search_type=search_type)
            records = self.fetch_details(id_list, db="pubmed", rettype="abstract")

            end_search_pubmed_time = time.time()
            logger.info(
                f'Finished searching PubMed for "{query}", took {end_search_pubmed_time - start_time:.2f} seconds, found {len(records)} results'
            )

            # The abstract doubles as the document text for downstream consumers.
            return [
                PubMedDocument(
                    title=record["title"],
                    abstract=record["abstract"],
                    authors=self.process_authors(record["authors"]),
                    doi=record["doi"],
                    source="pubmed",
                    source_id=record["pmid"],
                    pub_date=record["pub_date"],
                    journal=record["journal"],
                    text=record["abstract"],
                )
                for record in records
            ]
        except Exception as e:
            logger.error(f"Error searching PubMed query: {query} error: {e}")
            raise  # bare raise keeps the original traceback (`raise e` rewrites it)

    def process_authors(self, author_list: List[Dict]) -> str:
        """Join author dicts into a single "Forename Lastname, ..." string.

        Missing or empty "forename"/"lastname" values are skipped instead of
        raising KeyError or rendering the literal string "None".
        """
        names = []
        for author in author_list:
            parts = [author.get("forename"), author.get("lastname")]
            names.append(" ".join(part for part in parts if part))
        return ", ".join(names)

    def search_database(
        self, query: str, retmax: int, search_type: str = "keyword"
    ) -> List[str]:
        """Run ESearch and return the list of matching PubMed record ids.

        :param query: query string
        :param retmax: maximum number of ids to return
        :param search_type: "keyword" wraps the query with an fha[Filter] clause
            and excludes non-research article types; "advanced" sends the query
            verbatim
        :raises ValueError: if search_type is neither "keyword" nor "advanced"
        """
        start_time = time.time()
        db = "pubmed"
        # Rotate credentials so a single api_key is not the rate-limit bottleneck.
        account = self._pick_account()
        Entrez.email = account["email"]
        Entrez.api_key = account["api_key"]
        if search_type == "keyword":
            # Publication types to exclude: commentary, editorial, news, etc.
            art_type_list = [
                "Address",
                "Bibliography",
                "Biography",
                "Books and Documents",
                "Clinical Conference",
                "Clinical Study",
                "Collected Works",
                "Comment",
                "Congress",
                "Consensus Development Conference",
                "Consensus Development Conference, NIH",
                "Dictionary",
                "Directory",
                "Duplicate Publication",
                "Editorial",
                "Festschrift",
                "Government Document",
                "Guideline",
                "Interactive Tutorial",
                "Interview",
                "Lecture",
                "Legal Case",
                "Legislation",
                "Letter",
                "News",
                "Newspaper Article",
                "Patient Education Handout",
                "Periodical Index",
                "Personal Narrative",
                "Practice Guideline",
                "Published Erratum",
                "Technical Report",
                "Video-Audio Media",
                "Webcast",
            ]
            art_type = "(" + " OR ".join(f'"{j}"[Filter]' for j in art_type_list) + ")"
            # fha[Filter]: presumably restricts to records with free full text /
            # abstracts — TODO confirm against NCBI filter documentation.
            query = "( " + query + ")"
            query += " AND (fha[Filter]) NOT " + art_type
        elif search_type != "advanced":
            raise ValueError("search_type must be either 'keyword' or 'advanced'")
        # Both search types issue the same ESearch call once the term is built.
        handle = Entrez.esearch(
            db=db, term=query, usehistory="y", sort="relevance", retmax=retmax
        )
        results = Entrez.read(handle)
        handle.close()
        id_list = results["IdList"]
        logger.info(
            f"Finished searching PubMed id, took {time.time() - start_time:.2f} seconds, found {len(id_list)} results,query: {query}"
        )
        logger.info(
            f"Search type:{search_type} PubMed search query: {query}, id_list: {id_list}"
        )
        if len(id_list) == 0:
            return []
        return id_list

    def fetch_details(self, id_list, db="pubmed", rettype="abstract"):
        """Fetch full records for the given ids via EFetch and parse the XML.

        Best-effort: any failure is logged and an empty list is returned so a
        fetch error does not abort the caller's whole search.

        :param id_list: PubMed id strings to fetch
        :param db: Entrez database name
        :param rettype: Entrez rettype ("abstract" by default)
        :return: parsed article dicts, or [] on error / empty input
        """
        if not id_list:
            # Nothing to fetch; avoids issuing an EFetch call with an empty id=.
            return []
        start_time = time.time()
        try:
            ids = ",".join(id_list)
            server = "efetch"
            api_key = self._pick_account()["api_key"]
            url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi?db={db}&id={ids}&retmode=xml&api_key={api_key}&rettype={rettype}"
            # Timeout so a hung NCBI request cannot block the caller forever.
            response = requests.get(url, timeout=30)
            # Surface HTTP errors instead of handing an error page to the parser.
            response.raise_for_status()
            articles = self.pubmed_xml_parse.parse_pubmed_xml(response.text)
            logger.info(
                f"pubmed_async_http fetch detail, Time taken: {time.time() - start_time}"
            )
            return articles
        except Exception as e:
            logger.error(f"Error fetching details for id_list: {id_list}, error: {e}")
            return []