데이터코드 분석을 할 일이 생겼다.
AI쓰면 금방 만들어주는 코드도 알아야 수정이 가능하다.
코드를 하나씩 뜯어가면서 분석해보겠다.
#운영체제와 상호작용하기 위해 내장 모듈을 불러오는 코드이다.
#파일 및 디렉토리 생성,삭제를 위한 코드
import os
#한번쯤은 들어봤을 OpenCV를 불러오는 명령어이다(영상처리 관련)
import cv2
#랜덤값 라이브러리 사용
import random
#데이터 분석 라이브러리인 pandas를 불러오는 명령어이다.
#데이터프레임 생성, csv파일 불러올때 사용하는 명령어
import pandas as pd
#numpy라이브러리는 대규모 수학 연산을 할 때 사용한다.
#선형대수학,행렬,푸리에변환등 수학적 연산을 사용함
import numpy as np
#PIL은 파이썬 이미지 라이브러리라는 뜻으로
#이미지 파일을 다루기 위해서 사용함
from PIL import Image
#torch는 파이토치로서 딥러닝 설계를 할 때 사용한다.
#torch.nn은 neural network로서 레이어 정의나 모델 구조를 만든다
#optim는 optimizer로서 모델을 학습시키는 방법을 의미한다.
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
#미리 만들어진 CNN모델들을 가져온다.
from torchvision import models, transforms
from torchvision.models import ResNet18_Weights
#데이터를 학습 데이터와 테스트 데이터를 나누는 라이브러리
from sklearn.model_selection import train_test_split
#반복문 진행현황을 프로그래스바로 보여줌
from tqdm.auto import tqdm
# =====================
# 설정
# =====================
CFG = {
'IMG_SIZE': 224,
'EPOCHS': 15,
'LR': 1e-3,
'BATCH_SIZE': 32,
'SEED': 42,
'VIDEO_FRAMES': 5
}
CFG는 딕셔너리로
안에는 딥러닝을 위한 설정값들이 존재한다.
IMG_SIZE 224는 224x224픽셀이미지
EPOCHS는 몇번 학습을 시킬건지
LR은 학습률로 1e-3은 1/10^3 로 0.001
배치사이즈 32는 한번에 얼마나 뭉쳐서 돌릴건지?
SEED는 랜덤값을 돌렸을 때, 일정한 값으로 나오게 한다.
VIDEO_FRAMES은 한 영상당 프레임을 몇개 나눌건지를 의미한다.
def seed_everything(seed):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
seed_everything(CFG['SEED'])
=>실험 결과를 매번 동일하게 나오도록 고정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
=>cuda가 있으면 gpu 아니면 cpu를 사용하도록 설정
참고로 맥북에서는 아래와 같이 사용
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
# =====================
# 데이터 로드
# =====================
train_df = pd.read_csv('./train.csv')
dev_df = pd.read_csv('./dev.csv')
# 🔥 train + dev 통합
full_df = pd.concat([train_df, dev_df]).reset_index(drop=True)
train_df, val_df = train_test_split(
full_df,
test_size=0.1,
stratify=full_df['label'],
random_state=CFG['SEED']
)
test_df = pd.read_csv('./sample_submission.csv')
데이터를 불러와서 학습(train),검증(val),테스트(test) 구조로 만드는 전처리 단계
train과 dev를 합치고 train과 val를 나누는 구조
# =====================
# Transform
# =====================
train_transform = transforms.Compose([
transforms.RandomResizedCrop(CFG['IMG_SIZE'], scale=(0.6,1.0)),
-> 이미지 일부를 잘라서 사용 60~100% 영역만
transforms.RandomHorizontalFlip(), //좌우 뒤집기
transforms.RandomVerticalFlip(), //상하 뒤집기
transforms.RandomRotation(30), //30도 회전
transforms.ColorJitter(0.4,0.4,0.4,0.1), //밝기,대비,채도,색상 변화
transforms.RandomGrayscale(p=0.2), //20% 확률로 흑백 처리
transforms.GaussianBlur(3), //이미지 살짝 흐리게
transforms.ToTensor(), //이미지를 숫자 텐서로 변환
transforms.Normalize([0.485,0.456,0.406],
[0.229,0.224,0.225])
])
test_transform = transforms.Compose([
transforms.Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])),
transforms.ToTensor(),
transforms.Normalize([0.485,0.456,0.406],
[0.229,0.224,0.225])
])
이미지 데이터를 모델에 넣기전에 변경하는 구조
목적 : 데이터 다양하게 만들고, 오버피팅 방지
# =====================
# Dataset
# =====================
class MultiModalDataset(Dataset):
def __init__(self, df, transform=None, is_test=False):
self.df = df
self.transform = transform
self.is_test = is_test
self.label_map = {'stable':0, 'unstable':1}
def __len__(self):
return len(self.df)
-> pytorch에서 데이터를 모델에 넣기 위한 데이터셋 클래스를 정의하는 코드
__init__함수는 MultiModalDataset 클래스를 호출하면 자동 실행됨.
load_video_frames 함수는 영상파일을 불러와서
프레임 여러개 추출하고 이미지 변환,transform한후에
tensor로 묶는다.
def load_video_frames(self, video_path):
frames = []
cap = cv2.VideoCapture(video_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))//전체 프레임 개수 확인
if total_frames == 0:
return torch.zeros(CFG['VIDEO_FRAMES'], 3, CFG['IMG_SIZE'], CFG['IMG_SIZE'])
indices = np.linspace(0, total_frames-1, CFG['VIDEO_FRAMES']).astype(int)
for idx in indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
ret, frame = cap.read()
if not ret:
frame = np.zeros((CFG['IMG_SIZE'], CFG['IMG_SIZE'], 3), dtype=np.uint8)
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame = Image.fromarray(frame)
if self.transform:
frame = self.transform(frame)
frames.append(frame)
cap.release()
return torch.stack(frames)
def __getitem__(self, idx):
row = self.df.iloc[idx]
sample_id = str(row['id'])
# 경로 자동 분기
if os.path.exists(f'./train/{sample_id}'):
root = './train'
elif os.path.exists(f'./dev/{sample_id}'):
root = './dev'
else:
root = './test'
folder = os.path.join(root, sample_id)
# 이미지
views = []
for name in ['front','top']:
img = Image.open(os.path.join(folder,f'{name}.png')).convert("RGB")
if self.transform:
img = self.transform(img)
views.append(img)
views = torch.stack(views)
# 영상 (train/dev만 존재)
video_tensor = None
video_path = os.path.join(folder, 'simulation.mp4')
if os.path.exists(video_path):
video_tensor = self.load_video_frames(video_path)
else:
video_tensor = torch.zeros(CFG['VIDEO_FRAMES'],3,CFG['IMG_SIZE'],CFG['IMG_SIZE'])
if self.is_test:
return views, video_tensor
label = self.label_map[row['label']]
return views, video_tensor, label
# =====================
# DataLoader
# =====================
train_loader = DataLoader(
MultiModalDataset(train_df, train_transform),
batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=0
)
val_loader = DataLoader(
MultiModalDataset(val_df, test_transform),
batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0
)
test_loader = DataLoader(
MultiModalDataset(test_df, test_transform, is_test=True),
batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0
)
위코드는 pyTorch에서 데이터를 모델에 넣기 위해 배치 단위로 불러오는 설정.
DataLoader를 통해 데이터를 자동으로 나눠서 학습/검증/테스트에 맞게 공급해줌
DataLoader의 역할은 데이터를 batch_size만큼 묶어서 shuffle여부에 따라 섞음
num_workers은 데이터를 불러오는 프로세스 수이다.
# =====================
# 모델
# =====================
class MultiModalModel(nn.Module):
def __init__(self):
super().__init__()
self.backbone = models.resnet18(weights=ResNet18_Weights.DEFAULT)
self.backbone.fc = nn.Identity()
# video encoder (공유)
self.video_backbone = models.resnet18(weights=ResNet18_Weights.DEFAULT)
self.video_backbone.fc = nn.Identity()
# 🔥 추가
self.lstm = nn.LSTM(512, 256, batch_first=True, bidirectional=True)
self.fc = nn.Sequential(
nn.Linear(512*2 + 512, 512),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.4),
nn.Linear(512,128),
nn.ReLU(),
nn.Linear(128,1)
)
def forward(self, views, video):
b,v,c,h,w = views.shape
views = views.view(-1,c,h,w)
img_feat = self.backbone(views)
img_feat = img_feat.view(b, -1) # concat
# video
vf = video.view(-1,c,h,w)
vf_feat = self.video_backbone(vf)
vf_feat = vf_feat.view(b, CFG['VIDEO_FRAMES'], -1)
vf_feat, _ = self.lstm(vf_feat)
vf_feat = vf_feat.mean(dim=1)
x = torch.cat([img_feat, vf_feat], dim=1)
return self.fc(x)
->이미지,영상을 같이보고 판단하는 멀티모달 모델
model = MultiModalModel().to(device) //모델생성
criterion = nn.BCEWithLogitsLoss()//이진분류용 loss함수
optimizer = optim.Adam(model.parameters(), lr=CFG['LR'])//옵티마이저 adam사용
#scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CFG['EPOCHS'])
scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
optimizer, T_0=5, T_mult=1
)//학습률을 자동으로 조절하는 전략
scaler = torch.amp.GradScaler("cuda") if device.type == "cuda" else None
# =====================
# 학습
# =====================
best_loss = float('inf')
for epoch in range(CFG['EPOCHS'])://epoch만큼 학습
model.train()//모델 트레인
total_loss = 0
for views, video, labels in tqdm(train_loader): //데이터로딩
views, video, labels = views.to(device), video.to(device), labels.float().to(device)
# 🔥 추가
views, video, labels_a, labels_b, lam = mixup(views, video, labels)
outputs = model(views, video).view(-1)
# 🔥 수정
loss = lam * criterion(outputs, labels_a) + (1 - lam) * criterion(outputs, labels_b)
optimizer.zero_grad()
with torch.amp.autocast(device_type="cuda", enabled=(scaler is not None)):
outputs = model(views, video).view(-1)
loss = criterion(outputs, labels)
if scaler:
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
else:
loss.backward()
optimizer.step()
total_loss += loss.item()
# validation
model.eval()
all_probs, all_labels = [], []
with torch.no_grad():
for views, video, labels in val_loader:
views, video = views.to(device), video.to(device)
outputs = model(views, video).view(-1)
probs = torch.sigmoid(outputs)
all_probs.append(probs.cpu())
all_labels.append(labels)
all_probs = torch.cat(all_probs).numpy()
all_labels = torch.cat(all_labels).numpy()
eps = 1e-15
p = np.clip(all_probs, eps, 1-eps)
logloss = -np.mean(all_labels*np.log(p)+(1-all_labels)*np.log(1-p))
print(f"Epoch {epoch+1} | Loss {total_loss/len(train_loader):.4f} | Val LogLoss {logloss:.5f}")
if logloss < best_loss:
best_loss = logloss
torch.save(model.state_dict(), "best_model.pth")
print("✓ saved")
scheduler.step()
'IT' 카테고리의 다른 글
| 백준 촌수 계산 (0) | 2026.03.22 |
|---|---|
| 맥컬록 피츠뉴런 (0) | 2026.03.20 |
| 모수의 점추정 (0) | 2024.05.02 |
| spring boot custom login failed 핸들러+thymeleaf 에러메시지 처리방법 - 세션에 에러메시지 담으면 안되는 이유 (0) | 2023.10.30 |
| 스프링시큐리티 전역메서드 보안 17장 / 사전필터링과 사후필터링 / 스프링시큐리티 oauth2 어플리케이션 18장 (0) | 2023.10.16 |