import streamlit as st from streamlit_webrtc import webrtc_streamer, WebRtcMode import av import os from twilio.base.exceptions import TwilioRestException from twilio.rest import Client from streamlit_image_select import image_select import cv2 as cv import numpy as np import math import torch from torch import nn from PIL import Image import feat CLASS_LABELS = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise'] def _get_resource_path(): return "/home/user/app/resources" feat.utils.io.get_resource_path = _get_resource_path def _get_pretrained_models(face_model, landmark_model, au_model, emotion_model, facepose_model, verbose): return ( face_model, landmark_model, au_model, emotion_model, facepose_model, ) feat.pretrained.get_pretrained_models = _get_pretrained_models from feat import Detector os.environ["TWILIO_ACCOUNT_SID"] = "ACf1e76f3fd6e9cbca940decc4ed443c20" os.environ["TWILIO_AUTH_TOKEN"] = "c732a947bafc840e" + "797b7587d6abed92" def get_ice_servers(): try: account_sid = os.environ["TWILIO_ACCOUNT_SID"] auth_token = os.environ["TWILIO_AUTH_TOKEN"] except KeyError: logger.warning("TURN credentials are not set. Fallback to a free STUN server from Google.") return [{"urls": ["stun:stun.l.google.com:19302"]}] client = Client(account_sid, auth_token) token = client.tokens.create() return token.ice_servers class MyNeuralNetwork(nn.Module): def __init__(self, layers, dropout): super().__init__() self.net = nn.Sequential( nn.Linear(70, layers[0]), nn.LeakyReLU(), nn.Dropout(p = dropout[0]), nn.Linear(layers[0], layers[1]), nn.LeakyReLU(), nn.Dropout(p = dropout[1]), nn.Linear(layers[1], layers[2]), nn.LeakyReLU(), nn.Dropout(p = dropout[2]), nn.Linear(layers[2], layers[3]), nn.LeakyReLU(), nn.Dropout(p = dropout[3]), nn.Linear(layers[3], 7), ) def forward(self, inputs): return self.net(inputs) def eye_aspect_ratio(eye): A = math.dist(eye[1], eye[5]) B = math.dist(eye[2], eye[4]) C = math.dist(eye[0], eye[3]) ear = (A + B) / (2.0 * C) return ear def detect_eyes(landmarks, img, threshold): lm = landmarks eyes = np.array(lm[36:48], np.int32) left_eye = eyes[0:6] right_eye = eyes[6:12] ear = max(eye_aspect_ratio(left_eye), eye_aspect_ratio(right_eye)) left_eye = left_eye.reshape((-1,1,2)) right_eye = right_eye.reshape((-1,1,2)) cv.polylines(img, [left_eye], True, (0, 255, 255)) cv.polylines(img, [right_eye], True, (255, 0, 255)) if (ear > threshold): return True else: return False def proc_image(img, detector): detected_faces = detector.detect_faces(img) faces_detected = len(detected_faces[0]) if ( faces_detected < 1): return img detected_landmarks = detector.detect_landmarks(img, detected_faces) assert len(detected_landmarks[0]) == faces_detected, "Number of faces and landsmarks are mismatched!" is_eye_open = [detect_eyes(face, img, 0.20) for face in detected_landmarks[0]] eye_dict = {True: "eyes open", False: "eyes closed"} device = ( "cuda" if torch.cuda.is_available() else "cpu" ) emo_model = torch.load("acc_96.8", map_location=device) features = [torch.tensor(np.array(extract_features(*object)).astype(np.float32)).to(device) for object in zip(detected_landmarks[0], detected_faces[0])] detected_emotions = [emo_model(facefeat).softmax(dim=0).argmax(dim=0).to("cpu") for facefeat in features] assert len(detected_emotions) == faces_detected, "Number of faces and emotions are mismatched!" for face, has_open_eyes, label in zip(detected_faces[0], (eye_dict[eyes] for eyes in is_eye_open), detected_emotions): (x0, y0, x1, y1, p) = face res_scale = img.shape[0]/704 cv.rectangle(img, (int(x0), int(y0)), (int(x1), int(y1)), color = (0, 0, 255), thickness = 3) cv.putText(img, CLASS_LABELS[label], (int(x0)-10, int(y1+25*res_scale*1.5)), fontFace = 0, color = (0, 255, 0), thickness = 2, fontScale = res_scale) cv.putText(img, f"{faces_detected } face(s) found", (0, int(25*res_scale*1.5)), fontFace = 0, color = (0, 255, 0), thickness = 2, fontScale = res_scale) cv.putText(img, has_open_eyes, (int(x0)-10, int(y0)-10), fontFace = 0, color = (0, 255, 0), thickness = 2, fontScale = res_scale) return img def extract_features(landmarks, face): features = [math.dist(landmarks[33], landmark) for landmark in landmarks] + [face[2] - face[0], face[3] - face[1]] return features def image_processing(img): ann = proc_image(img, detector) if recog else img return ann def video_frame_callback(frame): img = frame.to_ndarray(format="bgr24") ann = proc_image(img, detector) if recog else img return av.VideoFrame.from_ndarray(ann, format="bgr24") st.markdown(""" """, unsafe_allow_html=True) detector = Detector(face_model="retinaface", landmark_model= "pfld", au_model = "xgb", emotion_model = "resmasknet") source = "Webcam" recog = True source = st.radio( label = "Image source for emotion recognition", options = ["Webcam", "Images"], horizontal = True, label_visibility = "collapsed", args = (source, ) ) has_cam = True if (source == "Webcam") else False stream = st.container() with stream: if has_cam: webrtc_streamer( key="example", mode=WebRtcMode.SENDRECV, video_frame_callback=video_frame_callback, rtc_configuration={ "iceServers": get_ice_servers() }, media_stream_constraints={"video": True, "audio": False}, async_processing=True, ) recog = st.toggle(":green[Emotion recogntion]", key = "stream", value = True) else: pic = st.container() frame = image_select( label="Try the classifier on one of the provided examples!", images=[ "ex1.jpg", "ex4.jpg", "ex5.jpg", "ex6.jpg", ], use_container_width= False ) img = np.array(Image.open(frame)) pic.image(image_processing(img), use_column_width = "always")