def __init__(self, symbol, multiplier, weight_cutoff, lookback):
    """Set up the per-symbol state of the spread engine.

    Args:
        symbol: Instrument identifier; also used in the OHLC pickle file name.
        multiplier: Multiplier for the volatility (ATR) of the spread.
        weight_cutoff: Signals are generated only when the spread is within
            this fraction of either extreme of its recent range.
        lookback: Number of most recent 5-minute closes used for that range.
    """
    print("Engine initialised for {}....".format(symbol))
    # Keep the constructor arguments that other methods read from ``self``.
    self.symbol = symbol
    self.lookback = lookback
    self.multiplier = multiplier  # multiplier for the volatility of spread
    self.weight_cutoff = weight_cutoff  # to generate signal only when spread is closer to extremes
    # Offset added to the raw spread so the series stays positive for the ATR calc.
    self.spread_factor = 50000
    # Fractional adverse move of mark vs. entry price that closes a position.
    self.stoploss_trigger = 0.02
    self.trade_df = pd.DataFrame(
        [],
        columns=['entry_time', 'dir', 'entry_price', 'spread', 'exit_time', 'exit_price'],
    )
    self.ohlc_df = pd.DataFrame([], columns=['time', 'open', 'high', 'low', 'close'])
    self.spread_df = pd.DataFrame([], columns=['time', 'spread'])
    # Create the dict on the instance before filling it: indexing a missing
    # attribute would raise AttributeError, and a class-level dict would be
    # shared between engines for different symbols.
    self.signal_dict = {'5min': {'buy_spd': None, 'sell_spd': None}}
def EMA(self, df, base, target, period, alpha=False):
    """Add an exponential moving average of ``df[base]`` as ``df[target]``.

    The first ``period`` rows are replaced by their rolling simple mean, so
    the recursion is seeded with the SMA of the first window; rows before
    the seed have no value and are written as 0.

    Args:
        df: Frame to extend; it is modified in place.
        base: Name of the source column.
        target: Name of the column to create or overwrite.
        period: Window length of the average.
        alpha: When true, smooth with ``alpha = 1 / period``; otherwise
            use ``span = period``.

    Returns:
        The same ``df`` object, now containing ``target``.
    """
    seed = df[:period][base].rolling(window=period).mean()
    con = pd.concat([seed, df[period:][base]])
    # Honour the flag: exactly one smoothing scheme is applied.
    if alpha:
        smoothed = con.ewm(alpha=1 / period, adjust=False).mean()
    else:
        smoothed = con.ewm(span=period, adjust=False).mean()
    # Assign the filled series back; a chained ``inplace`` fill on
    # ``df[target]`` is not guaranteed to reach ``df``.
    df[target] = smoothed.fillna(0)
    return df
def ATR(self, df, period=14, ohlc=['open', 'high', 'low', 'close']):
    """Add true range and its moving average to ``df`` and return ``df``.

    Args:
        df: Frame holding the price columns; it is modified in place.
        period: Averaging window; the result column is ``'ATR_<period>'``.
        ohlc: Column names in open, high, low, close order. The default
            list is only indexed, never mutated.

    Returns:
        The same ``df`` object with ``'TR'`` (if it was missing) and
        ``'ATR_<period>'`` columns added.
    """
    atr = 'ATR_' + str(period)

    # Compute true range only if it is not computed and stored earlier in the df
    if 'TR' not in df.columns:
        high = df[ohlc[1]]
        low = df[ohlc[2]]
        prev_close = df[ohlc[3]].shift()
        candidates = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        )
        # The first row has no previous close; max() skips the NaN entries.
        df['TR'] = candidates.max(axis=1)

    # Average the true range with alpha = 1 / period. EMA writes the column
    # into ``df`` itself, so its return value is not needed here.
    self.EMA(df, 'TR', atr, period, alpha=True)
    return df
def calculate_spread(self):
    """Refresh ``self.spread`` and ``self.adj_spread`` from mark and index prices."""
    raw_spread = self.mark - self.index
    self.spread = raw_spread
    # factor added to ensure positive spread for ATR calc
    self.adj_spread = raw_spread + self.spread_factor
def process_data(self, timestamp):
# Tick handler: refreshes the 5-minute buy/sell spread thresholds, then checks
# exit and entry conditions against them.
# `timestamp` is not referenced in the visible body; timing is read from self.min_ts.
# NOTE(review): every line of this copy sits at column 0, so the nesting of the
# statements below cannot be read from here — confirm block structure before editing.
# Zero the stop-loss counter whenever prev_day differs from day.
if self.prev_day != self.day:
self.porfolio_sl_count = 0
# Continue only when index and mark are both set and at least one differs from
# the value remembered at L62-63 (prev_index / prev_mark).
if self.index is not None and self.mark is not None and ((self.prev_mark != self.mark) or (self.prev_index != self.index)):
# NOTE(review): a py_KalmanFilter is constructed from the current spread right
# before update() is called — confirm whether it is meant to be created only once.
self.kf = py_KalmanFilter(self.spread)
spread_kalman = self.kf.update(self.spread)
if self.count > 15: # wait time for kalman filter to tune its hyperparameters
# On minutes divisible by 5, fold the buffered spread samples into 5-minute OHLC bars.
if self.min_ts.minute % 5 == 0:
self.spread_df.set_index('time', inplace = True)
self.sampled_df = self.spread_df.spread.resample('5 Min').ohlc()
self.sampled_df.reset_index(inplace = True)
self.ohlc_df = pd.concat([self.ohlc_df, self.sampled_df], axis = 0)
self.ohlc_df.dropna(inplace = True)
# Persist the accumulated bars to ohlc_df_<symbol>.pickle.
with open('ohlc_df_{}.pickle'.format(self.symbol), 'wb') as f:
pickle.dump(self.ohlc_df, f)
# Start a fresh sample buffer.
self.spread_df = pd.DataFrame([], columns = ['time', 'spread'])
self.ohlc_df.set_index('time', inplace = True)
# Min/max of the last `lookback` closes; the buffered values are adj_spread,
# so spread_factor is subtracted to get back to the raw spread scale.
self.minval = np.min(self.ohlc_df.close.values[-self.lookback:]) - self.spread_factor
self.maxval = np.max(self.ohlc_df.close.values[-self.lookback:]) - self.spread_factor
# Position of the filtered spread inside that range, clamped to [0, 1].
# NOTE(review): the denominator is zero when maxval equals minval.
weight = (spread_kalman - self.minval)/(self.maxval - self.minval)
weight = min(max(0, weight), 1)
self.ohlc_df.reset_index(inplace = True)
# NOTE(review): relies on ATR() returning the DataFrame it was given.
self.short_atr_df = self.ATR(self.ohlc_df.copy())
self.short_atr_df.dropna(inplace = True)
if len(self.short_atr_df) > 0:
# 'ATR_14' is the column name ATR() builds for its default period of 14.
min_atr = self.short_atr_df.iloc[-1]['ATR_14']
# Thresholds sit below/above the filtered spread; the two offsets always sum to
# 2 * min_atr * multiplier and are split between the sides by weight.
buy_spd = round(spread_kalman - min_atr * self.multiplier * weight * 2, 2)
sell_spd = round(spread_kalman + min_atr * self.multiplier * (1-weight) * 2, 2)
# Publish the thresholds only when weight is within weight_cutoff of 0 or 1.
if weight < self.weight_cutoff or weight > (1 - self.weight_cutoff):
self.signal_dict['5min']['buy_spd'] = buy_spd
self.signal_dict['5min']['sell_spd'] = sell_spd
# Buffer the offset spread sample for the next resample.
self.spread_df.loc[len(self.spread_df)] = [self.min_ts, self.adj_spread]
# Remember the prices used, for the change check at the top.
self.prev_index = self.index
self.prev_mark = self.mark
# LONG exit: relative move of mark vs. entry price above exit_trigger, or below -stoploss_trigger.
# NOTE(review): self.position is only read in this method, never assigned — presumably
# it is set elsewhere; confirm.
if self.position == "LONG" and (self.mark/self.entry_price - 1 > self.exit_trigger or self.mark/self.entry_price - 1 < -self.stoploss_trigger):
print ("LONG trade EXITED at {} with mark price = {}; index price = {}; spread = {}".format(self.min_ts, self.mark, self.index, self.spread))
print ("Spread = {}".format(self.signal_dict['5min']))
if self.mark/self.entry_price - 1 < -self.stoploss_trigger:
self.porfolio_sl_count += 1 # check to avoid heavy drawdown at portfolio level in a day
# Complete the trade record with exit time and price, then append it to trade_df.
self.pos_data.extend([self.min_ts, self.mark])
self.trade_df.loc[len(self.trade_df)] = self.pos_data
# SHORT exit: mirror image — move above stoploss_trigger, or below -exit_trigger.
if self.position == "SHORT" and (self.mark/self.entry_price - 1 > self.stoploss_trigger or self.mark/self.entry_price - 1 < -self.exit_trigger):
print ("SHORT trade EXITED at {} with mark price = {}; index price = {}; spread = {}".format(self.min_ts, self.mark, self.index, self.spread))
print ("Spread = {}".format(self.signal_dict['5min']))
if self.mark/self.entry_price - 1 > self.stoploss_trigger:
self.porfolio_sl_count += 1 # check to avoid heavy drawdown at portfolio level in a day
self.pos_data.extend([self.min_ts, self.mark])
self.trade_df.loc[len(self.trade_df)] = self.pos_data
# LONG entry: raw spread below the buy threshold, no open position, at most 3 stop-losses counted.
# NOTE(review): buy_spd / sell_spd start as None, and comparing a number with None
# raises TypeError — confirm the thresholds are always set before these checks run.
if self.spread < self.signal_dict['5min']['buy_spd'] and self.position == None and self.porfolio_sl_count <= 3:
print ("LONG trade ENTERED at {} with mark price = {}; index price = {}; spread = {}".format(self.min_ts, self.mark, self.index, self.spread))
print ("Spread = {}".format(self.signal_dict['5min']))
# Start the trade record: entry time, direction, entry price, spread.
self.pos_data = [self.min_ts, 'LONG', self.mark, self.spread]
self.entry_price = self.mark
# SHORT entry: raw spread above the sell threshold, same position and stop-loss limits.
if self.spread > self.signal_dict['5min']['sell_spd'] and self.position == None and self.porfolio_sl_count <= 3:
print ("SHORT trade ENTERED at {} with mark price = {}; index price = {}; spread = {}".format(self.min_ts, self.mark, self.index, self.spread))
print ("Spread = {}".format(self.signal_dict['5min']))
self.pos_data = [self.min_ts, 'SHORT', self.mark, self.spread]
self.entry_price = self.mark