spread_backtest/Specified_Price.py

33 lines
1.4 KiB
Python

import pandas as pd
import numpy as np
def amount_specified(
min_data: pd.DataFrame,
post_adj_factor: pd.DataFrame,
min_amount: float=0.
):
"""
计算指定金额下的平均后复权价格
Args:
min_data (pd.DataFrame): 指定日期分钟数据,需至少包含代码(stock_code)、分钟(Time)、价格(price)、成交量(vol)、成交金额(amount)列
post_adj_factor (pd.DataFrame): 后复权因子数据,需至少包含股票、后复权因子
min_amount (float): 指定最小金额下的平均价
"""
# 按照指定最小量获取平均价格
stock_amt = min_data.pivot_table(index='Time', columns='stock_code', values='amount').cumsum()
stock_vol = min_data.pivot_table(index='Time', columns='stock_code', values='vol').cumsum()
amount_price = stock_amt / stock_vol
amount_price.iloc[1:] = amount_price.iloc[1:].where(stock_amt <= min_amount, np.nan)
amount_price = amount_price.unstack().reset_index()
amount_price.columns = ['stock_code', 'time', 'price']
amount_price = amount_price.dropna(subset=['price']).drop_duplicates(subset='stock_code', keep='last')
# 计算后复权价格
amount_price = amount_price.merge(post_adj_factor, on=['stock_code'], how='left').dropna(subset=['factor'])
amount_price['open_post'] = amount_price['price'] * amount_price['factor']
return amount_price[['stock_code','open_post']]