언어 처리를 위해서는 시계열성이 반영되는 RNN이나 LSTM, GRU등을 사용해왔다. Seq2Seq는 Machine Translation을 위해 구글이 개발한 알고리즘으로 위의 알고리즘을 Encoder와 Decoder로 연결하여 하나의 벡터로 만들어 학습하는 알고리즘이다. 역시 wikidocs.net의 문서가 잘되어 있어 이를 참조했다베꼈다.
번역할 문장을 인코딩해서 나온 신경망을 디코더의 입력으로 연결하는 것이다. 'I am a student'의 순서가 나오면 'je suis etudiant'라고 인식하는 것인데, 단어 자체를 분석하여 인식하기보다, 이를 테면 통문장 영어처럼 외워 번역한다고 보면 맞을까? seq2seq은 attention기반의 transformer 모델은 아니지만, 이를 위해하기 위한 기초가 된다고..
단어를 전처리하고, 임베딩한 후, 모델을 만들어 예측한다는 것은 다른 자연어 처리 기법과 동일하고, Seq2Seq에서는 'Teacher Forcing'이라는 개념만 유의하면 된다. 인코더의 입력으로 디코더를 학습한다고 했는데, lstm의 단계별 예측이 잘못되면 잘못된 내용으로 학습되기 때문에 초기값만 인코더의 결과값으로 입력하고, 번역된 문장(예제에서는 프랑스어)을 디코더의 학습데이터로 사용해야 한다는 것이다. 즉, 위의 예에서 'i am a student'로 lstm인코딩된 결과가 초기값이고, 'je suis etudiant'로 학습한 디코더의 신경망에 입력값으로 넣는다고 보면 된다.
import numpy as np
import re
import shutil
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import pandas as pd
import os
import unicodedata
import urllib3
import zipfile
// english -> france 번역 데이터
http = urllib3.PoolManager()
url ='http://www.manythings.org/anki/fra-eng.zip'
filename = 'fra-eng.zip'
path = os.getcwd()
zipfilename = os.path.join(path, filename)
with http.request('GET', url, preload_content=False) as r, open(zipfilename, 'wb') as out_file:
shutil.copyfileobj(r, out_file)
with zipfile.ZipFile(zipfilename, 'r') as zip_ref:
zip_ref.extractall(path)
// 테스트용으로 3.3만개의 샘플데이터만 사용
num_samples = 33000
데이터는 seq2seq의 예제로 많이 쓰이는 영어-프랑스어 샘플데이터이다. 3.3만개의 데이터만 사용한다.
// 유니코드를 아스키로
def unicode_to_ascii(s):
return ''.join(c for c in unicodedata.normalize('NFD', s)
if unicodedata.category(c) != 'Mn')
// 문장부호를 분리하려고
def preprocess_sentence(sent):
sent = unicode_to_ascii(sent)
sent = re.sub(r"([?.!,¿])", r" \1", sent)
sent = re.sub(r"([^a-zA-Z!.?])", r" ", sent)
sent = re.sub(r"\s+", " ", sent)
return sent
// 읽어들인 데이터를 전처리
// 영어는 encoder_input, 프랑스어는 decoder_input과 decoder_target
// decoder_input은 teacher forcing을 위해, decoder_target은 테스트를 위해
def load_preprocessed_data():
encoder_input, decoder_input, decoder_target = [], [], []
with open("fra.txt", "r") as lines:
for i, line in enumerate(lines):
src_line, tar_line, _ = line.strip().split('\t')
src_line_input = [w for w in preprocess_sentence(src_line).split()]
tar_line = preprocess_sentence(tar_line)
tar_line_input = [w for w in ("<sos> " + tar_line).split()]
tar_line_target = [w for w in (tar_line + " <eos>").split()]
encoder_input.append(src_line_input)
decoder_input.append(tar_line_input)
decoder_target.append(tar_line_target)
if i == num_samples - 1 :
break
return encoder_input, decoder_input, decoder_target
// 전처리 데이터 테스트
sents_en_in, sents_fra_in, sents_fra_out = load_preprocessed_data()
print(sents_en_in[:5])
print(sents_fra_in[:5])
print(sents_fra_out[:5])
==>
[['Go', '.'], ['Hi', '.'], ['Hi', '.'], ['Run', '!'], ['Run', '!']]
[['<sos>', 'Va', '!'], ['<sos>', 'Salut', '!'], ['<sos>', 'Salut', '.'], ['<sos>', 'Cours', '!'], ['<sos>', 'Courez', '!']]
[['Va', '!', '<eos>'], ['Salut', '!', '<eos>'], ['Salut', '.', '<eos>'], ['Cours', '!', '<eos>'], ['Courez', '!', '<eos>']]
위와 같이 전처리는 문장을 단어 단위로 나누는 것이다.
// 소스(영어) 문장 토큰화
tokenizer_en = Tokenizer(filters="", lower=False)
tokenizer_en.fit_on_texts(sents_en_in)
encoder_input = tokenizer_en.texts_to_sequences(sents_en_in)
// 타겟(프랑스) 문장 토큰화
tokenizer_fra = Tokenizer(filters="", lower=False)
tokenizer_fra.fit_on_texts(sents_fra_in)
tokenizer_fra.fit_on_texts(sents_fra_out)
decoder_input = tokenizer_fra.texts_to_sequences(sents_fra_in)
decoder_target = tokenizer_fra.texts_to_sequences(sents_fra_out)
// 토큰화 된 문장 패딩
encoder_input = pad_sequences(encoder_input, padding="post")
decoder_input = pad_sequences(decoder_input, padding="post")
decoder_target = pad_sequences(decoder_target, padding="post")
// 사전 사이즈
src_vocab_size = len(tokenizer_en.word_index) + 1
tar_vocab_size = len(tokenizer_fra.word_index) + 1
// 추후 활용을 위하 사전의 인덱스와 단어를 리스트로 저장
src_to_index = tokenizer_en.word_index
index_to_src = tokenizer_en.index_word
tar_to_index = tokenizer_fra.word_index
index_to_tar = tokenizer_fra.index_word
// 학습이 더 고르게 되도록 데이터를 셔플
indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices)
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]
// train, eval 데이터로 train 데이터를 분리
n_of_val = int(33000*0.1)
encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]
encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]
다른 자연어 처리 절차와 마찬가지로 단어로 분리된 문장을 토큰화하고 인덱싱, 패딩한다.
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Masking
from tensorflow.keras.models import Model
# 은닉층의 차원은 50개로
latent_dim = 50
# 토큰화된 영어로 인코더 구성
encoder_inputs = Input(shape=(None,))
enc_emb = Embedding(src_vocab_size, latent_dim)(encoder_inputs)
# masking의 이유는 padding된 '0'는 학습에 이용되지 않도록
enc_masking = Masking(mask_value=0.0)(enc_emb)
encoder_lstm = LSTM(latent_dim, return_state=True)
encoder_outputs, state_h, state_c = encoder_lstm(enc_masking)
encoder_states = [state_h, state_c]
# 토큰화된 프랑스어로 디코더 구성
decoder_inputs = Input(shape=(None,))
dec_emb_layer = Embedding(tar_vocab_size, latent_dim) # 임베딩 층
dec_emb = dec_emb_layer(decoder_inputs) # 패딩 0은 연산에서 제외
dec_masking = Masking(mask_value=0.0)(dec_emb)
# 상태값 리턴을 위해 return_state는 True, 모든 시점에 대해서 단어를 예측하기 위해 return_sequences는 True
decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True)
# 인코더의 은닉 상태를 초기 은닉 상태(initial_state)로 사용
decoder_outputs, _, _ = decoder_lstm(dec_masking,
initial_state=encoder_states)
# softmax로 decoder의 출력 구성
decoder_dense = Dense(tar_vocab_size, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='rmsprop', loss='sparse_categorical_crossentropy', metrics=['acc'])
model.fit(x = [encoder_input_train, decoder_input_train], y = decoder_target_train, \
validation_data = ([encoder_input_test, decoder_input_test], decoder_target_test),
batch_size = 128, epochs = 20)
모델은 초반에 설명한대로 소스로 인코더, 타겟으로 디코더를 구성하여 이어 붙인다.
모델을 구성하는 것 못지 않게, 테스트 모듈을 구성하는 것도 살짝 복잡하다. 소스의 문장을 학습한 인코더의 결과를 디코더의 입력값으로 두어 테스트하는 것이다. 즉, 입력값을 그대로 모델에 predict하는 것이 아니라 encoder와 decoder를 별도 구성해 encoder에서 나온 결과를 decoder model에 넣어 예측하게 된다.
# 인코더 모델은 구성한 그대로 사용
encoder_model = Model(encoder_inputs, encoder_states)
# 디코더 모델
# 이전 시점의 상태를 보관할 텐서
decoder_state_input_h = Input(shape=(latent_dim,))
decoder_state_input_c = Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
# 훈련 때 사용했던 임베딩 층을 재사용
dec_emb2= dec_emb_layer(decoder_inputs)
# 다음 단어 예측을 위해 이전 시점의 상태를 현 시점의 초기 상태로 사용
decoder_outputs2, state_h2, state_c2 = decoder_lstm(dec_emb2, initial_state=decoder_states_inputs)
decoder_states2 = [state_h2, state_c2]
# 모든 시점에 대해서 단어 예측
decoder_outputs2 = decoder_dense(decoder_outputs2)
decoder_model = Model(
[decoder_inputs] + decoder_states_inputs,
[decoder_outputs2] + decoder_states2)
encoder, decoder 모델을 별도로 구성해 둔다. 참고로 Keras의 Model 함수의 의미는 아래와 같다.
from keras.models import Model
from keras.layers import Input, Dense
a = Input(shape=(32,))
b = Dense(32)(a)
model = Model(inputs=a, outputs=b)
# input 'a'와, output 'b'를 구성하기 위한 모든 layer를 자동으로 구성함
즉, 위의 예에서 Input과 Dense 레이어를 자동으로 구축해 준다.
# 실제 테스트 함수
def decode_sequence(input_seq):
# 입력으로부터 인코더의 상태를 얻음
states_value = encoder_model.predict(input_seq)
# <SOS>에 해당하는 정수 생성
target_seq = np.zeros((1,1))
target_seq[0, 0] = tar_to_index['<sos>']
stop_condition = False
decoded_sentence = ''
# stop_condition이 True가 될 때까지 루프 반복
# 구현의 간소화를 위해서 이 함수는 배치 크기를 1로 가정합니다.
while not stop_condition:
# 이점 시점의 상태 states_value를 현 시점의 초기 상태로 사용
output_tokens, h, c = decoder_model.predict([target_seq] + states_value)
# 예측 결과를 단어로 변환
sampled_token_index = np.argmax(output_tokens[0, -1, :])
sampled_char = index_to_tar[sampled_token_index]
# 현재 시점의 예측 단어를 예측 문장에 추가
decoded_sentence += ' '+sampled_char
# <eos>에 도달하거나 정해진 길이를 넘으면 중단.
if (sampled_char == '<eos>' or
len(decoded_sentence) > 50):
stop_condition = True
# 현재 시점의 예측 결과를 다음 시점의 입력으로 사용하기 위해 저장
target_seq = np.zeros((1,1))
target_seq[0, 0] = sampled_token_index
# 현재 시점의 상태를 다음 시점의 상태로 사용하기 위해 저장
states_value = [h, c]
return decoded_sentence
위가 실제 테스트하는 함수로 참조한 문서에 설명이 잘되어 있어 그대로 옮겨 온다. 테스트 결과는 아래와 같다.
# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq2src(input_seq):
temp=''
for i in input_seq:
if(i!=0):
temp = temp + index_to_src[i]+' '
return temp
# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq2tar(input_seq):
temp=''
for i in input_seq:
if((i!=0 and i!=tar_to_index['<sos>']) and i!=tar_to_index['<eos>']):
temp = temp + index_to_tar[i] + ' '
return temp
# 테스트 모듈
for seq_index in [3,50,100,300,1001]:
input_seq = encoder_input_train[seq_index: seq_index + 1]
decoded_sentence = decode_sequence(input_seq)
print("원문 : ",seq2src(encoder_input_train[seq_index]))
print("번역문 :",seq2tar(decoder_input_train[seq_index]))
print("예측문 :",decoded_sentence[:-5])
print("\n")
==>
원문 : It s still alive .
번역문 : Elle vit encore .
예측문 : C est tout ce que j ai fait .
원문 : You re resourceful .
번역문 : Vous etes plein de ressources .
예측문 : Vous etes toutes .
원문 : She stabbed him .
번역문 : Elle le poignarda .
예측문 : Elle l a fait .
원문 : I ll pay .
번역문 : Je paie .
예측문 : Je vais les cheveux .
원문 : I love soup .
번역문 : J adore la soupe .
예측문 : J adore les cheveux .
엇비슷하게 나오지만, 의미는 전혀 다르다. 학습을 더 충분히 하면 많이 개선될 부분이다.
'AI 빅데이터 > 후려치는 데이터분석과 AI 알고리즘' 카테고리의 다른 글
[데이터분석] 시계열분석 2 - XGBoost (1) | 2020.09.10 |
---|---|
[데이터분석] 시계열 분석 1 - ARIMA (937) | 2020.09.09 |
[자연어처리] 텍스트 생성으로 이해하는 RNN (0) | 2020.08.02 |
[자연어처리] 간단하게 텍스트 감성 분류하기 (0) | 2020.07.13 |
[영상인식] GAN과 AutoEncoder (1) | 2020.06.14 |
댓글