Classification problems are standard table stakes for today’s data scientists. Modern ML algorithms offer tools well suited for the task. However, such problems become more interesting and more challenging to address when you need to classify data as it arrives in real time. One effective workflow enables you to train on static data, then apply your code to the streaming data source to perform real-time classification.
Deephaven – with its “streaming tables” – uniquely empowers this ML workflow. Python code, custom functions, and table operations are applied directly to tables, and Deephaven lets you be agnostic about whether those tables are static or dynamic. If your code works on static data, it will also just work in real time, with no extra steps. This easy workflow is intended to empower a range of AI and ML scientists and their use cases in insurance, security, customer support, and health care.
For example, a classification algorithm could be used to support a doctor’s analysis of diabetes risk for a patient – in real time.
In this blog, I describe a working prototype of such a model. SciKit-Learn is used for classification, and its integration with Deephaven extends the training into the real-time application. Since statically-trained models can be applied to dynamic tables without extra work, moving between training, testing, and deployment tasks is easy. The result is a machine learning model that can analyze diabetes risk based on health factors.
The CDC reports that 1.5 million Americans are diagnosed with diabetes every year, and in 2018, the American Diabetes Association reported that diabetes costs the U.S. $327 billion annually. Early detection and mitigation have known benefits.
The code in this blog uses SciKit-Learn, which is not included in Deephaven’s base Docker images. However, our suite of base+ images includes this module, and instructions for installing Deephaven with it built in can be found in our documentation. Additionally, Python packages can be installed manually. For more information, see How to install Python packages in Deephaven.
The data used in this blog can be found on Kaggle and in our examples repository.
Let’s start by loading the CSV data into a Deephaven table.
from deephaven import read_csv
diabetes_indicators = read_csv("https://media.githubusercontent.com/media/deephaven/examples/main/Diabetes/diabetes_012_health_indicators_BRFSS2015.csv")
The table includes rows for 253,680 patients, with columns for each of 21 health indicators, plus a diagnosis column that records no diabetes, prediabetes, or diabetes.
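If you want to verify that for yourself, a quick optional check of the row count is one line (intSize is the same method we’ll use later for batch sizes):

# Optional sanity check: the table should contain 253,680 rows
print(diabetes_indicators.intSize())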
Before we dive headfirst into solving this problem, we need to know a little more about the data. First off, we want to find out whether any of the columns in this dataset give us little to no indication that a person may be diabetic. Pandas DataFrames have a built-in method, corr, that computes pairwise correlations between columns and can help us do just that.
from deephaven import pandas
import pandas as pd
df_indicators = pandas.to_pandas(diabetes_indicators)
diabetic_correlation = df_indicators.corr()['Diabetes_012']
print(diabetic_correlation)
For this application, we want columns with a sufficiently high correlation to the diabetes diagnosis. Let’s use a minimum correlation of 0.16.
df_indicators = df_indicators[diabetic_correlation[diabetic_correlation > 0.16].keys()]
print(df_indicators)
We’re left with 8 of the original 21 indicators. Now, let’s split the data into training and testing sets. When we split the data, we want to ensure that the rate of each diagnosis in the train and test sets mirrors that of the full dataset. Here, we’ll use 20% of the data to train the model we’ll create shortly. That small percentage still contains over 50,000 individual patients, which should be plenty of training data.
import numpy as np
# Separate the data by diagnosis so each class is sampled at the same rate
df_0 = df_indicators.loc[df_indicators["Diabetes_012"] == 0]
df_1 = df_indicators.loc[df_indicators["Diabetes_012"] == 1]
df_2 = df_indicators.loc[df_indicators["Diabetes_012"] == 2]

# Random boolean masks select roughly 20% of each class for training
diag_0_mask = np.random.rand(len(df_0)) < 0.2
diag_1_mask = np.random.rand(len(df_1)) < 0.2
diag_2_mask = np.random.rand(len(df_2)) < 0.2
df_0_train, df_0_test = df_0[diag_0_mask], df_0[~diag_0_mask]
df_1_train, df_1_test = df_1[diag_1_mask], df_1[~diag_1_mask]
df_2_train, df_2_test = df_2[diag_2_mask], df_2[~diag_2_mask]
# Recombine and shuffle each split, then convert back to Deephaven tables
training_set = pandas.to_table(pd.concat([df_0_train, df_1_train, df_2_train]).sample(frac = 1))
testing_set = pandas.to_table(pd.concat([df_0_test, df_1_test, df_2_test]).sample(frac = 1))
# Feature (input) columns and the target column for the model
x_cols = ["HighBP", "HighChol", "BMI", "HeartDiseaseorAttack", "GenHlth", "PhysHlth", "DiffWalk", "Age"]
y_cols = "Diabetes_012"
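Before moving on, it’s worth confirming that the stratified split behaved as intended. This quick, optional check uses the pandas frames we just built to compare diagnosis rates across the full dataset and both splits; the three printed distributions should be nearly identical:

# Fraction of each diagnosis (0, 1, 2) in a DataFrame
def diagnosis_rates(df):
    return df["Diabetes_012"].value_counts(normalize = True).sort_index()

print(diagnosis_rates(df_indicators))
print(diagnosis_rates(pd.concat([df_0_train, df_1_train, df_2_train])))
print(diagnosis_rates(pd.concat([df_0_test, df_1_test, df_2_test])))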
This is a classification problem, so there are several viable ways to solve it using machine learning. Here, I’ll use a Random Forest classifier, via SciKit-Learn’s built-in sklearn.ensemble.RandomForestClassifier. We construct the classifier with the following properties (note that the random state can be changed; 19 was chosen arbitrarily):
- random_state = 19
- criterion = "entropy"
from sklearn.ensemble import RandomForestClassifier as rfc

rfc_model = rfc(random_state = 19, criterion = "entropy")
Fitting the training data to this model is simple. We will time the operation and report the score at the end of fitting.
Additionally, we create the functions that define how data is transferred to and from Deephaven tables and NumPy arrays.
import time
def fit_rfc(x_train, y_train):
    global rfc_model
    start = time.time()
    rfc_model.fit(x_train, y_train)
    end = time.time()
    print(f"Accuracy on training set: {rfc_model.score(x_train, y_train)}")
    print(f"Random Forest Classifier trained in {end - start} seconds.")

# Gather rows from a Deephaven table into a NumPy integer array
def table_to_numpy_int(rows, columns):
    return np.squeeze(gather.table_to_numpy_2d(rows, columns, dtype = np.intc))

# Scatter a model's output array back into a Deephaven table, one row at a time
def numpy_to_table(data, index):
    return data[index]
Now we can actually fit the model to the training data. Doing so is easy – it follows the steps outlined in our guide How to use deephaven.learn.
from deephaven import learn
from deephaven.learn import gather
learn.learn(
    table = training_set,
    model_func = fit_rfc,
    inputs = [learn.Input(x_cols, table_to_numpy_int), learn.Input(y_cols, table_to_numpy_int)],
    outputs = None,
    batch_size = training_set.intSize()
)
How well did our model perform once it was fitted to the training set?
def use_fitted_model(x_test):
    global rfc_model
    return rfc_model.predict(x_test)
training_predictions = learn.learn(
    table = training_set,
    model_func = use_fitted_model,
    inputs = [learn.Input(x_cols, table_to_numpy_int)],
    outputs = [learn.Output("Prediction", numpy_to_table, "int")],
    batch_size = training_set.intSize()
)
# Count every combination of actual diagnosis and predicted diagnosis
dia_correct = training_predictions.where(filters = ["Diabetes_012 == 2 && Prediction == 2"]).intSize()
dia_wrong_pre = training_predictions.where(filters = ["Diabetes_012 == 2 && Prediction == 1"]).intSize()
dia_wrong_no = training_predictions.where(filters = ["Diabetes_012 == 2 && Prediction == 0"]).intSize()
pre_correct = training_predictions.where(filters = ["Diabetes_012 == 1 && Prediction == 1"]).intSize()
pre_wrong_dia = training_predictions.where(filters = ["Diabetes_012 == 1 && Prediction == 2"]).intSize()
pre_wrong_no = training_predictions.where(filters = ["Diabetes_012 == 1 && Prediction == 0"]).intSize()
no_correct = training_predictions.where(filters = ["Diabetes_012 == 0 && Prediction == 0"]).intSize()
no_wrong_dia = training_predictions.where(filters = ["Diabetes_012 == 0 && Prediction == 2"]).intSize()
no_wrong_pre = training_predictions.where(filters = ["Diabetes_012 == 0 && Prediction == 1"]).intSize()
print(f"Patients with diabetes: {dia_correct} correct, {dia_wrong_pre} predictions of prediabetes, {dia_wrong_no} predictions of no diabetes.")
print(f"Patients with prediabetes: {pre_correct} correct, {pre_wrong_dia} predictions of diabetes, {pre_wrong_no} predictions of no diabetes.")
print(f"Patients with no diabetes: {no_correct} correct, {no_wrong_dia} predictions of diabetes, {no_wrong_pre} predictions of prediabetes.")
That’s pretty good. These accuracy reports can be summed up:
- Approximately two-thirds of patients with diabetes are correctly identified.
- About half of patients with prediabetes are correctly identified.
- Nearly all patients without diabetes are correctly identified.
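As an aside, the same breakdown can be computed more compactly with SciKit-Learn’s confusion_matrix; this sketch assumes you first convert the predictions table back to a pandas DataFrame:

from sklearn.metrics import confusion_matrix

# Rows are actual diagnoses (0, 1, 2); columns are predicted diagnoses
df_pred = pandas.to_pandas(training_predictions)
print(confusion_matrix(df_pred["Diabetes_012"], df_pred["Prediction"], labels = [0, 1, 2]))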
Although a model like this can give an approximate assessment of diabetes risk, its output should not, of course, be considered definitive. A patient genuinely concerned about the possibility of being diabetic should talk to a doctor.
With the model fitted, and the results deemed satisfactory, let’s use that fitted model on the testing set. We’ve already done most of the work!
testing_predictions = learn.learn(
    table = testing_set,
    model_func = use_fitted_model,
    inputs = [learn.Input(x_cols, table_to_numpy_int)],
    outputs = [learn.Output("Prediction", numpy_to_table, "int")],
    batch_size = testing_set.intSize()
)
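The same kind of accuracy breakdown from the previous section applies here. For instance, overall accuracy on the held-out data can be checked with a couple of lines of pandas – again, a sketch that assumes converting the table back to a DataFrame:

# Fraction of held-out patients whose diagnosis the model predicted correctly
df_test_pred = pandas.to_pandas(testing_predictions)
print((df_test_pred["Diabetes_012"] == df_test_pred["Prediction"]).mean())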
The model works on a static set of testing data. In the real world, there is far greater need for real-time solutions than static ones. In the next section, we’ll simulate a real-time feed of incoming patient indicators and show how easy it is to make this model work on the live table.
Let’s build that simulated feed of incoming patient indicators now. At the start of this blog, we mentioned an important statistic:
- Approximately 1.5 million Americans are diagnosed with diabetes each year.
That translates to about 4,000 diagnoses per day, or roughly one every 20 seconds. To make the demonstration livelier, we’ll speed that rate up and write between one and ten new sets of patient indicators per second.
The real-time feed simulation will be done using DynamicTableWriter.
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
import random, threading
np_testing = pandas.to_pandas(testing_set).values
# Nine columns: the 8 indicator columns plus the diagnosis, all integers
col_names = x_cols + ["Diabetes_012"]
col_types = [dht.int_] * 9

table_writer = DynamicTableWriter(
    col_names,
    col_types
)

live_indicators = table_writer.table
def live_feed_of_indicators(np_testing):
    for i in range(len(np_testing)):
        indicators = np_testing[i, :]
        # Column 0 of the testing data is the diagnosis; columns 1-8 are the indicators
        high_bp = int(indicators[1])
        high_chol = int(indicators[2])
        bmi = int(indicators[3])
        hdoa = int(indicators[4])
        gen_hlth = int(indicators[5])
        phys_hlth = int(indicators[6])
        diff_walk = int(indicators[7])
        age = int(indicators[8])
        diab = int(indicators[0])
        table_writer.write_row(high_bp, high_chol, bmi, hdoa, gen_hlth, phys_hlth, diff_walk, age, diab)
        # Sleep 0.1 to 1 second, for one to ten rows per second
        time.sleep(random.uniform(0.1, 1.0))
thread = threading.Thread(target = live_feed_of_indicators, args = (np_testing,))
thread.start()
Now all that’s left to do is use the fitted model on the live feed!
live_predictions = learn.learn(
    table = live_indicators,
    model_func = use_fitted_model,
    inputs = [learn.Input(x_cols, table_to_numpy_int)],
    outputs = [learn.Output("Prediction", numpy_to_table, "int")],
    batch_size = 100
)
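Because live_predictions is itself a streaming table, anything built on top of it updates in real time too. As one hypothetical example, a running tally of correct versus incorrect predictions takes a single line of standard Deephaven table operations:

# Live tally of correct vs. incorrect predictions, updated as each row arrives
live_summary = live_predictions.update(formulas = ["Correct = Diabetes_012 == Prediction"]).count_by("Count", ["Correct"])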
There is enormous demand for innovative real-time software solutions in healthcare. This model serves as an effective first-pass filter, helping a patient decide whether to consult a doctor about a possible diabetes diagnosis, or helping a doctor decide whether to run more tests. This dataset is imbalanced, with the majority of measurements coming from people who are not diabetic. That mirrors the real world, in which an estimated 10 to 15% of Americans have diabetes. Imbalances like these can severely degrade the efficacy of machine learning methods; even so, the model holds up well enough to support that first-pass decision.
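One common mitigation for class imbalance – not used above, but worth knowing about – is SciKit-Learn’s class_weight parameter, which reweights training samples inversely to class frequency. It drops into the classifier construction unchanged:

# Same classifier as before, with per-class reweighting to counter the imbalance
rfc_model_balanced = rfc(random_state = 19, criterion = "entropy", class_weight = "balanced")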
The model used in this blog is simple to construct thanks to SciKit-Learn. I also showed how easy it is to apply statically trained models to real-time data with Deephaven.