Python/자연어처리
Python(46)- 문장 임베딩
두설날
2024. 7. 5. 15:06
*이 글을 읽기전에 작성자 개인의견이 있으니, 다른 블로그와 교차로 읽는것을 권장합니다.*
1. 문장 임베딩
- 2017년 이전의 임베딩 기법들은 대부분 단어 수준의 모델
- 단어 수준 임베딩 기법은 자연어의 특성인 모호성, 동음이의어를 구분하기 어렵다는 한계가 있음
- 2017년 이후에는 ELMo(Embeddings from Language Models)와 같은 모델이 발표되고 트랜스포머와 같은 언어 모델에서 문장 수준의 언어 모델링을 고려하면서 한계점들이 해결됨
2. seq2seq 배경
- seq2seq 모델이 등장하기 전에 DNN(Deep Neural Network) 모델은 사물인식, 음성인식 등에서 꾸준히 성과를 내고 있었음(예 : CNN, RNN, LSTM, GRU ...)
- 단점 : 모델 입/출력의 크기가 고정된다는 한계점 존재했기 때문에 자연어처리와 같은 가변적인 길이의 입/출력을 처리하는 문제들을 제대로 해결할 수 없었음
- RNN은 seq2seq가 등장하기 전에 입/출력을 시퀀스 단위로 처리할 수 있는 모델
2-1. seq2seq(Sequence To Sequence)란?
- 2014년 구글에서 논문으로 제안한 모델
- LSTM 또는 GRU기반의 구조를 가지고 고정된 길이의 단어 시퀀스를 입력받아 입력 시퀀스에 알맞은 길이의 시퀀스를 출력해주는 언어 모델
- 2개의 LSTM을 각각 Encoder와 Decoder로 사용해서 가변적인 길이의 입출력을 처리하고자 했음
- 기계 번역 작업에서 큰 성능 향상을 가져왔고 특히 긴 문장을 처리하는데 강점이 있음
- 위키독스 14-1 참고
2-2. 인코더
- 입력 문장을 컨텍스트 벡터에 압축하는 역할
- 인코더의 LSTM은 입력 문장을 단어 순서대로 처리하여 고정된 크기의 컨텍스트 벡터를 반환
- 컨텍스트 벡터는 인코더의 마지막 스텝에서 출력된 hidden state와 같음
- 컨텍스트 벡터는 입력 문장의 정보를 함축하는 벡터이므로 해당 벡터를 입력 문장에 대한 수준의 벡터로 활용할 수 있음
class Encoder(nn.Module):
def __init__(self, input_size, hidden_size):
super(Encoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.embedding = nn.Embedding(input_size, hidden_size)
self.gru = nn.GRU(input_size, hidden_size)
def forward(self, input):
embedded = self.embedding(input).view(1, 1, -1)
output.hidden = self.gru(embedded)
return output, hidden
class Decoder(nn.Module):
def __init__(self, hidden_size, output_size):
super(Decoder, self).__init__()
self.hidden_size = hidden_size
self.output_size = output_size
self.embedding = nn.Embedding(output_size, hidden_size)
self.gru = nn.GRU(hidden_size, output_size)
self.out = nn.Linear(hidden_size, output_size)
def forward(self, input, hidden):
output = self.embedding(input).view(1, 1, -1)
output = F.relu(output)
output, hidden = self.gru(output, hidden)
output = self.out(output[0])
return output, hidden
2-3. 디코더
- 입력 문장의 정보가 압축된 컨텍스트 벡터를 사용하여 출력 문장을 디코딩하는 역할
- 컨텍스트 벡터와 문장의 시작을 뜻하는 토큰을 입력으로 받아서 문장의 끝을 뜻하는 토큰이 나올 때까지 문장을 생성
- LSTM의 첫 셀에서는 토큰과 컨텍스트 벡터를 입력받아서 그 다음에 등장할 확률이 가장 높은 단어를 예측하고 다음 스텝에서 예측한 단어를 입력으로 받아 그 다음에 등장할 확률이 가장 높은 단어를 예측하는 형태로 계속 진행
2-4. 학습과정과 한계점
- 모델 학습 과정에서는 이전 셀에서 예측한 단어를 다음 셀의 압력으로 넣어주는 대신 실제 정답 단어를 다음 셀의 입력으로 넣기도 함(교사 강요)
- 위 방법으로 학습도지 않으면 이전 셀에서의 오류가 다음 셀로 계속 전파될 것이기 때문에 학습이 제대로 되지 않고 오래 걸릴 수 있음
- 가변적인 길이의 입/출력을 처리하는데 효과적인 모델 구조이며, 실제로 기계번역 작업에서 성능향상을 거뒀으나 여전히 한계를 가짐
- 인코더가 출력하는 벡터 사이즈가 고정되어 있기 때문에, 입력으로 들어오는 단어의 수가 매우 많아지게 되면 성능이 떨어짐
- RNN 구조의 모델에서는 hidden state를 통해 이전 셀의 정보를 다음 셀로 계속 전달하게 되는데 문장의 길이가 길어지면 초기 셀에서 전달됐던 정보들이 점차 흐려짐(LSTM, GRU 같은 모델들이 제안되긴 했으나 여전히 이전 정보를 계속 압축하는데 한계가 있음)
위키독스 14-02
import os
import re
import shutil
import zipfile
import requests
import numpy as np
import pandas as pd
import unicodedata
import urllib3
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torch.nn.functional as F
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from torch.utils.data import DataLoader, TensorDataset
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
def download_zip(url, output_path):
response = requests.get(url, headers=headers, stream=True)
if response.status_code == 200:
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"ZIP file downloaded to {output_path}")
else:
print(f"Failed to download. HTTP Response Code: {response.status_code}")
url = "http://www.manythings.org/anki/fra-eng.zip"
output_path = "fra-eng.zip"
download_zip(url, output_path)
path = os.getcwd()
zipfilename = os.path.join(path, output_path)
with zipfile.ZipFile(zipfilename, 'r') as zip_ref:
zip_ref.extractall(path)
num_samples = 33000
def to_ascii(s):
# 프랑스어 악센트(accent) 삭제
# 예시 : 'déjà diné' -> deja dine
return ''.join(c for c in unicodedata.normalize('NFD', s)
if unicodedata.category(c) != 'Mn')
def preprocess_sentence(sent):
# 악센트 제거 함수 호출
sent = to_ascii(sent.lower())
# 단어와 구두점 사이에 공백 추가.
# ex) "I am a student." => "I am a student ."
sent = re.sub(r"([?.!,¿])", r" \1", sent)
# (a-z, A-Z, ".", "?", "!", ",") 이들을 제외하고는 전부 공백으로 변환.
sent = re.sub(r"[^a-zA-Z!.?]+", r" ", sent)
# 다수 개의 공백을 하나의 공백으로 치환
sent = re.sub(r"\s+", " ", sent)
return sent
# 전처리 테스트
en_sent = u"Have you had dinner?"
fr_sent = u"Avez-vous déjà diné?"
print('전처리 전 영어 문장 :', en_sent)
print('전처리 후 영어 문장 :',preprocess_sentence(en_sent))
print('전처리 전 프랑스어 문장 :', fr_sent)
print('전처리 후 프랑스어 문장 :', preprocess_sentence(fr_sent))
def load_preprocessed_data():
encoder_input, decoder_input, decoder_target = [], [], []
with open("fra.txt", "r") as lines:
for i, line in enumerate(lines):
# source 데이터와 target 데이터 분리
src_line, tar_line, _ = line.strip().split('\t')
# source 데이터 전처리
src_line = [w for w in preprocess_sentence(src_line).split()]
# target 데이터 전처리
tar_line = preprocess_sentence(tar_line)
tar_line_in = [w for w in ("<sos> " + tar_line).split()]
tar_line_out = [w for w in (tar_line + " <eos>").split()]
encoder_input.append(src_line)
decoder_input.append(tar_line_in)
decoder_target.append(tar_line_out)
if i == num_samples - 1:
break
return encoder_input, decoder_input, decoder_target
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder):
super().__init__()
self.encoder = encoder
self.decoder = decoder
def forward(self, src, tag):
encoder_outputs, hidden =self.encoder(src)
decoder_outputs, _ = self.decoder(trg, hidden)
return decoder_outputs
sents_en_in, sents_fra_in, sents_fra_out = load_preprocessed_data()
print('인코더의 입력 :',sents_en_in[:5])
print('디코더의 입력 :',sents_fra_in[:5])
print('디코더의 레이블 :',sents_fra_out[:5])
tokenizer_en = Tokenizer(filters="", lower=False)
tokenizer_en.fit_on_texts(sents_en_in)
encoder_input = tokenizer_en.texts_to_sequences(sents_en_in)
encoder_input = pad_sequences(encoder_input, padding="post")
tokenizer_fra = Tokenizer(filters="", lower=False)
tokenizer_fra.fit_on_texts(sents_fra_in)
tokenizer_fra.fit_on_texts(sents_fra_out)
decoder_input = tokenizer_fra.texts_to_sequences(sents_fra_in)
decoder_input = pad_sequences(decoder_input, padding="post")
decoder_target = tokenizer_fra.texts_to_sequences(sents_fra_out)
decoder_target = pad_sequences(decoder_target, padding="post")
print('인코더의 입력의 크기(shape) :',encoder_input.shape)
print('디코더의 입력의 크기(shape) :',decoder_input.shape)
print('디코더의 레이블의 크기(shape) :',decoder_target.shape)
src_vocab_size = len(tokenizer_en.word_index) + 1
tar_vocab_size = len(tokenizer_fra.word_index) + 1
print("영어 단어 집합의 크기 : {:d}, 프랑스어 단어 집합의 크기 : {:d}".format(src_vocab_size, tar_vocab_size))
src_to_index = tokenizer_en.word_index
index_to_src = tokenizer_en.index_word
tar_to_index = tokenizer_fra.word_index
index_to_tar = tokenizer_fra.index_word
indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices)
print('랜덤 시퀀스 :',indices)
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]
encoder_input[30997]
decoder_input[30997]
decoder_target[30997]
n_of_val = int(33000*0.1)
print('검증 데이터의 개수 :',n_of_val)
encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]
encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]
print('훈련 source 데이터의 크기 :',encoder_input_train.shape)
print('훈련 target 데이터의 크기 :',decoder_input_train.shape)
print('훈련 target 레이블의 크기 :',decoder_target_train.shape)
print('테스트 source 데이터의 크기 :',encoder_input_test.shape)
print('테스트 target 데이터의 크기 :',decoder_input_test.shape)
print('테스트 target 레이블의 크기 :',decoder_target_test.shape)
embedding_dim = 64
hidden_units = 64
class Encoder(nn.Module):
def __init__(self, input_size, hidden_size):
super(Encoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.embedding = nn.Embedding(input_size, hidden_size)
self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True)
def forward(self, input):
embedded = self.embedding(input)
output, hidden = self.gru(embedded)
return output, hidden
class Decoder(nn.Module):
def __init__(self, hidden_size, output_size):
super(Decoder, self).__init__()
self.hidden_size = hidden_size
self.output_size = output_size
self.embedding = nn.Embedding(output_size, hidden_size)
self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True)
self.out = nn.Linear(hidden_size, output_size)
def forward(self, input, hidden):
embedded = self.embedding(input)
embedded = F.relu(embedded)
output, hidden = self.gru(embedded, hidden)
output = self.out(output)
return output, hidden
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder):
super(Seq2Seq, self).__init__()
self.encoder = encoder
self.decoder = decoder
def forward(self, src, trg):
encoder_outputs, hidden = self.encoder(src)
decoder_outputs, _ = self.decoder(trg, hidden)
return decoder_outputs
encoder = Encoder(src_vocab_size, hidden_units)
decoder = Decoder(hidden_units, tar_vocab_size)
model = Seq2Seq(encoder, decoder)
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
encoder_input_train_tensor = torch.tensor(encoder_input_train, dtype=torch.long)
decoder_input_train_tensor = torch.tensor(decoder_input_train, dtype=torch.long)
decoder_target_train_tensor = torch.tensor(decoder_target_train, dtype=torch.long)
encoder_input_test_tensor = torch.tensor(encoder_input_test, dtype=torch.long)
decoder_input_test_tensor = torch.tensor(decoder_input_test, dtype=torch.long)
decoder_target_test_tensor = torch.tensor(decoder_target_test, dtype=torch.long)
train_dataset = TensorDataset(encoder_input_train_tensor, decoder_input_train_tensor, decoder_target_train_tensor)
test_dataset = TensorDataset(encoder_input_test_tensor, decoder_input_test_tensor, decoder_target_test_tensor)
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
def train_model(model, optimizer, loss_func, train_loader, epochs):
model.train()
for epoch in range(epochs):
total_loss = 0
for encoder_input_train_tensor, decoder_input_train_tensor, decoder_target_train_tensor in train_loader:
optimizer.zero_grad()
outputs = model(encoder_input_train_tensor, decoder_input_train_tensor)
loss = loss_func(outputs.view(-1, outputs.shape[-1]), decoder_target_train_tensor.view(-1))
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(train_loader):.4f}')
# 학습 시작
train_model(model, optimizer, loss_func, train_loader, epochs=10)
# 디코드 시퀀스 함수
def decode_sequence(input_seq):
input_tensor = torch.tensor(input_seq, dtype=torch.long)
with torch.no_grad():
encoder_output, hidden = model.encoder(input_tensor)
target_seq = torch.tensor([[tar_to_index['<sos>']]], dtype=torch.long)
decoded_sentence = ''
stop_condition = False
while not stop_condition:
with torch.no_grad():
output, hidden = model.decoder(target_seq, hidden)
sampled_token_index = output.argmax(2).item()
sampled_char = index_to_tar[sampled_token_index]
if sampled_char != '<eos>':
decoded_sentence += ' ' + sampled_char
if sampled_char == '<eos>' or len(decoded_sentence.split()) > 100:
stop_condition = True
target_seq = torch.tensor([[sampled_token_index]], dtype=torch.long)
return decoded_sentence
# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_src(input_seq):
sentence = ''
for encoded_word in input_seq:
if encoded_word != 0:
sentence += index_to_src.get(encoded_word, '') + ' '
return sentence.strip()
# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_tar(input_seq):
sentence = ''
for encoded_word in input_seq:
if encoded_word != 0 and encoded_word != tar_to_index['<sos>'] and encoded_word != tar_to_index['<eos>']:
sentence += index_to_tar.get(encoded_word, '') + ' '
return sentence.strip()
# 테스트
for seq_index in [0, 1]:
input_seq = encoder_input_train[seq_index: seq_index + 1]
decoded_sentence = decode_sequence(input_seq)
print("입력문장 :", seq_to_src(encoder_input_train[seq_index]))
print("정답문장 :", seq_to_tar(decoder_input_train[seq_index]))
print("번역문장 :", decoded_sentence.strip())
print("-" * 50)
3. 어텐션 매커니즘
- seq2seq 모델의 한계를 해결하기 위해 제안한 논문에서 발표
- 어텐션이라는 단어가 쓰이지 않았지만, 어텐션 개념을 제공한 연구논몬 (과제 참고) [논문]https://arxiv.org/abs/1409.0473
- 어텐션 단어를 사용한 모델에 대한 [논문](https://arxiv.org/abs/1508.04025)
- 입력 시퀀스가 길어지면 출력 시퀀스의 정확도가 떨어지는 것을 보정해주기 위해 등장한 기법
- = RNN에서 입력시퀀스 정보손실 보정용도로 어텐션 활용
- 어텐션의 아이디어는 디코더에서 출력 단어를 예측하는 매 시점(time step)마다 인코더에서의 전체 입력 문장을 다시 한 번 참고하는 것
- 단, 전체 입력 문장을 전부 다 동일한 비율로 참고하는 것이 아니라 해당 시점에서 예측해야 할 단어와 연관이 있는 입력 단어 부분을 조금 더 집중해서 보게 됨
- 위키독스 15-1 참고 https://wikidocs.net/22893
3-1. 어텐션 함수
- 어텐션 함수는 주어진 쿼리에 대하여 모든 키와의 유사도를 각각 계산
- 계산된 유사도를 키와 맵핑되어 있는 각각의 값에 반영한 뒤 유사도와 반영된 값을 모두 더해서 반환(어텐션 값)
- Q(Query) : t시점의 디코더 셀에서의 은닉 상태
- K(Keys) : 모든 시점의 인코더 셀의 은닉 상태들
- V(Values) : 모든 시점의 인코더 셀의 값
3-2. 어텐션 작동 원리
- 예측하고자 하는 단어를 위해 입력 단어들의 정보를 다시 참고
- 어텐션 스코어를 구하는데 사용하는 수식은 다양하게 있으나 가장 간단한 dot product를 사용하는 것이 일반적
- 단어들의 정보를 참고하여 나온 확률 중 가장 큰 값을 예측하고자 하는 단어를 위해 사용함
4. ELMo(Embeddings from Language Model)
- 2018년 논문에서 제안된 새로운 워드 임베딩 방법론(문맥을 반영한 워드 임베딩)
- 엘모의 가장 큰 특징은 사전 훈련된 언어 모델을 사용한다는 것
- 기존 워드 임베딩은 주변 문맥 정보를 활용하여 단어를 벡터로 표현하는 방법을 사용(같은 표기 단어를 문맥에 따라 다르게 임베딩 할 수 없는 한계가 있었음, 뜻 표기 오류, 동음이의어 표기 어려움)
- biLM라는 구조를 사용(양방향의 언어 모델링을 통해 문맥적인 표현을 반영하여 해당 입력 문장의 확률을 예측)
- ELMo에서 말하는 biLM은 기본적으로 다층구조(Multi-layer)를 전제 = 은닉층이 최소 2개 이상
- 입력이 되는 단어벡터는 임베딩 층이 아닌, 합성곱 신경망을 이용한 문자 임베딩에서 얻음
- 양방향 RNN과 ELMo에서의 biLMd은 다름
- 양방향 RNN : 순방향과 역방향 은닉 상태를 다음 레이어(층)의 입력으로 사용
- biLM : 순방향과 역방향 2개의 언어모델을 별개로 학습
- 대량의 자연어 코퍼스를 미리 학습하여 코퍼스 안에 포함된 일반화된 언어 특성들을 모델의 파라미터 안에 함축하여 사용하는 방법
- 위키독스 09-09 참고 https://wikidocs.net/33930
4-1. 허깅페이스(Hugging Face)
- 인공지능 자연어 처리 기술을 중심으로 한 오픈소스 커뮤니티와 소프트웨어 플랫폼을 제공하는 사이트(회사)
- 특히 트랜스포머 모델들을 쉽게 사용할 수 있도록 하는 라이브러리(Transformers)로 유명
- 플랫폼과 라이브러리 등은 개발자와 AI 기업들에게 쉽게 학습시키고 배포할 수 있도록 도움
- [허깅페이스](https://huggingface.co/)
4-2. ELMo biLM 임베딩과정
1) 각 층의 출력밧을 연결
2) 각 층의 출력값 별로 가중치를 준다
3) 각 층의 출력값을 모두 더한다
4) 벡터의 크기를 결정하는 스칼라 매개변수를 곱한다
import json
import requests
API_TOKEN = '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'
headers = {"Authorization": f"Bearer {API_TOKEN}"}
API_URL = "https://api-inference.huggingface.co/models/deepset/roberta-base-squad2"
def query(data):
data = json.dumps(data)
response = requests.request("POST", API_URL, headers=headers, data=data)
return json.loads(response.content.decode("utf-8"))
data = query(
{
"inputs": {
"question": "너의 이름이 뭐니?",
"context": "나는 서울에 살고 있고 내 이름은 김사과야"
}
}
)
print(data)
data = query(
{
"inputs": {
"question": "What's my name?",
"context": "My name is Kim and I live in Seoul"
}
}
)
print(data)
5. 트랜스포머(Transformer)
- 2017년 구글이 발표한 [논문("Attention Is All You Need")](https://arxiv.org/abs/1706.03762)에서 발표된 모델
- seq2seq의 구조인 인코더-디코더를 따르면서도 어텐션(Attention)만으로 구현된 모델
- RNN을 사용하지 않고 인코더-디코더 구조를 설계하였음에도 번역 성능에서 RNN보다 월등히 우수한 성능을 보였으며 2017년 이후 지금까지 다양한 분야에서 사용되는 모델
- 위키독스 16-01 참고 https://wikidocs.net/31379
5-1. 트랜스포머의 특징
- RNN을 사용하지 않지만 seq2seq 모델의 구조처럼 인코더에서 입력 시퀀스를 입력받고, 디코더에서 출력 시퀀스를 출력하는 인코더-디코더 형식을 사용함.
- seq2seq 모델 구조에서는 이코더와 디코더를 각각 하나의 RNN모델처럼 사용했다면, 트랜스포머에서는 인코더와 디코더 단위를 N개로 확장하는 구조를 사용(논문에서는 6개씩 사용)
- seq2seq에서 입력시퀀스를 하나의 벡터로 압축할 때, 정보 손실 단점 -> 어텐션 보완
5-2. 포지셔널 인코딩
- 트랜스포머는 다넝의 위치 정보를 얻기 위해 각 단어의 임베딩 벡터의 위치정보들을 더하여 모델의 위력으로 사용
- 위치 정보를 가진 값을 만들기 위해 sin, cos함수를 사용. (pos, i) 형태로 임베딩 벡터의 위치를 나타냄(i는 임베딩 벡터 내의 차원의 인덱스를 의미)
- 임베딩 벡터 내의 차원의 인덱스가 짝수인 경우에는 sin함수의 값을 사용, 홀수 연구 경우에는 cos함수의 값을 사용
- 각 임베딩 벡터에 포지셔널 인코딩의 값을 더하면 같은 단어라도 하더라도 문장 내의 위치에 따라 트랜스포머의 입력으로 들어가는 임베딩 벡터의 값이 달라짐
5-3. 트랜스포머 셀프 어텐션
- 어텐션을 스스로 수행한다는 의미
- 하나의 문장 내에서 단어 간의 관계를 파악하기 위해 사용되는 어텐션 매커니즘(seq2seq와 동일)
5-4. 멀티헤드 어텐션
- 어텐션에서는 d_model의 차원을 가진 단어 벡터를 num_heads로 나눈 차원으로 어텐션을 수행
- 트랜스포머 연구진은 한 번의 어텐션을 하는 것보다 여러 번의 어텐션을 병렬로 사용하는 것이 더 효과적이라고 판단
- 병렬 어텐션을 모두 수행항면 모든 어텐션 헤드를 연결하여 모두 연결된 어텐션 헤드 행렬 크기가 (seq_len, d_model)이 됨
5-5. Position-wise FFNN(Feed Forward Neural Network)
- 일반적인 deep neural network의 feed forward 신경망
- 각각의 학습 노드가 서로 완전하게 연결된 Fully-Connected NN이라고 해석할 수 있음
5-6. 잔차 연결과 레이어 정규화
- 입력과 출력은 FFNN을 지나기 때문에, 동일한 차원을 가지므로 덧셈이 가능한 형태가 됨
- 잔차 연결을 거친 결과에 layer normalization 과정을 가짐
- 수식으로 구현된 인코더는 총 num_layers만큼을 순차적으로 처리한 후에 마지막층의 인코더의 출력을 디코더로 전달하면서 디코더 연산이 시작
5-7. 디코더(Decoder)
- 디코더도 인코더와 동일하게 임베딩 층과 포지셔널 인코딩을 거친 후에 문장
- 학습시 교사강요 기법을 사용하여 학습되므로 학습 과정에서는 디코더는 정답 문장에 해당하는 문장 행렬을 한번에 입력
- Look-ahead mask기법을 사용하여, 현 시점의 정답이 아니라 이후에 나올 정답 단어들까지 참조하지 않도록 함
- 위키독스 16-02 참고
# -*- coding: utf-8 -*-
"""Transformer_Korean_Chatbot.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1p1Ca20Dd61oDkI9YyLMuDZbr-DK1Jd0A
"""
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
tf.__version__
class PositionalEncoding(tf.keras.layers.Layer):
def __init__(self, position, d_model):
super(PositionalEncoding, self).__init__()
self.pos_encoding = self.positional_encoding(position, d_model)
def get_angles(self, position, i, d_model):
angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
return position * angles
def positional_encoding(self, position, d_model):
angle_rads = self.get_angles(
position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
d_model=d_model)
sines = tf.math.sin(angle_rads[:, 0::2])
cosines = tf.math.cos(angle_rads[:, 1::2])
angle_rads = np.zeros(angle_rads.shape)
angle_rads[:, 0::2] = sines
angle_rads[:, 1::2] = cosines
pos_encoding = tf.constant(angle_rads)
pos_encoding = pos_encoding[tf.newaxis, ...]
print(pos_encoding.shape)
return tf.cast(pos_encoding, tf.float32)
def call(self, inputs):
return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]
def scaled_dot_product_attention(query, key, value, mask):
# query 크기 : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
# key 크기 : (batch_size, num_heads, key의 문장 길이, d_model/num_heads)
# value 크기 : (batch_size, num_heads, value의 문장 길이, d_model/num_heads)
# padding_mask : (batch_size, 1, 1, key의 문장 길이)
# Q와 K의 곱. 어텐션 스코어 행렬.
matmul_qk = tf.matmul(query, key, transpose_b=True)
# 스케일링
# dk의 루트값으로 나눠준다.
depth = tf.cast(tf.shape(key)[-1], tf.float32)
logits = matmul_qk / tf.math.sqrt(depth)
# 마스킹. 어텐션 스코어 행렬의 마스킹 할 위치에 매우 작은 음수값을 넣는다.
# 매우 작은 값이므로 소프트맥스 함수를 지나면 행렬의 해당 위치의 값은 0이 된다.
if mask is not None:
logits += (mask * -1e9)
# 소프트맥스 함수는 마지막 차원인 key의 문장 길이 방향으로 수행된다.
# attention weight : (batch_size, num_heads, query의 문장 길이, key의 문장 길이)
attention_weights = tf.nn.softmax(logits, axis=-1)
# output : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
output = tf.matmul(attention_weights, value)
return output, attention_weights
class MultiHeadAttention(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads, name="multi_head_attention"):
super(MultiHeadAttention, self).__init__(name=name)
self.num_heads = num_heads
self.d_model = d_model
assert d_model % self.num_heads == 0
# d_model을 num_heads로 나눈 값.
# 논문 기준 : 64
self.depth = d_model // self.num_heads
# WQ, WK, WV에 해당하는 밀집층 정의
self.query_dense = tf.keras.layers.Dense(units=d_model)
self.key_dense = tf.keras.layers.Dense(units=d_model)
self.value_dense = tf.keras.layers.Dense(units=d_model)
# WO에 해당하는 밀집층 정의
self.dense = tf.keras.layers.Dense(units=d_model)
# num_heads 개수만큼 q, k, v를 split하는 함수
def split_heads(self, inputs, batch_size):
inputs = tf.reshape(
inputs, shape=(batch_size, -1, self.num_heads, self.depth))
return tf.transpose(inputs, perm=[0, 2, 1, 3])
def call(self, inputs):
query, key, value, mask = inputs['query'], inputs['key'], inputs[
'value'], inputs['mask']
batch_size = tf.shape(query)[0]
# 1. WQ, WK, WV에 해당하는 밀집층 지나기
# q : (batch_size, query의 문장 길이, d_model)
# k : (batch_size, key의 문장 길이, d_model)
# v : (batch_size, value의 문장 길이, d_model)
# 참고) 인코더(k, v)-디코더(q) 어텐션에서는 query 길이와 key, value의 길이는 다를 수 있다.
query = self.query_dense(query)
key = self.key_dense(key)
value = self.value_dense(value)
# 2. 헤드 나누기
# q : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
# k : (batch_size, num_heads, key의 문장 길이, d_model/num_heads)
# v : (batch_size, num_heads, value의 문장 길이, d_model/num_heads)
query = self.split_heads(query, batch_size)
key = self.split_heads(key, batch_size)
value = self.split_heads(value, batch_size)
# 3. 스케일드 닷 프로덕트 어텐션. 앞서 구현한 함수 사용.
# (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
scaled_attention, _ = scaled_dot_product_attention(query, key, value, mask)
# (batch_size, query의 문장 길이, num_heads, d_model/num_heads)
scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
# 4. 헤드 연결(concatenate)하기
# (batch_size, query의 문장 길이, d_model)
concat_attention = tf.reshape(scaled_attention,
(batch_size, -1, self.d_model))
# 5. WO에 해당하는 밀집층 지나기
# (batch_size, query의 문장 길이, d_model)
outputs = self.dense(concat_attention)
return outputs
def create_padding_mask(x):
mask = tf.cast(tf.math.equal(x, 0), tf.float32)
# (batch_size, 1, 1, key의 문장 길이)
return mask[:, tf.newaxis, tf.newaxis, :]
def encoder_layer(dff, d_model, num_heads, dropout, name="encoder_layer"):
inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
# 인코더는 패딩 마스크 사용
padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
# 멀티-헤드 어텐션 (첫번째 서브층 / 셀프 어텐션)
attention = MultiHeadAttention(
d_model, num_heads, name="attention")({
'query': inputs, 'key': inputs, 'value': inputs, # Q = K = V
'mask': padding_mask # 패딩 마스크 사용
})
# 드롭아웃 + 잔차 연결과 층 정규화
attention = tf.keras.layers.Dropout(rate=dropout)(attention)
attention = tf.keras.layers.LayerNormalization(
epsilon=1e-6)(inputs + attention)
# 포지션 와이즈 피드 포워드 신경망 (두번째 서브층)
outputs = tf.keras.layers.Dense(units=dff, activation='relu')(attention)
outputs = tf.keras.layers.Dense(units=d_model)(outputs)
# 드롭아웃 + 잔차 연결과 층 정규화
outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
outputs = tf.keras.layers.LayerNormalization(
epsilon=1e-6)(attention + outputs)
return tf.keras.Model(
inputs=[inputs, padding_mask], outputs=outputs, name=name)
def encoder(vocab_size, num_layers, dff,
d_model, num_heads, dropout,
name="encoder"):
inputs = tf.keras.Input(shape=(None,), name="inputs")
# 인코더는 패딩 마스크 사용
padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
# 포지셔널 인코딩 + 드롭아웃
embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
# 인코더를 num_layers개 쌓기
for i in range(num_layers):
outputs = encoder_layer(dff=dff, d_model=d_model, num_heads=num_heads,
dropout=dropout, name="encoder_layer_{}".format(i),
)([outputs, padding_mask])
return tf.keras.Model(
inputs=[inputs, padding_mask], outputs=outputs, name=name)
# 디코더의 첫번째 서브층(sublayer)에서 미래 토큰을 Mask하는 함수
def create_look_ahead_mask(x):
seq_len = tf.shape(x)[1]
look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
padding_mask = create_padding_mask(x) # 패딩 마스크도 포함
return tf.maximum(look_ahead_mask, padding_mask)
def decoder_layer(dff, d_model, num_heads, dropout, name="decoder_layer"):
inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
# 디코더는 룩어헤드 마스크(첫번째 서브층)와 패딩 마스크(두번째 서브층) 둘 다 사용.
look_ahead_mask = tf.keras.Input(
shape=(1, None, None), name="look_ahead_mask")
padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
# 멀티-헤드 어텐션 (첫번째 서브층 / 마스크드 셀프 어텐션)
attention1 = MultiHeadAttention(
d_model, num_heads, name="attention_1")(inputs={
'query': inputs, 'key': inputs, 'value': inputs, # Q = K = V
'mask': look_ahead_mask # 룩어헤드 마스크
})
# 잔차 연결과 층 정규화
attention1 = tf.keras.layers.LayerNormalization(
epsilon=1e-6)(attention1 + inputs)
# 멀티-헤드 어텐션 (두번째 서브층 / 디코더-인코더 어텐션)
attention2 = MultiHeadAttention(
d_model, num_heads, name="attention_2")(inputs={
'query': attention1, 'key': enc_outputs, 'value': enc_outputs, # Q != K = V
'mask': padding_mask # 패딩 마스크
})
# 드롭아웃 + 잔차 연결과 층 정규화
attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
attention2 = tf.keras.layers.LayerNormalization(
epsilon=1e-6)(attention2 + attention1)
# 포지션 와이즈 피드 포워드 신경망 (세번째 서브층)
outputs = tf.keras.layers.Dense(units=dff, activation='relu')(attention2)
outputs = tf.keras.layers.Dense(units=d_model)(outputs)
# 드롭아웃 + 잔차 연결과 층 정규화
outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
outputs = tf.keras.layers.LayerNormalization(
epsilon=1e-6)(outputs + attention2)
return tf.keras.Model(
inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
outputs=outputs,
name=name)
def decoder(vocab_size, num_layers, dff,
d_model, num_heads, dropout,
name='decoder'):
inputs = tf.keras.Input(shape=(None,), name='inputs')
enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')
# 디코더는 룩어헤드 마스크(첫번째 서브층)와 패딩 마스크(두번째 서브층) 둘 다 사용.
look_ahead_mask = tf.keras.Input(
shape=(1, None, None), name='look_ahead_mask')
padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
# 포지셔널 인코딩 + 드롭아웃
embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
# 디코더를 num_layers개 쌓기
for i in range(num_layers):
outputs = decoder_layer(dff=dff, d_model=d_model, num_heads=num_heads,
dropout=dropout, name='decoder_layer_{}'.format(i),
)(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])
return tf.keras.Model(
inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
outputs=outputs,
name=name)
def transformer(vocab_size, num_layers, dff,
d_model, num_heads, dropout,
name="transformer"):
# 인코더의 입력
inputs = tf.keras.Input(shape=(None,), name="inputs")
# 디코더의 입력
dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")
# 인코더의 패딩 마스크
enc_padding_mask = tf.keras.layers.Lambda(
create_padding_mask, output_shape=(1, 1, None),
name='enc_padding_mask')(inputs)
# 디코더의 룩어헤드 마스크(첫번째 서브층)
look_ahead_mask = tf.keras.layers.Lambda(
create_look_ahead_mask, output_shape=(1, None, None),
name='look_ahead_mask')(dec_inputs)
# 디코더의 패딩 마스크(두번째 서브층)
dec_padding_mask = tf.keras.layers.Lambda(
create_padding_mask, output_shape=(1, 1, None),
name='dec_padding_mask')(inputs)
# 인코더의 출력은 enc_outputs. 디코더로 전달된다.
enc_outputs = encoder(vocab_size=vocab_size, num_layers=num_layers, dff=dff,
d_model=d_model, num_heads=num_heads, dropout=dropout,
)(inputs=[inputs, enc_padding_mask]) # 인코더의 입력은 입력 문장과 패딩 마스크
# 디코더의 출력은 dec_outputs. 출력층으로 전달된다.
dec_outputs = decoder(vocab_size=vocab_size, num_layers=num_layers, dff=dff,
d_model=d_model, num_heads=num_heads, dropout=dropout,
)(inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])
# 다음 단어 예측을 위한 출력층
outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)
return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)
small_transformer = transformer(
vocab_size = 9000,
num_layers = 4,
dff = 512,
d_model = 128,
num_heads = 4,
dropout = 0.3,
name="small_transformer")
tf.keras.utils.plot_model(
small_transformer, to_file='small_transformer.png', show_shapes=True)
def loss_function(y_true, y_pred):
y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction='none')(y_true, y_pred)
mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
loss = tf.multiply(loss, mask)
return tf.reduce_mean(loss)
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, d_model, warmup_steps=4000):
super(CustomSchedule, self).__init__()
self.d_model = d_model
self.d_model = tf.cast(self.d_model, tf.float32)
self.warmup_steps = warmup_steps
def __call__(self, step):
step = tf.cast(step, tf.float32)
arg1 = tf.math.rsqrt(step)
arg2 = step * (self.warmup_steps**-1.5)
return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
sample_learning_rate = CustomSchedule(d_model=128)
plt.plot(sample_learning_rate(tf.range(200000, dtype=tf.float32)))
plt.ylabel("Learning Rate")
plt.xlabel("Train Step")
"""# 챗봇 구현"""
import pandas as pd
import urllib.request
import tensorflow_datasets as tfds
import tensorflow as tf
import time
import numpy as np
import matplotlib.pyplot as plt
import re
urllib.request.urlretrieve("https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv", filename="ChatBotData.csv")
train_data = pd.read_csv('ChatBotData.csv')
train_data.head()
print('챗봇 샘플의 개수 :', len(train_data))
print(train_data.isnull().sum())
questions = []
for sentence in train_data['Q']:
# 구두점에 대해서 띄어쓰기
# ex) 12시 땡! -> 12시 땡 !
sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
sentence = sentence.strip()
questions.append(sentence)
answers = []
for sentence in train_data['A']:
# 구두점에 대해서 띄어쓰기
# ex) 12시 땡! -> 12시 땡 !
sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
sentence = sentence.strip()
answers.append(sentence)
len(questions)
print(questions[:5])
print(answers[:5])
# 서브워드텍스트인코더를 사용하여 질문과 답변을 모두 포함한 단어 집합(Vocabulary) 생성
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
questions + answers, target_vocab_size=2**13)
# 시작 토큰과 종료 토큰에 대한 정수 부여.
START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]
# 시작 토큰과 종료 토큰을 고려하여 단어 집합의 크기를 + 2
VOCAB_SIZE = tokenizer.vocab_size + 2
print('시작 토큰 번호 :',START_TOKEN)
print('종료 토큰 번호 :',END_TOKEN)
print('단어 집합의 크기 :',VOCAB_SIZE)
# 서브워드텍스트인코더 토크나이저의 .encode()를 사용하여 텍스트 시퀀스를 정수 시퀀스로 변환.
print('Tokenized sample question: {}'.format(tokenizer.encode(questions[20])))
# 서브워드텍스트인코더 토크나이저의 .encode()와 decode() 테스트해보기
# 임의의 입력 문장을 sample_string에 저장
sample_string = questions[20]
# encode() : 텍스트 시퀀스 --> 정수 시퀀스
tokenized_string = tokenizer.encode(sample_string)
print ('정수 인코딩 후의 문장 {}'.format(tokenized_string))
# decode() : 정수 시퀀스 --> 텍스트 시퀀스
original_string = tokenizer.decode(tokenized_string)
print ('기존 문장: {}'.format(original_string))
# 각 정수는 각 단어와 어떻게 mapping되는지 병렬로 출력
# 서브워드텍스트인코더는 의미있는 단위의 서브워드로 토크나이징한다. 띄어쓰기 단위 X 형태소 분석 단위 X
for ts in tokenized_string:
print ('{} ----> {}'.format(ts, tokenizer.decode([ts])))
# 최대 길이를 40으로 정의
MAX_LENGTH = 40
# 토큰화 / 정수 인코딩 / 시작 토큰과 종료 토큰 추가 / 패딩
def tokenize_and_filter(inputs, outputs):
tokenized_inputs, tokenized_outputs = [], []
for (sentence1, sentence2) in zip(inputs, outputs):
# encode(토큰화 + 정수 인코딩), 시작 토큰과 종료 토큰 추가
sentence1 = START_TOKEN + tokenizer.encode(sentence1) + END_TOKEN
sentence2 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN
tokenized_inputs.append(sentence1)
tokenized_outputs.append(sentence2)
# 패딩
tokenized_inputs = tf.keras.preprocessing.sequence.pad_sequences(
tokenized_inputs, maxlen=MAX_LENGTH, padding='post')
tokenized_outputs = tf.keras.preprocessing.sequence.pad_sequences(
tokenized_outputs, maxlen=MAX_LENGTH, padding='post')
return tokenized_inputs, tokenized_outputs
questions, answers = tokenize_and_filter(questions, answers)
print('질문 데이터의 크기(shape) :', questions.shape)
print('답변 데이터의 크기(shape) :', answers.shape)
# 0번째 샘플을 임의로 출력
print(questions[0])
print(answers[0])
print('단어 집합의 크기(Vocab size): {}'.format(VOCAB_SIZE))
print('전체 샘플의 수(Number of samples): {}'.format(len(questions)))
# 텐서플로우 dataset을 이용하여 셔플(shuffle)을 수행하되, 배치 크기로 데이터를 묶는다.
# 또한 이 과정에서 교사 강요(teacher forcing)을 사용하기 위해서 디코더의 입력과 실제값 시퀀스를 구성한다.
BATCH_SIZE = 64
BUFFER_SIZE = 20000
# 디코더의 실제값 시퀀스에서는 시작 토큰을 제거해야 한다.
dataset = tf.data.Dataset.from_tensor_slices((
{
'inputs': questions,
'dec_inputs': answers[:, :-1] # 디코더의 입력. 마지막 패딩 토큰이 제거된다.
},
{
'outputs': answers[:, 1:] # 맨 처음 토큰이 제거된다. 다시 말해 시작 토큰이 제거된다.
},
))
dataset = dataset.cache()
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
# 임의의 샘플에 대해서 [:, :-1]과 [:, 1:]이 어떤 의미를 가지는지 테스트해본다.
print(answers[0]) # 기존 샘플
print(answers[:1][:, :-1]) # 마지막 패딩 토큰 제거하면서 길이가 39가 된다.
print(answers[:1][:, 1:]) # 맨 처음 토큰이 제거된다. 다시 말해 시작 토큰이 제거된다. 길이는 역시 39가 된다.
tf.keras.backend.clear_session()
# Hyper-parameters
NUM_LAYERS = 2
D_MODEL = 256
NUM_HEADS = 8
DFF = 512
DROPOUT = 0.1
model = transformer(
vocab_size=VOCAB_SIZE,
num_layers=NUM_LAYERS,
dff=DFF,
d_model=D_MODEL,
num_heads=NUM_HEADS,
dropout=DROPOUT)
MAX_LENGTH = 40
learning_rate = CustomSchedule(D_MODEL)
optimizer = tf.keras.optimizers.Adam(
learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
def accuracy(y_true, y_pred):
# ensure labels have shape (batch_size, MAX_LENGTH - 1)
y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
return tf.keras.metrics.sparse_categorical_accuracy(y_true, y_pred)
model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])
EPOCHS = 50
model.fit(dataset, epochs=EPOCHS)
def evaluate(sentence):
sentence = preprocess_sentence(sentence)
sentence = tf.expand_dims(
START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)
output = tf.expand_dims(START_TOKEN, 0)
# 디코더의 예측 시작
for i in range(MAX_LENGTH):
predictions = model(inputs=[sentence, output], training=False)
# 현재(마지막) 시점의 예측 단어를 받아온다.
predictions = predictions[:, -1:, :]
predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)
' '
# 만약 마지막 시점의 예측 단어가 종료 토큰이라면 예측을 중단
if tf.equal(predicted_id, END_TOKEN[0]):
break
# 마지막 시점의 예측 단어를 출력에 연결한다.
# 이는 for문을 통해서 디코더의 입력으로 사용될 예정이다.
output = tf.concat([output, predicted_id], axis=-1)
return tf.squeeze(output, axis=0)
def predict(sentence):
prediction = evaluate(sentence)
predicted_sentence = tokenizer.decode(
[i for i in prediction if i < tokenizer.vocab_size])
print('Input: {}'.format(sentence))
print('Output: {}'.format(predicted_sentence))
return predicted_sentence
def preprocess_sentence(sentence):
sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
sentence = sentence.strip()
return sentence
output = predict('영화 볼래?')
output = predict("고민이 있어")
output = predict("너무 화가나")
6. Bert(Bidirectional Encoder Representations from Transformers)
- 2018년도 구글의 논문에서 제안한 모델로 Trnasformer의 인코더 기반의 언어 모델
- 버트는 unlabeled data로부터 pre-train을 진행한 후, 특정 downstream task에 fine-tuning을 하는 모델
- downstream task : 주어진 문제나 작업에 특정하게 맞추어진 task를 의미
- fine-tuning : 사전 학습된 모델을 새로운 작업 또는 데이터셋에 맞게 조정하는 과정
- deep bidirectional을 더욱 강조하여 기존의 모델들과의 차별성을 강조
- 하나의 output layer만을 pre-trained BERT 모델에 추가하여 NLP의 다양한 주요(11개)에서 SOTA를 달성
- 위키독스 17, 17-02 참고
6-1. BERT 모델의 구조
- pre-training part와 fine-tuning part로 나눠짐
- pre-training에서는 다양한 pre-training task의 unlabeled data를 활용해 파라미터를 조정하고 이를 바탕으로 학습된 모델은 fine-tuning에서 downstream tasks의 labeled data를 이용함
- 양방향 transformer encode를 여러층 쌓은 것
- 실제 이미지를 수정하는 학습모델은 거의 건들지 않으나 뒤의 출력하는 부분(Dense)를 수정하는 경우가 대다수
6-2. BERT의 사전 학습
- MLM(Masked Language Model)
- input tokens의 일정 비율을 마스킹하고 마스킹 된 토큰을 예측하는 과정([MASK])
- pre-training과 fine-tuning 사이의 mismatch가 발생할 수 있음(마스크 토큰이 fine-tuning 과정에서는 나타나지 않게 추가적인 처리 필요)
- NSP(Next Sentence Prediction)
- downstream test 두 문장 사이의 연속성을 확인하는 것이 핵심
- 문장 A와 B를 선택할 때 50%는 실제 A의 다음 문장인 B를 고르고, 나머지 50%는 랜덤 문장 B에서 고르게 됨
- https://github.com/songys/Chatbot_data
GitHub - songys/Chatbot_data: Chatbot_data_for_Korean
Chatbot_data_for_Korean. Contribute to songys/Chatbot_data development by creating an account on GitHub.
github.com
import urllib.request
import pandas as pd
urllib.request.urlretrieve('https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv', filename='ChatBotData.csv')
train_dataset = pd.read_csv('ChatBotData.csv')
print(len(train_dataset))
train_dataset
train_dataset.replace('', float('NaN'), inplace=True)
print(train_dataset.isnull().values.any())
train_dataset = train_dataset.drop_duplicates(['Q']).reset_index(drop=True)
print(len(train_dataset))
train_dataset = train_dataset.drop_duplicates(['A']).reset_index(drop=True)
print(len(train_dataset))
import matplotlib.pyplot as plt
question_list = list(train_dataset['Q'])
answer_list = list(train_dataset['A'])
print('질문의 최대길이:', max(len(question) for question in question_list))
print('질문의 평균길이:', sum(map(len, question_list)) / len(question_list))
print('답변의 최대길이:', max(len(answer) for answer in answer_list))
print('답변의 평균길이:', sum(map(len, answer_list)) / len(answer_list))
import random
response_cadidates = random.sample(answer_list, 500)
response_cadidates[:10]
- SKTBrain에서 공개한 한국이 데이터로 사전학습한 [BERT모델](https://github.com/SKTBrain/KoBERT)
!pip install kobert-transformers
import torch
from kobert_transformers import get_kobert_model
model = get_kobert_model()
model.eval()
# 문장에서 토큰들의 인덱스
input_ids = torch.LongTensor([[31, 51, 99], [15, 5, 0]])
# 모델이 어떤 토큰을 무시해야 하는지 나타내는 텐서(0: 무시, 1: 고려)
attention_mask = torch.LongTensor([[1, 1, 0], [1, 1, 0]])
# 다중 문장 입력을 다룰 때, 각 토큰이 어떤 문장에 속하는지 구분(0, 1)
token_type_ids = torch.LongTensor([[0, 0, 1], [0, 1, 0]])
output = model(input_ids, attention_mask, token_type_ids)
output
# Sequence Embeddings : 각 토큰에 대한 임베딩, 의미적 표현
# pooler_output : 입력 시퀀스에서 추출한 특징의 요약
# hidden_states : 모델 내부의 각 레이어에서의 숨겨진 상태값
output[0]
!pip install 'git+https://github.com/SKTBrain/KoBERT.git#egg=kobert_tokenizer&subdirectory=kobert_hf'
from kobert_tokenizer import KoBERTTokenizer
tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')
tokenizer.tokenize('[CLS] 한국어 모델을 공유합니다. [SEP]')
tokenizer.convert_tokens_to_ids(['[CLS]', '▁한국', '어', '▁모델', '을', '▁공유', '합니다', '.', '[SEP]'])
import numpy as np
import torch
from sklearn.metrics.pairwise import cosine_similarity
def get_cls_token(sentence):
model.eval()
tokenized_sent = tokenizer(
sentence,
return_tensors='pt',
truncation=True,
add_special_tokens=True,
max_length=128
)
input_ids = tokenized_sent['input_ids']
attention_mask = tokenized_sent['attention_mask']
token_type_ids = tokenized_sent['token_type_ids']
with torch.no_grad():
output = model(input_ids, attention_mask, token_type_ids)
cls_output = output[1]
cls_token = cls_output.detach().cpu().numpy()
return cls_token
get_cls_token('너 요즘 바뻐?')
def predict(query, candidates):
candidates_cls = []
for cand in candidates:
cand_cls = get_cls_token(cand)
candidates_cls.append(cand_cls)
candidates_cls = np.array(candidates_cls).squeeze(axis=1)
query_cls = get_cls_token(query)
similarity_list = cosine_similarity(query_cls, candidates_cls)
target_idx = np.argmax(similarity_list)
return candidates[target_idx]
sample_query = '너 요즘 바뻐?'
# print(get_cls_token(sample_query))
print(get_cls_token(sample_query).shape)
sample_query = '너 요즘 바뻐?'
sample_candidates = ['바쁘면 가버려', '아니 안바뻐', '오늘은 이만', '에붸붸붸']
predicted_answer = predict('sample_query', sample_candidates)
print(f'결과: {predicted_answer}')
sample_query = '힘든 연애 좋은 연애라는게 무슨 차이일까?'
sample_candidates = random.sample(answer_list, 100)
predicted_answer = predict('sample_query', sample_candidates)
print(f'결과: {predicted_answer}')
end = 1
while end == 1:
sentence = input('질문을 입력하세요: ')
if len(sentence) == 0:
break
predicted_answer = predict(sentence, response_candidates)
print(predicted_answer)
print('\n')
7. GPT(Geneative Pre-Training)
- GPT 모델은 2018년 6월에 OpenAI가 논문을 통해 처음 제안
- GPT도 unlabeled data로부터 pre-train을 진행한 후 특정 downstream task에 fine_tuning을 하는 모델
- Transformer의 decoder만 사용하는 구조
7-1. GPT 모델의 특징
- 사전학습에는 대규모의 unlabelled data를 사용하는데 unlabeled data에서 단어 수준 이상의 정보를 얻는 것이 매우 힘듦. 또한 어떤 방법이 유용한 텍스트 표현을 배우는데 효과적인지 불분명함
- 사전학습 이후에도 어떤 방법이 fine-tuning에 가장 효과적인지 불분명
- GPT 논문에서 unsupervised pre-training과 supervised fine-tuning의 조합을 사용한 접근법을 제안
- 모델은 이미 효과가 검증된 2017년 공개된 transformer를 사용
- GPT는 transformer의 변형인 multi-layer Transformer decoder만 사용
- 입력 문맥 token에 multi-headed self-attention을 적용한 후, token에 대한 출력 분포를 얻기 위해 position-wise feedforward layer를 적용
- 모델 구조의 변형이 거의 없음. 모델 구조를 변형하지 않고 linear layer를 마지막에 추가하는 아주 간단한 작업만 수행하면 됨
7-2. GPT 모델 학습방법
- unsupervised pre-training
- 대규모 코퍼스에서 unsupervised learning으로 언어 모델을 학습
- transformer 디코더를 사용하여 계속 next token prediction 학습하는 것
- multi-layer Transformer decoder를 사용하여 입력 문맥 token에 대한 출력 분포를 얻기 위해 position-wise feedforward layer를 적용
- supervised fine-tuning
- 특정 작업에 대한 데이터로 모델을 fine-tuning
- 파인 튜닝 단계에서는 사전 학습된 모델을 각 task에 맞게 input과 label로 구성된 supervised dataset에 대해 학습
- 결과를 task에 맞는 loss들을 결합
> 사전 학습은 next token prediction이라는 language modeling으로 진행되었기 때문에 각 downstream task와 input모양이 다를 수 밖에 없음
!pip install transformers
import torch
from transformers import pipeline # 파이프라인
model_name = 'heegyu/kogpt-j-base'
# pipeline() : 허깅페이스의 Transformers 라이브러리에서 제공하는
# 다양한 자연어처리 작업을 간편하게 수행할 수 있도록 도와주는 함수
pipe = pipeline('text-generation', model=model_name)
print(pipe('안녕하세요', repetition_penalty=1.2, do_sample=True, eos_token_id=))
- repetition_penalty : 텍스트 생성과정에서 반복되는 단어 또는 구문의 생성을 억제하기 위한 파라미터
- 특정 단어가 반복될 때 단어의 확률을 감소시키는 방식으로 작동
- 모델이 동일한 단어를 다시 생성하려고 할 때 로그 확률에 패널티를 부여하여 다른 단어를 선택하도록 유도
- 1(패널티를 주지 않음)보다 큰 값을 사용. 1.5가 강한 패널티(텍스트 균형이 맞지 않을 수 있음)
- 특정 단어의 원래 확률이 P라면 반복될 때 확률은 P / 1.2로 줄어듦
- 예) 오늘 날씨는 좋습니다. 오늘 날씨는 맑습니다. => 오늘 날씨는 좋습니다. 하늘은 맑고, 기온은 따뜻합니다.
- do_sample : 텍스트 생성 과정에서 샘플링 방법을 설정. True일 경우, 모델은 확률 분포에서 토큰을 무작위로 선택함. 텍스트 생성에 다양성과 창의성을 더할 수 있음
- 샘플링(Sampling) :
- 모델이 예측한 확률 분포에서 무작위로 토큰을 선택하는 방식
- 다양한 결과를 생성할 수 있으며, 예측할 수 없는 창의적인 텍스트를 생성
- 품질이 일관되지 않을 수 있음. 엉뚱하거나 의미 없는 결과를 생성할 가능성이 있음
- 빔 서치(Beam search) :
- 여러 경로를 동시에 고려하여 가장 높은 점수를 가진 경로를 선택하는 방식
- 주어진 빔 폭 내에서 가장 가능성 높은 몇 가지 경로를 추적하여 최종적으로 가장 점수가 높은 경로를 선택
- 일관되고 논리적인 텍스트를 생성할 수 있음
- 덜 창의적이고 반복적인 텍스트를 생성할 수 있음. 계산 비용이 높음
- 샘플링의 세부 설정
- temperature
- 확률 분포를 변화시켜 예측된 확률값들을 부드럽게 하거나 날카롭게 만듦
- 높은 값(예: 1.5) : 확률 분포를 평탄하게 만들어 무작위성이 높아짐
- 낮은 값(예: 0.6) : 확률 분포를 날카롭게 만들어 결정론적으로 만듦
- top-k 샘플링
- 확률 분포에서 상위 k개의 후보만 고려한느 방법
- 상위 k개의 후보 토큰만 남기고 나머지는 무시한 후, 해당 사잉에서 샘플링
- top-p 샘플링
- 누적 확률이 p이상이 되는 후보군을 고려하는 방법
- 후보 토큰을 누적 확률이 p가 되는 지점까지 포함시킨 후, 해당 사이에서 샘플링
- temperature
- 샘플링(Sampling) :
print(pipe("안녕하세요", repetition_penalty=1.5, do_sample=False, eos_token_id=1, early_stopping=True, max_new_tokens=128))
print(pipe("오늘 정부 발표에 따르면, ", repetition_penalty=1.2, do_sample=True, eos_token_id=1, early_stopping=True, max_new_tokens=128))
print(pipe("싸늘하다. 가슴에 비수가 날아와 꽂힌다. ", repetition_penalty=1.2, do_sample=True, eos_token_id=1, early_stopping=True, max_new_tokens=128, min_length=64))