File size: 3,927 Bytes
fef0a8d
294c109
 
99eb93c
fef0a8d
 
294c109
0c034e2
45099c6
542f90d
fef0a8d
294c109
fef0a8d
 
 
 
 
5268082
45099c6
5268082
e52a62d
0629ecb
4bd5128
294c109
 
4bd5128
96f2f76
 
 
 
 
 
 
 
45099c6
294c109
 
 
45099c6
 
294c109
 
 
 
 
 
 
5268082
5a25e75
baefccb
5a25e75
029aec2
294c109
5a25e75
48ebe09
c60c480
294c109
48ebe09
 
5a25e75
48ebe09
5a25e75
 
48ebe09
5d48470
5a25e75
48ebe09
5a25e75
 
 
 
 
 
 
294c109
5a25e75
af21e2a
5268082
 
 
 
 
1b6a68d
5268082
294c109
 
5a25e75
8a86647
294c109
5268082
294c109
5268082
 
 
46010b5
 
 
5a25e75
5268082
 
eb3d2f3
5268082
 
eb3d2f3
5268082
 
 
5a25e75
5268082
 
 
5a25e75
5268082
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Imports
import os
import sys
import gradio as gr
import spaces
import torch
import librosa
from PIL import Image
from decord import VideoReader, cpu
from transformers import AutoModel, AutoTokenizer, AutoProcessor

# Variables
# Resolve compute device: "auto" picks CUDA when available, else CPU.
DEVICE = "auto"
if DEVICE == "auto":
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[SYSTEM] | Using {DEVICE} type compute device.")

# Default instruction shown in the UI and used when the user leaves it blank.
DEFAULT_INPUT = "Describe in one paragraph."
# Hard cap on frames sampled from any single video (see encode_video).
MAX_FRAMES = 64

# Hugging Face model id for the multimodal MiniCPM-o checkpoint.
model_name = "openbmb/MiniCPM-o-2_6"

# Load model weights (bf16, SDPA attention) onto the chosen device, plus the
# matching tokenizer/processor. trust_remote_code is required by this repo's
# custom modeling code — NOTE(review): executes remote code at load time.
repo = AutoModel.from_pretrained(model_name, trust_remote_code=True, attn_implementation="sdpa", torch_dtype=torch.bfloat16).to(DEVICE)
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)

# Custom CSS injected into the Gradio page: narrow the container, center the
# title, and hide the default Gradio footer.
css = '''
.gradio-container{max-width: 560px !important}
h1{text-align:center}
footer {
    visibility: hidden
}
'''

def encode_video(video_path):
    """Decode a video into at most MAX_FRAMES PIL images, sampled ~1 frame/sec.

    Args:
        video_path: Path to a video file readable by decord.

    Returns:
        list of PIL.Image.Image: sampled frames as uint8 images.
    """
    def uniform_sample(idxs, n):
        # Pick n indices evenly spread across idxs, centered in each bucket.
        gap = len(idxs) / n
        return [idxs[int(i * gap + gap / 2)] for i in range(n)]

    vr = VideoReader(video_path, ctx=cpu(0))
    # Guard against degenerate metadata: an avg fps below 0.5 would round to 0
    # and make range() raise "range() arg 3 must not be zero".
    fps = max(1, round(vr.get_avg_fps()))
    idxs = list(range(0, len(vr), fps))  # roughly one frame per second
    if len(idxs) > MAX_FRAMES:
        idxs = uniform_sample(idxs, MAX_FRAMES)
    frames = vr.get_batch(idxs).asnumpy()
    return [Image.fromarray(f.astype("uint8")) for f in frames]

@spaces.GPU(duration=60)
def generate(input=None, instruction=DEFAULT_INPUT, sampling=False, temperature=0.7, top_p=0.8, top_k=100, repetition_penalty=1.05, max_tokens=512):
    """Run the MiniCPM-o model on a list of media files plus a text instruction.

    Args:
        input: List of file paths (image/video/audio). None or empty aborts.
               (Default is None rather than a mutable [] literal, which would
               be shared across calls.)
        instruction: Text prompt appended after the media content.
        sampling: Enable stochastic decoding; below knobs apply when True.
        temperature, top_p, top_k, repetition_penalty: Decoding parameters.
        max_tokens: Cap on newly generated tokens.

    Returns:
        str: The model's chat response, or an error string when no input.
    """
    print(input)
    print(instruction)

    if not input:
        return "No input provided."

    # Build the multimodal content list in user-supplied order; unsupported
    # extensions are silently skipped.
    content = []
    for path in input:
        ext = os.path.splitext(path)[1].lower()
        if ext in [".jpg", ".jpeg", ".png", ".bmp", ".gif"]:
            content.append(Image.open(path).convert("RGB"))
        elif ext in [".mp4", ".mov", ".avi", ".mkv"]:
            # Videos become a sequence of sampled frames (see encode_video).
            content.extend(encode_video(path))
        elif ext in [".wav", ".mp3", ".flac", ".aac"]:
            # Model expects 16 kHz mono waveforms.
            aud, _ = librosa.load(path, sr=16000, mono=True)
            content.append(aud)
        else:
            continue

    # The text instruction goes last so it refers to the preceding media.
    content.append(instruction)
    inputs_payload = [{"role": "user", "content": content}]

    params = {
        "msgs": inputs_payload,
        "tokenizer": tokenizer,
        "sampling": sampling,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "repetition_penalty": repetition_penalty,
        "max_new_tokens": max_tokens,
    }

    output = repo.chat(**params)

    print(output)

    return output

def cloud():
    """Keep-alive handler: log a maintenance heartbeat for the Space."""
    message = "[CLOUD] | Space maintained."
    print(message)

# Initialize
# Build the Gradio UI. Component creation order inside each Column is the
# on-screen layout order, so the statements below are order-sensitive.
with gr.Blocks(css=css) as main:
    with gr.Column():
        # Media inputs: multiple image/video/audio files, passed as filepaths.
        input = gr.File(label="Input", file_count="multiple", file_types=["image", "video", "audio"], type="filepath", allow_reordering=True)
        instruction = gr.Textbox(lines=1, value=DEFAULT_INPUT, label="Instruction")
        # Decoding controls; defaults mirror generate()'s keyword defaults.
        sampling = gr.Checkbox(value=False, label="Sampling")
        temperature = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=0.7, label="Temperature")
        top_p = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.8, label="Top P")
        top_k = gr.Slider(minimum=0, maximum=1000, step=1, value=100, label="Top K")
        repetition_penalty = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=1.05, label="Repetition Penalty")
        max_tokens = gr.Slider(minimum=1, maximum=4096, step=1, value=512, label="Max Tokens")
        submit = gr.Button("▶")
        maintain = gr.Button("☁️")

    with gr.Column():
        output = gr.Textbox(lines=1, value="", label="Output")

    # Wire buttons: component order here must match generate()'s signature.
    submit.click(fn=generate, inputs=[input, instruction, sampling, temperature, top_p, top_k, repetition_penalty, max_tokens], outputs=[output], queue=False)
    maintain.click(cloud, inputs=[], outputs=[], queue=False)

main.launch(show_api=True)