본문 바로가기
데이터 엔지니어링

[데이터엔지니어링] 실시간 코인 데이터를 활용한 모의투자 게임 "코인예측왕" part.6 (업데이트 및 게임 런칭)

by 내가 진짜 유일한 2025. 2. 9.

이번 포스팅에선 게임 경진대회에 참가하기 위해 개발한 모의투자 게임을 업데이트 하고 웹 게임으로 런칭하게 된 이야기를 다뤄보겠다.

2024.12.04 - [데이터 엔지니어링] - [데이터엔지니어링] 실시간 코인 데이터를 활용한 모의투자 게임 "코인예측왕" part.5 (개발과정에서의 시행착오)

 

[데이터엔지니어링] 실시간 코인 데이터를 활용한 모의투자 게임 "코인예측왕" part.5 (개발과정에

이번 포스팅은 코인예측왕 게임을 개발하면서 내가 특정 기술이나 로직을 선택한 이유와 겪은 시행착오, 에러에 대해 작성하겠다.아마, 개발 결과보다 개발하면서 어떤 문제가 있었고, 어떻게

dont-make-excuses.tistory.com


게임 업데이트 사항

게임을 런칭하기 위해 개선할 사항을 팀원들과 의논해서 업데이트 하기로 했다.

 

이전에 이 게임에 대해 익스피디아 영국 지사에서 데이터 엔지니어로 근무하고 계시는 멘토님께 설명드린적 있었다.

멘토님이 개선 사항으로 클릭스트릠 데이터를 추가해보라고 해서,  팀원들과의 회의를 통해 클릭스트림 데이터를 추가하기로 했다.

 

그리고, 게임에서 실시간으로 코인의 가격이 변동되는 걸 보여주는 것이 게임의 핵심 포인트인데,

이걸 위해 코인 데이터의 실시간성을 최대한 보장해야한다.

 

따라서 내가 맡은 게임 업데이트 사항은 2가지가 있다.

1. 클릭스트림 데이터 추가 및 활용을 통해 유의미한 유저 데이터 분석/통계 도출

2. 코인 데이터의 실시간성을 높이기 위한 데이터 파이프라인 개선


1. 클릭스트림 데이터 추가 및 활용을 통해 유의미한 유저 데이터 분석/통계 도출

먼저, 클릭스트림 데이터란 뭘까?

클릭스트림 데이터의 정의

  • 사용자 행동 기록: 사용자가 웹페이지를 방문하면서 발생하는 모든 상호작용(클릭, 페이지 이동, 스크롤, 마우스 이동, 양식 제출 등)을 시간 순서대로 기록한 데이터
  • 세션 및 경로 정보: 단순한 클릭 수뿐 아니라, 사용자가 어떤 경로를 통해 사이트 내에서 이동했는지, 어느 페이지에서 머무르는지 등의 세션 데이터

페이지에서 발생하는 이벤트 로그를 프론트엔드 팀원이 클릭스트림 데이터로 API를 통해 백엔드에 보내기로 했다.

{
  "userName": "테스트",
  "userAffiliation": "테스트소속",
  "remainingTime_1": "72",
  "remainingTime_2": "75",
  "remainingTime_3": "56",
  "coins": "[\"BTC/KRW\",\"ETH/KRW\",\"XRP/KRW\"]",
  "deeplearningData": "[{\"largest_rise\":false,\"code\":\"KRW-BTC\",\"largest_spike\":false,\"fastest_growth\":false,\"percentage\":-0.030231900016843498,\"most_volatile\":false,\"sell_down\":158204055.6959311,\"largest_drop\":false,\"fastest_decline\":false,\"sell_up\":158235944.3040689,\"least_volatile\":true,\"rank\":4},{\"largest_rise\":false,\"code\":\"KRW-ETH\",\"largest_spike\":false,\"fastest_growth\":false,\"percentage\":-0.18379742303829008,\"most_volatile\":false,\"sell_down\":4844080.311059952,\"largest_drop\":false,\"fastest_decline\":false,\"sell_up\":4861919.688940048,\"least_volatile\":false,\"rank\":5},{\"largest_rise\":false,\"code\":\"KRW-XRP\",\"largest_spike\":false,\"fastest_growth\":false,\"percentage\":-3.7439054844358512,\"most_volatile\":false,\"sell_down\":4691.369949073792,\"largest_drop\":false,\"fastest_decline\":false,\"sell_up\":4726.630050926208,\"least_volatile\":false,\"rank\":7}]",
  "aiRecommend_1": false,
  "aiRecommend_2": false,
  "leverage": "1000",
  "userBuyCoinMoney_1": "{\"coin\":\"KRW-BTC\",\"money\":1000000000}",
  "userBuyCoinMoney_2": "{\"coin\":\"KRW-ETH\",\"money\":500000000}",
  "userBuyCoinMoney_3": "{\"coin\":\"KRW-XRP\",\"money\":100000000}",
  "userSellTime_1": "{\"coin\":\"KRW-BTC\",\"time\":66}",
  "userSellTime_2": "{\"coin\":\"KRW-ETH\",\"time\":56}",
  "userSellTime_3": "{\"coin\":\"KRW-XRP\",\"time\":0}",
  "balance": 1905189033.7548606
}

이렇게 클릭스트림 데이터를 json으로 정리해서 API로 보내게 된다.

 

그리고 AWS Lambda, API Gateway로 구축된 서버리스 구조를 사용하기 때문에, 이걸 활용하는 데이터 파이프라인을 설계해야했다.

데이터 파이프라인의 구축을 하기 전에, 설계를 먼저 했는데, 나 같은 경우는 ETL 파이프라인 말고, ELT 파이프라인으로 설계하고자 했다.

 

그렇다면, ELT, ETL 파이프라인이란 뭘까?

ETL(Extract, Transform, Load)

  1. Extract (추출): 소스 시스템(예: RDBMS, NoSQL, 파일, API 등)에서 원하는 데이터를 추출
  2. Transform (변환): 추출된 데이터를 대상 시스템에 적재하기 전에, 중간 서버(ETL 서버)나 전용 도구에서 변환, 정제, 필터링, 집계, 스키마 매핑 등의 작업을 수행
  3. Load (적재): 변환된 데이터를 최종 저장소(데이터 웨어하우스, 데이터 마트 등)에 적재

ELT(Extract, Load, Transform)

  1. Extract (추출): 소스 시스템에서 원본 데이터를 추출
  2. Load (적재): 추출된 원본 데이터를 변환 과정 없이 그대로 목적지(주로 데이터 레이크나 데이터 웨어하우스)에 저장
  3. Transform (변환): 데이터 레이크(또는 데이터 웨어하우스)에 적재된 후, 필요한 시점에, 필요한 형태로 변환 작업을 수행
구분 ETL ELT
변환 위치 추출 후 로드 후
로드 시점 변환을 완료한 뒤 최종 적재 원본 데이터를 먼저 적재한 후, 나중에 변환
스키마 사전에 정형 스키마가 확립된 DW 환경에 적합 원본 데이터(비정형 포함) 유연하게 수용 가능
유연성 스키마 변경 시 재설계 부담이 높음 요구에 따라 유연하게 재처리 가능
처리 엔진 ETL 서버/도구가 주로 변환 수행 DW/Spark 등의 대규모 병렬 처리
특징 전통적 BI, 정형 DW 중심 현대적 빅데이터/데이터 레이크 접근

 

여태까지 ETL 파이프라인으로 구축하기도 했고, ELT 파이프라인으로 구축해보라는 현직자 분의 조언이 있었기 때문에,

이번에는 ELT 파이프라인으로 설계 및 구축을 하기로 했다.

 

ELT 파이프라인 설계

ELT 파이프라인을 설계하기 위해 일단 구조를 생각해야했다.

내가 고려했던 몇 가지 사항은 2가지가 있다.

 

1. 유저가 게임을 완료하면 자기가 플레이한 통계를 바로 확인할 수 있는지

2. 저렴하고 간단하게 파이프라인을 구축해서, 오래 운영할 수 있는지

 

일단, 유저가 게임을 완료하고 나면 플레이한 결과가 통계에 포함되어, 바로 확인할 수 있어야 재밌다고 생각했고,

학생들이 사비로 서버 비용을 지불하고 있기 때문에, 저렴하고 간단하게 파이프라인을 구축해서,

게임을 오래 운영할 수 있도록 하는 것이 제일 중요하다고 생각했다.

 

최종 설계한 ELT 데이터 파이프라인 아키텍처

이게 최종적으로 설계하고 구축한 클릭스트림 데이터 ELT 파이프라인 아키텍처이다.

 

단계적으로 설명을 해보자면,

 

1. API Gateway와 Lamdba로 받아온 클릭스트림 데이터를 Amazon Kinesis의 Data Streams를 통해 스트림에 임시 저장을 한다

2. Kinesis에 새로운 데이터가 들어가면, Lambda가 트리거 되어 Raw parquet format Data로 DataLake인 AWS S3에 적재한다

3. 일정 주기로 AWS S3에 적재된 parquet 형태의 데이터를 Spark DataFrame으로 유저 데이터 분석/통계 도출 작업을 하고 결과를 다시 S3에 적재한다

4. 프론트엔드 서버에서 get_user_analysis 라는 요청이 오면, 정리한 통계 데이터 (json 형태)를 보낸다

 

이렇게 구성 되어 있다.

이제 가장 중요한 '왜 이렇게 설계하고 구축했는지'에 대해 설명해보겠다.

Amazon Kinesis 사용 이유와 사용 방법

Amazon Kinesis란?

완전 관리형 스트리밍 데이터 서비스로, 실시간으로 대규모 데이터 스트림을 수집/처리/분석 할 수 있고, 데이터를 안정적으로 받아서 처리파이프라인으로 전달해주는 역할을 한다.

 

Kinesis의 특징

완전 관리형 서비스(Managed Service)

  • Kafka나 기타 스트리밍 플랫폼을 온프레미스에서 운영할 경우, 브로커 설정·확장·장애 대응 등 관리 부담이 있다
  • Kinesis는 AWS에서 인프라를 자동으로 확장·관리해주므로, 개발·운영 팀은 애플리케이션 로직에 집중할 수 있다

높은 확장성

  • 프로비저닝(샤드 기반) 모드: 샤드 개수를 스스로 관리하며 쓰기·읽기 처리량을 추정하여 비용을 최적화할 수 있음.
  • 온디맨드(On-Demand) 모드: 정확한 처리량 예상이 어려운 경우 자동 확장을 통해 편리하지만, 사용량이 많아지면 비용이 증가할 수 있음.

AWS와의 통합

  • Lambda, S3, Redshift, CloudWatch 등과 즉시 연동 가능.

지금 사용하고 있는 온프레미스 Kafka 클러스터를 사용할지, Amazon Kinesis를 사용할지 고민을 했는데,

결론적으로 Kinesis를 사용하는 것이 가장 적합했다.

 

그 이유는

1. 완전 관리형 서비스이기 때문에, 빠른 개발 및 적용 가능

2. 온프레미스 Kakfa Stream과 AWS Lambda + API Gateway의 데이터 연동을 위해선, VPN 터널이나 Direct Connect를 구성해야 하는데, 이때 추가적인 비용이 발생

이러한 두 가지 이유로 Kinesis 사용하기로 했다.

 

Kinesis Data Streams, Kinesis Data Firehose, Kinesis Data Analytics 3가지 서비스를 제공하는데,

데이터를 Kinesis에서 변환하는 과정이 필요해서 KDS를 선택했다.

Kinesis 데이터 스트림 생성

Kinesis 데이터 스트림 생성을 할때, 최대한 비용을 줄이기 위해 프로비저닝에 샤드는 1개로 설정했다.

지금은 게임 이용자 수가 적기 때문에 샤드를 1개로 고정했다.

Kinesis 구성

그리고, 이제 API를 생성해서 프론트엔드 서버에서 클릭스트림 데이터를 보내면, Lambda에서 KDS로 보내게 코드를 작성했다.

API Gateway + Lambda

import json
import boto3
import os

kinesis_client =  boto3.client('kinesis', region_name = os.environ['AWS_REGION'])

KINESIS_STREAM_NAME = os.environ['KINESIS_STREAM_NAME']

def set_click_stream_handler(event, context):
    try:
        body = json.loads(event['body'])

        response = kinesis_client.put_record(
            StreamName = KINESIS_STREAM_NAME,
            Data = json.dumps(body),
            PartitionKey="single-shard"
        )
        print(f"Data send to Kinesis, Response: {response}")

        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Data sent Successfully'})
        }
    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

그리고 Kinesis Data Stream에 있는 데이터를 S3에 적재하도록 했다.


여기에서 다른 RDB나 NoSQL DB를 사용할 수도 있었지만, S3를 사용한 이유는 비용 때문이었다.

DB는 사용하지 않아도 실행하는 시간 동안 계속 비용이 발생하기 때문에, 이런 사이드 프로젝트에 사용하기에 비용적으로 부담이 된다.

 

그래서 저렴한 스토리지 서비스인 S3를 최대한 사용해서 파이프라인을 구성했다.

Kinesis Data Stream에 트리거로 Lambda 연결

def KDS_to_S3_parquet_handler(event, context):
    try:
        records = []
        for record in event['Records']:
            try:
                decoded_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
                payload = json.loads(decoded_data)
                if not isinstance(payload, dict):
                    raise ValueError("Invalid data format: Not a JSON object")
                records.append(payload)
            except (json.JSONDecodeError, ValueError) as e:
                print(f"Skipping invalid record: {e}")

        if records:
            s3_key = create_s3_key(context.aws_request_id)
            upload_to_s3(records, s3_key)

        return {"statusCode": 200, "body": "Data processed successfully"}
    
    except Exception as e:
        print(f"Error processing Kinesis records: {str(e)}")
        return {"statusCode": 500, "body": f"Error: {str(e)}"}

def create_s3_key(request_id):
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3]
    s3_key = f"{RAW_DATA_PREFIX}part-{request_id}-{timestamp}.parquet"
    return s3_key

def upload_to_s3(records, s3_key):
    df = pd.DataFrame(records)
    try:
        df['balance'] = pd.to_numeric(df['balance'], errors='raise').astype('float64')
        print(f"[INFO] Successfully converted 'balance' to numeric. Sample data: {df['balance'].head().tolist()}")
    except Exception as e:
        print(f"[ERROR] Failed to convert 'balance' to numeric: {e}")
        raise
    
    buffer = BytesIO()
    write(buffer, df, compression='SNAPPY')
    buffer.seek(0)
    s3_client.upload_fileobj(buffer, RAW_DATA_BUCKET, s3_key)
    print(f"Uploaded Parquet file to S3: s3://{RAW_DATA_BUCKET}/{s3_key}")

이렇게, Kinesis Data Stream에 데이터가 있으면, Lambda 함수가 호출되고 S3에 지정된 위치에 데이터가 적재된다.

json인 데이터를 parquet 형태로 변경해서 S3에 적재했는데, 다음과 같은 이유가 있다.

 

Parquet format의 특징과 선택 이유

  • Parquet은 열 단위로 데이터를 저장하는 컬럼 지향 포맷
  • 분석에서 필요한 열만 읽고 처리할 수 있어, 입출력(I/O)을 대폭 줄일 수 있음
  • Parquet은 동일한 열의 데이터 유형이 유사할 경우 중복 제거와 함께 압축 효율이 높음
  • Spark 클러스터에서 대규모 데이터를 다룰 때 스토리지와 네트워크 I/O 부담이 줄어들어, 전체적인 파이프라인 성능 향상
  • Spark와 같은 빅데이터 처리 엔진이 Parquet을 읽을 때 다양한 최적화를 적용할 수 있음
  • Parquet 파일에는 메타데이터에 스키마를 명시함
  • Spark DataFrame이나 Hive 테이블로 로드할 때 열 이름, 타입이 명확하여, 처리 로직이 단순해짐

S3에 적재한 parquet 데이터를 Spark DataFrame으로 분석하기 때문에 이러한 이유로 json에서 parquet으로 데이터 형태를 변환했다.

AWS S3에 parquet 형태로 데이터 적재

이렇게 S3에 저장된 parquet 데이터를 이제 Spark를 활용해서 유저 데이터 통계를 도출해야하는데, 이 과정에서 우여곡절이 좀 있었다.

 

기존 버전

만약 기존 분석/통계가 없다면 즉, /analytics_results에 도출된 분석/통계 데이터가 없다면,

unprocessed/ 에 있는 parquet 데이터를 가져와서, 새로운 DataFrame을 만들고, 통계를 도출한다.

만약 /analysitcs_results에 기존의 분석/통계 데이터가 있다면 그걸 가져와서 Spark DF로 만들고,

unprocessed/ 에 있는 데이터를 가져와서 기존의 통계랑 합산해서 다시 새로운 통계를 만들고,

/analytics_results에 저장한다. 그리고 사용한 데이터는 /processed 으로 이동시킨다.

 

하지만, 이 과정이 복잡하기도 하고 에러가 너무 많이 발생해서, 더 쉽고 간단한 방법으로 변경했다.

 

수정 버전

/unprocessed 에 있는 모든 클릭스트림 데이터를 가져와서 Spark DF로 분석/통계 작업을 수행한다.

분석/통계 작업 결과는 analytics_result/에 json format으로 저장된다.

json format으로 저장한 이유는 프론트엔드에서 통계 시각화를 하기 때문에, 프론트엔드에서 다루기 쉽게 json으로 저장한다.

이 작업을 일정 주기마다 반복한다.

 

확실히 수정된 버전이 간단하지만, 일정 주기마다 모든 유저들의 클릭스트림 데이터로 분석/통계 작업을 진행하기 때문에,

컴퓨팅 자원이 더 사용될 수 있다는 단점이 있다.

 

이 과정을 통해 도출한 분석/통계 작업은 다음과 같다.

 

유저 분석/통계 작업 리스트

  1. 코인 선택 비율 통계
  2. 페이지 별 남은 사용 시간 통계
  3. 매도 시간 별 평균 통계
  4. 코인 별 매도 시간 평균 통계
  5. 레버리지 비율 통계
  6. 레버리지 별 평균 자산
  7. ai 추천 기능 사용 비율 통계
  8. ai 추천 평균 자산
  9. 자산 평균

이렇게 통계 작업을 진행해서 json으로 데이터를 내보낸다.

계산된 통계 데이터

{"coin":"KRW-UXLINK","count":33,"ratio":6.790123456790123}
{"coin":"KRW-BIGTIME","count":8,"ratio":1.646090534979424}
{"coin":"BTC/KRW","count":44,"ratio":9.053497942386832}
{"coin":"KRW-BTC","count":50,"ratio":10.2880658436214}
{"coin":"ETH/KRW","count":37,"ratio":7.613168724279835}
{"coin":"KRW-SOL","count":33,"ratio":6.790123456790123}
{"coin":"SOL/KRW","count":15,"ratio":3.0864197530864197}
{"coin":"KRW-DOGE","count":18,"ratio":3.7037037037037033}
{"coin":"KRW-XRP","count":12,"ratio":2.4691358024691357}
{"coin":"XRP/KRW","count":32,"ratio":6.584362139917696}
{"coin":"UXLINK/KRW","count":29,"ratio":5.967078189300412}
{"coin":"DOGE/KRW","count":42,"ratio":8.641975308641975}
{"coin":"SXP/KRW","count":30,"ratio":6.172839506172839}
{"coin":"KRW-ETH","count":20,"ratio":4.11522633744856}
{"coin":"SUI/KRW","count":33,"ratio":6.790123456790123}
{"coin":"KRW-SUI","count":28,"ratio":5.761316872427984}
{"coin":"BIGTIME/KRW","count":14,"ratio":2.880658436213992}
{"coin":"KRW-SXP","count":8,"ratio":1.646090534979424}

이건 coin_ratio.json 파일 내용이다. count와 ratio를 구해서 그래프로 사용하기 편하게 json으로 내보낸다.

보면, 비트코인과 도지코인, 이더리움 코인을 많은 유저들이 선택한 것을 볼 수있다.

{"leverage":1,"avg_balance":1.5995023178670843E9}
{"leverage":10,"avg_balance":1.593938557567102E9}
{"leverage":1000,"avg_balance":1.9324511747473848E9}
{"leverage":100,"avg_balance":1.5670192736051095E9}

이건 레버리지 별 자산 평균인데, 1000배 레버리지를 선택한 평균 자산이 가장 높은 통계를 보였다.

 

이걸 프론트엔드에서 시각화를 하기 위해 API Gateway로 Get Method를 구성하였다.

이제 게임에서 통계 데이터를 볼 수 있다.

게임에서 통계 데이터를 그래프로 볼 수 있다.
게임 내서에 페이지별 평균 소요시간을 시각화함
버튼 마다 사용자가 선택한 비율을 확인할 수 있다.

이렇게 통계를 구하고 나니까, 재밌는 결과를 확인할 수 있었다.

보면, 레버리지 1000배를 선택한 유저의 평균 자산이 19억으로 제일 높았고,

AI를 사용하지 않은 유저의 평균 자산이 그렇지 않은 유저보다 더 높았다.

그리고 AI를 사용한 유저가 그렇지 않은 유저보다 조금 더 많았다.

 

왜 이런 결과가 나왔는지 가설을 세워보자면,

보통 투자에 자신있는 고수들은 AI 기능을 사용하지 않고 직접 코인과 가격을 선택할 확률이 높고,

1000배 레버리지 선택을 해서 최대의 이득을 보려고 할 것이라고 생각된다.

 

그렇기 때문에 이런 결과가 나왔다고 생각한다.

클릭스트림 데이터로 유저 통계를 구하지 않았자면, 이런 흥미로운 결과를 확인할 수 없지 않았을까 한다.


2. 코인 데이터의 실시간성을  높이기 위한 위한 데이터 파이프라인 개선

이제 다음 수정 사항인 실시간성을 위한 데이터 파이프라인 수정이다.

기존에 있던 ETL 데이터 파이프라인을 수정해서, 게임 내에서 코인 값이 실시간으로 변동 되도록 하는 것이 목적이다.

 

그렇게 하기 위해선, 기존의 파이프라인을 간소화 할 필요가 있었다.

기존의 파이프라인 간소화를 통해, ELT 파이프라인으로 수정

Spark Streaming을 활용한 ETL 파이프라인
Spark Streaming을 생략하고 Kafka Stream을 활용한 ELT 파이프라인으로 수정

기존에는 Spark Streaming을 사용해서 Kafka consumer로 받아온 메세지를 Spark DF로 변경해서 처리하고 있었는데,

이 과정에서 지연이 발생할 수 있어서, 지연을 줄이기 위해 Spark Streaming을 사용하지 않고 Kafka Streams을 사용해서 데이터를 바로 DB로 적재하는 ELT 파이프라인으로 수정했다.

Spark Streaming -> Kafka Streams으로 수정

Spark Streaming (DStream, Structured Streaming)

  • Micro-batch 기반: 짧은 시간 간격(예: 1~2초 등)으로 데이터를 모아 배치 형태로 처리
  • “실시간”이라기보다는 “Near Real-time”에 가깝지만, 배치 간격을 매우 짧게 설정하면 사실상 실시간에 준하는 속도로 동작

Kafka Streams

  • Continuous Processing(연속 처리)에 기반을 둔 라이브러리
  • Kafka 토픽으로부터 폴링(poll)을 통해 들어오는 이벤트를 바로바로 처리하여 상태 스토어(state store)에 반영하거나 결과를 다른 토픽/DB에 기록
  • 레코드 단위(혹은 폴링 한 번당 일괄 처리)로 처리되므로, 마이크로배치 개념 없이, 실시간 스트리밍에 더 가깝다고 볼 수 있음

이러한 특징으로 파이프라인을 수정했다.

Kafka Streams 파티션 수 및 스레드 수 증가

Kafka Streams로 변경한 이후에 더 지연을 줄이기 위해 파티션 수를 확인했다.

파티션 수 1개 -> 10개

스레드 수 1개 -> 15개

파티션 수 4개일 때 각 파티션에 쌓인 데이터 수

파티션 수가 처음엔 1개였고, 1개의 파티션에서 약 1억 1천만개의 메세지를 처리하고 있었다.

그렇게 4개로 늘려줬다가, 10개로 늘렸다.

파티션 수를 10개로 늘린 상태에서 오프셋 확인

파티션 수를 10개로 늘렸을 때, 메세지를 10개의 파티션에서 고루 처리하고 있고, LAG(지연) 또한, 낮은 수치를 보이고 있다.

즉, 내가 원한 지연이 적은 실시간성을 높일 수 있게 된 것이다.

 

멀티스레드 병렬 처리 코드

BATCH_SIZE = 10  # 작은 배치 크기 유지
THREADS = 15  # 병렬 처리 스레드 수
LOCKS = {}  # 파티션별 Lock 관리
MAX_RETRIES = 3  # 최대 재시도 횟수
PARTITION_LIST = list(range(10))  # 파티션 리스트 (10개)

coin_codes = ['KRW-BTC', 'KRW-ETH', 'KRW-DOGE', 'KRW-BIGTIME', 'KRW-SUI', 'KRW-UXLINK', 'KRW-SOL', 'KRW-XRP', 'KRW-SXP']

# DynamoDB 적재 함수
def write_batch_to_dynamodb(records):
    for record in records:
        if record['code'] not in coin_codes:
            continue  # 선정되지 않은 코인은 무시

        retries = MAX_RETRIES
        while retries > 0:
            try:
                # 업서트(Upsert) 처리 - 기존 데이터 업데이트 또는 삽입
                table.update_item(
                    Key={
                        'code': record['code'], 
                        'trade_timestamp': int(record['trade_timestamp']) 
                    },
                    UpdateExpression="""
                        SET #ts = :timestamp,
                            high_price = :high_price,
                            low_price = :low_price,
                            trade_price = :trade_price,
                            #ch = :change,
                            change_price = :change_price,
                            change_rate = :change_rate
                    """,
                    ExpressionAttributeNames={
                        '#ts': 'timestamp',  # 예약어 처리
                        '#ch': 'change'      # 예약어 처리
                    },
                    ExpressionAttributeValues={
                        ':timestamp': int(record['timestamp']),
                        ':high_price': decimal.Decimal(str(record['high_price'])),
                        ':low_price': decimal.Decimal(str(record['low_price'])),
                        ':trade_price': decimal.Decimal(str(record['trade_price'])),
                        ':change': record['change'],
                        ':change_price': decimal.Decimal(str(record['change_price'])),
                        ':change_rate': decimal.Decimal(str(record['change_rate']))
                    }
                )
                break  # 성공 시 종료
            except Exception as e:
                retries -= 1
                if retries == 0:  # 최대 재시도 초과
                    print(f"[ERROR] Failed to write record after retries: {e}")
                time.sleep(2 ** (MAX_RETRIES - retries))  # 지수 백오프 재시도


# 병렬 처리 함수
def process_records_parallel(records):
    with ThreadPoolExecutor(max_workers=THREADS) as executor:
        for i in range(0, len(records), BATCH_SIZE):
            batch = records[i:i + BATCH_SIZE]
            executor.submit(write_batch_to_dynamodb, batch)

# 파티션별 메시지 처리
def process_partition(partition_id):
    consumer = KafkaConsumer(
        bootstrap_servers=['spark-worker-panda-01:9092',
                           'spark-worker-panda-02:9092',
                           'spark-worker-panda-03:9092',
                           'spark-worker-panda-04:9092'],
        enable_auto_commit=False,
        group_id='upbit-consumer-group',  # 그룹 ID 추가
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    # 특정 파티션 할당
    tp = TopicPartition('upbit-ticker-data', partition_id)
    consumer.assign([tp])

    buffer = []
    total_count = 0

    try:
        for message in consumer:
            data = message.value
            if data['code'] in coin_codes:
                buffer.append(data)

            # 고정 배치 크기 처리
            if len(buffer) >= BATCH_SIZE:
                with LOCKS[partition_id]:  # 파티션별 Lock
                    process_records_parallel(buffer)
                    consumer.commit()  # 성공 시 커밋
                    total_count += len(buffer)
                    buffer.clear()

    except Exception as e:
        print(f"[ERROR] Partition {partition_id} Error: {e}")
    finally:
        # 남은 데이터 처리
        if buffer:
            process_records_parallel(buffer)
            consumer.commit()
        consumer.close()
        print(f"[INFO] Partition {partition_id} processed {total_count} records.")

# 멀티스레드로 병렬 처리 시작
def main():
    threads = []
    # 파티션별 Lock 초기화
    for p in PARTITION_LIST:
        LOCKS[p] = threading.Lock()

    # 각 파티션에 대해 스레드 생성
    for partition_id in PARTITION_LIST:
        thread = threading.Thread(target=process_partition, args=(partition_id,))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()

if __name__ == "__main__":
    print("[INFO] Starting Kafka Consumer with optimized multi-threading...")
    main()

 

그렇다면, DynamoDB에서 모니터링으로 데이터의 쓰기 수치 변화를 보자.

DynamoDB 쓰기 사용량 및 요청 변화

모니터링 결과 약 2000~3000 정도의 쓰기 요청에서 22000까지 약 700% 이상의 성능 향상을 보여줬다.


3. 업데이트 마무리

이렇게, '코인예측왕' 게임의 업데이트를 하면서 클릭스트림 데이터도 수집해서 분석하는 ELT 파이프라인도 설계와 구축을 하고,

실제로 플레이한 유저들의 흥미로운 통계도 도출하여 게임에 녹여냈다.

그리고, 코인 데이터의 실시간성을 높이기 위해 Kafka Streams를 사용하고, 파티션과 스레드 수를 증가시켜서 실제로 코인 데이터가 약 700% 더 많이 쓰여지는 걸 확인했다.

 

업데이트를 하면서 어려웠던 것은, 무엇보다 처음 설계와 구성 방법이었다.

만약 내가 자원이 많았다면, 써보고 싶은 더 편하고 좋은 기술을 많이 사용할 수 있겠지만,

한정된 자원에서 개발을 하다보니 최대한 비용효율적으로 설계하고, 그에 맞게 구축하는 게 정말 어려웠다.

 

하지만, 이렇게 비용효율적으로 구축하다 보니 유지 가능성이 더 높아졌고,

개인 자본으로 서버 비용을 부담하는 이상 한 달만 운영할 수 있는 게임이 아닌 장기간 운영할 수 있게 되었다.

업데이트를 마친 최종 데이터 파이프라인 아키텍처

 

4. 게임 런칭

게임은 업데이트를 마치고 웹 게임으로 런칭하게 되었다.

게임주소: https://www.coinking.site/signin

 

코인예측왕

 

www.coinking.site

 

호기롭게 게임 런칭을 했으나, 확실히 홍보를 안하니까 유저가 당연히 없었고, 개인 SNS에 최대한 글을 올리면서 홍보를 했다.

그리고, 상위 5명의 유저들에게 커피 상품권을 상품으로 거는 마케팅도 진행했지만, 생각보다 유저가 많이 없었다.

 

그래서, 좋은 서비스를 개발해도 마케팅을 하지 않으면 사람들은 모르기 때문에 이용하지 않는다 라는 걸 제대로 느낄 수 있었다.

게임은 무료면서, 3분 정도 밖에 걸리지 않지만 사람들은 이 역시 플레이 하는 것을 꺼려하는 것도 알았다.

 

그래서 접근성을 높이기 위해 모바일 버전으로도 개발해서 더 많은 사람들이 쉽게 플레이 할 수 있게 업데이트를 해보려고 한다.

2024 벤처스타트업아카데미 성과공유 페스티벌 참여

이와 같은 내용으로 벤처아카데미에서 진행하는 성과공유 페스티벌에 이와 같은 내용으로 참여해서 성과를 공유하고 게임 홍보도 진행했다.

 


마무리하며..

어떤 프로젝트를 개발 해본 경험은 다수 있었지만, 이렇게 게임을 기획부터 개발, 업데이트, 런칭까지 진행해본 경험은 처음이었다.

개발 하는 것 자체는 상대적으로 어렵지 않았지만, 생각보다 기획하고 설계하는 과정이 정말 어렵고 만족스럽게 개발을 마치더라도,

마케팅이 없으면 이용자가 없을 수 있다는 것을 느끼고 개발 만큼이나 마케팅의 중요성을 알게 되었다.

 

그리고, 좋은 팀플의 소중함을 느꼈고, 이 팀으로 계속 다른 프로젝트도 진행했으면 좋겠다고 생각된다.

 

다음 프로젝트로는 개인적으로 투자에 관심이 많이 생겨서, 나스닥과 코인의 동향을 실시간으로 모니터링 해주는 투자 도우미 역할을 해줄 수 있는 서비스를 개발해보고자 한다.