티스토리 뷰
Data Engineering/Elasticsearch
[Elasticsearch] Script 혹은 Library를 사용하여 데이터 import
dev_jun 2022. 11. 23. 14:40스크립트를 사용하여 데이터 가져오기
A 지점에서 데이터를 가져와 JSON과 나머지 인터페이스를 사용하여 Elasticsearch로 불러오는 스크립트를 작성해보자.
- 먼저 연습에 필요한 데이터셋을 다운로드 받는다. 다음의 데이터셋은 교육용으로 무료 제공되는 영화 정보에 대한 데이터셋이다.
wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
- 압출을 풀 수 있는
unzip
라이브러리를 다운로드 받는다.sudo apt install unzip
- 압축을 풀어준다.
unzip ml-latest-small.zip
- 아래의 파이썬 스크립트를 movies_to_json 저장한다.
"""
- movies.csv 파일을 읽고 영화 제목이나 출시 연도, 장르와 같은 영화 정보를 가져와서 csv.DictReader를 사용해 모든 걸 자동으로 매핑하는 파이썬 Dict로 가져온다.
- Elasticseaerch를 통해 새로운 JSON 파일에 Bulk API 입력에 필용한 JSON 형식대로 한 줄씩 작성한다.
"""
import csv
import re
csvfile = open('ml-latest-small/movies.csv', 'r')
reader = csv.DictReader( csvfile )
for movie in reader:
print ("{ \"create\" : { \"_index\": \"movies\", \"_id\" : \"" , movie['movieId'], "\" } }", sep='')
title = re.sub(" \(.*\)$", "", re.sub('"','', movie['title']))
year = movie['title'][-5:-1]
if (not year.isdigit()):
year = "2016"
genres = movie['genres'].split('|')
print ("{ \"id\": \"", movie['movieId'], "\", \"title\": \"", title, "\", \"year\":", year, ", \"genre\":[", end='', sep='')
for genre in genres[:-1]:
print("\"", genre, "\",", end='', sep='')
print("\"", genres[-1], "\"", end = '', sep='')
print ("] }")
- 작성한 python script를 실행시켜 json 파일로 저장한다.
python3 MoviesToJson.py > more_movies.json
- json 형태로 정상적으로 변환되었는지 확인한다.
less moremovies.json
- Bulk API를 통해 Elasticsearch에 데이터를 import한다.
curl -XPUT 127.0.0.1:9200/_bulk --data-binary @moremovies.json
클라이언트 라이브러리로 가져오기
위의 방식처럼 스크립트를 직접 작성해도 되지만 대부분의 언어에는 Elasticsearch Client Library를 제공을 하기 때문에 이렇게 번거롭게 작업을 할 필요가 없다.
라이브러리를 사용하여 단순하게 import 작업을 진행해보자.
- elasticsearch package를 설치하기 위해 pip를 설치한다.
sudo apt install python3-pip
- elasticsearch package를 설치한다.
pip3 install elasticsearch
- 스크립트를 작성해준다.
import csv
from collections import deque
import elasticsearch
from elasticsearch import helpers
def read_movies():
csvfile = open('ml-latest-small/movies.csv', 'r', encoding="utf8")
reader = csv.DictReader( csvfile )
titleLookup = {}
for movie in reader:
titleLookup[movie['movieId']] = movie['title']
return titleLookup
def read_ratings():
csvfile = open('ml-latest-small/ratings.csv', 'r', encoding="utf8")
titleLookup = read_movies()
reader = csv.DictReader( csvfile )
for line in reader:
rating = {}
rating['user_id'] = int(line['userId'])
rating['movie_id'] = int(line['movieId'])
rating['title'] = titleLookup[line['movieId']]
rating['rating'] = float(line['rating'])
rating['timestamp'] = int(line['timestamp'])
yield rating
es = elasticsearch.Elasticsearch(["http://127.0.0.1:9200"])
es.indices.delete(index="ratings",ignore=404)
deque(helpers.parallel_bulk(es,read_ratings(),index="ratings"), maxlen=0)
es.indices.refresh()
"""
- movies 정보를 읽어 Dict로 변환하는 read_movies() 함수를 생성해준다.
- 평점을 읽기 위한 read_ratings() 함수도 작성한다.
- elasticsearch 인스턴스를 생성하고 기존에 있던 평점 테이블을 삭제한 후 read_ratings() 함수를 사용해 평점 데이터를 평점 index에 import한다.
"""
- 스크립트 파일을 실행시켜주면 된다.
'Data Engineering > Elasticsearch' 카테고리의 다른 글
[Elasticsearch] Logstash와 MySQL (0) | 2022.11.24 |
---|---|
[Elasticsearch] Logstash란 (0) | 2022.11.23 |
[Elasticsearch] Section 3. Elasticsearch를 사용한 검색 (0) | 2022.11.23 |
[Elasticsearch] 데이터 매핑 및 색인화 - 2. 동시성, Analyzer, Flattened, Mapping Exception (0) | 2022.11.22 |
[Elasticsearch] Data 매핑 및 색인화 - 1 (0) | 2022.11.22 |
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- 빅데이터를지탱하는기술
- 프로그래머스
- BOJ
- Python
- Elasticsearch
- cka
- OS
- 이코테
- 빅데이터
- logstash
- kubernetes
- sqoop
- Flutter
- kafka
- oozie
- heapq
- 파이썬
- Espher
- Algorithm
- DFS
- Hadoop
- CS
- GROK
- 백준
- elasticsaerch
- mahout
- 네트워크
- HDFS
- DP
- CSAPP
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
글 보관함