# Import required libraries
import datetime
import os
import time
from polygon import RESTClient
from sqlalchemy import create_engine
from sqlalchemy import text
from math import sqrt
from math import isnan
import requests
[docs]class Data_Aggregate():
def __init__(self, location, table_name):
self.key = 'beBybSi8daPgsTp5yx5cHtHpYcrjp5Jq' # Prof's key
#self.key = 'YxyiNFmRMYJWORxaIM83t9ED8Jos8ZQO' # key to test
# Enter location to store the db file
self.db_location = location
# Enter name of database
self.table_name = table_name
# assignment - 2
self.EMA = 0
self.ATR = 0
self.keltner_min_val = 0
self.keltner_max_val = 0
# call the API
def call_API(self, from_, to, amount, precision):
url = f"https://api.polygon.io/v1/conversion/{from_}/{to}?amount={str(amount)}&precision={str(precision)}&apiKey={self.key}"
resp = requests.request("GET", url, headers={}, data={})
if resp.status_code == 200:
return resp.json()
else:
return None
# Function slightly modified from polygon sample code to format the date string
[docs] def ts_to_datetime(self, ts) -> str:
'''
Function slightly modified from polygon sample code to format the date string
'''
return datetime.datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
# Function which clears the raw data tables once we have aggregated the data in a 6 minute interval
[docs] def reset_raw_data_tables(self, engine, currency_pairs):
'''
Function which clears the raw data tables once we have aggregated the data in a 6 minute interval
'''
with engine.begin() as conn:
for curr in currency_pairs:
conn.execute(text("DROP TABLE " + curr[0] + curr[1] + "_raw;"))
conn.execute(
text("CREATE TABLE " + curr[0] + curr[1] + "_raw(ticktime text, fxrate numeric, inserttime text);"))
# This creates a table for storing the raw, unaggregated price data for each currency pair in the SQLite database
[docs] def initialize_raw_data_tables(self, engine, currency_pairs):
'''
This creates a table for storing the raw, unaggregated price data for each currency pair in the SQLite database
'''
with engine.begin() as conn:
for curr in currency_pairs:
conn.execute(
text("CREATE TABLE " + curr[0] + curr[1] + "_raw(ticktime text, fxrate numeric, inserttime text);"))
def initialize_raw_data_tables_HW2(self, engine, currency_pairs):
with engine.begin() as conn:
for curr in currency_pairs:
conn.execute(text("CREATE TABLE "+curr[0]+curr[1]+"_raw2(min numeric, max numeric, vol numeric, mean numeric, fd numeric);"))
# This creates a table for storing the (6 min interval) aggregated price data for each currency pair in the SQLite database
[docs] def initialize_aggregated_tables(self, engine, currency_pairs):
'''
This creates a table for storing the (6 min interval) aggregated price data for each currency pair in the SQLite database
'''
with engine.begin() as conn:
for curr in currency_pairs:
conn.execute(text(
"CREATE TABLE " + curr[0] + curr[1] + "_agg(inserttime text, avgfxrate numeric, stdfxrate numeric);"))
# This function is called every 6 minutes to aggregate the data, store it in the aggregate table,
# and then delete the raw data
[docs] def aggregate_raw_data_tables(self, engine, currency_pairs):
'''
This function is called every 6 minutes to aggregate the data, store it in the aggregate table, and then delete the raw data
'''
# Homeworkk - 2 keltner channel MAx, MIN, MAX-min and mean value calculate
with engine.begin() as conn:
for curr in currency_pairs:
# mean value
avg = conn.execute(text("SELECT AVG(fxrate) AS avg_price FROM "+curr[0]+curr[1]+"_raw;"))
for row in avg:
avg_price = row.avg_price
self.EMA = avg_price
# MIN
minimum = conn.execute(text("SELECT MIN(fxrate) AS min_rate FROM "+curr[0]+curr[1]+"_raw;"))
for row in minimum:
min_rate = row.min_rate
# MAX
maxvalue = conn.execute(text("SELECT MAX(fxrate) AS max_rate FROM "+curr[0]+curr[1]+"_raw;"))
for row in maxvalue:
max_rate = row.max_rate
# MAX-MIN as VOl
vol = conn.execute(text("SELECT MAX(fxrate)-MIN(fxrate) AS vol FROM "+curr[0]+curr[1]+"_raw;"))
for row in vol:
vol = row.vol
self.ATR = vol
# FD - fractal dimension
fd = conn.execute(text("SELECT COUNT(*) AS tot_cnt FROM "+curr[0]+curr[1]+"_raw WHERE fxrate < " + str(self.keltner_min_val) + " or fxrate > " + str(self.keltner_max_val) + ";"))
for row in fd:
fd_val = row.tot_cnt
conn.execute(text("INSERT INTO "+curr[0]+curr[1]+"_raw2 (min, max, vol, mean, fd) VALUES (:min, :max, :vol, :mean, :fd);"),[{"min": min_rate, "max": max_rate, "vol": vol, "mean": avg_price, "fd": fd_val}])
# Homework - 1
"""
with engine.begin() as conn:
for curr in currency_pairs:
result = conn.execute(
text("SELECT AVG(fxrate) as avg_price, COUNT(fxrate) as tot_count FROM " + curr[0] + curr[1] + "_raw;"))
for row in result:
avg_price = row.avg_price
tot_count = row.tot_count
std_res = conn.execute(text(
"SELECT SUM((fxrate - " + str(avg_price) + ")*(fxrate - " + str(avg_price) + "))/(" + str(
tot_count) + "-1) as std_price FROM " + curr[0] + curr[1] + "_raw;"))
for row in std_res:
std_price = sqrt(row.std_price)
date_res = conn.execute(text("SELECT MAX(ticktime) as last_date FROM " + curr[0] + curr[1] + "_raw;"))
for row in date_res:
last_date = row.last_date
conn.execute(text("INSERT INTO " + curr[0] + curr[
1] + "_agg (inserttime, avgfxrate, stdfxrate) VALUES (:inserttime, :avgfxrate, :stdfxrate);"),
[{"inserttime": last_date, "avgfxrate": avg_price, "stdfxrate": std_price}])
# This calculates and stores the return values
exec("curr[2].append(" + curr[0] + curr[1] + "_return(last_date,avg_price))")
# exec("print(\"The return for "+curr[0]+curr[1]+" is:"+str(curr[2][-1].hist_return)+" \")")
if len(curr[2]) > 5:
try:
avg_pop_value = curr[2][-6].hist_return
except:
avg_pop_value = 0
if isnan(avg_pop_value) == True:
avg_pop_value = 0
else:
avg_pop_value = 0
# Calculate the average return value and print it/store it
curr_avg = curr[2][-1].get_avg(avg_pop_value)
# exec("print(\"The average return for "+curr[0]+curr[1]+" is:"+str(curr_avg)+" \")")
# Now that we have the average return, loop through the last 5 rows in the list to start compiling the
# data needed to calculate the standard deviation
for row in curr[2][-5:]:
row.add_to_running_squared_sum(curr_avg)
# Calculate the standard dev using the avg
curr_std = curr[2][-1].get_std()
# exec("print(\"The standard deviation of the return for "+curr[0]+curr[1]+" is:"+str(curr_std)+" \")")
# Calculate the average standard dev
if len(curr[2]) > 5:
try:
pop_value = curr[2][-6].std_return
except:
pop_value = 0
else:
pop_value = 0
curr_avg_std = curr[2][-1].get_avg_std(pop_value)
# exec("print(\"The average standard deviation of the return for "+curr[0]+curr[1]+" is:"+str(curr_avg_std)+" \")")
# -------------------Investment Strategy-----------------------------------------------
try:
return_value = curr[2][-1].hist_return
except:
return_value = 0
if isnan(return_value) == True:
return_value = 0
try:
return_value_1 = curr[2][-2].hist_return
except:
return_value_1 = 0
if isnan(return_value_1) == True:
return_value_1 = 0
try:
return_value_2 = curr[2][-3].hist_return
except:
return_value_2 = 0
if isnan(return_value_2) == True:
return_value_2 = 0
try:
upp_band = curr[2][-1].avg_return + (1.5 * curr[2][-1].std_return)
if return_value >= upp_band and curr[
3].Prev_Action_was_Buy == True and return_value != 0: # (return_value > 0) and (return_value_1 > 0) and
curr[3].sell_curr(avg_price)
except:
pass
try:
loww_band = curr[2][-1].avg_return - (1.5 * curr[2][-1].std_return)
if return_value <= loww_band and curr[
3].Prev_Action_was_Buy == False and return_value != 0: # and (return_value < 0) and (return_value_1 < 0)
curr[3].buy_curr(avg_price)
except:
pass
"""
[docs] def acquire_data_and_write(self, currency_pairs):
'''
This access function repeatedly calls the polygon api every 1 seconds for 24 hours
and stores the results.
'''
# Number of list iterations - each one should last about 1 second
count = 0
agg_count = 0
# Create an engine to connect to the database; setting echo to false should stop it from logging in std.out
engine = create_engine("sqlite+pysqlite:///{}/{}.db".format(self.db_location, self.table_name), echo=False, future=True)
# Create the needed tables in the database
self.initialize_raw_data_tables(engine, currency_pairs)
# HW2 initialize data structures and DB table
self.initialize_raw_data_tables_HW2(engine, currency_pairs)
keltner_upper_band = []
keltner_lower_band = []
#self.initialize_aggregated_tables(engine, currency_pairs)
# Loop that runs until the total duration of the program hits 24 hours.
while count < 36000: # 36000 secs = 10 hours # 86400 seconds = 24 hours
# Make a check to see if 6 minutes has been reached or not
if agg_count == 360:
# max and min according to the formula
self.keltner_max_val = self.EMA + count*0.025*self.ATR
self.keltner_min_val = self.EMA - count*0.025*self.ATR
# add the min and max values to the array
keltner_upper_band.append(self.keltner_max_val)
keltner_lower_band.append(self.keltner_min_val)
# Aggregate the data and clear the raw data tables
self.aggregate_raw_data_tables(engine, currency_pairs)
self.reset_raw_data_tables(engine, currency_pairs)
agg_count = 0
# Only call the api every 1 second, so wait here for 0.75 seconds, because the
# code takes about .15 seconds to run
time.sleep(0.75)
# Increment the counters
count += 1
agg_count += 1
# Loop through each currency pair
for currency in currency_pairs:
# Set the input variables to the API
from_ = currency[0]
to = currency[1]
# Call the API with the required parameters
response = self.call_API(from_, to, amount=100, precision=2) #client.forex_currencies_real_time_currency_conversion(from_, to, amount=100, precision=2)
if response == None:
print("exception: response is NONE")
continue
# This gets the Last Trade object defined in the API Resource
last_trade = response["last"]
# Format the timestamp from the result
dt = self.ts_to_datetime(last_trade["timestamp"])
# Get the current time and format it
insert_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Calculate the price by taking the average of the bid and ask prices
avg_price = (last_trade['bid'] + last_trade['ask']) / 2
# Write the data to the SQLite database, raw data tables
with engine.begin() as conn:
conn.execute(text(
"INSERT INTO " + from_ + to + "_raw(ticktime, fxrate, inserttime) VALUES (:ticktime, :fxrate, :inserttime)"),
[{"ticktime": dt, "fxrate": avg_price, "inserttime": insert_time}])