비트코인의 24시간 최대 하락률과 최대 상승률을 분석해보았다.
그렇다.. 물렸다..
🗂 데이터
데이터는 kaggle에서 바이낸스 BTC-USD 15분봉 데이터를 활용하였다.
Open time, Open, High, Low, Close 등의 열이 있다.
5월 24일에 다운받았는데 전날인 5월 23일 00시 데이터까지 들어있다.

1일봉 데이터를 사용하면 00시 기준으로 데이터가 나뉜다.
내가 알고싶은 것은 일 단위가 아닌 24시간단위이기 때문에 15분봉 데이터를 활용하였다.
2018년 1월 1일부터 2025년 5월 23일 00시 까지 데이터가있다.
2,698일치 데이터이므로 259,008개의 데이터가 있어야하는데 (=2,698*24시*4(15분봉))
길이가 258,573 으로, 435개가 빠졌다.
🔎 분석하기

분석 순서는 아래와 같다.
- 우선 전체 데이터프레임을 상승장과 하락장으로 나눈다.
- 슬라이딩 윈도우 방식으로 24시간치 데이터를 묶어 한 Task로 만든다.
- 해당 task에서 최대 상승률과 하락률을 계산한다.
- 다음으로는 이전 최대치와 비교한다.
이렇게 상승장과 하락장에서 각각 최대 상승률과 하락률 구간이 언제인지 결과가 나오게 된다.
96개씩 1칸씩 밀어가며 모든 task를 계산하므로 너무 많아서 멀티프로세싱방식으로 구현하였다.
import pandas as pd
import numpy as np
from datetime import datetime
from multiprocessing import Pool, cpu_count
from tqdm import tqdm
import os
# 상승장 / 하락장 정의
bull_periods = [
('2012-11-28', '2013-11-30'),
('2015-01-14', '2017-12-17'),
('2018-12-15', '2021-11-10'),
('2022-11-21', '2023-01-09'),
('2023-01-09', '2100-01-01'),
]
bear_periods = [
('2013-11-30', '2015-01-14'),
('2017-12-17', '2018-12-15'),
('2021-11-10', '2022-11-21'),
]
def label_market(date):
for start, end in bull_periods:
if datetime.strptime(start, "%Y-%m-%d") <= date <= datetime.strptime(end, "%Y-%m-%d"):
return 'Bull'
for start, end in bear_periods:
if datetime.strptime(start, "%Y-%m-%d") <= date <= datetime.strptime(end, "%Y-%m-%d"):
return 'Bear'
return 'Unknown'
# 슬라이딩 윈도우 작업을 분리한 wrapper 함수
def analyze_window_wrapper(args):
return analyze_single_window(*args)
# 윈도우 하나에 대해 최대 상승/하락 분석
def analyze_single_window(window_high, window_low, window_time):
window_size = len(window_high)
# 최대 하락률 계산
# 각 시점(i)의 고가를 행으로 복제한 2차원 행렬 생성 (i, j)
# 예: 1행은 high[0]이 96번, 2행은 high[1]이 96번 등
peak_matrix = np.repeat(window_high.reshape(-1, 1), window_size, axis=1)
# 각 시점(j)의 저가를 열로 복제한 2차원 행렬 생성
# 예: 첫 열은 low[0]이 96번, 두 번째 열은 low[1]이 96번 등
trough_matrix = np.repeat(window_low.reshape(1, -1), window_size, axis=0)
# (고점 - 저점) / 고점 * 100 → 하락률(%) 계산 행렬
# 이 행렬의 (i, j)는 i 시점에 사고 j 시점에 팔았을 때 하락률
drawdown_matrix = (peak_matrix - trough_matrix) / peak_matrix * 100
# 무의미한 값(현재 또는 과거 시점 이후로 파는 경우)을 -inf 처리
# 즉, i >= j 인 경우는 제외 (하락은 미래 방향만 의미 있음)
drawdown_matrix[np.tril_indices(window_size)] = -np.inf
# 최대 하락률이 발생한 (i, j) 인덱스 추출
dd_k, dd_j = np.unravel_index(np.argmax(drawdown_matrix), drawdown_matrix.shape)
## 최대 상승률 계산
# 시점(i)의 저가를 행으로 복제한 행렬 생성
trough_matrix = np.repeat(window_low.reshape(-1, 1), window_size, axis=1)
# 각 시점(j)의 고가를 열로 복제한 행렬 생성
peak_matrix = np.repeat(window_high.reshape(1, -1), window_size, axis=0)
# (고점 - 저점) / 저점 * 100 → 상승률(%) 계산 행렬
# 이 행렬의 (i, j)는 i 시점에 사고 j 시점에 팔았을 때 상승률
rise_matrix = (peak_matrix - trough_matrix) / trough_matrix * 100
# 마찬가지로 i >= j 인 경우는 의미 없으므로 -inf 처리
rise_matrix[np.tril_indices(window_size)] = -np.inf
# 최대 상승률이 발생한 (i, j) 인덱스 추출
rise_k, rise_j = np.unravel_index(np.argmax(rise_matrix), rise_matrix.shape)
return {
'max_drawdown_percent': {
'start_time': window_time[dd_k],
'end_time': window_time[dd_j],
'peak_price': window_high[dd_k],
'trough_price': window_low[dd_j],
'drawdown_percent': drawdown_matrix[dd_k, dd_j]
},
'max_rise_percent': {
'start_time': window_time[rise_k],
'end_time': window_time[rise_j],
'trough_price': window_low[rise_k],
'peak_price': window_high[rise_j],
'rise_percent': rise_matrix[rise_k, rise_j]
}
}
# 전체 기간에서 슬라이딩 윈도우 분석 (멀티프로세싱)
def analyze_market_windows_parallel(df: pd.DataFrame, window_size: int = 96):
high = df['High'].values
low = df['Low'].values
open_time = df['Open time'].values
# sliding window. 데이터 누락때문에 사용 불가
# tasks = [(high[i:i+window_size], low[i:i+window_size], open_time[i:i+window_size])
# for i in range(len(df) - window_size + 1)]
tasks = []
for i in range(len(df) - window_size + 1):
h = high[i:i+window_size]
l = low[i:i+window_size]
t = open_time[i:i+window_size]
# 시간차 계산
time_diff = t[-1] - t[0]
if time_diff <= 85500000000000:
tasks.append((h, l, t))
else:
pass
if not tasks:
print("⛔ 조건에 맞는 유효한 윈도우가 없습니다.")
return None
with Pool(cpu_count()) as pool:
results = list(tqdm(
pool.imap_unordered(analyze_window_wrapper, tasks),
total=len(tasks), desc="슬라이딩 분석 진행 중"
))
max_dd_result = max(results, key=lambda x: x['max_drawdown_percent']['drawdown_percent'])
max_rise_result = max(results, key=lambda x: x['max_rise_percent']['rise_percent'])
if not results:
return {
'📉 최대 하락률 구간': '❌ 충분한 데이터 없음',
'📈 최대 상승률 구간': '❌ 충분한 데이터 없음'
}
else:
return {
'📉 최대 하락률 구간': max_dd_result['max_drawdown_percent'],
'📈 최대 상승률 구간': max_rise_result['max_rise_percent']
}
# Bull / Bear 분리 후 각각 분석 실행
def analyze_by_market(df: pd.DataFrame):
df['Open time'] = pd.to_datetime(df['Open time'])
df = df.sort_values(by=['Open time'], axis=0)
df['Market'] = df['Open time'].apply(label_market)
bull_df = df[df['Market'] == 'Bull'].reset_index(drop=True)
bear_df = df[df['Market'] == 'Bear'].reset_index(drop=True)
print(f"📊 Bull 데이터 개수: {len(bull_df)}")
print(f"📊 Bear 데이터 개수: {len(bear_df)}")
if len(bull_df) >= 96:
print("\n✅ [상승장] 분석 시작...")
bull_result = analyze_market_windows_parallel(bull_df)
format_result(bull_result, "상승장")
else:
print("⛔ 상승장 데이터가 부족합니다 (96개 미만).")
if len(bear_df) >= 96:
print("\n✅ [하락장] 분석 시작...")
bear_result = analyze_market_windows_parallel(bear_df)
format_result(bear_result, "하락장")
else:
print("⛔ 하락장 데이터가 부족합니다 (96개 미만).")
def format_result(result_dict, market_label):
print(f"\n✅ {market_label} 결과 요약")
rise = result_dict['📈 최대 상승률 구간']
fall = result_dict['📉 최대 하락률 구간']
print(f"\n📈 최대 상승률 구간:")
print(f" 시작일 : {rise['start_time']}")
print(f" 종료일 : {rise['end_time']}")
print(f" 시작 가격 : {rise['trough_price']:.2f}")
print(f" 종료 가격 : {rise['peak_price']:.2f}")
print(f" 상승률(%) : {rise['rise_percent']:.2f}%")
print(f"\n📉 최대 하락률 구간:")
print(f" 시작일 : {fall['start_time']}")
print(f" 종료일 : {fall['end_time']}")
print(f" 시작 가격 : {fall['peak_price']:.2f}")
print(f" 종료 가격 : {fall['trough_price']:.2f}")
print(f" 하락률(%) : {fall['drawdown_percent']:.2f}%")
if __name__ == "__main__":
import pandas as pd
##==== 수정 필요=====
base_dir = "/my/file/path"
filename = "btc_15m_data_2018_to_2025.csv"
file_path = os.path.join(base_dir, filename)
df = pd.read_csv(file_path)
analyze_by_market(df)
처음에 데이터 누락이 있을거를 생각 못하고 단순하게 슬라이딩 윈도우로 96개씩 한 task를 만들었다가 결과가 이상하게 나와버렸었다...
그리고 슬라이딩 윈도우로 했을 떄 또 다른 문제가 있었다.
상승장, 하락장을 구분하여 데이터프레임을 만들고 task를 만드는데,
단순하게 96개씩 묶어버리니 시간 간격이 24시간을 초과하여도 한 task로 묶이는 문제가 발생했다.
결과가 21년 부터 22년까지 77%가 상승한게 최대 상승률 구간이라고 나오는 등 잘못된 결과가 나왔다.
📔 실행 결과
✅ 상승장 결과 요약
📈 최대 상승률 구간:
시작일 : 2020-03-13T02:15:00.000000000
종료일 : 2020-03-13T13:30:00.000000000
시작 가격 : 3782.13
종료 가격 : 5955.00
상승률(%) : 57.45%
📉 최대 하락률 구간:
시작일 : 2020-03-12T02:30:00.000000000
종료일 : 2020-03-13T02:15:00.000000000
시작 가격 : 7768.75
종료 가격 : 3782.13
하락률(%) : 51.32%
✅ 하락장 결과 요약
📈 최대 상승률 구간:
시작일 : 2018-02-06T11:15:00.000000000
종료일 : 2018-02-07T11:00:00.000000000
시작 가격 : 6268.75
종료 가격 : 8274.00
상승률(%) : 31.99%
📉 최대 하락률 구간:
시작일 : 2018-01-15T22:45:00.000000000
종료일 : 2018-01-16T22:30:00.000000000
시작 가격 : 13790.00
종료 가격 : 9035.00
하락률(%) : 34.48%

📚 추가 분석
24시간 아니어도 최대 하락, 상승률을 구하고싶은데 추세 전환을 어떤 기준으로 잡아야할지 모르곘다.
n% 이상 변동 시
n개 이상 연달아 같은 캔들 발생 시
이평선 위/아래 있는 경우,
ADX를 이용한 경우 등등
여러가지가 있지만 차트가 단순하게 위아래로만 움직이는게 아니라 횡보하는 경우도 많아서 더 고민해봐야한다.
'Data > Data Analystics' 카테고리의 다른 글
| [데이터 분석] 정말 비행기가 가장 안전한 교통 수단일까? 2. 데이터 전처리 (0) | 2024.12.30 |
|---|---|
| [데이터 분석] 정말 비행기가 가장 안전한 교통 수단일까? 1. 데이터 수집 (0) | 2024.12.30 |
| [데이터 분석] 정말 비행기가 가장 안전한 교통 수단일까? 0. 분석 계기 (1) | 2024.12.30 |
| [2023 빅콘테스트] 클래식 공연 활성화를 위한 효과적 가격 모델 수립 (0) | 2024.05.01 |
| [ADP 실기 준비] 코로나19 - 인구대비 상위 5개국 구하기 + 시각화 (1) | 2020.08.24 |