# Spaces: Running
# Imports | |
import os | |
import sys | |
import gradio as gr | |
import spaces | |
import torch | |
import librosa | |
from PIL import Image | |
from decord import VideoReader, cpu | |
from transformers import AutoModel, AutoTokenizer, AutoProcessor | |
# Variables
DEVICE = "auto"
# Resolve the "auto" placeholder to a concrete device: CUDA when torch
# reports it as available, CPU otherwise.
if DEVICE == "auto":
    if torch.cuda.is_available():
        DEVICE = "cuda"
    else:
        DEVICE = "cpu"
print(f"[SYSTEM] | Using {DEVICE} type compute device.")

# Default text instruction sent along with the uploaded media.
DEFAULT_INPUT = "Describe in one paragraph."
# Upper bound on the number of frames sampled from a single video.
MAX_FRAMES = 64

# Model, tokenizer and processor are all loaded from the same checkpoint.
model_name = "openbmb/MiniCPM-o-2_6"
repo = AutoModel.from_pretrained(
    model_name,
    trust_remote_code=True,
    attn_implementation="sdpa",
    torch_dtype=torch.bfloat16,
).to(DEVICE)
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)

# Stylesheet handed to gr.Blocks: narrows the container, centres h1 headings
# and hides the footer.
css = '''
.gradio-container{max-width: 560px !important}
h1{text-align:center}
footer {
visibility: hidden
}
'''
def encode_video(video_path):
    """Sample frames from a video and return them as PIL images.

    One frame is taken every ``round(average_fps)`` frames (roughly one per
    second of footage). If that yields more than ``MAX_FRAMES`` indices, they
    are thinned to exactly ``MAX_FRAMES`` evenly spread ones.

    Args:
        video_path: Path of the video file, passed straight to ``VideoReader``.

    Returns:
        A list of ``PIL.Image.Image`` objects, one per sampled frame; empty
        when the reader reports zero frames.
    """
    def uniform_sample(idxs, n):
        # Split idxs into n equal-width bins and take the element at the
        # midpoint of each bin.
        gap = len(idxs) / n
        return [idxs[int(i * gap + gap / 2)] for i in range(n)]

    vr = VideoReader(video_path, ctx=cpu(0))
    # round() gives 0 for clips whose average fps is below 0.5, and a zero
    # step makes range() raise ValueError; clamp so such clips still sample
    # every frame.
    step = max(1, round(vr.get_avg_fps()))
    idxs = list(range(0, len(vr), step))
    if not idxs:
        # Nothing to decode; avoid asking the reader for an empty batch.
        return []
    if len(idxs) > MAX_FRAMES:
        idxs = uniform_sample(idxs, MAX_FRAMES)
    frames = vr.get_batch(idxs).asnumpy()
    return [Image.fromarray(f.astype("uint8")) for f in frames]
def generate(input=None, instruction=DEFAULT_INPUT, sampling=False, temperature=0.7, top_p=0.8, top_k=100, repetition_penalty=1.05, max_tokens=512):
    """Run the chat model on the uploaded files followed by a text instruction.

    Each path in ``input`` is dispatched on its (case-insensitive) extension:
    images are opened and converted to RGB, videos are expanded into sampled
    frames via ``encode_video``, and audio is loaded as 16 kHz mono samples.
    Files with any other extension are silently skipped. The instruction text
    is appended last and the whole list is sent as a single user message.

    Args:
        input: Sequence of file paths, or ``None``/empty for no files. The
            name shadows the builtin but is kept so keyword callers still work.
        instruction: Text prompt appended after the media items.
        sampling: Forwarded to ``repo.chat`` as ``sampling``.
        temperature: Forwarded to ``repo.chat`` as ``temperature``.
        top_p: Forwarded to ``repo.chat`` as ``top_p``.
        top_k: Forwarded to ``repo.chat`` as ``top_k``.
        repetition_penalty: Forwarded to ``repo.chat`` as ``repetition_penalty``.
        max_tokens: Forwarded to ``repo.chat`` as ``max_new_tokens``.

    Returns:
        Whatever ``repo.chat`` returns, or the string ``"No input provided."``
        when ``input`` is empty or ``None``.
    """
    image_exts = (".jpg", ".jpeg", ".png", ".bmp", ".gif")
    video_exts = (".mp4", ".mov", ".avi", ".mkv")
    audio_exts = (".wav", ".mp3", ".flac", ".aac")

    print(input)
    print(instruction)

    if not input:
        return "No input provided."

    content = []
    for path in input:
        ext = os.path.splitext(path)[1].lower()
        if ext in image_exts:
            # Image.open is lazy and keeps the file handle open; convert()
            # loads the pixels into a new image, so the source can be closed
            # as soon as the conversion is done.
            with Image.open(path) as img:
                content.append(img.convert("RGB"))
        elif ext in video_exts:
            content.extend(encode_video(path))
        elif ext in audio_exts:
            aud, _ = librosa.load(path, sr=16000, mono=True)
            content.append(aud)
        # Any other extension is ignored on purpose (best-effort upload handling).

    content.append(instruction)
    inputs_payload = [{"role": "user", "content": content}]
    params = {
        "msgs": inputs_payload,
        "tokenizer": tokenizer,
        "sampling": sampling,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "repetition_penalty": repetition_penalty,
        "max_new_tokens": max_tokens,
    }
    output = repo.chat(**params)
    print(output)
    return output
def cloud():
    """Print a fixed status line to stdout; takes no input and returns None."""
    status_line = "[CLOUD] | Space maintained."
    print(status_line)
# Initialize
with gr.Blocks(css=css) as main:
    # Input column: uploaded files, the instruction, generation settings
    # and the two action buttons.
    with gr.Column():
        input = gr.File(
            label="Input",
            file_count="multiple",
            file_types=["image", "video", "audio"],
            type="filepath",
            allow_reordering=True,
        )
        instruction = gr.Textbox(lines=1, value=DEFAULT_INPUT, label="Instruction")
        sampling = gr.Checkbox(value=False, label="Sampling")
        temperature = gr.Slider(
            minimum=0.01, maximum=1.99, step=0.01, value=0.7, label="Temperature"
        )
        top_p = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.8, label="Top P")
        top_k = gr.Slider(minimum=0, maximum=1000, step=1, value=100, label="Top K")
        repetition_penalty = gr.Slider(
            minimum=0.01, maximum=1.99, step=0.01, value=1.05, label="Repetition Penalty"
        )
        max_tokens = gr.Slider(
            minimum=1, maximum=4096, step=1, value=512, label="Max Tokens"
        )
        submit = gr.Button("▶")
        maintain = gr.Button("☁️")

    # Output column: the text produced by generate().
    with gr.Column():
        output = gr.Textbox(lines=1, value="", label="Output")

    # Component order here must match generate()'s positional parameters.
    submit.click(
        fn=generate,
        inputs=[
            input,
            instruction,
            sampling,
            temperature,
            top_p,
            top_k,
            repetition_penalty,
            max_tokens,
        ],
        outputs=[output],
        queue=False,
    )
    maintain.click(fn=cloud, inputs=[], outputs=[], queue=False)

main.launch(show_api=True)