본문 바로가기

Python/자연어처리

Python(43)- cbow 분류

*이 글을 읽기전에 작성자 개인의견이 있으니, 다른 블로그와 교차로 읽는것을 권장합니다.*

import urllib.request
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from copy import deepcopy
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm
# 깃허브에 올라온 파일을 가져오기 위해선, filename = ''설정해줘야 함.
urllib.request.urlretrieve('https://raw.githubusercontent.com/e9t/nsmc/master/ratings_train.txt', filename='ratings_train.txt' )
urllib.request.urlretrieve('https://raw.githubusercontent.com/e9t/nsmc/master/ratings_test.txt', filename='ratings_test.txt' )
train_dataset = pd.read_table('ratings_train.txt')
train_dataset

# pos, neg 비율
train_dataset['label'].value_counts()

sum(train_dataset['document'].isnull())

train_dataset['document'].isnull()

train_dataset = train_dataset[~train_dataset['document'].isnull()]
sum(train_dataset['document'].isnull())

train_dataset


Tokenization

  • 자연어를 모델이 이해하기 위해서는 자연어를 숫자의 형식으로 변형시켜야함
train_dataset['document'].iloc[0].split()

vocab = set()
for doc in train_dataset['document']:
    for token in doc.split():
        vocab.add(token)
        
len(vocab)

# 단어의 빈도수 구하기
'''
[('아', 1204),
('더빙', 2),
('진짜',5929),
('짜증나네요',10),
('목소리',99),
]
'''
vocab_cnt_dict = {}
for doc in train_dataset['document']:
    for token in doc.split():
        if token not in vocab_cnt_dict:
            vocab_cnt_dict[token] = 0
        vocab_cnt_dict[token] += 1
  
vocab_cnt_list = [(token, cnt) for token, cnt in vocab_cnt_dict.items()]
vocab_cnt_list[:10]

top_vocabs = sorted(vocab_cnt_list, key=lambda tup:tup[1], reverse=True)
top_vocabs[:10]

cnts = [cnt for _, cnt in top_vocabs]
np.mean(cnts)

cnts[:10]

sum(np.array(cnts) > 2)

n_vocab = sum(np.array(cnts) > 2)
top_vocabs_truncated = top_vocabs[:n_vocab]

top_vocabs_truncated[:5]

vocabs = [token for token, _ in top_vocabs_truncated]
vocabs[:5]


special token

  • [UNK] : Unknown Token
  • [PAD] : Padding Token
unk_token = '[UNK]'
unk_token in vocabs

pad_token = '[PAD]'
pad_token in vocabs

vocabs.insert(0, unk_token)
vocabs.insert(0, pad_token)

vocabs[:5]

idx_to_token = vocabs
token_to_idx = {token: i for i, token in enumerate(idx_to_token)}
class Tokenizer:
    def __init__(self, vocabs, use_padding=True, max_padding=64, pad_token='[PAD]', unk_token='[UNK]'):
        self.idx_to_token = vocabs
        self.token_to_idx = {token: i for i, token in enumerate(self.idx_to_token)}
        self.use_padding = use_padding
        self.max_padding = max_padding
        self.pad_token = pad_token
        self.unk_token = unk_token
        self.unk_token_idx = self.token_to_idx[self.unk_token]
        self.pad_token_idx = self.token_to_idx[self.pad_token]

    def __call__(self, x:str):
        token_ids = []
        token_list = x.split()

        for token in token_list:
            if token in self.token_to_idx:
                token_idx = self.token_to_idx[token]
            else:
                token_idx = self.unk_token_idx
            token_ids.append(token_idx)

        if self.use_padding:
            token_ids = token_ids[:self.max_padding]
            n_pads = self.max_padding - len(token_ids)
            token_ids = token_ids + [self.pad_token_idx] * n_pads

        return token_ids
tokenizer = Tokenizer(vocabs, use_padding=False)

sample = train_dataset['document'].iloc[0]
print(sample)

tokenizer(sample) # [51, 1, 5, 10485, 1064]

token_length_list = []
for sample in train_dataset['document']:
    token_length_list.append(len(tokenizer(sample)))
    
plt.hist(token_length_list)
plt.xlabel('token length')
plt.ylabel('count')

max(token_length_list)

tokenizer = Tokenizer(vocabs, use_padding=True, max_padding=50)
print(tokenizer(sample)) # 데이터로더, 배치사이즈, 데이터셋 필요

train_valid_dataset = pd.read_table('ratings_train.txt')
test_dataset = pd.read_table('ratings_test.txt')
print(f'train, valid samples: {len(train_valid_dataset)}')
print(f'test samples: {len(test_dataset)}')

train_valid_dataset.head()

train_valid_dataset = train_valid_dataset.sample(frac=1.)
train_valid_dataset.head()

train_ratio = 0.8
n_train = int(len(train_valid_dataset) * train_ratio)
train_df = train_valid_dataset[:n_train]
valid_df = train_valid_dataset[n_train:]
test_df = test_dataset

print(f'train samples: {len(train_df)}')
print(f'valid samples: {len(valid_df)}')
print(f'test samples: {len(test_df)}')

# 1/10으로 샘플링
train_df = train_df.sample(frac=0.8)
valid_df = valid_df.sample(frac=0.8)
test_df = test_df.sample(frac=0.8)

print(f'train samples: {len(train_df)}')
print(f'valid samples: {len(valid_df)}')
print(f'test samples: {len(test_df)}')

class NSMCDataset(Dataset):
    def __init__(self, data_df, tokenizer=None):
        self.data_df = data_df
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.data_df)

    def __getitem__(self, idx):
        sample_raw = self.data_df.iloc[idx]
        sample = {}
        sample['doc'] = str(sample_raw['document'])
        sample['label'] = int(sample_raw['label'])
        if self.tokenizer is not None:
            sample['doc_ids'] = self.tokenizer(sample['doc'])
        return sample
train_dataset = NSMCDataset(data_df=train_df, tokenizer=tokenizer)
valid_dataset = NSMCDataset(data_df=valid_df, tokenizer=tokenizer)
test_dataset = NSMCDataset(data_df=test_df, tokenizer=tokenizer)

print(train_dataset[0])

def collate_fn(batch):
    keys = [key for key in batch[0].keys()]
    data = {key: [] for key in keys}
    for item in batch:
        for key in keys:
            data[key].append(item[key])
    return data
train_dataloader = DataLoader(
    train_dataset,
    batch_size=128,
    collate_fn=collate_fn, # 배치사이즈를 어떻게 묶을것인가
    shuffle=True
)

valid_dataloader = DataLoader(
    valid_dataset,
    batch_size=128,
    collate_fn=collate_fn,
    shuffle=False
)

test_dataloader = DataLoader(
    test_dataset,
    batch_size=128,
    collate_fn=collate_fn,
    shuffle=False
)
sample = next(iter(test_dataloader))

sample.keys()   # dict_keys(['doc', 'label', 'doc_ids'])

sample['doc'][3] # 정말 재미지게 오랫동안 보게되는 드라마

print(sample['doc_ids'][3]) # [4, 17366, 2223, 2798, 52, 0, ... 0]


CBOW(Continuous Bag of Words)

  • 자연어 처리에서 단어의 의미를 벡터로 표현하는 Word2Vec 임베딩모델 중 하나
  • 문맥(context) 단어들을 사용해서 타겟(target) 단어를 예측하는 것(주변단어->중심단어)
  • 모델의 원리
    • 문장은 단어의 연속으로 구성
      • 예) The cat sat on the mat
      • 타겟단어 : cat, 주변단어 : (The, sat)
    • 문맥 단어의 수를 결정하는 윈도우 크기를 설정
      • 예) 윈도우 크기 : 2, 타겟 단어의 왼쪽과 오른쪽에서 각각 2개의 단어를 문맥 단어로 사용
    • 모든 단어를 고유한 인덱스로 매핑하고 원 핫 인코딩으로 변환
    • 입력으로 주어진 문맥 단어들을 이용해 타겟 단어를 예측하는 신경망을 학습
      • 예) 일반적으로 입력층, 은닉층, 출력층 3개의 층으로 구성
      • 입력층(Input Layer) : 문맥 단어들(중심단어)의 원 핫 인코딩 벡터를 받음, 입력 벡터의 차원으로, 단어집합의 크기(=어휘 크기)와 동일.
        • ex) 어휘 크기 = 10,000이라면 input_size = 10,000
      • 은닉층(Hidden Layer) : : 입력 벡터들의 평균을 계산하여 은닉층 벡터를 만듦, 입력층과 출력층 사이에 위치하며, 단어 임베딩 학습, 은닉층의 뉴런 수는 임베딩 벡터 크기와 동일
        • ex) 임베딩 벡터 차원 = 300라면 hidden_size = 300
      • 출력층(Output Layer) : 은닉층 벡터를 사용해 타겟 단어(중심단어)를 입력으로 받아 소프트맥스 함수를 통해 중심단어 확률 분포 출력, 중심 단어를 예측하기 위한 출력 벡터 차원, 출력층의 뉴런 수는 입력층과 동일

cbow 모델 학습 과정

1.데이터 준비 : 

  • 문장을 단어로 토큰화, 윈도우 크기(주변단어 수)에 따라 주변-중심 단어 쌍 생성
  • 주변 단어는 원핫 인코딩으로 입력 벡터 변환

특정 위치에만 1이 들어가고 나머지에는 0이 들어가는 형식

 2. 입력층 : 

하나의 중심 단어에 2m개의 단어 벡터를 입력 벡터로 갖는다.
cbow의 파라미터는 입력층->은닉층으로 가는 가중치와 은닉층->출력층으로 가는 가중치 2가지이다.
모델의 학습목적은 주변단어가 제시되었을 때, 중심단어의 조건부 확률을 최대화하는 것이 목적

3. 은닉층 : 

  • 입력 벡터들이 은닉층을 거치면서 임베딩 벡터로 변환
  • 입력 벡터 X 가중치 W 행렬 = 임베딩 벡터, w의 각 행을 각 단어의 임베딩 벡터로 사용
  • 입력 벡터의 평균을 구해서 하나의 임베딩 벡터 제작

원핫인코딩으로 변환된 입력벡터는 가중치 W와 곱하면 해당 단어에 대응되는 가중치 W의 행이 결과로 도출, [0 0 0 1 0]이 입력벡터, 가운데가 가중치 W, 곱해서 나온 결과가 임베딩 벡터(embedded word vector) [10 12 19].
임베딩 벡터 2m개의 평균 = 은닉층(Hidden Layer)

4. 출력층 : 

  • 은닉층의 임베딩 벡터가 출력층으로 전달, 소프트맥스 함수에 의해 중심단어의 확률분포로 변환

은닉층->출력층 파라미터 W'(=U) 곱해서 각 단어에 대한 score 생성
각 단어에 대한 score값들을 softmax함수로 곱해서 확률값 도출

5. 손실함수 및 역전파 : 

  • 출력 확률분포와 실제 중심단어의 원핫인코딩 벡터간의 교차 엔트로피 손실 계산
  • 역전파를 통해 손실 최소화하도록 가중치를 업데이트하여 최적의 output 뽑기

교차엔트로피(cross entropy) 함수
y = 1 일때, i는 예측하고자 하는 단어의 위치, yi와 yi^은 1이 되어 H(y^,y)값은 0
H(y^,y) = minimizeJ 으로 확률분포식으로 표현, 최적화(optimization)방법으로 경사하강법(SGD) 사용
입력벡터 갯수 C개, 임베딩 벡터 갯수 N개, 출력벡터 갯수 V개, 전체 계산량은 총 C X N + N X V 개.

 

class CBOW(nn.Module):
    def __init__(self, vocab_size, embed_dim):
        super().__init__()
        self.output_dim = embed_dim
        self.embeddings = nn.Embedding(vocab_size, embed_dim, padding_idx=0)

    def forward(self, x):
        # (batch_size, sequence) -> (batch_size, sequence, embed_dim)
        x_embeded = self.embeddings(x)
        stnc_repr = torch.mean(x_embeded, dim=1) # batch_size * embed_dim
        return stnc_repr
class Classifier(nn.Module):
    def __init__(self, sr_model, output_dim, vocab_size, embed_dim, **kwargs):
        super().__init__()
        self.sr_model = sr_model(vocab_size=vocab_size, embed_dim=embed_dim, **kwargs)
        self.input_dim = self.sr_model.output_dim
        self.output_dim = output_dim
        self.fc = nn.Linear(self.input_dim, self.output_dim)

    def forward(self, x):
        return self.fc(self.sr_model(x))
model = Classifier(sr_model=CBOW, output_dim=2, vocab_size=len(vocabs), embed_dim=16)

model.sr_model.embeddings.weight[0]

use_cuda = True and torch.cuda.is_available()

if use_cuda:
    model.cuda()
    
optimizer = optim.Adam(params=model.parameters(), lr=0.01)
calc_loss = nn.CrossEntropyLoss()
n_epoch = 10
global_i = 0

valid_loss_history = []
train_loss_history = []

best_model = None
best_epoch_i = None
min_valid_loss = 9e+9

for epoch_i in range(n_epoch):
    model.train()

    for batch in train_dataloader:
        optimizer.zero_grad()
        X = torch.tensor(batch['doc_ids'])
        y = torch.tensor(batch['label'])

        if use_cuda:
            X = X.cuda()
            y = y.cuda()

        y_pred = model(X)
        loss = calc_loss(y_pred, y)

        if global_i % 1000 == 0:
            print(f'i: {global_i}, epoch: {epoch_i}, loss: {loss.item()}')

        train_loss_history.append((global_i, loss.item()))

        loss.backward()
        optimizer.step()
        global_i += 1

    model.eval()

    valid_loss_list = []
    for batch in valid_dataloader:
        X = torch.tensor(batch['doc_ids'])
        y = torch.tensor(batch['label'])

        if use_cuda:
            X = X.cuda()
            y = y.cuda()

        y_pred = model(X)
        loss = calc_loss(y_pred, y)
        valid_loss_list.append(loss.item())

    valid_loss_mean = np.mean(valid_loss_list)
    valid_loss_history.append((global_i, valid_loss_mean.item()))

    if valid_loss_mean < min_valid_loss:
        min_valid_loss = valid_loss_mean
        best_epoch_i = epoch_i
        best_model = deepcopy(model)

    if epoch_i % 2 == 0:
        print("*"*30)
        print(f'valid_loss_mean: {valid_loss_mean}')
        print("*"*30)

print(f'best_epoch: {best_epoch_i}')

def calc_moving_average(arr, win_size=100):
    new_arr = []
    win = []

    for i, val in enumerate(arr):
        win.append(val)
        if len(win) > win_size:
            win.pop(0)

        new_arr.append(np.mean(win))
    return np.array(new_arr)
valid_loss_history = np.array(valid_loss_history)
train_loss_history =  np.array(train_loss_history)

plt.figure(figsize=(12,8))
plt.plot(train_loss_history[:,0],
         calc_moving_average(train_loss_history[:,1]), color='blue')

plt.plot(valid_loss_history[:,0],
         valid_loss_history[:,1], color='red')
plt.xlabel("step")
plt.ylabel("loss")

Evaluation

model = best_model

model.eval()

total = 0
correct = 0

for batch in tqdm(test_dataloader,
                  total=len(test_dataloader.dataset)//test_dataloader.batch_size):
    X = torch.tensor(batch['doc_ids'])
    y = torch.tensor(batch['label'])

    if use_cuda:
        X = X.cuda()
        y = y.cuda()

    y_pred = model(X)

    curr_correct = y_pred.argmax(dim=1) == y

    total += len(curr_correct)
    correct += sum(curr_correct)

print(f'test accuracy: {correct/total}')

'Python > 자연어처리' 카테고리의 다른 글

Python(45)- LSTM과 GRU  (0) 2024.07.02
Python(44)- CNN 분류  (0) 2024.07.02
Python(42)- RNN 기초  (0) 2024.06.27
Python(41)- 워드 임베딩 시각화  (0) 2024.06.27
Python(40)- 워드 임베딩  (0) 2024.06.25