|
import streamlit as st |
|
from streamlit_webrtc import webrtc_streamer, WebRtcMode |
|
import av |
|
import os |
|
from twilio.base.exceptions import TwilioRestException |
|
from twilio.rest import Client |
|
from streamlit_image_select import image_select |
|
import cv2 as cv |
|
import numpy as np |
|
import math |
|
import torch |
|
from torch import nn |
|
from PIL import Image |
|
import feat |
|
|
|
CLASS_LABELS = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise'] |
|
|
|
def _get_resource_path(): |
|
return "/home/user/app/resources" |
|
|
|
feat.utils.io.get_resource_path = _get_resource_path |
|
|
|
def _get_pretrained_models(face_model, landmark_model, au_model, emotion_model, facepose_model, verbose): |
|
return ( |
|
face_model, |
|
landmark_model, |
|
au_model, |
|
emotion_model, |
|
facepose_model, |
|
) |
|
|
|
feat.pretrained.get_pretrained_models = _get_pretrained_models |
|
|
|
from feat import Detector |
|
|
|
os.environ["TWILIO_ACCOUNT_SID"] = "ACf1e76f3fd6e9cbca940decc4ed443c20" |
|
os.environ["TWILIO_AUTH_TOKEN"] = "c732a947bafc840e" + "797b7587d6abed92" |
|
|
|
def get_ice_servers(): |
|
try: |
|
account_sid = os.environ["TWILIO_ACCOUNT_SID"] |
|
auth_token = os.environ["TWILIO_AUTH_TOKEN"] |
|
except KeyError: |
|
logger.warning("TURN credentials are not set. Fallback to a free STUN server from Google.") |
|
return [{"urls": ["stun:stun.l.google.com:19302"]}] |
|
|
|
client = Client(account_sid, auth_token) |
|
|
|
token = client.tokens.create() |
|
|
|
return token.ice_servers |
|
|
|
class MyNeuralNetwork(nn.Module): |
|
def __init__(self, layers, dropout): |
|
super().__init__() |
|
self.net = nn.Sequential( |
|
nn.Linear(70, layers[0]), |
|
nn.LeakyReLU(), |
|
nn.Dropout(p = dropout[0]), |
|
nn.Linear(layers[0], layers[1]), |
|
nn.LeakyReLU(), |
|
nn.Dropout(p = dropout[1]), |
|
nn.Linear(layers[1], layers[2]), |
|
nn.LeakyReLU(), |
|
nn.Dropout(p = dropout[2]), |
|
nn.Linear(layers[2], layers[3]), |
|
nn.LeakyReLU(), |
|
nn.Dropout(p = dropout[3]), |
|
nn.Linear(layers[3], 7), |
|
) |
|
|
|
def forward(self, inputs): |
|
return self.net(inputs) |
|
|
|
def eye_aspect_ratio(eye): |
|
|
|
A = math.dist(eye[1], eye[5]) |
|
B = math.dist(eye[2], eye[4]) |
|
|
|
C = math.dist(eye[0], eye[3]) |
|
|
|
ear = (A + B) / (2.0 * C) |
|
|
|
return ear |
|
|
|
def detect_eyes(landmarks, img, threshold): |
|
lm = landmarks |
|
eyes = np.array(lm[36:48], np.int32) |
|
|
|
left_eye = eyes[0:6] |
|
right_eye = eyes[6:12] |
|
ear = max(eye_aspect_ratio(left_eye), eye_aspect_ratio(right_eye)) |
|
left_eye = left_eye.reshape((-1,1,2)) |
|
right_eye = right_eye.reshape((-1,1,2)) |
|
cv.polylines(img, [left_eye], True, (0, 255, 255)) |
|
cv.polylines(img, [right_eye], True, (255, 0, 255)) |
|
|
|
if (ear > threshold): |
|
return True |
|
else: |
|
return False |
|
|
|
def proc_image(img, detector): |
|
detected_faces = detector.detect_faces(img) |
|
faces_detected = len(detected_faces[0]) |
|
if ( faces_detected < 1): |
|
return img |
|
|
|
detected_landmarks = detector.detect_landmarks(img, detected_faces) |
|
assert len(detected_landmarks[0]) == faces_detected, "Number of faces and landsmarks are mismatched!" |
|
|
|
is_eye_open = [detect_eyes(face, img, 0.20) for face in detected_landmarks[0]] |
|
eye_dict = {True: "eyes open", False: "eyes closed"} |
|
|
|
device = ( |
|
"cuda" |
|
if torch.cuda.is_available() |
|
else "cpu" |
|
) |
|
|
|
emo_model = torch.load("acc_96.8", map_location=device) |
|
features = [torch.tensor(np.array(extract_features(*object)).astype(np.float32)).to(device) for object in zip(detected_landmarks[0], detected_faces[0])] |
|
detected_emotions = [emo_model(facefeat).softmax(dim=0).argmax(dim=0).to("cpu") for facefeat in features] |
|
assert len(detected_emotions) == faces_detected, "Number of faces and emotions are mismatched!" |
|
|
|
for face, has_open_eyes, label in zip(detected_faces[0], (eye_dict[eyes] for eyes in is_eye_open), detected_emotions): |
|
(x0, y0, x1, y1, p) = face |
|
res_scale = img.shape[0]/704 |
|
cv.rectangle(img, (int(x0), int(y0)), (int(x1), int(y1)), color = (0, 0, 255), thickness = 3) |
|
cv.putText(img, CLASS_LABELS[label], (int(x0)-10, int(y1+25*res_scale*1.5)), fontFace = 0, color = (0, 255, 0), thickness = 2, fontScale = res_scale) |
|
cv.putText(img, f"{faces_detected } face(s) found", (0, int(25*res_scale*1.5)), fontFace = 0, color = (0, 255, 0), thickness = 2, fontScale = res_scale) |
|
cv.putText(img, has_open_eyes, (int(x0)-10, int(y0)-10), fontFace = 0, color = (0, 255, 0), thickness = 2, fontScale = res_scale) |
|
return img |
|
|
|
def extract_features(landmarks, face): |
|
features = [math.dist(landmarks[33], landmark) for landmark in landmarks] + [face[2] - face[0], face[3] - face[1]] |
|
return features |
|
|
|
def image_processing(img): |
|
ann = proc_image(img, detector) if recog else img |
|
return ann |
|
|
|
def video_frame_callback(frame): |
|
img = frame.to_ndarray(format="bgr24") |
|
|
|
ann = proc_image(img, detector) if recog else img |
|
|
|
return av.VideoFrame.from_ndarray(ann, format="bgr24") |
|
|
|
st.markdown(""" |
|
<style> |
|
.block-container { |
|
padding-top: 1.75rem; |
|
padding-bottom: 0rem; |
|
padding-left: 0rem; |
|
padding-right: 0rem; |
|
max-width: 664px; |
|
} |
|
</style> |
|
""", unsafe_allow_html=True) |
|
|
|
detector = Detector(face_model="retinaface", landmark_model= "pfld", au_model = "xgb", emotion_model = "resmasknet") |
|
source = "Webcam" |
|
recog = True |
|
|
|
source = st.radio( |
|
label = "Image source for emotion recognition", |
|
options = ["Webcam", "Images"], |
|
horizontal = True, |
|
label_visibility = "collapsed", |
|
args = (source, ) |
|
) |
|
|
|
has_cam = True if (source == "Webcam") else False |
|
|
|
stream = st.container() |
|
with stream: |
|
if has_cam: |
|
webrtc_streamer( |
|
key="example", |
|
mode=WebRtcMode.SENDRECV, |
|
video_frame_callback=video_frame_callback, |
|
rtc_configuration={ "iceServers": get_ice_servers() }, |
|
media_stream_constraints={"video": True, "audio": False}, |
|
async_processing=True, |
|
) |
|
recog = st.toggle(":green[Emotion recogntion]", key = "stream", value = True) |
|
else: |
|
pic = st.container() |
|
frame = image_select( |
|
label="Try the classifier on one of the provided examples!", |
|
images=[ |
|
"ex1.jpg", |
|
"ex4.jpg", |
|
"ex5.jpg", |
|
"ex6.jpg", |
|
], |
|
use_container_width= False |
|
) |
|
img = np.array(Image.open(frame)) |
|
pic.image(image_processing(img), use_column_width = "always") |
|
|
|
|