import csv
import json
import os
import random
import re
import string
import uuid
import wave
from datetime import datetime
from io import StringIO

import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import plotly.express as px
import pyaudio
import whisper
from flask import (Flask, Response, flash, jsonify, redirect,
                   render_template, request, send_from_directory,
                   session, url_for)
from openai import OpenAI
from ultralytics import YOLO
from werkzeug.utils import secure_filename
# Flask app setup
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)  # ensure the upload dir exists
app.secret_key = 'supersecretkey'  # replace with a real secret in production
# Custom function to load the YOLO model safely
def safe_load_yolo_model(model_path):
    try:
        return YOLO(model_path)
    except Exception as e:
        print(f"Failed to load model: {e}")
        raise


# Load the YOLO model (assumes a trained weights file at ./best.pt)
model_path = './best.pt'
model = safe_load_yolo_model(model_path)
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

# Load the Whisper model for speech-to-text
whisper_model = whisper.load_model("base")

# Variables holding CSV data and other state between requests
original_data = None
updated_data = None
csv_filename = None

hands = mp_hands.Hands(static_image_mode=True, min_detection_confidence=0.3)

# State for tracking gestures across frames
previous_gesture = None
gesture_start_time = None
gesture_data_list = []
capture_flag = True          # Indicates whether frames should keep being captured
start_recording_time = None  # Start time of the recording session

# Default labels dictionary
labels_dict = {0: 'fist', 1: 'ok', 2: 'peace', 3: 'stop', 4: 'two up'}
custom_labels_dict = labels_dict.copy()  # Stores custom labels set by the user

# Initialize the OpenAI client
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
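# The API key is read from the OPENAI_API_KEY environment variable; on a hosted
# deployment (e.g. a Hugging Face Space) it must be configured as a secret
# before the app starts, or client initialization will fail.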
# Ask OpenAI's GPT model to apply an instruction to CSV data, using streaming
def get_gpt_instruction_response(instruction, csv_data):
    messages = [
        {"role": "system", "content": "You are a helpful assistant that processes CSV files."},
        {"role": "user", "content": f"Here is a CSV data:\n\n{csv_data}\n\n"
                                    f"The user has requested the following change: {instruction}\n\n"
                                    "Please process the data accordingly and return the modified CSV."},
    ]
    # Stream the response from the OpenAI API and accumulate it into a string
    stream = client.chat.completions.create(
        model="gpt-4o-mini",  # or "gpt-3.5-turbo"
        messages=messages,
        stream=True,
    )
    response = ""
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            response += chunk.choices[0].delta.content
    return response.strip()
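# Example call (hypothetical data; assumes OPENAI_API_KEY is configured):
#   sample = "Gesture,Duration\nfist,1.2\nok,0.8"
#   print(get_gpt_instruction_response("Remove rows with Duration < 1", sample))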
# Read a CSV file and return its contents as a string
def read_csv_to_string(file_path):
    df = pd.read_csv(file_path)
    return df.to_csv(index=False)


# Write a modified CSV string to a file
def write_csv_from_string(csv_string, output_file_path):
    with open(output_file_path, 'w') as file:
        file.write(csv_string)
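# Example round-trip with the two helpers above (hypothetical file names):
#   text = read_csv_to_string('uploads/gesture_data.csv')
#   write_csv_from_string(text, 'uploads/gesture_data_copy.csv')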
# Record audio from the default microphone to a WAV file.
# (Currently not called by any route; process_audio() below works on uploads.)
def record_audio(filename, duration=10):
    chunk = 1024
    sample_format = pyaudio.paInt16
    channels = 1
    fs = 44100
    p = pyaudio.PyAudio()
    print('Recording...')
    stream = p.open(format=sample_format, channels=channels, rate=fs,
                    frames_per_buffer=chunk, input=True)
    frames = []
    for _ in range(int(fs / chunk * duration)):
        data = stream.read(chunk)
        frames.append(data)
    stream.stop_stream()
    stream.close()
    p.terminate()
    print('Finished recording.')
    wf = wave.open(filename, 'wb')
    wf.setnchannels(channels)
    wf.setsampwidth(p.get_sample_size(sample_format))
    wf.setframerate(fs)
    wf.writeframes(b''.join(frames))
    wf.close()
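# Example (requires a working microphone and PyAudio):
#   record_audio('sample.wav', duration=5)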
# Transcribe an audio file using Whisper
def transcribe_audio(file_path):
    result = whisper_model.transcribe(file_path)
    return result["text"]
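# Example:
#   text = transcribe_audio('recorded_audio.wav')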
# NOTE: the @app.route decorators below are reconstructed from the url_for()
# calls in this file; the original URL paths may have differed.
@app.route('/')
def index():
    return render_template('index.html')


@app.route('/set_labels', methods=['GET', 'POST'])
def set_labels():
    global custom_labels_dict
    if request.method == 'POST':
        custom_labels_dict[0] = request.form['label1']
        custom_labels_dict[1] = request.form['label2']
        custom_labels_dict[2] = request.form['label3']
        custom_labels_dict[3] = request.form['label4']
        custom_labels_dict[4] = request.form['label5']
        # Remove empty labels
        custom_labels_dict = {k: v for k, v in custom_labels_dict.items() if v}
        return redirect(url_for('recognize'))
    return render_template('set_labels.html')


@app.route('/recognize')
def recognize():
    return render_template('recognize.html')


@app.route('/video_feed')
def video_feed():
    return Response(generate_frames(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')
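# The stream above is standard MJPEG over multipart/x-mixed-replace, so the
# recognize.html template can embed it with a plain <img> tag, e.g.:
#   <img src="{{ url_for('video_feed') }}">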
def generate_frames():
    global previous_gesture, gesture_start_time, gesture_data_list, capture_flag, start_recording_time
    # Record when the session started so gesture times can be made relative
    start_recording_time = datetime.now()
    cap = cv2.VideoCapture(0)
    while capture_flag:
        data_aux = []
        x_ = []
        y_ = []
        ret, frame = cap.read()
        if not ret:
            break
        H, W, _ = frame.shape
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = hands.process(frame_rgb)
        if results.multi_hand_landmarks:
            # Draw the detected hand landmarks on the frame
            for hand_landmarks in results.multi_hand_landmarks:
                mp_drawing.draw_landmarks(
                    frame,
                    hand_landmarks,
                    mp_hands.HAND_CONNECTIONS,
                    mp_drawing_styles.get_default_hand_landmarks_style(),
                    mp_drawing_styles.get_default_hand_connections_style())
            # Collect normalized landmark coordinates
            for hand_landmarks in results.multi_hand_landmarks:
                for i in range(len(hand_landmarks.landmark)):
                    x_.append(hand_landmarks.landmark[i].x)
                    y_.append(hand_landmarks.landmark[i].y)
                for i in range(len(hand_landmarks.landmark)):
                    data_aux.append(hand_landmarks.landmark[i].x - min(x_))
                    data_aux.append(hand_landmarks.landmark[i].y - min(y_))
            # Bounding box around the hand, padded by 10 px
            x1 = int(min(x_) * W) - 10
            y1 = int(min(y_) * H) - 10
            x2 = int(max(x_) * W) + 10
            y2 = int(max(y_) * H) + 10
            prediction = model.predict(frame, conf=0.25, iou=0.45)
            # .probs is only populated for classification models; a detection
            # model would need prediction[0].boxes instead.
            if prediction[0].probs is None:
                continue
            probs = prediction[0].probs.data.numpy()
            detected_gesture_index = int(np.argmax(probs))
            detected_gesture = custom_labels_dict.get(detected_gesture_index)
            if detected_gesture is None:
                continue
            # Current timestamp relative to the start of the session
            current_time = datetime.now()
            relative_time = current_time - start_recording_time
            # If the detected gesture changed, close out the previous one
            if detected_gesture != previous_gesture:
                if previous_gesture is not None:
                    gesture_end_time = relative_time.total_seconds()
                    gesture_duration = gesture_end_time - gesture_start_time
                    # Store the gesture, its start time, end time, and duration
                    gesture_data_list.append([previous_gesture, gesture_start_time,
                                              gesture_end_time, round(gesture_duration, 2)])
                # Start timing the new gesture
                previous_gesture = detected_gesture
                gesture_start_time = relative_time.total_seconds()
            # Draw a box and label around the detected gesture
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 0), 4)
            cv2.putText(frame, detected_gesture, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.3, (0, 255, 0), 3, cv2.LINE_AA)
        ret, buffer = cv2.imencode('.jpg', frame)
        frame = buffer.tobytes()
        # Yield the frame in multipart MJPEG format
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
    cap.release()
@app.route('/upload_csv', methods=['POST'])
def upload_csv():
    try:
        # Handle the file upload
        file = request.files.get('csv_file')
        if file:
            file_path = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(file.filename))
            file.save(file_path)
            flash("CSV file uploaded successfully!", "success")
            # Load the uploaded CSV file as the original data
            original_df = pd.read_csv(file_path)
            original_data = original_df.to_dict('records')
            columns = original_df.columns.tolist()
            # Store the original data and file path in the session
            session['original_data'] = original_data
            session['columns'] = columns
            session['file_path'] = file_path
        else:
            flash("Please upload a CSV file.", "warning")
    except Exception as e:
        app.logger.error(f"Error in upload_csv route: {e}")
        flash("An unexpected error occurred. Please check the logs.", "danger")
    return redirect(url_for('edit_csv'))
@app.route('/edit_csv', methods=['GET', 'POST'])
def edit_csv():
    updated_data = None
    original_data = session.get('original_data')
    columns = session.get('columns')
    if request.method == 'POST':
        try:
            # Ensure a file has been uploaded
            file_path = session.get('file_path')
            if not file_path:
                flash("Please upload a CSV file first.", "warning")
                return redirect(url_for('edit_csv'))
            # Load the CSV data as a string for processing
            csv_data = read_csv_to_string(file_path)
            # Get the instruction from the form
            instruction = request.form.get('transcription', "").strip()
            if not instruction:
                flash("Please provide an instruction.", "warning")
                return redirect(url_for('edit_csv'))
            # Process the CSV using the OpenAI API
            raw_output = get_gpt_instruction_response(instruction, csv_data)
            # Extract the CSV part of the GPT output: prefer the contents of a
            # ``` fenced block if present, otherwise take the whole reply
            csv_pattern = re.compile(r"(?<=```)([\s\S]*?)(?=```)|([\s\S]*)", re.DOTALL)
            match = csv_pattern.search(raw_output)
            if match:
                csv_content = (match.group(1) or match.group(2)).strip()
            else:
                raise ValueError("No valid CSV content found in GPT output.")
            # Further cleaning: drop lines that are clearly not CSV rows
            csv_lines = csv_content.splitlines()
            cleaned_csv_lines = [
                line for line in csv_lines if ',' in line and not line.startswith("Here is")
            ]
            cleaned_csv_content = "\n".join(cleaned_csv_lines)
            # Save the modified CSV to a file
            modified_file_path = os.path.join(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv')
            with open(modified_file_path, 'w') as f:
                f.write(cleaned_csv_content)
            # Load the modified data and store it in the session
            updated_data = pd.read_csv(StringIO(cleaned_csv_content)).to_dict('records')
            session['updated_data'] = updated_data
        except Exception as e:
            app.logger.error(f"Error in edit_csv route: {e}")
            flash("An unexpected error occurred. Please check the logs.", "danger")
    # Load updated data from the session if available
    updated_data = session.get('updated_data')
    return render_template('edit_csv.html', original_data=original_data,
                           updated_data=updated_data, columns=columns)
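# End-to-end flow of the CSV editor: upload_csv() saves the file and caches its
# rows in the session; edit_csv() sends the CSV plus the user's instruction to
# GPT, strips any markdown fences from the reply, and saves the result as
# modified_gesture_data.csv for preview and download.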
# Route: download the modified CSV
@app.route('/download_csv_updated')
def download_csv_updated():
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv')
    if not os.path.isfile(file_path):
        flash("Updated CSV file not found!", "warning")
        return redirect(url_for('edit_csv'))
    return send_from_directory(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv', as_attachment=True)
# Process uploaded audio using Whisper
@app.route('/process_audio', methods=['POST'])
def process_audio():
    if 'audio' not in request.files:
        return jsonify({'error': 'No audio file provided'}), 400
    audio_file = request.files['audio']
    audio_file_path = 'recorded_audio.wav'
    audio_file.save(audio_file_path)
    # Transcribe the audio using Whisper
    transcription = transcribe_audio(audio_file_path)
    return jsonify({'transcription': transcription})
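# Example client call (route path as reconstructed above):
#   curl -F "audio=@recording.wav" http://localhost:5000/process_audio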
@app.route('/data_view')
def data_view():
    csv_file = request.args.get('csv_file', 'static/gesture_data.csv')
    gesture_data = load_csv_data(csv_file)
    df = pd.DataFrame(gesture_data, columns=['Gesture', 'Start Time', 'End Time', 'Duration'])
    gesture_counts = df['Gesture'].value_counts().reset_index()
    gesture_counts.columns = ['Gesture', 'Count']
    fig = px.pie(gesture_counts, values='Count', names='Gesture', title='Gesture Distribution')
    html_chart = fig.to_html(full_html=False)
    return render_template('data.html', gesture_data=gesture_data, html_chart=html_chart)
@app.route('/datadiff')
def datadiff():
    # Load the original and modified CSV files
    original_csv_path = os.path.join(app.config['UPLOAD_FOLDER'], 'gesture_data.csv')
    modified_csv_path = os.path.join(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv')
    # Read the CSVs into pandas DataFrames
    original_csv = pd.read_csv(original_csv_path)
    modified_csv = pd.read_csv(modified_csv_path)
    # Render the comparison page
    return render_template('datadiff.html', original_csv=original_csv, modified_csv=modified_csv)
# Load gesture rows from a CSV file (defaults to the recording output)
def load_csv_data(file_path='static/gesture_data.csv'):
    gesture_data = []
    with open(file_path, 'r') as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # Skip the header row
        for row in reader:
            gesture_data.append(row)
    return gesture_data
@app.route('/save_gesture_data')
def save_gesture_data():
    global capture_flag
    capture_flag = False  # Stops the generate_frames() loop
    # Ensure the gesture data is actually populated
    print("Saving gesture data:", gesture_data_list)
    # Ensure the static directory exists
    os.makedirs('static', exist_ok=True)
    # Save the data as a Label Studio-compatible JSON file
    json_file_path = os.path.join('static', 'gesture_data_labelstudio.json')
    save_label_studio_json(gesture_data_list, json_file_path)
    # Save the data as a CSV file for visualization
    csv_file_path = os.path.join('static', 'gesture_data.csv')
    save_gesture_csv(gesture_data_list, csv_file_path)
    return redirect(url_for('data'))
def generate_alphanumeric_id(length=5):
    """Generate a random alphanumeric ID."""
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
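# Example: generate_alphanumeric_id() might return something like 'a3Xk9'.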
def save_label_studio_json(gesture_data, file_path):
    current_time = datetime.utcnow().isoformat() + "Z"
    # Create a single task that holds all annotations
    annotations = {
        "id": 1,  # Task ID
        "annotations": [
            {
                "id": 1,  # Annotation ID
                "completed_by": 1,
                "result": [],
                "was_cancelled": False,
                "ground_truth": False,
                "created_at": current_time,
                "updated_at": current_time,
                "draft_created_at": current_time,
                "lead_time": sum(duration for _, _, _, duration in gesture_data),
                "prediction": {},
                "result_count": 0,
                "unique_id": str(uuid.uuid4()),
                "import_id": None,
                "last_action": None,
                "task": 1,
                "project": 25,
                "updated_by": 1,
                "parent_prediction": None,
                "parent_annotation": None,
                "last_created_by": None
            }
        ],
        "file_upload": "1212df4d-HandyLabels.MP4",
        "drafts": [],
        "predictions": [],
        "data": {
            "video_url": "/data/upload/30/030cca83-Video_1.mp4"
        },
        "meta": {},
        "created_at": current_time,
        "updated_at": current_time,
        "inner_id": 1,
        "total_annotations": 1,
        "cancelled_annotations": 0,
        "total_predictions": 0,
        "comment_count": 0,
        "unresolved_comment_count": 0,
        "last_comment_updated_at": None,
        "project": 25,
        "updated_by": 1,
        "comment_authors": []
    }
    # Add each gesture as an individual result within the annotation
    for gesture, start_time, end_time, duration in gesture_data:
        annotation_result = {
            "original_length": end_time - start_time,
            "value": {
                "start": start_time,
                "end": end_time,
                "channel": 0,
                "labels": [gesture]
            },
            "id": generate_alphanumeric_id(),  # Unique 5-character ID per result
            "from_name": "tricks",
            "to_name": "audio",
            "type": "labels",
            "origin": "manual"
        }
        annotations["annotations"][0]["result"].append(annotation_result)
    # Save the consolidated JSON to the file
    with open(file_path, 'w') as json_file:
        json.dump([annotations], json_file, indent=2)
    print(f"Label Studio JSON saved to: {file_path}")
def save_gesture_csv(gesture_data, file_path):
    with open(file_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Gesture', 'Start Time', 'End Time', 'Duration'])
        for gesture, start_time, end_time, duration in gesture_data:
            writer.writerow([gesture, start_time, end_time, duration])
@app.route('/data')
def data():
    gesture_data = load_csv_data()
    # Convert to a DataFrame for easier manipulation
    df = pd.DataFrame(gesture_data, columns=['Gesture', 'Start Time', 'End Time', 'Duration'])
    # Count occurrences of each gesture
    gesture_counts = df['Gesture'].value_counts().reset_index()
    gesture_counts.columns = ['Gesture', 'Count']
    # Create the pie chart with Plotly and embed it as HTML
    fig = px.pie(gesture_counts, values='Count', names='Gesture', title='Gesture Distribution')
    html_chart = fig.to_html(full_html=False)
    return render_template('data.html', gesture_data=gesture_data, html_chart=html_chart)
@app.route('/download_json')
def download_json():
    file_path = os.path.join('static', 'gesture_data_labelstudio.json')
    if not os.path.isfile(file_path):
        return "JSON file not found!", 404
    return send_from_directory('static', 'gesture_data_labelstudio.json', as_attachment=True)
@app.route('/download_csv')
def download_csv():
    filename = request.args.get('filename')
    if filename == 'original':
        path = os.path.join(app.config['UPLOAD_FOLDER'], 'gesture_data.csv')
    elif filename == 'updated':
        path = os.path.join(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv')
    else:
        flash('Invalid file requested')
        return redirect(url_for('edit_csv'))
    if not os.path.exists(path):
        flash('File not found!')
        return redirect(url_for('edit_csv'))
    return send_from_directory(app.config['UPLOAD_FOLDER'], os.path.basename(path), as_attachment=True)
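# Example requests (route path as reconstructed above):
#   GET /download_csv?filename=original  -> uploads/gesture_data.csv
#   GET /download_csv?filename=updated   -> uploads/modified_gesture_data.csv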
# Route: download the modified CSV without the flash/redirect handling
@app.route('/download_csv_modified')
def download_csv_modified():
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv')
    if not os.path.isfile(file_path):
        return "Modified CSV file not found!", 404
    return send_from_directory(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv', as_attachment=True)
# Import-data functionality: upload a CSV to visualize
@app.route('/import_data', methods=['GET', 'POST'])
def import_data():
    if request.method == 'POST':
        if 'file' not in request.files:
            flash('No file part')
            return redirect(request.url)
        file = request.files['file']
        if file.filename == '':
            flash('No selected file')
            return redirect(request.url)
        if file:
            filename = secure_filename(file.filename)
            file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(file_path)
            return redirect(url_for('visualize_data', file_path=file_path))
    return render_template('import_data.html')
@app.route('/visualize_data')
def visualize_data():
    file_path = request.args.get('file_path')
    if not file_path or not os.path.exists(file_path):
        return "The file could not be found.", 404
    return visualize_csv(file_path)
def visualize_csv(file_path):
    try:
        # Load the gesture data from the CSV and prepare it for visualization
        data = pd.read_csv(file_path)
        # Check that the necessary columns are present
        required_columns = ['Gesture', 'Start Time', 'End Time', 'Duration']
        if not set(required_columns).issubset(data.columns):
            return f"The uploaded CSV must contain the following columns: {required_columns}", 400
        # Extract the relevant columns
        gesture_df = data[required_columns]
        # Count gestures for the distribution chart
        gesture_counts = gesture_df['Gesture'].value_counts().reset_index()
        gesture_counts.columns = ['Gesture', 'Count']
        # Create the pie chart with Plotly and embed it as HTML
        fig = px.pie(gesture_counts, values='Count', names='Gesture', title='Gesture Distribution')
        html_chart = fig.to_html(full_html=False)
        # Render data.html with the gesture rows and the chart
        return render_template('data.html', gesture_data=gesture_df.to_dict('records'), html_chart=html_chart)
    except Exception as e:
        return f"An error occurred while processing the file: {str(e)}", 500
if __name__ == '__main__':
    port = int(os.environ.get("PORT", 5000))
    app.run(host='0.0.0.0', port=port, debug=True)  # disable debug in production