Skip to content

Module mot.object_detection.modeling.model_frcnn

View Source
# -*- coding: utf-8 -*-

# File: model_frcnn.py

import tensorflow as tf

from tensorpack.models import Conv2D, FullyConnected, layer_register

from tensorpack.tfutils.argscope import argscope

from tensorpack.tfutils.common import get_tf_version_tuple

from tensorpack.tfutils.scope_utils import under_name_scope

from tensorpack.tfutils.summary import add_moving_summary

from tensorpack.utils.argtools import memoized_method

from mot.object_detection.config import config as cfg

from mot.object_detection.utils.box_ops import pairwise_iou

from mot.object_detection.modeling.model_box import decode_bbox_target, encode_bbox_target

from mot.object_detection.modeling.backbone import GroupNorm

@under_name_scope()
def proposal_metrics(iou):
    """
    Register moving summaries describing how well the proposals cover the ground truth.

    Args:
        iou: nxm, #proposal x #gt

    The summaries are: the mean, over GT boxes, of the best IoU reached by any
    proposal, and the fraction of GT boxes whose best IoU is at least 0.3 and
    at least 0.5. Nothing is returned.
    """
    # Best-matching proposal for every GT box (reduce over the proposal axis); summary only.
    per_gt_best = tf.reduce_max(iou, axis=0)
    tracked = [tf.reduce_mean(per_gt_best, name='best_iou_per_gt')]
    with tf.device('/cpu:0'):
        for threshold in (0.3, 0.5):
            num_recalled = tf.count_nonzero(per_gt_best >= threshold)
            num_gt = tf.size(per_gt_best, out_type=tf.int64)
            tracked.append(
                tf.truediv(num_recalled, num_gt, name='recall_iou{}'.format(threshold)))
    add_moving_summary(*tracked)

@under_name_scope()
def sample_fast_rcnn_targets(boxes, gt_boxes, gt_labels):
    """
    Pick a training batch of rois out of the proposals plus the ground-truth boxes.

    Ground-truth boxes are appended to the proposals before sampling, so each of
    them is itself a candidate roi (with an IoU of 1 against its own GT column).

    Args:
        boxes: nx4 region proposals, floatbox
        gt_boxes: mx4, floatbox
        gt_labels: m, integer class labels
            NOTE(review): the gathered labels are concatenated with int64 zeros,
            so these presumably need to be int64 -- confirm against the caller.

    Returns:
        A BoxProposals instance, with:
            boxes: tx4 floatbox, the sampled rois (foreground first, then background)
            labels: t labels; the matched GT label for foreground rois, 0 for background
            fg_inds_wrt_gt: #fg indices, each in range [0, m-1].
                It contains the matching GT of each foreground roi.
    """
    proposal_iou = pairwise_iou(boxes, gt_boxes)     # n x m
    proposal_metrics(proposal_iou)

    # Extend the candidate pool with the GT boxes; the identity block is their IoU rows.
    boxes = tf.concat([boxes, gt_boxes], axis=0)    # (n+m) x 4
    num_gt = tf.shape(gt_boxes)[0]
    iou = tf.concat([proposal_iou, tf.eye(num_gt)], axis=0)   # (n+m) x m
    # From here on there are n+m candidates.

    def _split_fg_bg(iou):
        """Randomly choose foreground and background candidate indices."""

        def _mask_from_iou():
            return tf.reduce_max(iou, axis=1) >= cfg.FRCNN.FG_THRESH

        def _mask_all_background():
            # No GT column to reduce over: nothing can be foreground.
            return tf.zeros([tf.shape(iou)[0]], dtype=tf.bool)

        is_fg = tf.cond(tf.shape(iou)[1] > 0, _mask_from_iou, _mask_all_background)

        fg_candidates = tf.reshape(tf.where(is_fg), [-1])
        fg_quota = int(cfg.FRCNN.BATCH_PER_IM * cfg.FRCNN.FG_RATIO)
        num_fg = tf.minimum(fg_quota, tf.size(fg_candidates), name='num_fg')
        chosen_fg = tf.random_shuffle(fg_candidates)[:num_fg]

        bg_candidates = tf.reshape(tf.where(tf.logical_not(is_fg)), [-1])
        bg_quota = cfg.FRCNN.BATCH_PER_IM - num_fg
        num_bg = tf.minimum(bg_quota, tf.size(bg_candidates), name='num_bg')
        chosen_bg = tf.random_shuffle(bg_candidates)[:num_bg]

        add_moving_summary(num_fg, num_bg)
        return chosen_fg, chosen_bg

    # Indices below are relative to the n+m candidates.
    fg_inds, bg_inds = _split_fg_bg(iou)

    def _argmax_gt():
        return tf.argmax(iou, axis=1)   # one GT index in 0~m-1 per candidate

    def _zero_gt():
        return tf.zeros([tf.shape(iou)[0]], dtype=tf.int64)

    matched_gt = tf.cond(tf.shape(iou)[1] > 0, _argmax_gt, _zero_gt)
    fg_inds_wrt_gt = tf.gather(matched_gt, fg_inds)   # num_fg

    sampled_inds = tf.concat([fg_inds, bg_inds], axis=0)
    sampled_boxes = tf.gather(boxes, sampled_inds)
    fg_part = tf.gather(gt_labels, fg_inds_wrt_gt)
    bg_part = tf.zeros_like(bg_inds, dtype=tf.int64)
    sampled_labels = tf.concat([fg_part, bg_part], axis=0)

    # These are training targets, so no gradient may flow back through them.
    return BoxProposals(
        tf.stop_gradient(sampled_boxes, name='sampled_proposal_boxes'),
        tf.stop_gradient(sampled_labels, name='sampled_labels'),
        tf.stop_gradient(fg_inds_wrt_gt))

@layer_register(log_shape=True)
def fastrcnn_outputs(feature, num_categories, class_agnostic_regression=False):
    """
    Build the classification and box-regression output layers of the head.

    Args:
        feature (any shape): input to both fully connected layers
        num_categories (int): number of categories; one extra class is added on top
        class_agnostic_regression (bool): if True, regression to N x 1 x 4

    Returns:
        cls_logits: N x num_class classification logits
        reg_logits: N x num_class x 4, or N x 1 x 4 if class agnostic
    """
    total_classes = num_categories + 1
    cls_logits = FullyConnected(
        'class', feature, total_classes,
        kernel_initializer=tf.random_normal_initializer(stddev=0.01))

    if class_agnostic_regression:
        box_groups = 1
    else:
        box_groups = total_classes
    flat_boxes = FullyConnected(
        'box', feature, box_groups * 4,
        kernel_initializer=tf.random_normal_initializer(stddev=0.001))
    reg_logits = tf.reshape(flat_boxes, (-1, box_groups, 4), name='output_box')
    return cls_logits, reg_logits

@under_name_scope()
def fastrcnn_losses(labels, label_logits, fg_boxes, fg_box_logits):
    """
    Compute the classification and box-regression losses and register summaries.

    Args:
        labels: n,
        label_logits: nxC
        fg_boxes: nfgx4, encoded
        fg_box_logits: nfgxCx4 or nfgx1x4 if class agnostic

    Returns:
        [label_loss, box_loss]: mean softmax cross entropy over all n samples,
        and the summed absolute box error divided by n.
    """
    xent = tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=labels, logits=label_logits)
    label_loss = tf.reduce_mean(xent, name='label_loss')

    fg_positions = tf.where(labels > 0)[:, 0]
    fg_classes = tf.gather(labels, fg_positions)
    fg_count = tf.size(fg_positions, out_type=tf.int64)
    no_fg = tf.equal(fg_count, 0)

    per_class_regression = int(fg_box_logits.shape[1]) > 1
    if per_class_regression:
        # Keep, for each foreground sample, only the regression of its own class.
        if get_tf_version_tuple() >= (1, 14):
            fg_classes = tf.expand_dims(fg_classes, axis=1)  # nfg x 1
            fg_box_logits = tf.gather(fg_box_logits, fg_classes, batch_dims=1)
        else:
            row_and_class = tf.stack([tf.range(fg_count), fg_classes], axis=1)  # nfgx2
            fg_box_logits = tf.gather_nd(fg_box_logits, row_and_class)
    fg_box_logits = tf.reshape(fg_box_logits, [-1, 4])  # nfg x 4

    with tf.name_scope('label_metrics'), tf.device('/cpu:0'):
        predicted = tf.argmax(label_logits, axis=1, name='label_prediction')
        # Cast to float: boolean/integer gather is unavailable on GPU.
        is_correct = tf.cast(tf.equal(predicted, labels), tf.float32)
        accuracy = tf.reduce_mean(is_correct, name='accuracy')
        fg_predicted = tf.argmax(tf.gather(label_logits, fg_positions), axis=1)
        fg_as_bg = tf.cast(tf.equal(fg_predicted, 0), tf.int64)
        num_zero = tf.reduce_sum(fg_as_bg, name='num_zero')
        # Both foreground metrics fall back to 0 when there is no foreground sample.
        fn_ratio = tf.cast(tf.truediv(num_zero, fg_count), tf.float32)
        false_negative = tf.where(no_fg, 0., fn_ratio, name='false_negative')
        fg_mean_correct = tf.reduce_mean(tf.gather(is_correct, fg_positions))
        fg_accuracy = tf.where(no_fg, 0., fg_mean_correct, name='fg_accuracy')

    abs_error = tf.reduce_sum(tf.abs(fg_boxes - fg_box_logits))
    num_samples = tf.cast(tf.shape(labels)[0], tf.float32)
    box_loss = tf.truediv(abs_error, num_samples, name='box_loss')

    add_moving_summary(label_loss, box_loss, accuracy,
                       fg_accuracy, false_negative,
                       tf.cast(fg_count, tf.float32, name='num_fg_label'))
    return [label_loss, box_loss]

@under_name_scope()
def fastrcnn_predictions(boxes, scores):
    """
    Generate final results from predictions of all proposals.

    Class 0 is dropped, candidates are thresholded on score and then passed
    through a single non-max-suppression call.

    Args:
        boxes: n x #class x 4 floatbox in float32
        scores: n x #class

    Returns:
        boxes: Kx4
        scores: K x #cat, the scores of every non-background class for each kept box
        labels: K, class index of each kept box (starting at 1)
    """
    assert boxes.shape[1] == scores.shape[1]
    # Drop class 0 and move the class axis to the front.
    boxes = tf.transpose(boxes, [1, 0, 2])[1:, :, :]  # #cat x n x 4
    scores = tf.transpose(scores[:, 1:], [1, 0])  # #cat x n

    max_coord = tf.reduce_max(boxes)
    kept_ids = tf.where(scores > cfg.TEST.RESULT_SCORE_THRESH)  # Fx2: (class, box)
    kept_boxes = tf.gather_nd(boxes, kept_ids)  # Fx4
    kept_scores = tf.gather_nd(scores, kept_ids)  # F,
    kept_class = tf.slice(kept_ids, [0, 0], [-1, 1])  # F,1

    # Shift every class by more than the largest coordinate so that boxes of
    # different classes cannot overlap inside the single NMS call.
    class_shift = tf.cast(kept_class, tf.float32) * (max_coord + 1)  # F,1
    shifted_boxes = kept_boxes + class_shift
    selection = tf.image.non_max_suppression(
        shifted_boxes,
        kept_scores,
        cfg.TEST.RESULTS_PER_IM,
        cfg.TEST.FRCNN_NMS_THRESH)

    # Trick to return the scores of all classes for every selected box:
    # gather whole score columns by box index instead of single entries.
    kept_box_index = kept_ids[:, 1]
    all_class_scores = tf.gather(scores, kept_box_index, axis=1)
    selected_scores = tf.gather(all_class_scores, selection, axis=1)
    final_scores = tf.transpose(selected_scores, name="scores")

    selected_class = tf.gather(kept_class[:, 0], selection)
    final_labels = tf.add(selected_class, 1, name='labels')
    final_boxes = tf.gather(kept_boxes, selection, name='boxes')
    return final_boxes, final_scores, final_labels

"""

FastRCNN heads for FPN:

"""

@layer_register(log_shape=True)
def fastrcnn_2fc_head(feature):
    """
    Two stacked fully connected layers ('fc6' then 'fc7') with ReLU activation.

    Args:
        feature (any shape):

    Returns:
        2D head feature
    """
    fc_width = cfg.FPN.FRCNN_FC_HEAD_DIM
    # The same initializer object is shared by both layers.
    weight_init = tf.variance_scaling_initializer()
    out = feature
    for layer_name in ('fc6', 'fc7'):
        out = FullyConnected(
            layer_name, out, fc_width,
            kernel_initializer=weight_init, activation=tf.nn.relu)
    return out

@layer_register(log_shape=True)
def fastrcnn_Xconv1fc_head(feature, num_convs, norm=None):
    """
    A stack of 3x3 conv layers followed by one fully connected layer.

    Args:
        feature (NCHW):
        num_convs (int): number of conv layers
        norm (str or None): either None or 'GN'

    Returns:
        2D head feature
    """
    assert norm in [None, 'GN'], norm
    if get_tf_version_tuple() >= (1, 12):
        distribution = 'untruncated_normal'
    else:
        distribution = 'normal'
    conv_init = tf.variance_scaling_initializer(
        scale=2.0, mode='fan_out', distribution=distribution)
    use_norm = norm is not None

    net = feature
    with argscope(Conv2D, data_format='channels_first', kernel_initializer=conv_init):
        for idx in range(num_convs):
            net = Conv2D('conv{}'.format(idx), net, cfg.FPN.FRCNN_CONV_HEAD_DIM, 3,
                         activation=tf.nn.relu)
            if use_norm:
                net = GroupNorm('gn{}'.format(idx), net)
        net = FullyConnected(
            'fc', net, cfg.FPN.FRCNN_FC_HEAD_DIM,
            kernel_initializer=tf.variance_scaling_initializer(),
            activation=tf.nn.relu)
    return net

def fastrcnn_4conv1fc_head(*args, **kwargs):
    """
    Call ``fastrcnn_Xconv1fc_head`` with ``num_convs=4``.

    All positional and keyword arguments are forwarded unchanged.
    """
    head_feature = fastrcnn_Xconv1fc_head(*args, num_convs=4, **kwargs)
    return head_feature

def fastrcnn_4conv1fc_gn_head(*args, **kwargs):
    """
    Call ``fastrcnn_Xconv1fc_head`` with ``num_convs=4`` and ``norm='GN'``.

    All positional and keyword arguments are forwarded unchanged.
    """
    head_feature = fastrcnn_Xconv1fc_head(*args, num_convs=4, norm='GN', **kwargs)
    return head_feature

class BoxProposals(object):
    """
    A structure to manage box proposals and their relations with ground truth.
    """

    def __init__(self, boxes, labels=None, fg_inds_wrt_gt=None):
        """
        Args:
            boxes: Nx4
            labels: N, each in [0, #class), the true label for each input box
            fg_inds_wrt_gt: #fg, each in [0, M)

        ``labels`` and ``fg_inds_wrt_gt`` could be None when not training.
        An argument that is None is not stored as an attribute at all.
        """
        given = (
            ('boxes', boxes),
            ('labels', labels),
            ('fg_inds_wrt_gt', fg_inds_wrt_gt),
        )
        for attr_name, attr_value in given:
            if attr_value is not None:
                setattr(self, attr_name, attr_value)

    @memoized_method
    def fg_inds(self):
        """ Returns: #fg indices in [0, N-1], the positions whose label is > 0 """
        is_fg = self.labels > 0
        return tf.reshape(tf.where(is_fg), [-1], name='fg_inds')

    @memoized_method
    def fg_boxes(self):
        """ Returns: #fg x4, the boxes at the foreground positions """
        positions = self.fg_inds()
        return tf.gather(self.boxes, positions, name='fg_boxes')

    @memoized_method
    def fg_labels(self):
        """ Returns: #fg, the labels at the foreground positions """
        positions = self.fg_inds()
        return tf.gather(self.labels, positions, name='fg_labels')

class FastRCNNHead(object):
    """
    A class to process & decode inputs/outputs of a fastrcnn classification+regression head.
    """

    def __init__(self, proposals, box_logits, label_logits, gt_boxes, bbox_regression_weights):
        """
        Args:
            proposals: BoxProposals
            box_logits: Nx#classx4 or Nx1x4, the output of the head
            label_logits: Nx#class, the output of the head
            gt_boxes: Mx4
            bbox_regression_weights: a 4 element tensor

        Every argument that is not None is stored as an attribute of the same name.
        """
        for k, v in locals().items():
            if k != 'self' and v is not None:
                setattr(self, k, v)
        # A second dimension of 1 means a single regression shared by all classes.
        self._bbox_class_agnostic = int(box_logits.shape[1]) == 1
        self._num_classes = box_logits.shape[1]

    @memoized_method
    def fg_box_logits(self):
        """ Returns: #fg x ? x 4 """
        return tf.gather(self.box_logits, self.proposals.fg_inds(), name='fg_box_logits')

    @memoized_method
    def losses(self):
        """ Returns: [label_loss, box_loss] as computed by ``fastrcnn_losses`` """
        # Regression target: the matched GT box encoded relative to its
        # foreground roi, scaled by the regression weights.
        encoded_fg_gt_boxes = encode_bbox_target(
            tf.gather(self.gt_boxes, self.proposals.fg_inds_wrt_gt),
            self.proposals.fg_boxes()) * self.bbox_regression_weights
        return fastrcnn_losses(
            self.proposals.labels, self.label_logits,
            encoded_fg_gt_boxes, self.fg_box_logits()
        )

    @memoized_method
    def decoded_output_boxes(self):
        """ Returns: N x #class x 4 """
        # Repeat each proposal along the class axis so it lines up with box_logits.
        anchors = tf.tile(tf.expand_dims(self.proposals.boxes, 1),
                          [1, self._num_classes, 1])   # N x #class x 4
        decoded_boxes = decode_bbox_target(
            self.box_logits / self.bbox_regression_weights,
            anchors
        )
        return decoded_boxes

    @memoized_method
    def decoded_output_boxes_for_true_label(self):
        """ Returns: Nx4 decoded boxes """
        # Fixed: this used to call the nonexistent `_decoded_output_boxes_for_label`.
        return self.decoded_output_boxes_for_label(self.proposals.labels)

    @memoized_method
    def decoded_output_boxes_for_predicted_label(self):
        """ Returns: Nx4 decoded boxes """
        # Fixed: this used to call the nonexistent `_decoded_output_boxes_for_label`.
        return self.decoded_output_boxes_for_label(self.predicted_labels())

    @memoized_method
    def decoded_output_boxes_for_label(self, labels):
        """
        Args:
            labels: N, the class whose regression output is used for each box.
                It is stacked with an int64 range, so it must be int64.

        Returns: Nx4 decoded boxes
        """
        assert not self._bbox_class_agnostic
        # One (row, class) pair per box. Stacking along axis=1 gives the Nx2
        # layout gather_nd needs; the default axis=0 would produce 2xN.
        indices = tf.stack([
            tf.range(tf.size(labels, out_type=tf.int64)),
            labels
        ], axis=1)
        needed_logits = tf.gather_nd(self.box_logits, indices)   # N x 4
        decoded = decode_bbox_target(
            needed_logits / self.bbox_regression_weights,
            self.proposals.boxes
        )
        return decoded

    @memoized_method
    def decoded_output_boxes_class_agnostic(self):
        """ Returns: Nx4 """
        assert self._bbox_class_agnostic
        box_logits = tf.reshape(self.box_logits, [-1, 4])
        decoded = decode_bbox_target(
            box_logits / self.bbox_regression_weights,
            self.proposals.boxes
        )
        return decoded

    @memoized_method
    def output_scores(self, name=None):
        """ Returns: N x #class scores, summed to one for each box."""
        return tf.nn.softmax(self.label_logits, name=name)

    @memoized_method
    def predicted_labels(self):
        """ Returns: N ints """
        return tf.argmax(self.label_logits, axis=1, name='predicted_labels')

Functions

fastrcnn_2fc_head

def fastrcnn_2fc_head(
    feature
)

Args: feature (any shape):

Returns: 2D head feature

View Source
@layer_register(log_shape=True)
def fastrcnn_2fc_head(feature):
    """
    Two stacked fully connected layers ('fc6' then 'fc7') with ReLU activation.

    Args:
        feature (any shape):

    Returns:
        2D head feature
    """
    fc_width = cfg.FPN.FRCNN_FC_HEAD_DIM
    # The same initializer object is shared by both layers.
    weight_init = tf.variance_scaling_initializer()
    out = feature
    for layer_name in ('fc6', 'fc7'):
        out = FullyConnected(
            layer_name, out, fc_width,
            kernel_initializer=weight_init, activation=tf.nn.relu)
    return out

fastrcnn_4conv1fc_gn_head

def fastrcnn_4conv1fc_gn_head(
    *args,
    **kwargs
)
View Source
def fastrcnn_4conv1fc_gn_head(*args, **kwargs):
    """
    Call ``fastrcnn_Xconv1fc_head`` with ``num_convs=4`` and ``norm='GN'``.

    All positional and keyword arguments are forwarded unchanged.
    """
    head_feature = fastrcnn_Xconv1fc_head(*args, num_convs=4, norm='GN', **kwargs)
    return head_feature

fastrcnn_4conv1fc_head

def fastrcnn_4conv1fc_head(
    *args,
    **kwargs
)
View Source
def fastrcnn_4conv1fc_head(*args, **kwargs):
    """
    Call ``fastrcnn_Xconv1fc_head`` with ``num_convs=4``.

    All positional and keyword arguments are forwarded unchanged.
    """
    head_feature = fastrcnn_Xconv1fc_head(*args, num_convs=4, **kwargs)
    return head_feature

fastrcnn_Xconv1fc_head

def fastrcnn_Xconv1fc_head(
    feature,
    num_convs,
    norm=None
)

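Args: feature (NCHW): the input feature map. num_convs (int): number of conv layers. norm (str or None): either None or 'GN'. (The source docstring also lists "num_classes (int): num_category + 1", but the function has no such parameter.)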

Returns: 2D head feature

View Source
@layer_register(log_shape=True)
def fastrcnn_Xconv1fc_head(feature, num_convs, norm=None):
    """
    A stack of 3x3 conv layers followed by one fully connected layer.

    Args:
        feature (NCHW):
        num_convs (int): number of conv layers
        norm (str or None): either None or 'GN'

    Returns:
        2D head feature
    """
    assert norm in [None, 'GN'], norm
    if get_tf_version_tuple() >= (1, 12):
        distribution = 'untruncated_normal'
    else:
        distribution = 'normal'
    conv_init = tf.variance_scaling_initializer(
        scale=2.0, mode='fan_out', distribution=distribution)
    use_norm = norm is not None

    net = feature
    with argscope(Conv2D, data_format='channels_first', kernel_initializer=conv_init):
        for idx in range(num_convs):
            net = Conv2D('conv{}'.format(idx), net, cfg.FPN.FRCNN_CONV_HEAD_DIM, 3,
                         activation=tf.nn.relu)
            if use_norm:
                net = GroupNorm('gn{}'.format(idx), net)
        net = FullyConnected(
            'fc', net, cfg.FPN.FRCNN_FC_HEAD_DIM,
            kernel_initializer=tf.variance_scaling_initializer(),
            activation=tf.nn.relu)
    return net

fastrcnn_losses

def fastrcnn_losses(
    labels,
    label_logits,
    fg_boxes,
    fg_box_logits
)

Args: labels: n, label_logits: nxC fg_boxes: nfgx4, encoded fg_box_logits: nfgxCx4 or nfgx1x4 if class agnostic

Returns: label_loss, box_loss

View Source
@under_name_scope()
def fastrcnn_losses(labels, label_logits, fg_boxes, fg_box_logits):
    """
    Compute the classification and box-regression losses and register summaries.

    Args:
        labels: n,
        label_logits: nxC
        fg_boxes: nfgx4, encoded
        fg_box_logits: nfgxCx4 or nfgx1x4 if class agnostic

    Returns:
        [label_loss, box_loss]: mean softmax cross entropy over all n samples,
        and the summed absolute box error divided by n.
    """
    xent = tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=labels, logits=label_logits)
    label_loss = tf.reduce_mean(xent, name='label_loss')

    fg_positions = tf.where(labels > 0)[:, 0]
    fg_classes = tf.gather(labels, fg_positions)
    fg_count = tf.size(fg_positions, out_type=tf.int64)
    no_fg = tf.equal(fg_count, 0)

    per_class_regression = int(fg_box_logits.shape[1]) > 1
    if per_class_regression:
        # Keep, for each foreground sample, only the regression of its own class.
        if get_tf_version_tuple() >= (1, 14):
            fg_classes = tf.expand_dims(fg_classes, axis=1)  # nfg x 1
            fg_box_logits = tf.gather(fg_box_logits, fg_classes, batch_dims=1)
        else:
            row_and_class = tf.stack([tf.range(fg_count), fg_classes], axis=1)  # nfgx2
            fg_box_logits = tf.gather_nd(fg_box_logits, row_and_class)
    fg_box_logits = tf.reshape(fg_box_logits, [-1, 4])  # nfg x 4

    with tf.name_scope('label_metrics'), tf.device('/cpu:0'):
        predicted = tf.argmax(label_logits, axis=1, name='label_prediction')
        # Cast to float: boolean/integer gather is unavailable on GPU.
        is_correct = tf.cast(tf.equal(predicted, labels), tf.float32)
        accuracy = tf.reduce_mean(is_correct, name='accuracy')
        fg_predicted = tf.argmax(tf.gather(label_logits, fg_positions), axis=1)
        fg_as_bg = tf.cast(tf.equal(fg_predicted, 0), tf.int64)
        num_zero = tf.reduce_sum(fg_as_bg, name='num_zero')
        # Both foreground metrics fall back to 0 when there is no foreground sample.
        fn_ratio = tf.cast(tf.truediv(num_zero, fg_count), tf.float32)
        false_negative = tf.where(no_fg, 0., fn_ratio, name='false_negative')
        fg_mean_correct = tf.reduce_mean(tf.gather(is_correct, fg_positions))
        fg_accuracy = tf.where(no_fg, 0., fg_mean_correct, name='fg_accuracy')

    abs_error = tf.reduce_sum(tf.abs(fg_boxes - fg_box_logits))
    num_samples = tf.cast(tf.shape(labels)[0], tf.float32)
    box_loss = tf.truediv(abs_error, num_samples, name='box_loss')

    add_moving_summary(label_loss, box_loss, accuracy,
                       fg_accuracy, false_negative,
                       tf.cast(fg_count, tf.float32, name='num_fg_label'))
    return [label_loss, box_loss]

fastrcnn_outputs

def fastrcnn_outputs(
    feature,
    num_categories,
    class_agnostic_regression=False
)

Args: feature (any shape): num_categories (int): class_agnostic_regression (bool): if True, regression to N x 1 x 4

Returns: cls_logits: N x num_class classification logits reg_logits: N x num_classx4 or Nx1x4 if class agnostic

View Source
@layer_register(log_shape=True)
def fastrcnn_outputs(feature, num_categories, class_agnostic_regression=False):
    """
    Build the classification and box-regression output layers of the head.

    Args:
        feature (any shape): input to both fully connected layers
        num_categories (int): number of categories; one extra class is added on top
        class_agnostic_regression (bool): if True, regression to N x 1 x 4

    Returns:
        cls_logits: N x num_class classification logits
        reg_logits: N x num_class x 4, or N x 1 x 4 if class agnostic
    """
    total_classes = num_categories + 1
    cls_logits = FullyConnected(
        'class', feature, total_classes,
        kernel_initializer=tf.random_normal_initializer(stddev=0.01))

    if class_agnostic_regression:
        box_groups = 1
    else:
        box_groups = total_classes
    flat_boxes = FullyConnected(
        'box', feature, box_groups * 4,
        kernel_initializer=tf.random_normal_initializer(stddev=0.001))
    reg_logits = tf.reshape(flat_boxes, (-1, box_groups, 4), name='output_box')
    return cls_logits, reg_logits

fastrcnn_predictions

def fastrcnn_predictions(
    boxes,
    scores
)

Generate final results from predictions of all proposals.

Args: boxes: n x #class x 4 floatbox in float32; scores: n x #class

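Returns: boxes: Kx4; scores: K x #cat (for each kept box, its scores for every non-background class); labels: K (class index of each kept box, starting at 1)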

View Source
@under_name_scope()
def fastrcnn_predictions(boxes, scores):
    """
    Generate final results from predictions of all proposals.

    Class 0 is dropped, candidates are thresholded on score and then passed
    through a single non-max-suppression call.

    Args:
        boxes: n x #class x 4 floatbox in float32
        scores: n x #class

    Returns:
        boxes: Kx4
        scores: K x #cat, the scores of every non-background class for each kept box
        labels: K, class index of each kept box (starting at 1)
    """
    assert boxes.shape[1] == scores.shape[1]
    # Drop class 0 and move the class axis to the front.
    boxes = tf.transpose(boxes, [1, 0, 2])[1:, :, :]  # #cat x n x 4
    scores = tf.transpose(scores[:, 1:], [1, 0])  # #cat x n

    max_coord = tf.reduce_max(boxes)
    kept_ids = tf.where(scores > cfg.TEST.RESULT_SCORE_THRESH)  # Fx2: (class, box)
    kept_boxes = tf.gather_nd(boxes, kept_ids)  # Fx4
    kept_scores = tf.gather_nd(scores, kept_ids)  # F,
    kept_class = tf.slice(kept_ids, [0, 0], [-1, 1])  # F,1

    # Shift every class by more than the largest coordinate so that boxes of
    # different classes cannot overlap inside the single NMS call.
    class_shift = tf.cast(kept_class, tf.float32) * (max_coord + 1)  # F,1
    shifted_boxes = kept_boxes + class_shift
    selection = tf.image.non_max_suppression(
        shifted_boxes,
        kept_scores,
        cfg.TEST.RESULTS_PER_IM,
        cfg.TEST.FRCNN_NMS_THRESH)

    # Trick to return the scores of all classes for every selected box:
    # gather whole score columns by box index instead of single entries.
    kept_box_index = kept_ids[:, 1]
    all_class_scores = tf.gather(scores, kept_box_index, axis=1)
    selected_scores = tf.gather(all_class_scores, selection, axis=1)
    final_scores = tf.transpose(selected_scores, name="scores")

    selected_class = tf.gather(kept_class[:, 0], selection)
    final_labels = tf.add(selected_class, 1, name='labels')
    final_boxes = tf.gather(kept_boxes, selection, name='boxes')
    return final_boxes, final_scores, final_labels

proposal_metrics

def proposal_metrics(
    iou
)

Add summaries for RPN proposals.

Args: iou: nxm, #proposal x #gt

View Source
@under_name_scope()
def proposal_metrics(iou):
    """
    Register moving summaries describing how well the proposals cover the ground truth.

    Args:
        iou: nxm, #proposal x #gt

    The summaries are: the mean, over GT boxes, of the best IoU reached by any
    proposal, and the fraction of GT boxes whose best IoU is at least 0.3 and
    at least 0.5. Nothing is returned.
    """
    # Best-matching proposal for every GT box (reduce over the proposal axis); summary only.
    per_gt_best = tf.reduce_max(iou, axis=0)
    tracked = [tf.reduce_mean(per_gt_best, name='best_iou_per_gt')]
    with tf.device('/cpu:0'):
        for threshold in (0.3, 0.5):
            num_recalled = tf.count_nonzero(per_gt_best >= threshold)
            num_gt = tf.size(per_gt_best, out_type=tf.int64)
            tracked.append(
                tf.truediv(num_recalled, num_gt, name='recall_iou{}'.format(threshold)))
    add_moving_summary(*tracked)

sample_fast_rcnn_targets

def sample_fast_rcnn_targets(
    boxes,
    gt_boxes,
    gt_labels
)

Sample some boxes from all proposals for training.

#fg (the number of foreground boxes) is guaranteed to be > 0 whenever there is at least one ground-truth box, because ground truth boxes will be added as proposals.

Args: boxes: nx4 region proposals, floatbox gt_boxes: mx4, floatbox gt_labels: m, int32

Returns: A BoxProposals instance, with: sampled_boxes: tx4 floatbox, the rois sampled_labels: t int64 labels, in [0, #class). Positive means foreground. fg_inds_wrt_gt: #fg indices, each in range [0, m-1]. It contains the matching GT of each foreground roi.

View Source
@under_name_scope()
def sample_fast_rcnn_targets(boxes, gt_boxes, gt_labels):
    """
    Pick a training batch of rois out of the proposals plus the ground-truth boxes.

    Ground-truth boxes are appended to the proposals before sampling, so each of
    them is itself a candidate roi (with an IoU of 1 against its own GT column).

    Args:
        boxes: nx4 region proposals, floatbox
        gt_boxes: mx4, floatbox
        gt_labels: m, integer class labels
            NOTE(review): the gathered labels are concatenated with int64 zeros,
            so these presumably need to be int64 -- confirm against the caller.

    Returns:
        A BoxProposals instance, with:
            boxes: tx4 floatbox, the sampled rois (foreground first, then background)
            labels: t labels; the matched GT label for foreground rois, 0 for background
            fg_inds_wrt_gt: #fg indices, each in range [0, m-1].
                It contains the matching GT of each foreground roi.
    """
    proposal_iou = pairwise_iou(boxes, gt_boxes)     # n x m
    proposal_metrics(proposal_iou)

    # Extend the candidate pool with the GT boxes; the identity block is their IoU rows.
    boxes = tf.concat([boxes, gt_boxes], axis=0)    # (n+m) x 4
    num_gt = tf.shape(gt_boxes)[0]
    iou = tf.concat([proposal_iou, tf.eye(num_gt)], axis=0)   # (n+m) x m
    # From here on there are n+m candidates.

    def _split_fg_bg(iou):
        """Randomly choose foreground and background candidate indices."""

        def _mask_from_iou():
            return tf.reduce_max(iou, axis=1) >= cfg.FRCNN.FG_THRESH

        def _mask_all_background():
            # No GT column to reduce over: nothing can be foreground.
            return tf.zeros([tf.shape(iou)[0]], dtype=tf.bool)

        is_fg = tf.cond(tf.shape(iou)[1] > 0, _mask_from_iou, _mask_all_background)

        fg_candidates = tf.reshape(tf.where(is_fg), [-1])
        fg_quota = int(cfg.FRCNN.BATCH_PER_IM * cfg.FRCNN.FG_RATIO)
        num_fg = tf.minimum(fg_quota, tf.size(fg_candidates), name='num_fg')
        chosen_fg = tf.random_shuffle(fg_candidates)[:num_fg]

        bg_candidates = tf.reshape(tf.where(tf.logical_not(is_fg)), [-1])
        bg_quota = cfg.FRCNN.BATCH_PER_IM - num_fg
        num_bg = tf.minimum(bg_quota, tf.size(bg_candidates), name='num_bg')
        chosen_bg = tf.random_shuffle(bg_candidates)[:num_bg]

        add_moving_summary(num_fg, num_bg)
        return chosen_fg, chosen_bg

    # Indices below are relative to the n+m candidates.
    fg_inds, bg_inds = _split_fg_bg(iou)

    def _argmax_gt():
        return tf.argmax(iou, axis=1)   # one GT index in 0~m-1 per candidate

    def _zero_gt():
        return tf.zeros([tf.shape(iou)[0]], dtype=tf.int64)

    matched_gt = tf.cond(tf.shape(iou)[1] > 0, _argmax_gt, _zero_gt)
    fg_inds_wrt_gt = tf.gather(matched_gt, fg_inds)   # num_fg

    sampled_inds = tf.concat([fg_inds, bg_inds], axis=0)
    sampled_boxes = tf.gather(boxes, sampled_inds)
    fg_part = tf.gather(gt_labels, fg_inds_wrt_gt)
    bg_part = tf.zeros_like(bg_inds, dtype=tf.int64)
    sampled_labels = tf.concat([fg_part, bg_part], axis=0)

    # These are training targets, so no gradient may flow back through them.
    return BoxProposals(
        tf.stop_gradient(sampled_boxes, name='sampled_proposal_boxes'),
        tf.stop_gradient(sampled_labels, name='sampled_labels'),
        tf.stop_gradient(fg_inds_wrt_gt))

Classes

BoxProposals

class BoxProposals(
    boxes,
    labels=None,
    fg_inds_wrt_gt=None
)

A structure to manage box proposals and their relations with ground truth.

Methods

fg_boxes
def fg_boxes(
    self
)

Returns: #fg x4

View Source
    @memoized_method
    def fg_boxes(self):
        """ Returns: #fg x4, the boxes at the foreground positions """
        positions = self.fg_inds()
        return tf.gather(self.boxes, positions, name='fg_boxes')
fg_inds
def fg_inds(
    self
)

Returns: #fg indices in [0, N-1]

View Source
    @memoized_method
    def fg_inds(self):
        """ Returns: #fg indices in [0, N-1], the positions whose label is > 0 """
        is_fg = self.labels > 0
        return tf.reshape(tf.where(is_fg), [-1], name='fg_inds')
fg_labels
def fg_labels(
    self
)

Returns: #fg

View Source
    @memoized_method
    def fg_labels(self):
        """ Returns: #fg, the labels at the foreground positions """
        positions = self.fg_inds()
        return tf.gather(self.labels, positions, name='fg_labels')

FastRCNNHead

class FastRCNNHead(
    proposals,
    box_logits,
    label_logits,
    gt_boxes,
    bbox_regression_weights
)

A class to process & decode inputs/outputs of a fastrcnn classification+regression head.

Methods

decoded_output_boxes
def decoded_output_boxes(
    self
)

Returns: N x #class x 4

View Source
    @memoized_method
    def decoded_output_boxes(self):
        """ Returns: N x #class x 4, the decoded box of every class for every proposal """
        # Repeat each proposal along the class axis so it lines up with box_logits.
        per_box = tf.expand_dims(self.proposals.boxes, 1)
        anchors = tf.tile(per_box, [1, self._num_classes, 1])   # N x #class x 4
        unscaled_logits = self.box_logits / self.bbox_regression_weights
        return decode_bbox_target(unscaled_logits, anchors)
decoded_output_boxes_class_agnostic
def decoded_output_boxes_class_agnostic(
    self
)

Returns: Nx4

View Source
    @memoized_method
    def decoded_output_boxes_class_agnostic(self):
        """ Returns: Nx4, decoded boxes; only valid for a class-agnostic head """
        assert self._bbox_class_agnostic
        flat_logits = tf.reshape(self.box_logits, [-1, 4])
        unscaled_logits = flat_logits / self.bbox_regression_weights
        return decode_bbox_target(unscaled_logits, self.proposals.boxes)
decoded_output_boxes_for_label
def decoded_output_boxes_for_label(
    self,
    labels
)
View Source
    @memoized_method
    def decoded_output_boxes_for_label(self, labels):
        """
        Args:
            labels: N, the class whose regression output is used for each box.
                It is stacked with an int64 range, so it must be int64.

        Returns: Nx4 decoded boxes
        """
        assert not self._bbox_class_agnostic
        # One (row, class) pair per box. Stacking along axis=1 gives the Nx2
        # layout gather_nd needs; the default axis=0 would produce 2xN.
        indices = tf.stack([
            tf.range(tf.size(labels, out_type=tf.int64)),
            labels
        ], axis=1)
        needed_logits = tf.gather_nd(self.box_logits, indices)   # N x 4
        decoded = decode_bbox_target(
            needed_logits / self.bbox_regression_weights,
            self.proposals.boxes
        )
        return decoded
decoded_output_boxes_for_predicted_label
def decoded_output_boxes_for_predicted_label(
    self
)

Returns: Nx4 decoded boxes

View Source
    @memoized_method
    def decoded_output_boxes_for_predicted_label(self):
        """ Returns: Nx4 decoded boxes, using the regression output of each box's predicted class """
        # Fixed: this used to call `_decoded_output_boxes_for_label`, which the
        # class does not define; the method has no leading underscore.
        return self.decoded_output_boxes_for_label(self.predicted_labels())
decoded_output_boxes_for_true_label
def decoded_output_boxes_for_true_label(
    self
)

Returns: Nx4 decoded boxes

View Source
    @memoized_method
    def decoded_output_boxes_for_true_label(self):
        """ Returns: Nx4 decoded boxes, using the regression output of each box's true class """
        # Fixed: this used to call `_decoded_output_boxes_for_label`, which the
        # class does not define; the method has no leading underscore.
        return self.decoded_output_boxes_for_label(self.proposals.labels)
fg_box_logits
def fg_box_logits(
    self
)

Returns: #fg x ? x 4

View Source
    @memoized_method
    def fg_box_logits(self):
        """ Returns: #fg x ? x 4, the box logits at the foreground positions """
        positions = self.proposals.fg_inds()
        return tf.gather(self.box_logits, positions, name='fg_box_logits')
losses
def losses(
    self
)
View Source
    @memoized_method
    def losses(self):
        """ Returns: [label_loss, box_loss] as computed by ``fastrcnn_losses`` """
        # Regression target: the matched GT box encoded relative to its
        # foreground roi, scaled by the regression weights.
        matched_gt_boxes = tf.gather(self.gt_boxes, self.proposals.fg_inds_wrt_gt)
        fg_rois = self.proposals.fg_boxes()
        encoded_targets = encode_bbox_target(matched_gt_boxes, fg_rois) * self.bbox_regression_weights
        return fastrcnn_losses(
            self.proposals.labels,
            self.label_logits,
            encoded_targets,
            self.fg_box_logits(),
        )
output_scores
def output_scores(
    self,
    name=None
)

Returns: N x #class scores, summed to one for each box.

View Source
    @memoized_method
    def output_scores(self, name=None):
        """ Returns: N x #class softmax scores, summed to one for each box."""
        class_probabilities = tf.nn.softmax(self.label_logits, name=name)
        return class_probabilities
predicted_labels
def predicted_labels(
    self
)

Returns: N ints

View Source
    @memoized_method
    def predicted_labels(self):
        """ Returns: N ints, the highest-scoring class of each box """
        best_class = tf.argmax(self.label_logits, axis=1, name='predicted_labels')
        return best_class