Image Segmentation Tutorial#


Start EVA server#

We reuse the start-server notebook to launch the EVA server.

!wget -nc "https://raw.githubusercontent.com/georgia-tech-db/eva/master/tutorials/00-start-eva-server.ipynb"
%run 00-start-eva-server.ipynb
cursor = connect_to_server()
File ‘00-start-eva-server.ipynb’ already there; not retrieving.
Note: you may need to restart the kernel to use updated packages.
Stopping EVA Server ...
Starting EVA Server ...
nohup eva_server > eva.log 2>&1 &
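
To confirm the server came up cleanly, one quick check (a sketch, not part of the original notebook) is to inspect the log file that the nohup command above redirects output into:

# Show the last few lines of the server log
!tail -n 5 eva.log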

Download the Videos#

# Getting the video files
!wget -nc "https://www.dropbox.com/s/k00wge9exwkfxz6/ua_detrac.mp4?raw=1" -O ua_detrac.mp4
File ‘ua_detrac.mp4’ already there; not retrieving.
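
As an optional sanity check (not in the original tutorial), you can probe the downloaded clip with OpenCV to confirm it is readable and see its basic properties:

import cv2

# Open the downloaded clip and report its frame count and frame rate
vcap = cv2.VideoCapture('ua_detrac.mp4')
print('frames:', int(vcap.get(cv2.CAP_PROP_FRAME_COUNT)))
print('fps:', vcap.get(cv2.CAP_PROP_FPS))
vcap.release()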

Load sample video from the UA-DETRAC dataset for analysis#

cursor.execute('DROP TABLE IF EXISTS VideoForSegmentation;').fetch_all().as_df()
cursor.execute('LOAD VIDEO "ua_detrac.mp4" INTO VideoForSegmentation;').fetch_all().as_df()
                           0
0  Number of loaded VIDEO: 1
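
To verify that the frames are queryable, a minimal follow-up query (a sketch mirroring the SELECT syntax used later in this tutorial) fetches the first few frame ids:

# Fetch the ids of the first few frames to confirm the load succeeded
cursor.execute('SELECT id FROM VideoForSegmentation WHERE id < 5;').fetch_all().as_df()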

Visualize Video#

from IPython.display import Video
Video("ua_detrac.mp4", embed=True)

Register Hugging Face Segmentation Model as a User-Defined Function (UDF) in EVA#

### Using HuggingFace with EVA requires specifying the task
### The task here is 'image-segmentation'
### The model is 'facebook/detr-resnet-50-panoptic'
cursor.execute("""CREATE UDF IF NOT EXISTS HFSegmentation
      TYPE HuggingFace
      'task' 'image-segmentation'
      'model' 'facebook/detr-resnet-50-panoptic';
      """).fetch_all().as_df()
                                                   0
0  UDF HFSegmentation successfully added to the d...
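
For intuition only: EVA hands the task and model strings to Hugging Face. A rough sketch of the equivalent direct transformers call looks like this (an illustration of what the UDF wraps, not a step in this tutorial):

from transformers import pipeline

# Same task/model pair as the UDF above, invoked directly via transformers
segmenter = pipeline(task='image-segmentation', model='facebook/detr-resnet-50-panoptic')
# segmenter(pil_image) returns a list of dicts with 'label' and 'mask' keys,
# which is the data EVA surfaces as table columns below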

Run Image Segmentation on the video#

response = cursor.execute("""SELECT HFSegmentation(data)
                  FROM VideoForSegmentation SAMPLE 5
                  WHERE id < 20;""").fetch_all().as_df()
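
The result is a pandas DataFrame with one row per sampled frame. As the visualization code below assumes, the UDF's outputs land in columns prefixed with the lowercased UDF name:

# Expect columns named 'hfsegmentation.mask' and 'hfsegmentation.label'
print(response.columns)
response.head()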

Visualizing output of the Image Segmenter on the video#

import numpy as np
from PIL import Image
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import cv2

def get_color_mapping(all_labels):
    # Collect every unique label across all frames and assign each one
    # a distinct RGB color (0-255) drawn from the tab20 colormap
    unique_labels = set(label for labels in all_labels for label in labels)
    num_colors = len(unique_labels)
    colormap = plt.colormaps["tab20"]
    colors = [colormap(i % 20)[:3] for i in range(num_colors)]
    colors = [tuple(int(c * 255) for c in color) for color in colors]
    color_mapping = {label: color for label, color in zip(unique_labels, colors)}
    return color_mapping

def annotate_single_frame(frame, segments, labels, color_mapping):
    overlay = np.zeros_like(frame)

    # Overlay segments
    for mask, label in zip(segments, labels):
        mask_np = np.array(mask).astype(bool)
        overlay[mask_np] = color_mapping[label]

    # Combine original frame with overlay
    new_frame = Image.blend(
        Image.fromarray(frame.astype(np.uint8)),
        Image.fromarray(overlay.astype(np.uint8)),
        alpha=0.5,
    )

    return new_frame

def annotate_video(segmentations, input_video_path, output_video_path, model_name='hfsegmentation'):
    # EVA prefixes each output column with the lowercased UDF name
    all_segments = segmentations[f'{model_name}.mask']
    all_labels = segmentations[f'{model_name}.label']

    color_mapping = get_color_mapping(all_labels)

    vcap = cv2.VideoCapture(input_video_path)
    width = int(vcap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(vcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = vcap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # codec
    video = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    frame_id = 0
    ret, frame = vcap.read() 
    while ret and frame_id < len(all_segments):
        segments = all_segments[frame_id]
        labels = all_labels[frame_id]
        new_frame = annotate_single_frame(frame, segments, labels, color_mapping)
        video.write(np.array(new_frame))
        if frame_id % 5 == 0:
            legend_patches = [mpatches.Patch(color=np.array(color_mapping[label])/255, label=label) for label in set(labels)]
            plt.imshow(new_frame)
            plt.legend(handles=legend_patches, bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)
            plt.axis('off')
            plt.tight_layout()
            plt.show()

        frame_id += 1
        ret, frame = vcap.read()

    video.release()
    vcap.release()

from ipywidgets import Video
input_path = 'ua_detrac.mp4'
output_path = 'video.mp4'

annotate_video(response, input_path, output_path)
Video.from_file(output_path)
[Output: sample annotated frames with segmentation masks overlaid on the traffic video, alongside a legend mapping colors to labels]

Dropping a User-Defined Function (UDF)#

cursor.execute("DROP UDF HFSegmentation;")
response = cursor.fetch_all()
response.as_df()
                                          0
0  UDF HFSegmentation successfully dropped
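
To double-check that the UDF is gone, you can list the registered UDFs (a sketch; the SHOW UDFS statement is assumed to be available in this EVA release):

# HFSegmentation should no longer appear in the catalog
cursor.execute('SHOW UDFS;').fetch_all().as_df()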