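"""Gradio demo comparing single-frame Faster R-CNN with Context R-CNN
(https://arxiv.org/abs/1912.03538) on Gopher Tortoise detection.

A Faster R-CNN saved_model embeds a held-out test set into a contextual
memory bank; the Context R-CNN saved_model then attends over that bank to
improve detections on user-uploaded images.
"""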
import io
import json
import os
import pathlib
from datetime import datetime

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from PIL import Image

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# Fetch the TensorFlow Object Detection API and install it.
os.system('rm -rf models')
if "models" in pathlib.Path.cwd().parts:
    while "models" in pathlib.Path.cwd().parts:
        os.chdir('..')
elif not pathlib.Path('models').exists():
    os.system('git clone --depth 1 https://github.com/tensorflow/models')
# The rest of the script runs from models/research/, so the model and example
# paths below ('../../...') are relative to that directory.
os.chdir('models/research/')
# Compile the protobuf message definitions, then install the package.
os.system('protoc object_detection/protos/*.proto --python_out=.')
os.system('cp object_detection/packages/tf2/setup.py .')
os.system('python -m pip install .')

from object_detection.utils import ops as utils_ops
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_utils

# Patch tf1 into `utils.ops`.
utils_ops.tf = tf.compat.v1
# Patch the location of gfile.
tf.gfile = tf.io.gfile

# Smoke-test the installation by running the model builder test suite.
os.system('python object_detection/builders/model_builder_tf2_test.py')
def load_model(model_dir):
    """Loads a saved_model and returns its default serving signature."""
    model = tf.saved_model.load(str(model_dir))
    model = model.signatures['serving_default']
    return model
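# `tf.saved_model.load` returns a trackable object; taking its
# 'serving_default' signature yields a ConcreteFunction that can be called
# directly on batched image tensors, as done below.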
# Download the Tortoise-format (TFRecord) and COCO-format test sets from Roboflow.
os.system('mkdir "Tortoise"')
os.chdir('Tortoise/')
os.system('curl -L "https://app.roboflow.com/ds/jCjxJgk04M?key=3JE38XqESy" > roboflow.zip; unzip roboflow.zip; rm roboflow.zip')
os.chdir('..')
os.system('mkdir "COCO"')
os.chdir('COCO/')
os.system('curl -L "https://app.roboflow.com/ds/Yb2OGQm2xb?key=pbWEWpS5ec" > roboflow.zip; unzip roboflow.zip; rm roboflow.zip')
os.chdir('..')

PATH_TO_TEST_IMAGES_DIR = pathlib.Path('COCO/test/')
TEST_IMAGE_PATHS = sorted(PATH_TO_TEST_IMAGES_DIR.glob('*.jpg'))
dataset = 'Tortoise'
test_record_fname = dataset + '/test/tortoise.tfrecord'
train_record_fname = dataset + '/train/tortoise.tfrecord'
label_map_pbtxt_fname = dataset + '/train/tortoise_label_map.pbtxt'
PATH_TO_LABELS = dataset + '/train/tortoise_label_map.pbtxt'
category_index = label_map_util.create_category_index_from_labelmap(
    PATH_TO_LABELS, use_display_name=False)

# Parse the capture datetime out of each test file name (characters 6:21 hold
# a 'YYYYmmdd-HHMMSS' timestamp) and build id/metadata lookups.
test_data_json = 'COCO/test/_annotations.coco.json'
with open(test_data_json, 'r') as f:
    test_metadata = json.load(f)
for im in test_metadata['images']:
    im['date_captured'] = str(
        datetime.strptime(im['file_name'][6:21], '%Y%m%d-%H%M%S'))
image_id_to_datetime = {im['id']: im['date_captured']
                        for im in test_metadata['images']}
image_path_to_id = {im['file_name']: im['id']
                    for im in test_metadata['images']}

faster_rcnn_model = load_model('../../Faster RCNN/saved_model')
print(faster_rcnn_model.inputs)
print(faster_rcnn_model.outputs)
def run_inference_for_single_image(model, image):
    """Runs a single image through a TensorFlow object detection saved_model.

    This function runs a saved_model on a (single) provided image and returns
    inference results in numpy arrays.

    Args:
        model: tensorflow saved_model. This model can be obtained using
            export_inference_graph.py.
        image: uint8 numpy array with shape (img_height, img_width, 3)

    Returns:
        output_dict: a dictionary holding the following entries:
            `num_detections`: an integer
            `detection_boxes`: a numpy (float32) array of shape [N, 4]
            `detection_classes`: a numpy (uint8) array of shape [N]
            `detection_scores`: a numpy (float32) array of shape [N]
            `detection_features`: a numpy (float32) array of shape [N, 7, 7, 2048]
    """
    image = np.asarray(image)
    # The input needs to be a tensor, convert it using `tf.convert_to_tensor`.
    input_tensor = tf.convert_to_tensor(image)
    # The model expects a batch of images, so add an axis with `tf.newaxis`.
    input_tensor = input_tensor[tf.newaxis, ...]
    # Run inference.
    output_dict = model(input_tensor)
    # All outputs are batched tensors. Convert to numpy arrays and take index
    # [0] to remove the batch dimension; keep only the first num_detections.
    num_detections = int(output_dict.pop('num_detections'))
    for key, value in output_dict.items():
        output_dict[key] = value[0, :num_detections].numpy()
    output_dict['num_detections'] = num_detections
    # detection_classes should be ints.
    output_dict['detection_classes'] = output_dict['detection_classes'].astype(np.int64)
    return output_dict
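# A minimal sanity sketch of the contract above (hypothetical file name):
#   out = run_inference_for_single_image(
#       faster_rcnn_model, np.array(Image.open('example.jpg')))
#   out['detection_boxes'].shape     -> (num_detections, 4)
#   out['detection_features'].shape -> (num_detections, 7, 7, 2048)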
def embed_date_captured(date_captured):
    """Encodes the datetime of the image.

    Takes a datetime object and encodes it into a normalized embedding of shape
    [5], using hard-coded normalization factors for year, month, day, hour,
    minute.

    Args:
        date_captured: A datetime object.

    Returns:
        A numpy float32 embedding of shape [5].
    """
    embedded_date_captured = []
    month_max = 12.0
    day_max = 31.0
    hour_max = 24.0
    minute_max = 60.0
    min_year = 1990.0
    max_year = 2030.0

    year = (date_captured.year - min_year) / float(max_year - min_year)
    embedded_date_captured.append(year)
    month = (date_captured.month - 1) / month_max
    embedded_date_captured.append(month)
    day = (date_captured.day - 1) / day_max
    embedded_date_captured.append(day)
    hour = date_captured.hour / hour_max
    embedded_date_captured.append(hour)
    minute = date_captured.minute / minute_max
    embedded_date_captured.append(minute)
    return np.asarray(embedded_date_captured)
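# Worked example: datetime(2020, 6, 15, 12, 30) embeds as
#   [(2020-1990)/40, (6-1)/12, (15-1)/31, 12/24, 30/60]
#   = [0.75, 0.4167, 0.4516, 0.5, 0.5]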
def embed_position_and_size(box):
    """Encodes the bounding box of the object of interest.

    Takes a bounding box and encodes it into a normalized embedding of shape
    [4] - the center point (x, y) and width and height of the box.

    Args:
        box: A bounding box, formatted as [ymin, xmin, ymax, xmax].

    Returns:
        A numpy float32 embedding of shape [4].
    """
    ymin, xmin, ymax, xmax = box
    w = xmax - xmin
    h = ymax - ymin
    x = xmin + w / 2.0
    y = ymin + h / 2.0
    return np.asarray([x, y, w, h])
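# Worked example: box [0.2, 0.1, 0.6, 0.5] gives w = 0.4, h = 0.4 and center
# (0.3, 0.4), so the embedding is [0.3, 0.4, 0.4, 0.4].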
def get_context_feature_embedding(date_captured, detection_boxes,
                                  detection_features, detection_scores):
    """Extracts a representative feature embedding for a given input image.

    Takes outputs of a detection model and focuses on the highest-confidence
    detected object. Starts with detection_features and uses average pooling to
    remove the spatial dimensions, then appends an embedding of the box position
    and size, and an embedding of the date and time the image was captured,
    returning a one-dimensional representation of the object.

    Args:
        date_captured: A datetime string of format '%Y-%m-%d %H:%M:%S'.
        detection_boxes: A numpy (float32) array of shape [N, 4].
        detection_features: A numpy (float32) array of shape [N, 7, 7, 2048].
        detection_scores: A numpy (float32) array of shape [N].

    Returns:
        A numpy float32 embedding of shape [1, 2057], and the float32 score of
        the embedded detection.
    """
    date_captured = datetime.strptime(date_captured, '%Y-%m-%d %H:%M:%S')
    temporal_embedding = embed_date_captured(date_captured)
    # Detections are sorted by score, so index 0 is the top detection; average
    # pool its [7, 7, 2048] feature map down to a [2048] vector.
    embedding = detection_features[0]
    pooled_embedding = np.mean(np.mean(embedding, axis=1), axis=0)
    box = detection_boxes[0]
    position_embedding = embed_position_and_size(box)
    bb_embedding = np.concatenate((pooled_embedding, position_embedding))
    embedding = np.expand_dims(
        np.concatenate((bb_embedding, temporal_embedding)), axis=0)
    score = detection_scores[0]
    return embedding, score
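# The 2057-dim embedding is 2048 pooled feature channels + 4 box position/size
# values + 5 normalized datetime values.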
def run_inference(model, image_path, date_captured, resize_image=True):
    """Runs inference over a single input image and extracts contextual features.

    Args:
        model: A tensorflow saved_model object.
        image_path: Absolute path to the input image.
        date_captured: A datetime string of format '%Y-%m-%d %H:%M:%S'.
        resize_image: Whether to resize the input image before running inference.

    Returns:
        context_feature: A numpy float32 array of shape [1, 2057].
        score: A numpy float32 object score for the embedded object.
        output_dict: The saved_model output dictionary for the image.
    """
    with open(image_path, 'rb') as f:
        image = Image.open(f)
        if resize_image:
            # Use LANCZOS resampling (Image.ANTIALIAS was removed in Pillow 10).
            image.thumbnail((640, 640), Image.LANCZOS)
        image_np = np.array(image)
    # Actual detection.
    output_dict = run_inference_for_single_image(model, image_np)
    context_feature, score = get_context_feature_embedding(
        date_captured, output_dict['detection_boxes'],
        output_dict['detection_features'], output_dict['detection_scores'])
    return context_feature, score, output_dict
# Build the contextual memory bank by embedding every test image.
context_features = []
scores = []
faster_rcnn_results = {}
for image_path in TEST_IMAGE_PATHS:
    image_id = image_path_to_id[image_path.name]
    date_captured = image_id_to_datetime[image_id]
    context_feature, score, results = run_inference(
        faster_rcnn_model, image_path, date_captured)
    faster_rcnn_results[image_id] = results
    context_features.append(context_feature)
    scores.append(score)

# Concatenate all extracted context embeddings into a contextual memory bank.
context_features_matrix = np.concatenate(context_features, axis=0)
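# The memory bank has shape [num_test_images, 2057]; each row is one embedded
# detection, and it is zero-padded up to context_padding_size rows at inference.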
context_rcnn_model = load_model('../../Context RCNN/saved_model')
context_padding_size = 2000
print(context_rcnn_model.inputs)
print(context_rcnn_model.outputs)
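# context_padding_size must match max_num_context_features in the Context
# R-CNN config (and the exported side_input shape 1,2000,2057 shown in the
# export example below).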
def run_context_rcnn_inference_for_single_image(
        model, image, context_features, context_padding_size):
    """Runs a single image through a Context R-CNN saved_model.

    This function runs a saved_model on a (single) provided image and provided
    contextual features and returns inference results in numpy arrays.

    Args:
        model: tensorflow Context R-CNN saved_model. This model can be obtained
            using export_inference_graph.py and setting side_input fields.
            Example export call -
                python export_inference_graph.py \
                    --input_type image_tensor \
                    --pipeline_config_path /path/to/context_rcnn_model.config \
                    --trained_checkpoint_prefix /path/to/context_rcnn_model.ckpt \
                    --output_directory /path/to/output_dir \
                    --use_side_inputs True \
                    --side_input_shapes 1,2000,2057/1 \
                    --side_input_names context_features,valid_context_size \
                    --side_input_types float,int \
                    --input_shape 1,-1,-1,3
        image: uint8 numpy array with shape (img_height, img_width, 3)
        context_features: A numpy float32 contextual memory bank of shape
            [num_context_examples, 2057]
        context_padding_size: The amount of expected padding in the contextual
            memory bank, defined in the Context R-CNN config as
            max_num_context_features.

    Returns:
        output_dict: a dictionary holding the following entries:
            `num_detections`: an integer
            `detection_boxes`: a numpy (float32) array of shape [N, 4]
            `detection_classes`: a numpy (uint8) array of shape [N]
            `detection_scores`: a numpy (float32) array of shape [N]
    """
    image = np.asarray(image)
    # The input image needs to be a tensor, convert it using
    # `tf.convert_to_tensor`.
    image_tensor = tf.convert_to_tensor(image, name='image_tensor')[tf.newaxis, ...]
    context_features = np.asarray(context_features)
    valid_context_size = context_features.shape[0]
    valid_context_size_tensor = tf.convert_to_tensor(
        valid_context_size, name='valid_context_size')[tf.newaxis, ...]
    # Zero-pad the memory bank up to the fixed size the model was exported with.
    padded_context_features = np.pad(
        context_features,
        ((0, context_padding_size - valid_context_size), (0, 0)), mode='constant')
    padded_context_features_tensor = tf.convert_to_tensor(
        padded_context_features,
        name='context_features',
        dtype=tf.float32)[tf.newaxis, ...]
    # Run inference.
    output_dict = model(
        inputs=image_tensor,
        context_features=padded_context_features_tensor,
        valid_context_size=valid_context_size_tensor)
    # All outputs are batched tensors. Convert to numpy arrays and take index
    # [0] to remove the batch dimension; keep only the first num_detections.
    num_detections = int(output_dict.pop('num_detections'))
    for key, value in output_dict.items():
        output_dict[key] = value[0, :num_detections].numpy()
    output_dict['num_detections'] = num_detections
    # detection_classes should be ints.
    output_dict['detection_classes'] = output_dict['detection_classes'].astype(np.int64)
    return output_dict
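# Sketch of the padding step above, assuming the test set yielded 18 rows:
#   context_features_matrix.shape -> (18, 2057)
#   np.pad(context_features_matrix, ((0, 2000 - 18), (0, 0)),
#          mode='constant').shape -> (2000, 2057)
# valid_context_size tells the model how many of the 2000 rows are real.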
def show_context_rcnn_inference(
        model, image_path, context_features, faster_rcnn_output_dict,
        context_padding_size, resize_image=True):
    """Runs inference over a single input image and visualizes Faster R-CNN vs.
    Context R-CNN results.

    Args:
        model: A tensorflow saved_model object.
        image_path: Absolute path to the input image.
        context_features: A numpy float32 contextual memory bank of shape
            [num_context_examples, 2057]
        faster_rcnn_output_dict: The output_dict corresponding to this input
            image from the single-frame Faster R-CNN model, which was previously
            used to build the memory bank.
        context_padding_size: The amount of expected padding in the contextual
            memory bank, defined in the Context R-CNN config as
            max_num_context_features.
        resize_image: Whether to resize the input image before running inference.

    Returns:
        context_rcnn_image_np: Numpy image array showing Context R-CNN results.
        faster_rcnn_image_np: Numpy image array showing Faster R-CNN results.
    """
    # The array-based representation of the image is used later to prepare the
    # result image with boxes and labels on it.
    with open(image_path, 'rb') as f:
        image = Image.open(f)
        if resize_image:
            image.thumbnail((640, 640), Image.LANCZOS)
        image_np = np.array(image)
        image.thumbnail((400, 400), Image.LANCZOS)
        context_rcnn_image_np = np.array(image)
    faster_rcnn_image_np = np.copy(context_rcnn_image_np)
    # Actual detection.
    output_dict = run_context_rcnn_inference_for_single_image(
        model, image_np, context_features, context_padding_size)
    # Visualization of the results of a context_rcnn detection.
    vis_utils.visualize_boxes_and_labels_on_image_array(
        context_rcnn_image_np,
        output_dict['detection_boxes'],
        output_dict['detection_classes'],
        output_dict['detection_scores'],
        category_index,
        use_normalized_coordinates=True,
        line_thickness=2)
    # Visualization of the results of a faster_rcnn detection.
    vis_utils.visualize_boxes_and_labels_on_image_array(
        faster_rcnn_image_np,
        faster_rcnn_output_dict['detection_boxes'],
        faster_rcnn_output_dict['detection_classes'],
        faster_rcnn_output_dict['detection_scores'],
        category_index,
        use_normalized_coordinates=True,
        line_thickness=2)
    return context_rcnn_image_np, faster_rcnn_image_np
def segment(image):
    # Hide grid lines and tick labels on the side-by-side comparison figure.
    plt.rcParams['axes.grid'] = False
    plt.rcParams['xtick.labelbottom'] = False
    plt.rcParams['ytick.labelleft'] = False
    plt.rcParams['xtick.top'] = False
    plt.rcParams['xtick.bottom'] = False
    plt.rcParams['ytick.left'] = False
    plt.rcParams['ytick.right'] = False
    plt.rcParams['figure.figsize'] = [7.5, 5]
    # EXIF tag 36867 is DateTimeOriginal ('date taken'); uploads must carry it
    # for the temporal embedding to be computed.
    date_captured = datetime.strptime(
        Image.open(image.name)._getexif()[36867],
        '%Y:%m:%d %H:%M:%S').strftime('%Y-%m-%d %H:%M:%S')
    context_feature, score, results = run_inference(
        faster_rcnn_model, image.name, date_captured)
    faster_rcnn_output_dict = results
    context_rcnn_image, faster_rcnn_image = show_context_rcnn_inference(
        context_rcnn_model, image.name, context_features_matrix,
        faster_rcnn_output_dict, context_padding_size)
    # Start a fresh figure per request so repeated calls do not draw on top of
    # a stale one.
    plt.figure()
    plt.subplot(1, 2, 1)
    plt.imshow(faster_rcnn_image)
    plt.title('Faster R-CNN')
    plt.subplot(1, 2, 2)
    plt.imshow(context_rcnn_image)
    plt.title('Context R-CNN')
    buf = io.BytesIO()
    plt.savefig(buf, dpi=600)
    plt.close()
    buf.seek(0)
    img = Image.open(buf)
    return img
examples = ['../../Examples/' + item for item in os.listdir('../../Examples')]

title = 'Context R-CNN'
description = (
    'Gradio demo for **Context R-CNN** [[Paper]](https://arxiv.org/abs/1912.03538), '
    'an object detector for Gopher Tortoises. A single-frame Faster R-CNN first '
    'builds a contextual memory bank of feature embeddings; Context R-CNN then '
    'attends over that bank, using environmental factors such as time of day and '
    'seasonal changes, to improve detection. Uploaded images must carry a '
    "'date taken' EXIF attribute for the demo to work.")

gr.Interface(
    fn=segment,
    inputs='file',
    outputs='image',
    title=title,
    description=description,
    examples=examples,
    enable_queue=True).launch()