EMOTION ANALYSIS#
Start EVA Server#
We reuse the start-server notebook to launch the EVA server.
!wget -nc "https://raw.githubusercontent.com/georgia-tech-db/eva/master/tutorials/00-start-eva-server.ipynb"
%run 00-start-eva-server.ipynb
cursor = connect_to_server()
File ‘00-start-eva-server.ipynb’ already there; not retrieving.
Note: you may need to restart the kernel to use updated packages.
Stopping EVA Server ...
Starting EVA Server ...
nohup eva_server > eva.log 2>&1 &
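If the server fails to start, the log written by the `nohup` command above is the first place to look. A minimal sanity check (this just reads the local `eva.log` file created above; nothing EVA-specific is assumed):

# Print the tail of the server log to confirm a clean startup
with open('eva.log') as f:
    print(f.read()[-500:])  # the last few hundred characters are enough to spot errors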
Video Files#
Download a sample video and the UDF implementations used in this tutorial.
# A video of a happy person
!wget -nc "https://www.dropbox.com/s/gzfhwmib7u804zy/defhappy.mp4?raw=1" -O defhappy.mp4
# Adding Emotion detection
!wget -nc https://raw.githubusercontent.com/georgia-tech-db/eva/master/eva/udfs/emotion_detector.py
# Adding Face Detector
!wget -nc https://raw.githubusercontent.com/georgia-tech-db/eva/master/eva/udfs/face_detector.py
File ‘defhappy.mp4’ already there; not retrieving.
File ‘emotion_detector.py’ already there; not retrieving.
File ‘face_detector.py’ already there; not retrieving.
Adding the Video File to EVA for Analysis#
response = cursor.execute('DROP TABLE IF EXISTS HAPPY').fetch_all().as_df()
print(response)
cursor.execute('LOAD VIDEO "defhappy.mp4" INTO HAPPY').fetch_all().as_df()
                                   0
0  Table Successfully dropped: HAPPY

|   | 0 |
|---|---|
| 0 | Number of loaded VIDEO: 1 |
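As a quick sanity check, you can query the newly created table with the same cursor API used throughout this notebook (the query below is illustrative, not part of the original analysis):

# Peek at the first few frame ids to confirm the video was ingested
cursor.execute('SELECT id FROM HAPPY WHERE id < 5;').fetch_all().as_df()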
Visualize Video#
from IPython.display import Video
Video("defhappy.mp4", height=450, width=800, embed=True)
Create a user-defined function (UDF) for analyzing the frames#
cursor.execute("""CREATE UDF IF NOT EXISTS EmotionDetector
INPUT (frame NDARRAY UINT8(3, ANYDIM, ANYDIM))
OUTPUT (labels NDARRAY STR(ANYDIM), scores NDARRAY FLOAT32(ANYDIM))
TYPE Classification IMPL 'emotion_detector.py';
""").fetch_all().as_df()
cursor.execute("""CREATE UDF IF NOT EXISTS FaceDetector
INPUT (frame NDARRAY UINT8(3, ANYDIM, ANYDIM))
OUTPUT (bboxes NDARRAY FLOAT32(ANYDIM, 4),
scores NDARRAY FLOAT32(ANYDIM))
TYPE FaceDetection
IMPL 'face_detector.py';
""").fetch_all().as_df()
|   | 0 |
|---|---|
| 0 | UDF FaceDetector already exists, nothing added. |
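As the output above shows, `CREATE UDF IF NOT EXISTS` is a no-op when the UDF is already registered. If you edit `face_detector.py` or `emotion_detector.py`, the change will not be picked up until the UDF is re-registered; a sketch, assuming your EVA version supports the `DROP UDF` statement:

# Re-register the UDF so an updated implementation is picked up
cursor.execute('DROP UDF IF EXISTS FaceDetector;').fetch_all().as_df()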
Run the Face Detection UDF on the Video#
cursor.execute("""SELECT id, FaceDetector(data)
FROM HAPPY WHERE id<10;""").fetch_all().as_df()
|   | happy.id | facedetector.bboxes | facedetector.scores |
|---|---|---|---|
| 0 | 0 | [[502, 94, 762, 435], [238, 296, 325, 398]] | [0.99990165, 0.79820246] |
| 1 | 1 | [[501, 96, 763, 435]] | [0.999918] |
| 2 | 2 | [[504, 97, 766, 437]] | [0.9999138] |
| 3 | 3 | [[498, 90, 776, 446]] | [0.99996686] |
| 4 | 4 | [[496, 99, 767, 444]] | [0.9999982] |
| 5 | 5 | [[499, 87, 777, 448], [236, 305, 324, 407]] | [0.9999136, 0.8369736] |
| 6 | 6 | [[500, 89, 778, 449]] | [0.9999131] |
| 7 | 7 | [[501, 89, 781, 452]] | [0.9999124] |
| 8 | 8 | [[503, 90, 783, 450]] | [0.99994683] |
| 9 | 9 | [[508, 87, 786, 447]] | [0.999949] |
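Before wiring the detections into the emotion classifier, it can help to eyeball them. The standalone sketch below draws frame 0's two boxes (copied from the first row above) onto the raw frame; the color and thickness are arbitrary choices:

import cv2
from matplotlib import pyplot as plt

# Bounding boxes copied from the first row of the result above
bboxes = [[502, 94, 762, 435], [238, 296, 325, 398]]

vcap = cv2.VideoCapture('defhappy.mp4')
ret, frame = vcap.read()  # frame 0
vcap.release()

for x1, y1, x2, y2 in bboxes:
    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 4)

# OpenCV decodes frames as BGR; matplotlib expects RGB
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
plt.show()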
Run the Emotion Detection UDF on the outputs of the Face Detection UDF#
response = cursor.execute("""SELECT id, bbox, EmotionDetector(Crop(data, bbox))
FROM HAPPY JOIN LATERAL UNNEST(FaceDetector(data)) AS Face(bbox, conf)
WHERE id < 15;""").fetch_all().as_df()
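The `LATERAL` join applies `FaceDetector` to each frame, and `UNNEST` flattens the result, so a frame with two detected faces contributes two rows; `EmotionDetector` then classifies each cropped face independently. You can inspect the shape of the result before annotating (purely illustrative):

# Each detected face is its own row after UNNEST
print(response.columns.tolist())
response.head()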
import cv2
from pprint import pprint
from matplotlib import pyplot as plt
def annotate_video(detections, input_video_path, output_video_path):
    color1 = (207, 248, 64)
    color2 = (255, 49, 49)
    thickness = 4

    vcap = cv2.VideoCapture(input_video_path)
    width = int(vcap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(vcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = vcap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')  # codec
    video = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    frame_id = 0
    # Capture frame-by-frame; ret is True while frames remain
    ret, frame = vcap.read()

    while ret:
        # Select the rows for this frame by its id; the DataFrame index is
        # per detected face, not per frame, so it cannot be used directly
        df = detections
        df = df[df['happy.id'] == frame_id][['Face.bbox', 'emotiondetector.labels', 'emotiondetector.scores']]
        if df.size:
            # Annotate only the first detected face in this frame
            x1, y1, x2, y2 = df['Face.bbox'].values[0]
            label = df['emotiondetector.labels'].values[0]
            score = df['emotiondetector.scores'].values[0]
            x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
            # object bbox
            frame = cv2.rectangle(frame, (x1, y1), (x2, y2), color1, thickness)
            # object label
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color1, thickness)
            # object score
            cv2.putText(frame, str(round(score, 5)), (x1 + 120, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color1, thickness)
            # frame label
            cv2.putText(frame, 'Frame ID: ' + str(frame_id), (700, 500), cv2.FONT_HERSHEY_SIMPLEX, 1.2, color2, thickness)
            video.write(frame)

            # Show every fifth annotated frame inline
            if frame_id % 5 == 0:
                # OpenCV frames are BGR; convert to RGB for matplotlib
                plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                plt.show()

        frame_id += 1
        ret, frame = vcap.read()

    video.release()
    vcap.release()
from ipywidgets import Video
input_path = 'defhappy.mp4'
output_path = 'video.mp4'
annotate_video(response, input_path, output_path)
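Finally, display the annotated clip inline. `Video.from_file` loads a local file into the ipywidgets player, which is why `Video` was imported from `ipywidgets` above:

# Display the annotated output video
Video.from_file(output_path)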