티스토리 뷰

스크립트를 사용하여 데이터 가져오기

A 지점에서 데이터를 가져와 JSON과 나머지 인터페이스를 사용하여 Elasticsearch로 불러오는 스크립트를 작성해보자.

  • 먼저 연습에 필요한 데이터셋을 다운로드 받는다. 다음의 데이터셋은 교육용으로 무료 제공되는 영화 정보에 대한 데이터셋이다.
    • wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
  • 압출을 풀 수 있는 unzip 라이브러리를 다운로드 받는다.
    • sudo apt install unzip
  • 압축을 풀어준다.
    • unzip ml-latest-small.zip
  • 아래의 파이썬 스크립트를 movies_to_json 저장한다.
"""
- movies.csv 파일을 읽고 영화 제목이나 출시 연도, 장르와 같은 영화 정보를 가져와서 csv.DictReader를 사용해 모든 걸 자동으로 매핑하는 파이썬 Dict로 가져온다.
- Elasticseaerch를 통해 새로운 JSON 파일에 Bulk API 입력에 필용한 JSON 형식대로 한 줄씩 작성한다.
"""
import csv
import re

csvfile = open('ml-latest-small/movies.csv', 'r')

reader = csv.DictReader( csvfile )
for movie in reader:
        print ("{ \"create\" : { \"_index\": \"movies\", \"_id\" : \"" , movie['movieId'], "\" } }", sep='')
        title = re.sub(" \(.*\)$", "", re.sub('"','', movie['title']))
        year = movie['title'][-5:-1]
        if (not year.isdigit()):
            year = "2016"
        genres = movie['genres'].split('|')
        print ("{ \"id\": \"", movie['movieId'], "\", \"title\": \"", title, "\", \"year\":", year, ", \"genre\":[", end='', sep='')
        for genre in genres[:-1]:
            print("\"", genre, "\",", end='', sep='')
        print("\"", genres[-1], "\"", end = '', sep='')
        print ("] }")
  • 작성한 python script를 실행시켜 json 파일로 저장한다.
    • python3 MoviesToJson.py > more_movies.json
  • json 형태로 정상적으로 변환되었는지 확인한다.
    • less moremovies.json
  • Bulk API를 통해 Elasticsearch에 데이터를 import한다.
    • curl -XPUT 127.0.0.1:9200/_bulk --data-binary @moremovies.json

클라이언트 라이브러리로 가져오기

위의 방식처럼 스크립트를 직접 작성해도 되지만 대부분의 언어에는 Elasticsearch Client Library를 제공을 하기 때문에 이렇게 번거롭게 작업을 할 필요가 없다.
라이브러리를 사용하여 단순하게 import 작업을 진행해보자.

  • elasticsearch package를 설치하기 위해 pip를 설치한다.
    • sudo apt install python3-pip
  • elasticsearch package를 설치한다.
    • pip3 install elasticsearch
  • 스크립트를 작성해준다.
import csv
from collections import deque
import elasticsearch
from elasticsearch import helpers

def read_movies():
    csvfile = open('ml-latest-small/movies.csv', 'r', encoding="utf8")

    reader = csv.DictReader( csvfile )

    titleLookup = {}

    for movie in reader:
            titleLookup[movie['movieId']] = movie['title']

    return titleLookup

def read_ratings():
    csvfile = open('ml-latest-small/ratings.csv', 'r', encoding="utf8")

    titleLookup = read_movies()

    reader = csv.DictReader( csvfile )
    for line in reader:
        rating = {}
        rating['user_id'] = int(line['userId'])
        rating['movie_id'] = int(line['movieId'])
        rating['title'] = titleLookup[line['movieId']]
        rating['rating'] = float(line['rating'])
        rating['timestamp'] = int(line['timestamp'])
        yield rating


es = elasticsearch.Elasticsearch(["http://127.0.0.1:9200"])

es.indices.delete(index="ratings",ignore=404)
deque(helpers.parallel_bulk(es,read_ratings(),index="ratings"), maxlen=0)
es.indices.refresh()

"""
- movies 정보를 읽어 Dict로 변환하는 read_movies() 함수를 생성해준다.
- 평점을 읽기 위한 read_ratings() 함수도 작성한다.
- elasticsearch 인스턴스를 생성하고 기존에 있던 평점 테이블을 삭제한 후 read_ratings() 함수를 사용해 평점 데이터를 평점 index에 import한다.
"""
  • 스크립트 파일을 실행시켜주면 된다.
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
글 보관함