Now Loading...

Now Loading...

はじめに

今回の大きなテーマは「CNNを用いた物体検出」です。CNNを用いた物体検出手法のうち、最も早く登場したR-CNNがありますが、今回では、その発展系とFast R-CNNを紹介していきます。

Fast R-CNN

Fast R-CNNは2015年に発表されたR-CNNより高速な物体検出手法です。Fast R-CNNはR-CNNのどのような点が改善されたのか見ていきましょう。

*R-CNN の仕組みを解説した記事もあります↓

アーキテクチャ

Fast R-CNNのR-CNNからの最も大きな変更点はアーキテクチャにあります。ここでR-CNNのアーキテクチャを振り返ってみましょう。R-CNNは以下の三つの部分で構成されています。

  • Region Proposalの抽出
  • CNNによる提案領域の特徴計算
  • 提案領域の分類

まず一つ目がRegion Proposalの抽出です。ここでは、ニューラルネットワークではない外部のアルゴリズムを使用して画像内の物体が写っていそうな領域を抽出します。そして二つ目が、CNNによって提案領域の特徴を計算します。そして最後に、その特徴をもとに領域内のクラス分類を行います。

R-CNNのアーキテクチャ

これに対してFast R-CNNのアーキテクチャは次にようになっています。

  • 提案領域の抽出
  • 画像全体の特徴マップの計算
  • 提案領域に対応する特徴マップのプーリング(ROIプーリング)
  • プーリングされた特徴に対するクラスの分類とバウンディングボックスのオフセットの回帰

まず、提案領域を抽出する部分はR-CNNと全く同じです。最も大きく異なるのが、次のCNNによる特徴計算の部分です。R-CNNでは、各提案領域ごとにCNNによって特徴を計算していました。これに対して、Fast R-CNNでは画像全体に対して一回だけ特徴を求める計算を行います。この変更によってFast R-CNNでは大幅に計算量を削減できました。

Pythonの疑似コードを見てみるとその差が一目瞭然です。

# R-CNNの場合
for 提案領域の画像パッチ in 一枚の画像:
    特徴ベクトル = CNN(提案領域の画像パッチ)
    予測クラス = FC層(特徴ベクトル)
# Fast R-CNNの場合
特徴マップ = CNN(一枚の画像)
for 提案領域 in 画像の全ての提案領域:
    特徴ベクトル = ROIプーリング(特徴マップ, 提案領域)
    予測クラス, バウンディングボックスのオフセット = FC層(特徴ベクトル)

R-CNNでは画像をCNNで計算するループが提案領域の数だけ回っているのがわかります。これに対して、Fast R-CNNではCNNに関する計算は一回だけです。この違いが速度に聞いてくるというわけです。

Fast R-CNNのアーキテクチャ

損失関数

R-CNNでは基本的には候補領域の物体のクラスを分類する損失関数のみが用いられており、さらに検出結果を改善するためには、バウンディングボックスのオフセット(真のバウンディングボックスとのずれ)を予測する後段のタスクを解くことを行う必要がありました。つまり、分類とバウンディングボックスの予測を二つのステージに分けて別々の損失関数で行う必要がありました。

これに対し、Fast R-CNNでは候補領域のクラス分類とバウンディングボックスのオフセットの予測を同時に行います。つまり、マルチタスク損失として一つの損失関数のみで学習を行なっていきます。Fast R-CNNでは以下のように損失関数を定義しています。

ここではマルチタスク損失全体を表しており、は物体のクラスを表しています。そして、は目的のRoIの正解クラスを表しています。は正解クラスの正解オフセットを、は予測したオフセットを表しています。クラス分類の関する損失であるとバウンディングボックスのオフセットに関する損失のを係数によってバランスをとることでマルチタスク損失としています。

それでは、まずはクラス分類に関する損失であるから中身を見ていきましょう。

これは正解クラスの確率が1より小さいほど大きくなるような関数です。したがって、クラス分類の損失関数として機能するのが確認できます。

次に、バウンディングボックスのオフセットに関する損失であるを見ていきます。

ここで、smooth L1は次のように定義されます。

Smooth L1 Lossは、ディープラーニングにおいて主に物体検出や回帰問題において用いられる誤差関数の一つです。二つの値の差の絶対値を用いる通常のL1 Lossに比べ、値の差が小さい場合には損失をより滑らかに計算することができます。Smooth L1 Lossは、値の差が小さい場合にはL2 Lossに近い挙動を示し、値の差が大きい場合にはL1 Lossに近い挙動を示します。そのため、モデルの学習がより安定することが期待されています。

Fast R-CNNの実装

データセットの準備

まずはモデルの学習に使用するデータセットを作成します。今回使用するのはR-CNNの時と同様にPSCAL VOC 2007というデータセットを使用します。なので、まずはPASCASL VOC 2007をダウンロードしてきてください。PASCSL VOCでは画像に対して、その真のバウンディングボックスとそのクラスがxml形式で与えられています。今回はR-CNNの時と同様に、車のクラスのみに対してモデルを訓練します。なので、まずはPASCAL VOCから車のみが写っている画像を抽出します。以下のコードがPASCAL VOCから車の画像のみを抽出するコードです。

import os
import shutil
import random
import numpy as np


def check_dir(data_dir):
    if not os.path.exists(data_dir):
        os.mkdir(data_dir)


suffix_xml = ".xml"
suffix_jpeg = ".jpg"

car_train_path = "./data/VOCdevkit/VOC2007/ImageSets/Main/car_train.txt"
car_val_path = "./data/VOCdevkit/VOC2007/ImageSets/Main/car_val.txt"

voc_annotation_dir = "./data/VOCdevkit/VOC2007/Annotations/"
voc_jpeg_dir = "./data/VOCdevkit/VOC2007/JPEGImages/"

car_root_dir = "./data/voc_car/"


def parse_train_val(data_path):
    """
    指定したカテゴリの画像を抽出する
    """
    samples = []

    with open(data_path, "r") as file:
        lines = file.readlines()
        for line in lines:
            res = line.strip().split(" ")
            if len(res) == 3 and int(res[2]) == 1:
                samples.append(res[0])

    return np.array(samples)


def sample_train_val(samples):
    """
    ランダムに1/10のデータを抽出
    """
    for name in ["train", "val"]:
        dataset = samples[name]
        length = len(dataset)

        random_samples = random.sample(range(length), int(length / 10))
        # print(random_samples)
        new_dataset = dataset[random_samples]
        samples[name] = new_dataset

    return samples


def save_car(car_samples, data_root_dir, data_annotation_dir, data_jpeg_dir):
    """
    目的のディレクトリに保存する
    """
    for sample_name in car_samples:
        src_annotation_path = os.path.join(voc_annotation_dir, sample_name + suffix_xml)
        dst_annotation_path = os.path.join(
            data_annotation_dir, sample_name + suffix_xml
        )
        shutil.copyfile(src_annotation_path, dst_annotation_path)

        src_jpeg_path = os.path.join(voc_jpeg_dir, sample_name + suffix_jpeg)
        dst_jpeg_path = os.path.join(data_jpeg_dir, sample_name + suffix_jpeg)
        shutil.copyfile(src_jpeg_path, dst_jpeg_path)

    csv_path = os.path.join(data_root_dir, "car.csv")
    np.savetxt(csv_path, np.array(car_samples), fmt="%s")


if __name__ == "__main__":
    samples = {
        "train": parse_train_val(car_train_path),
        "val": parse_train_val(car_val_path),
    }
    print(samples)
    # samples = sample_train_val(samples)
    # print(samples)

    check_dir(car_root_dir)
    for name in ["train", "val"]:
        data_root_dir = os.path.join(car_root_dir, name)
        data_annotation_dir = os.path.join(data_root_dir, "Annotations")
        data_jpeg_dir = os.path.join(data_root_dir, "JPEGImages")

        check_dir(data_root_dir)
        check_dir(data_annotation_dir)
        check_dir(data_jpeg_dir)
        save_car(samples[name], data_root_dir, data_annotation_dir, data_jpeg_dir)

    print("done")

次に、あらかじめデータセットないの各画像に対してSelective Searchを用いて領域提案を行い、候補領域を保存しておきます。以下のコードは車だけのデータセットに対してSelective Searchを行い、候補領域と、その領域に対するラベルをアノテーションするコードです。この時、候補領域のバウンディングボックスと真のバウンディングボックスとのIoUを計算し、ある閾値以上の候補領域を正(車のクラスラベルを付与する)、それ以下の候補領域を負(背景クラスのラベルを付与する)のバウンディングボックスとしています。IoUに関してはR-CNNの記事(リンク埋め込む)に詳しい説明がございますので、ぜひ参考にしてみてください。

import time
import shutil
import numpy as np
import cv2
import os
import sys
import xmltodict
import torch
import matplotlib.pyplot as plt


# train
# positive num: 66517
# negatie num: 464340
# val
# positive num: 64712
# negative num: 415134


def check_dir(data_dir):
    if not os.path.exists(data_dir):
        os.mkdir(data_dir)


def parse_car_csv(csv_dir):
    csv_path = os.path.join(csv_dir, "car.csv")
    samples = np.loadtxt(csv_path, dtype="unicode")
    return samples


def parse_xml(xml_path):
    """
    アノテーションのバウンディングボックスの座標を返すためにxmlファイルをパースする
    """
    with open(xml_path, "rb") as f:
        xml_dict = xmltodict.parse(f)

        bndboxs = list()
        objects = xml_dict["annotation"]["object"]
        if isinstance(objects, list):
            for obj in objects:
                obj_name = obj["name"]
                difficult = int(obj["difficult"])
                if "car".__eq__(obj_name) and difficult != 1:
                    bndbox = obj["bndbox"]
                    bndboxs.append(
                        (
                            int(bndbox["xmin"]),
                            int(bndbox["ymin"]),
                            int(bndbox["xmax"]),
                            int(bndbox["ymax"]),
                        )
                    )
        elif isinstance(objects, dict):
            obj_name = objects["name"]
            difficult = int(objects["difficult"])
            if "car".__eq__(obj_name) and difficult != 1:
                bndbox = objects["bndbox"]
                bndboxs.append(
                    (
                        int(bndbox["xmin"]),
                        int(bndbox["ymin"]),
                        int(bndbox["xmax"]),
                        int(bndbox["ymax"]),
                    )
                )
        else:
            pass

        return np.array(bndboxs)


def iou(pred_box, target_box):
    """
    候補となる提案とラベル付きバウンディングボックスのIoUを計算する
    :param pred_box: size [4].
    :param target_box: size [N, 4] :return: [N].
    :return: [N]
    """
    if len(target_box.shape) == 1:
        target_box = target_box[np.newaxis, :]

    xA = np.maximum(pred_box[0], target_box[:, 0])
    yA = np.maximum(pred_box[1], target_box[:, 1])
    xB = np.minimum(pred_box[2], target_box[:, 2])
    yB = np.minimum(pred_box[3], target_box[:, 3])

    intersection = np.maximum(0.0, xB - xA) * np.maximum(0.0, yB - yA)

    boxAArea = (pred_box[2] - pred_box[0]) * (pred_box[3] - pred_box[1])
    boxBArea = (target_box[:, 2] - target_box[:, 0]) * (
        target_box[:, 3] - target_box[:, 1]
    )

    scores = intersection / (boxAArea + boxBArea - intersection)
    return scores


def compute_ious(rects, bndboxs):
    iou_list = list()
    for rect in rects:
        scores = iou(rect, bndboxs)
        iou_list.append(max(scores))
    return iou_list


def save_model(model, model_save_path):
    check_dir("./models")
    torch.save(model.state_dict(), model_save_path)


def plot_loss(loss_list):
    x = list(range(len(loss_list)))
    fg = plt.figure()

    plt.plot(x, loss_list)
    plt.title("loss")
    plt.savefig("./loss.png")


def get_selective_search():
    gs = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation()
    return gs


def config(gs, img, strategy="q"):
    gs.setBaseImage(img)

    if strategy == "s":
        gs.switchToSingleStrategy()
    elif strategy == "f":
        gs.switchToSelectiveSearchFast()
    elif strategy == "q":
        gs.switchToSelectiveSearchQuality()
    else:
        print(__doc__)
        sys.exit(1)


def get_rects(gs):
    rects = gs.process()
    rects[:, 2] += rects[:, 0]
    rects[:, 3] += rects[:, 1]

    return rects


def parse_annotation_jpeg(annotation_path, jpeg_path, gs):
    """
    正と負のサンプルを取得する(注:属性difficultがTrueのラベル付きバウンディングボックスは無視する)
    正のサンプル:候補とラベル付きバウンディングボックスの間のIoUが0.5以上のもの
    負のサンプル:IoUが0以上0.5未満。さらに負のサンプルの数を制限するために、サイズはラベルボックスの1/5より大きくなければなりません。
    """
    img = cv2.imread(jpeg_path)

    config(gs, img, strategy="q")
    # 候補領域の算出
    rects = get_rects(gs)
    # ラベルのバウンディングボックスを取得する
    bndboxs = parse_xml(annotation_path)

    # ラベルのバウンディングボックスの最大サイズを取得する
    maximum_bndbox_size = 0
    for bndbox in bndboxs:
        xmin, ymin, xmax, ymax = bndbox
        bndbox_size = (ymax - ymin) * (xmax - xmin)
        if bndbox_size > maximum_bndbox_size:
            maximum_bndbox_size = bndbox_size

    # 候補の提案とラベル付きバウンディングボックスのIoUを取得する
    iou_list = compute_ious(rects, bndboxs)

    positive_list = list()
    negative_list = list()
    for i in range(len(iou_list)):
        xmin, ymin, xmax, ymax = rects[i]
        rect_size = (ymax - ymin) * (xmax - xmin)

        iou_score = iou_list[i]
        if iou_list[i] >= 0.5:

            positive_list.append(rects[i])
        if 0 < iou_list[i] < 0.5 and rect_size > maximum_bndbox_size / 5.0:

            negative_list.append(rects[i])
        else:
            pass

    return positive_list, negative_list


if __name__ == "__main__":
    car_root_dir = "./data/voc_car/"
    finetune_root_dir = "./data/finetune_car/"
    check_dir(finetune_root_dir)

    gs = get_selective_search()
    for name in ["train", "val"]:
        src_root_dir = os.path.join(car_root_dir, name)
        src_annotation_dir = os.path.join(src_root_dir, "Annotations")
        src_jpeg_dir = os.path.join(src_root_dir, "JPEGImages")

        dst_root_dir = os.path.join(finetune_root_dir, name)
        dst_annotation_dir = os.path.join(dst_root_dir, "Annotations")
        dst_jpeg_dir = os.path.join(dst_root_dir, "JPEGImages")
        check_dir(dst_root_dir)
        check_dir(dst_annotation_dir)
        check_dir(dst_jpeg_dir)

        total_num_positive = 0
        total_num_negative = 0

        samples = parse_car_csv(src_root_dir)

        src_csv_path = os.path.join(src_root_dir, "car.csv")
        dst_csv_path = os.path.join(dst_root_dir, "car.csv")
        shutil.copyfile(src_csv_path, dst_csv_path)
        for sample_name in samples:
            since = time.time()

            src_annotation_path = os.path.join(src_annotation_dir, sample_name + ".xml")
            src_jpeg_path = os.path.join(src_jpeg_dir, sample_name + ".jpg")

            positive_list, negative_list = parse_annotation_jpeg(
                src_annotation_path, src_jpeg_path, gs
            )
            total_num_positive += len(positive_list)
            total_num_negative += len(negative_list)

            dst_annotation_positive_path = os.path.join(
                dst_annotation_dir, sample_name + "_1" + ".csv"
            )
            dst_annotation_negative_path = os.path.join(
                dst_annotation_dir, sample_name + "_0" + ".csv"
            )
            dst_jpeg_path = os.path.join(dst_jpeg_dir, sample_name + ".jpg")

            shutil.copyfile(src_jpeg_path, dst_jpeg_path)

            np.savetxt(
                dst_annotation_positive_path,
                np.array(positive_list),
                fmt="%d",
                delimiter=" ",
            )
            np.savetxt(
                dst_annotation_negative_path,
                np.array(negative_list),
                fmt="%d",
                delimiter=" ",
            )

            time_elapsed = time.time() - since
            print(
                "parse {}.png in {:.0f}m {:.0f}s".format(
                    sample_name, time_elapsed // 60, time_elapsed % 60
                )
            )
        print("%s positive num: %d" % (name, total_num_positive))
        print("%s negative num: %d" % (name, total_num_negative))
    print("done")

以上で学習で使用するデータセットの準備は完了です。

データセットクラスの作成

次は、PyTorchを用いてモデルを学習するための自作データセットクラスを作成します。PyTorchでは一般的にDatasetクラスとDataloaderクラスをセットで使用することで学習中にミニバッチを取り出します。Fast R-CNNでは画像1枚に対して64個の候補領域をモデルに渡します。論文では一つのミニバッチの画像枚数が2枚となっているため、今回も一つのミニバッチには画像2枚と128個の候補領域が含まれるようにDatasetクラスを書いていきます。なお、論文中では64個の候補領域のうち、背景と物体の割合が4:1になるようにミニバッチを作っていますが、今回のクラス数が極端に少ない場合では背景にか学習を起こしてしまったため、背景と物体の割合を1:1にしています。以下が自作Datasetクラスのコードです。PyTorchのDatasetクラスの詳しい説明については割愛しますが、興味がある方は公式のドキュメントhttps://pytorch.org/tutorials/beginner/basics/data_tutorial.htmlを参考にしてみてください。

import random
import cv2
import os
import numpy as np
from torch.utils.data import Dataset
import torchvision.transforms as transforms


def iou(pred_box, target_box):
    """
    候補となる提案とラベル付きバウンディングボックスのIoUを計算する
    :param pred_box: size [4].
    :param target_box: size [N, 4] :return: [N].
    :return: [N]
    """
    if len(target_box.shape) == 1:
        target_box = target_box[np.newaxis, :]

    xA = np.maximum(pred_box[0], target_box[:, 0])
    yA = np.maximum(pred_box[1], target_box[:, 1])
    xB = np.minimum(pred_box[2], target_box[:, 2])
    yB = np.minimum(pred_box[3], target_box[:, 3])

    intersection = np.maximum(0.0, xB - xA) * np.maximum(0.0, yB - yA)

    boxAArea = (pred_box[2] - pred_box[0]) * (pred_box[3] - pred_box[1])
    boxBArea = (target_box[:, 2] - target_box[:, 0]) * (
        target_box[:, 3] - target_box[:, 1]
    )

    scores = intersection / (boxAArea + boxBArea - intersection)
    return scores


def parse_car_csv(csv_dir):
    csv_path = os.path.join(csv_dir, "car.csv")
    samples = np.loadtxt(csv_path, dtype="unicode")
    return samples


class CustomFinetuneDataset(Dataset):
    def __init__(self, root_dir, transform):
        """
        positive_anotations: [x_min, y_min, x_max, y_max, t_x, t_y, t_w, t_h]
        最初の4つは正規化された提案領域の座標。次の正解とのオフセット
        negative_annotations: [x_min, y_min, x_max, y_max]
        """
        self.transform = transform

        samples = parse_car_csv(root_dir)

        jpeg_images = list()
        annotation_dict = dict()

        for idx in range(len(samples)):
            sample_name = samples[idx]
            img = cv2.imread(os.path.join(root_dir, "JPEGImages", sample_name + ".jpg"))
            h, w = img.shape[:2]
            jpeg_images.append(img)

            bndbox_path = os.path.join(root_dir, "bndboxs", sample_name + ".csv")
            bndboxes = np.loadtxt(bndbox_path, dtype="float32", delimiter=" ")

            positive_annotation_path = os.path.join(
                root_dir, "Annotations", sample_name + "_1.csv"
            )
            positive_annotations = np.loadtxt(
                positive_annotation_path, dtype="float32", delimiter=" "
            )

            offsets = list()
            if len(positive_annotations.shape) == 1:
                positive_annotations = positive_annotations[np.newaxis, :]
                gt_bbox = self.get_bndbox(bndboxes, positive_annotations[0])
                # オフセットを計算する
                x_min, y_min, x_max, y_max = positive_annotations[0][:4]
                p_w = x_max - x_min
                p_h = y_max - y_min
                p_x = x_min + p_w / 2
                p_y = y_min + p_h / 2

                x_min, y_min, x_max, y_max = gt_bbox
                g_w = x_max - x_min
                g_h = y_max - y_min
                g_x = x_min + g_w / 2
                g_y = y_min + g_h / 2

                t_x = (g_x - p_x) / p_w
                t_y = (g_y - p_y) / p_h
                t_w = np.log(g_w / p_w)
                t_h = np.log(g_h / p_h)

                positive_annotations[0][0] /= w
                positive_annotations[0][1] /= h
                positive_annotations[0][2] /= w
                positive_annotations[0][3] /= h

                offsets.append(np.array([t_x, t_y, t_w, t_h]))

            else:
                for i in range(len(positive_annotations)):
                    gt_bbox = self.get_bndbox(bndboxes, positive_annotations[i])
                    # オフセットを計算する
                    x_min, y_min, x_max, y_max = positive_annotations[i][:4]
                    p_w = x_max - x_min
                    p_h = y_max - y_min
                    p_x = x_min + p_w / 2
                    p_y = y_min + p_h / 2

                    x_min, y_min, x_max, y_max = gt_bbox
                    g_w = x_max - x_min
                    g_h = y_max - y_min
                    g_x = x_min + g_w / 2
                    g_y = y_min + g_h / 2

                    t_x = (g_x - p_x) / p_w
                    t_y = (g_y - p_y) / p_h
                    t_w = np.log(g_w / p_w)
                    t_h = np.log(g_h / p_h)

                    positive_annotations[i][0] /= w
                    positive_annotations[i][1] /= h
                    positive_annotations[i][2] /= w
                    positive_annotations[i][3] /= h

                    offsets.append(np.array([t_x, t_y, t_w, t_h]))

            negative_annotation_path = os.path.join(
                root_dir, "Annotations", sample_name + "_0.csv"
            )
            negative_annotations = np.loadtxt(
                negative_annotation_path, dtype="float32", delimiter=" "
            )
            negative_annotations[:, 0] /= w
            negative_annotations[:, 1] /= h
            negative_annotations[:, 2] /= w
            negative_annotations[:, 3] /= h
            # positive_annotationsとoffsetsを結合
            offsets = np.array(offsets).reshape(-1, 4)
            positive_annotations = np.concatenate(
                (positive_annotations, offsets), axis=1
            )

            annotation_dict[str(idx)] = {
                "positive": positive_annotations,
                "negative": negative_annotations,
            }

        self.jpeg_images = jpeg_images
        self.annotation_dict = annotation_dict

    def __getitem__(self, index: int):
        """
        positiveな領域に関してはboxの座標とオフセットを返す
        negativeな領域に関してはboxの座標のみを返す
        :param index:
        :return:
        """
        assert index < len(self.jpeg_images), "現在のデータセットの合計数: %d、入力インデックス: %d" % (
            len(self.jpeg_images),
            index,
        )

        image = self.jpeg_images[index]
        annotation_dict = self.annotation_dict[str(index)]
        positive_annotations = annotation_dict["positive"]
        negative_annotations = annotation_dict["negative"]

        positive_num = 32
        negative_num = 32

        if len(positive_annotations) < positive_num:
            positive_num = len(positive_annotations)
            negative_num = 64 - positive_num

            positive_array = positive_annotations
        else:
            positive_array = positive_annotations[
                random.sample(range(positive_annotations.shape[0]), positive_num)
            ]
        positive_bbox = positive_array[:, :4]
        # zero array [negative_num, 4]. this is dummy data
        negative_offset = np.zeros((negative_num, 4))
        offset = positive_array[:, 4:]
        # concat negative offset and offset
        offset = np.concatenate((offset, negative_offset), axis=0)

        negative_array = negative_annotations[
            random.sample(range(negative_annotations.shape[0]), negative_num)
        ]

        # rect_array = np.vstack((positive_array, negative_array))
        rect_array = np.vstack((positive_bbox, negative_array))
        # targets = np.hstack((np.ones(positive_num), np.zeros(negative_num)))
        # make one-hot vector
        targets = np.zeros((positive_num + negative_num, 2))
        targets[:positive_num, 0] = 1
        targets[positive_num:, 1] = 1

        if self.transform:
            image = self.transform(image)

        return image, targets, rect_array, offset

    def __len__(self) -> int:
        return len(self.jpeg_images)

    def get_bndbox(self, bndboxes, positive):
        """
        入力されたpositiveなboxに対して、最もIoUが高いgt boxを返す
        :param bndboxes: [n, 4]
        :param positive: [4]
        :return: [4]
        """

        if len(bndboxes.shape) == 1:
            return bndboxes
        else:
            scores = iou(positive, bndboxes)
            return bndboxes[np.argmax(scores)]

損失関数

損失関数はPyTorchで用意されている損失関数を組み合わせることで実装しています。以下のコードがマルチタスク損失の実装です。ここで論文とクラス分類の損失の部分が異なりますが、実装の簡略化のためクロスエントロピー損失を用いています。

import torch.nn as nn


class MultiTaskLoss(nn.Module):
    def __init__(self, lam=1):
        super(MultiTaskLoss, self).__init__()
        self.lam = lam
        self.cls = nn.CrossEntropyLoss(reduction="mean")
        self.loc = nn.SmoothL1Loss(reduction="mean")

    def forward(self, scores, preds, targets, targets_loc):
        """
        :param scores: softmax関数を通過した後の分類結果 [batch size * roi num, class num]
        :param preds: 予測されたbboxのオフセット [batch size * roi num, 4]
        :param targets: 正解クラスのラベル [batch size * roi num, class num]
        :param targets_loc: 正解bboxのオフセット [batch size * roi num, 4]
        """
        cls_loss = self.cls(scores, targets)
        loc_loss = self.loc(preds, targets_loc)
        return cls_loss + self.lam * loc_loss

ネットワーク

下のコードはネットワーク部分の実装です。論文中ではCNNとしてVGG16というモデルを使用してますので、今回もVGG16を使用します。まず、PyTorchが用意してあるImageNetで事前学習済みVGG16をロードします。次に、あらかじめ用意した事前学習済みモデルがあるならば、それをロードします。今回は車のクラスのみを使用しますので、Fast R-CNNの学習時には、車の画像のみでfine-tuneしたVGG16モデルを使用します。そして、このVGG16をFast R-CNN仕様に変更していきます。まず、変更すべき所ははCNNの特徴マップを取り出す部分です。VGG16の特徴マップがmax-poolingされて7×7になる寸前の層までを特徴抽出器self.featuresとして使用します。そして、特徴抽出後のRoIプーリングはPyTorchで用意されている関数を使用します。詳しい使い方は公式ドキュメントを参照してくださいhttps://pytorch.org/vision/main/generated/torchvision.ops.roi_pool.html。RoIプーリングを行った後は、各RoIの7×7になった特徴を一つのベクトルにします。そして、二層の全結合層に通した後、分類結果とバウンディングボックスのオフセットを求めています。

import torch
import torch.nn as nn
import torchvision.models as models
from torchvision.ops import roi_pool


class VGG16_RoI(nn.Module):
    def __init__(
        self,
        num_classes=1000,
        device=torch.device("cuda:0"),
        pretrained_model_path=None,
    ):
        """
        :param num_classes: 类别数,不包括背景类别
        :param init_weights:
        """
        super(VGG16_RoI, self).__init__()
        # load pretrained vgg16
        model = models.vgg16(pretrained=True)
        num_features = model.classifier[6].in_features
        model.classifier[6] = nn.Linear(num_features, 2)
        # load pretrained weight
        if pretrained_model_path is not None:
            model.load_state_dict(
                torch.load(pretrained_model_path, map_location=device)
            )
        model.load_state_dict(
            torch.load(
                "/home/zaima/zero2one/models/vgg16_car_finetuned.pth",
                map_location=device,
            )
        )
        # 512 * 28 * 28の特徴マップを取り出す
        self.features = nn.Sequential(*list(model.features.children())[:23])

        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(),
        )
        self.softmax = nn.Sequential(
            nn.Linear(4096, num_classes + 1),
            nn.Softmax(dim=1),
        )

        self.bbox = nn.Sequential(
            nn.Linear(4096, num_classes * 4),
            nn.ReLU(True),
        )

    def forward(self, x, rois):
        x = self.features(x)
        rois = list(rois)
        x = roi_pool(x, rois, (7, 7), spatial_scale=x.shape[2])
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        class_score = self.softmax(x)
        bbox_regression = self.bbox(x)
        return class_score, bbox_regression

Fast R-CNNの学習

それでは実際にモデルを学習していきます。モデルの学習には時間がかかりますので、学習用のコードを記載するのみとします。おおまかに次のような流れで処理を行っています。

  • データとモデルの準備
    • オリジナルのDatasetクラスを使って学習データとテストデータを用意する。
    • 作成したネットワークを準備。この際に事前学習済みモデルをロードする
  • 学習部分
    • Dataloaderから訓練データの、画像・RoIのクラス・RoI・正解のバウンディングボックスとのオフセットを取り出し、画像とRoIをモデルの渡す。
    • モデルから返ってきたクラスとバウンディングボックスのオフセットと、正解のクラスとバウンディングボックスのオフセットを損失関数に通す。
  • 最もvalidationデータでの精度が高いモデルを保存する
import os
import copy
import time
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms

# 今回は他のファイルに記述したネットワークとデータセットと損失関数などをロードしてきています
# 必要に応じてこれらのクラスや関数を配置してください
from models.vgg16_roi import VGG16_RoI
from models.multi_task_loss import MultiTaskLoss
from utils.data.custom_finetune_dataset import CustomFinetuneDataset
from utils.util import check_dir


def load_data(data_root_dir, batch_size=128):
    transform = transforms.Compose(
        [
            transforms.ToPILImage(),
            transforms.Resize((227, 227)),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]
    )

    data_loaders = {}
    data_sizes = {}
    for name in ["train", "val"]:
        data_dir = os.path.join(data_root_dir, name)
        data_set = CustomFinetuneDataset(data_dir, transform=transform)
        data_loader = DataLoader(
            dataset=data_set,
            batch_size=batch_size,
            shuffle=True,
            num_workers=8,
            drop_last=True,
        )

        data_loaders[name] = data_loader
        data_sizes[name] = len(data_set)

    return data_loaders, data_sizes


def train_model(
    data_loaders, model, criterion, optimizer, lr_scheduler, num_epochs=25, device=None
):
    since = time.time()

    best_model_weights = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print("Epoch {}/{}".format(epoch, num_epochs - 1))
        print("-" * 10)

        # Each epoch has a training and validation phase
        for phase in ["train", "val"]:
            if phase == "train":
                model.train()  # Set model to training mode
            else:
                model.eval()  # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            for input_img, target_cls, rois, target_offsets in data_loaders[phase]:
                positive_num_per_batch = (
                    torch.sum(target_cls.reshape(-1, 2)[:, 0] == 1) / batch_size
                ).item()
                positive_num_per_batch = int(positive_num_per_batch)
                target_offsets = target_offsets[:, :positive_num_per_batch, :]
                input_img = input_img.to(device)
                target_cls = target_cls.to(device)
                rois = rois.to(device).float()
                target_offsets = target_offsets.to(device).float()

                optimizer.zero_grad()

                # forward
                with torch.set_grad_enabled(phase == "train"):
                    outputs_cls, outputs_offsets = model(input_img, rois)

                    loss = criterion(
                        outputs_cls,
                        outputs_offsets.reshape(batch_size, -1, 4)[
                            :, :positive_num_per_batch, :
                        ],
                        target_cls.reshape(-1, 2),
                        target_offsets,
                    )
                    if phase == "train":
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * target_cls.size(0)
                outputs_cls = torch.argmax(outputs_cls, dim=1)
                running_corrects += torch.sum(
                    outputs_cls == torch.argmax(target_cls.reshape(-1, 2), dim=1)
                )
            if phase == "train":
                lr_scheduler.step()

            epoch_loss = running_loss / data_sizes[phase]
            epoch_acc = running_corrects / data_sizes[phase]

            print("{} Loss: {:.4f} Acc: {:.4f}".format(phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == "val" and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_weights = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(
        "Training complete in {:.0f}m {:.0f}s".format(
            time_elapsed // 60, time_elapsed % 60
        )
    )
    print("Best val Acc: {:4f}".format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_weights)
    return model


if __name__ == "__main__":
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # バッチサイズ(画像枚数)
    batch_size = 2
    # エポック数
    num_epochs = 30
    # バウンディングボックスの損失の重み
    lam = 1

    data_loaders, data_sizes = load_data("./data/finetune_car", batch_size)

    model = VGG16_RoI(num_classes=1)

    model = model.to(device)
    print(model)

    criterion = MultiTaskLoss(lam=lam)
    optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)
    lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

    best_model = train_model(
        data_loaders,
        model,
        criterion,
        optimizer,
        lr_scheduler,
        device=device,
        num_epochs=num_epochs,
    )

    check_dir("./models")
    torch.save(best_model.state_dict(), "./models/fast_rcnn.pth")

推論

それでは、学習済みのモデルを使用してFast R-CNNの推論をしてみます。R-CNNと比べると、推論速度が高速になっています。

 

さいごに

今回はCNNによる物体検出シリーズの第2弾としてFast R-CNNの解説と実装を行いました。実装部分は正確な論文の設定とは離れている部分もあるため、完全な論文の手法の再現というわけではないですが、Fast R-CNNの原理を実装レベルから理解するための手助けとなっていれば幸いです。

\ シェア /

E資格スピードパッケージ

E資格スピードパッケージ2023#2修了者合格率100%達成

zero to one E資格 jdla

zero to oneの「E資格」向け認定プログラム

日本ディープラーニング協会の実施するE資格の受験ならzero to oneの「E資格」向け認定プログラム (税込165,000円) をおすすめします。当講座は、東京大学大学院工学系研究科の松尾豊教授と東北大学大学院情報科学研究科の岡谷貴之教授が監修する実践的なプログラムとなっています。
厚生労働省の教育訓練給付制度対象のE資格認定プログラムの中では最安値※となり、実質負担額49,500円~(支給割合70%の場合)で受講可能です。※2023年弊社調べ zero to one E資格 jdla

人工知能基礎講座を提供中

人工知能の第一人者である東京大学の松尾豊教授が監修した人工知能基礎講座を受講してみませんか? 人工知能の歴史から自然言語処理、機械学習、深層学習といった最先端のトピックやAIに関わる法律問題まで網羅しているので全てのビジネスパーソン・AIの初学者におすすめです。

サンプル動画

人工知能基礎講座はこちら↓ zero to one G検定 人工知能基礎 jdla

AI初学者・ビジネスパーソン向けのG検定対策講座

G検定受験前にトレーニングしたい方向けの問題集「G検定実践トレーニング」も提供中です。 zero to one E資格 jdla