IT

파이썬 데이터 코드 분석#1

프로개발러 2026. 3. 17. 23:15
반응형

 

데이터코드 분석을 할 일이 생겼다.

AI쓰면 금방 만들어주는 코드도 알아야 수정이 가능하다.

코드를 하나씩 뜯어가면서 분석해보겠다.

 

#운영체제와 상호작용하기 위해 내장 모듈을 불러오는 코드이다.

#파일 및 디렉토리 생성,삭제를 위한 코드

import os  

 

 

#한번쯤은 들어봤을 OpenCV를 불러오는 명령어이다(영상처리 관련)
import cv2

 

#랜덤값 라이브러리 사용
import random

 

#데이터 분석 라이브러리인 pandas를 불러오는 명령어이다.

#데이터프레임 생성, csv파일 불러올때 사용하는 명령어
import pandas as pd

 

#numpy라이브러리는 대규모 수학 연산을 할 때 사용한다.

#선형대수학,행렬,푸리에변환등 수학적 연산을 사용함
import numpy as np

 

#PIL은 파이썬 이미지 라이브러리라는 뜻으로

#이미지 파일을 다루기 위해서 사용함

from PIL import Image

 

 

#torch는 파이토치로서 딥러닝 설계를 할 때 사용한다.

#torch.nn은 neural network로서 레이어 정의나 모델 구조를 만든다

#optim는 optimizer로서 모델을 학습시키는 방법을 의미한다.
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

 

#미리 만들어진 CNN모델들을 가져온다.
from torchvision import models, transforms
from torchvision.models import ResNet18_Weights

 

#데이터를 학습 데이터와 테스트 데이터를 나누는 라이브러리

from sklearn.model_selection import train_test_split

#반복문 진행현황을 프로그래스바로 보여줌
from tqdm.auto import tqdm

# =====================
# 설정
# =====================
CFG = {
    'IMG_SIZE': 224,
    'EPOCHS': 15,
    'LR': 1e-3,
    'BATCH_SIZE': 32,
    'SEED': 42,
    'VIDEO_FRAMES': 5
}

 

 

CFG는 딕셔너리로

안에는 딥러닝을 위한 설정값들이 존재한다.

 

IMG_SIZE 224는 224x224픽셀이미지

EPOCHS는 몇번 학습을 시킬건지

LR은 학습률로 1e-3은 1/10^3 로 0.001

배치사이즈 32는 한번에 얼마나 뭉쳐서 돌릴건지?

SEED는 랜덤값을 돌렸을 때, 일정한 값으로 나오게 한다.

VIDEO_FRAMES은 한 영상당 프레임을 몇개 나눌건지를 의미한다.

 

 

 

 

 

 

def seed_everything(seed):

    random.seed(seed)

    np.random.seed(seed)

    torch.manual_seed(seed)

    torch.cuda.manual_seed(seed)

    torch.backends.cudnn.deterministic = True

 

seed_everything(CFG['SEED'])

 

=>실험 결과를 매번 동일하게 나오도록 고정

 

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

=>cuda가 있으면 gpu 아니면 cpu를 사용하도록 설정

 

참고로 맥북에서는 아래와 같이 사용

if torch.cuda.is_available():

    device = torch.device("cuda")

elif torch.backends.mps.is_available():

    device = torch.device("mps")

else:

    device = torch.device("cpu")

    

 

 

 

 

# =====================

# 데이터 로드

# =====================

train_df = pd.read_csv('./train.csv')

dev_df = pd.read_csv('./dev.csv')

 

# 🔥 train + dev 통합

full_df = pd.concat([train_df, dev_df]).reset_index(drop=True)

 

train_df, val_df = train_test_split(

    full_df,

    test_size=0.1,

    stratify=full_df['label'],

    random_state=CFG['SEED']

)

 

test_df = pd.read_csv('./sample_submission.csv')

 

데이터를 불러와서 학습(train),검증(val),테스트(test) 구조로 만드는 전처리 단계

 

 

train과 dev를 합치고 train과 val를 나누는 구조

 

 

 

 

 

 

# =====================

# Transform

# =====================

train_transform = transforms.Compose([

    transforms.RandomResizedCrop(CFG['IMG_SIZE'], scale=(0.6,1.0)),

-> 이미지 일부를 잘라서 사용 60~100% 영역만

    transforms.RandomHorizontalFlip(),  //좌우 뒤집기

    transforms.RandomVerticalFlip(),   //상하 뒤집기

    transforms.RandomRotation(30), //30도 회전

 

    transforms.ColorJitter(0.4,0.4,0.4,0.1),  //밝기,대비,채도,색상 변화

    transforms.RandomGrayscale(p=0.2),  //20% 확률로 흑백 처리

    transforms.GaussianBlur(3),   //이미지 살짝 흐리게 

 

    transforms.ToTensor(),   //이미지를 숫자 텐서로 변환

    transforms.Normalize([0.485,0.456,0.406],

                         [0.229,0.224,0.225])    

])

 

test_transform = transforms.Compose([

    transforms.Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])),

    transforms.ToTensor(),

    transforms.Normalize([0.485,0.456,0.406],

                         [0.229,0.224,0.225])

])

 

이미지 데이터를 모델에 넣기전에 변경하는 구조

목적 : 데이터 다양하게 만들고, 오버피팅 방지 

 

 

 

 

 

 

 

# =====================

# Dataset

# =====================

class MultiModalDataset(Dataset):

    def __init__(self, df, transform=None, is_test=False):

        self.df = df

        self.transform = transform

        self.is_test = is_test

        self.label_map = {'stable':0, 'unstable':1}

 

    def __len__(self):

        return len(self.df)

 

-> pytorch에서 데이터를 모델에 넣기 위한 데이터셋 클래스를 정의하는 코드

__init__함수는 MultiModalDataset 클래스를 호출하면 자동 실행됨.

 

 

 

load_video_frames 함수는 영상파일을 불러와서

프레임 여러개 추출하고 이미지 변환,transform한후에

tensor로 묶는다.

 

    def load_video_frames(self, video_path):

        frames = []

        cap = cv2.VideoCapture(video_path)

 

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))//전체 프레임 개수 확인

 

        if total_frames == 0:

            return torch.zeros(CFG['VIDEO_FRAMES'], 3, CFG['IMG_SIZE'], CFG['IMG_SIZE'])

 

        indices = np.linspace(0, total_frames-1, CFG['VIDEO_FRAMES']).astype(int)

 

        for idx in indices:

            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)

            ret, frame = cap.read()

 

            if not ret:

                frame = np.zeros((CFG['IMG_SIZE'], CFG['IMG_SIZE'], 3), dtype=np.uint8)

 

            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            frame = Image.fromarray(frame)

 

            if self.transform:

                frame = self.transform(frame)

 

            frames.append(frame)

 

        cap.release()

 

        return torch.stack(frames)

 

    def __getitem__(self, idx):

        row = self.df.iloc[idx]

        sample_id = str(row['id'])

 

        # 경로 자동 분기

        if os.path.exists(f'./train/{sample_id}'):

            root = './train'

        elif os.path.exists(f'./dev/{sample_id}'):

            root = './dev'

        else:

            root = './test'

 

        folder = os.path.join(root, sample_id)

 

        # 이미지

        views = []

        for name in ['front','top']:

            img = Image.open(os.path.join(folder,f'{name}.png')).convert("RGB")

            if self.transform:

                img = self.transform(img)

            views.append(img)

 

        views = torch.stack(views)

 

        # 영상 (train/dev만 존재)

        video_tensor = None

        video_path = os.path.join(folder, 'simulation.mp4')

 

        if os.path.exists(video_path):

            video_tensor = self.load_video_frames(video_path)

        else:

            video_tensor = torch.zeros(CFG['VIDEO_FRAMES'],3,CFG['IMG_SIZE'],CFG['IMG_SIZE'])

 

        if self.is_test:

            return views, video_tensor

 

        label = self.label_map[row['label']]

        return views, video_tensor, label

 

 

 

 

 

 

 

 

 

# =====================

# DataLoader

# =====================

train_loader = DataLoader(

    MultiModalDataset(train_df, train_transform),

    batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=0

)

 

val_loader = DataLoader(

    MultiModalDataset(val_df, test_transform),

    batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0

)

 

test_loader = DataLoader(

    MultiModalDataset(test_df, test_transform, is_test=True),

    batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0

)

 

 

위코드는 pyTorch에서 데이터를 모델에 넣기 위해 배치 단위로 불러오는 설정.

DataLoader를 통해 데이터를 자동으로 나눠서 학습/검증/테스트에 맞게 공급해줌

 

 

DataLoader의 역할은 데이터를 batch_size만큼 묶어서 shuffle여부에 따라 섞음

num_workers은 데이터를 불러오는 프로세스 수이다.

 

 

 

 

 

 

 

# =====================

# 모델

# =====================

class MultiModalModel(nn.Module):

    def __init__(self):

        super().__init__()

 

        self.backbone = models.resnet18(weights=ResNet18_Weights.DEFAULT)

        self.backbone.fc = nn.Identity()

 

        # video encoder (공유)

        self.video_backbone = models.resnet18(weights=ResNet18_Weights.DEFAULT)

        self.video_backbone.fc = nn.Identity()

 

        # 🔥 추가

        self.lstm = nn.LSTM(512, 256, batch_first=True, bidirectional=True)

 

        self.fc = nn.Sequential(

            nn.Linear(512*2 + 512, 512),

            nn.BatchNorm1d(512),

            nn.ReLU(),

            nn.Dropout(0.4),

 

            nn.Linear(512,128),

            nn.ReLU(),

 

            nn.Linear(128,1)

        )

 

    def forward(self, views, video):

 

        b,v,c,h,w = views.shape

 

        views = views.view(-1,c,h,w)

        img_feat = self.backbone(views)

        img_feat = img_feat.view(b, -1)  # concat

 

        # video

        vf = video.view(-1,c,h,w)

        vf_feat = self.video_backbone(vf)

        vf_feat = vf_feat.view(b, CFG['VIDEO_FRAMES'], -1)

        

        vf_feat, _ = self.lstm(vf_feat)

        

        vf_feat = vf_feat.mean(dim=1)

 

        x = torch.cat([img_feat, vf_feat], dim=1)

 

        return self.fc(x)

 

->이미지,영상을 같이보고 판단하는 멀티모달 모델

 

 

model = MultiModalModel().to(device) //모델생성

 

criterion = nn.BCEWithLogitsLoss()//이진분류용 loss함수

optimizer = optim.Adam(model.parameters(), lr=CFG['LR'])//옵티마이저 adam사용

#scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CFG['EPOCHS'])

scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(

    optimizer, T_0=5, T_mult=1

)//학습률을 자동으로 조절하는 전략

 

scaler = torch.amp.GradScaler("cuda") if device.type == "cuda" else None

 

 

 

 

 

# =====================

# 학습

# =====================

best_loss = float('inf')

 

for epoch in range(CFG['EPOCHS'])://epoch만큼 학습

 

    model.train()//모델 트레인

    total_loss = 0

 

    for views, video, labels in tqdm(train_loader): //데이터로딩

        views, video, labels = views.to(device), video.to(device), labels.float().to(device)

 

        

        # 🔥 추가

        views, video, labels_a, labels_b, lam = mixup(views, video, labels)

        outputs = model(views, video).view(-1)

 

        # 🔥 수정

        loss = lam * criterion(outputs, labels_a) + (1 - lam) * criterion(outputs, labels_b)

 

 

        optimizer.zero_grad()

 

        with torch.amp.autocast(device_type="cuda", enabled=(scaler is not None)):

            outputs = model(views, video).view(-1)

            loss = criterion(outputs, labels)

 

        if scaler:

            scaler.scale(loss).backward()

            scaler.step(optimizer)

            scaler.update()

        else:

            loss.backward()

            optimizer.step()

 

        total_loss += loss.item()

 

    # validation

    model.eval()

    all_probs, all_labels = [], []

 

    with torch.no_grad():

        for views, video, labels in val_loader:

            views, video = views.to(device), video.to(device)

 

            outputs = model(views, video).view(-1)

            probs = torch.sigmoid(outputs)

 

            all_probs.append(probs.cpu())

            all_labels.append(labels)

 

    all_probs = torch.cat(all_probs).numpy()

    all_labels = torch.cat(all_labels).numpy()

 

    eps = 1e-15

    p = np.clip(all_probs, eps, 1-eps)

    logloss = -np.mean(all_labels*np.log(p)+(1-all_labels)*np.log(1-p))

 

    print(f"Epoch {epoch+1} | Loss {total_loss/len(train_loader):.4f} | Val LogLoss {logloss:.5f}")

 

    if logloss < best_loss:

        best_loss = logloss

        torch.save(model.state_dict(), "best_model.pth")

        print("✓ saved")

 

    scheduler.step()

 

 

반응형