Multiple CNN
¶

限于算力,只能将模型做得相对简单,应对复杂任务时随时可更改。此外有小概率输出会收敛到某个类,猜测可能是初始化的原因,重新运行即可解决。

In [1]:
import os
# os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
In [2]:
import time
import numpy as np
import pandas as pd
import cv2
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor
from torchvision.datasets import ImageFolder
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, roc_curve
import albumentations as alb
from albumentations.pytorch.transforms import ToTensorV2
from PIL import Image
from matplotlib import pyplot as plt
import seaborn as sns
import netron
In [3]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu'); device
Out[3]:
device(type='cuda')

Training¶

In [4]:
learning_rate = 1e-4
num_epochs = 20
img_size = (124, 124)
batch_size = 16
category = 2

可根据需求更改,这里的代码适用的文件结构是:

├───train
│   ├───cats (7500 files)
│   └───dogs (7500 files)
└───validation
    ├───cats (5000 files)
    └───dogs (5000 files)
In [5]:
class MyDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.dataset = ImageFolder(root=root_dir)
        self.transform = transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        image, label = self.dataset[idx]
        
        if self.transform is not None:
            augmented = self.transform(image=np.array(image))
            image = augmented['image']

        return image, label
In [6]:
transform = alb.Compose([
    alb.Resize(width=img_size[0], height=img_size[1]),
#     alb.CenterCrop(width=img_size[0], height=img_size[1]),
    alb.HorizontalFlip(p=0.5),
    alb.Rotate(limit=30),
    alb.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    alb.Normalize(),
    ToTensorV2()
])

transformNorm = alb.Compose([
    alb.Resize(width=img_size[0], height=img_size[1]),
    alb.Normalize(),
    ToTensorV2()
])

dataset = MyDataset(r'D:\\临时文件\\output_folder\\train', transform=transform)
datasetVal = MyDataset(r'D:\\临时文件\\output_folder\\validation', transform=transformNorm)

train_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(datasetVal, batch_size=batch_size, shuffle=False)
In [7]:
class ConvNet(nn.Module):
    def __init__(self, category):
        super(ConvNet, self).__init__()

        self.features = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        self.classifier = nn.Sequential(
            nn.Linear(128 * 15 * 15, 2048),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(2048, category),
            nn.Softmax(dim=1)
        )

        for m in self.modules():
            if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x
    
    def pred(self, x, prob=True):
        self.eval()
        with torch.no_grad():
            x = self.forward(x)
            return x if prob else torch.argmax(x, dim=1).cpu().item()
    
    def save(self, name='ConvNet', path='./model_state'):
        if not os.path.exists(path):
            os.makedirs(path)
        torch.save(model.state_dict(), path + '/{}.pt'.format(name))
        return None
    
    def load(self, filePath):
        self.load_state_dict(torch.load(filePath))
        return None
    
    
    def test(self, test_loader, criterion_, save=False, fileName='', savePath='./model_state', device=None):
        if not device:
            device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        self.eval()
        correct = 0
        total = 0
        loss = 0
        y_true = []
        y_pred = []
        
        with torch.no_grad():
            for images, labels in test_loader:
                images = images.to(device)
                labels = labels.to(device)
                outputs = self.pred(images, True)
                loss += criterion_(outputs, labels).item()
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                y_true.extend(labels.cpu().numpy())
                y_pred.extend(predicted.cpu().numpy())
        

        recall = recall_score(y_true, y_pred, average='weighted')
        precision = precision_score(y_true, y_pred, average='weighted')
        F1 = f1_score(y_true, y_pred, average='weighted')
        CM = confusion_matrix(y_true, y_pred) 
        
        if save:
            self.save('ConvNet {} Epoch'.format(fileName))
        
        return (loss/total, correct/total, recall, precision, F1, CM)


model = ConvNet(category).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
In [8]:
torch.onnx.export(model, torch.randn(1, 3, 124, 124).to(device), './model_state/structure.pth')
netron.start('./model_state/structure.pth')
Serving './model_state/structure.pth' at http://localhost:8080
Out[8]:
('localhost', 8080)

将在此处展示网络结构:

In [9]:
val = pd.DataFrame({
    'Training_Loss': np.zeros(num_epochs),
    'Testing_Loss': np.zeros(num_epochs),
    'ACC': np.zeros(num_epochs),
    'Recall': np.zeros(num_epochs),
    'Precision': np.zeros(num_epochs),
    'F1': np.zeros(num_epochs),
    'CM': np.zeros(num_epochs)
}, dtype=object)


start = time.time()
for epoch in range(num_epochs):
    training_loss = 0
    
    model.train()

    for images, labels in train_dataloader:
        
        optimizer.zero_grad()
        outputs = model(images.to(device))
        loss = criterion(outputs, labels.to(device))
        loss.backward()
        training_loss += loss.item()
        optimizer.step()
        
    metrics = model.test(test_dataloader, criterion, save=True, fileName=str(epoch+1))
    
    if epoch % 3 == 0:
        print('{} Epoch, Test Accuracy: {:.3f}%, Loss: {:.3f}'.format(epoch+1, 100*metrics[1], metrics[0]))
        print('Recall: {:.3f}%, Precision: {:.3f}%, F1: {:.3f}'.format(100*metrics[2], 100*metrics[3], metrics[4]))
        print('Confusion Matrix:\n', metrics[5], '\n', '='*50)
    
    val.iloc[epoch] = [training_loss/len(dataset), *metrics]
    val.to_csv(r'./model_state/metrics.csv')

end = time.time()
print('******** Running Time: {:.3f} min ********'.format((end-start)/60))
1 Epoch, Test Accuracy: 75.360%, Loss: 0.034
Recall: 75.360%, Precision: 75.363%, F1: 0.754
Confusion Matrix:
 [[3739 1261]
 [1203 3797]] 
 ==================================================
4 Epoch, Test Accuracy: 80.860%, Loss: 0.031
Recall: 80.860%, Precision: 80.867%, F1: 0.809
Confusion Matrix:
 [[4080  920]
 [ 994 4006]] 
 ==================================================
7 Epoch, Test Accuracy: 82.050%, Loss: 0.030
Recall: 82.050%, Precision: 82.786%, F1: 0.819
Confusion Matrix:
 [[4477  523]
 [1272 3728]] 
 ==================================================
10 Epoch, Test Accuracy: 83.120%, Loss: 0.030
Recall: 83.120%, Precision: 83.172%, F1: 0.831
Confusion Matrix:
 [[4255  745]
 [ 943 4057]] 
 ==================================================
13 Epoch, Test Accuracy: 85.080%, Loss: 0.028
Recall: 85.080%, Precision: 85.082%, F1: 0.851
Confusion Matrix:
 [[4237  763]
 [ 729 4271]] 
 ==================================================
16 Epoch, Test Accuracy: 85.880%, Loss: 0.028
Recall: 85.880%, Precision: 85.904%, F1: 0.859
Confusion Matrix:
 [[4229  771]
 [ 641 4359]] 
 ==================================================
19 Epoch, Test Accuracy: 85.870%, Loss: 0.028
Recall: 85.870%, Precision: 86.095%, F1: 0.858
Confusion Matrix:
 [[4491  509]
 [ 904 4096]] 
 ==================================================
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[9], line 38
     36 end = time.time()
     37 val.to_csv(r'./model_state/metrics.csv')
---> 38 print('******** Running Time: {:.3f} min ********'.format(end-start)/60)

TypeError: unsupported operand type(s) for /: 'str' and 'int'

……在这里我犯一点小错误,但无关紧要。

In [10]:
print('******** Running Time: {:.3f} min ********'.format((end-start)/60))
******** Running Time: 32.329 min ********
In [11]:
os.environ['KMP_DUPLICATE_LIB_OK']='TRUE'

plt.plot(range(1, num_epochs+1), val['Training_Loss'], linewidth=2.0, label='Training Loss')
plt.plot(range(1, num_epochs+1), val['Testing_Loss'], linewidth=2.0, label='Testing Loss')
plt.xlabel('epoch', fontsize=14)
plt.ylabel('CEL', fontsize=14)
plt.title('Loss of each completion of a training', fontsize=18, loc='left')
plt.legend(fontsize=12)
plt.grid(True)
plt.xticks(range(1, num_epochs+1, 3))
plt.savefig('CNN_CEL.svg', format='svg')
plt.show()

plt.plot(range(1, num_epochs+1), val['ACC'], linewidth=2.0, label='Accuracy')
plt.plot(range(1, num_epochs+1), val['F1'], linewidth=2.0, label='F1')
plt.plot(range(1, num_epochs+1), val['Precision'], linewidth=2.0, label='Precision')
plt.xlabel('epoch', fontsize=14)
plt.ylabel('value', fontsize=14)
plt.title('Testing Metrics', fontsize=18, loc='left')
plt.legend(fontsize=12)
plt.grid(True)
plt.xticks(range(1, num_epochs+1, 3))
plt.savefig('CNN_metrics.svg', format='svg')
plt.show()

看上去模型还有很大进步空间,但是训练所花费时间太长,这里便不再继续了。

Classify¶

In [12]:
choice = 20

bestModel = ConvNet(category).to(device)
bestModel.load('model_state/ConvNet {} Epoch.pt'.format(choice))
In [13]:
cmap = sns.color_palette('Oranges_r')
sns.heatmap(val['CM'][choice-1], annot=True, cmap=cmap)  # cbar_kws={'ticks': range(0, 500, 40)}
plt.savefig('CNN_CM.svg', format='svg')
plt.show()

  • 输出形状计算公式与卷积可视化:请轻击这里

  • metrics公式:请轻击这里

In [14]:
image = cv2.imread('QQ图片20230526212527.jpg')[:,:,::-1]
plt.imshow(image)
image = transformNorm(image=image)['image'].unsqueeze(0)
print('Cat!') if model.pred(image.to(device), False) == 0 else print('Dog!')
Cat!
In [15]:
image = cv2.imread('QQ图片20230526214055.jpg')[:,:,::-1]
plt.imshow(image)
image = transformNorm(image=image)['image'].unsqueeze(0)
print('Cat!') if model.pred(image.to(device), False) == 0 else print('Dog!')
Dog!

Tip: 我记得原本这是繁体字的对话,这可能是在台湾发生的可爱事件,这应该是大陆抖音翻拍。

  • 可尝试SPP方法;

  • 网络架构简单,但这是没办法的事,也曾尝试过,可惜设备不支持;

  • 迁移学习。