import os
import io
import requests
from bs4 import BeautifulSoup
import gradio as gr
import pandas as pd
import time
def get_papers_since(url, since_year=2023):
    """
    Given a Google Scholar citations page URL, return the papers published in
    the given year (default 2023) or later.
    Each paper is represented as a dict: {"title": ..., "year": ...}
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
    }
    papers = []
    start = 0
    pagesize = 20  # Google Scholar typically shows 20 entries per page
    while True:
        params = {"cstart": start, "pagesize": pagesize}
        response = requests.get(url, params=params, headers=headers)
        if response.status_code != 200:
            break
        soup = BeautifulSoup(response.text, "html.parser")
        rows = soup.find_all("tr", class_="gsc_a_tr")
        if not rows:
            break
        for row in rows:
            title_tag = row.find("a", class_="gsc_a_at")
            title = title_tag.text.strip() if title_tag else "Unknown Title"
            year_tag = row.find("span", class_="gsc_a_h")
            try:
                year = int(year_tag.text.strip()) if year_tag else None
            except ValueError:
                year = None
            if year and year >= since_year:
                papers.append({"title": title, "year": year})
        next_button = soup.find("button", id="gsc_bpf_more")
        if next_button and "disabled" not in next_button.attrs:
            start += pagesize
        else:
            break
    return papers
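# Illustrative standalone call (a sketch only: results depend on Google Scholar's
# current HTML structure and on the request not being rate-limited or blocked;
# the profile URL is the placeholder already used in the UI below, not a real
# recommendation):
#
#   papers = get_papers_since(
#       "https://scholar.google.com/citations?user=NVii64oAAAAJ", since_year=2023
#   )
#   for p in papers[:3]:
#       print(p["year"], p["title"])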
def search_paper_by_title(paper_title: str):
    """
    Search for a paper by title using the Semantic Scholar "match" endpoint and
    return the top-1 match.
    Returns:
        dict: a dict with the keys 'paperId', 'title', 'abstract', and 'venue'.
        Returns None if no matching paper is found (HTTP 404 or empty result).
        Raises an exception for any other error.
    """
    base_url = "https://api.semanticscholar.org/graph/v1/paper/search/match"
    params = {
        "query": paper_title,
        "fields": "paperId,abstract,title,venue"
    }
    headers = {
        "x-api-key": os.getenv("S2_API_KEY")
    }
    response = requests.get(base_url, params=params, headers=headers)
    if response.status_code == 404:
        return None
    elif response.status_code != 200:
        raise Exception(f"Error {response.status_code}: {response.text}")
    data_list = response.json().get("data", [])
    if not data_list:
        return None
    paper_data = data_list[0]
    return {
        "paperId": paper_data.get("paperId"),
        "title": paper_data.get("title"),
        "abstract": paper_data.get("abstract"),
        "venue": paper_data.get("venue")
    }
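# Minimal sketch of a direct call (assumes the S2_API_KEY environment variable
# holds a valid Semantic Scholar API key; without one, requests may be
# rate-limited or rejected). The title is only an example query:
#
#   info = search_paper_by_title("Attention Is All You Need")
#   if info:
#       print(info["paperId"], info["venue"])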
def process_profiles(profiles_text, wechat, progress=None):
    """
    1. Split the user-supplied Google Scholar profile links (one per line) into a list
    2. Scrape every link for papers published in 2023 or later
    3. Call Semantic Scholar for each paper to fetch match info; drop papers with no match
    4. Deduplicate by paperId
    5. Yield the path of a CSV file for the user to download
    """
    log_messages = ["Starting..."]

    def update_log(message):
        log_messages.append(message)
        return "\n".join(log_messages)

    if not profiles_text.strip():
        yield update_log("Error: no profile links provided"), None
        return
    if not wechat.strip():
        yield update_log("Error: no WeChat ID provided"), None
        return
    # Split the input into one URL per line
    profile_urls = [line.strip() for line in profiles_text.splitlines() if line.strip()]
    message = f"Detected {len(profile_urls)} profile links, starting..."
    yield update_log(message), None
    all_papers = []
    for i, url in enumerate(profile_urls):
        message = f"Processing profile {i+1}/{len(profile_urls)}: {url}"
        yield update_log(message), None
        papers = get_papers_since(url, 2023)
        all_papers.extend(papers)
        message = f"Collected {len(papers)} papers from profile {i+1}/{len(profile_urls)}"
        yield update_log(message), None
    message = f"Collected {len(all_papers)} papers in total, fetching details..."
    yield update_log(message), None
    paperid_map = {}
    total_papers = len(all_papers)
    for i, paper in enumerate(all_papers):
        title = paper["title"]
        year = paper["year"]
        if i % 10 == 0 or i == total_papers - 1:
            percent = round((i + 1) / total_papers * 100)
            message = f"Processing paper {i+1}/{total_papers} ({percent}% done)"
            yield update_log(message), None
        if i > 0 and i % 10 == 0:
            time.sleep(1)  # extra pause every 10 papers to stay under rate limits
        try:
            time.sleep(0.3)
            paper_info = search_paper_by_title(title)
        except Exception:
            # Skip papers whose lookup failed
            continue
        if not paper_info or not paper_info.get("paperId"):
            continue
        pid = paper_info["paperId"]
        if pid not in paperid_map:
            paperid_map[pid] = {
                "paperId": pid,
                "title": paper_info["title"],
                "abstract": paper_info["abstract"],
                "venue": paper_info["venue"],
                "year": year
            }
    if not paperid_map:
        message = "Error: no matching papers found"
        yield update_log(message), None
        return
    message = f"{len(paperid_map)} papers left after deduplication, generating CSV..."
    yield update_log(message), None
    df = pd.DataFrame(list(paperid_map.values()))
    # Use the WeChat ID as the CSV file name
    temp_csv_path = f"{wechat.strip()}.csv"
    df.to_csv(temp_csv_path, index=False, encoding='utf-8')
    message = f"Done! Generated a CSV with {len(paperid_map)} papers: {temp_csv_path}"
    yield update_log(message), temp_csv_path
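# Sketch of driving the pipeline outside Gradio (hypothetical test snippet: the
# profile URL is the same placeholder used in the UI and the WeChat ID is made
# up). Because process_profiles is a generator, its progress log and the final
# CSV path can be consumed with a plain for-loop:
#
#   for log_text, csv_path in process_profiles(
#       "https://scholar.google.com/citations?user=NVii64oAAAAJ", "my_wechat_id"
#   ):
#       print(log_text.splitlines()[-1], csv_path)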
def build_app():
    """
    Build a small demo app with gradio.
    """
    with gr.Blocks() as demo:
        gr.Markdown("## Google Scholar & Semantic Scholar Aggregation Tool")
        gr.Markdown("Enter any number of Google Scholar profile links below, one per line, "
                    "then enter your WeChat ID and click **Start scraping**.")
        profile_input = gr.Textbox(
            lines=5,
            placeholder=("Paste one Google Scholar profile link per line, in the form "
                         "'https://scholar.google.com/citations?user=NVii64oAAAAJ'")
        )
        # WeChat ID input box
        wechat_input = gr.Textbox(
            label="WeChat ID",
            placeholder="Enter your WeChat ID"
        )
        progress_output = gr.Textbox(
            label="Progress",
            value="Waiting to start...",
            lines=10,
            interactive=False
        )
        download_output = gr.File(label="Download result CSV")
        run_button = gr.Button("Start scraping")
        # Pass both input values to the handler
        run_button.click(
            fn=process_profiles,
            inputs=[profile_input, wechat_input],
            outputs=[progress_output, download_output]
        )
    return demo
if __name__ == "__main__":
    app = build_app()
    app.launch()
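# Note (assumption about the Gradio version in use): since process_profiles is a
# generator, intermediate progress messages only stream to the Textbox when the
# queue is enabled. If updates appear only at the end, replacing the launch call
# with app.queue().launch() is the usual fix.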