The money made in meme stock trading is a stunning example of the power of community sentiment. Nevertheless, you could spend hours manually scrolling social media without gleaning meaningful insight. Or, you could equip yourself with automation and ML to measure sentiment – in real time – and produce useful results.
Python and TensorFlow’s natural language processing libraries and Deephaven’s stream-and-batch table engine together meet this challenge. Below, I share the details of an app I made to do natural language processing of the Twitter feed and marry it to stock price information – all in real time.
This starter program looks at tweets about cryptocurrency; however, the possibilities are endless and we encourage you to tailor these queries to suit your interests.
The program showcased here integrates Deephaven with Twitter and Python’s Natural Language Toolkit (NLTK) to pull recent tweets and evaluate sentiment in real time. We start by pulling the data and running a SentimentIntensityAnalyzer on each tweet. We then aggregate the posts to see how the overall positivity or negativity of that term on Twitter changes over time.
- To get started, go to our example repo or run the following commands to download all the code and dependencies to work with Docker.
git clone https://github.com/deephaven-examples/twitter-sentiment.git
cd twitter-sentiment
A start script will install the needed Python modules and launch the Deephaven IDE.
To run it, execute:
- Twitter provides an API to make it easy to pull public tweets. In order to use this code as-is, you need to also have a Twitter Developer account and copy your Bearer Token.
Free access to the Twitter API is no longer available. However, the code and concepts in this blog are still valid for users with a paid developer account.
- Import the appropriate packages to use each platform. I perform a pip install inside Deephaven. This ensures my program runs without any extra modifications needed. For other ways to install Python packages, see our How to install Python packages guide.
import nltk
nltk.download('punkt')
nltk.download('vader_lexicon')
from requests_oauthlib import OAuth1Session
import requests
from datetime import datetime
import time
import re
import json
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from deephaven.DateTimeUtils import convertDateTime, minus, convertPeriod, currentTime
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
import threading
This program is intended to be fine-tuned to fit your data needs. Below are the values you’ll need to change to customize the program for your specific use case and information.
In this example, I perform sentiment analysis on Dogecoin tweets over the course of one week.
- First, you need the token I mentioned above. Important: the Bearer Token is provided by Twitter and each developer has a monthly limit, so keep this token private.
- I search for any tweet that contains the term DOGE.
- Because the API is rate limited and I want to see the tweets for the last seven days, I collect just 10 tweets for each time pull with max_results = 10. I recommend using low numbers for testing. When you are ready for production, increase as needed, while keeping in mind the rate limit.
- Next, to see how the sentiment of tweets changes with time, I divide those seven days up into discrete time_bins. More bins will give you the ability to see more refined changes in the social sentiment, but will also pull in more tweets, which means you hit your rate limit quicker.
- My Twitter access level limits how far back I can pull tweets to seven days, so I set time_history = 7. This is the standard for non-academic searches.
bearer_token = '<INPUT YOUR TOKEN HERE>'
search_term = 'DOGE'
max_results = 10
time_bins = 10
time_history = 7
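To make the binning concrete, here is a minimal standard-library sketch of how time_history and time_bins translate into request windows. The helper name time_windows is hypothetical and not part of the app, which builds its windows with Deephaven time utilities instead. The sketch covers the full seven days; in practice the API may reject a window edge that sits exactly at "now" or exactly seven days back, so the outermost windows may need trimming.

```python
from datetime import datetime, timedelta, timezone


def time_windows(now, time_history_days, time_bins):
    """Split the last `time_history_days` days into `time_bins` equal windows.

    Hypothetical helper for illustration. Returns (start, end) pairs,
    newest window first, as UTC timestamp strings.
    """
    step = timedelta(days=time_history_days) / time_bins
    windows = []
    for i in range(1, time_bins + 1):
        start = now - i * step        # older edge of window i
        end = now - (i - 1) * step    # newer edge of window i
        windows.append((start.strftime("%Y-%m-%dT%H:%M:%SZ"),
                        end.strftime("%Y-%m-%dT%H:%M:%SZ")))
    return windows


# A fixed "now" keeps the example repeatable.
now = datetime(2021, 6, 8, 12, 0, 0, tzinfo=timezone.utc)
windows = time_windows(now, time_history_days=7, time_bins=10)
for start, end in windows[:3]:
    print(start, "->", end)
```

With seven days and ten bins, each window is 16 hours 48 minutes wide, and each window ends where the next newer one starts.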
In this section of code, we create the functions needed to pull the data from Twitter.
- Twitter provides a lot of sample code with the v2 API. These functions are pulled from the GitHub Twitter-API-v2-sample-code repo so that we connect to the needed endpoints with the appropriate authorization.
def create_headers(bearer_token):
    # Build the authorization headers for the v2 API.
    headers = {
        "Authorization": "Bearer {}".format(bearer_token),
        "User-Agent": "v2FullArchiveSearchPython"}
    return headers

search_url = "https://api.twitter.com/2/tweets/search/recent"

def connect_to_endpoint(url, headers, params):
    # Request the url passed in, not the global search_url.
    response = requests.request("GET", url, headers=headers, params=params)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()
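connect_to_endpoint raises on any non-200 status, which includes the 429 returned when the rate limit is hit. One way to handle that case is to wait until the limit resets and try again. The sketch below is an assumption-laden illustration, not part of the app: fetch_with_retry and seconds_until_reset are hypothetical helpers, fetch is a hypothetical callable returning (status_code, headers, body), and the 60-second fallback is arbitrary. It relies on the x-rate-limit-reset response header, which holds the reset time as UTC epoch seconds.

```python
import time


def seconds_until_reset(headers, now=None):
    """Return how long to wait before retrying, based on x-rate-limit-reset.

    `headers` is a mapping of lowercase response header names to values.
    Falls back to 60 seconds when the header is missing (arbitrary choice).
    """
    now = time.time() if now is None else now
    reset = headers.get("x-rate-limit-reset")
    if reset is None:
        return 60.0
    return max(0.0, float(reset) - now)


def fetch_with_retry(fetch, max_attempts=3, sleep=time.sleep):
    """Call `fetch()` until it returns a non-429 status or attempts run out."""
    for attempt in range(max_attempts):
        status, headers, body = fetch()
        if status != 429:
            return status, body
        if attempt < max_attempts - 1:
            sleep(seconds_until_reset(headers))
    return status, body


# Demo with a fake fetch: the first call is rate limited, the second succeeds.
responses = iter([
    (429, {"x-rate-limit-reset": "0"}, None),
    (200, {}, {"data": []}),
])
waits = []  # record the requested waits instead of actually sleeping
status, body = fetch_with_retry(lambda: next(responses), sleep=waits.append)
print(status, body, waits)
```

Passing sleep as a parameter keeps the demo instant; in real use the default time.sleep would pause the thread until the reset time.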
- Tweets contain a lot of metadata that can be useful. Here, I set the fields I like to work with: just the tweet.fields and user.fields data to keep it simple. Using these fields allows me to weigh tweets based on the popularity of the tweet or user and ignores location information. The rest are left for you to add as needed and might be useful if you want to limit the search to certain places in the world.
def get_query_params(start_time, end_time):
    return {'query': search_term,
            'start_time': start_time,
            'end_time': end_time,
            'max_results': max_results,
            'tweet.fields': 'id,text,author_id,in_reply_to_user_id,geo,conversation_id,created_at,lang,public_metrics,referenced_tweets,reply_settings,source',
            'user.fields': 'id,name,username,created_at,description,public_metrics,verified',
            'next_token': {}}
- Now we have the function that pulls the tweets. I’ve separated it from the previous code to make it easier to change the query_params to the date range you want.
- By default, a request with a start time of seven days ago returns only the most recent tweets. Since I want a guarantee of dates in bins, I supply the exact start and end date for each request.
- This function is called for each time bin and returns all the tweet data requested in JSON format.
def get_tweets(query_params):
    headers = create_headers(bearer_token)
    json_response = connect_to_endpoint(search_url, headers, query_params)
    # A window with no matching tweets may have no 'data' key; return an empty list then.
    return json_response.get('data', [])
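The response is a JSON object whose data list holds one object per tweet. As a sketch of how the fields used later can be pulled out, the example below parses a hand-written payload shaped like a recent-search response. The tweet ID, text, and counts are made up, extract_rows is a hypothetical helper, and the empty case assumes a response that carries only a meta block when nothing matches.

```python
import json

# Illustrative payload; the values are invented for this example.
sample = json.loads("""
{
  "data": [
    {"id": "1",
     "text": "DOGE to the moon!",
     "created_at": "2021-06-01T12:00:00.000Z",
     "public_metrics": {"retweet_count": 3, "reply_count": 1,
                        "like_count": 10, "quote_count": 0}}
  ],
  "meta": {"result_count": 1}
}
""")

# Assumed shape of a response with no matching tweets.
empty = {"meta": {"result_count": 0}}


def extract_rows(response):
    """Flatten each tweet into a dict with text, timestamp, and counts."""
    rows = []
    for tweet in response.get("data", []):
        metrics = tweet.get("public_metrics", {})
        rows.append({
            "id": tweet["id"],
            "text": tweet["text"],
            "created_at": tweet["created_at"],
            "retweet_count": metrics.get("retweet_count", 0),
            "like_count": metrics.get("like_count", 0),
        })
    return rows


rows = extract_rows(sample)
print(rows[0]["text"], rows[0]["retweet_count"])
print(extract_rows(empty))
```

Reading data with a default of an empty list means a quiet time window produces no rows instead of an exception.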
Since I’m performing a sentiment analysis on the content of the tweets, I clean each tweet. This is optional but provides a more uniform appearance to the tweets in the table.
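def cleanText(text):
    # Convert to lowercase.
    text = text.lower()
    # Fix missing spaces after punctuation, e.g. "end.begin" becomes "end. begin".
    text = re.sub(r'\.([a-zA-Z])', r'. \1', text)
    text = re.sub(r'\?([a-zA-Z])', r'. \1', text)
    text = re.sub(r'!([a-zA-Z])', r'. \1', text)
    # Replace q1 through q4 with q.
    text = re.sub("q[1-4]", "q", text)
    # Replace years 2000 through 2029 with 2000.
    text = re.sub("20[0-2][0-9]", "2000", text)
    return text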
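To see what the cleaning step does to a tweet, here is a standalone sketch of the same substitutions applied to a made-up string, with the punctuation characters escaped so the patterns compile. The name clean_text_demo and the sample text are only for illustration.

```python
import re


def clean_text_demo(text):
    """Standalone copy of the cleaning steps, for experimentation."""
    text = text.lower()
    # Insert a missing space after ".", "?" or "!" when a letter follows;
    # note that "?" and "!" are replaced by a period in the process.
    text = re.sub(r'\.([a-zA-Z])', r'. \1', text)
    text = re.sub(r'\?([a-zA-Z])', r'. \1', text)
    text = re.sub(r'!([a-zA-Z])', r'. \1', text)
    # Collapse quarter labels (q1..q4) and years (2000..2029).
    text = re.sub("q[1-4]", "q", text)
    text = re.sub("20[0-2][0-9]", "2000", text)
    return text


cleaned = clean_text_demo("DOGE is up!Buy now.Q3 2021 looks good")
print(cleaned)  # → doge is up. buy now. q 2000 looks good
```

One design point to keep in mind: VADER treats capitalization and exclamation marks as intensity cues, so lowercasing and replacing "!" can soften the scores somewhat. If that matters for your use case, compare scores on raw and cleaned text.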
Next, I run each tweet through the NLTK SentimentIntensityAnalyzer. This returns the polarity scores of the tweet, that is, how positive, negative, and neutral it is, as well as the combined score. Often a tweet will be filled with made-up words, acronyms, and such. These are generally scored as neutral and do not impact the analysis.
def analyze_line(text):
    sid = SentimentIntensityAnalyzer()
    return sid.polarity_scores(text)
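The scores come back as a dictionary with the keys neg, neu, pos, and compound, where compound ranges from -1 to 1. A common convention, suggested in the VADER documentation, is to treat a compound score of 0.05 or more as positive and -0.05 or less as negative. The sketch below applies that convention; label_sentiment is a hypothetical helper and the score values are invented rather than real analyzer output.

```python
def label_sentiment(compound, threshold=0.05):
    """Map a compound score in [-1, 1] to a coarse label."""
    if compound >= threshold:
        return "positive"
    if compound <= -threshold:
        return "negative"
    return "neutral"


# Made-up dictionary in the shape returned by polarity_scores().
scores = {"neg": 0.0, "neu": 0.6, "pos": 0.4, "compound": 0.5}
label = label_sentiment(scores["compound"])
print(label)
```

The app itself keeps the raw numeric scores, which is what makes averaging and weighting possible later; a label like this is mainly useful for quick filtering or counting.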
This last function is needed to fill the table that stores our data.
- We use Deephaven’s DynamicTableWriter class; the function logs one row to the dynamic table for each tweet.
- We add rows to the table for each of the time_bins.
By formatting the data with Deephaven Types, we make it easy to join, filter, summarize, plot, and perform other analysis on our table.
def thread_func():
    for i in range(1, time_bins):
        # Window i runs from i bins ago to (i-1) bins ago, in whole hours.
        start_time = str(minus(currentTime(),convertPeriod("T"+str(int(i*(24*time_history)/time_bins))+"H")))[:-9]+'Z'
        end_time = str(minus(currentTime(),convertPeriod("T"+str(int((i-1)*(24*time_history)/time_bins))+"H")))[:-9]+'Z'
        query_params = get_query_params(start_time, end_time)
        all_text = get_tweets(query_params)
        for t in all_text:
            # Score the cleaned text, then log one row per tweet.
            id = float(t['id'])
            combined = analyze_line(cleanText(t['text']))
            negative = combined.get('neg')
            neutral = combined.get('neu')
            compound = combined.get('compound')
            positive = combined.get('pos')
            dateTime = t['created_at'][:-1]+" NY"
            retweet_count = t['public_metrics']['retweet_count']
            reply_count = t['public_metrics']['reply_count']
            like_count = t['public_metrics']['like_count']
            quote_count = t['public_metrics']['quote_count']
            tableWriter_sia.logRow(t['text'], float(compound), float(negative), float(neutral), float(positive), float(id), convertDateTime(dateTime), int(retweet_count), int(reply_count), int(like_count), int(quote_count))
- Finally, I create the tableWriter_sia and start a thread to run the above function. This will create a table sia_data that fills with the tweets and their metadata, as well as the sentiment of each tweet.
tableWriter_sia = DynamicTableWriter(
["Text", "Compound", "Negative", "Neutral", "Positive", "ID", "DateTime", "Retweet_count", "Reply_count", "Like_count", "Quote_count"],
[dht.string, dht.double, dht.double, dht.double, dht.double, dht.double, dht.datetime, dht.int_, dht.int_, dht.int_, dht.int_])
sia_data = tableWriter_sia.getTable()
thread_sia = threading.Thread(target = thread_func)
thread_sia.start()
Now the fun part. Let’s do some analysis on the tweets so we can see how the search term’s positivity and negativity have changed with time.
First, let’s aggregate the tweets so that we can get a summary of the tweets inside each time bin.
This code:
- Creates a series of averages and weighted averages.
- Creates the combined_tweets table, which shows the overall sentiment for each Time_bin. The second argument to upperBin is the bin width in nanoseconds.
from deephaven import agg as agg
agg_list = [
agg.count_(col = "Count"),
agg.avg(cols = ["Average_negative = Negative"]),
agg.avg(cols = ["Average_neutral = Neutral"]),
agg.avg(cols = ["Average_positive = Positive"]),
agg.avg(cols = ["Average_compound = Compound"]),
agg.weighted_avg(wcol="Retweet_count", cols = ["Weight_negative = Negative"]),
agg.weighted_avg(wcol="Retweet_count", cols = ["Weight_neutral = Neutral"]),
agg.weighted_avg(wcol="Retweet_count", cols = ["Weight_positive = Positive"]),
agg.weighted_avg(wcol="Retweet_count", cols = ["Weight_compound = Compound"])
]
from deephaven.DateTimeUtils import expressionToNanos, convertDateTime, upperBin
# upperBin takes the bin width in nanoseconds; adjust 10000 to the width you want.
combined_tweets = sia_data.update(formulas = ["Time_bin = upperBin(DateTime, 10000)"]).sort(order_by = ["DateTime"]).agg_by(agg_list, by = ["Time_bin"]).sort(order_by = ["Time_bin"])
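To show what the binned averages and retweet-weighted averages compute, here is a plain-Python sketch of the same arithmetic on three made-up rows. It is not Deephaven code: upper_bin and weighted_avg are hypothetical stand-ins, the one-minute bin width is chosen only for readability, and returning NaN when a bin has zero total weight is this sketch’s own choice rather than a statement about the engine’s behavior.

```python
import math
from collections import defaultdict

NANOS_PER_MINUTE = 60 * 1_000_000_000


def upper_bin(ts_nanos, interval_nanos):
    """Round a timestamp up to the upper edge of its bin (ceiling division)."""
    return -(-ts_nanos // interval_nanos) * interval_nanos


def weighted_avg(values, weights):
    """Return sum(value * weight) / sum(weight), or NaN if the weights sum to 0."""
    total = sum(weights)
    if total == 0:
        return math.nan
    return sum(v * w for v, w in zip(values, weights)) / total


# (timestamp in nanoseconds, compound score, retweet count); values are made up.
rows = [
    (10 * 1_000_000_000, 0.8, 9),
    (50 * 1_000_000_000, -0.2, 1),
    (70 * 1_000_000_000, 0.5, 0),
]

# Group rows by the upper edge of their one-minute bin.
bins = defaultdict(list)
for ts, compound, retweets in rows:
    bins[upper_bin(ts, NANOS_PER_MINUTE)].append((compound, retweets))

summary = {}
for time_bin in sorted(bins):
    scores = [c for c, _ in bins[time_bin]]
    weights = [w for _, w in bins[time_bin]]
    summary[time_bin] = {
        "Count": len(scores),
        "Average_compound": sum(scores) / len(scores),
        "Weight_compound": weighted_avg(scores, weights),
    }

for time_bin, stats in summary.items():
    print(time_bin, stats)
```

In the first bin the plain average is about 0.3, while the weighted average is about 0.7 because the positive tweet has nine of the ten retweets. Note that a tweet with zero retweets contributes nothing to the weighted columns.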
The table’s cool, but not as useful as a plot. I use Deephaven’s plotting methods to create a nice visualization of my data.
from deephaven import Plot
# Trailing backslashes continue the chained call across lines.
sia_averages = Plot.plot("AVG_Neg", combined_tweets, "Time_bin", "Average_negative")\
    .lineColor(Plot.colorRGB(255,0,0,100))\
    .plot("AVG_Pos", combined_tweets, "Time_bin", "Average_positive")\
    .lineColor(Plot.colorRGB(0,255,0,100))\
    .show()
This code provides a basic starter. You can use it to make your own searches, tie to other programs, or just see how social media is doing.
We hope this program inspires you. If you make something of your own or have an idea to share, we’d love to hear about it on Gitter!