Why partitioned tables are powerful | Deephaven

“You don’t have to be an engineer to be a racing driver, but you do have to have Mechanical Sympathy.”
– Jackie Stewart, racing driver

This simple quote has deep meaning when it comes to many facets of life, and its value cannot be overstated when it comes to software. In this article, we’ll touch upon an important concept in Deephaven: partitioned tables. As with auto racing, having a grasp of the mechanics behind a system will enable you to maximize its potential. So, let’s take a closer look at partitioned tables and how they can help you get the most out of Deephaven.

Partitioned tables can help parallelize queries across multiple threads and generally improve performance when used in the right context. They can also make it easier to work with big data in the user interface.

A partitioned table is divided into smaller parts called constituent tables (or subtables). These parts are grouped based on the values of certain columns, called key columns. The values in these key columns decide which constituent table each row belongs to.

A partitioned table can be thought of in two ways:

  1. A vertically stacked list of tables. This list can be combined into a single table with merge.
  2. A map of tables. A constituent table can be retrieved by its key values with get_constituent.
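
Here’s a minimal sketch of both views in action (the Region and Value columns are illustrative, not from a real dataset):

from deephaven import new_table
from deephaven.column import int_col, string_col

# A small illustrative source table
source = new_table([
    string_col("Region", ["east", "west", "east", "west"]),
    int_col("Value", [1, 2, 3, 4]),
])

pt = source.partition_by("Region")

# View 1: a vertically stacked list of tables, recombined with merge
combined = pt.merge()

# View 2: a map of tables, looked up by key with get_constituent
east = pt.get_constituent("east")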

A partitioned table can be created in two ways:

  1. By partitioning a source table.
  2. Directly from a Kafka stream.

This article is focused on why partitioned tables are a powerful tool, so it will only discuss partitioning source tables. For more information on consuming Kafka streams directly into a partitioned table, see consume_to_partitioned_table.

Let’s look at some basic examples of creating and using partitioned tables.

Partition a table into constituent tables

The following code reads a CSV file of insurance data into Deephaven. It then partitions the table by the region column and gets a table containing the partitioned table’s keys.

from deephaven import read_csv

insurance = read_csv("https://media.githubusercontent.com/media/deephaven/examples/main/Insurance/csv/insurance.csv")

insurance_partitioned = insurance.partition_by("region")

partition_keys = insurance_partitioned.keys()

Tables can be partitioned by an arbitrary number of key columns. Consider the following code, which instead partitions the insurance table by three key columns: region, smoker, and sex.

insurance_partitioned = insurance.partition_by(["region", "smoker", "sex"])

partition_keys = insurance_partitioned.keys()

Get constituent tables from a partitioned table

To get individual constituent tables from a partitioned table, you need to specify which constituent you want by using the unique key values. The following code gets the constituent for non-smoking women in the Northwest.

insurance_nonsmoking_women_northwest = insurance_partitioned.get_constituent(["northwest", "no", "female"])
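
If no constituent matches the given key, get_constituent returns None rather than raising an error (the deephaven.ui example later in this article relies on this behavior). A quick sketch with a deliberately unmatched key:

# `maybe` is not a real smoker value, so no constituent matches this key
missing = insurance_partitioned.get_constituent(["northwest", "maybe", "female"])
print(missing is None)  # True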

To learn more about partitioned tables, head over to our user guide.

“Why?” is the most important question to ask when deciding whether partitioned tables are right for your queries. Let’s examine the convenience and performance benefits they provide.

Convenience

A partitioned table can be used like a map to retrieve constituent tables via their keys. Deephaven’s user interface has a built-in partitioned table viewer that allows you to:

  • View the keys of a partitioned table.
  • View constituent tables.
  • Merge the partitioned table.

Speaking of user interfaces, deephaven.ui is a powerful tool for building custom ones. Combining it with partitioned tables enables you to do things like switch between constituents with a single click. Consider the following code, which uses a dropdown menu to choose which partition to plot.

import deephaven.plot.express as dx
from deephaven import time_table
import deephaven.ui as ui

tt = time_table("PT0.25s").update([
    "Type = random() < 0.5 ? `Sine` : `Cosine`",
    "X = 0.05 * i",
    "Y = (Type == `Sine`) ? sin(X) : cos(X)",
])


pt = tt.partition_by("Type")
keys = pt.keys()

@ui.component
def plot_pt(partitioned_table, initial_val):
    text, set_text = ui.use_state(initial_val)
    ct = ui.use_memo(
        lambda: partitioned_table.get_constituent(text.capitalize()) if text != "" else None,
        [partitioned_table, text],
    )
    picker = ui.picker(keys, selected_key=text, on_change=set_text)

    return [
        picker,
        dx.line(ct, x="X", y="Y", title=f"Partition Key: {text}")
        if ct is not None
        else ui.text("Please enter a valid partition."),
    ]

p = plot_pt(pt, "Sine")


Performance

Partitioned tables are not a one-size-fits-all solution. They will not always make queries faster. It’s important to understand when and how to use them to get the most out of Deephaven.

Partitioned tables can make queries faster in a couple of different ways.

Parallelization

Partitioned tables can improve query performance by parallelizing operations that standard tables cannot. Take, for example, an as-of join between two tables. If the tables are partitioned by the exact match columns of the join, the join operation is done in parallel.

Partitioned tables are powerful but aren’t a magic wand that improves performance in all cases. Parallelization is best employed when:

  • Shared resources between concurrent tasks are minimal.
  • Partitioned data is dense.
  • Data is sufficiently large.

It’s also worth noting that Python’s Global Interpreter Lock (GIL) prevents multiple threads from executing Python code at the same time. Maximizing partitioned table query parallelization therefore requires minimizing the amount of Python code invoked from within your queries.
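
For example, an engine-native formula can be evaluated entirely inside the query engine and parallelized across constituents, while a formula that calls back into Python must serialize on the GIL. Here is a minimal sketch of the difference (my_scale is a hypothetical helper written for illustration, not part of any Deephaven API):

from deephaven import empty_table

# Engine-native formula: evaluated by the query engine, no GIL involvement
t_native = empty_table(10).update("Y = sin(0.1 * ii)")

# Python callback: each row's evaluation must re-enter Python and acquire the GIL
def my_scale(x) -> float:  # hypothetical helper, for illustration only
    return 0.1 * x

t_python = empty_table(10).update("Y = my_scale(ii)")

All else being equal, the first form leaves the engine free to spread partitioned work across threads.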

Let’s look at this parallelization in action. I just made the claim that the join operation in an as-of join is done in parallel if two tables are partitioned by the exact match columns. The following code creates two fake tables of 5 million rows each that contain quote and trade data for seven different “symbols” on four different “exchanges”. It then partitions them on the exact match columns and compares the performance between the standard tables and the partitioned tables:

from deephaven.execution_context import get_exec_ctx
from deephaven import empty_table
from string import ascii_uppercase
from random import choice
from time import time

n_rows = 5_000_000

ctx = get_exec_ctx()

def rand_key(keytype, minval, maxval) -> str:
    return keytype + "_" + choice(ascii_uppercase[minval:maxval])

quotes = empty_table(n_rows).update([
    "Timestamp = '2024-09-20T00:00:00 ET' + ii * SECOND",
    "Exchange = rand_key(`Exchange`, 20, 24)",
    "Symbol = rand_key(`Sym`, 0, 7)",
    "QuoteSize = randomInt(1, 10)",
    "QuotePrice = randomDouble(0, 100)",
])

trades = empty_table(n_rows).update([
    "Timestamp = '2024-09-20T00:00:00.1 ET' + ii * SECOND",
    "Exchange = rand_key(`Exchange`, 20, 24)",
    "Symbol = rand_key(`Sym`, 0, 7)",
    "TradeSize = randomInt(1, 10)",
    "TradePrice = randomDouble(0, 100)",
])

pt_quotes = quotes.partition_by(["Exchange", "Symbol"])
pt_trades = trades.partition_by(["Exchange", "Symbol"])

def partitioned_aj(t1, t2):
    with ctx:
        return t1.aj(t2, ["Exchange", "Symbol", "Timestamp"])

start = time()
result = quotes.aj(trades, ["Exchange", "Symbol", "Timestamp"])
end = time()

print(f"Standard table aj: {(end - start):.4f} seconds.")

start = time()
pt_result = pt_quotes.partitioned_transform(pt_trades, partitioned_aj)
end = time()

print(f"Partitioned table aj: {(end - start):.4f} seconds.")

Performance improvements will generally scale with:

  • Size and density of data
  • Hardware resources (number of CPU cores especially)
  • Number of partitions

Tick amplification

Tick amplification happens when cell values are grouped and ungrouped. If any cell contributing to a grouped array changes, the entire grouped array is marked as changed. As a result, a single small change in an input table can result in large sections of an ungrouped output table changing. This can spiral out of control in real-time queries when data is sufficiently large.

The following query demonstrates tick amplification using a table listener. It:

  • Creates a time table that ticks once every 5 seconds (t1).
  • Groups, updates, then ungroups the table (t2).
  • Partitions, updates, then merges the table (t3).

The number of added and modified rows is recorded on each tick. This value is printed for each of the above three operations to show how many rows are modified or added each time t1 ticks.

from deephaven import time_table
from deephaven.table_listener import listen


def print_changes(label, update, is_replay):
    added = update.added()
    modified = update.modified()
    n_added = len(added["X"]) if "X" in added else 0
    n_modified = len(modified["X"]) if "X" in modified else 0
    changes = n_added + n_modified
    print(f"TICK PROPAGATION: {label} {changes} changes")


t1 = time_table("PT5s").update(["A=ii%2", "X=ii"])


t2 = t1.group_by("A").update("Y=X+1").ungroup()


t3 = t1.partition_by("A").proxy().update("Y=X+1").target.merge()


h1 = listen(
    t1, lambda update, is_replay: print_changes("RAW            ", update, is_replay)
)
h2 = listen(
    t2, lambda update, is_replay: print_changes("GROUP/UNGROUP  ", update, is_replay)
)
h3 = listen(
    t3, lambda update, is_replay: print_changes("PARTITION/MERGE", update, is_replay)
)

The first time t1 ticks, this gets printed:

TICK PROPAGATION: RAW             1 changes
TICK PROPAGATION: GROUP/UNGROUP   1 changes
TICK PROPAGATION: PARTITION/MERGE 1 changes

So far, no issues. But that’s because t1 has only ticked once. After a little while, here’s the output:

TICK PROPAGATION: RAW             1 changes
TICK PROPAGATION: GROUP/UNGROUP   10 changes
TICK PROPAGATION: PARTITION/MERGE 1 changes

As t1 grows, the grouping and ungrouping operation continues to make more and more changes, whereas the partition/merge only makes one.

When the data is grouped, the engine cannot know that only one cell in a grouped array has changed, and it must recalculate the entire group. However, when the data is partitioned, only a single row in a single partition changes.

Partitioned tables are a powerful tool to help you get the most out of Deephaven. If used correctly, they can boost performance and convenience in queries, particularly ones on big data. Consider partitioned tables for your applications if:

  • Your data is dense.
  • You need to parallelize queries across multiple threads.
  • You have user interfaces that frequently request subtables by one or more key columns.

Our Slack community continues to grow! It’s a great place to learn if partitioned tables are right for your Deephaven applications. Reach out to us with any questions, comments, or feedback. We’d love to hear from you!


