Spaces:
Build error
Build error
import re | |
import requests | |
import chardet | |
import config as cfg | |
from bs4 import BeautifulSoup | |
from pathlib import Path | |
from transformers import AutoTokenizer | |
from duckduckgo_search import DDGS | |
def log_in(uid, state):
    """Reset the session state for ``uid`` and index that user's saved chats.

    Args:
        uid: User identifier. ``0`` selects the "not logged in" message; the
            directory is still derived from ``str(uid)``.
        state (dict): Mutable session state; it is updated in place.

    Returns:
        tuple: ``(response, state)`` where ``response`` is a status message
        and ``state`` is the same dict that was passed in.
    """
    # Start from a clean conversation for this login.
    state.update(chat_history=[], thinking_history='', uid=uid)

    response = (
        f"Your Log In UID: {uid}"
        if uid != 0
        else "You Are Not Logged In Yet, Use Public Directory"
    )

    # Every uid gets its own folder under cfg.USER_DIR, created on demand.
    home = Path(cfg.USER_DIR) / str(uid)
    home.mkdir(parents=True, exist_ok=True)
    state['user_dir'] = home

    # Load the saved sessions: one entry per JSON file, identified by its stem.
    state['available_history'] = [saved.stem for saved in home.rglob("*.json")]
    return response, state
def clean_response(response_content):
    """Strip Markdown/LaTeX markup from model output and flatten whitespace.

    The substitutions are applied in a fixed order; the last one collapses
    every whitespace run (newlines included) into a single space, so the
    result is always a single line.

    Args:
        response_content (str): Raw text produced by the model.

    Returns:
        str: The cleaned text with leading/trailing whitespace removed.
    """
    substitutions = (
        # Markdown bold / underline markers.
        (r'\*\*|__', ''),
        # LaTeX inline/display math delimiters: \( \) \[ \]
        (r'\\\(|\\\)|\\\[|\\\]', ''),
        # \boxed{x} -> x
        (r'\\boxed\{([^}]*)\}', r'\1'),
        # Doubled backslashes.
        (r'\\\\', ''),
        # Runs of blank lines -> one blank line (overridden by the next step).
        (r'\n\s*\n', '\n\n'),
        # Any whitespace run -> one space.
        (r'\s+', ' '),
    )
    text = response_content
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()
def parse_output(response_content):
    """Split a model response into its reasoning part and its final answer.

    The response is first normalized with ``clean_response``. The reasoning
    is the text between the first ``<think>`` and the first ``</think>`` that
    follows it; the answer is everything after that closing tag.

    Args:
        response_content (str): Raw text produced by the model.

    Returns:
        tuple: ``(thinking, answer)``. ``thinking`` is ``None`` and ``answer``
        is the whole cleaned text when no well-ordered tag pair is present.
    """
    cleaned_content = clean_response(response_content)

    # partition() keeps tag order explicit: the closing tag is only searched
    # for after the opening one, and later literal tags inside the answer do
    # not truncate it (splitting on either tag did both of those wrong).
    _, open_tag, after_open = cleaned_content.partition("<think>")
    thinking, close_tag, answer = after_open.partition("</think>")
    if open_tag and close_tag:
        return thinking, answer
    return None, cleaned_content
def parse_chat_history(chat_history):
    """Convert a saved conversation into the pair format the chatbot accepts.

    Example ``chat_history``::

        [
            {
                "role": "user",
                "content": "hello"
            },
            {
                "role": "assistant",
                "content": " Hello! How can I assist you today? 😊"
            }
        ]

    Args:
        chat_history (list): Message dicts with ``role`` and ``content`` keys.

    Returns:
        list: ``(user_message, assistant_response)`` tuples in order, or an
        empty list when the history is empty or the number of user messages
        differs from the number of assistant messages.
    """
    # Imported lazily; aliased so the builtin ``Warning`` is not shadowed.
    from gradio import Warning as notify

    # An odd number of entries means one message is missing; drop the last one.
    # An explicit check is used because ``assert`` is removed under ``python -O``.
    if len(chat_history) % 2 != 0:
        notify('历史会话可能有遗失,用户和AI的消息数不匹配,截断最后一条消息...')
        chat_history = chat_history[:-1]

    if len(chat_history) == 0:
        notify('历史会话为空或无法匹配,加载历史会话失败...')
        return []

    messages = []
    responses = []
    for conversation in chat_history:
        role = conversation['role']
        if role == 'user':
            messages.append(conversation['content'])
        elif role == 'assistant':
            responses.append(conversation['content'])
        # Entries with any other role are ignored.

    if len(messages) != len(responses):
        notify('用户和AI的消息无法匹配,加载历史会话失败...')
        return []
    return list(zip(messages, responses))
def web_search(query: str, max_results: int = 3):
    """Run a web search for ``query`` and extract the main text of each hit.

    Args:
        query: Search text passed to ``DDGS().text``.
        max_results: Maximum number of search hits to request.

    Returns:
        list: One ``{'title': ..., 'content': ...}`` dict per page that could
        be fetched and parsed, with ``content`` capped at 1000 characters.
        Pages that fail are skipped; an empty list is returned when the
        search itself fails.
    """
    # Fetch the search hits (title + link). Only this step decides whether
    # the whole call fails, so the try block covers nothing else.
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=max_results))
    except Exception as e:
        print('网络搜索异常,返回空...', e)
        return []

    # Download each hit and extract its body text; failures are per-page.
    web_contents = []
    for result in results:
        try:
            response = requests.get(result['href'], timeout=5)
            # Skip 4xx/5xx responses instead of scraping the error page.
            response.raise_for_status()

            # Prefer the encoding detected from the bytes over the header's.
            encoding = chardet.detect(response.content)['encoding']
            if response.encoding != encoding:
                response.encoding = encoding

            soup = BeautifulSoup(response.text, 'html.parser')
            # Main text container (adjust per site structure if needed).
            main_content = soup.find('main') or soup.find('article') or soup.body
            if main_content is None:
                # Nothing extractable (no <main>, <article> or <body>).
                continue

            web_contents.append({
                'title': result['title'],
                # Limit the length of the extracted text.
                'content': main_content.get_text(separator=' ', strip=True)[:1000],
            })
        except Exception as e:
            print('该结果搜索异常...', e)
    return web_contents
def parse_net_search(search_res):
    """Render search results as text snippets, skipping empty pages.

    Args:
        search_res (list): Dicts with ``title`` and ``content`` keys.

    Returns:
        list: One formatted "title / content" string per result whose
        ``content`` is non-empty, in the original order.
    """
    return [
        f"标题:\n{entry['title']}\n内容:\n{entry['content']}\n"
        for entry in search_res
        if len(entry['content']) > 0
    ]
def wash_up_content(doc_score):
    """Split a document into its non-empty lines and tag the first one.

    Args:
        doc_score: Either the document text (a newline-separated string) or
            a ``(text, score)`` tuple.

    Returns:
        list: The non-empty lines of the text. The first line is prefixed
        with a check mark, plus the score formatted to three decimals when
        one was given. An empty list is returned when the text has no
        non-empty line.
    """
    if isinstance(doc_score, tuple):
        doc, score = doc_score
    else:
        doc, score = doc_score, None

    lines = [line for line in doc.split('\n') if line]
    if not lines:
        # Nothing to tag; indexing lines[0] here would raise IndexError.
        return []

    if score is None:
        # NOTE(review): the trailing double quote looks unintended — confirm.
        prefix = '✅<br>"'
    else:
        prefix = '✅Recall with score:{:.3f}<br>'.format(score)
    lines[0] = prefix + lines[0]
    return lines
# Short model tag -> Hugging Face repository id, consumed by load_tokenizer().
# NOTE(review): the keys look like Ollama-style model tags — confirm.
MODEL_HF_MAPPING = {
    # Qwen2.5
    "qwen2.5:14b-instruct": "Qwen/Qwen2.5-14B-Instruct",
    "qwen2.5:32b-instruct": "Qwen/Qwen2.5-32B-Instruct",
    "qwen2.5:7b-instruct": "Qwen/Qwen2.5-7B-Instruct",
    "qwen2.5:3b-instruct": "Qwen/Qwen2.5-3B-Instruct",
    "qwen2.5:0.5b-instruct": "Qwen/Qwen2.5-0.5B-Instruct",
    "qwen2.5:0.5b": "Qwen/Qwen2.5-0.5B",
    "qwen2.5:32b": "Qwen/Qwen2.5-32B",
    # Qwen3
    "qwen3:32b": "Qwen/Qwen3-32B",
    "qwen3:14b": "Qwen/Qwen3-14B",
    "qwen3:4b": "Qwen/Qwen3-4B",
    "qwen3:8b": "Qwen/Qwen3-8B",
    "qwen3:30b-a3b": "Qwen/Qwen3-30B-A3B",
    # QwQ
    "qwq": "Qwen/QwQ-32B",
    # DeepSeek-R1 distilled models
    "deepseek-r1:14b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-14B",
    "deepseek-r1:7b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
    "deepseek-r1:32b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
}
def load_tokenizer(model_name):
    """Load the fast tokenizer for ``model_name``.

    Args:
        model_name (str): A key of ``MODEL_HF_MAPPING``; any other value is
            passed to ``AutoTokenizer.from_pretrained`` unchanged.

    Returns:
        The tokenizer returned by ``AutoTokenizer.from_pretrained``.
    """
    return AutoTokenizer.from_pretrained(
        MODEL_HF_MAPPING.get(model_name, model_name),
        use_fast=True,
    )
def messages_to_prompt(messages):
    """Flatten chat messages into a single ``role: content`` prompt string.

    The layout is only an example format and should be adapted to what the
    actual model expects.

    Args:
        messages (list): Dicts with ``role`` and ``content`` keys.

    Returns:
        str: One ``role: content`` line per message, joined by newlines, with
        leading and trailing whitespace stripped from the whole prompt.
    """
    # join() builds the string in one pass instead of repeated ``+=``.
    return "\n".join(
        f"{msg['role']}: {msg['content']}" for msg in messages
    ).strip()
def count_tokens_local(messages, tokenizer):
    """Count the tokens of ``messages`` with a locally loaded tokenizer.

    Args:
        messages (list): Chat messages, rendered via ``messages_to_prompt``.
        tokenizer: Callable tokenizer whose result exposes ``"input_ids"``.

    Returns:
        int: Length of the ``input_ids`` produced for the rendered prompt.
    """
    encoded = tokenizer(
        messages_to_prompt(messages),
        return_tensors=None,
        truncation=False,
    )
    return len(encoded["input_ids"])
def concate_metadata(metadata):
    """Join the key/value pairs of a Document's metadata, one pair per line.

    Args:
        metadata (dict): Metadata mapping to render.

    Returns:
        str: ``key: value`` lines joined by newlines, in the dict's order.
    """
    return '\n'.join(f"{key}: {value}" for key, value in metadata.items())
if __name__ == "__main__":
    # Manual smoke test: run one search, print the raw hits, then print the
    # formatted snippets built from them.
    hits = web_search("今天几号")
    for hit in hits:
        print(hit)
    print(parse_net_search(hits))