본문 바로가기

개발/NLP

NLP Load : 맨땅에 헤딩하는 NLP 공부일지 (4)

정수 인코딩 (Integer Encoding)

주어진 데이터셋에 대해 단어/형태소 토큰화를 수행한 이후 각 단어에 고유한 정수를 부여하는 작업을 말한다.
단, 중복이 허용되지 않는 모든 단어들의 집합을 만드는 것! 이를 단어 집합(Vocabulary)라고 하며 "Vocab"이라고 부르기도 한다. 이를 기반으로 문서를 정수로 인코딩한다!!

특수 토큰 및 데이터셋의 소소한 이야기

데이터셋을 다룰때 보통 pandas 라이브러리를 자주 사용하는데, df.info() 를 사용하면 각 데이터의 타입을 볼 수 있다. 여기서 Dtypeobject로 보인다면 문자열을 의미한다고 이해하면 된다.
데이터셋의 분할을 할땐 train_test_split()을 이용하면 쉽게 훈련 셋과 테스트 셋을 나눌 수 있다.
collectionsCounter를 이용하면 주어진 대상의 빈도수를 빠르게 뽑아낼 수 있다.
처음 데이터셋을 보게 되면 결측값과 데이터의 불균형여부를 확인하는게 좋다.

결측값은 일부 누락된 정보가 들어있는 데이터를 의미하며 정보로서의 가치가 없는 값을 말한다. pandas 라이브러리의 df.isnull().values.any()를 이용하면 해당 값 여부를 알 수 있다.

데이터의 불균형여부는 보통 특정 레이블의 데이터 수를 이용하면 불균형한지 아닌지 알 수 있다.
예시와 같이 개수를 plot화 시켜 보는것이 전체를 한눈에 알 수 있어 편한듯 하다ㅎㅎ

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import nltk
nltk.download('punkt')
import torch
import urllib.request
from tqdm import tqdm
from collections import Counter
from nltk.tokenize import word_tokenize
from sklearn.model_selection import train_test_split

def tokenize(sentences):
    tokenized_sentences = []
    for sent in tqdm(sentences):
        tokenized_sent = word_tokenize(sent)
        tokenized_sent = [word.lower() for word in tokenized_sent]
        tokenized_sentences.append(tokenized_sent)
    return tokenized_sentences

def texts_to_sequences(tokenized_X_data, word_to_index):
    encoded_X_data = []
    for sent in tokenized_X_data:
        index_sequences = []
        for word in sent:
            try:
                index_sequences.append(word_to_index[word])
            except KeyError:
                index_sequences.append(word_to_index['<UNK>'])
        encoded_X_data.append(index_sequences)
    return encoded_X_data

if __name__=="__main__":

    # ▼ Integer Encoding 예제
    
    urllib.request.urlretrieve("https://raw.githubusercontent.com/ukairia777/pytorch-nlp-tutorial/main/10.%20RNN%20Text%20Classification/dataset/IMDB%20Dataset.csv", filename="IMDB Dataset.csv")
    df = pd.read_csv('IMDB Dataset.csv')
    df.info() # Dtype에서 object는 문자열
    print('결측값 여부 :',df.isnull().values.any())
    df['sentiment'].value_counts().plot(kind='bar') # 레이블의 불균형여부를 확인해야 함!
    df['sentiment'] = df['sentiment'].replace(['positive','negative'],[1, 0])
    X_data = df['review']
    y_data = df['sentiment']
    print('영화 리뷰의 개수: {}'.format(len(X_data)))
    print('레이블의 개수: {}'.format(len(y_data)))
    X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=0.5, random_state=0, stratify=y_data)
    
    tokenized_X_train = tokenize(X_train)
    tokenized_X_test = tokenize(X_test)
    # 상위 샘플 2개 출력
    for sent in tokenized_X_train[:2]:
        print(sent)
    
    word_list = []
    for sent in tokenized_X_train:
        for word in sent:
            word_list.append(word)

    word_counts = Counter(word_list)
    print('총 단어수 :', len(word_counts))
    print('훈련 데이터에서의 단어 the의 등장 횟수 :', word_counts['the'])
    print('훈련 데이터에서의 단어 love의 등장 횟수 :', word_counts['love'])

    vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    print('등장 빈도수 상위 10개 단어')
    print(vocab[:10])

    threshold = 3 # 빈도수를 기준으로 vocab의 분포를 조정하기 위한 기준치
    total_cnt = len(word_counts) # 단어의 수
    rare_cnt = 0 # 등장 빈도수가 threshold보다 작은 단어의 개수를 카운트
    total_freq = 0 # 훈련 데이터의 전체 단어 빈도수 총 합
    rare_freq = 0 # 등장 빈도수가 threshold보다 작은 단어의 등장 빈도수의 총 합

    # 단어와 빈도수의 쌍(pair)을 key와 value로 받는다.
    for key, value in word_counts.items():
        total_freq = total_freq + value

        # 단어의 등장 빈도수가 threshold보다 작으면
        if(value < threshold):
            rare_cnt = rare_cnt + 1
            rare_freq = rare_freq + value

    print('단어 집합(vocabulary)의 크기 :',total_cnt)
    print('등장 빈도가 %s번 이하인 희귀 단어의 수: %s'%(threshold - 1, rare_cnt))
    print("단어 집합에서 희귀 단어의 비율:", (rare_cnt / total_cnt)*100)
    print("전체 등장 빈도에서 희귀 단어 등장 빈도 비율:", (rare_freq / total_freq)*100)

    word_to_index = {}
    word_to_index['<PAD>'] = 0
    word_to_index['<UNK>'] = 1

    # 전체 단어 개수 중 빈도수 1이하인 단어는 제거.
    vocab_size = total_cnt - rare_cnt
    vocab = vocab[:vocab_size]
    print('단어 집합의 크기 :', len(vocab))

    for index, word in enumerate(vocab) :
        word_to_index[word] = index + 2
    
    encoded_X_train = texts_to_sequences(tokenized_X_train, word_to_index)
    encoded_X_test = texts_to_sequences(tokenized_X_test, word_to_index)
    # 'UNK' => 'Unknown' => 모르는 단어가 등장했을 경우 맵핑하는 용도로 사용되는 스페셜 토큰.

    index_to_word = {}
    for key, value in word_to_index.items():
        index_to_word[value] = key
    print(word_to_index)
    print(index_to_word)

    decoded_sample = [index_to_word[word] for word in encoded_X_train[0]]
    print('기존의 첫번째 샘플 :', tokenized_X_train[0])
    print('복원된 첫번째 샘플 :', decoded_sample)

패딩 (Padding)

모든 문장에 대해 정수 인코딩을 수행하였을 때 길이는 서로 다를 수 있다. 이때 가상의 단어를 추가하여 길이를 맞춰주는데, 이를 왜 하느냐! 딥러닝 모델이 인코딩한 문장을 병렬처리를 하기 위해서는 행렬로 인식해야 하는데 문장의 각 길이가 달라버리면 이 데이터 셋을 행렬로 인식할 수 없다.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import nltk
nltk.download('punkt')
import torch
import urllib.request
from tqdm import tqdm
from collections import Counter
from nltk.tokenize import word_tokenize
from sklearn.model_selection import train_test_split

def below_threshold_len(max_len, nested_list):
    # 패딩 : 딥러닝 모델이 string을 matrix로 인식하기 위해서는 병렬처리를 해야함. 이를 위해 서로다른 길이를 하나의 길이로 맞춰주는 것.
    count = 0
    for sentence in nested_list:
        if(len(sentence) <= max_len):
            count = count + 1
    print('전체 샘플 중 길이가 %s 이하인 샘플의 비율: %s'%(max_len, (count / len(nested_list))*100))

def pad_sequences(sentences, max_len):
    features = np.zeros((len(sentences), max_len), dtype=int)
    for index, sentence in enumerate(sentences):
        if len(sentence) != 0:
            features[index, :len(sentence)] = np.array(sentence)[:max_len]
    return features

if __name__=="__main__":

    # ▼ Padding 예제

    print('리뷰의 최대 길이 :',max(len(review) for review in encoded_X_train))
    print('리뷰의 평균 길이 :',sum(map(len, encoded_X_train))/len(encoded_X_train))
    plt.hist([len(review) for review in encoded_X_train], bins=50)
    plt.xlabel('length of samples')
    plt.ylabel('number of samples')
    plt.show()

    max_len = 500
    below_threshold_len(max_len, encoded_X_train)
    
    padded_X_train = pad_sequences(encoded_X_train, max_len=max_len)
    padded_X_test = pad_sequences(encoded_X_test, max_len=max_len)
    print('훈련 데이터의 크기 :', padded_X_train.shape)
    print('테스트 데이터의 크기 :', padded_X_test.shape)
    print(padded_X_train[0])

 

전체 코드

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import nltk
nltk.download('punkt')
import torch
import urllib.request
from tqdm import tqdm
from collections import Counter
from nltk.tokenize import word_tokenize
from sklearn.model_selection import train_test_split

def tokenize(sentences):
    tokenized_sentences = []
    for sent in tqdm(sentences):
        tokenized_sent = word_tokenize(sent)
        tokenized_sent = [word.lower() for word in tokenized_sent]
        tokenized_sentences.append(tokenized_sent)
    return tokenized_sentences

def texts_to_sequences(tokenized_X_data, word_to_index):
    encoded_X_data = []
    for sent in tokenized_X_data:
        index_sequences = []
        for word in sent:
            try:
                index_sequences.append(word_to_index[word])
            except KeyError:
                index_sequences.append(word_to_index['<UNK>'])
        encoded_X_data.append(index_sequences)
    return encoded_X_data

def below_threshold_len(max_len, nested_list):
    # 패딩 : 딥러닝 모델이 string을 matrix로 인식하기 위해서는 병렬처리를 해야함. 이를 위해 서로다른 길이를 하나의 길이로 맞춰주는 것.
    count = 0
    for sentence in nested_list:
        if(len(sentence) <= max_len):
            count = count + 1
    print('전체 샘플 중 길이가 %s 이하인 샘플의 비율: %s'%(max_len, (count / len(nested_list))*100))

def pad_sequences(sentences, max_len):
    features = np.zeros((len(sentences), max_len), dtype=int)
    for index, sentence in enumerate(sentences):
        if len(sentence) != 0:
            features[index, :len(sentence)] = np.array(sentence)[:max_len]
    return features

if __name__=="__main__":
    urllib.request.urlretrieve("https://raw.githubusercontent.com/ukairia777/pytorch-nlp-tutorial/main/10.%20RNN%20Text%20Classification/dataset/IMDB%20Dataset.csv", filename="IMDB Dataset.csv")
    df = pd.read_csv('IMDB Dataset.csv')
    df.info() # Dtype에서 object는 문자열
    print('결측값 여부 :',df.isnull().values.any())
    df['sentiment'].value_counts().plot(kind='bar') # 레이블의 불균형여부를 확인해야 함!
    df['sentiment'] = df['sentiment'].replace(['positive','negative'],[1, 0])
    X_data = df['review']
    y_data = df['sentiment']
    print('영화 리뷰의 개수: {}'.format(len(X_data)))
    print('레이블의 개수: {}'.format(len(y_data)))
    X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=0.5, random_state=0, stratify=y_data)
    
    tokenized_X_train = tokenize(X_train)
    tokenized_X_test = tokenize(X_test)
    # 상위 샘플 2개 출력
    for sent in tokenized_X_train[:2]:
        print(sent)
    
    word_list = []
    for sent in tokenized_X_train:
        for word in sent:
            word_list.append(word)

    word_counts = Counter(word_list)
    print('총 단어수 :', len(word_counts))
    print('훈련 데이터에서의 단어 the의 등장 횟수 :', word_counts['the'])
    print('훈련 데이터에서의 단어 love의 등장 횟수 :', word_counts['love'])

    vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    print('등장 빈도수 상위 10개 단어')
    print(vocab[:10])

    threshold = 3 # 빈도수를 기준으로 vocab의 분포를 조정하기 위한 기준치
    total_cnt = len(word_counts) # 단어의 수
    rare_cnt = 0 # 등장 빈도수가 threshold보다 작은 단어의 개수를 카운트
    total_freq = 0 # 훈련 데이터의 전체 단어 빈도수 총 합
    rare_freq = 0 # 등장 빈도수가 threshold보다 작은 단어의 등장 빈도수의 총 합

    # 단어와 빈도수의 쌍(pair)을 key와 value로 받는다.
    for key, value in word_counts.items():
        total_freq = total_freq + value

        # 단어의 등장 빈도수가 threshold보다 작으면
        if(value < threshold):
            rare_cnt = rare_cnt + 1
            rare_freq = rare_freq + value

    print('단어 집합(vocabulary)의 크기 :',total_cnt)
    print('등장 빈도가 %s번 이하인 희귀 단어의 수: %s'%(threshold - 1, rare_cnt))
    print("단어 집합에서 희귀 단어의 비율:", (rare_cnt / total_cnt)*100)
    print("전체 등장 빈도에서 희귀 단어 등장 빈도 비율:", (rare_freq / total_freq)*100)

    word_to_index = {}
    word_to_index['<PAD>'] = 0
    word_to_index['<UNK>'] = 1

    # 전체 단어 개수 중 빈도수 1이하인 단어는 제거.
    vocab_size = total_cnt - rare_cnt
    vocab = vocab[:vocab_size]
    print('단어 집합의 크기 :', len(vocab))

    for index, word in enumerate(vocab) :
        word_to_index[word] = index + 2
    
    encoded_X_train = texts_to_sequences(tokenized_X_train, word_to_index)
    encoded_X_test = texts_to_sequences(tokenized_X_test, word_to_index)
    # 'UNK' => 'Unknown' => 모르는 단어가 등장했을 경우 맵핑하는 용도로 사용되는 스페셜 토큰.

    index_to_word = {}
    for key, value in word_to_index.items():
        index_to_word[value] = key
    print(word_to_index)
    print(index_to_word)

    decoded_sample = [index_to_word[word] for word in encoded_X_train[0]]
    print('기존의 첫번째 샘플 :', tokenized_X_train[0])
    print('복원된 첫번째 샘플 :', decoded_sample)
    
    # ▲ Integer Encoding 예제
    # -------------------------------------------------------------------------------------
    # ▼ Padding 예제

    print('리뷰의 최대 길이 :',max(len(review) for review in encoded_X_train))
    print('리뷰의 평균 길이 :',sum(map(len, encoded_X_train))/len(encoded_X_train))
    plt.hist([len(review) for review in encoded_X_train], bins=50)
    plt.xlabel('length of samples')
    plt.ylabel('number of samples')
    plt.show()

    max_len = 500
    below_threshold_len(max_len, encoded_X_train)
    
    padded_X_train = pad_sequences(encoded_X_train, max_len=max_len)
    padded_X_test = pad_sequences(encoded_X_test, max_len=max_len)
    print('훈련 데이터의 크기 :', padded_X_train.shape)
    print('테스트 데이터의 크기 :', padded_X_test.shape)
    print(padded_X_train[0])