본문 바로가기
데이터 엔지니어링

[데이터엔지니어링] 실시간 코인 데이터를 활용한 모의투자 게임 "코인예측왕" part.2 (데이터엔지니어링)

by 내가 진짜 유일한 2024. 11. 29.

2024.11.27 - [데이터 엔지니어링] - [데이터엔지니어링] 실시간 코인 데이터를 활용한 모의투자 게임 "코인예측왕" 개발 - 1 (코인 데이터 활용, 데이터 파이프라인 설계)

 

[데이터엔지니어링] 실시간 코인 데이터를 활용한 모의투자 게임 "코인예측왕" 개발 - 1 (코인 데

이번 포스팅은 코인 데이터를 활용한 모의투자를 게임으로 제작한 "코인 예측왕"이라는 게임을 개발한 내용으로 작성하겠다.1. 코인 데이터를 활용하게 된 계기일단, 코인 데이터를 활용해서 모

dont-make-excuses.tistory.com

전 포스팅에서 게임 개발을 하게 된 계기와 아키텍처 설계에 대한 글을 작성했다.

이번 글은 본격적인 데이터엔지니어링 개발 내용에 대해 다뤄보도록 하겠다.


데이터엔지니어링 파트 개발

데이터 엔지니어링 파트는 온 프로미스로 개발한 클러스터가 기존에 있었다.

2024.03.13 - [데이터 엔지니어링] - Apache Spark & Hadoop 클러스터 구축 및 설정

 

Apache Spark & Hadoop 클러스터 구축 및 설정

이번 글에서는 나의 환경에 맞는 Apache Spark과 Hadoop 클러스터 구축에 대한 방법을 소개하겠다하드웨어 환경 - SingleBoard Computer(라즈베리파이, 라떼판다) 사용 클라우드 서버를 사용해서 클러스터

dont-make-excuses.tistory.com

 

하지만, 클라우드 컴퓨팅에 대해 공부도 하고 프로젝트에 적용도 해보고자 AWS에서 EMR을 사용해서 클러스터를 구축했다.

 

a. AWS EMR 클러스터 생성

EMR 클러스터 생성
EMR 클러스터 Hadoop HDFS Web UI 접속

EMR 클러스터의 코어를 선택하는 기준은 온프로미스 클러스터의 사양과 비슷하게 맞추면서 최대한 저렴한 인스턴스로 선택했다.

이렇게 EMR로 클러스터를 구축하는 방법이 온프로미스 구축 방법 보다 쉬웠고 간편했다.

 

하지만 이 편한 방법에서 문제가 발생했다, 바로 비용이었다.

문제 1. EMR 비용

EMR 클러스터 비용이었는데, 약 8일 동안 35.57 + 6.97 달러를 해서 약 42.64 달러가 나왔다...

 

학생 입장에서 테스트도 해야 하고, 게임 런칭하는 중에 클러스터를 계속 실행하는 일이 필요한 상태에서 이렇게 돈이 많이 나가면,

EMR 클러스터를 더 유지하지 못 하겠다고 생각하고 온프로미스 클러스터로 다시 변경하기로 했다.

b. Kafka로 Upbit 데이터 실시간 수집 - Extract

Kafka로 각 종목 마다 모든 데이터를 수집하기 위해서 종목코드(KRW-ETC, KRW-BTC)가 필요하다.

그렇기 때문에, Upbit에서 종목 코드를 가져오는 API로 Postgresql에 축적해서 사용하고자 했다.

https://docs.upbit.com/reference/%EB%A7%88%EC%BC%93-%EC%BD%94%EB%93%9C-%EC%A1%B0%ED%9A%8C
마켓 종목 Postgresql 저장

종목 코드를 통해 upbit websocket을 연결해서 실시간으로 코인 데이터를 수집하여 Kakfa Producer에 send 하고자 했다.

 

이때, pyupbit 라이브러리를 알게 되어서 굳이 DB를 사용해서 종목 코드를 가져와서 조회할 필요가 없어졌다..

그래서, pyupbit를 통해 종목 코드를 전부 가져와서 websocket으로 실시간 데이터를 수집하여 Kafka Producer에 send 하였다.

import multiprocessing as mp
import pyupbit
import datetime
import os
import json
from kafka import KafkaProducer

KAFKA_TOPIC = 'upbit-ticker-data'
KAFKA_BROKER = 'spark-worker-panda-01:9092, spark-worker-panda-02:9092, spark-worker-panda-03:9092, spark-worker-panda-04:9092'

def default_converter(o):
    if isinstance(o, datetime.datetime):
        return o.isoformat()

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v, default=default_converter).encode('utf-8'),
    retries=5,
    acks='all'
)

def on_send_success(record_metadata):
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{current_time}] Message sent to {record_metadata.topic} partition {record_metadata.partition}")

def on_send_error(excp):
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{current_time}] Error sending message: {excp}")

def get_market_codes():
    try:
        return pyupbit.get_tickers()
    except Exception as e:
        print(f"Error fetching market codes: {e}")
        return []

def collect_save_data(queue):
    while True:
        data = queue.get()

        producer.send(KAFKA_TOPIC, data).add_callback(on_send_success).add_errback(on_send_error)

if __name__ == "__main__":
    queue = mp.Queue()

    market_codes = get_market_codes()
    if not market_codes:
        print("No market codes available.")
        exit(1)

    proc = mp.Process(
        target=pyupbit.WebSocketClient,
        args=('ticker', market_codes, queue),
        daemon=True
    )
    proc.start()


    collect_save_data(queue)

    producer.flush()
    producer.close()

 

이렇게 Producer에 send한 데이터를 Consumer로 받아,

DataLake인 HDSF에 적재하고, 시각화 목적 시계열 DB인 InfluxDB에 적재하였다.

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, LongType, BooleanType, TimestampType, StructField
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

influxdb_url = "http://spark-master-panda-01:8086"
influxdb_token = "jCKAgbR5JWKVOoJCBRNMmNyUcTwNgFf_r0hLEeyHCTegaCqIkjRWRu41---uvwhbnCpn1rK2Jr4oi8BLJfLowA=="
influxdb_org = "spark"
influxdb_bucket = "spark"

client = InfluxDBClient(
    url=influxdb_url,
    token=influxdb_token,
    org=influxdb_org
)

write_api = client.write_api(write_options=SYNCHRONOUS)

spark = SparkSession.builder \
    .appName("KafkaSparkStreamingToInfluxAndHDFS") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://spark-master-panda-01:9000") \
    .config("spark.executor.memory", "1g") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.instances", "3") \
    .config("spark.driver.memory", "1g") \
    .config("spark.master", "yarn") \
    .getOrCreate()

kafka_bootstrap_servers = "spark-worker-panda-01:9092,spark-worker-panda-02:9092,spark-worker-panda-03:9092,spark-worker-panda-04:9092"
kafka_topic = "upbit-ticker-data"

kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "latest") \
    .load()

schema = StructType([
    StructField("type", StringType(), True),
    StructField("code", StringType(), True),
    StructField("opening_price", DoubleType(), True),
    StructField("high_price", DoubleType(), True),
    StructField("low_price", DoubleType(), True),
    StructField("trade_price", DoubleType(), True),
    StructField("prev_closing_price", DoubleType(), True),
    StructField("change", StringType(), True),
    StructField("change_price", DoubleType(), True),
    StructField("signed_change_price", DoubleType(), True),
    StructField("change_rate", DoubleType(), True),
    StructField("signed_change_rate", DoubleType(), True),
    StructField("trade_volume", DoubleType(), True),
    StructField("acc_trade_price", DoubleType(), True),
    StructField("acc_trade_price_24h", DoubleType(), True),
    StructField("acc_trade_volume", DoubleType(), True),
    StructField("acc_trade_volume_24h", DoubleType(), True),
    StructField("highest_52_week_price", DoubleType(), True),
    StructField("lowest_52_week_price", DoubleType(), True),
    StructField("market_state", StringType(), True),
    StructField("trade_timestamp", LongType(), True),
    StructField("is_trading_suspended", BooleanType(), True)
])

json_df = kafka_df.selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

def write_to_influxdb(batch_df, batch_id):
    records = batch_df.collect()
    for record in records:
        point = Point("upbit_ticker") \
            .tag("type", record["type"]) \
            .tag("code", record["code"]) \
            .tag("market_state", record["market_state"]) \
            .tag("change", record["change"]) \
            .field("opening_price", record["opening_price"]) \
            .field("high_price", record["high_price"]) \
            .field("low_price", record["low_price"]) \
            .field("trade_price", record["trade_price"]) \
            .field("prev_closing_price", record["prev_closing_price"]) \
            .field("change_price", record["change_price"]) \
            .field("signed_change_price", record["signed_change_price"]) \
            .field("change_rate", record["change_rate"]) \
            .field("signed_change_rate", record["signed_change_rate"]) \
            .field("trade_volume", record["trade_volume"]) \
            .field("acc_trade_price", record["acc_trade_price"]) \
            .field("acc_trade_price_24h", record["acc_trade_price_24h"]) \
            .field("acc_trade_volume", record["acc_trade_volume"]) \
            .field("acc_trade_volume_24h", record["acc_trade_volume_24h"]) \
            .field("highest_52_week_price", record["highest_52_week_price"]) \
            .field("lowest_52_week_price", record["lowest_52_week_price"]) \
            .field("is_trading_suspended", record["is_trading_suspended"]) \
            .time(record["trade_timestamp"], WritePrecision.MS)

        write_api.write(bucket=influxdb_bucket, org=influxdb_org, record=point)

query_hdfs = json_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs://spark-master-panda-01:9000/finVizor/upbit/ticker_data/") \
    .option("checkpointLocation", "hdfs://spark-master-panda-01:9000/checkpoints/upbit") \
    .start()

query_influxdb = json_df.writeStream \
    .foreachBatch(write_to_influxdb) \
    .outputMode("append") \
    .start()

query_influxdb.awaitTermination()
query_hdfs.awaitTermination()

 

이렇게 HDSF와 InfluxDB에 넣어서 시각화까지 했지만, 문제가 발생했다.

문제 2. 학교 폐쇄망 사용

학교에 온프로미스 클러스터가 배치되어 있는데,

폐쇄망을 사용하고 있어서 로컬 DB를 사용하면 배포된 서버에서 DB에 접근하기 어려운 문제가 밸생했다.

 

포트 포워딩은 학교 폐쇄망을 뚫기 위해 사용해도 되는 건지 확인이 안된 상태였기 때문에,

차라리 DB를 클라우드 DB로 변경하기로 했다.

 

그래서, HDFS -> AWS S3로, InfluxDB -> AWS DynamoDB로 변경했다.

c. AWS S3, AWS DynamoDB 선택이유

AWS S3는 대표적인 DataLake로 사용되는 저렴한 클라우드 Storage 서비스 였기 때문에, 고민 없이 S3로 변경하였다.

굉장히 고민 되었던 InfluxDB 대체 DB 선택이었다.

 

처음에는 시계열 DB인 AWS TimeStream를 선택했다. 

시계열 DB를 계속 선택한 이유는 시계열 DB가 대부분 Read, Write 속도 성능이 좋고, 코인 데이터를 그래프로 시각화하기 편해서 그렇다.

 

하지만, TimeStream은 서울 리전이 지원을 안해서,,, 리전을 계속 옮기고 설정해줘야하는 불편함에 TimeStream을 사용하지 않았다.

 

그래서 다른 DB 조사해서 AWS DynamoDB를 선택했다.

DynamoDB를 선택한 이유는 NoSQL이며, 제일 중요한 실시간 데이터를 위해 Read, Write 성능이 제일 잘 나오는 DB라서 선택했다.

 

그렇게 AWS S3, DynamoDB에 Kafka Consumer를 통해 데이터를 적재하도록 코드를 수정했다.

c. Kafka, Spark Streaming으로 AWS S3, DynamoDB에 적재 - Transformation, Load

import boto3
import decimal
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, LongType, StructField

dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2')
table = dynamodb.Table('dynamoDB_upbit_table')

spark = SparkSession.builder \
    .appName("KafkaSparkStreamingToDynamoDBAndS3") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0") \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.ap-northeast-2.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.executor.memory", "5g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "3") \
    .config("spark.driver.memory", "6g") \
    .config("spark.master", "yarn") \
    .getOrCreate()

kafka_bootstrap_servers = "spark-worker-panda-01:9092,spark-worker-panda-02:9092,spark-worker-panda-03:9092,spark-worker-panda-04:9092"
kafka_topic = "upbit-ticker-data"

kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()

schema = StructType([
    StructField("code", StringType(), True),
    StructField("trade_date", StringType(), True),
    StructField("trade_time", StringType(), True),
    StructField("trade_timestamp", LongType(), True),
    StructField("high_price", DoubleType(), True),
    StructField("low_price", DoubleType(), True),
    StructField("trade_price", DoubleType(), True),
    StructField("change", StringType(), True),
    StructField("change_price", DoubleType(), True),
    StructField("change_rate", DoubleType(), True),
    StructField("timestamp", LongType(), True)
])

json_df = kafka_df.selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

def write_to_dynamodb(batch_df, batch_id):
    selected_df = batch_df.select(
        col("code"),
        col("trade_timestamp"),
        col("timestamp"),
        col("high_price"),
        col("low_price"),
        col("trade_price"),
        col("change"),
        col("change_price"),
        col("change_rate")
    )

    records = selected_df.collect()

    for record in records:
        try:
            item = {
                "code": record["code"],
                "trade_timestamp": record["trade_timestamp"],
                "timestamp": record["timestamp"],
                "high_price": decimal.Decimal(str(record["high_price"])),
                "low_price": decimal.Decimal(str(record["low_price"])),
                "trade_price": decimal.Decimal(str(record["trade_price"])),
                "change": record["change"],
                "change_price": decimal.Decimal(str(record["change_price"])),
                "change_rate": decimal.Decimal(str(record["change_rate"]))
            }

            table.put_item(Item=item)
        except Exception as e:
            print(f"Failed to write record to DynamoDB: {e}")

query_dynamodb = json_df.writeStream \
    .foreachBatch(write_to_dynamodb) \
    .outputMode("append") \
    .start()

query_s3 = json_df.writeStream \
    .outputMode("append") \
    .format("csv") \
    .option("path", "s3a://aws-s3-bucket-fastcampus/dataLake_upbit/") \
    .option("checkpointLocation", "s3a://aws-s3-bucket-fastcampus/checkpoint/") \
    .option("header", "true") \
    .trigger(processingTime='45 seconds') \
    .start()

query_dynamodb.awaitTermination()
query_s3.awaitTermination()

 

로그를 통해 데이터가 잘 수집 되고 있는 지 확인하였다.

이렇게 적재된 데이터는 DynamoDB에서 볼 수 있었다.

DynamoDB 데이터 적재 모니터링 / 온디맨드에서

 

이렇게, DynamoDB에 적재된 데이터를 이제 시각화하기 위해 QuickSight를 사용하려고 했지만,

QuickSight

QuickSight에 DynamoDB를 지원하지 않아서, DynamoDB에 적재된 데이터를 Glue로 S3에 데이터를 옮겨서 시각화 하려고 했다.

AWS Glue를 통해 DynamoDB에 있는 데이터를 S3로 전송
Glue 설정과 실행
Glue 비용

지금 Glue로 S3로 데이터를 옮기는 과정에서 제일 저렴한 인스턴스 유형을 선택해서 실행했지만, 이 역시 하루 정도를 돌렸는데,

비용이 너무 많이 지출되었다.

 

그래서, 코인 데이터 시각화를 위해 다른 방법이 필요했다.

문제3. 비용으로 인한 코인 데이터 시각화 대안 필요

이런 프로젝트를 개발할 때도 비용 문제로 이렇게 변경하는데,

실제 기업에서는 얼마나 비용을 저렴하고 비용 효율적으로 구축하는 지 궁금하다.

 

코인 데이터 시각화 대안으로 생각한 방법은 TradingView를 사용하는 것 이었다.

d. TradingView 사용

https://www.tradingview.com/widget-docs/widgets/charts/advanced-chart/

TradingView Widget을 활용하여 코인 데이터를 시각화 하는 방법을 찾았고,

수집한 데이터를 시각화 하는 것 보다 효율적으로 시각화할 수 있는 방안이 생겨서 TradingView를 활용하기로 했다.

 

 


데이터 엔지니어링 마무리

여기까지가 팀 단위로 게임 개발에 들어가기 전에, 혼자 데이터 엔지니어링 파트를 개발한 내용이다.

최종 데이터 파이프라인 아키텍처

팀을 모집할 때, "이 상태까지 개발 되었고 이제 게임 개발만 하면 된다." 라고 하면서 설득을 했기 때문에,

좋은 팀원들을 잘 모집할 수 있었다고 생각한다.

 

개발을 하면서 초기 데이터 파이프라인과 구성이 많이 변경되었다. 이유는 비용 문제가 대부분이었다.

데이터 파이프라인 구성 변경

온프로미스 클러스터 → AWS EMR → 온프로미스 클러스터

DataLake: HDFS → AWS S3

DataWarehouse: InfluxDB → AWS TimeStream → AWS DynamoDB

Data Visualization: InfluxDB Graph → AWS DynamoDB + AWS Glue + QuickSight → TradingView

 

이렇게 변경해가면서 안정적이면서 실시간으로 데이터를 수집하는 파이프라인을 만들었고,

데이터 엔지니어로서 비용 효율적으로 구축하기 위해 노력해서, 결론적으로 DB 비용만 지출하게 되었다.

 

데이터 엔지니어링과 백엔드 개발, 게임 런칭하면서 발생한 문제와, 해결한 트러블 슈팅은 마지막에 따로 포스팅하겠다.

 

다음 포스팅으로는 Lambda를 활용한 백엔드 파트를 다뤄보도록 하겠다.