본문 바로가기
Hadoop

[Hadoop] Apache Hadoop - MapReduce

by 내가 진짜 유일한 2024. 11. 8.

글을 작성하기에 앞서..

데이터 엔지니어링을 공부하기 시작하면 먼저 Hadoop ecosystem를 접하게 된다.

Hadoop ecosystem을 보면 다뤄야할 프레임워크나 스킬들이 너무 많아서, 배우는 데 오래걸리겠다는 생각이 들었다.

이번 포스팅은 그 중에 Apache Hadoop 프레임워크에 대해 다뤄보고자 한다.


Apache Hadoop이란?

The Apache Hadoop software libary is a framework that allows for the distributed  processing of large data sets across clusters of computers using programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and strorage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle  failures at the application layer, so delivering a high-available service on top of a cluster of computers, each of which may be prone to failures.

출처 : https://hadoop.apache.org/

Apache Hadoop에 대한 정의를 여기에서 확인할 수 있다.

 

정리해보자면,

Apache Hadoop 소프트웨어 라이브러리이며, 프로그래밍 모델을 사용하여 컴퓨터 클러스터 전체에 걸쳐 대규모 데이터 세트를 분산 처리할 수 있는 프레임워크이다. 단일 서버부터 수천대의 머신으로 확장 가능하도록 설계되었으며 각각 로컬 컴퓨팅 및 스토리지를 제공한다.

라고 한다.

 

대규모 데이터 세트를 처리할 수 있는 분산 컴퓨팅 시스템이라고 이해할 수 있는데, 그렇다면 왜 하둡을 사용하게 되었는지 알아보자.

Hadoop 배경

2000년대 초에 인터넷과 컴퓨터의 발전으로 엄청난 양의 데이터가 생성되기 시작했을 때, RDBMS로 처리하기 어려운 비정형 데이터를 포함하고 있었고, 데이터의 양과 복잡성도 급격하게 증가하기 시작했다.

이를 해결하기 위해, Google에서 2003년에 GFS(Google File System), 2004년에 MapReduce라는 논문을 발표하여 대규모 데이터를 효율적으로 저장하고 처리할 수 있는 분산 파일 시스템과 병렬처리 프레임워크를 제안했다.

 

2004년에 Doug Cutting과 Mike Cafarella는 이 논문에서 아이디어를 얻어, 대규모 데이터를 효율적으로 처리할 수 있는 시스템을 개발하고자 하였으며, 이를 기반으로 오픈소스 분산 컴퓨팅 프레임워크인 Hadoop을 개발하였다.

 

Hadoop 사용 사례

그렇다면 왜 대규모 데이터를 처리하기 위해 Hadoop을 사용할까? 대표적인 Hadoop 사용 사례에 대해 알아보면,

2000년 대에 Hadoop을 사용해서 대규모 데이터 작업을 빠르고 비용 효율적으로 처리한 사건이 있었다.

 

2007년 뉴욕 타임즈는 대량의 아카이브를 디지털화 하고자 했으먀, 이 프로젝트는 약 400만 페이지에 달하는 뉴욕 타임즈의 기사 아카이브를 PDF 형식으로 변환하여, 이를 웹을 통해 제공하는 것이 목표였다.

하지만, 매우 방대한 데이터 처리 작업이었기 때문에 기존의 데이터 처리 방법으로는 비용과 시간이 많이 소요 되었다고 한다.

 

뉴욕 타임즈는 이 문제를 해결하기 위해 Apache Hadoop의 MapReduce와 HDFS(Hadoop Distributed File System)을 활용했다.

AWS의 EC2와 S3를 활용하여 데이터를 저장하고, EC2 상에서 Hadoop 클러스터를 구축하여, 100대 이상의 EC2 인스턴스에서 병렬로 PDF파일을 생성하도록 구현하여 몇 주가 소요되었을 작업을 단 24시간 만에, 400만 페이지의 PDF파일을 240달러 정도로 저렴하게 수행할 수 있었던 Hadoop의 고성능 데이터 처리 능력분산 컴퓨팅의 경제적 이점을 실증한 사례로 남아 있다.

 

그렇다면, Hadoop의 어떤 점이 이런 대용량 데이터를 효율적으로 처리할 수 있는걸까?

MapReduce

MapReduce는 데이터를 여러 조각으로 나누고, 각 조각을 분산된 노드에서 병렬로 처리할 수 있다.

이로 인해, 전체 데이터 처리가 빠르게 이루어지며, 데이터의 크기가 증가해도 추가 노드를 통해 확장 가능해진다.

 

MapReduce Architecture

1. Splitting

Spiltting 단계에서는 입력 데이터를 여러 조각으로 나눠 각 노드가 독립적으로 처리할 수 있도록 준비한다.

대용량 데이터셋이 더 작은 단위로 나뉘어 "입력 스플릿(Input Split)"이라는 관리 가능한 조각으로 나뉜다.

각 입력 스플릿은 독립적으로 처리할 수 있는 데이터셋의 일부이다.

Splitting 개념은 병렬 데이터 처리를 가능하게 하여 여러 컴퓨팅 노드가 동시에 서로 다른 입력 스플릿을 처리할 수 있게 한다.

Splitting은 처리 효율성을 높일 뿐만 아니라 대규모 데이터셋의 분산 처리를 가능하게 하는 중요한 요소이다.

이 과정은 데이터의 크기가 커질수록 전체 처리를 더욱 빠르게 하도록 돕는다.

2. Mapping

Mapping 단계에서 각 입력 스플릿이 사용자가 정의한 맵 함수에 따라 처리된다.

이 함수는 입력 스플릿 내 데이터를 일련의 키-값 쌍(Key-Value pair) 으로 변환한다.

키-값 쌍 은 이후 데이터의 그룹화와 처리를 위해 필수적이다.

매핑 단계는 분산된 컴퓨팅 노드에서 병렬로 수행되므로 처리 효율성이 더 높아진다.

3. Shuffling

Shuffling 단계에서 매핑 단계에서 생성된 중간 결과를 키 기준으로 그룹핑한다.

같은 키를 가진 키-값 쌍을 모두 모아 정리하는 과정이다.

이를 통해 동일한 단어들이 같은 노드에 모이게 되고, 최종 집계 작업을 더욱 효율적으로 수행할 수 있게 해준다.

예를 들어, "Hadoop"이라는 단어는 모든 노드에 발생한 빈도가 한 곳에 모이게 된다.

4. Reducing

Reducing 단계에서는 셔플릿을 통해 모인 키-값 쌍을 하나로 집계해서 최종 결과 Final Output을 생성한다.

이 단계에서 데이터가 집계되거나 분석되는 과정을 거쳐 최종 결과를 산출한다.

위 그림의 예시에서 "Hadoop"이라는 단어는 4번 등장하므로, {"Hadoop":4} 와 같은 결과가 나오게 된다.

이 단계가 끝나면 모든 중복된 데이터가 제거되고, 원하는 분석 결과만 남게 된다.

 

그렇다면, MapReduce를 사용해서 워드카운팅을 하는 예시를 살펴보도록 하자.

그 전에 Hadoop을 설치하고 적용해야 한다.

Hadoop을 설치하는 방법은 이 글을 참고하면 된다.

2024.03.13 - [데이터 엔지니어링] - Apache Spark & Hadoop 클러스터 구축 및 설정

 

Apache Spark & Hadoop 클러스터 구축 및 설정

이번 글에서는 나의 환경에 맞는 Apache Spark과 Hadoop 클러스터 구축에 대한 방법을 소개하겠다하드웨어 환경 - SingleBoard Computer(라즈베리파이, 라떼판다) 사용 클라우드 서버를 사용해서 클러스터

dont-make-excuses.tistory.com

WordCount 성능 비교 - 텍스트 파일 크기 143MB

이제 WordCount를 예시 텍스트와 코드로 테스트를 해보겠다.

비교는 3가지의 비교군을 두어 성능을 비교해보겠다.

 

먼저 WordCount를 하기 위해, Lorem ipsum 형식의 무작위 텍스트 파일을 생성하는 코드를 먼저 실행해서 텍스트 파일을 생성한다.

import random
import string

file_path = "long_text_file.txt"
num_lines = 1000000
word_count_per_line = 10

def random_word():
    word_length = random.randint(3, 10)
    return ''.join(random.choices(string.ascii_lowercase, k=word_length))

with open(file_path, "w") as file:
    for _ in range(num_lines):
        line = " ".join(random_word() for _ in range(word_count_per_line))
        file.write(line + "\n")

print(f"{file_path} 파일 생성 완료.")

 

이렇게 생성한 텍스트 파일의 형태는 이러하다.

이렇게 무작위 단어로 생성된 텍스트 파일

 

텍스트 파일의 크기 약 143.06 MB

1. Java 단일 프로세스 순차적 처리 워드카운팅

첫 번째는 MapReduce를 사용하지 않고 단순한 Java 기반의 Word Count 코드를 작성해보겠다.

한 줄씩 읽어서 각 단어의 빈도를 계산하는 방식이다.

import java.io.*;
import java.util.*;

public class WordCountSimple {

    public static Map<String, Integer> wordCount(List<String> data) {
        Map<String, Integer> wordCountMap = new HashMap<>();

        for (String line : data) {
            String[] words = line.split("\\s+");

            for (String word : words) {
                word = word.toLowerCase();
                wordCountMap.put(word, wordCountMap.getOrDefault(word, 0) + 1);
            }
        }

        return wordCountMap;
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();

        List<String> data = new ArrayList<>();
        String filePath = "long_text_file_2.txt";

        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                data.add(line);
            }
        } catch (IOException e) {
            System.err.println("Error reading file: " + e.getMessage());
            return;
        }

        Map<String, Integer> result = wordCount(data);

        long endTime = System.currentTimeMillis();

	//result.forEach((word, count) -> System.out.println(word + ": " + count));

        System.out.println("Processing Time: " + (endTime - startTime) + " milliseconds");
    }
}

 

이 코드를 long_text_file.txt 약 143MB 크기의 텍스트 파일을 워드 카운팅 하는 프로세스 소요 시간은 약 18초가 소요 되었다.

18065 밀리초 소요

2. Java MapReduce 단일 프로세스 워드카운팅

MapReduce 방식을 사용해서 Word Count 작업을 수행한 코드이며, Hadoop 없이 Java 만으로 MapReduce를 구현한 것이다.

map() 메서드와 reduce() 메서드로 나뉘어져 있으며,

Map 단계에서 data 리스트의 각 줄에 대해 map 메서드를 호출하고, mappedData 리스트에 저장한다.

Reduce 단계에서 mappedData 리스트를 reduce() 메서드에 전달하여 단어별 총 빈도를 계산한다.

import java.io.*;
import java.util.*;
import java.util.stream.Collectors;

public class WordCountMapReduce {

    public static Map<String, Integer> map(String line) {
        Map<String, Integer> wordMap = new HashMap<>();
        String[] words = line.split("\\s+");

        for (String word : words) {
            word = word.toLowerCase();
            wordMap.put(word, wordMap.getOrDefault(word, 0) + 1);
        }

        return wordMap;
    }

    public static Map<String, Integer> reduce(List<Map<String, Integer>> mappedData) {
        Map<String, Integer> reducedData = new HashMap<>();

        for (Map<String, Integer> map : mappedData) {
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                reducedData.put(entry.getKey(), reducedData.getOrDefault(entry.getKey(), 0) + entry.getValue());
            }
        }

        return reducedData;
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();

        List<String> data = new ArrayList<>();
        String filePath = "long_text_file.txt";

        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                data.add(line);
            }
        } catch (IOException e) {
            System.err.println("Error reading file: " + e.getMessage());
            return;
        }

        List<Map<String, Integer>> mappedData = data.stream()
                .map(WordCountMapReduce::map)
                .collect(Collectors.toList());

        Map<String, Integer> result = reduce(mappedData);

        long endTime = System.currentTimeMillis();

        //result.forEach((word, count) -> System.out.println(word + ": " + count));

        System.out.println("Processing Time: " + (endTime - startTime) + " milliseconds");
    }
}


이 코드를 long_text_file.txt 약 143MB 크기의 텍스트 파일을 워드 카운팅 하는 프로세스 소요 시간은 약 26초가 소요 되었다.

소요 시간 26722 밀리초

3. Hadoop MapReduce 분산 프로세스 워드카운팅

클러스터로 이루어진 Hadoop 프레임워크를 사용해서, Word Count를 하는 코드이다.

Mapper 단계에서는 WordCountMapper 클래스로 작성한다.

WordCountMapper 클래스는 입력 파일에서 데이터를 읽고 key-value 형식으로 변환하는 역할을 한다.

map() 메서드에서 line = value.toString()은 입력 줄 value를 문자열로 변환한다.

StringTokenizer tokenizer = new StringTokenizer(line)은 공백을 기준으로 문자열을 분리한다.

 

Reducer 단계에서는 WordCountReducer 클래스로 작성한다.

WordCountReducer 클래스는 Mapper에서 출력된 결과를 받아서 동일한 키에 대해 값을 더하는 역할이다.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {

    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken().toLowerCase());
                context.write(word, one);
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCount <input path> <output path>");
            System.exit(-1);
        }

	long startTime = System.currentTimeMillis();

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean jobCompletionStatus = job.waitForCompletion(true);

	long endTime = System.currentTimeMillis();
        long executionTime = endTime - startTime;

	System.out.println("Execution Time: " + executionTime + " milliseconds");

	System.exit(jobCompletionStatus ? 0 : 1);
    }
}

 

먼저 이 코드를 실행하기 위해 텍스트 파일을 HDFS(Hadoop Distributed File System)에 put해야한다.

그 후에 hadoop MapReduce 코드를 실행하면 된다.

hdfs dfs -put long_text_file.txt /input
hadoop jar wordcount.jar WordCount /input /output

HDFS에 long_text_file.txt 파일을 올림
Hadoop MapReduce 로그 - 1
Hadoop MapReduce 로그 - 2

로그를 보면 Map이 0% 부터 100% 까지 처리하고 나서, Reduce 작업을 시작한다.
Reduce 작업 또한 0% 부터 100% 까지 처리하면 결과가 나온다.

수행 시간은 55827 밀리초로 약 55초가 걸렸다.

3가지 비교군 워드카운팅 결과

분명 MapReuce를 사용하면, 데이터 처리 성능이 빠를 것으로 예상되었는데,

단일 프로세스로 MapReduce를 사용하지 않은 워드카운트 코드가 제일 성능이 좋았고,

Hadoop을 사용하여 MapReuce를 수행한 코드가 성능이 제일 안 좋게 나왔다.

 

그렇다면 MapReduce와 Hadoop MapReduce가 어떨 때 성능이 좋을까?를 생각해보면,

대용량 데이터 처리를 할 때 성능이 좋다는 것을 알고 있지만, 이를 실험으로 증명해보겠다.

WordCount 성능 비교 - 텍스트 파일 크기 715MB

이를 위해 텍스트 파일의 크기를 키워 새로운 파일을 만들었다.

import random
import string

file_path = "long_text_file_2.txt"
num_lines = 10000000
word_count_per_line = 10

def random_word():
    word_length = random.randint(3, 10)
    return ''.join(random.choices(string.ascii_lowercase, k=word_length))

with open(file_path, "w") as file:
    for _ in range(num_lines):
        line = " ".join(random_word() for _ in range(word_count_per_line))
        file.write(line + "\n")

print(f"{file_path} 파일 생성 완료.")

라인을 10배를 늘려, long_text_file_2 텍스트 파일을 생성했다. 크기는 약 715MB이다.

HDFS에 파일 업로드
long_text_file_2.txt 크기는 약 715MB

이제 이 텍스트 파일을 앞서 진행 했던 3가지의 코드를 다시 실행해보겠다.

단일 자바 워드카운트 결과

일단 제일 수행 성능이 빨랐던 단일 순차적 워드카운팅 코드는 자바 힙 메모리 문제로 에러가 발생해서 실행이 불가하다.

힙 메모리 사이즈를 늘리는 방법도 있지만, 이 에러가 발생한 거 자체로 대용량 데이터를 처리하는 것은 문제가 있다고 생각된다.

단일 자바 MapReduce 워드카운트 결과

단일 자바로 MapReduce를 통해 워드카운팅한 코드 또한 자바 힙 메모리 문제로 에러가 발생하였다.

 

하지만, Hadoop 분산 컴퓨팅 프로세스를 사용한 방식의 워드카운팅 결과는

Hadoop MapReduce 워드카운트 처리 결과 271992 밀리초 소요

Hadoop 프레임워크를 사용하여 워드카운팅한 결과는 에러 없이 약 4.5분이 소요 되었다.

 

이러한 결과로 대용량 데이터를 처리할 때 Hadoop 프레임워크를 사용한 대용량 데이터 처리 성능이 우세한 것을 확인할 수 있었다.

 

만약, 이 실험을 보고 자바 단일 프로세스도 힙 메모리를 늘리면 되지 않나? 할 수 있지만, 

지금은 140MB, 715MB로 실험을 진행 한 것이고, 정말 빅데이터 크기인 YB, ZB 단위로 넘어가게 된다면,

단일 하드웨어의 메모리를 데이터 크기에 맞춰 늘리는 것은 한계가 있을 것이다.

 

따라서 Hadoop 프레임워크를 사용한 클러스터 분산 컴퓨팅 처리 방식이 더욱 효율적이고 유연한 확장이 되기 때문에 적합하다라고 판단할 수 있다.

 

이번 포스팅에선 Hadoop의 정의와 Hadoop의 MapReuce 처리 방식, MapReduce를 하는 이유와

Hadoop 프레임워크를 사용하는 이유를 워드카운트 실험으로 보였다.


다음 포스팅에선 Hadoop의 주요 기능인 HDFS, YARN에 대해 작성해보겠다.