하둡 스트리밍을 활용한 파이썬 word counting 예제~

|



하둡 스트리밍을 활용하면 맵리듀스 잡을 실행가능한 스크립트, 쉘 프로그래밍/파이썬/자바/R 등으로 처리할 수 있다. 

하둡 스트리밍에 대해서는 Apache Hadoop Streaming을 참고하면 된다. 


이번 강의에서는 기본 하둡 예제인 Word Count를 파이썬으로 구성한 후, 하둡 스트리밍으로 맵리듀스를 적용하는 예제를 살펴보기로 한다. 

하둡 스트리밍 명령어는 다음과 같이 사용법을 확인할 수 있다. 

> hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar --help


1. 먼저 파이썬으로 맵 함수를 만들어 보자. 

WordCount에서 맵 함수는 파일의 각 라인별로 읽어서 공백으로 자른 다음, Key: 단어, Value: 1로 출력하면 된다. 

> gedit wordcount_mapper.py

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    keys = line.split()

    for key in keys:
        value = 1
        print('{0}\t{1}'.format(key, value) )


2. 이어서 파이썬으로 리듀스 함수를 만들어 보자. 

리듀서로 넘어올 때는 이미 정렬과 그룹핑이 되어 있기 때문에 키 값인 단어를 이전과 비교해서 총합을 구하면 된다. 

> gedit wordcount_reducer.py

#!/usr/bin/env python

import sys

last_key = None
running_total = 0

for input_line in sys.stdin:
    input_line = input_line.strip()

    this_key, value = input_line.split("\t", 1)
    value = int(value)

    if last_key == this_key:
        running_total += value

    else:
        if last_key:
            print( "{0}\t{1}".format(last_key, running_total) )
        running_total = value
        last_key = this_key

if last_key == this_key:
    print( "{0}\t{1}".format(last_key, running_total))


3. 작성한 파이썬 소스코드를 실행가능한 모드로 변경한다. 

> chmod +x wordcount_mapper.py
> chmod +x wordcount_reducer.py
> ls


4. Word Count를 테스트할 파일을 생성한다. 

> echo "A long time ago in a galaxy far far away" > /home/cloudera/testfile1
> echo "Another episode of Star Wars" > /home/cloudera/testfile2


5. 생성한 테스트 파일을 하둡 파일 시스템에 올린다. 

만약 이전 강의를 따라하면서 똑같은 파일명이 있다면 해당 파일을 먼저 지우고 올리면 된다. 

> hdfs dfs -mkdir /user/cloudera/input
> hdfs dfs -put /home/cloudera/testfile1 /user/cloudera/input
> hdfs dfs -put /home/cloudera/testfile2 /user/cloudera/input
> hdfs dfs -ls /user/cloudera/input


6. 하둡 스트리밍을 활용하여 생성한 테스트 파일 기반으로 파이썬 맵퍼와 리듀서를 실행한다. 

> hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -input /user/cloudera/input \ 
  -output /user/cloudera/output_new \
  -mapper /home/cloudera/wordcount_mapper.py \
  -reducer /home/cloudera/wordcount_reducer.py


7. 결과를 확인하면 테스트 파일의 단어와 개수가 정렬되어 있는 것을 볼 수 있다. 

> hdfs dfs -cat /user/cloudera/output_new/part-00000
> hdfs dfs -ls /user/cloudera/output_new


8. 이번에는 리듀스를 포함하지 않고 적용해보자. 마지막에 numReduceTasks를 0으로 세팅했다. 

> hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -input /user/cloudera/input \
  -output /user/cloudera/output_new_0 \
  -mapper /home/cloudera/wordcount_mapper.py \
  -reducer /home/cloudera/wordcount_reducer.py \
  -numReduceTasks 0


9. 결과를 살펴보면 다음과 같이 세 개의 파일로 나누어져 있는 것을 확인할 수 있다. 

또한 리듀서를 적용하지 않았기 때문에 동일한 단어에 대해서 개수를 통합하지 못한 것을 far를 보면 알 수 있다. 

> hdfs dfs -ls /user/cloudera/output_new_0
> hdfs dfs -cat /user/cloudera/output_new_0/part-00000
> hdfs dfs -cat /user/cloudera/output_new_0/part-00001
> hdfs dfs -cat /user/cloudera/output_new_0/part-00002


10. 여러개의 파일로 결과가 나올 경우, getmerge를 통해 하나의 파일로 합쳐서 로컬에 가져올 수 있다. 

> hdfs dfs -getmerge /user/cloudera/output_new_0/* wordcount_num0_output.txt
> ls
> cat wordcount_num0_output.txt


11. 만약 numReducerTasks를 2로 설정하면 어떻게 될까? 

참고로 맵리듀스 결과가 저장될 곳을 기존 디렉토리를 지정하면 이미 파일이 존재한다는 에러가 난다. 

> hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -input /user/cloudera/input \
  -output /user/cloudera/output_new_1 \
  -mapper /home/cloudera/wordcount_mapper.py \
  -reducer /home/cloudera/wordcount_reducer.py \
  -numReduceTasks 2


12. 리듀서를 2개를 사용하면 결과값이 글로벌하게 정렬되지 않는 점을 제외하고는 Word Count는 제대로 실행되는 것을 확인할 수 있다. 

> hdfs dfs -ls /user/cloudera/output_new_1
> hdfs dfs -cat /user/cloudera/output_new_1/part-00000
> hdfs dfs -cat /user/cloudera/output_new_1/part-00001


본 강의의 예제는 University of California, San Diego의 폴 로드리게즈(Paul Rodriguez) 교수의 자료를 참고했다. 

신고



Trackback 0 And Comment 0