Spaces:
Runtime error
Runtime error
| # Copyright (c) Alibaba Cloud. | |
| # | |
| # This source code is licensed under the license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """A simple web interactive chat demo based on gradio.""" | |
| from argparse import ArgumentParser | |
| from pathlib import Path | |
| import copy | |
| import gradio as gr | |
| import os | |
| import re | |
| import secrets | |
| import tempfile | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| from transformers import GenerationConfig | |
| # from modelscope.hub.api import HubApi | |
| from pydub import AudioSegment | |
| import os | |
# Hub access token, read from the environment; None when the variable is unset.
YOUR_ACCESS_TOKEN = os.environ.get('YOUR_ACCESS_TOKEN')

# Alternative kept for reference: log in to ModelScope and fetch the weights
# from there instead of the Hugging Face Hub.
#   api = HubApi()
#   api.login(YOUR_ACCESS_TOKEN)
#   DEFAULT_CKPT_PATH = snapshot_download('qwen/Qwen-Audio-Chat')

# Checkpoint used when -c/--checkpoint-path is not given on the command line.
DEFAULT_CKPT_PATH = "Qwen/Qwen-Audio-Chat"
def _get_args():
    """Parse the demo's command-line options and return the resulting namespace.

    Options cover the checkpoint to load, CPU-only execution, and the
    share / inbrowser / port / host settings later handed to the web server.
    """
    cli = ArgumentParser()
    cli.add_argument(
        "-c", "--checkpoint-path",
        type=str,
        default=DEFAULT_CKPT_PATH,
        help="Checkpoint name or path, default to %(default)r",
    )
    cli.add_argument(
        "--cpu-only",
        action="store_true",
        help="Run demo with CPU only",
    )
    cli.add_argument(
        "--share",
        action="store_true",
        default=False,
        help="Create a publicly shareable link for the interface.",
    )
    cli.add_argument(
        "--inbrowser",
        action="store_true",
        default=False,
        help="Automatically launch the interface in a new tab on the default browser.",
    )
    cli.add_argument(
        "--server-port",
        type=int,
        default=7860,
        help="Demo server port.",
    )
    cli.add_argument(
        "--server-name",
        type=str,
        default="0.0.0.0",
        help="Demo server name.",
    )
    return cli.parse_args()
def _load_model_tokenizer(args):
    """Load the tokenizer, the chat model and its generation config.

    Args:
        args: Parsed CLI namespace; ``checkpoint_path`` selects the checkpoint
            and ``cpu_only`` picks the "cpu" device map instead of "cuda".

    Returns:
        A ``(model, tokenizer)`` pair, with the model switched to eval mode and
        its ``generation_config`` replaced by the one stored with the checkpoint.
    """
    checkpoint = args.checkpoint_path
    # Options shared by all three from_pretrained() calls.
    hub_options = {
        "trust_remote_code": True,
        "resume_download": True,
        "token": YOUR_ACCESS_TOKEN,
    }

    tokenizer = AutoTokenizer.from_pretrained(checkpoint, **hub_options)

    target_device = "cpu" if args.cpu_only else "cuda"
    model = AutoModelForCausalLM.from_pretrained(
        checkpoint,
        device_map=target_device,
        **hub_options,
    )
    model = model.eval()
    model.generation_config = GenerationConfig.from_pretrained(checkpoint, **hub_options)
    return model, tokenizer
# Characters escaped inside fenced code blocks so the chat widget shows them
# literally instead of interpreting them as HTML or Markdown.
_CODE_ESCAPES = str.maketrans({
    "`": r"\`",
    "<": "&lt;",
    ">": "&gt;",
    " ": "&nbsp;",
    "*": "&ast;",
    "_": "&lowbar;",
    "-": "&#45;",
    ".": "&#46;",
    "!": "&#33;",
    "(": "&#40;",
    ")": "&#41;",
    "$": "&#36;",
})


def _parse_text(text):
    """Convert chat text into the HTML fragment shown in the chatbot widget.

    Empty lines are dropped. A line containing a ``` fence alternately opens
    ``<pre><code class="language-...">`` (the language is whatever follows the
    last backtick) and closes it. Every other line except the first is prefixed
    with ``<br>``; lines inside an open fence are additionally escaped with
    ``_CODE_ESCAPES``.

    Args:
        text: Raw text, possibly containing Markdown code fences.

    Returns:
        The converted lines joined into a single string without separators.
    """
    lines = [line for line in text.split("\n") if line != ""]
    fence_count = 0
    for i, line in enumerate(lines):
        if "```" in line:
            fence_count += 1
            if fence_count % 2 == 1:
                # Opening fence: the text after the last backtick is the language tag.
                language = line.split("`")[-1]
                lines[i] = f'<pre><code class="language-{language}">'
            else:
                lines[i] = "<br></code></pre>"
        elif i > 0:
            if fence_count % 2 == 1:
                # Inside a code block. A single translate pass matches the chained
                # replaces because no replacement text contains an escaped character.
                line = line.translate(_CODE_ESCAPES)
            lines[i] = "<br>" + line
    return "".join(lines)
def _launch_demo(args, model, tokenizer):
    """Build the Gradio chat UI around ``model``/``tokenizer`` and serve it.

    Args:
        args: Parsed CLI namespace; ``share``, ``inbrowser``, ``server_port`` and
            ``server_name`` are forwarded to ``launch()``.
        model: Chat model exposing ``chat(tokenizer, message, history=...)``.
        tokenizer: Tokenizer handed through to ``model.chat``.
    """
    # Audio clips cut out of the user's recording are written below this directory.
    uploaded_file_dir = os.environ.get("GRADIO_TEMP_DIR") or str(
        Path(tempfile.gettempdir()) / "gradio"
    )

    def predict(_chatbot, task_history):
        """Answer the newest query in ``task_history`` and update the chat display.

        If the reply contains an even, non-zero number of ``<|sec.frac|>``
        timestamps and the conversation includes audio, each (start, end) pair
        is cut from the most recent audio file and appended as its own row.
        """
        query = task_history[-1][0]
        print("User: " + _parse_text(query))
        history_cp = copy.deepcopy(task_history)

        # Fold every audio entry into the text turn that follows it, numbering
        # the audio files in order of appearance.
        history_filter = []
        audio_idx = 1
        pre = ""
        # Local on purpose: a module-level global would be undefined until the
        # first upload and would leak one session's audio into another.
        last_audio = None
        for q, a in history_cp:
            if isinstance(q, (tuple, list)):
                last_audio = q[0]
                pre += f'Audio {audio_idx}: <audio>{q[0]}</audio>' + '\n'
                audio_idx += 1
            else:
                pre += q
                history_filter.append((pre, a))
                pre = ""
        history, message = history_filter[:-1], history_filter[-1][0]
        response, history = model.chat(tokenizer, message, history=history)

        ts_pattern = r"<\|\d{1,2}\.\d+\|>"
        all_time_stamps = re.findall(ts_pattern, response)
        print(response)
        if all_time_stamps and len(all_time_stamps) % 2 == 0 and last_audio:
            ts_float = [float(t.replace("<|", "").replace("|>", "")) for t in all_time_stamps]
            ts_float_pair = [ts_float[i:i + 2] for i in range(0, len(ts_float), 2)]
            # Read the source audio file.
            audio_format = os.path.splitext(last_audio)[-1].replace(".", "")
            audio_file = AudioSegment.from_file(last_audio, format=audio_format)
            chat_response = response.replace("<|", "").replace("|>", "")
            temp_dir = Path(uploaded_file_dir) / secrets.token_hex(20)
            temp_dir.mkdir(exist_ok=True, parents=True)
            # Set the text reply once, then add one row per clip; doing this
            # inside the loop would overwrite the previously appended clip.
            _chatbot[-1] = (_parse_text(query), chat_response)
            for start, end in ts_float_pair:
                # Cut the clip; timestamps are seconds, slices are milliseconds.
                audio_clip = audio_file[start * 1000: end * 1000]
                # Save the clip under a random name.
                filename = temp_dir / f"tmp{secrets.token_hex(5)}.{audio_format}"
                audio_clip.export(filename, format=audio_format)
                _chatbot.append((None, (str(filename),)))
        else:
            _chatbot[-1] = (_parse_text(query), response)

        full_response = _parse_text(response)
        task_history[-1] = (query, full_response)
        print("Qwen-Audio-Chat: " + _parse_text(full_response))
        return _chatbot

    def regenerate(_chatbot, task_history):
        """Discard the last answer (and any clip rows) and ask the model again."""
        if not task_history:
            return _chatbot
        item = task_history[-1]
        if item[1] is None:
            return _chatbot
        task_history[-1] = (item[0], None)
        # Rows whose user side is None are audio clips produced by predict();
        # remove all of them, not just one, before clearing the text answer.
        while _chatbot and _chatbot[-1][0] is None:
            _chatbot.pop()
        if _chatbot:
            _chatbot[-1] = (_chatbot[-1][0], None)
        return predict(_chatbot, task_history)

    def add_text(history, task_history, text):
        """Append a text query to both histories and return "" to clear the input box."""
        history = history + [(_parse_text(text), None)]
        task_history = task_history + [(text, None)]
        return history, task_history, ""

    def add_file(history, task_history, file):
        """Append an uploaded audio file to both histories."""
        history = history + [((file.name,), None)]
        task_history = task_history + [((file.name,), None)]
        return history, task_history

    def add_mic(history, task_history, file):
        """Append audio from the audio widget after renaming it with a .wav suffix."""
        if file is None:
            return history, task_history
        wav_path = file + '.wav'
        os.rename(file, wav_path)
        print("add_mic file:", file)
        print("add_mic history:", history)
        print("add_mic task_history:", task_history)
        task_history = task_history + [((wav_path,), None)]
        history = history + [((wav_path,), None)]
        print("task_history", task_history)
        return history, task_history

    def reset_user_input():
        """Clear the query textbox."""
        return gr.update(value="")

    def reset_state(task_history):
        """Empty the model-side history in place and blank the chat display."""
        task_history.clear()
        return []

    with gr.Blocks() as demo:
        gr.Markdown("""<p align="center"><img src="https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen-Audio/audio_logo.jpg" style="height: 80px"/><p>""")  ## todo
        gr.Markdown("""<center><font size=8>Qwen-Audio-Chat Bot</center>""")
        gr.Markdown(
            """\
<center><font size=3>This WebUI is based on Qwen-Audio-Chat, developed by Alibaba Cloud. </center>""")
        gr.Markdown("""\
<center><font size=4>Qwen-Audio <a href="https://modelscope.cn/models/qwen/Qwen-Audio/summary">🤖 </a>
| <a href="https://huggingface.co/Qwen/Qwen-Audio">🤗</a>&nbsp; ｜
Qwen-Audio-Chat <a href="https://modelscope.cn/models/qwen/Qwen-Audio-Chat/summary">🤖 </a> |
<a href="https://huggingface.co/Qwen/Qwen-Audio-Chat">🤗</a>&nbsp; ｜
&nbsp;<a href="https://github.com/QwenLM/Qwen-Audio">Github</a></center>""")

        chatbot = gr.Chatbot(label='Qwen-Audio-Chat', elem_classes="control-height", height=750)
        query = gr.Textbox(lines=2, label='Input')
        task_history = gr.State([])
        # The earlier variant restricted this widget with source="microphone".
        mic = gr.Audio(type="filepath")

        with gr.Row():
            empty_bin = gr.Button("🧹 Clear History")
            submit_btn = gr.Button("🚀 Submit")
            regen_btn = gr.Button("🤔️ Regenerate")
            addfile_btn = gr.UploadButton("📁 Upload", file_types=["audio"])

        mic.change(add_mic, [chatbot, task_history, mic], [chatbot, task_history])
        # add_text returns three values, so the textbox is listed as the third
        # output; with only two outputs the extra "" has no matching component.
        submit_btn.click(add_text, [chatbot, task_history, query], [chatbot, task_history, query]).then(
            predict, [chatbot, task_history], [chatbot], show_progress=True
        )
        submit_btn.click(reset_user_input, [], [query])
        empty_bin.click(reset_state, [task_history], [chatbot], show_progress=True)
        regen_btn.click(regenerate, [chatbot, task_history], [chatbot], show_progress=True)
        addfile_btn.upload(add_file, [chatbot, task_history, addfile_btn], [chatbot, task_history], show_progress=True)

        gr.Markdown("""\
<font size=2>Note: This demo is governed by the original license of Qwen-Audio. \
We strongly advise users not to knowingly generate or allow others to knowingly generate harmful content, \
including hate speech, violence, pornography, deception, etc. \
""")

    demo.queue().launch(
        share=args.share,
        inbrowser=args.inbrowser,
        server_port=args.server_port,
        server_name=args.server_name,
    )
def main():
    """Entry point: parse the CLI options, load the checkpoint, and serve the demo."""
    cli_args = _get_args()
    loaded_model, loaded_tokenizer = _load_model_tokenizer(cli_args)
    _launch_demo(cli_args, loaded_model, loaded_tokenizer)


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()