Module mot.object_detection.modeling.model_frcnn
View Source
# -*- coding: utf-8 -*-
# File: model_frcnn.py
import tensorflow as tf
from tensorpack.models import Conv2D, FullyConnected, layer_register
from tensorpack.tfutils.argscope import argscope
from tensorpack.tfutils.common import get_tf_version_tuple
from tensorpack.tfutils.scope_utils import under_name_scope
from tensorpack.tfutils.summary import add_moving_summary
from tensorpack.utils.argtools import memoized_method
from mot.object_detection.config import config as cfg
from mot.object_detection.utils.box_ops import pairwise_iou
from mot.object_detection.modeling.model_box import decode_bbox_target, encode_bbox_target
from mot.object_detection.modeling.backbone import GroupNorm
@under_name_scope()
def proposal_metrics(iou):
    """
    Record moving-average summaries describing how well the proposals cover the ground truth.

    Args:
        iou: nxm, #proposal x #gt
    """
    # For every gt box keep the IoU of its best-overlapping proposal (summary only).
    per_gt_best = tf.reduce_max(iou, axis=0)
    tracked = [tf.reduce_mean(per_gt_best, name='best_iou_per_gt')]
    with tf.device('/cpu:0'):
        for threshold in (0.3, 0.5):
            num_hit = tf.count_nonzero(per_gt_best >= threshold)
            num_gt = tf.size(per_gt_best, out_type=tf.int64)
            # Fraction of gt boxes whose best proposal reaches the threshold.
            tracked.append(
                tf.truediv(num_hit, num_gt, name='recall_iou{}'.format(threshold)))
    add_moving_summary(*tracked)
@under_name_scope()
def sample_fast_rcnn_targets(boxes, gt_boxes, gt_labels):
    """
    Sample some boxes from all proposals for training.

    The ground truth boxes are appended to the proposals, each with IoU 1
    against its own gt column, so they are candidates for foreground sampling.
    When there is no gt box at all (m == 0) every proposal is treated as
    background.

    Args:
        boxes: nx4 region proposals, floatbox
        gt_boxes: mx4, floatbox
        gt_labels: m, int32
            NOTE(review): the gathered labels are concatenated with int64 zeros
            below, which suggests int64 is actually required -- confirm with callers.
    Returns:
        A BoxProposals instance, with:
            sampled_boxes: tx4 floatbox, the rois
            sampled_labels: t int64 labels, in [0, #class). Positive means foreground.
            fg_inds_wrt_gt: #fg indices, each in range [0, m-1].
                It contains the matching GT of each foreground roi.
        The sampled foreground rois come first, followed by the background rois.
    """
    iou = pairwise_iou(boxes, gt_boxes)     # nxm
    # summaries are computed on the raw proposals only, before gt boxes are appended
    proposal_metrics(iou)
    # add ground truth as proposals as well
    boxes = tf.concat([boxes, gt_boxes], axis=0)    # (n+m) x 4
    # the appended gt rows use an identity block: IoU 1 with themselves, 0 with other gt
    iou = tf.concat([iou, tf.eye(tf.shape(gt_boxes)[0])], axis=0)   # (n+m) x m
    # #proposal=n+m from now on
    # Helper: randomly pick foreground / background proposal indices under the
    # per-image quotas from cfg.FRCNN.
    def sample_fg_bg(iou):
        # a proposal is foreground if its best IoU with any gt reaches FG_THRESH;
        # with zero gt boxes reduce_max has nothing to reduce, so use an all-False mask
        fg_mask = tf.cond(tf.shape(iou)[1] > 0,
                          lambda: tf.reduce_max(iou, axis=1) >= cfg.FRCNN.FG_THRESH,
                          lambda: tf.zeros([tf.shape(iou)[0]], dtype=tf.bool))
        fg_inds = tf.reshape(tf.where(fg_mask), [-1])
        # at most BATCH_PER_IM * FG_RATIO foreground samples
        num_fg = tf.minimum(int(
            cfg.FRCNN.BATCH_PER_IM * cfg.FRCNN.FG_RATIO),
            tf.size(fg_inds), name='num_fg')
        fg_inds = tf.random_shuffle(fg_inds)[:num_fg]
        bg_inds = tf.reshape(tf.where(tf.logical_not(fg_mask)), [-1])
        # background fills the rest of the batch, limited by what is available
        num_bg = tf.minimum(
            cfg.FRCNN.BATCH_PER_IM - num_fg,
            tf.size(bg_inds), name='num_bg')
        bg_inds = tf.random_shuffle(bg_inds)[:num_bg]
        add_moving_summary(num_fg, num_bg)
        return fg_inds, bg_inds
    fg_inds, bg_inds = sample_fg_bg(iou)
    # fg,bg indices w.r.t proposals
    # index of the best-matching gt for every proposal (zeros when there is no gt)
    best_iou_ind = tf.cond(tf.shape(iou)[1] > 0,
                           lambda: tf.argmax(iou, axis=1),   # #proposal, each in 0~m-1
                           lambda: tf.zeros([tf.shape(iou)[0]], dtype=tf.int64))
    fg_inds_wrt_gt = tf.gather(best_iou_ind, fg_inds)   # num_fg
    all_indices = tf.concat([fg_inds, bg_inds], axis=0)   # indices w.r.t all n+m proposal boxes
    ret_boxes = tf.gather(boxes, all_indices)
    # foreground rois take the label of their matched gt; background rois get label 0
    ret_labels = tf.concat(
        [tf.gather(gt_labels, fg_inds_wrt_gt),
         tf.zeros_like(bg_inds, dtype=tf.int64)], axis=0)
    # stop the gradient -- they are meant to be training targets
    return BoxProposals(
        tf.stop_gradient(ret_boxes, name='sampled_proposal_boxes'),
        tf.stop_gradient(ret_labels, name='sampled_labels'),
        tf.stop_gradient(fg_inds_wrt_gt))
@layer_register(log_shape=True)
def fastrcnn_outputs(feature, num_categories, class_agnostic_regression=False):
    """
    Build the classification and box-regression output layers on top of a head feature.

    Args:
        feature (any shape):
        num_categories (int): number of categories; the classifier gets one extra class
        class_agnostic_regression (bool): if True, regression to N x 1 x 4

    Returns:
        cls_logits: N x num_class classification logits
        reg_logits: N x num_classx4 or Nx1x4 if class agnostic
    """
    total_classes = 1 + num_categories
    cls_init = tf.random_normal_initializer(stddev=0.01)
    cls_logits = FullyConnected(
        'class', feature, total_classes, kernel_initializer=cls_init)

    # One regression group per class, or a single shared group when class agnostic.
    if class_agnostic_regression:
        box_groups = 1
    else:
        box_groups = total_classes
    reg_init = tf.random_normal_initializer(stddev=0.001)
    flat_reg = FullyConnected(
        'box', feature, box_groups * 4, kernel_initializer=reg_init)
    reg_logits = tf.reshape(flat_reg, (-1, box_groups, 4), name='output_box')
    return cls_logits, reg_logits
@under_name_scope()
def fastrcnn_losses(labels, label_logits, fg_boxes, fg_box_logits):
    """
    Compute the classification and box-regression losses of the head and
    register training metrics as moving summaries.

    Args:
        labels: n,
        label_logits: nxC
        fg_boxes: nfgx4, encoded
        fg_box_logits: nfgxCx4 or nfgx1x4 if class agnostic
    Returns:
        label_loss, box_loss
        label_loss is the mean sparse softmax cross-entropy over all n samples.
        box_loss is the sum of absolute differences between fg_boxes and the
        selected box logits, divided by n (the number of labels, not #fg).
    """
    label_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=labels, logits=label_logits)
    label_loss = tf.reduce_mean(label_loss, name='label_loss')
    # foreground samples are those with a positive label
    fg_inds = tf.where(labels > 0)[:, 0]
    fg_labels = tf.gather(labels, fg_inds)
    num_fg = tf.size(fg_inds, out_type=tf.int64)
    empty_fg = tf.equal(num_fg, 0)
    # class-specific regression: keep only the 4 logits of each sample's own label
    # (requires the second dimension of fg_box_logits to be statically known)
    if int(fg_box_logits.shape[1]) > 1:
        if get_tf_version_tuple() >= (1, 14):
            # batched gather is used when TF is at least 1.14
            fg_labels = tf.expand_dims(fg_labels, axis=1)  # nfg x 1
            fg_box_logits = tf.gather(fg_box_logits, fg_labels, batch_dims=1)
        else:
            # older TF: build explicit (row, label) index pairs
            indices = tf.stack([tf.range(num_fg), fg_labels], axis=1)  # nfgx2
            fg_box_logits = tf.gather_nd(fg_box_logits, indices)
    fg_box_logits = tf.reshape(fg_box_logits, [-1, 4])  # nfg x 4
    with tf.name_scope('label_metrics'), tf.device('/cpu:0'):
        prediction = tf.argmax(label_logits, axis=1, name='label_prediction')
        correct = tf.cast(tf.equal(prediction, labels), tf.float32)  # boolean/integer gather is unavailable on GPU
        accuracy = tf.reduce_mean(correct, name='accuracy')
        fg_label_pred = tf.argmax(tf.gather(label_logits, fg_inds), axis=1)
        # number of foreground samples that were predicted as class 0
        num_zero = tf.reduce_sum(tf.cast(tf.equal(fg_label_pred, 0), tf.int64), name='num_zero')
        # both ratios below are reported as 0 when there is no foreground sample
        false_negative = tf.where(
            empty_fg, 0., tf.cast(tf.truediv(num_zero, num_fg), tf.float32), name='false_negative')
        fg_accuracy = tf.where(
            empty_fg, 0., tf.reduce_mean(tf.gather(correct, fg_inds)), name='fg_accuracy')
    # L1 regression loss summed over foreground boxes, normalized by the total sample count
    box_loss = tf.reduce_sum(tf.abs(fg_boxes - fg_box_logits))
    box_loss = tf.truediv(
        box_loss, tf.cast(tf.shape(labels)[0], tf.float32), name='box_loss')
    add_moving_summary(label_loss, box_loss, accuracy,
                       fg_accuracy, false_negative, tf.cast(num_fg, tf.float32, name='num_fg_label'))
    return [label_loss, box_loss]
@under_name_scope()
def fastrcnn_predictions(boxes, scores):
    """
    Generate final results from predictions of all proposals.

    Class index 0 is dropped before filtering; the remaining classes are
    referred to as categories (#cat = #class - 1) below.

    Args:
        boxes: nx#classx4 floatbox in float32
        scores: nx#class
    Returns:
        boxes: Kx4
        scores: Kx#cat, the scores of every category for each kept box
        labels: K, each in [1, #cat]
    """
    assert boxes.shape[1] == scores.shape[1]
    # switch to class-major layout and drop class 0
    boxes = tf.transpose(boxes, [1, 0, 2])[1:, :, :]  # #catxnx4
    scores = tf.transpose(scores[:, 1:], [1, 0])  # #catxn
    max_coord = tf.reduce_max(boxes)
    # (category, box) index pairs whose score passes the threshold
    filtered_ids = tf.where(scores > cfg.TEST.RESULT_SCORE_THRESH)  # Fx2
    filtered_boxes = tf.gather_nd(boxes, filtered_ids)  # Fx4
    filtered_scores = tf.gather_nd(scores, filtered_ids)  # F,
    cls_per_box = tf.slice(filtered_ids, [0, 0], [-1, 1])
    # shift the boxes of each category by a category-dependent offset larger than
    # any coordinate, so that one NMS call never suppresses across categories
    offsets = tf.cast(cls_per_box, tf.float32) * (max_coord + 1)  # F,1
    nms_boxes = filtered_boxes + offsets
    selection = tf.image.non_max_suppression(
        nms_boxes,
        filtered_scores,
        cfg.TEST.RESULTS_PER_IM,
        cfg.TEST.FRCNN_NMS_THRESH)
    # The next lines are really dirty: it's a trick to return the scores for all classes
    # scores is #catxn -> pick the box column of each candidate (#catxF) -> keep
    # the NMS survivors (#catxK) -> transpose to Kx#cat
    final_scores = tf.gather(tf.gather(scores, filtered_ids[:, 1], axis=1), selection, axis=1)
    final_scores = tf.transpose(final_scores, name="scores")
    # +1 restores the original class index after class 0 was dropped above
    final_labels = tf.add(tf.gather(cls_per_box[:, 0], selection), 1, name='labels')
    final_boxes = tf.gather(filtered_boxes, selection, name='boxes')
    return final_boxes, final_scores, final_labels
"""
FastRCNN heads for FPN:
"""
@layer_register(log_shape=True)
def fastrcnn_2fc_head(feature):
    """
    Apply two stacked ReLU fully-connected layers, 'fc6' then 'fc7'.

    Both layers have cfg.FPN.FRCNN_FC_HEAD_DIM units and share one initializer object.

    Args:
        feature (any shape):

    Returns:
        2D head feature
    """
    width = cfg.FPN.FRCNN_FC_HEAD_DIM
    kernel_init = tf.variance_scaling_initializer()
    out = feature
    for layer_name in ('fc6', 'fc7'):
        out = FullyConnected(
            layer_name, out, width,
            kernel_initializer=kernel_init, activation=tf.nn.relu)
    return out
@layer_register(log_shape=True)
def fastrcnn_Xconv1fc_head(feature, num_convs, norm=None):
    """
    Head made of `num_convs` 3x3 ReLU convolutions followed by one ReLU fully-connected layer.

    Args:
        feature (NCHW):
        num_convs (int): number of conv layers
        norm (str or None): either None or 'GN'; with 'GN' a GroupNorm layer
            follows every convolution

    Returns:
        2D head feature
    """
    assert norm in [None, 'GN'], norm
    # The name of the non-truncated normal distribution depends on the TF version.
    if get_tf_version_tuple() >= (1, 12):
        distribution = 'untruncated_normal'
    else:
        distribution = 'normal'
    conv_init = tf.variance_scaling_initializer(
        scale=2.0, mode='fan_out', distribution=distribution)
    use_gn = norm is not None

    net = feature
    with argscope(Conv2D, data_format='channels_first', kernel_initializer=conv_init):
        for idx in range(num_convs):
            net = Conv2D(
                'conv{}'.format(idx), net, cfg.FPN.FRCNN_CONV_HEAD_DIM, 3,
                activation=tf.nn.relu)
            if use_gn:
                net = GroupNorm('gn{}'.format(idx), net)
        net = FullyConnected(
            'fc', net, cfg.FPN.FRCNN_FC_HEAD_DIM,
            kernel_initializer=tf.variance_scaling_initializer(),
            activation=tf.nn.relu)
    return net
def fastrcnn_4conv1fc_head(*args, **kwargs):
    """Call `fastrcnn_Xconv1fc_head` with ``num_convs=4``, forwarding all other arguments."""
    head = fastrcnn_Xconv1fc_head(
        *args,
        num_convs=4,
        **kwargs)
    return head
def fastrcnn_4conv1fc_gn_head(*args, **kwargs):
    """Call `fastrcnn_Xconv1fc_head` with ``num_convs=4`` and ``norm='GN'``, forwarding all other arguments."""
    head = fastrcnn_Xconv1fc_head(
        *args,
        num_convs=4,
        norm='GN',
        **kwargs)
    return head
class BoxProposals(object):
    """
    Container for a set of box proposals and, optionally, how they relate to the ground truth.
    """

    def __init__(self, boxes, labels=None, fg_inds_wrt_gt=None):
        """
        Args:
            boxes: Nx4
            labels: N, each in [0, #class), the true label for each input box
            fg_inds_wrt_gt: #fg, each in [0, M)

        `labels` and `fg_inds_wrt_gt` could be None when not training.
        An argument that is None is not stored as an attribute at all.
        """
        provided = (
            ('boxes', boxes),
            ('labels', labels),
            ('fg_inds_wrt_gt', fg_inds_wrt_gt),
        )
        for attr_name, value in provided:
            if value is None:
                continue
            setattr(self, attr_name, value)

    @memoized_method
    def fg_inds(self):
        """ Returns: #fg indices in [0, N-1], i.e. the positions whose label is positive """
        is_fg = self.labels > 0
        positions = tf.where(is_fg)
        return tf.reshape(positions, [-1], name='fg_inds')

    @memoized_method
    def fg_boxes(self):
        """ Returns: #fg x4, the boxes at the foreground positions """
        selected = self.fg_inds()
        return tf.gather(self.boxes, selected, name='fg_boxes')

    @memoized_method
    def fg_labels(self):
        """ Returns: #fg, the labels at the foreground positions """
        selected = self.fg_inds()
        return tf.gather(self.labels, selected, name='fg_labels')
class FastRCNNHead(object):
    """
    A class to process & decode inputs/outputs of a fastrcnn classification+regression head.
    """

    def __init__(self, proposals, box_logits, label_logits, gt_boxes, bbox_regression_weights):
        """
        Args:
            proposals: BoxProposals
            box_logits: Nx#classx4 or Nx1x4, the output of the head
            label_logits: Nx#class, the output of the head
            gt_boxes: Mx4
            bbox_regression_weights: a 4 element tensor

        Every argument that is not None is stored as an attribute of the same name.
        """
        for k, v in locals().items():
            if k != 'self' and v is not None:
                setattr(self, k, v)
        # The second dimension of box_logits must be statically known.
        self._bbox_class_agnostic = int(box_logits.shape[1]) == 1
        self._num_classes = box_logits.shape[1]

    @memoized_method
    def fg_box_logits(self):
        """ Returns: #fg x ? x 4 """
        return tf.gather(self.box_logits, self.proposals.fg_inds(), name='fg_box_logits')

    @memoized_method
    def losses(self):
        """ Returns: [label_loss, box_loss] as computed by `fastrcnn_losses` """
        # Regression targets: the matched gt box of each foreground proposal,
        # encoded relative to that proposal and scaled by the regression weights.
        encoded_fg_gt_boxes = encode_bbox_target(
            tf.gather(self.gt_boxes, self.proposals.fg_inds_wrt_gt),
            self.proposals.fg_boxes()) * self.bbox_regression_weights
        return fastrcnn_losses(
            self.proposals.labels, self.label_logits,
            encoded_fg_gt_boxes, self.fg_box_logits()
        )

    @memoized_method
    def decoded_output_boxes(self):
        """ Returns: N x #class x 4 """
        # Repeat every proposal once per regression group so shapes match box_logits.
        anchors = tf.tile(tf.expand_dims(self.proposals.boxes, 1),
                          [1, self._num_classes, 1])   # N x #class x 4
        decoded_boxes = decode_bbox_target(
            self.box_logits / self.bbox_regression_weights,
            anchors
        )
        return decoded_boxes

    @memoized_method
    def decoded_output_boxes_for_true_label(self):
        """ Returns: Nx4 decoded boxes """
        # The helper is defined without a leading underscore; calling the
        # underscored name raised AttributeError.
        return self.decoded_output_boxes_for_label(self.proposals.labels)

    @memoized_method
    def decoded_output_boxes_for_predicted_label(self):
        """ Returns: Nx4 decoded boxes """
        return self.decoded_output_boxes_for_label(self.predicted_labels())

    @memoized_method
    def decoded_output_boxes_for_label(self, labels):
        """
        Decode, for each proposal, the box regressed for the given label.

        Args:
            labels: N int64 class indices, one per proposal
        Returns:
            Nx4 decoded boxes
        """
        assert not self._bbox_class_agnostic
        # gather_nd needs one (row, label) pair per proposal, i.e. Nx2 indices;
        # stacking along the default axis 0 would produce 2xN instead.
        indices = tf.stack([
            tf.range(tf.size(labels, out_type=tf.int64)),
            labels
        ], axis=1)
        needed_logits = tf.gather_nd(self.box_logits, indices)   # N x 4
        decoded = decode_bbox_target(
            needed_logits / self.bbox_regression_weights,
            self.proposals.boxes
        )
        return decoded

    @memoized_method
    def decoded_output_boxes_class_agnostic(self):
        """ Returns: Nx4 """
        assert self._bbox_class_agnostic
        box_logits = tf.reshape(self.box_logits, [-1, 4])
        decoded = decode_bbox_target(
            box_logits / self.bbox_regression_weights,
            self.proposals.boxes
        )
        return decoded

    @memoized_method
    def output_scores(self, name=None):
        """ Returns: N x #class scores, summed to one for each box."""
        return tf.nn.softmax(self.label_logits, name=name)

    @memoized_method
    def predicted_labels(self):
        """ Returns: N ints """
        return tf.argmax(self.label_logits, axis=1, name='predicted_labels')
Functions
fastrcnn_2fc_head
def fastrcnn_2fc_head(
    feature
)
Args: feature (any shape):
Returns: 2D head feature
View Source
@layer_register(log_shape=True)
def fastrcnn_2fc_head(feature):
    """
    Apply two stacked ReLU fully-connected layers, 'fc6' then 'fc7'.

    Both layers have cfg.FPN.FRCNN_FC_HEAD_DIM units and share one initializer object.

    Args:
        feature (any shape):

    Returns:
        2D head feature
    """
    width = cfg.FPN.FRCNN_FC_HEAD_DIM
    kernel_init = tf.variance_scaling_initializer()
    out = feature
    for layer_name in ('fc6', 'fc7'):
        out = FullyConnected(
            layer_name, out, width,
            kernel_initializer=kernel_init, activation=tf.nn.relu)
    return out
fastrcnn_4conv1fc_gn_head
def fastrcnn_4conv1fc_gn_head(
    *args,
    **kwargs
)
View Source
def fastrcnn_4conv1fc_gn_head(*args, **kwargs):
    """Call `fastrcnn_Xconv1fc_head` with ``num_convs=4`` and ``norm='GN'``, forwarding all other arguments."""
    head = fastrcnn_Xconv1fc_head(
        *args,
        num_convs=4,
        norm='GN',
        **kwargs)
    return head
fastrcnn_4conv1fc_head
def fastrcnn_4conv1fc_head(
    *args,
    **kwargs
)
View Source
def fastrcnn_4conv1fc_head(*args, **kwargs):
    """Call `fastrcnn_Xconv1fc_head` with ``num_convs=4``, forwarding all other arguments."""
    head = fastrcnn_Xconv1fc_head(
        *args,
        num_convs=4,
        **kwargs)
    return head
fastrcnn_Xconv1fc_head
def fastrcnn_Xconv1fc_head(
    feature,
    num_convs,
    norm=None
)
Args: feature (NCHW): the input feature map. num_convs (int): number of conv layers. norm (str or None): either None or 'GN'.
Returns: 2D head feature
View Source
@layer_register(log_shape=True)
def fastrcnn_Xconv1fc_head(feature, num_convs, norm=None):
    """
    Head made of `num_convs` 3x3 ReLU convolutions followed by one ReLU fully-connected layer.

    Args:
        feature (NCHW):
        num_convs (int): number of conv layers
        norm (str or None): either None or 'GN'; with 'GN' a GroupNorm layer
            follows every convolution

    Returns:
        2D head feature
    """
    assert norm in [None, 'GN'], norm
    # The name of the non-truncated normal distribution depends on the TF version.
    if get_tf_version_tuple() >= (1, 12):
        distribution = 'untruncated_normal'
    else:
        distribution = 'normal'
    conv_init = tf.variance_scaling_initializer(
        scale=2.0, mode='fan_out', distribution=distribution)
    use_gn = norm is not None

    net = feature
    with argscope(Conv2D, data_format='channels_first', kernel_initializer=conv_init):
        for idx in range(num_convs):
            net = Conv2D(
                'conv{}'.format(idx), net, cfg.FPN.FRCNN_CONV_HEAD_DIM, 3,
                activation=tf.nn.relu)
            if use_gn:
                net = GroupNorm('gn{}'.format(idx), net)
        net = FullyConnected(
            'fc', net, cfg.FPN.FRCNN_FC_HEAD_DIM,
            kernel_initializer=tf.variance_scaling_initializer(),
            activation=tf.nn.relu)
    return net
fastrcnn_losses
def fastrcnn_losses(
    labels,
    label_logits,
    fg_boxes,
    fg_box_logits
)
Args: labels: n, label_logits: nxC fg_boxes: nfgx4, encoded fg_box_logits: nfgxCx4 or nfgx1x4 if class agnostic
Returns: label_loss, box_loss
View Source
@under_name_scope()
def fastrcnn_losses(labels, label_logits, fg_boxes, fg_box_logits):
    """
    Compute the classification and box-regression losses of the head and
    register training metrics as moving summaries.

    Args:
        labels: n,
        label_logits: nxC
        fg_boxes: nfgx4, encoded
        fg_box_logits: nfgxCx4 or nfgx1x4 if class agnostic
    Returns:
        label_loss, box_loss
        label_loss is the mean sparse softmax cross-entropy over all n samples.
        box_loss is the sum of absolute differences between fg_boxes and the
        selected box logits, divided by n (the number of labels, not #fg).
    """
    label_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=labels, logits=label_logits)
    label_loss = tf.reduce_mean(label_loss, name='label_loss')
    # foreground samples are those with a positive label
    fg_inds = tf.where(labels > 0)[:, 0]
    fg_labels = tf.gather(labels, fg_inds)
    num_fg = tf.size(fg_inds, out_type=tf.int64)
    empty_fg = tf.equal(num_fg, 0)
    # class-specific regression: keep only the 4 logits of each sample's own label
    # (requires the second dimension of fg_box_logits to be statically known)
    if int(fg_box_logits.shape[1]) > 1:
        if get_tf_version_tuple() >= (1, 14):
            # batched gather is used when TF is at least 1.14
            fg_labels = tf.expand_dims(fg_labels, axis=1)  # nfg x 1
            fg_box_logits = tf.gather(fg_box_logits, fg_labels, batch_dims=1)
        else:
            # older TF: build explicit (row, label) index pairs
            indices = tf.stack([tf.range(num_fg), fg_labels], axis=1)  # nfgx2
            fg_box_logits = tf.gather_nd(fg_box_logits, indices)
    fg_box_logits = tf.reshape(fg_box_logits, [-1, 4])  # nfg x 4
    with tf.name_scope('label_metrics'), tf.device('/cpu:0'):
        prediction = tf.argmax(label_logits, axis=1, name='label_prediction')
        correct = tf.cast(tf.equal(prediction, labels), tf.float32)  # boolean/integer gather is unavailable on GPU
        accuracy = tf.reduce_mean(correct, name='accuracy')
        fg_label_pred = tf.argmax(tf.gather(label_logits, fg_inds), axis=1)
        # number of foreground samples that were predicted as class 0
        num_zero = tf.reduce_sum(tf.cast(tf.equal(fg_label_pred, 0), tf.int64), name='num_zero')
        # both ratios below are reported as 0 when there is no foreground sample
        false_negative = tf.where(
            empty_fg, 0., tf.cast(tf.truediv(num_zero, num_fg), tf.float32), name='false_negative')
        fg_accuracy = tf.where(
            empty_fg, 0., tf.reduce_mean(tf.gather(correct, fg_inds)), name='fg_accuracy')
    # L1 regression loss summed over foreground boxes, normalized by the total sample count
    box_loss = tf.reduce_sum(tf.abs(fg_boxes - fg_box_logits))
    box_loss = tf.truediv(
        box_loss, tf.cast(tf.shape(labels)[0], tf.float32), name='box_loss')
    add_moving_summary(label_loss, box_loss, accuracy,
                       fg_accuracy, false_negative, tf.cast(num_fg, tf.float32, name='num_fg_label'))
    return [label_loss, box_loss]
fastrcnn_outputs
def fastrcnn_outputs(
    feature,
    num_categories,
    class_agnostic_regression=False
)
Args: feature (any shape): num_categories (int): class_agnostic_regression (bool): if True, regression to N x 1 x 4
Returns: cls_logits: N x num_class classification logits reg_logits: N x num_classx4 or Nx1x4 if class agnostic
View Source
@layer_register(log_shape=True)
def fastrcnn_outputs(feature, num_categories, class_agnostic_regression=False):
    """
    Build the classification and box-regression output layers on top of a head feature.

    Args:
        feature (any shape):
        num_categories (int): number of categories; the classifier gets one extra class
        class_agnostic_regression (bool): if True, regression to N x 1 x 4

    Returns:
        cls_logits: N x num_class classification logits
        reg_logits: N x num_classx4 or Nx1x4 if class agnostic
    """
    total_classes = 1 + num_categories
    cls_init = tf.random_normal_initializer(stddev=0.01)
    cls_logits = FullyConnected(
        'class', feature, total_classes, kernel_initializer=cls_init)

    # One regression group per class, or a single shared group when class agnostic.
    if class_agnostic_regression:
        box_groups = 1
    else:
        box_groups = total_classes
    reg_init = tf.random_normal_initializer(stddev=0.001)
    flat_reg = FullyConnected(
        'box', feature, box_groups * 4, kernel_initializer=reg_init)
    reg_logits = tf.reshape(flat_reg, (-1, box_groups, 4), name='output_box')
    return cls_logits, reg_logits
fastrcnn_predictions
def fastrcnn_predictions(
    boxes,
    scores
)
Generate final results from predictions of all proposals.
Args: boxes: n#classx4 floatbox in float32 scores: nx#class
Returns: boxes: Kx4 scores: Kx#cat (the scores of every non-background category for each kept box) labels: K
View Source
@under_name_scope()
def fastrcnn_predictions(boxes, scores):
    """
    Generate final results from predictions of all proposals.

    Class index 0 is dropped before filtering; the remaining classes are
    referred to as categories (#cat = #class - 1) below.

    Args:
        boxes: nx#classx4 floatbox in float32
        scores: nx#class
    Returns:
        boxes: Kx4
        scores: Kx#cat, the scores of every category for each kept box
        labels: K, each in [1, #cat]
    """
    assert boxes.shape[1] == scores.shape[1]
    # switch to class-major layout and drop class 0
    boxes = tf.transpose(boxes, [1, 0, 2])[1:, :, :]  # #catxnx4
    scores = tf.transpose(scores[:, 1:], [1, 0])  # #catxn
    max_coord = tf.reduce_max(boxes)
    # (category, box) index pairs whose score passes the threshold
    filtered_ids = tf.where(scores > cfg.TEST.RESULT_SCORE_THRESH)  # Fx2
    filtered_boxes = tf.gather_nd(boxes, filtered_ids)  # Fx4
    filtered_scores = tf.gather_nd(scores, filtered_ids)  # F,
    cls_per_box = tf.slice(filtered_ids, [0, 0], [-1, 1])
    # shift the boxes of each category by a category-dependent offset larger than
    # any coordinate, so that one NMS call never suppresses across categories
    offsets = tf.cast(cls_per_box, tf.float32) * (max_coord + 1)  # F,1
    nms_boxes = filtered_boxes + offsets
    selection = tf.image.non_max_suppression(
        nms_boxes,
        filtered_scores,
        cfg.TEST.RESULTS_PER_IM,
        cfg.TEST.FRCNN_NMS_THRESH)
    # The next lines are really dirty: it's a trick to return the scores for all classes
    # scores is #catxn -> pick the box column of each candidate (#catxF) -> keep
    # the NMS survivors (#catxK) -> transpose to Kx#cat
    final_scores = tf.gather(tf.gather(scores, filtered_ids[:, 1], axis=1), selection, axis=1)
    final_scores = tf.transpose(final_scores, name="scores")
    # +1 restores the original class index after class 0 was dropped above
    final_labels = tf.add(tf.gather(cls_per_box[:, 0], selection), 1, name='labels')
    final_boxes = tf.gather(filtered_boxes, selection, name='boxes')
    return final_boxes, final_scores, final_labels
proposal_metrics
def proposal_metrics(
    iou
)
Add summaries for RPN proposals.
Args: iou: nxm, #proposal x #gt
View Source
@under_name_scope()
def proposal_metrics(iou):
    """
    Record moving-average summaries describing how well the proposals cover the ground truth.

    Args:
        iou: nxm, #proposal x #gt
    """
    # For every gt box keep the IoU of its best-overlapping proposal (summary only).
    per_gt_best = tf.reduce_max(iou, axis=0)
    tracked = [tf.reduce_mean(per_gt_best, name='best_iou_per_gt')]
    with tf.device('/cpu:0'):
        for threshold in (0.3, 0.5):
            num_hit = tf.count_nonzero(per_gt_best >= threshold)
            num_gt = tf.size(per_gt_best, out_type=tf.int64)
            # Fraction of gt boxes whose best proposal reaches the threshold.
            tracked.append(
                tf.truediv(num_hit, num_gt, name='recall_iou{}'.format(threshold)))
    add_moving_summary(*tracked)
sample_fast_rcnn_targets
def sample_fast_rcnn_targets(
    boxes,
    gt_boxes,
    gt_labels
)
Sample some boxes from all proposals for training.
#fg (the number of foreground boxes) is guaranteed to be > 0, because ground truth boxes will be added as proposals.
Args: boxes: nx4 region proposals, floatbox gt_boxes: mx4, floatbox gt_labels: m, int32
Returns: A BoxProposals instance, with: sampled_boxes: tx4 floatbox, the rois sampled_labels: t int64 labels, in [0, #class). Positive means foreground. fg_inds_wrt_gt: #fg indices, each in range [0, m-1]. It contains the matching GT of each foreground roi.
View Source
@under_name_scope()
def sample_fast_rcnn_targets(boxes, gt_boxes, gt_labels):
    """
    Sample some boxes from all proposals for training.

    The ground truth boxes are appended to the proposals, each with IoU 1
    against its own gt column, so they are candidates for foreground sampling.
    When there is no gt box at all (m == 0) every proposal is treated as
    background.

    Args:
        boxes: nx4 region proposals, floatbox
        gt_boxes: mx4, floatbox
        gt_labels: m, int32
            NOTE(review): the gathered labels are concatenated with int64 zeros
            below, which suggests int64 is actually required -- confirm with callers.
    Returns:
        A BoxProposals instance, with:
            sampled_boxes: tx4 floatbox, the rois
            sampled_labels: t int64 labels, in [0, #class). Positive means foreground.
            fg_inds_wrt_gt: #fg indices, each in range [0, m-1].
                It contains the matching GT of each foreground roi.
        The sampled foreground rois come first, followed by the background rois.
    """
    iou = pairwise_iou(boxes, gt_boxes)     # nxm
    # summaries are computed on the raw proposals only, before gt boxes are appended
    proposal_metrics(iou)
    # add ground truth as proposals as well
    boxes = tf.concat([boxes, gt_boxes], axis=0)    # (n+m) x 4
    # the appended gt rows use an identity block: IoU 1 with themselves, 0 with other gt
    iou = tf.concat([iou, tf.eye(tf.shape(gt_boxes)[0])], axis=0)   # (n+m) x m
    # #proposal=n+m from now on
    # Helper: randomly pick foreground / background proposal indices under the
    # per-image quotas from cfg.FRCNN.
    def sample_fg_bg(iou):
        # a proposal is foreground if its best IoU with any gt reaches FG_THRESH;
        # with zero gt boxes reduce_max has nothing to reduce, so use an all-False mask
        fg_mask = tf.cond(tf.shape(iou)[1] > 0,
                          lambda: tf.reduce_max(iou, axis=1) >= cfg.FRCNN.FG_THRESH,
                          lambda: tf.zeros([tf.shape(iou)[0]], dtype=tf.bool))
        fg_inds = tf.reshape(tf.where(fg_mask), [-1])
        # at most BATCH_PER_IM * FG_RATIO foreground samples
        num_fg = tf.minimum(int(
            cfg.FRCNN.BATCH_PER_IM * cfg.FRCNN.FG_RATIO),
            tf.size(fg_inds), name='num_fg')
        fg_inds = tf.random_shuffle(fg_inds)[:num_fg]
        bg_inds = tf.reshape(tf.where(tf.logical_not(fg_mask)), [-1])
        # background fills the rest of the batch, limited by what is available
        num_bg = tf.minimum(
            cfg.FRCNN.BATCH_PER_IM - num_fg,
            tf.size(bg_inds), name='num_bg')
        bg_inds = tf.random_shuffle(bg_inds)[:num_bg]
        add_moving_summary(num_fg, num_bg)
        return fg_inds, bg_inds
    fg_inds, bg_inds = sample_fg_bg(iou)
    # fg,bg indices w.r.t proposals
    # index of the best-matching gt for every proposal (zeros when there is no gt)
    best_iou_ind = tf.cond(tf.shape(iou)[1] > 0,
                           lambda: tf.argmax(iou, axis=1),   # #proposal, each in 0~m-1
                           lambda: tf.zeros([tf.shape(iou)[0]], dtype=tf.int64))
    fg_inds_wrt_gt = tf.gather(best_iou_ind, fg_inds)   # num_fg
    all_indices = tf.concat([fg_inds, bg_inds], axis=0)   # indices w.r.t all n+m proposal boxes
    ret_boxes = tf.gather(boxes, all_indices)
    # foreground rois take the label of their matched gt; background rois get label 0
    ret_labels = tf.concat(
        [tf.gather(gt_labels, fg_inds_wrt_gt),
         tf.zeros_like(bg_inds, dtype=tf.int64)], axis=0)
    # stop the gradient -- they are meant to be training targets
    return BoxProposals(
        tf.stop_gradient(ret_boxes, name='sampled_proposal_boxes'),
        tf.stop_gradient(ret_labels, name='sampled_labels'),
        tf.stop_gradient(fg_inds_wrt_gt))
Classes
BoxProposals
class BoxProposals(
    boxes,
    labels=None,
    fg_inds_wrt_gt=None
)
A structure to manage box proposals and their relations with ground truth.
Methods
fg_boxes
def fg_boxes(
    self
)
Returns: #fg x4
View Source
    @memoized_method
    def fg_boxes(self):
        """ Returns: #fg x4, the boxes at the foreground positions """
        selected = self.fg_inds()
        return tf.gather(self.boxes, selected, name='fg_boxes')
fg_inds
def fg_inds(
    self
)
Returns: #fg indices in [0, N-1]
View Source
    @memoized_method
    def fg_inds(self):
        """ Returns: #fg indices in [0, N-1], i.e. the positions whose label is positive """
        is_fg = self.labels > 0
        positions = tf.where(is_fg)
        return tf.reshape(positions, [-1], name='fg_inds')
fg_labels
def fg_labels(
    self
)
Returns: #fg
View Source
    @memoized_method
    def fg_labels(self):
        """ Returns: #fg, the labels at the foreground positions """
        selected = self.fg_inds()
        return tf.gather(self.labels, selected, name='fg_labels')
FastRCNNHead
class FastRCNNHead(
    proposals,
    box_logits,
    label_logits,
    gt_boxes,
    bbox_regression_weights
)
A class to process & decode inputs/outputs of a fastrcnn classification+regression head.
Methods
decoded_output_boxes
def decoded_output_boxes(
    self
)
Returns: N x #class x 4
View Source
    @memoized_method
    def decoded_output_boxes(self):
        """ Returns: N x #class x 4, one decoded box per proposal and regression group """
        # Repeat every proposal once per regression group so shapes match box_logits.
        per_proposal = tf.expand_dims(self.proposals.boxes, 1)
        anchors = tf.tile(per_proposal, [1, self._num_classes, 1])   # N x #class x 4
        unweighted_logits = self.box_logits / self.bbox_regression_weights
        return decode_bbox_target(unweighted_logits, anchors)
decoded_output_boxes_class_agnostic
def decoded_output_boxes_class_agnostic(
    self
)
Returns: Nx4
View Source
    @memoized_method
    def decoded_output_boxes_class_agnostic(self):
        """ Returns: Nx4, only valid when the regression is class agnostic """
        assert self._bbox_class_agnostic
        flat_logits = tf.reshape(self.box_logits, [-1, 4])
        unweighted_logits = flat_logits / self.bbox_regression_weights
        return decode_bbox_target(unweighted_logits, self.proposals.boxes)
decoded_output_boxes_for_label
def decoded_output_boxes_for_label(
    self,
    labels
)
View Source
    @memoized_method
    def decoded_output_boxes_for_label(self, labels):
        """
        Decode, for each proposal, the box regressed for the given label.

        Args:
            labels: N int64 class indices, one per proposal
        Returns:
            Nx4 decoded boxes
        """
        assert not self._bbox_class_agnostic
        # gather_nd needs one (row, label) pair per proposal, i.e. Nx2 indices;
        # stacking along the default axis 0 would produce 2xN instead.
        indices = tf.stack([
            tf.range(tf.size(labels, out_type=tf.int64)),
            labels
        ], axis=1)
        needed_logits = tf.gather_nd(self.box_logits, indices)   # N x 4
        decoded = decode_bbox_target(
            needed_logits / self.bbox_regression_weights,
            self.proposals.boxes
        )
        return decoded
decoded_output_boxes_for_predicted_label
def decoded_output_boxes_for_predicted_label(
    self
)
Returns: Nx4 decoded boxes
View Source
    @memoized_method
    def decoded_output_boxes_for_predicted_label(self):
        """ Returns: Nx4 decoded boxes, using the box regressed for each proposal's predicted label """
        # The helper is defined as `decoded_output_boxes_for_label` (no leading
        # underscore); calling the underscored name raised AttributeError.
        return self.decoded_output_boxes_for_label(self.predicted_labels())
decoded_output_boxes_for_true_label
def decoded_output_boxes_for_true_label(
    self
)
Returns: Nx4 decoded boxes
View Source
    @memoized_method
    def decoded_output_boxes_for_true_label(self):
        """ Returns: Nx4 decoded boxes, using the box regressed for each proposal's true label """
        # The helper is defined as `decoded_output_boxes_for_label` (no leading
        # underscore); calling the underscored name raised AttributeError.
        return self.decoded_output_boxes_for_label(self.proposals.labels)
fg_box_logits
def fg_box_logits(
    self
)
Returns: #fg x ? x 4
View Source
    @memoized_method
    def fg_box_logits(self):
        """ Returns: #fg x ? x 4, the box logits of the foreground proposals """
        fg_positions = self.proposals.fg_inds()
        return tf.gather(self.box_logits, fg_positions, name='fg_box_logits')
losses
def losses(
    self
)
View Source
    @memoized_method
    def losses(self):
        """ Returns: the result of `fastrcnn_losses` on this head's logits and encoded targets """
        # Regression targets: the matched gt box of each foreground proposal,
        # encoded relative to that proposal and scaled by the regression weights.
        matched_gt = tf.gather(self.gt_boxes, self.proposals.fg_inds_wrt_gt)
        fg_proposals = self.proposals.fg_boxes()
        targets = encode_bbox_target(matched_gt, fg_proposals) * self.bbox_regression_weights
        fg_logits = self.fg_box_logits()
        return fastrcnn_losses(
            self.proposals.labels, self.label_logits, targets, fg_logits)
output_scores
def output_scores(
    self,
    name=None
)
Returns: N x #class scores, summed to one for each box.
View Source
    @memoized_method
    def output_scores(self, name=None):
        """ Returns: N x #class softmax scores, summed to one for each box."""
        logits = self.label_logits
        return tf.nn.softmax(logits, name=name)
predicted_labels
def predicted_labels(
    self
)
Returns: N ints
View Source
    @memoized_method
    def predicted_labels(self):
        """ Returns: N ints, the argmax class of each row of the label logits """
        logits = self.label_logits
        return tf.argmax(logits, axis=1, name='predicted_labels')