Large language models like ChatGPT have taken the world by storm. There is no question that they have already changed the face of technology, and will continue to improve every year. Humans still make decisions based on information found on the internet, and a growing pool of AI-generated pseudo-information and misinformation is making it more difficult to distinguish fact from fiction. It’s not unreasonable to speculate that in the near future, the only things capable of detecting whether a piece of content was AI-generated will be other AI systems.
Building and deploying such systems is not an easy task. They need tons of domain-specific training data, of both human and AI origins. They need sophisticated models for distinguishing AI content from human content in a variety of contexts. They need to be deployed at large scales in real time, so generated content gets flagged immediately. This is a tall order, but it’s not impossible. In fact, it’s necessary.
In this post, I will sketch out what such a system might look like using a real example close to everyone’s heart: Amazon reviews. No one wants to make a purchase on Amazon based on glowing 5-star reviews if those reviews aren’t written by real people who really own the product. We have some tools at our disposal to address this problem.
- Data: We can use a massive dataset of Amazon reviews (over 571 million!) collected by Julian McAuley’s lab. This dataset covers reviews from 1996 to 2023.
- Detection Method: A recent Kaggle competition focused on detecting AI-generated text. One of the top entries used a model called BERT (Bidirectional Encoder Representations from Transformers) to identify fake reviews. The pre-trained model is stored at detector/detector.pt.
- Real-time Analysis: Deephaven Data Labs offers a powerful real-time data processing tool. This allows us to analyze reviews as they’re written, not just after the fact.
To follow along, clone this project’s GitHub repository and follow the README. Let’s get started!
The code in this article is written for Deephaven 0.36.1. More specific versioning requirements can be found in the requirements.txt file for this project.
Before getting started with this example, you’ll need to download the Amazon data. This only needs to be done once, and it can take some time depending on the speed of your internet connection and the number of processors available to you. It took about 20 minutes on a MacBook Pro M2 with 8 cores.
First, import the libraries needed to load the data.
import os
import datasets
import datetime as dt
datasets.logging.set_verbosity_error()
Next, set NUM_PROC equal to the number of processors on your machine. This step is important, as it significantly impacts the download speed.
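One reasonable way to do this, assuming you’re happy to use every core Python can see, is with os.cpu_count():

# Use every available core for the download; os.cpu_count() can return None on
# some platforms, so fall back to a single process in that case.
NUM_PROC = os.cpu_count() or 1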
Now, create a list of the dataset names for downloading, and define a start time to avoid downloading all the data. If you want to work with less data, leave some categories out of the list, or increase the start time.
names = [
    'All_Beauty',
    'Toys_and_Games',
    'Cell_Phones_and_Accessories',
    'Industrial_and_Scientific',
    'Gift_Cards',
    'Musical_Instruments',
    'Electronics',
    'Handmade_Products',
    'Arts_Crafts_and_Sewing',
    'Baby_Products',
    'Health_and_Household',
    'Office_Products',
    'Digital_Music',
    'Grocery_and_Gourmet_Food',
    'Sports_and_Outdoors',
    'Home_and_Kitchen',
    'Subscription_Boxes',
    'Tools_and_Home_Improvement',
    'Pet_Supplies',
    'Video_Games',
    'Kindle_Store',
    'Clothing_Shoes_and_Jewelry',
    'Patio_Lawn_and_Garden',
    'Unknown',
    'Books',
    'Automotive',
    'CDs_and_Vinyl',
    'Beauty_and_Personal_Care',
    'Amazon_Fashion',
    'Magazine_Subscriptions',
    'Software',
    'Health_and_Personal_Care',
    'Appliances',
    'Movies_and_TV'
]
start_time_millis = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc).timestamp() * 1_000
Finally, download and filter the data, and write the results to /amazon-data/.
for name in names:
    # Download, filter, and save the review data for this category if it isn't already on disk
    if not os.path.exists(f"amazon-data/reviews/{name}.parquet"):
        print(f"Writing {name} review data...")
        review_dataset = datasets.load_dataset(
            "McAuley-Lab/Amazon-Reviews-2023",
            f"raw_review_{name}",
            num_proc=NUM_PROC,
            trust_remote_code=True)['full']
        filtered_review_dataset = (
            review_dataset
            .select_columns(["rating", "title", "text", "parent_asin", "user_id", "timestamp"])
            .filter(lambda timestamp: timestamp >= start_time_millis, input_columns="timestamp")
        )
        filtered_review_dataset.to_parquet(f"amazon-data/reviews/{name}.parquet")

    # Do the same for the item metadata for this category
    if not os.path.exists(f"amazon-data/items/{name}.parquet"):
        print(f"Writing {name} item data...")
        meta_dataset = datasets.load_dataset(
            "McAuley-Lab/Amazon-Reviews-2023",
            f"raw_meta_{name}",
            num_proc=NUM_PROC,
            trust_remote_code=True)['full']
        filtered_meta_dataset = (
            meta_dataset
            .select_columns(["main_category", "title", "average_rating", "rating_number", "parent_asin"])
        )
        filtered_meta_dataset.to_parquet(f"amazon-data/items/{name}.parquet")
Now that we have the data on disk, we can use it to simulate a real-time data stream!
First, we want to stream the Amazon review dataset in real time. The Amazon dataset is static, so we will use TableReplayer to simulate a real-time review stream. If you have a real-time review stream in a format like Kafka, you can ingest the stream directly without needing to simulate it.
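For instance, a Kafka feed of reviews could be consumed directly with Deephaven’s Kafka consumer and used in place of the replayed table built below. The broker address, topic name, and JSON schema in this sketch are hypothetical stand-ins, not part of this project:

from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
from deephaven import dtypes

# Hypothetical example: consume a "reviews" topic whose values are JSON documents
# with the same fields as the Amazon dataset (timestamp in epoch milliseconds)
reviews_ticking = kc.consume(
    {"bootstrap.servers": "localhost:9092"},
    "reviews",
    key_spec=KeyValueSpec.IGNORE,
    value_spec=kc.json_spec([
        ("rating", dtypes.double),
        ("title", dtypes.string),
        ("text", dtypes.string),
        ("parent_asin", dtypes.string),
        ("user_id", dtypes.string),
        ("timestamp", dtypes.long),
    ]),
    table_type=TableType.append(),
)

Since the Amazon dataset is static, the rest of this post sticks with the replay approach.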
Start by importing the necessary libraries.
from math import floor
from deephaven import parquet, dtypes
from deephaven.table import TableDefinition
from deephaven.replay import TableReplayer
from deephaven.time import to_j_instant
Now, read the Amazon reviews into a Parquet table with Deephaven’s Parquet module.
reviews_def = TableDefinition({
    "rating": dtypes.double,
    "title": dtypes.string,
    "text": dtypes.string,
    "parent_asin": dtypes.string,
    "user_id": dtypes.string,
    "timestamp": dtypes.long
})

reviews = parquet.read(
    "/amazon-data/reviews/",
    file_layout=parquet.ParquetFileLayout.FLAT_PARTITIONED,
    table_definition=reviews_def
)

reviews = (
    reviews
    .update("timestamp = epochMillisToInstant(timestamp)")
    .sort("timestamp")
)
The reviews table has 25.6 million observations spanning 9 months. Streaming through all of those observations in real time would take… 9 months. Instead, we randomly sample 1 in 10,000 reviews and replay that sample at 10,000x speed. The arrival rate still matches Amazon’s real-world review frequency, and we can watch nine months of long-term trends in just a few minutes.
min_time = to_j_instant("2023-01-01T00:00:00.000Z")
replay_start_time = to_j_instant("2024-01-01T00:00:00Z")
replay_end_time = to_j_instant("2024-01-01T00:36:00Z")
data_speed = 10_000
reviews = (
    reviews
    # Keep roughly 1 in every data_speed reviews (1.0 forces floating-point division)
    .where("random() < 1.0 / data_speed")
    # Compress the original timeline by a factor of data_speed
    .update("replay_timestamp = replay_start_time + (long)floor((timestamp - min_time) / data_speed)")
)
Now, replay the data with Deephaven’s TableReplayer.
reviews_replayer = TableReplayer(replay_start_time, replay_end_time)
reviews_ticking = reviews_replayer.add_table(reviews, "replay_timestamp").drop_columns("replay_timestamp")
reviews_replayer.start()
With simulated data flowing, we can focus on the real-time detection of AI bots using BERT, our neural network that’s been trained on ChatGPT-generated text. It’s easier than you’d expect.
First, load the necessary libraries.
import concurrent.futures
import logging
import torch
import numpy as np
from transformers import BertTokenizer, BertForSequenceClassification
from deephaven.table_listener import listen, TableUpdate
from deephaven.stream.table_publisher import table_publisher
from deephaven.stream import blink_to_append_only
from deephaven import new_table
import deephaven.column as dhcol
import deephaven.dtypes as dtypes
Next, import the trained model’s parameters into a new model object and load the tokenizer needed to transform the input data.
loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
for logger in loggers:
    if "transformers" in logger.name.lower():
        logger.setLevel(logging.ERROR)

model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
model.load_state_dict(torch.load("/detector/detector.pt", weights_only=False))

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Run inference on the GPU if one is available, and disable dropout for evaluation
model.to(device)
model.eval()

tokenizer = BertTokenizer.from_pretrained(
    'bert-base-uncased',
    do_lower_case=True,
    padding=True,
    truncation=True,
    max_length=128,
    clean_up_tokenization_spaces=True
)
Now, we’re going to walk through a real-time AI workflow step-by-step. The workflow looks like this:
- Create an object called a TablePublisher to publish new data to a ticking table. This table, preds_blink, will contain the new predictions.
- Define a function to perform inference and publish the results to preds_blink.
- Create a TableListener that will listen to the ticking data source and call the inference/publisher function as new data rolls in.
- Tie it all together by listening to the ticking source, performing inference on new inputs, and publishing the results to a new table.
First, create the TablePublisher using the table_publisher function. This function returns an empty table to capture the published data, which we’ll call preds_blink, and an object that publishes data to that table, which we’ll call preds_publish. preds_blink is a blink table, meaning that it will only hold the most recent data from a given update cycle. Check out the guide on table publishers to learn more.
preds_blink, preds_publish = table_publisher(
    "DetectorOutput", {
        "rating": dtypes.double,
        "parent_asin": dtypes.string,
        "user_id": dtypes.string,
        "timestamp": dtypes.Instant,
        "gen_prob": dtypes.float32
    },
)
Next, define a function to perform the inference and publish the results to a new table using the table publisher defined previously. This function will be called every time more data rolls in, enabling Deephaven to perform real-time inference on only the most recent data. For simplicity, we’ve broken this into two functions: one to actually perform the inference on a given set of inputs and one to call that function and publish the results to a new table.
def detect_bot(text):
    # Tokenize the batch of review texts and move the tensors to the model's device
    tokenized_text = tokenizer(text.tolist(), padding=True, truncation=True, return_tensors='pt')
    tokenized_text = {key: value.to(device) for key, value in tokenized_text.items()}
    with torch.no_grad():
        outputs = model(**tokenized_text)
        logits = outputs.logits
        # Probability that each review is AI-generated (class 1)
        predictions = torch.softmax(logits, dim=1)[:, 1].cpu().numpy()
    return predictions


def compute_and_publish_inference(inputs, features):
    outputs = detect_bot(inputs)
    output_table = new_table(
        [
            dhcol.double_col("rating", features["rating"]),
            dhcol.string_col("parent_asin", features["parent_asin"]),
            dhcol.string_col("user_id", features["user_id"]),
            dhcol.datetime_col("timestamp", features["timestamp"]),
            dhcol.float_col("gen_prob", outputs)
        ]
    )
    preds_publish.add(output_table)
    return None
Next, we create a TableListener that listens to the ticking source and calls compute_and_publish_inference on new data. To do this, define a function called on_update that takes two arguments, update and is_replay. Extract the added and modified data from the update argument using update.added() and update.modified(). See the guide on table listeners to learn more.
Finally, we know that calling compute_and_publish_inference will be expensive – neural network inference is not cheap. Instead of delaying the main thread with these expensive calculations, offload them to a separate thread using a ThreadPoolExecutor. This collects the pending calculations into a queue and executes them as resources become available.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

def on_update(update: TableUpdate, is_replay: bool) -> None:
    input_col = "text"
    feature_cols = ["rating", "parent_asin", "user_id", "timestamp"]

    # Pull the review text and feature columns from rows added or modified in this update cycle
    adds = update.added(cols=[input_col, *feature_cols])
    modifies = update.modified(cols=[input_col, *feature_cols])

    if adds and modifies:
        inputs = np.hstack([adds[input_col], modifies[input_col]])
        features = {feature_col: np.hstack([adds[feature_col], modifies[feature_col]]) for feature_col in feature_cols}
    elif adds:
        inputs = adds[input_col]
        features = {feature_col: adds[feature_col] for feature_col in feature_cols}
    elif modifies:
        inputs = modifies[input_col]
        features = {feature_col: modifies[feature_col] for feature_col in feature_cols}
    else:
        return

    # Hand the expensive inference off to the worker pool so the main thread isn't blocked
    executor.submit(compute_and_publish_inference, inputs, features)
Now, tie it all together. The listen function below calls on_update every time a new review ticks into reviews_ticking. This runs the inference calculation on the new data and stores the result in preds_blink. Finally, blink_to_append_only converts preds_blink to an append-only table that stores the full history of the reviews and predictions.
handle = listen(reviews_ticking, on_update)
preds = blink_to_append_only(preds_blink)
The AI model output is captured in preds in real time as data rolls into reviews_ticking.
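As a quick sanity check while the stream runs, you can watch how many reviews cross a decision threshold. The 0.5 cutoff below is a hypothetical choice for illustration rather than a calibrated value:

# Reviews the detector scores as more likely generated than not, using a
# hypothetical 0.5 threshold on gen_prob; both derived tables update live
likely_generated = preds.where("gen_prob > 0.5")
flagged_count = likely_generated.count_by("n_flagged")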
In a follow-up post, we’ll perform a downstream analysis on these results and see what we can learn about the incidence of ChatGPT-generated reviews on Amazon. Stay tuned!
This is a basic example of the systems discussed earlier, but it demonstrates how accessible such tasks have become with the right tools. HuggingFace and PyTorch have simplified deep learning by providing user-friendly interfaces to cutting-edge models. Researchers who spent countless hours implementing CNNs in C may be amazed at how straightforward these complex tasks are now.
A similar transformation is underway in real-time data processing with tools like Deephaven. Building real-time data applications has never been easier. Data practitioners will likely view batch processing, disparate interfaces, and manual configuration as outdated practices, much like we now regard C-based neural networks. The sentiment will be the same: “I’m glad I don’t have to do that anymore.”