Real-time crypto price predictions with Deephaven and AI | Deephaven


Using AI to predict prices and manage investments is the key to gaining a competitive edge in the crypto space. Doing so isn’t as difficult as you might think.

The Coinbase API makes real-time crypto price acquisition easy. That, in turn, makes model deployment a breeze.

This is the fifth of a six-part blog series on real-time crypto price predictions with AI. In this blog, I’ll deploy the AI models I’ve built and tested previously on a real-time feed of crypto prices. Keep up with the rest of the blog series:

  1. Acquire up-to-date crypto data with Apache Airflow
  2. Implement an LSTM model with TensorFlow
  3. Implement a linear regression model with Nvidia RAPIDS
  4. Test the models on simulated real-time data
  5. Implement the models on real-time crypto data from Coinbase
  6. Share AI predictions with URIs

Now that the model building, training, and testing are all complete, it’s time for the last step in the machine learning workflow: deployment. For this application, deployment means applying the models to real-time crypto prices obtained from the Coinbase WebSocket API. Jake Mulford previously wrote a blog that details how to use this API, and I’ll be building on his work here.

I’ve made small changes to Jake’s code so that it listens only to the data my AI models need: Bitcoin trades from the matches channel. To match the time steps used to train the models, the price is updated once per minute, as the average of that minute’s trade prices. I will use this workflow to predict Bitcoin prices with both models.
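
For orientation, the sketch below parses one message of the kind the matches channel sends and pulls out the two fields this workflow keeps. The sample payload is illustrative: its values are invented, and real messages carry more fields. `parse_match` is a hypothetical helper for this example, not part of Jake’s code, and it assumes timestamps that include fractional seconds.

```python
import json
from datetime import datetime, timezone

# Illustrative payload: values are invented, and real messages carry more fields.
SAMPLE_MESSAGE = json.dumps({
    "type": "match",
    "product_id": "BTC-USD",
    "time": "2022-06-01T12:34:56.789012Z",
    "price": "30123.45",
    "size": "0.01",
})


def parse_match(raw):
    """Return (timestamp, price) for a trade message, or None for anything else."""
    data = json.loads(raw)
    if "time" not in data or "price" not in data:
        # For example, the subscription confirmation has no trade fields.
        return None
    # Timestamps are ISO 8601 in UTC with a trailing "Z".
    timestamp = datetime.strptime(data["time"], "%Y-%m-%dT%H:%M:%S.%fZ")
    timestamp = timestamp.replace(tzinfo=timezone.utc)
    # Prices arrive as strings, so convert before doing arithmetic.
    return timestamp, float(data["price"])


print(parse_match(SAMPLE_MESSAGE))
print(parse_match(json.dumps({"type": "subscriptions", "channels": []})))
```

Returning None for non-trade messages is one way to skip them; the code below handles the same case by catching the KeyError instead.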

from websocket import create_connection, WebSocketConnectionClosedException
from deephaven.time import to_datetime, lower_bin
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
from threading import Thread
import json

# Subscribe to BTC-USD trades only
ws = create_connection("wss://ws-feed.exchange.coinbase.com")
ws.send(
    json.dumps(
        {
            "type": "subscribe",
            "product_ids": ["BTC-USD"],
            "channels": ["matches"],
        }
    )
)


def coinbase_time_to_datetime(strn):
    # Swap the trailing "Z" for an explicit UTC suffix before parsing
    return to_datetime(strn[0:-1] + " UTC")


# Create the table only once, even if this script is run again
if "coinbase_websocket_table" not in globals():
    dtw_columns = {
        'time': dht.DateTime,
        'price': dht.float_
    }

    dtw = DynamicTableWriter(dtw_columns)
    coinbase_websocket_table = dtw.table

# Maps each minute to [number of trades, sum of trade prices]
time_dict = {}
connection_open = True


def pull_from_coinbase():
    global connection_open
    old_time = None  # minute of the most recent trade

    while connection_open:
        try:
            data = json.loads(ws.recv())
            time = coinbase_time_to_datetime(data["time"])
            price = float(data["price"])
            # Round down to the start of the minute (bin width in nanoseconds)
            time_mins = lower_bin(time, 60_000_000_000)
            if time_mins in time_dict:
                time_dict[time_mins][0] += 1
                time_dict[time_mins][1] += price
            else:
                time_dict[time_mins] = [1, price]
                if old_time is not None:
                    # A new minute has started: write the previous minute's average price
                    average_price = time_dict[old_time][1] / time_dict[old_time][0]
                    dtw.write_row(old_time, average_price)
            # Track the minute on every trade, so a minute with a single trade is still written
            old_time = time_mins

        except KeyError as key_error:
            # Raised for messages without trade fields, such as the subscription confirmation
            print(f"Warning: The key {key_error.args} has yet to be set.")

        except WebSocketConnectionClosedException as connection_error:
            print("The connection to the host has been closed.")
            ws.close()
            connection_open = False


thread = Thread(target=pull_from_coinbase)
thread.start()


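
The per-minute averaging can be tried without a Deephaven server or a live connection. The following is a minimal sketch of that idea in plain Python, assuming trades arrive in time order and timestamps are integer nanoseconds; `lower_bin_ns` and `MinuteAverager` are hypothetical names for this example, not Deephaven functions.

```python
NANOS_PER_SECOND = 1_000_000_000
NANOS_PER_MINUTE = 60 * NANOS_PER_SECOND


def lower_bin_ns(timestamp_ns, width_ns=NANOS_PER_MINUTE):
    """Round a nanosecond timestamp down to the start of its bin."""
    return timestamp_ns - (timestamp_ns % width_ns)


class MinuteAverager:
    """Accumulate trades and emit one average price per completed minute."""

    def __init__(self):
        self.current_bin = None
        self.count = 0
        self.total = 0.0

    def add(self, timestamp_ns, price):
        """Add a trade; return (bin_start, average) when a minute closes, else None."""
        bin_start = lower_bin_ns(timestamp_ns)
        finished = None
        if self.current_bin is not None and bin_start != self.current_bin:
            # The trade belongs to a new minute, so the previous one is complete.
            finished = (self.current_bin, self.total / self.count)
            self.count, self.total = 0, 0.0
        self.current_bin = bin_start
        self.count += 1
        self.total += price
        return finished


averager = MinuteAverager()
trades = [
    (0, 100.0),
    (30 * NANOS_PER_SECOND, 102.0),
    (61 * NANOS_PER_SECOND, 110.0),   # first trade of the next minute
    (125 * NANOS_PER_SECOND, 90.0),   # closes a minute that had a single trade
]
rows = [row for row in (averager.add(t, p) for t, p in trades) if row is not None]
print(rows)
```

Unlike the dictionary used above, this version keeps only the minute in progress, so its memory use stays constant however long the feed runs. The trade-off is that a late trade for an already closed minute is not folded back into that minute's average.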

In the previous blog of the series, we tested both the TensorFlow and Nvidia RAPIDS models on simulated real-time feeds. If you’re keeping up with the series, you’ll see that the code to implement them here is remarkably similar.

The only notable difference in the code is the use of a flag that indicates if the first price has been received from Coinbase.

TensorFlow LSTM model

The model in the code below is the TensorFlow LSTM model we previously implemented. To run it, you’ll first need to run the code from that blog.

When the first price arrives, it fills every slot of the model input and the first_time flag is set to False. Each time a new price comes in after that, the prices shift and the newest price is rolled to the end of the model input. Each update also yields a predicted price.

# Imports used below (harmless to repeat if they are already in scope)
import numpy as np
from deephaven import learn
from deephaven.learn import gather
from deephaven import numpy as dhnp

# `model` and `scaler` come from the earlier LSTM blog


def table_to_numpy(rows, cols):
    # Gather the selected table rows and columns into a 2D NumPy array
    return gather.table_to_numpy_2d(rows, cols, dtype = np.double)


def get_predicted_price(data, idx):
    return data


n_input = 4
n_features = 1

first_time = True
last_four = np.array([0, 0, 0, 0], dtype = np.double).reshape((1, n_input, n_features))


def predict_with_model(data):
    global last_four, first_time

    # Predict from the current window, then convert the result back to a price
    current_pred = model.predict(last_four)
    current_pred = scaler.inverse_transform(current_pred)
    current_pred = current_pred.reshape(1, -1)[0]

    # Rescale all live prices received so far and take the newest one
    scaled_live_prices = scaler.fit_transform(dhnp.to_numpy(coinbase_websocket_table.view(["price"])).reshape(-1, 1))
    value = scaled_live_prices[-1].item()

    if first_time:
        # First price: fill the whole window with it
        last_four = np.array([value, value, value, value]).reshape((1, n_input, n_features))
        first_time = False

    # Shift the window and put the newest price at the end
    last_four = np.roll(last_four, -1, axis=1)
    last_four[0][-1][0] = value

    return current_pred


real_time_prediction = learn.learn(
    table = coinbase_websocket_table,
    model_func = predict_with_model,
    # Use the gather function defined above
    inputs = [learn.Input("price", table_to_numpy)],
    outputs = [learn.Output("Predicted_price", get_predicted_price, "double")],
    batch_size = 1
)
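
The window handling is easier to follow in isolation. Here is a small sketch of the same shift-and-append pattern using only NumPy, with invented scaled prices standing in for the live feed; `update_window` is a hypothetical helper, and no model or scaler is involved.

```python
import numpy as np

n_input = 4      # number of past prices the model sees
n_features = 1   # one feature per time step: the scaled price

window = np.zeros((1, n_input, n_features))
first_time = True


def update_window(value):
    """Seed the window on the first value, then shift left and append."""
    global window, first_time
    if first_time:
        # Fill every slot so the model has a full input from the start.
        window = np.full((1, n_input, n_features), value, dtype=np.double)
        first_time = False
    window = np.roll(window, -1, axis=1)   # the oldest value wraps to the end...
    window[0, -1, 0] = value               # ...and is overwritten by the newest
    return window


for scaled_price in [0.50, 0.52, 0.55]:
    update_window(scaled_price)

print(window.reshape(-1))
```

After the three updates the window holds 0.50, 0.50, 0.52, 0.55 in that order: two copies left over from seeding, followed by the two newer values, with the shape (1, 4, 1) unchanged.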

Nvidia RAPIDS linear regression model

The code below uses the fitted Nvidia RAPIDS linear regression model previously implemented. To run the code below, first run the code from that blog.

This code is also remarkably similar to that of the previous blog. Just like with the LSTM above, we roll the newest value into the input for our model each time a new, current price is obtained from Coinbase.

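# Imports used below (harmless to repeat if they are already in scope)
import numpy as np
from deephaven import learn
from deephaven.learn import gather

# `linear_regression_gpu` is the fitted model from the earlier RAPIDS blog


def table_to_numpy(rows, cols):
    # Gather the selected table rows and columns into a 2D NumPy array
    return gather.table_to_numpy_2d(rows, cols, dtype = np.double)


def get_predicted_price(data, idx):
    return data


last_three = np.array([[0, 0, 0]], dtype = np.double)
first_time = True


def use_fitted_model(data):
    global last_three, first_time
    value = data[0][0]
    if first_time:
        # First price: fill the whole window with it
        first_time = False
        last_three = np.array([[value, value, value]], dtype = np.double)

    # Shift the window and put the newest price at the front
    last_three = np.roll(last_three, 1, axis=1)
    last_three[0, 0] = value

    predictions = linear_regression_gpu.predict(last_three)

    return predictions


Predict_table = learn.learn(
    table = coinbase_websocket_table,
    model_func = use_fitted_model,
    inputs = [learn.Input(["price"], table_to_numpy)],
    outputs = [learn.Output("Predicted_Price", get_predicted_price, "double")],
    batch_size = 1
)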
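
The flag and the window are module-level globals above. As a sketch of an alternative layout, the same state can live inside a small class. The example below is plain Python with a stand-in linear model: `RollingLinearPredictor` and its coefficients are hypothetical and are not the fitted RAPIDS model.

```python
class RollingLinearPredictor:
    """Keep the last few prices, newest first, and apply a linear model to them."""

    def __init__(self, coefficients, intercept):
        # Hypothetical parameters; a fitted model would supply its own.
        self.coefficients = list(coefficients)
        self.intercept = intercept
        self.window = None  # filled in when the first price arrives

    def update(self, price):
        if self.window is None:
            # First price: seed every slot so the model has a full input.
            self.window = [price] * len(self.coefficients)
        else:
            # Newest price goes to the front; the oldest falls off the end.
            self.window = [price] + self.window[:-1]
        weighted = sum(c * p for c, p in zip(self.coefficients, self.window))
        return self.intercept + weighted


predictor = RollingLinearPredictor(coefficients=[0.5, 0.25, 0.25], intercept=0.0)
for price in [100.0, 104.0, 108.0]:
    prediction = predictor.update(price)

print(predictor.window, prediction)
```

Using `None` for the unseeded window replaces the separate first_time flag, and keeping the state on an object makes it possible to run several predictors side by side without their windows colliding.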

I like to watch my model work in real time. I can see how it performs and make informed decisions based on its behavior.

from deephaven.plot.figure import Figure

# The parentheses let the chained calls span several lines
rt_plot = (
    Figure()
    .plot_xy(series_name="price", t=real_time_prediction, x="time", y="price")
    .plot_xy(series_name="Predicted_price", t=real_time_prediction, x="time", y="Predicted_price")
    .show()
)

Here’s a screenshot of what the plot looks like after the first seven predictions.

[Image: plot of the actual and predicted prices after the first seven predictions]
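
A plot shows the trend, and a single number can complement it. As one possible sketch, the mean absolute error between paired actual and predicted prices can be computed as below. The numbers are invented and stand in for the price and Predicted_price columns; how those columns should be aligned in time depends on how the model was trained.

```python
def mean_absolute_error(actual, predicted):
    """Average absolute gap between paired actual and predicted prices."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


# Invented numbers, standing in for the "price" and "Predicted_price" columns.
actual_prices = [30100.0, 30120.0, 30090.0, 30110.0]
predicted_prices = [30095.0, 30130.0, 30100.0, 30105.0]

print(mean_absolute_error(actual_prices, predicted_prices))
```

The result is in the same units as the prices, which makes it easy to compare against the typical minute-to-minute price move.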

Try this out for yourself! Both of the models presented in this series can easily be modified to suit your needs, and are flexible in their applications. In the next and final blog of the series, we’ll share the results of our efforts in a new and exciting way.

Reach out on Slack if you have any questions or feedback for us.



By stp2y
