Image Segmentation Tutorial#
Run on Google Colab | View source on GitHub | Download notebook |
Start EVA server#
We are reusing the start server notebook for launching the EVA server.
!wget -nc "https://raw.githubusercontent.com/georgia-tech-db/eva/master/tutorials/00-start-eva-server.ipynb"
%run 00-start-eva-server.ipynb
cursor = connect_to_server()
File '00-start-eva-server.ipynb' already there; not retrieving.
Note: you may need to restart the kernel to use updated packages.
nohup eva_server --port 8803 > eva.log 2>&1 &
Download the Videos#
# # Getting the video files
!wget -nc "https://www.dropbox.com/s/k00wge9exwkfxz6/ua_detrac.mp4?raw=1" -O ua_detrac.mp4
File 'ua_detrac.mp4' already there; not retrieving.
Load sample video from DAVIS dataset for analysis#
cursor.execute('DROP TABLE IF EXISTS VideoForSegmentation;')
response = cursor.fetch_all()
response.as_df()
cursor.execute('LOAD VIDEO "ua_detrac.mp4" INTO VideoForSegmentation')
response = cursor.fetch_all()
response.as_df()
0 | |
---|---|
0 | Number of loaded VIDEO: 1 |
Visualize Video#
from IPython.display import Video
Video("ua_detrac.mp4", embed=True)
Register Hugging Face Segmentation Model as an User-Defined Function (UDF) in EVA#
### Using HuggingFace with EVA requires specifying the task
### The task here is 'image-segmentation'
### The model is 'facebook/detr-resnet-50-panoptic'
cursor.execute("""CREATE UDF IF NOT EXISTS HFSegmentation
TYPE HuggingFace
'task' 'image-segmentation'
'model' 'facebook/detr-resnet-50-panoptic'
""")
response = cursor.fetch_all()
response.as_df()
0 | |
---|---|
0 | UDF HFSegmentation successfully added to the d... |
Run Image Segmentation on the video#
cursor.execute("""SELECT HFSegmentation(data)
FROM VideoForSegmentation SAMPLE 5
WHERE id < 20""")
response = cursor.fetch_all()
response.as_df()
hfsegmentation.score | hfsegmentation.label | hfsegmentation.mask | |
---|---|---|---|
0 | [0.906596, 0.989519, 0.960914, 0.923789, 0.960... | [motorcycle, motorcycle, person, car, car, per... | [<PIL.Image.Image image mode=L size=960x540 at... |
1 | [0.985118, 0.963139, 0.963819, 0.960939, 0.926... | [motorcycle, person, car, car, person, bridge,... | [<PIL.Image.Image image mode=L size=960x540 at... |
2 | [0.989573, 0.900049, 0.966254, 0.96056, 0.9388... | [motorcycle, person, person, car, car, car, pe... | [<PIL.Image.Image image mode=L size=960x540 at... |
3 | [0.913261, 0.949733, 0.943763, 0.98639, 0.9744... | [truck, person, car, car, car, car, car, perso... | [<PIL.Image.Image image mode=L size=960x540 at... |
Visualizing output of the Image Segmenter on the video#
import numpy as np
from PIL import Image
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import cv2
def get_color_mapping(all_labels):
unique_labels = set(label for labels in all_labels for label in labels)
num_colors = len(unique_labels)
colormap = plt.colormaps["tab20"]
colors = [colormap(i % 20)[:3] for i in range(num_colors)]
colors = [tuple(int(c * 255) for c in color) for color in colors]
color_mapping = {label: color for label, color in zip(unique_labels, colors)}
return color_mapping
def annotate_single_frame(frame, segments, labels, color_mapping):
overlay = np.zeros_like(frame)
# Overlay segments
for mask, label in zip(segments, labels):
mask_np = np.array(mask).astype(bool)
overlay[mask_np] = color_mapping[label]
# Combine original frame with overlay
new_frame = Image.blend(
Image.fromarray(frame.astype(np.uint8)),
Image.fromarray(overlay.astype(np.uint8)),
alpha=0.5,
)
return new_frame
def annotate_video(segmentations, input_video_path, output_video_path, model_name = 'hfsegmentation'):
all_segments = segmentations[f'{model_name}.mask']
all_labels = segmentations[f'{model_name}.label']
color_mapping = get_color_mapping(all_labels)
vcap = cv2.VideoCapture(input_video_path)
width = int(vcap.get(3))
height = int(vcap.get(4))
fps = vcap.get(5)
fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v') #codec
video=cv2.VideoWriter(output_video_path, fourcc, fps, (width,height))
frame_id = 0
ret, frame = vcap.read()
while ret and frame_id < len(all_segments):
segments = all_segments[frame_id]
labels = all_labels[frame_id]
new_frame = annotate_single_frame(frame, segments, labels, color_mapping)
video.write(np.array(new_frame))
if frame_id % 5 == 0:
legend_patches = [mpatches.Patch(color=np.array(color_mapping[label])/255, label=label) for label in set(labels)]
plt.imshow(new_frame)
plt.legend(handles=legend_patches, bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)
plt.axis('off')
plt.tight_layout()
plt.show()
frame_id += 1
ret, frame = vcap.read()
video.release()
vcap.release()
from ipywidgets import Video
input_path = 'ua_detrac.mp4'
output_path = 'video.mp4'
dataframe = response.as_df()
annotate_video(dataframe, input_path, output_path)
Video.from_file(output_path)
Dropping an User-Defined Function (UDF)#
cursor.execute("DROP UDF HFSegmentation;")
response = cursor.fetch_all()
response.as_df()
0 | |
---|---|
0 | UDF HFSegmentation successfully dropped |