Many of the biggest players in the crypto space use AI to predict prices and manage investments. You can too, and it isn't as difficult as you might think.
This is the first in a six-part blog series on real-time crypto price predictions with AI. In this blog, I'll cover how to use Apache Airflow to acquire up-to-date crypto data.
Throughout this series, you’ll learn how to:
- Acquire up-to-date crypto data with Apache Airflow
- Implement real-time AI with TensorFlow
- Implement real-time AI with Nvidia RAPIDS
- Test the models on simulated real-time data
- Implement the models on real-time crypto data from Coinbase
- Share AI predictions with URIs
Let's walk through that first stage: obtaining the data. Apache Airflow is a powerful tool for automating this kind of recurring data acquisition.
To get started with Apache Airflow, follow the official installation instructions. Once setup is complete, run:

```bash
airflow webserver --port 8080
airflow scheduler
```

These commands start the Airflow web UI on port 8080 and the scheduler that triggers your workflows.
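If you don't yet have Airflow installed, a typical pip-based setup (done before running the commands above) looks roughly like the following. Treat it as a sketch: the pinned versions and the admin user details are assumptions you should adapt to your environment.

```bash
# Install Airflow against its version-pinned constraints file (versions are assumptions)
pip install "apache-airflow==2.7.3" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"

# Initialize the metadata database and create a login for the web UI
airflow db init
airflow users create --username admin --password admin \
  --firstname Ada --lastname Lovelace --role Admin --email admin@example.com
```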
A directed acyclic graph (DAG) is a concept in computing that is used to define the order of calculations. Apache Airflow uses DAGs to define the relationships, order, and scheduling of tasks in a workflow. Deephaven uses DAGs to organize the flow of data through queries.
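As a minimal, standalone sketch of the idea (the task names and commands here are placeholders, not part of the pipeline below), two Airflow tasks can be chained with the `>>` operator so that one always runs before the other:

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# A two-node DAG: 'extract' must finish before 'transform' starts
with DAG("toy_dag",
         start_date=datetime(2022, 1, 1),
         schedule_interval="@daily",
         catchup=False) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extracting")
    transform = BashOperator(task_id="transform", bash_command="echo transforming")

    extract >> transform  # the edge extract -> transform defines the execution order
```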
The code below uses a simple Airflow DAG to fetch and transform BTC data on the Gemini exchange from cryptodatadownload. The DAG defines the order and scheduling of operations.
```python
from airflow.operators.python import PythonOperator
from airflow import DAG
from datetime import datetime
import pyarrow.parquet as pq
import pyarrow as pa
import pandas as pd
import requests
import csv

url = "https://www.cryptodatadownload.com/cdd/Gemini_BTCUSD_2022_1min.csv"


def get_data(**kwargs):
    # Download the CSV and drop the first two rows (attribution line and header)
    with requests.Session() as s:
        download = s.get(url)
        decoded_content = download.content.decode('utf-8')
        cr = csv.reader(decoded_content.splitlines(), delimiter=',')
        my_list = list(cr)[2:]
        # The return value is pushed to XCom for the downstream task
        return my_list


def transform_data(**kwargs):
    # Pull the rows fetched by the upstream 'fetch_btc_gemini' task from XCom
    ti = kwargs['ti']
    BTC_gemini_data = ti.xcom_pull(key=None, task_ids=['fetch_btc_gemini'])[0]
    now = datetime.now().strftime("%m%d%Y%H%M%S")
    price = []
    date = []
    for element in BTC_gemini_data:
        time = datetime.strptime(element[1], "%Y-%m-%d %H:%M:%S")
        p = float(element[-2])
        date.append(time)
        price.append(p)
    # Save timestamps and prices to a uniquely named Parquet file
    d = {"Date": date, "Price": price}
    df = pd.DataFrame(data=d)
    table = pa.Table.from_pandas(df)
    file_name = "all_data/data" + now + ".parquet"
    pq.write_table(table, file_name)


default_args = {
    'owner': 'jeremiah',
    'start_date': datetime(2021, 10, 4, 11, 0, 0),
    'concurrency': 1,
    'retries': 0
}

# The DAG runs every minute (cron '*/1 * * * *') and chains the two tasks
with DAG('crypto',
         catchup=False,
         default_args=default_args,
         schedule_interval='*/1 * * * *',
         ) as dag:
    fetch_btc_gemini = PythonOperator(task_id='fetch_btc_gemini',
                                      python_callable=get_data)
    transform_btc_gemini = PythonOperator(task_id='transform_data',
                                          python_callable=transform_data)

    fetch_btc_gemini >> transform_btc_gemini
```
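For the scheduler to pick this up, the script has to live in your DAGs folder. Here's a sketch assuming a default installation (`~/airflow/dags`) and a hypothetical file name `crypto_dag.py`; also note that the relative `all_data/` directory must exist wherever the tasks run, or the Parquet write will fail.

```bash
# Copy the DAG definition into Airflow's default DAGs folder
mkdir -p ~/airflow/dags
cp crypto_dag.py ~/airflow/dags/
```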
The workflow presented in the code is straightforward:
- Obtain BTC data from cryptodatadownload.
- Transform the data and save timestamps and prices to Parquet.
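Each scheduled run adds one more Parquet file to `all_data/`. As a quick sanity check (the file name below is hypothetical; substitute one the DAG actually wrote), the output can be read back with pandas:

```python
import pandas as pd

# Inspect one of the Parquet files written by the transform_data task
df = pd.read_parquet("all_data/data01042022110000.parquet")
print(df.head())            # two columns: Date and Price
print(len(df), "rows of 1-minute BTC prices")
```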
Airflow runs this workflow on a schedule set by the `schedule_interval` argument to the DAG; here, the cron expression `*/1 * * * *` triggers a new run every minute. Open `localhost:8080` in your web browser to see more details about the DAG, including the number of runs, the previous and next run, the interval, and more.
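Before relying on the scheduler, it can be useful to exercise the DAG once from the command line. This assumes the Airflow 2.x CLI; the execution date is arbitrary.

```bash
# Confirm the 'crypto' DAG was parsed, then run it once outside the scheduler
airflow dags list
airflow dags test crypto 2022-01-01
```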
With the dynamic data pipeline set up, next comes the fun part: implementing machine learning models. Stay tuned for the next blog in the series where I’ll implement TensorFlow and PyTorch LSTM models to predict prices.