PyTorchでCIFAR10を画像分類する【VGG】ソースコード付き

Python

PyTorchでカラー画像のデータセット(CIFAR10)を分類するソースコードです。

CIFAR10は学習データ5万枚、テストデータ1万枚の合計6万枚の色画像で構成されています。クラスは10クラスあります。

モデルは畳み込み層6層、全結合層3層からなるVGG9を使用しました。

まずはmain関数があるmain.pyファイルを作成します。最初はCIFAR10データをダウンロードするためdownload=Trueとしましたが、2回目からはdownload=Falseとしてください。

# -*- coding: utf-8 -*-

import time
import random
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from matplotlib.ticker import ScalarFormatter
import numpy as np
import torch.utils.data as D
import torch.optim as optim
import torch
import torch.nn as nn
import torchvision
from torchvision import datasets,transforms
import argparse
import sys
import model as M
import os

def ParseArgs():
    parser = argparse.ArgumentParser(description='pytorch CIFAR10 vgg9')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',help='input batch size , default =64')
    parser.add_argument('--test-batch-size', type=int, default=64, metavar='N',help='input batch size for testing default=64')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.9, metavar='M',help='SGD momentum (default: 0.5)')
    parser.add_argument('--no-cuda', action='store_true', default=False,help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=1, metavar='S',help='random seed,default=1)')
    parser.add_argument('--eps', type=float, default=1e-5, metavar='LR',help='learning rate,default=1e-5')
    parser.add_argument('--log-interval', type=int, default=100, metavar='N',help='for printing  training data is log interval')
    parser.add_argument('--best_acc', type=float, default=0, metavar='N',help='Record of best accuracy')
    parser.add_argument('--layer',type=int, default=1024, metavar='G',help='aaaa')
    parser.add_argument('--weight-decay',type=float,default=1e-5,metavar='WD', help='weight decay(default: 1e-5)')
    args = parser.parse_args()
    args.cuda = not args.no_cuda and torch.cuda.is_available()
    return args


def main():
    args = ParseArgs()
    if args.cuda:
        torch.cuda.manual_seed(args.seed)
    else:
        torch.manual_seed(args.seed)
    kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {}

    
    train_loader = D.DataLoader(datasets.CIFAR10('/data', train=True, download=True,
                    transform=transforms.Compose([transforms.ToTensor(),
                        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
                    ])),batch_size=args.batch_size, shuffle=True,**kwargs)
        
    test_loader = D.DataLoader(datasets.CIFAR10('/data', train=False, download=True,
                    transform=transforms.Compose([transforms.ToTensor(),
                        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
                    ])),batch_size=args.test_batch_size, shuffle=True,**kwargs)
    

    model = M.VGG9()

    #dir_org = 'stored.pytorch_state'
    #model.load_state_dict(torch.load(dir_org))

    criterion = nn.CrossEntropyLoss()

    if args.cuda:
        model.to('cuda:0')
        criterion.to('cuda:0')

    #optimizer = optim.SGD(model.parameters(), lr=args.lr,momentum=args.momentum)
    optimizer = optim.Adam(model.parameters(),lr=args.lr)
    best_acc = 0.0
    acc_array = np.array([])
  
    for epoch_index in range(1, args.epochs + 1):
        adjust_learning_rate(optimizer, epoch_index)
        train(args,epoch_index,train_loader,model,optimizer,criterion)   
        acc = test(args,model,test_loader,criterion)
        acc_array = np.append(acc_array, acc.numpy())
        j = []
        for i in acc_array:
            j.append(int(i))
        print(j)
        if acc > best_acc:
            best_acc = acc
            save(model,best_acc,'stored.pytorch_state')
        print('best_acc: '+str(best_acc.numpy()))
        
    plothist(model,N=9)
    plothist_many(model)
    plot_acc(acc_array, args.epochs)

def adjust_learning_rate(optimizer, epoch_index):
    update_list = [10,20,40,150,400,600]
    if epoch_index in update_list:
        for param_group in optimizer.param_groups:
            param_group['lr'] = param_group['lr'] * 0.5
    return

def save(model,best_acc,filename):
    print('==>>> saving model')
    torch.save(model.state_dict(), filename)

def make_matrix(matrix,predicted,target):
    p = predicted.to('cpu').numpy().T
    t = target.data.view_as(predicted).to('cpu').numpy().T
    for i in range(len(p)):
        for j in range(len(p[0])):
                matrix[p[i][j]][t[i][j]]+=1

def train(args,epoch_index,train_loader,model,optimizer,criterion):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        if args.cuda:
            data, target = data.to('cuda:0'), target.to('cuda:0')
        
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch_index, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

def test(args,model,test_loader,criterion):
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    matrix=np.zeros((10,10))
    for data, target in test_loader:
        
        data, target = data.to('cuda:0'), target.to('cuda:0')
        output = model(data)
        test_loss += criterion(output, target).item()

        predicted = output.max(1,keepdim=True)[1]
        total += target.size(0)
        correct += predicted.eq(target.data.view_as(predicted)).to('cpu').sum()
        make_matrix(matrix,predicted,target)
      
    acc = 100. * correct/len(test_loader.dataset)

    test_loss /= len(test_loader)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

    print(matrix)

    return acc

def make_dir(dir):
    if not os.path.exists(dir):
        os.mkdir(dir)
    
def plothist_many(model):
    #make_dir(dir = 'output_file')
    for n,p in model.to('cpu').named_parameters():
        tmp0 = p.detach()
        tmp1 = tmp0.numpy()
        w = tmp1.flatten()
        plt.title(str(n))
        plt.hist(w, bins=100, color='deepskyblue')
        #plt.savefig(dir+'/'+str(n)+'.png')
        plt.show()

def plothist(model,N):
    #make_dir(dir = 'output_file')
    for n,p in model.to('cpu').named_parameters():
        if n in ['features.0.weight','features.2.weight','features.5.weight','features.7.weight','features.10.weight','features.12.weight','classifier.0.weight','classifier.2.weight','classifier.4.weight']:#VGG9
            print(n)
            plt.subplot(3,3,N)
            N = N+1
            tmp0 = p.detach()
            tmp1 = tmp0.numpy()
            w = tmp1.flatten()
            plt.hist(w, bins=100,color='deepskyblue')
            plt.gca().yaxis.set_major_formatter(ScalarFormatter(useMathText=True))
            plt.gca().yaxis.offsetText.set_fontsize(3)
            plt.gca().ticklabel_format(style="sci",  axis="y",scilimits=(0,0))
            plt.tick_params(labelsize=5)
    #plt.savefig(dir+'./'+'histgram_all.png')
    plt.show()

def plot_acc(acc_array, epoch):
    #make_dir(dir = 'output_file')
    l = (range(1,epoch+1))
    k = []
    plt.plot(l,acc_array,label='test_acc')
    plt.xlabel('epoch')
    plt.legend()
    for i in l:
        if i%5 == 0 or i==0:
            k.append(i)
    k_str = [str(n) for n in k]
    plt.xticks(k,k_str)
    plt.ylim(5, 90)
    #plt.savefig(dir+'/plot.png')
    plt.show()

if __name__ == '__main__':
    main()

次にmodel.pyにネットワークモデル記述します。 モデルはVGGを参考にし、畳み込み層6層、全結合層3層からなるVGG9を作成しました。

import torch 
import torch.nn as nn

class VGG9(nn.Module):

    def __init__(self):
        super(VGG9, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=3, padding=1, bias=True),#0
            nn.BatchNorm2d(128),#1
            nn.ReLU(inplace=True),#2

            nn.Conv2d(128, 128, kernel_size=3, padding=1, bias=True),#3
            nn.MaxPool2d(kernel_size=2, stride=2),#4
            nn.BatchNorm2d(128),#5
            nn.ReLU(inplace=True),#6
        
            nn.Conv2d(128, 256, kernel_size=3, padding=1, bias=True),#7
            nn.BatchNorm2d(256),#8
            nn.ReLU(inplace=True),#9
          
            nn.Conv2d(256, 256, kernel_size=3, padding=1, bias=True),#10
            nn.MaxPool2d(kernel_size=2, stride=2),#11
            nn.BatchNorm2d(256),#12
            nn.ReLU(inplace=True),#13
            
            nn.Conv2d(256, 512, kernel_size=3, padding=1, bias=True),#14
            nn.BatchNorm2d(512),#15
            nn.ReLU(inplace=True),#16
           
            nn.Conv2d(512, 512, kernel_size=3, padding=1, bias=True),#17
            nn.MaxPool2d(kernel_size=2, stride=2),#18
            nn.BatchNorm2d(512),#19
            nn.ReLU(inplace=True)#20
        )
        self.classifier = nn.Sequential(
            nn.Linear(512 * 4 * 4, 1024, bias=True),#0
            nn.BatchNorm1d(1024),#1
            nn.ReLU(inplace=True),#2

            nn.Linear(1024, 1024, bias=True),#3
            nn.BatchNorm1d(1024),#4
            nn.ReLU(inplace=True),#5

            nn.Linear(1024, 10, bias=True),#6
            nn.BatchNorm1d(10),#7
            nn.ReLU(inplace=True)#8
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(-1, 512 * 4 * 4)
        x = self.classifier(x)
        return x

実行時はターミナルに以下のように入力します。

python main.py

デフォルトのエポック数は10ですが、引数で指定できます。例えば100エポックにしたい場合です。

python main.py --epochs 100

GPUを使わない場合は

python main.py --no-cuda

で実行します。

また、学習後のパラメータはtorch.save(model.state_dict(), ファイル名)で保存でき、

model.load_state_dict(torch.load(ファイル名))でロードできます。

すでに学習させたモデルを、再度学習させたい場合に使います。

ディープラーニングについて理解を深めたい方はゼロから作るDeep Learningをおすすめします。

コメント

タイトルとURLをコピーしました