본문 바로가기
데이터 엔지니어링

[데이터엔지니어링] 실시간 코인 데이터를 활용한 모의투자 게임 "코인예측왕" part.5 (개발과정에서의 시행착오)

by 내가 진짜 유일한 2024. 12. 4.

이번 포스팅은 코인예측왕 게임을 개발하면서 내가 특정 기술이나 로직을 선택한 이유와 겪은 시행착오, 에러에 대해 작성하겠다.

아마, 개발 결과보다 개발하면서 어떤 문제가 있었고, 어떻게 해결했는지에 대한 설명이 더 중요할 수도 있다.

 

그래서, 이에 대한 파트를 아예 따로 다뤄보겠다. 그리고 전 파트들에서 다룬 시행착오나 에러들도 다시 다룰 수도 있다.

전에 개발한 프로젝트에 대해 작성하는 것이라서, 에러가 발생했을 때 많이 기록해두지 않은 것에 대해 후회가 된다..

그래도 최대한 있는 자료로 많이 작성해보겠다!

 

2024.11.30 - [데이터 엔지니어링] - [데이터엔지니어링] 실시간 코인 데이터를 활용한 모의투자 게임 "코인예측왕" part.4 (게임 소개)

 

[데이터엔지니어링] 실시간 코인 데이터를 활용한 모의투자 게임 "코인예측왕" part.4 (게임 소개)

드디어 게임 소개를 할 차례다..앞서서 포스팅한 part.1은 게임을 개발한 계기 즉 서론이고, part.2, 3은 팀에서 내가 맡은 파트를 설명했던 본론이다.이제 part.4는 "그래서 어떤 게임이 나왔는데?"에

dont-make-excuses.tistory.com


개발 과정에서 시행착오

코인예측왕 프로젝트 개발의 첫 시작부터 가서 겪은 시행착오를 말해보도록 하겠다.

1. 프로젝트 초기 설계

먼저, Upbit에서 코인 데이터를 사용하기로 정한 이유는 무료인 이유가 제일 컸다.

학생이면서 개인 프로젝트로 데이터 활용하는 단계부터 비용이 지출되면 부담이 될 거 같아서 Upbit 데이터를 사용했다.

 

나는 Hadoop, Spark를 공부하고 있는 학생이지만, 기존에 데이터 파이프라인을 설계해서, 설계를 바탕으로 구축해본 적도 없었고

Kafka를 사용할 줄만 알았지 프로젝트에 적용해본 적도 없었다.

 

기존에 구축했던 간단한 데이터 파이프라인이라고 하면, 에어코리아 API로 데이터를 가져와서 InfluxDB에 넣고 Grafana로 데이터를 그래프로 시각화했던 프로젝트가 3번 있었다.

 

하지만, 이번에는 종목 당 초당 4회 정도의 코인 데이터가 실시간으로 계속 들어오는 데이터를 처리해야 했기 때문에 Kafka를 사용해야겠다고 생각했다.

 

그리고 개발을 시작하기 전에 틀을 잡을 수 있는 아키텍처를 먼저 설계하고 개발을 해야 방향을 정확히 잡아서 효율적으로 개발할 수 있을 것이라고 생각해서, 아키텍터를 먼저 설계했다.

초기 데이터 파이프라인 아키텍처 설계

이렇게, Data Source, DataLake, DataWarehouse, DataMart를 나름대로 사용해보려고 설계해봤었다.

여기에 있는 HDFS, InfluxDB, Postgresql은 사용은 해봤지만, 이 프로젝트에 왜 이걸 사용해야하는 지는 확실하게 나도 잘 몰랐다.

 

개발을 할 때, 막히거나 잘 모르겠으면, 나만의 멘토에게 도움을 요청해서 막힌 부분을 풀어나가는데, 바로 GPT다.

혼자 개발하는 것을 즐겨 했기 때문에, GPT와 계속 의견을 나누고 질문하고 답변을 얻고 거기에 내 의견을 얹혀서 수정을 해 나갔다.

 

그래서 GPT와 의견을 나눈 결과, 먼저 중요한 사항인 실시간 데이터 시각화를 위해 InfluxDB에 데이터를 넣는 것 부터 개발하기로 했다.

2. Apache Kafka 사용

그렇다면, 코인 데이터를 실시간으로 가져와서 처리하기 위해 Apache Kafka를 사용한 이유를 특징과 함께 말해보겠다.

Apache Kafka의 특징

  • 분산 아키텍처: 클러스터 기반으로 작동하여, 데이터가 여러 브로커로 분산되면서 높은 확장성과 내결함성을 제공한다
  • 실시간 데이터 스트리밍: 실시간 데이터 스트리밍을 지원하여 데이터를 실시간으로 처리하거나 전달할 수 있다
  • 데이터 저장: 메세지가 디스크에 저장되면서 Consumer가 데이터를 사용하지 않아도 메세지가 손실되자지 않는다
  • 대용량 처리: 초당 수백만 건의 메세지를 처리할 수 있고, 대규모 트래픽을 효율적으로 처리할 수 있다
  • Publish-Subscribe 모델: 다수의 Producer와 Consumer가 데이터를 주고 받는 구조이다
  • Streaming 처리: 데이터를 지속적으로 처리 및 변환하는 스트리밍 애플리케이션과 통합 가능하다
  • 운영 복잡성: 클러스터 설정 및 관리는 복잡하고, 발생하는 문제 해결에 어려움이 있다

Apache Kafka를 사용한 이유

  • 실시간 데이터 처리 필요: 모의투자 게임은 실시간으로 코인 데이터를 수집하고 처리해야 한다, Kafka로 Producer와 Consumer 간으로 안정적인 실시간 데이터 전달 가능하다
  • 다양한 Consumer 지원: Spark Streaming, HDFS, DynamoDB로 바로 전달 가능하다 즉, Kafka의 Publish-Subscribe 모델 활용을 통해 동일한 코인 데이터를 HDFS, DynamoDB 등 여러 시스템에 동시 처리 가능하다
  • Websocket 으로 수집한 코인 데이터를 Kafka 브로커로 전송하여 데이터 스트리밍 시스템을 구축하고자 했다
  • HDFS, DynamoDB, InfluxDB와 같은 데이터를 분산 저장할 수 있다
  • Kafka의 로그 기반 데이터 저장 방식으로 장애 발생 시 데이터를 복구하고 문제를 파악할 수 있다

이러한, Kafka의 특징과 이유로 Kafka를 사용하는 것을 결정했다.

 

Apache Kafka에 대해서는 강의로 배웠었다. 온프로미스 클러스터에 Kafka를 설치하고 conf 파일도 전부 설정해뒀었다.

하지만, 결국 프로젝트에 사용해본 적은 없어서 이번 프로젝트가 Kafka를 중점적으로 사용한 첫 프로젝트이다.

 

그래서 Kafka를 사용하기 위해 필요한 내용을 GPT에 물어봐가면서 진행했다.

 

Kafka 데이터 전송을 위해 topic을 정하고, Upit websocket으로 수집한 데이터를 Queue에 적재해서 Producer에서 send 메서드를 통해 Broker를 거쳐, Consumer 에서 사용할 수 있도록 했다.

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v, default=default_converter).encode('utf-8'),
    retries=5,
    acks='all'
)

def collect_save_data(queue):
    while True:
        data = queue.get()

        producer.send(KAFKA_TOPIC, data).add_callback(on_send_success).add_errback(on_send_error)

if __name__ == "__main__":
    queue = mp.Queue()

    market_codes = get_market_codes()
    if not market_codes:
        print("No market codes available.")
        exit(1)

    proc = mp.Process(
        target=pyupbit.WebSocketClient,
        args=('ticker', market_codes, queue),
        daemon=True
    )
    proc.start()


    collect_save_data(queue)

    producer.flush()
    producer.close()

원래는 Pyupbit를 모르고 있었을 때 종목 코드를 PostgreSQL에 저장해서 쿼리를 통해 코드만 가져와서 코드를 통해 코인 데이터를 가져왔지만, pyupit를 사용해서 불필요한 DB 작업은 제거 했다.

a. Postgresql에서 종목 코드를 가져왔을 때

def get_market_codes():
    try:
        conn = psycopg2.connect(
            host="localhost",
            database="upbit_db",
            user="ilhan",
            password="****"
        )
        cur = conn.cursor()

        cur.execute("SELECT market FROM market_codes")
        rows = cur.fetchall()

        market_codes = [row[0] for row in rows]

        cur.close()
        conn.close()

        return market_codes

    except Exception as e:
        print(f"Error fetching market codes: {e}")
        return []

b. Pyupbit get_tickers()으로 종목 코드를 가져왔을 때

def get_market_codes():
    try:
        return pyupbit.get_tickers()
    except Exception as e:
        print(f"Error fetching market codes: {e}")
        return []

 

이렇게 코드를 변경했다.  사실 이 과정에서도 문제가 많이 발생하고 머리가 아팠는데, 기록을 못 해서 다루지 못 하는 게 많이 아쉽다.

 

그렇게 Producer를 통해 전달한 데이터를 Consumer에서 잘 받아오고 있는 지 확인하기 위해 콘솔로 테스트를 해봤다.

kafka-console-consumer.sh --bootstrap-server spark-worker-panda-01:9092,spark-worker-panda-02:9092,spark-worker-panda-03:9092,spark-worker-panda-04:9092 --topic upbit-ticker-data --from-beginning

 

항상 Producer 관련 코드를 수정하면, consumer 명령어로 테스트 했기 때문에, 이런 과정이 너무 귀찮았다.

이걸 자동으로 로그를 보여주는 작업을 나중에는 하지 않을까 한다. 

 

이렇게 Consumer로 들어온 데이터를 실시간으로 처리하기 위해 Spark Streaming를 사용했다.

3. Spark Streaming 사용

Kafka Consumer로 데이터를 받아서 Spark Streaming를 통해 Spark 데이터 프레임으로 변환 후에, DL에 적재했다.

 

사실, Spark Streaming으로 실시간 데이터 전처리를 해줄 필요가 있어 보여서, Spark Streaming을 Kafka와 DL 사이에 적재했지만,

Upbit에서 데이터를 깔끔하게 주고 있어서, 전처리가 불필요 했지만, Spark Streaming으로 필요하다면 언제든지 전처리를 할 수 있게 가능성을 열어두었다는 점이 Spark Streaming을 사용한 가장 큰 이유이다.

 

PySpark로 Spark을 사용했는데, 이유는 Python이 편해서 Pyspark을 선택했고, Pyspark에서 Spark을 사용하기 위해선 Spark Session을 만들어줘야 한다.

spark = SparkSession.builder \
    .appName("KafkaSparkStreamingToInfluxAndHDFS") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://spark-master-panda-01:9000") \
    .config("spark.executor.memory", "1g") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.instances", "3") \
    .config("spark.driver.memory", "1g") \
    .config("spark.master", "yarn") \
    .getOrCreate()

이건 초기에 만든 Spark Session 이었는데, 계속 실시간 코인 데이터를 처리하고 과정에서 장시간 실시간 데이터를 처리하는 데, 무리가 있어보여서, 결국 Spark Session 설정을 더 넉넉하게 해줬다.

Spark Streaming 배치 처리 로그

spark = SparkSession.builder \
    .appName("KafkaSparkStreamingToDynamoDBAndS3") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0") \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.ap-northeast-2.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.executor.memory", "5g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "3") \
    .config("spark.driver.memory", "6g") \
    .config("spark.master", "yarn") \
    .getOrCreate()

 

Spark Session builder로 executor 메모리, 인스턴스, 코어 수 증가, S3 연결을 위해 수정했다.

4. 온프레미스 클러스터 → AWS EMR 클러스터 → 온프레미스 클러스터 변경

기존에 온프레미스 클러스터를 사용하고 있었다. 온프레미스 클러스터는 예전 논문을 쓸 때 부터 열심히 구축해둔 클러스터가 있어서 그걸 사용하면 되었지만, AWS EMR 클러스터를 사용해보고 싶었기도 하고 이 참에 클라우드에서 클러스터를 실행시키려고 EMR을 알아봤다.

2024.03.13 - [데이터 엔지니어링] - Apache Spark & Hadoop 클러스터 구축 및 설정

 

Apache Spark & Hadoop 클러스터 구축 및 설정

이번 글에서는 나의 환경에 맞는 Apache Spark과 Hadoop 클러스터 구축에 대한 방법을 소개하겠다하드웨어 환경 - SingleBoard Computer(라즈베리파이, 라떼판다) 사용 클라우드 서버를 사용해서 클러스터

dont-make-excuses.tistory.com

그렇다면, AWS EMR이 대체 뭘까?

AWS EMR (Elastic MapReduce): 클라우드 기반의 빅데이터 처리 서비스로, Hadoop, Spark, Hive 등 빅데이터 프레임워크를 간단하게 배포하고 관리할 수 있는 서비스이다.

주요 특징:

  • 클라우드 기반: 온디맨드로 클러스터를 생성, 삭제 가능
  • 자동화 관리: 하드웨어 구성, EC2 인스턴스에서 클러스터를 자동으로 배포 및 관리, 확장 축소 등을 자동으로 처리
  • 유연한 데이터 처리: S3, DynamoDB, RedShift와 통합 가능
  • 무엇보다, 자동으로 필요한 프레임워크를 설치해주고, 설정을 전부 다 해놓은 상태에서 바로 Spark이나 Hadoop을 실행할 수 있다는 장점이 제일 크게 다가왔음

EMR 클러스터 설정
인스턴스 유형: c6g.xlarge, 코어크기: 4인스턴스

EMR을 생성하고 실행해보니까 너무 편하고 용이하고 쉬웠다. 내가 1달에 걸쳐서 구축한 온프레미스 클러스터를 생각하면 허무하기도 했다.

하지만, 이 편한 EMR도 단점이 존재했는데, 바로 비용이었다.

EMR 비용 청구서

약 200시간 정도 실행했을 때, 48.32 달러 비용이 나왔고, 장기적으로 봤을 때 학생 개인이 지불하기에 부담스러운 비용이었다

그래서, 다시 온프레미스 클러스터로 변경했다.

5. 온프레미스 DB 에서 클라우드 서비스 DB로 변경

온프레미스 DB에서 클라우드 서비스 DB로 변경한 이유는 다음과 같다.

  1. 온프레미스 환경의 문제
    1. 학교 폐쇄망으로 인한 접근 제한: 온프레미스 환경의 외부 네트워크에서 DB에 접근하는 것이 어려웠고, Lambda에서 데이터를 사용하기 어려웠음
    2. 가용성 부족: 장애 발생 시에 학교에 가서 클러스터가 연결된 내부 망에 접속해서 에러를 해결해야 했다, 하지만 클라우드 DB를 사용하면, 장소에 상관없이 바로 복구할 수 있음
    3. 확장성과 유지 보수의 한계: 온프로메스 DB는 물리적인 클러스터 서버에 의존하므로, 데이터 요청에 대해 대응하기 위해 하드웨어를 추가할 수 없는 확장성 문제
  2. 클라우드 서비스 DB로 변경한 이유
    1. 접근성 향상: 클라우드 DB를 사용하면, 장소에 상관없이 접근 가능하여 장애 복구에 대처할 수 있음
    2. 확장성 및 유연성: 사용량에 따라 리소스를 동적으로 확장하거나, 축소할 수 있기 때문에 확장이 유연하다
    3. 관리 효율성: DB를 설정하고 관리하는 작업이 클라우드 서비스에 의해 관리되어 운영 부담이 줄어들고, 프로젝트 개발에 더 신경쓸 수 있다

그래서, 변경한 DB는 다음과 같다.

HDFS → AWS S3

InfluxDB → AWS DynamoDB

a. HDFS AWS S3 선택 이유

AWS S3 선택 이유는 앞서 말한 온프레미스 DB 문제와 비슷한 이유 때문이다.

  1. 확장성 문제
    1. HDFS는 물리적 클러스터의 용량에 의존하고 있고, 용량 부족 시 에러가 발생했고, 데이터노드의 로그가 쌓이다 보니 데이터 노드에 에러가 발생해서 연결이 끊어지는 경우가 빈번했다
    2. S3는 제한이 거의 없으며, 데이터 적재가 증가해도 자동으로 용량 확장 가능하다
  2. 접근성
    1. 현재 온프레미스 클러스터에 HDFS 있었기 때문에, 학교 폐쇄망에 클러스터가 있어 외부에서 접근성이 떨어졌다
    2. AWS S3는 어디서나 접근 가능하며, AWS 인프라를 활용해서 고가용성을 제공한다

그렇다면, HDFS와 AWS S3 장단점을 비교해보자

특징 HDFS (온프레미스) AWS S3
확장성 물리적 클러스터 노드 추가 필요, 제한적 무제한 확장 가능
운영 및 유지보수 클러스터 관리 필요 (온프레미스라 학교에 있어야 함) 완전 관리형 서비스, 적은 유지보수
데이터 접근성 폐쇄망 환경에서 외부 서버의 접근이 어려움 어디에서나 접근 가능

AWS S3 데이터 적재로 코드 변경

query_hdfs = json_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs://spark-master-panda-01:9000/finVizor/upbit/ticker_data/") \
    .option("checkpointLocation", "hdfs://spark-master-panda-01:9000/checkpoints/upbit") \
    .start()

↓↓↓

query_s3 = json_df.writeStream \
    .outputMode("append") \
    .format("csv") \
    .option("path", "s3a://aws-s3-bucket/dataLake_upbit/") \
    .option("checkpointLocation", "s3a://aws-s3-bucket/checkpoint/") \
    .option("header", "true") \
    .trigger(processingTime='45 seconds') \
    .start()

b. InfluxDB DynamoDB 선택 이유

이 선택 이유 또한, 앞서 말한 온프레미스 문제와 비슷한 이유이다.

처음에 InfluxDB를 사용한 이유부터 말해보겠다.

처음에 InfluxDB를 사용한 이유

이 내용은 TMI 일 수도 있다. 나는 2년 동안 학부 연구생으로서 학과 연구실에서 활동을 했다. 나는 주로 공공 미세먼지 데이터와 연구실에서 측정한 미세먼지 데이터를 수집해서 시각화하는 역할을 맡았다.

이때 사용한 DB는 시계열 DB를 고려하고 있었고, 그 중에서 InfluxDB를 사용했는데, 학과 선배가 InfluxDB를 사용해서 데이터 시각화 한 자료를 보고 도움을 얻을 수 있을 거 같아서, 나도 InfluxDB를 배워서 데이터를 시각화 하고자 했다.

그래서, InfluxDB + Grafana를 사용해서 데이터를 시각화를 하였다.

에어코리아 미세먼지 데이터를 활용하여 InfluxDB + Grafana로 시각화 한 학교 과제
에어코리아 미세먼지 데이터를 활용하여 InfluxDB + Grafana로 시각화 한 학교 과제

 

모의투자 게임을 위해 코인 데이터를 공부해보니까, 시간에 따른 코인 변동이 있고 → 시계열 DB를 사용했을 때 적절하다 라고 생각했다.

그래서, 익숙한 InfluxDB를 사용하려고 했고, InfluxDB에 코인 데이터를 적재하고 완전 real-time 업데이트는 아니었지만, 1초 이내의 배치 업데이트를 보였다.

그렇다면, AWS DynamoDB로 변경하게 된 과정

  • 온프레미스 환경: 학교 폐쇄망에서 개발하다 보니, 장애가 있을 때 즉각적으로 대처하기 어려움
  • 성능: 대량의 데이터를 실시간으로 Write, Read 할 수 있는 지 보장하기 어려움
  • 확장성 제한: 물리적인 서버에서 성능을 의존하고 있었기 때문에, 요청이 증가했을 때 확장하기 어려움

그래서, 클라우드 DB를 알아보고 있었는데,

일단 제일 중요한 실시간 / 마이크로 배치 단위의 Read/Write 성능이 보장되는 게 주요 사항이었다.

 

이왕이면 시계열 DB면 더 좋을 거 같다는 생각으로 AWS TimeStream, AWS influxDB를 사용해봤는데, 

서울 리전 지원을 안해서, 도쿄 리전으로 왔다갔다 하는 게 너무 불편해서 다른 DB를 알아봐야 했다.

 

그래서 다음으로 시도한 DB가 읽기/쓰기 성능이 뛰어난 DynamoDB 였다.

DynamoDB 선택 이유

  • 완전 관리형 NoSQL: AWS에서 제공하는 NoSQL 데이터베이스
  • Key-Value & Document DB로 JSON 형태로 데이터 저장
  • 한국 리전에도 사용 가능
  • 사용량에 따라 자동으로 확장/ 축소 가능 필요하다면 온디멘드 설정으로 성능을 높일 수 있음
  • AWS에서 유지보수를 해주기 때문에 운영 부담 감소
  • 밀로초 단위로 읽기 / 쓰기 지연 시간을 보장하는 고성능

이러한 이유로 DynamoDB를 사용했고, 테스트 할 때 밀리초 단위로 데이터를 가져오는 것을 확인하고 그대로 적용했다.

DynamoDB에 데이터가 잘 들어오는지 모니터링으로 확인

온프레미스 DB 에서 클라우드 서비스 DB로 변경하면서 어려웠던 부분

사실, AWS를 이렇게 적극적으로 사용해본 게 이번이 처음이라 비용이나 VPC 설정, IAM 설정, Role 설정 등 해야할 게 많아서

이런 걱정을 갖고 있는 채로 마음의 문을 열고 AWS를 사용해보는 게 제일 어려웠다.

 

실제로 해보니까, 하나씩 설정하고 새로운 클라우드 서비스를 시작할 때 마다 IAM 계정에 Role을 설정해주면 되는 작업이었고,

서비스를 사용하는 방법은 GPT와 구글링을 통해 배우면 되기 때문에 큰 문제는 안되었다.

 

온프레미스 DB를 운영할 때 개발이나 유지보수가 간단했고, 모니터링도 다 지원해서 편했다.

단점은 비용인데, 비용은 배우는 입장이라서 일단 내가 감당하기로 하고 전부 시도해보기로 했다.

그렇게 마인드를 열어보니까, 정말 편했고 오픈 마인드로 기술을 최대한 받아드리고 활용하는 게 중요한 지 깨닫는 경험을 했다.

6. DynamoDB + AWS Glue + AWS Athena + QuickSight 시각화 → TradingView 사용

이제 InfluxDB를 사용했을 때 편했던 데이터 시각화를 DynamoDB로 해야하는 문제가 남아있었다.

DynamoDB는 모니터링을 지원했지 데이터 시각화를 지원하지는 않았다. 심지어 QuickSight에서 DynamoDB를 지원하지도 않았다.

QuickSight DB 지원 목록 DynamoDB는 없었다

그래서 DynamoDB에 적재된 데이터를 시각화 할 수 있는 여러 방법을 알아보니까,

AWS Glue로 DynamoDB를 크롤링해서 S3에 적재하는 ETL 작업을 하고 

AWS Athena로 S3에 쿼리를 날려서, QuickSight로 시각화 하는 방법을 알아냈다.

 

사실, 이렇게 복잡한 경로를 거쳐서 시각화를 해야하나? 싶었는데, 일단 시도해봤다.

그렇다면, AWS Glue, Athena, QuickSight는 뭘까?

 

AWS Glue: 완전 관리형 ETL 서비스로, 데이터 수집 변환 적재에 사용한다. 서버리스 기반으로 따로 클러스터를 관리할 필요가 없고, Spark 프레임워크로 실행된다

Amazon Athena: 서버리스 대화형 쿼리 서비스로, SQL을 사용해 S3에 저장된 데이터를 직접 쿼리할 수 있으며, 데이터베이스 없이 S3의 데이터를 분석할 수 있다

Amazon QuickSight: 클라우드 기반 비지니스 인텔리전스 BI 서비스로, 데이터를 시각화하고 대시보드를 만들 수 있으며, S3, Athena, RDS 등과 연동 가능하다

AWS Glue를 통해 DynamoDB에 있는 데이터를 S3로 ETL하는 과정
Glue가 실행되고 1시간 뒤에 PERMISSION_ERROR 에러가 발생함
An error occurred (AccessDenied) when calling the S3 PutObject operation: Access Denied 에러 즉 S3에 권한이 없어서 발생
S3 PutObject 권한을 줘서 3시간 21분 만에 성공

이렇게, DynamoDB에 있는 데이터를 S3에 데이터 시각화를 위해 옮기긴 했지만, 상당한 시간 요소와 비용이 들었다.

그래서 이 방법은 적합하지 않다고 판단하고, 바로 다른 시각화 방법을 알아봤을 때 발견한 게 TradingView이다.

TradingView 사용

TradingView는 내가 필요로 하는 그래프를 전부 제공했고, 심지어 Widget으로 관리도 필요 없으며 무료로 제공해줘서 바로 사용했다.

TradingView 위젯을 통한 비트코인 그래프 생성

이렇게, 초기에 생각했던 DynamoDB 데이터를 시각화 하는 방법이 적절하지 않다고 생각해서,

다른 좋은 대안을 찾은 과정을 설명했다, 하지만 TradingView도 문제가 있었는데, 바로 1분 단위 그래프가 최소 시간 단위였다.

 

나는 초 단위 그래프를 원했지만, 그건 결제를 해야했기 때문에, 1분 단위 그래프로 타협을 봤다.

7. AWS Lambda에서 Layer 문제로 AWS SAM 도입

AWS Lambda에서 추가 라이브러리를 사용하기 위해 Layer를 추가했다.

pymysql 라이브러리 사용을 위해 Layer 생성
LinuxOS ec2인스턴스로 lambda 레이어 생성
layer를 추가했지만, 에러 발생

Layer를 추가했지만, 에러가 발생해서 mac os나 ubuntu가 아닌 lambda가 사용하는 Linux을 docker와 ec2 인스턴스에서 생성해서 pymysql layer 생성을 했다. 하지만, 계속 에러가 발생해서 layer 생성 말고 다른 방법을 찾아야했다.

그때 학과 연구실 후배에게 추천 받은 게 AWS SAM이었다. AWS SAM을 사용하면 외부 라이브러리를 사용하는 게 용이하고,

Lambda 함수마다 따로 코드를 작성하는 게 아닌, 하나의 스택이라는 개념에서 작성을 하고 배포하면 각 Lambda 함수에 배포가 되었다.

 

AWS SAM (Serverless Application Model)은 서버리스 애플리케이션을 개발, 테스트, 배포 할 수 있는 오픈소스 프레임워크이다.

AWS CloudFormation을 기반으로 작동하며, Lambdam API Gateway, DynamoDB 등 서버리스 서비스를 빠르게 배포할 수 있다.

(좌) template.yml 구성 / (우) requirements.txt 라이브러리 명시

AWS SAM은 AWS CLI의 확장으로 작동하고, template.yaml 구성 파일을 사용하여 리소스를 명시해서 사용한다.

AWS SAM을 설치해서 sam init으로 SAM 초기화하고, 템플릿을 적절하게 환경에 맞게 설정하면 된다.

 

SAM은 하나의 스택에 template.yml, requirements.txt 등 애플리케이션 단위로 필요한 것들을 집합으로 배포하게 된다. template.yml, requirements.txt, Lambda 함수를 작성해서 스택을 sam deploy로 배포하면 된다.

일단, AWS Lambda를 처음 사용해보기도 했고, SAM, API Gateway, Role 권한 설정 등 해줘야할 작업들이 많았다.

lambda 스택 / Lambda 함수

하지만, 한번 작업을 해주니까 Lambda가 비용도 저렴하고, 관리해야할 서버도 없어서 너무 편했다.

이번 경험을 살려서, 다음 프로젝트에도 적합하다면 Lambda를 계속 사용할 거 같다.

API Gateway, Lambda 한달 사용 비용

실제 한달 Lambda 사용으로 청구된 비용을 보면 거의 나오지 않았다.

이 게임을 하루만 실행하는 게 아니기 때문에, 장기적으로 봤을 때도 Lambda를 사용하는 게 적합하다고 생각했다.

8. AWS Lambda 데이터 반환 및 응답 시간 테스트

error: An error occurred (AccessDeniedException) 

API 테스트 에러 발생

DynamoDB에서 적재된 코인 데이터를 잘 가져올 수 있는 지, API 테스트를 Postman으로 하는 중에 에러가 발생했다.

 

에러 내용

error: An error occurred (AccessDeniedException) when calling the Query operation: User: arn~role/ sendUpbitDataFromDynamoDBToFrontend-role~ is not quthorized to perform: dynamodb:Query on resource: arn:~ because to identity-based no identity-based policy allows the dynamodb:Query action~

 

에러 내용을 보면 DynamoDB에서 데이터를 쿼리할 권한이 없어서 발생한 에러임을 확인할 수 있다.

내가 사용한 IAM Role에 DynamoDB에 쿼리를 실핼할 수 있는 권한이 없었기 때문에, IAM Role 권한을 설정해줘야 했다.

Lambda 함수 정책 설정
IAM 역할에서 AmazonDynamoDBFullAccess 권한 추가

이렇게 DynamoDBFullAccess 권한 추가를 하고, 다시 API 테스트를 해보니까, 권한 에러는 발생하지 않았지만, 다른 에러가 발생했다.

 

Error: Object of type Decimal is not JSON serializable

Error: Object of type Decimal is not JSON serializable 발생

에러 내용

error: Object of type Decimal is not JSON serializable, trace: Traceback (most recent call last)body: ~ json.dumps(items)

 

이 에러 발생 원인은 Decimal 타입 직렬화 문제이다. json.dumps 함수는 기본적으로 int, float, str, bool, list와 같은 기본 타입만 JSON으로 직렬화 할 수 있다, Decimal 타입은 JSON 직렬화 지원 대상이 아니기 때문에, json.dumps 호출 시에 오류가 발생한다.

 

그래서, DynamoDB에서 반환된 데이터에 숫자 타입이 포함되어 있을 때, 처리하지 않고 json.dumps로 바로 직렬화하면 오류 발생한다.

def decimal_default(obj):
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError
    
def lambda_handler(event, context):
    table = dynamodb.Table(table_name)
    
    try:
        results = []
        
        for coin_code in coin_codes:
            response = table.query(
                KeyConditionExpression=Key('code').eq(coin_code),
                ScanIndexForward=False,
                Limit=1
            )
            
            items = response.get('Items', [])
            for item in items:
                item['coin_name'] = coin_dict.get(coin_code, '코인 이름 에러')
            
            results.extend(items)
        return {
            'statusCode': 200,
            'body': json.dumps(results, default=decimal_default)
        }
    
    except Exception as e:
        error_message = traceback.format_exc()
        return {
            'statusCode': 500,
            'body': json.dumps({
                "message": "Error fetching data",
                "error": str(e),
                "trace": error_message
            })
        }

 

이렇게, decimal_default 함수를 추가하여, Decimal을 float 타입으로 변환하여 해결하면 된다.

AWS API Gateway 응답 시간  Postman 테스트

코인 데이터 가져올 때 소요 시간 3.22초
1165ms 시간 소요
865ms 시간 소요 점점 데이터 가져오는 주기가 짧아짐

처음 데이터를 DynamoDB에서 가져오는 API 테스트를 했을 때, 3.22초로 예상했던 거 보다 응답 시간이 늦었다.

모의투자 게임에서 요구하는 데이터 쿼리 주기는 1초 이내 였고, 변경해야 하나 걱정이 되었지만, 다음 실행부터 1000ms 이내로 데이터를 가져올 수 있었다. 이게 Lambda의 ColdStart 문제였다.

AWS Lambda와 API Gateway를 사용해서, 코인 데이터를 DynamoDB에서 가져오는 백엔드를 서버리스로 에러 없이 구현했다.

9. 온프레미스 클러스터 문제로 DB에 데이터 적재 에러 발생

업비트 코인 데이터를 수집하기 위해 계속 kafka, spark 코드를 실행하고 테스트하고 있던 와 중에, DynamoDB에 데이터가 일정 기간 동안 수집되지 않는 것을 발견했다.

DynamoDB에 데이터 적재가 안되는 이슈 발견
클러스터의 마스터 노드에서 데이터를 적재하는 프로세스의 로그를 출력하여, 에러를 확인
du -ah --max-depth=1 ❘ sort -rh ❘ head -n 10 명령어 실행하여 Spark history에서 로그가 많은 용량을 차지하고 있음을 확인하고 제거

에러 원인은 클러스터의 Spark 로그가 많이 발생해서 네임노드에 남은 스토리지가 거의 없다고 발생한 에러였다.

클러스터의 네임 노드에서 용량을 많이 차지하고 있는 파일을 du -ah --max-depth=1 | sort -rh | head -n 10 명령어로 내림차순 10개 파일을 찾아서 spark history 로그에서 용량을 많이 차지하고 있는 것을 확인했다.

 

그래서, 일단 Spark history 로그를 삭제하고 Spark history 로그를 로컬 파일 스토리지에 저장할 게 아니라, S3에 저장하기로 변경했다.

#spark.history.fs.logDirectory file:///home/spark/spark-3.1.2-bin-hadoop3.2/history
spark.history.fs.logDirectory s3a://aws-s3-bucket-fastcampus/coin-game-spark-history/
#spark.eventLog.dir file:///home/spark/spark-3.1.2-bin-hadoop3.2/history
spark.eventLog.dir s3a://aws-s3-bucket-fastcampus/coin-game-spark-history/

Spark-default.conf 파일을 수정하고, 클러스터에서 데이터를 안정적으로 수집할 수 있게 자잘한 에러들을 계속 수정했다.

결과적으로 이 에러를 마지막으로 클러스터 에러가 없어졌고, 데이터를 안정적으로 수집할 수 있게 되었다.

 

하지만, 그래도 데이터가 또 다시 DynamoDB에 적재되지 않는 상황이 발생할까 걱정되어서, CloudWatch로 알림을 줄 수 있게 했다.

CloudWatch로 데이터가 일정 기간 동안 적재 되지 않을 때, 알림을 주도록 설정

CloudWatch에서 경보 설정
DynamoDB ConsumedWriteCapacityUnits 선택
조건을 5분 동안 데이터 count 합계가 0이면 알람을 주기로 설정
내 메일로 엔드포인트 설정
DynamoDB 데이터 적재 모니터링 중간 중간 데이터가 수집되지 않은 부분을 확인할 수 있음
데이터 일정 기간 적재가 안될 때 알림을 보냄

이렇게, CloudWatch로 경보 설정을 해서, 언제든지 클러스터에 문제가 있을 때 확인할 수 있고 대처를 바로 할 수 있게 되었다.

10. AI & 데이터 분석 결과 전송 API 개발 및 심볼 추가

Upbit 데이터를 45초 주기로 데이터를 S3에 csv 형식으로 적재하고, AI / 데이터분석 팀원이 1분 주기로 S3에 데이터를 가져와서,

데이터 분석 및 AI 결과를 만들어둔 API에 지정된 JSON 형식으로 보내게 된다.

S3에 적재된 데이터를 1분 주기로 가져와서, API를 통해 데이터를 DynamoDB에 적재하고, API를 통해 결과를 프론트엔드가 가져옴

받은 결과를 DynamoDB에 적재하고, 프론트엔드에서 API를 요청해서 AI / 데이터분석 결과를 가져올 수 있게 하였다.

Ai & 데이터 분석 결과를 가져오는 형식

이렇게, 가져온 결과를 확인할 수 있고, 프론트엔드에서 이걸 활용하여 아이콘 심볼 형태로 유저가 확인할 수 있게 하고자 했다.

팀원들과 상의한 아이콘과 색 지정
게임 플레이에서 9개 코인 그래프를 선택하는 화면과 AI & 데이터 분석 결과를 아이콘으로 나타나게 되었음

프론트엔드 팀원이 잘 해준 결과 적절한 아이콘으로 결과를 잘 표현했다고 생각한다.

11. 데이터를 1초 단위로 잘 가져오는 지 테스트 및 문제 발생

코인 데이터를 1초 단위로 가져올 수 있는 지 프론트엔드에서 테스트

프론트엔드에서 API를 통해 1초 단위로 코인 데이터를 가져올 수 있는 지 확인하였다.

1초 단위로 데이터를 가져올 수 있었지만, 여기에서 문제는 데이터를 가져올 수 있다고 해서 코인 가격이 변동하는 것은 아니었다.

코인 데이터가 1초 단위로 변경하는 것은 아니었다

우리 팀은 1초 단위로 코인 가격이 계속 변하는 것을 플레이어가 확인할 수 있으면서, 자기 잔고가 코인 가격에 따라 어떻게 변하는 지 실시간으로 확인하는 것을 기대했지만, 사실 그게 어려웠다.

 

코인 이라고 해서 1초 단위로 가격이 변동할 수 있는 것이라고 확신할 수 있는 게 아니었다.

그래서, 9개 코인 리스트를 거래대금 내림차순으로 9개로 선정할 필요가 있었다.

거래대금이 높다는 건, 그 만큼 거래가 활발하게 이루어지는 것이기 때문에 1초 단위로 코인 가격이 변하는 것을 기대할 수 있었다.

그래서 코인 리스트를 재선정했고 나름대로 1초 단위로 코인 가격이 변동하는 것을 확인하였다.

12. 1, 10, 100, 1000배 레버리지 기능 추가

레버리지 시스템이 없었을 때, 자본 변동 +- 10만단위

초기에는 레버리지 기능이 없었다. 최대 2분이라는 게임 플레이 조건에서 45초 동안 코인 가격이 변동하게 되는 데,

아무리 이익을 많이 봐도 몇십만원 단위 밖에 오르지 않았다.

레버리지 기능 추가

그래서, 레버리지 시스템을 추가하자고 건의했고, 팀원들과 회의 끝에 넣기로 했다.

게임이니까 이왕이면 파격적인 1000배까지 레버리지를 넣을 수 있게 하였다.

레버리지 100배를 적용한 자본 변동

결론적으로 1000배 레버리지를 하니까, 정말 재밌어졌고 자본의 변동이 정말 커졌다.

심지어 툭하면 청산이 되었고, 10억~100억 단위 이익을 볼 때도 있었다, 우리가 원한 결과였다.

13. 매도 기능 추가

게임 초기에는 자동 매수를 하고 45초 후에 자동으로 매도되어서 자본이 결정되는 방식이었다.

자동 매수 후 45초 뒤에 자동 매도를 하는 방식

 

플레이가 언제 어떤 가격으로 판매를 할지, 정하지 못 하고 말 그대로 진짜 운이 좋아야 이익을 볼 수 있는 구조였다.

그래서, 팀원에게 매도 기능을 넣자고 건의 했고, 받아드려져서 매도 버튼을 통해 플레이어가 원하는 타이밍에 매도를 할 수 있게 되었다.

마지막 코인이 청산되는 결과
2개 코인을 매도를 하고, 마지막까지 존버한 1개 코인은 자동으로 매도가 되었음

이렇게, 코인을 원하는 타이밍에 매도할 수 있었고, 마지막까지 매도하지 않은 코인은 자동 매도 되어서 최종 잔고가 나타났다.

이게 게임 점수가 되었고, 랭킹 대시보드에서 순위를 확인할 수 있다.

랭킹 대시보드에서 총 자본으로 순위 확인

14. Balance (잔고) 계산 로직 문제 및 수정

비트코인 가격이 떨어졌다가 다시 원 가격으로 올랐는데, 초기 자본이 그대로 돌아오지 않았다.

이것은 분명한 문제였고, 원인이 뭔지 찾아서 해결할 필요가 있었다.

코인과 자본 계산 로직 문제

비트코인 가격이 떨어졌다 다시 원 가격으로 돌아왔는데, 투자한 자본은 줄어든 상황

프론트엔드에서 자본 계산을 하고 있고, 초기 자본 계산 로직은 현재 보유 자산 / 코인 가격 으로 코인 개수를 계산하고,

코인 개수 * 코인 가격으로 자본을 계산하고 있다고 했다.

 

이 과정에서 코인 개수가 일부 소수점 밑으로 소실되었고, 그래서 다시 자본 계산할 때 소실 된 개수만큼 자본 차이가 발생했다.

그래서 여러가지 해결 방법을 제시했는 데, 그 중에 하나가 Decimal 타입을 사용해서 계산하는 거 였다.

Decimal  타입이란?

소수점 숫자를 다룰 때 정확한 계산을 보장하기 위해 사용되는 데이터 타입이며, 금융, 과학 계산 등에서 float 타입이 가지는 정밀도 손실 문제를 피하기 위해 사용된다

그래서, Decimal 타입을 사용해보기로 하고, 일단 테스트를 진행했다. 그래야 팀원들에게 제안할 수 있다.

Decimal을 사용한 자본 계산 방법

비트코인 가격으로 계속 Balance를 계산했고, 가격이 원 가격이 되면 1000000000이 다시 찍히는 것을 중간 중간 확인할 수 있었다.

 

이렇게, 해결된 줄 알았지만, 실제로 적용을 해보니 아직도 자본 손실이 있었다.

아무래도 소수점이 무한히 발생하게 된다면 ex) 0.33333333..., 손실이 있을 수 밖에 없다.

 

그래서 결국 프론트엔드에서 매수 했을 때 코인 가격을 기억해뒀다가, 처음 가격으로 다시 돌아오면 현재 자본을 유지하는 알고리즘으로 수정하여, 이 문제를 해결할 수 있었다.

15. 최종 발표 및 시연 때 에러 발생

이게 시행착오 마지막 사항이라고 할 수 있는데, 사실 엄청난 실수였다.

바로 제일 중요한 발표 시연 때 AI 기능이 제대로 작동하지 않은 문제였다.

 

전날까지 전부 잘 되다가, 좀 더 다이나믹한 코인 가격 변동을 보여주기 위해 코인 리스트를 변경했더니,

AI 기능이 동작하지 않은 문제가 발생했다.

팀원이랑 같이 발표 직전까지 에러를 해결하려고 했는데, 결국 해결하지 못 하고 발표 및 시연을 하게 되었다..

 

그래서 임기응변으로 AI 부분을 최대한 빠르게 넘어가고 아무렇지 않은 척 하며 시연을 하였다.

결국 떨리는 마음으로 발표를 했고, 결국 시연 후에 에러를 잡을 수 있었다.

 

원인은 Lambda에서 코인 리스트를 업데이트 하지 않아서, 일부 코인 랭킹이 보내지지 않았고,

누락이 되어서 AI 기능이 제대로 동작하지 않았다. coin_codes 배열에 선정한 9개 코인 리스트만 결과를 보내주게 되는데, coin_codes를 업데이트 하지 않아서 AI 기능이 동작하지 않고 있었다.

코인 데이터를 DynamoDB에서 쿼리해서 보내주는 sendUpbitDataFromDynamoDBToFrontend 함수

이 함수에서도 선정한 coin_codes 배열에 있는 코인만 DB에서 가져와서, 보내주게 된다.

딥러닝 결과를 보내주는 GetDeeplearningResult 함수

에러가 발생한 원인이 sendUpbitDataFromDynamoDBToFrontend 함수에선 coin_codes 리스트를 업데이트 했지만,

GetDeeplearningResult 이 함수에선 coin_codes 업데이트를 안해서 결과가 누락된 채로 전달해서 에러가 발생했다.

 

결국... 시연 때는 망치고 시연 전과 후에는 제대로 동작한 게 정말 아쉬웠고, 항상 시연 할 때 제대로 안되는 진리를 경험했다..

 

이 시행착오를 마지막으로 코인 모의 투자 게임 "코인예측왕" 게임의 거의 모든 시행착오를 다뤘다.

사실, 개발 할 때 전부 기록해둔 게 아니라서, 모든 시행착오와 에러를 다룰 수는 없었지만, 

 

개발 타임라인에 맞춰 최대한 시행착오를 작성해봤다.

많은 시행착오 끝에 최종 개발 결과

최종 데이터 파이프라인 아키텍처

이게 많은 시행착오를 거친 최종 데이터 파이프라인이고, 개발 할 때마다 수정해서 마지막에 급하게 만들 필요가 없었다.

개발 팀 노션

팀 프로젝트를 하면서 노션도 열심히 작성하고, 결론적으로 좋은 결과가 있었다.

게임 전시와 게임에 참여하고 있는 학생들

학교에 게임을 전시하도록 건의해서 학생들이 플레이할 수 있게 전시하고, 많은 참여가 있었다.

대시보드에서 확인할 수 있는 플레이한 유저

이렇게, 163번의 게임 플레이가 있었고, 1등한 학생은 트럼프 대선에 맞춰 게임을 플레이해서 약 625%의 수익률을 보였다...

등수로 마지막 유저는 선택한 3개의 코인이 모두 청산된 유저도 있었다 ㅎㅎㅎ

 

이렇게, 개발 초기부터, 마지막 시연 및 전시까지 전부 마쳐서 정말 뿌듯했고, 많은 시행착오를 거치면서 많은 경험을 할 수 있었다.

경험을 한 만큼 성장했다고 생각하고, 가장 값진 경험은 긍정적인 협업 경험이지 이라고 생각한다.

 

이상으로 코인예측왕 게임의 포스팅을 마치도록 하겠다.