# Sense / app.py
# Hugging Face Space source by Staticaliza; revision 89f7ce7 ("Update app.py", verified), 6.39 kB.
# Imports
import gradio as gr
import spaces
import torch
import os
import math
import tempfile
import librosa
import gc
from PIL import Image, ImageSequence
from decord import VideoReader, cpu
from moviepy.editor import VideoFileClip
from transformers import AutoModel, AutoTokenizer, AutoProcessor
# Variables
# "auto" means: pick CUDA when torch reports it as available, otherwise CPU.
DEVICE = "auto"
if DEVICE == "auto":
    if torch.cuda.is_available():
        DEVICE = "cuda"
    else:
        DEVICE = "cpu"
print(f"[SYSTEM] | Using {DEVICE} type compute device.")

# Pre-filled text of the instruction box and default of generate().
DEFAULT_INPUT = "Describe in one short sentence."
# Upper bound on the number of frames sampled from a video or GIF.
MAX_FRAMES = 64
# Sample rate (Hz) that every audio input is loaded/resampled to.
AUDIO_SR = 16000
# Checkpoint identifier shared by the model, tokenizer and processor loaders.
model_name = "openbmb/MiniCPM-o-2_6"

# Load the weights in bfloat16 with SDPA attention, then move them to DEVICE.
repo = AutoModel.from_pretrained(
    model_name,
    trust_remote_code=True,
    attn_implementation="sdpa",
    torch_dtype=torch.bfloat16,
)
repo = repo.to(DEVICE)

tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
# Page stylesheet: cap the container width at 560px, centre <h1> headings and
# hide the footer.
css = (
    "\n"
    ".gradio-container{max-width: 560px !important}\n"
    "h1{text-align:center}\n"
    "footer {\n"
    "visibility: hidden\n"
    "}\n"
)
# Per-filetype text handed to the omni builders as `prefix`. The "β–ˆ" marker
# is a placeholder that generate() swaps for the uploaded file's name, so it
# must stay in sync with the marker used there.
input_prefixes = dict(
    Image="(A image file called β–ˆ has been attached, describe the image content) ",
    GIF="(A GIF file called β–ˆ has been attached, describe the GIF content) ",
    Video="(A audio video file called β–ˆ has been attached, describe the video content and the audio content) ",
    Audio="(A audio file called β–ˆ has been attached, describe the audio content) ",
)
# Lower-case file extensions (with leading dot) accepted for each input kind.
filetypes = {
    "Image": [".jpg", ".jpeg", ".png", ".bmp"],
    "GIF": [".gif"],
    "Video": [".mp4", ".mov", ".avi", ".mkv"],
    "Audio": [".wav", ".mp3", ".flac", ".aac"],
}


def infer_filetype(ext):
    """Map a file extension to its input kind.

    Args:
        ext: Extension including the leading dot, e.g. ".png". Matching is
            case-insensitive and ignores surrounding whitespace, so callers
            do not have to normalise it first.

    Returns:
        The matching key of `filetypes` ("Image", "GIF", "Video" or "Audio"),
        or None when the extension is not listed or is not a string.
    """
    if not isinstance(ext, str):
        return None
    # The table stores lower-case extensions only.
    normalized = ext.strip().lower()
    return next((kind for kind, exts in filetypes.items() if normalized in exts), None)
def uniform_sample(seq, n):
    """Pick at most `n` items spread evenly over the whole of `seq`.

    The previous integer-stride version (`len(seq) // n`) degenerated to a
    stride of 1 whenever `len(seq) < 2 * n`, returning just the first `n`
    items and never reaching the end of the sequence. This version divides
    the sequence into `n` equal-width (fractional) buckets and takes the item
    at the centre of each bucket.

    Args:
        seq: Any indexable sequence with a length (list, range, ...).
        n: Maximum number of items to return; must be positive.

    Returns:
        A list with `min(len(seq), n)` items in their original order.

    Raises:
        ValueError: If `n` is not positive.
    """
    if n <= 0:
        raise ValueError("n must be a positive integer")
    total = len(seq)
    if total <= n:
        # Nothing to drop: keep every item.
        return list(seq)
    gap = total / n
    # Centre of bucket i; the largest index is int(total - gap / 2) < total.
    return [seq[int(i * gap + gap / 2)] for i in range(n)]
def frames_from_video(path):
    """Decode up to MAX_FRAMES sampled frames of the video at `path`.

    Frames are read on the CPU with decord and returned as a list of PIL
    images, one per sampled frame index.
    """
    reader = VideoReader(path, ctx = cpu(0))
    # Choose which frame indices to decode before touching any pixel data.
    picked = uniform_sample(range(len(reader)), MAX_FRAMES)
    pixels = reader.get_batch(picked).asnumpy()
    images = []
    for frame in pixels:
        images.append(Image.fromarray(frame.astype("uint8")))
    return images
def audio_from_video(path):
    """Extract the audio track of the video at `path` as a mono waveform.

    Args:
        path: Path of the video file.

    Returns:
        The track resampled to AUDIO_SR and down-mixed with
        `librosa.to_mono`, or None when the video has no audio track. None is
        the same value the image and GIF builders pass as `audio`, so the
        caller can forward it unchanged.
    """
    clip = VideoFileClip(path)
    try:
        # Silent videos expose no audio clip; previously this raised
        # AttributeError on `None.to_soundarray`.
        if clip.audio is None:
            return None
        audio = clip.audio.to_soundarray(fps = AUDIO_SR)
    finally:
        # Always release the reader, even when decoding fails.
        clip.close()
    # Transposed so the channel axis comes first before down-mixing.
    return librosa.to_mono(audio.T)
def load_audio(path):
    """Load the audio file at `path` as mono samples at AUDIO_SR.

    Only the sample array from `librosa.load` is returned; the sample rate it
    also reports is discarded.
    """
    samples = librosa.load(path, sr = AUDIO_SR, mono = True)[0]
    return samples
def build_video_omni(path, prefix, instruction):
    """Build the omni input for a video: sampled frames plus its audio track.

    Returns whatever `processor.build_omni_input` produces; that method comes
    from the model's remote code and is not defined in this file.
    """
    # Frames are decoded first, then the audio track, as separate passes
    # over the same file.
    return processor.build_omni_input(
        frames = frames_from_video(path),
        audio = audio_from_video(path),
        prefix = prefix,
        instruction = instruction,
        max_frames = MAX_FRAMES,
        sr = AUDIO_SR
    )
def build_image_omni(path, prefix, instruction):
    """Build the omni input for a still image (one RGB frame, no audio).

    Returns whatever `processor.build_omni_input` produces; that method comes
    from the model's remote code and is not defined in this file.
    """
    with Image.open(path) as source:
        rgb = source.convert("RGB")
    return processor.build_omni_input(
        frames = [rgb],
        audio = None,
        prefix = prefix,
        instruction = instruction
    )
def build_gif_omni(path, prefix, instruction):
    """Build the omni input for an animated GIF (sampled RGB frames, no audio).

    Args:
        path: Path of the GIF file.
        prefix: Text passed through as `prefix`.
        instruction: Text passed through as `instruction`.

    Returns:
        Whatever `processor.build_omni_input` produces; that method comes from
        the model's remote code and is not defined in this file.
    """
    # Multi-frame images keep their file open while frames are being read,
    # so close it explicitly once every frame has been copied out.
    with Image.open(path) as img:
        frames = [frame.copy().convert("RGB") for frame in ImageSequence.Iterator(img)]
    frames = uniform_sample(frames, MAX_FRAMES)
    return processor.build_omni_input(
        frames = frames,
        audio = None,
        prefix = prefix,
        instruction = instruction
    )
def build_audio_omni(path, prefix, instruction):
    """Build the omni input for an audio-only file (no frames).

    Returns whatever `processor.build_omni_input` produces; that method comes
    from the model's remote code and is not defined in this file.
    """
    request = {
        "frames": None,
        "audio": load_audio(path),
        "prefix": prefix,
        "instruction": instruction,
        "sr": AUDIO_SR,
    }
    return processor.build_omni_input(**request)
@spaces.GPU(duration = 60)
def generate(input,
             instruction = DEFAULT_INPUT,
             sampling = False,
             temperature = 0.7,
             top_p = 0.8,
             top_k = 100,
             repetition_penalty = 1.05,
             max_tokens = 512):
    """Run the model on one uploaded file and return its reply.

    Args:
        input: Path of the uploaded file (the name shadows the builtin but is
            kept because it is part of the public signature).
        instruction: Text passed to the builder alongside the file.
        sampling: Forwarded to `repo.chat` as `sampling`.
        temperature: Forwarded to `repo.chat`.
        top_p: Forwarded to `repo.chat`.
        top_k: Forwarded to `repo.chat`.
        repetition_penalty: Forwarded to `repo.chat`.
        max_tokens: Forwarded to `repo.chat` as `max_new_tokens`.

    Returns:
        The value returned by `repo.chat`, or a short message string when no
        file was given or its extension is not supported.
    """
    if not input:
        return "no input provided."

    extension = os.path.splitext(input)[1].lower()
    filetype = infer_filetype(extension)
    if not filetype:
        return "unsupported file type."

    # Substitute the uploaded file's name for the placeholder in the prefix.
    filename = os.path.basename(input)
    prefix = input_prefixes[filetype].replace("β–ˆ", filename)

    builder_map = {
        "Image": build_image_omni,
        "GIF"  : build_gif_omni,
        "Video": build_video_omni,
        "Audio": build_audio_omni
    }

    try:
        omni_content = builder_map[filetype](input, prefix, instruction)
        sys_msg = repo.get_sys_prompt(mode = "omni", language = "en")
        msgs = [sys_msg, { "role": "user", "content": omni_content }]
        return repo.chat(
            msgs = msgs,
            tokenizer = tokenizer,
            sampling = sampling,
            temperature = temperature,
            top_p = top_p,
            top_k = top_k,
            repetition_penalty = repetition_penalty,
            max_new_tokens = max_tokens,
            omni_input = True,
            use_image_id = False,
            max_slice_nums = 2
        )
    finally:
        # Clean up on failure too. Collect garbage first so memory held by
        # now-unreachable tensors can be handed back by empty_cache().
        gc.collect()
        torch.cuda.empty_cache()
def cloud():
    """Print a log line; bound to the maintenance button in the UI."""
    message = "[CLOUD] | Space maintained."
    print(message)
# Initialize
with gr.Blocks(css=css) as main:
    # Controls: the file to analyse, the instruction and the generation settings.
    with gr.Column():
        input = gr.File(label="Input", file_types=["image", "video", "audio"], type="filepath")
        instruction = gr.Textbox(lines=1, value=DEFAULT_INPUT, label="Instruction")
        sampling = gr.Checkbox(value=False, label="Sampling")
        temperature = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=0.7, label="Temperature")
        top_p = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.8, label="Top P")
        top_k = gr.Slider(minimum=0, maximum=1000, step=1, value=100, label="Top K")
        repetition_penalty = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=1.05, label="Repetition Penalty")
        max_tokens = gr.Slider(minimum=1, maximum=4096, step=1, value=512, label="Max Tokens")
        # Button labels are plain glyphs: a play triangle and a cloud.
        submit = gr.Button("▶")
        maintain = gr.Button("☁️")

    # Result area.
    with gr.Column():
        output = gr.Textbox(lines=1, value="", label="Output")

    # The order of `inputs` must match the positional parameters of generate().
    submit.click(fn=generate, inputs=[input, instruction, sampling, temperature, top_p, top_k, repetition_penalty, max_tokens], outputs=[output], queue=False)
    maintain.click(cloud, inputs=[], outputs=[], queue=False)

main.launch(show_api=True)