본문 바로가기
컴퓨터비전/CNN

[딥러닝] Albumentations의 데이터 증강 이해

by PIAI 2022. 3. 17.

Albumentations의 데이터 증강

 

albumentations는 이미지 데이터 증강 라이브러리입니다. 이미지를 좌, 우, 회전, 색변환, 노이즈 등등 넣어서 다양한 데이터를 모델이 학습시킬 수 있게 변환해주는 것입니다. 코드를 보면서 어떻게 사용하는지 확인하겠습니다.

 

import numpy as np
import pandas as pd
import os
import cv2
import matplotlib.pyplot as plt

image = cv2.cvtColor(cv2.imread('test.jpg'), cv2.COLOR_BGR2RGB)
plt.imshow(image)

여기 원본 강아지사진 한 장이 있습니다. albumentations은 함수마다 확률 값을 줘서 작동을 확률 값으로 하는 것입니다. 예를 들어 좌우반전이라는 함수가 있고, 0.5의 확률 값을 주면 2번 중에 한 번만 작동한다는 것입니다. 아래 코드는 이미지를 원본 + augmentation을 적용한 이미지 4장을 출력하는 함수입니다.

 

# 이미지 배열을 입력받아 5개 출력하는 함수
def show_images(images, labels):
    fig, axs = plt.subplots(nrows=1, ncols=5, figsize=(20, 8))
    for i in range(5):
        axs[i].imshow(images[i])
        axs[i].set_title(labels[i])

# 원본 이미지를 입력받아 4개의 augmentation 적용하여 시각화
def aug_apply(image, label, aug):
    image_list = [image]
    label_list = ['origin']

    for i in range(4):
        aug_image = aug(image=image)['image']
        image_list.append(aug_image)
        label_list.append(label)

    show_images(image_list, label_list)

 

자 이제 시각화 함수를 만들었으니 Horizontal(좌우), Vertical(상하), Rotate(회전), RandomRotate90(90, 180, 270, 360 랜덤하게 회전)을 코드로 구현해보겠습니다.

 

import albumentations as at

# 좌우 반전
aug_horizontal = at.HorizontalFlip(p=0.5)
# 상하 반전
aug_vertical = at.VerticalFlip(p=0.5)
# -90 ~ 90도 사이로 돌림, 남은 공간은 주변 환경으로 채움 (cv2.BORDER_CONSTAN, cv2.BORDER_REFLECT, cv2.BORDER_REFLECT_101, cv2.BORDER_WRAP)
aug_rotate = at.Rotate(limit=90, p=1, border_mode=cv2.BORDER_REPLICATE)
# 원본사이즈를 변경 하면서 90, 180, 270, 360도 돌림
aug_random_rotate = at.RandomRotate90(p=1)

aug_apply(image=image, label='Horizontal', aug=aug_horizontal)
aug_apply(image=image, label='Vertical', aug=aug_vertical)
aug_apply(image=image, label='Rotate', aug=aug_rotate)
aug_apply(image=image, label='RandomRotate', aug=aug_random_rotate)

 

 

horizontal과 vertical은 확률값을 0.5로 주었기 때문에 바뀐 것도 있고 바뀌지 않은 것도 있습니다. 그리고 RandomRotate90을 보면 원본 이미지가 바뀌면서 회전된 것을 볼 수 있습니다. (나중에 resize 해주어야 합니다.)

 

이번에는 shift, scale, rotate를 한 번에 작동시켜 보겠습니다.

# shift, scale, rotate를 한번에 적용
aug_shift_scale_rotate = at.ShiftScaleRotate(shift_limit=0.4, scale_limit=(0.5, 0.9), rotate_limit=90, p=1, border_mode=cv2.BORDER_REPLICATE)
# scale만 적용한 경우
aug_only_scale = at.ShiftScaleRotate(shift_limit=0, scale_limit=(0.5, 0.9), rotate_limit=0, p=1)
# shift만 적용한 경우 (height, width)
aug_only_shift = at.ShiftScaleRotate(shift_limit=(0.4, 0.5), scale_limit=0, rotate_limit=0, p=1,  border_mode=cv2.BORDER_REPLICATE)

aug_apply(image=image, label='Shift_Scale_Rotate', aug=aug_shift_scale_rotate)
aug_apply(image=image, label='Only_Scale', aug=aug_only_scale)
aug_apply(image=image, label='Only_Shift', aug=aug_only_shift)

scale값을 0보다 큰 값을 입력하면 확대한 것을 볼 수 있습니다.

 

이 위의 기능을 여러 가지 섞어서도 사용할 수 있습니다. 물론 이 섞은 기능 자체를 사용할지 말지 확률도 줄 수 있습니다.

#여러개 동시에 적용 (Compose 자체에도 확률을 적용할 수 있음)
aug_multi = at.Compose([
    at.VerticalFlip(p=0.5),
    at.HorizontalFlip(p=0.5)
], p=1)

aug_apply(image=image, label='Comsose', aug=aug_multi)

 

특정 이미지 영역을 자를 수도 있습니다. (단, 자른 후에 자른 이미지를 원본 사이즈만큼 키워줘야 합니다. 모델은 동일한 이미지 크기를 학습하기때문)

 

# Crop은 원본 이미지의 특정 영역을 잘라낸 후, 잘라낸 사이즈를 반환(Resize 해줘야함)
aug_crop = at.Crop(x_min=500, y_min=300, x_max=1000, y_max=800, p=1)
# Crop and Resize
aug_crop_resize = at.Compose([
    at.Crop(x_min=500, y_min=300, x_max=1000, y_max=800, p=1),
    at.Resize(1370, 2000)
])
aug_apply(image=image, label='Crop', aug=aug_crop)
aug_apply(image=image, label='Crop_Resize', aug=aug_crop_resize)

이미지의 중심을 기준으로 height, width값만큼 잘라보겠습니다. 물론 Resize 해주어야 합니다.

# 이미지의 중심을 기준으로 입력된 height, width값 만큼의 영역을 잘라낸 후, 잘라낸 사이즈를 반환(반드시 Resize 해주어야함)
aug_center_crop = at.CenterCrop(width=500, height=300, p=1)
aug_center_crop_resize = at.Compose([
    at.CenterCrop(width=500, height=300, p=1),
    at.Resize(1370, 2000)
])

aug_apply(image=image, label='CenterCrop', aug=aug_center_crop)
aug_apply(image=image, label='CenterCrop_Reize', aug=aug_center_crop_resize)

 

위에는 공간단위로 이미지를 변경했다면 이제는 픽셀 단위로 변경해보겠습니다. (명도, 채도, 대비, 밝기, 노이즈 등등)

밝기와 대비 먼저 변경해보겠습니다.

# 밝기와 대비 변경 (대비를 올리면 어두운색은 더 어둡게, 밝은색은 더 밝게)
aug_bright_contrast = at.RandomBrightnessContrast(brightness_limit=(-0.3, 0.3), contrast_limit=(-0.3, 0.3), p=1)
# 밝기만 변경
aug_bright = at.RandomBrightnessContrast(brightness_limit=(-0.8, 0.8), contrast_limit=0, p=1)
# 대비만 변경
aug_contrast = at.RandomBrightnessContrast(brightness_limit=0, contrast_limit=(-0.8, 0.8), p=1)

aug_apply(image=image, label='Bright_Contrast', aug=aug_bright_contrast)
aug_apply(image=image, label='Only_Bright', aug=aug_bright)
aug_apply(image=image, label='Only_Contrast', aug=aug_contrast)

 

다음은 HSV, RGBShift, ChannelShuffle을 변경해보겠습니다.

# 색상 채도 명도 변경, default(hue_shift_limit=(-20, 20), sat_shift_limit=(-30, 30), val_shift_limit=(-20, 20))
aug_hsv = at.HueSaturationValue(p=1)
#RGB 값 각각 범위내 임의로 변경 default(r_shift_limit=(-20, 20), g_shift_limit=(-20, 20), b_shift_limit=(-20, 20))
aug_rgb = at.RGBShift(p=1)
#RGB Channel을 랜덤하게 섞음
aug_rgb_shuffle = at.ChannelShuffle(p=1)

aug_apply(image=image, label='HueSaturationValue', aug=aug_hsv)
aug_apply(image=image, label='RGBShift', aug=aug_rgb)
aug_apply(image=image, label='RGBShuffle', aug=aug_rgb_shuffle)

가우시안 노이즈로 백색소음을 줄 수도 있고, 정사각형 노이즈도 추가할 수 있습니다.

#가우시안 노이즈 분포를 가지는 노이즈를 추가
aug_noise = at.GaussNoise(p=1, var_limit=(100, 200))
#정사각형 노이즈 추가
aug_cut = at.Cutout(p=1, num_holes=8, max_h_size=24, max_w_size=24)

aug_apply(image=image, label='GaussNoise', aug=aug_noise)
aug_apply(image=image, label='Cutout', aug=aug_cut)

 

CLAHE를 이용해 보다 선명한 이미지와, Blur로 보다 흐린 이미지를 만들수도 있습니다.

# 히스토그램 균일화 기법인 CLAHE를 이용하여 보다 선명한 이미지 재생
aug_clahe = at.CLAHE(p=1)
# blur_limit가 클수록 더 흐림
aug_blur = at.Blur(p=1, blur_limit=(50, 60))

aug_apply(image=image, label='CLAHE', aug=aug_clahe)
aug_apply(image=image, label='Blur', aug=aug_blur)

 

이제 픽셀 단위와, 공간 단위를 혼합하여 한번에 적용해보겠습니다.

# 여러개 적용
aug_multi = at.Compose([
    at.CenterCrop(height=700, width=800, p=0.5),
    at.HorizontalFlip(p=0.5),
    at.RandomBrightnessContrast(p=0.5, brightness_limit=0.8, contrast_limit=0.8),
    at.Resize(1370, 2000)
])

aug_apply(image=image, label='Multi', aug=aug_multi)

 

여러 개를 적용해보았는데, 여러 개 중에 한 개만 적용할 수도 있습니다.

# OneOf() 로 한개만 적용될수도 있음
aug_multi = at.Compose([
    at.HorizontalFlip(p=0.5),
    at.ShiftScaleRotate(p=0.5),
    at.OneOf([
        at.CLAHE(p=0.3),
        at.Blur(blur_limit=(30, 40), p=0.5)
    ], p=0.5)
])
aug_apply(image=image, label='Multi', aug=aug_multi)
aug_apply(image=image, label='Multi', aug=aug_multi)
aug_apply(image=image, label='Multi', aug=aug_multi)

 

대략적인 종류를 봤습니다. 이제 모델에 적용하는 방법을 살펴보겠습니다.

Sequence

데이터 세트로는 kaggle에 있는 cat and dog를 쓰겠습니다. 아무 데이터나 써도 무방할 것 같습니다. cat and dog의 폴더 구조는

이렇게 되어있고 각 cats와 dogs폴더 아래에. jpg로 메타정보들이 담겨있습니다. 이 데이터들을 pandas의 dataframe으로 3개의 columns들 경로(path), 구분(train, test), label(DOG, CAT)의 정보들로 각각 분리하고 확인해보겠습니다.

 

import numpy as np
import pandas as pd
import os

paths = []
data_div = []
label_div = []

for dirname, _, filenames in os.walk('/content/input'):
    for filename in filenames:
        if '.jpg' in filename:
            file_path = dirname + '/' + filename
            paths.append(file_path)
            if '/training_set/' in file_path:
                data_div.append('train')
            elif '/test_set/' in file_path:
                data_div.append('test')

            if 'dogs' in file_path:
                label_div.append('DOG')
            elif 'cats' in file_path:
                label_div.append('CAT')

df = pd.DataFrame({'path':paths, 'data':data_div, 'label':label_div})
df.head()

테이블에 아주 잘 삽입된 것을 볼 수 있습니다. 앞서 keras의 전처리와 데이터 로딩에서 말한 것처럼 keras의 imageDataGenerator의 flow_from_dataframe로 Generator만 만들어주고, x_col과 y_col만 잘 입력해주어 알아서 학습을 하는 구조였습니다.

하지만 Albumentation은 tf.keras.utils.Sequence라는 클래스를 상속받아 scaling, augmentation 등등 이미지 전처리와 로딩을 위한 코딩을 직접 수행해줘야 합니다. 여기서 이제 의문이 듭니다. 그럼 이 복잡한 것을 쓰지 않고 imageDataGenerator만 쓰면 더 편하지 않냐라고 생각이 들 수 있는데 ImageDataGenerator는 Augmentation의 기능이 제한적이며, Augmentation들의 확률을 줄 수 없고 랜덤 하게 기능이 수행됩니다. 그리고 Sequence를 쓰는 가장 큰 이유는 병렬 수행으로 데이터 전처리/로딩 시 속도가 빠르므로 권장됩니다. 이제 Sequence클래스를 상속받은 기본 뼈대를 보겠습니다.

 

class Dataset(tf.keras.utils.Sequence):
	def __init__(self, ...):
    		...
        
    	def __len__(self):
    		...
        
    	def __getitem__(self, index):
    		...
     
    	def on_epoch_end(self):
    		...

 

여기서 __init__은 그냥 기본 생성자이고, __len__, __getitem__, on_epoch_end는 처음 보는 구조입니다.

  • __len__: 시퀀스의 배치 수입니다.
  • __getitem__: step의 index부터 batch_size만큼 데이터를 가져와 전처리, augmentation, scaling 등을 진행할 것입니다.
  • on_epoch_end: 데이터를 shuffle 하는 메서드입니다.

아래의 코드를 보면 이해가 더 쉬울 것입니다.

a = [1, 2, 3]
print('길이: ', a.__len__())
print('index 1번째: ', a.__getitem__(1))

a.__len__() == len(a)

a.__getitem__(1) == a [1]

 

이제 이 Sequence를 상속받은 클래스의 __len__기능과, __getitem__ 기능을 구현해보겠습니다.

from tensorflow.keras.utils import Sequence
import sklearn
import cv2

class CDSequence(Sequence):
    def __init__(self, filenames, labels, batch_size=64, aug=None, shuffle=False):
    	# image의 절대경로들
        self.filenames = filenames
        self.labels = labels
        self.batch_size = batch_size
        # albumentation 객체
        self.aug = aug
        self.shuffle = shuffle
		
        # 훈련 데이터의 경우
        if self.shuffle:
            self.on_epoch_end()

    def __len__(self):
    	# 총 step의 갯수
        return len(self.labels) // self.batch_size

    def __getitem__(self, index):
    	# 현재 인덱스를 기준으로 batch_size만큼 데이터를 가져옴
        meta_data = self.filenames[index*self.batch_size:(index+1)*self.batch_size]
        # 훈련, 검증 데이터세트인 경우
        if self.labels is not None:
            label_batch = self.labels[index*self.batch_size:(index+1)*self.batch_size]

        # 불러온 meta_data를 np.array로 저장할 빈 공간을 생성
        image_batch = np.zeros((meta_data.shape[0], 224, 224, 3))
        for i in range(meta_data.shape[0]):
            # cv2는 이미지를 BGR로 불러오기 때문이 RGB로 바꾸어줌
            image = cv2.cvtColor(cv2.imread(meta_data[i]), cv2.COLOR_BGR2RGB)
            # 이미지의 크기가 전부 다르기 때문에 통일 시켜 주어야함
            image = cv2.resize(image, (224, 224))
            # augmentation이 있으면 적용
            if self.aug is not None:
                image = self.aug(image=image)['image']
            
            # 빈 이미지 배치에 최종 이미지를 등록
            image_batch[i] = image
        
        return image_batch, label_batch

    def on_epoch_end(self):
        # 파일과 라벨을 같이 섞어 주어야한다.
        if self.shuffle:
            self.image_filenames, self.labels = sklearn.utils.shuffle(self.filenames, self.labels)
        else:
            pass

모든 설명을 주석으로 대체하겠습니다. 이제 데이터가 잘 들어갔는지 확인하는 함수를 만들겠습니다. iter로 시퀀스를 반복하는 생성기를 만들어 next로 첫 번째 배치 사이즈만큼 데이터를 불러와서 데이터가 잘 담겼는지 디버깅하는 함수입니다.

import matplotlib.pyplot as plt

def show_first_data(dataset, image_verbose=False):
    first_data = next(iter(dataset))
    images = first_data[0]
    labels = first_data[1]
    print(images.shape, labels.shape)
    print(images[0])
    if image_verbose:
        fig, axs = plt.subplots(figsize=(20, 8), nrows=1, ncols=4)
        for i in range(4):
            axs[i].imshow(np.array(images[i], dtype='int32'))

 

자 이제 테이블의 훈련과 테스트 데이터를 각각 분리시키고 augmentation을 만들어 시퀀스를 만들고, 잘 만들어졌는지 디버깅해보겠습니다.

 

import albumentations as at

# 학습과 테스트데이터 테이블 분리
train_df = df[df['data'] == 'train']
test_df = df[df['data'] == 'test']

# 학습과 테스트데이터의 각각 meta정보와 label정보 분리(np.array)
train_filenames = train_df['path'].values
train_labels = train_df['label'].values
test_filenames = test_df['path'].values
test_labels = test_df['label'].values

CD_aug = at.Compose([
    at.HorizontalFlip(p=0.5),
    at.VerticalFlip(p=0.5),
    at.ShiftScaleRotate(p=0.5),
    at.HueSaturationValue(p=0.5),
    at.GaussNoise(p=0.5)
])

CD_dataset = CDSequence(train_filenames, train_labels, batch_size=64, aug=CD_aug, shuffle=False)
show_first_data(CD_dataset, image_verbose=True)

처음 배치 사이즈(64) 개만큼 데이터도 잘 불러져왔고, 이미지들이 augmentation도 잘 적용된 것을 볼 수 있습니다. 위의 데이터는 scaling이 적용되지 않았습니다. (이미지 시각화를 위해 잠시 넣지 않은 것)

자 이제 훈련 데이터를 검증 데이터와 분리시키고, 개와 고양이 두 종류의 라벨이므로 sigmoid함수를 사용하여 binary분류를 하겠습니다. 그러기 위해선 일단 Label들을 LabelEncoding 하겠습니다.

 

from sklearn.model_selection import train_test_split
train_enc_labels = pd.factorize(train_labels)[0]

tr_path, val_path, tr_label, val_label = train_test_split(train_filenames, train_enc_labels, test_size=0.15)
print(tr_path.shape, tr_label.shape, val_path.shape, val_label.shape)
print(tr_label)

데이터 분리와 LabelEncoding까지 잘 된 것을 확인할 수 있습니다. 위의 Sequence모델은 이미지 시각화를 위해 스케일링이 적용되지 않은 모델이므로 적용한 객체를 다시 만들겠습니다.

from tensorflow.keras.utils import Sequence
from tensorflow.keras.applications.xception import preprocess_input
import sklearn
import cv2

class CDSequence(Sequence):
    def __init__(self, filenames, labels, batch_size=64, aug=None, shuffle=False, pre_func=None):
        self.filenames = filenames
        self.labels = labels
        self.batch_size = batch_size
        self.aug = aug
        self.shuffle = shuffle
        self.pre_func = pre_func

        if self.shuffle:
            self.on_epoch_end()

    def __len__(self):
        return len(self.labels) // self.batch_size

    def __getitem__(self, index):
        meta_data = self.filenames[index*self.batch_size:(index+1)*self.batch_size]
        # 훈련, 검증 데이터세트인 경우
        if self.labels is not None:
            label_batch = self.labels[index*self.batch_size:(index+1)*self.batch_size]

        # 불러온 meta_data를 np.array로 저장할 빈 공간을 생성
        image_batch = np.zeros((meta_data.shape[0], 224, 224, 3), dtype='float32')
        for i in range(meta_data.shape[0]):
            # cv2는 이미지를 BGR로 불러오기 때문이 RGB로 바꾸어줌
            image = cv2.cvtColor(cv2.imread(meta_data[i]), cv2.COLOR_BGR2RGB)
            # 이미지의 크기가 전부 다르기 때문에 통일 시켜 주어야함
            image = cv2.resize(image, (224, 224))
            # augmentation이 있으면 적용
            if self.aug is not None:
                image = self.aug(image=image)['image']
            
            # 이미지 값을 self.pre_func 함수로 스케일링
            if self.pre_func is not None:
                image = self.pre_func(image)

            # 빈 이미지 배치에 최종 이미지를 등록
            image_batch[i] = image
        
        return image_batch, label_batch

    def on_epoch_end(self):
        # 파일과 라벨을 같이 섞어 주어야한다.
        if self.shuffle:
            self.filenames, self.labels = sklearn.utils.shuffle(self.filenames, self.labels)
        else:
            pass
            
 tr_dataset = CDSequence(tr_path, tr_label, batch_size=64, aug=CD_aug, shuffle=True, pre_func=preprocess_input)
val_dataset = CDSequence(val_path, val_label, batch_size=64, aug=None, shuffle=False, pre_func=preprocess_input)

이제 Sequence가 만들어졌으니 Xception 모델을 전이 학습시켜 마지막 Dense Layer는 softmax가 아닌 sigmoid로 (개와 고양이 2종류 이기 때문) 만들고 compile도 마찬가지로 이진 분류기 때문에 binary_crossentropy를 하겠습니다.

 

from tensorflow.keras.applications import Xception
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dropout, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

input_tensor = Input(shape=(224, 224, 3))
base_model = Xception(input_tensor=input_tensor, include_top=False, weights='imagenet')
x = GlobalAveragePooling2D()(base_model.output)
x = Dropout(0.5)(x)
x = Dense(50, activation='relu')(x)
output = Dense(1, activation='sigmoid')(x)
model = Model(inputs=input_tensor, outputs=output)

model.compile(optimizer=Adam(0.001), loss='binary_crossentropy', metrics=['accuracy'])

stop_cb = EarlyStopping(monitor='val_loss', patience=5, mode='min', verbose=1)
lr_cb = ReduceLROnPlateau(monitor='val_loss', patience=3, factor=0.2, mode='min', verbose=1)

 

자 이제 학습을 시키고 그 결과를 확인하겠습니다.

 

test_enc_labels = pd.factorize(test_labels)[0]

test_dataset = CDSequence(test_filenames, test_enc_labels, batch_size=64, aug=None, shuffle=False, pre_func=preprocess_input)
res = model.evaluate(test_dataset)
print('loss: ', res[0], 'accuracy: ', res[1])

댓글