Spaces:
Running
on
Zero
Running
on
Zero
File size: 5,470 Bytes
ba1eb4b b54618b ba1eb4b b54618b ba1eb4b 402c2c1 ba1eb4b eba62a3 b54618b ba1eb4b eba62a3 ba1eb4b b54618b a21637d b54618b a21637d df2ba9f a21637d df2ba9f a21637d df2ba9f a21637d df2ba9f ba1eb4b df2ba9f b54618b ba1eb4b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch
import numpy as np
import av
import gc
import spaces
import gradio as gr
import os
import json
import csv
import io
# Checkpoint name shared by the processor and the model below.
model_name = 'llava-hf/LLaVA-NeXT-Video-7B-DPO-hf'

# Load the weights in 4-bit form and use float16 as the compute dtype.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# Both objects are created once, when the module is imported, and are then
# reused by process_video for every request.
processor = LlavaNextVideoProcessor.from_pretrained(model_name)
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    model_name,
    device_map='auto',
    quantization_config=quantization_config,
)
# No @spaces.GPU decorator here: this helper only decodes frames on the CPU
# and is called from process_video, which already carries the decorator.
def read_video_pyav(container, indices):
    """Decode the selected frames of a video with the PyAV decoder.

    Args:
        container: An open PyAV container. It is rewound with ``seek(0)`` and
            its first video stream (``video=0``) is decoded.
        indices: Frame numbers to keep (any sequence or NumPy array of ints).
            Duplicates are ignored and the order does not matter.

    Returns:
        A NumPy array stacking the kept frames in decoding order, each one
        converted with ``to_ndarray(format="rgb24")``.

    Raises:
        ValueError: If ``indices`` is empty or none of the requested frames
            could be decoded.
    """
    # A set gives O(1) membership tests; scanning the caller's NumPy array
    # for every decoded frame would be linear each time.
    wanted = {int(index) for index in indices}
    if not wanted:
        raise ValueError("indices must contain at least one frame number")
    last_wanted = max(wanted)

    container.seek(0)
    frames = []
    for position, frame in enumerate(container.decode(video=0)):
        if position > last_wanted:
            break  # every requested frame has already been passed
        if position in wanted:
            frames.append(frame)

    if not frames:
        raise ValueError("none of the requested frames could be decoded")
    return np.stack([frame.to_ndarray(format="rgb24") for frame in frames])
def _sample_frame_indices(total_frames, num_frames=8):
    """Return ``num_frames`` evenly spaced frame numbers in ``[0, total_frames)``."""
    # Integer arithmetic yields floor(i * total_frames / num_frames) directly,
    # without relying on a fractional arange step.
    return (np.arange(num_frames) * total_frames) // num_frames


@spaces.GPU
def process_video(video_file, question, num_frames=8):
    """Answer one question about one video with the video-language model.

    Args:
        video_file: Either an object exposing the video path as ``.name`` or
            the path itself.
        question: Text of the question put to the model.
        num_frames: How many evenly spaced frames to sample from the video.

    Returns:
        The decoded model output after the ``"ASSISTANT: "`` marker, stripped
        of surrounding whitespace. Generation samples (``do_sample=True``),
        so repeated calls may return different text.

    Raises:
        ValueError: If ``num_frames`` is smaller than 1 or the video contains
            no decodable frames.
    """
    if num_frames < 1:
        raise ValueError("num_frames must be at least 1")

    video_path = getattr(video_file, "name", video_file)
    with av.open(video_path) as container:
        total_frames = container.streams.video[0].frames
        if total_frames <= 0:
            # PyAV reports 0 when the container does not record a frame
            # count; fall back to counting by decoding the stream once.
            total_frames = sum(1 for _ in container.decode(video=0))
        if total_frames <= 0:
            raise ValueError(f"no video frames found in {video_path!r}")
        indices = _sample_frame_indices(total_frames, num_frames)
        video_clip = read_video_pyav(container, indices)

    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": str(question)},
                {"type": "video"},
            ],
        },
    ]
    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # Named ``inputs`` so the ``input`` builtin is not shadowed.
    inputs = processor(
        [prompt], videos=[video_clip], padding=True, return_tensors="pt"
    ).to(model.device)
    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}
    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)[0]
    # Keep only what follows the assistant marker of the decoded conversation.
    return generated_text.split("ASSISTANT: ", 1)[-1].strip()
def _is_affirmative(answer):
    """Return True when ``answer`` contains "yes" as a standalone word."""
    # Whole-word comparison: a plain substring test also fires on words such
    # as "eyes", which merely contain the letters "yes".
    return any(
        word.strip(".,;:!?\"'()[]") == "yes" for word in answer.lower().split()
    )


def _results_to_csv(all_results, question_keys):
    """Render the per-video answers as CSV text, one column per question."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["Video File", *question_keys])
    for video_name, results in all_results.items():
        # Questions that were not selected for this run stay empty.
        writer.writerow([video_name, *(results.get(key, "") for key in question_keys)])
    return buffer.getvalue()


@spaces.GPU
def analyze_videos(video_files, selected_questions):
    """Run the selected questions on every video and collect the answers.

    Args:
        video_files: Uploaded videos; each item either exposes its path as
            ``.name`` or is the path itself. ``None`` is treated as empty.
        selected_questions: Keys of the questions to ask (see ``questions``
            below). ``None`` is treated as empty.

    Returns:
        A ``(json_text, csv_text)`` tuple. Both hold, per video file name,
        ``"true"`` or ``"false"`` for each selected question; the CSV has a
        column for every known question.

    Raises:
        KeyError: If a selected key is not one of the known questions.
    """
    # NOTE(review): an answer counts as "true" only when the model says
    # "yes", but several prompts are not phrased as yes/no questions.
    # Confirm the prompts elicit a yes/no reply.
    questions = {
        "hands_free": "Examine the subject’s right and left hands in the video to check if they are holding anything like a microphone, book, paper(White color), object, or any electronic device, try segmentations and decide if the hands are free or not.",
        "standing/sitting": "Evaluate the subject’s body posture and movement within the video. Are they standing upright with both feet planted firmly on the ground? If so, they are standing. If they seem to be seated, they are seated.",
        "interaction_with_background": "Assess the surroundings behind the subject in the video. Do they seem to interact with any visible screens, such as laptops, TVs, or digital billboards? If yes, then they are interacting with a screen. If not, they are not interacting with a screen.",
        "indoors/outdoors": "Consider the broader environmental context shown in the video’s background. Are there signs of an open-air space, like greenery, structures, or people passing by? If so, it’s an outdoor setting. If the setting looks confined with furniture, walls, or home decorations, it’s an indoor environment.",
    }

    all_results = {}
    for video_file in video_files or []:
        video_name = os.path.basename(getattr(video_file, "name", video_file))
        all_results[video_name] = {}
        for question_key in selected_questions or []:
            answer = process_video(video_file, questions[question_key])
            all_results[video_name][question_key] = (
                "true" if _is_affirmative(answer) else "false"
            )
            # Release cached GPU memory before the next generation.
            gc.collect()
            torch.cuda.empty_cache()

    csv_content = _results_to_csv(all_results, list(questions))
    json_output = json.dumps(all_results, indent=4)
    return json_output, csv_content
def download_csv(csv_content):
    """Write the CSV text to a temporary file and return its path.

    Args:
        csv_content: CSV text, as shown in the "CSV Results" textbox.

    Returns:
        Path of a freshly written ``video_analysis.csv`` file, or ``None``
        when there is no CSV text to save yet.
    """
    import tempfile  # function-scope import: only this handler needs it

    if not csv_content:
        return None
    # A dedicated temporary directory lets the file keep its readable name.
    csv_path = os.path.join(tempfile.mkdtemp(), "video_analysis.csv")
    # newline="" writes the line endings exactly as they are in the text.
    with open(csv_path, "w", encoding="utf-8", newline="") as csv_file:
        csv_file.write(csv_content)
    return csv_path


# Define Gradio interface
with gr.Blocks() as iface:
    with gr.Row():
        file_input = gr.File(label="Upload Videos", file_count="multiple")
        question_input = gr.CheckboxGroup(
            ["hands_free", "standing/sitting", "interaction_with_background", "indoors/outdoors"],
            label="Select Questions to Apply",
        )
    process_button = gr.Button("Process Videos")  # Process button below checkboxes
    with gr.Row():
        json_output = gr.JSON(label="Analysis Results (JSON)")
        csv_output = gr.Textbox(label="CSV Results", lines=15)
    download_button = gr.Button("Download CSV")
    # The saved CSV is delivered through a File component; a Button cannot
    # carry a downloadable file as its output.
    csv_file_output = gr.File(label="CSV File")

    # Link buttons to their respective functions
    process_button.click(
        analyze_videos,
        inputs=[file_input, question_input],
        outputs=[json_output, csv_output],
    )
    download_button.click(download_csv, inputs=csv_output, outputs=csv_file_output)

if __name__ == "__main__":
    iface.launch(debug=True)