Rapid grouping and ungrouping | Deephaven



Deephaven is commonly used to manipulate huge amounts of data — multiple tables, billions of rows, and hundreds of columns. It’s built to excel in both versatility of operations and speed of execution.

In this article, we’ll explore how Deephaven optimizes memory usage for rapid grouping and ungrouping, and then cover how choosing the right selection method can change the execution time of operations by orders of magnitude. Finally, we’ll delve into strategies you can employ to write the most efficient queries possible.

Deephaven saves time automatically where possible, but it’s still up to you to write efficient queries. We’ll offer techniques — both simple and more complex — to help you maximize the speed of your Deephaven queries.

Data often needs to be organized and sorted before performing any of Deephaven’s complex table operations. That means a lot of moving individual pieces of data in and out of various groups.

Whether grouping or ungrouping, moving large amounts of data around has to be a lot of work, and thus take a lot of time. After all, every bit of data being moved has to be copied from one place to another. Right? Not quite.

Deephaven has loads of built-in optimizations that eliminate redundant operations, resulting in massive performance boosts for computationally expensive queries.

For instance, Deephaven often accomplishes grouping and ungrouping via smart indexing operations instead of brute-force copying. Here’s an example of Deephaven’s speed when grouping and ungrouping 100 columns of 10 million rows:

from deephaven import empty_table
from deephaven.csv import write
from time import time


t = empty_table(10_000_000).update(["A=i%2"] + [f"X{x}=i" for x in range(100)])


start_time = time()
t_grouped = t.group_by("A")
end_time = time()
group_time = end_time - start_time

write(t_grouped, "/data/grouped.csv")

print(f"Time to group: {int(group_time*1000)} milliseconds.")

Time to group: 482 milliseconds.

To avoid some optimizations that happen under the hood and get an accurate time for the ungrouping operation, we’ll have to create a new worker for the second half of this test. Then, we’ll use that worker to read in and then ungroup the data that we just grouped in the previous code block.

from deephaven.csv import read
from time import time

t_grouped = read("/data/grouped.csv")

start_time = time()
t_ungrouped = t_grouped.ungroup()
end_time = time()
ungroup_time = end_time - start_time

print(f"Time to ungroup: {int(ungroup_time*1000)} milliseconds.")

Time to ungroup: 19 milliseconds.

For testing, the code above was run 10 times, each from a fresh Deephaven Community Core worker with 4GB of RAM. Here are the averages from those 10 runs:

  • 431 milliseconds to group the data, or 43.1 nanoseconds per row.
  • 20 milliseconds to ungroup the data, or 2 nanoseconds per row.

Deephaven was restarted between runs of both code blocks above to avoid optimizations that would skew the results. In particular, the query engine memoizes the results of grouping and ungrouping operations, eliminating redundant calculations. Without a restart between runs, the timings would be inaccurate, because after the first timed run the engine can reuse the memoized result and does significantly less work.
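The earlier point that grouping and ungrouping can be done through indexing rather than brute-force copying can be illustrated with a toy, pure-Python sketch. This is a conceptual analogy under our own assumptions, not Deephaven's actual implementation, and the helpers group_by_index, grouped_values, and ungroup_index are hypothetical names. A group is represented only by the row positions it contains, so one index serves every payload column, and ungrouping walks those positions instead of copying values back out.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Sequence


def group_by_index(keys: Sequence[Hashable]) -> Dict[Hashable, List[int]]:
    """Map each distinct key to the row positions where it occurs.

    Only row indices are recorded; no payload values are copied.
    """
    groups: Dict[Hashable, List[int]] = defaultdict(list)
    for row, key in enumerate(keys):
        groups[key].append(row)
    return dict(groups)


def grouped_values(column: Sequence, rows: List[int]) -> List:
    """Materialize one group's values on demand by following its row indices."""
    return [column[i] for i in rows]


def ungroup_index(groups: Dict[Hashable, List[int]]) -> List[int]:
    """Ungrouping concatenates the index lists; the column values are never touched."""
    return [row for rows in groups.values() for row in rows]


# Toy table (hypothetical data): A is the grouping key; X0 and X1 are payload columns.
A = [i % 2 for i in range(6)]
X0 = [10 * i for i in range(6)]
X1 = [100 * i for i in range(6)]

groups = group_by_index(A)            # built once, shared by every payload column
print(groups)                         # {0: [0, 2, 4], 1: [1, 3, 5]}
print(grouped_values(X0, groups[1]))  # [10, 30, 50]
print(grouped_values(X1, groups[1]))  # [100, 300, 500]

order = ungroup_index(groups)
print([X0[i] for i in order])         # [0, 20, 40, 10, 30, 50]
```

In this model, adding more payload columns (like the 100 X columns in the benchmark) adds no work to building the group index itself.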

In the previous example, we used update to create new columns. The update method immediately computes and stores all of the new column values.

The update_view table operation produces identical results to update, but only stores the formula used for the new column(s). Calculations are then performed on demand when the UI needs to display results or when results are required for a downstream operation. These results are never stored, so update_view has to recalculate them every time they are needed. This can be a huge time-saver if the results are needed infrequently, but it can make downstream work take much longer than with update if the results are needed many times.

We can draw a comparison between update_view and the video game design practice of frustum culling. In a video game, the player can only see a small portion of the game world at any given time. The space within the player’s field of view is the frustum, and frustum culling is the practice of selectively rendering only the portion of the game world that the player can see.

Modern video games with massive in-game worlds rely heavily on frustum culling to reduce rendering work, and Deephaven queries are similar: they can benefit greatly from deferring calculations until the results are actually needed.
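The trade-off between update and update_view can be modeled in a few lines of plain Python. This is only an analogy under our own assumptions; the CountingFormula, EagerColumn, and LazyColumn classes are hypothetical and are not Deephaven APIs. The eager column evaluates its formula for every row when it is created, while the lazy column stores only the formula and evaluates it whenever a row is read. A counter records how many times the formula actually runs.

```python
from typing import Callable, List


class CountingFormula:
    """A per-row formula (i * i), instrumented to count how often it is evaluated."""

    def __init__(self) -> None:
        self.calls = 0

    def __call__(self, i: int) -> int:
        self.calls += 1
        return i * i


class EagerColumn:
    """Analogy for `update`: evaluate every row once, up front, and store the results."""

    def __init__(self, formula: Callable[[int], int], n_rows: int) -> None:
        self.values: List[int] = [formula(i) for i in range(n_rows)]

    def get(self, i: int) -> int:
        return self.values[i]


class LazyColumn:
    """Analogy for `update_view`: keep only the formula and evaluate it on every read."""

    def __init__(self, formula: Callable[[int], int]) -> None:
        self.formula = formula

    def get(self, i: int) -> int:
        return self.formula(i)


n_rows = 1_000
eager_formula, lazy_formula = CountingFormula(), CountingFormula()
eager = EagerColumn(eager_formula, n_rows)
lazy = LazyColumn(lazy_formula)
created_calls = (eager_formula.calls, lazy_formula.calls)

# A "viewport" showing only the first 10 rows: the lazy column evaluates just those.
visible = [lazy.get(i) for i in range(10)]
viewport_calls = lazy_formula.calls

# Three downstream passes that each read every row: the lazy column recomputes each time.
for _ in range(3):
    eager_total = sum(eager.get(i) for i in range(n_rows))
    lazy_total = sum(lazy.get(i) for i in range(n_rows))

print(created_calls, viewport_calls)            # (1000, 0) 10
print(eager_total == lazy_total)                # True
print(eager_formula.calls, lazy_formula.calls)  # 1000 3010
```

Reading only the 10 visible rows costs the lazy column 10 evaluations, much as frustum culling renders only what is on screen, but every full downstream pass pays the whole cost again.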

Let’s use the following two examples to compare update and update_view. To ensure that we don’t run out of heap space, we will use a table with only 135 million rows. Using new, separate containers, we will perform a summing operation:


from deephaven import empty_table
from time import time


t = empty_table(300*10000*90//2).update_view(["A=i%10000", "B=2*i%10000", "Offset=1+i%300", "V=sin(A)*i*1000", "P=sin(A)", "X=sin(B)"])

The next two code blocks are run separately, on fresh Deephaven workers, to ensure that the results are not skewed by memoization.

# Run on a fresh worker (after the setup block above): aggregate with update_view
t_grouped = t.group_by(["A", "Offset"])
start_time = time()
aggregated_update = t_grouped.update_view("VCalc=sum(V+P+X)")
end_time = time()
elapsed_time_update_view = end_time - start_time
aggregated_update = aggregated_update.view(["A", "Offset", "VCalc"])

print(f"Deephaven took {elapsed_time_update_view} seconds to perform the aggregation on 135 million rows of ints, using `update_view`.")

# Run on a separate fresh worker (after the setup block above): aggregate with update
t_grouped = t.group_by(["A", "Offset"])
start_time = time()
aggregated_update = t_grouped.update("VCalc=sum(V+P+X)")
end_time = time()
elapsed_time_update = end_time - start_time
aggregated_update = aggregated_update.view(["A", "Offset", "VCalc"])

print(f"Deephaven took {elapsed_time_update} seconds to perform the aggregation on 135 million rows of ints, using `update`.")

Deephaven took 0.22206854820251465 seconds to perform the aggregation on 135 million rows of ints, using `update_view`.

Deephaven took 16.491493701934814 seconds to perform the aggregation on 135 million rows of ints, using `update`.

By repeating the above tests ten times each — again, with a fresh Deephaven worker each time — we get the following average results:

  • 0.26 seconds using update_view – 1.94 nanoseconds per row.
  • 17.93 seconds using update – 132.8 nanoseconds per row.

As you can see, the aggregation using update_view is significantly faster than the aggregation using update. This is because update_view only computes the values that are visible in the UI or needed by downstream operations. In the code above, update_view doesn’t perform any of the summations when it is called, so it returns almost immediately. In contrast, update computes all of the values immediately and stores them in memory.

Let’s do one last comparison to see how these two approaches affect the performance of downstream operations. We’ll re-run the two code blocks above and then time two additional operations:

start_time = time()
overallSum = aggregated_update.select("VCalc").sum_by()
overallAvg = aggregated_update.select("VCalc").avg_by()
end_time = time()

elapsed_time_downstream = end_time - start_time

print(f"Deephaven took {elapsed_time_downstream} seconds to perform the downstream operations.")

Deephaven took 40.14136099815369 seconds to perform the downstream operations using `update_view`.

Deephaven took 0.019984722137451172 seconds to perform the downstream operations using `update`.

By repeating this test ten times each with the aggregated_update tables from the previous two code blocks, we get the following results:

  • 36.20 seconds on the table created using update_view – 268.16 nanoseconds per row.
  • 0.016 seconds on the table created using update – 0.12 nanoseconds per row.

The operation completes almost instantly on the table that we created using update, because the previous calculations have already been done. The operation takes more than two thousand times longer on the table that we created using update_view, because the calculations were deferred and instead have to be performed on-demand every time a downstream operation requires results from the original query.

If we combine the times for both parts of the test, the overall time to complete the entire operation is 36.46 seconds for the table created using update_view and 17.95 seconds for the table created using update. The table created using update_view is faster to create, but it is significantly slower to use in downstream operations, and the more downstream operations there are, the more pronounced the performance difference between update and update_view becomes. By contrast, creating the aggregated table with update takes about 18 seconds, but the downstream operations then complete almost instantly.
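The combined totals, and how quickly update pulls ahead, can be checked with a little arithmetic. This sketch uses the averaged timings reported above and assumes, as a simplification, that every additional downstream pass costs the same as the measured one; total_time is a hypothetical helper, not part of the benchmark.

```python
# Average timings in seconds, taken from the results reported above
CREATE = {"update_view": 0.26, "update": 17.93}
DOWNSTREAM = {"update_view": 36.20, "update": 0.016}


def total_time(method: str, passes: int) -> float:
    """Creation cost plus `passes` downstream passes (assumes each pass costs the measured amount)."""
    return CREATE[method] + passes * DOWNSTREAM[method]


for passes in (0, 1, 2, 5):
    print(passes, round(total_time("update_view", passes), 2), round(total_time("update", passes), 2))

# Number of downstream passes at which both approaches would cost the same
break_even = (CREATE["update"] - CREATE["update_view"]) / (
    DOWNSTREAM["update_view"] - DOWNSTREAM["update"]
)
print(round(break_even, 2))  # 0.49
```

With one pass this reproduces the 36.46 s and 17.95 s totals, and a break-even below one pass means that, under this simple model, update comes out ahead as soon as the results are consumed even once.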

Choosing the right selection method is a simple way to streamline your queries. See our full guide on choosing a selection method for more details.

Large data sets amplify the effect of small inefficiencies in queries, resulting in significant performance degradation. If you’re working with real-time or large data, query efficiency quickly becomes paramount. See our guide, Think Like a Deephaven Ninja, for tips on writing efficient queries.

Beyond making sure you’ve chosen the correct selection method for your use case, you may be able to further improve query efficiency by changing the way you represent your logic as a function.

In the examples below, we can see up to a 7x difference in speed depending on how we write our query. We’ll break it down into parts, as it’s pretty complex.

First, we’ll import all the libraries we need.

from typing import Sequence, List
from deephaven.table import Table
from math import sqrt
from deephaven.column import string_col, long_col, double_col
from deephaven import empty_table, new_table
from datetime import datetime
import numba as nb
import numpy as np

Next, we’ll define a function that groups and aggregates data. This function will be used in all of our examples.

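def grouping_agg(t: Table, by: Sequence[str], formulas: Sequence[str]) -> Table:
    # Output columns: the grouping keys plus the left-hand side of each formula
    rst_cols = []
    rst_cols.extend(by)
    rst_cols.extend([f.split("=")[0].strip() for f in formulas])

    # Group, evaluate each formula on the grouped arrays, and keep only the result columns
    rst = t.group_by(by).update(formulas).view(rst_cols)
    return rst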
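To see what grouping_agg computes, here is a pure-Python analogue operating on a list of row dictionaries. It is an illustrative sketch, not Deephaven code: grouping_agg_py, its dict-based row format, and the sample rows are hypothetical. Non-key columns are collected into per-group lists (like group_by), each formula is evaluated once per group on those lists (like update), and only the key and result columns are returned (like view).

```python
from collections import defaultdict
from math import sqrt
from typing import Callable, Dict, List, Sequence

Row = Dict[str, object]


def grouping_agg_py(
    rows: List[Row],
    by: Sequence[str],
    formulas: Dict[str, Callable[[Dict[str, list]], object]],
) -> List[Row]:
    # group_by: collect every non-key column into a list per distinct key
    groups: Dict[tuple, Dict[str, list]] = defaultdict(lambda: defaultdict(list))
    for row in rows:
        key = tuple(row[c] for c in by)
        for col, value in row.items():
            if col not in by:
                groups[key][col].append(value)

    result = []
    for key, cols in groups.items():
        out: Row = dict(zip(by, key))   # view: keep the key columns...
        for name, fn in formulas.items():
            out[name] = fn(cols)        # ...plus one value per formula (update)
        result.append(out)
    return result


rows = [
    {"Id": 0, "Value1": 1.0, "Value2": 0.3},
    {"Id": 1, "Value1": 5.0, "Value2": 0.7},
    {"Id": 0, "Value1": 2.0, "Value2": 0.1},
    {"Id": 0, "Value1": 3.0, "Value2": 0.4},
    {"Id": 0, "Value1": 4.0, "Value2": 0.9},
]
# Same shape as the benchmark formula: F = sum(Value1) + sqrt(size of Value2)
formulas = {"F": lambda c: sum(c["Value1"]) + sqrt(len(c["Value2"]))}
print(grouping_agg_py(rows, by=["Id"], formulas=formulas))
# [{'Id': 0, 'F': 12.0}, {'Id': 1, 'F': 6.0}]
```

This also shows why the custom functions that follow receive whole arrays: after grouping, each formula sees one group's values at a time.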

Now, we want to compare several different ways of writing a function for our query. Each function returns the sum of its first input plus the square root of the length of its second input. We’ll write four versions: plain Python, NumPy, Numba, and Numba combined with NumPy.

Deephaven’s implementation of guvectorize does not currently use advanced features like chunking, which means it won’t be as fast as it could be in this example. Improvements to guvectorize are planned for future releases.

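# Plain Python: built-in sum plus math.sqrt
def custom_func_python(x, y) -> float:
    return sum(x) + sqrt(len(y))

# NumPy: vectorized sum and square root
def custom_func_numpy(x, y) -> float:
    return np.sum(x) + np.sqrt(len(y))

# Numba: compiled generalized ufunc that fills every element of the output array rst
@nb.guvectorize([(nb.float64[:],nb.float64[:],nb.float64[:])],"(m),(m)->(m)",nopython=True)
def custom_func_numba(x, y, rst):
    rst[:] = sum(x) + sqrt(len(y))

# Numba + NumPy: same as above, but using np.sum inside the compiled function
@nb.guvectorize([(nb.float64[:],nb.float64[:],nb.float64[:])],"(m),(m)->(m)",nopython=True)
def custom_func_numbanumpy(x, y, rst):
    rst[:] = np.sum(x) + sqrt(len(y))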

We’re almost ready, but we need a way to time the above functions as they run. To do that, we define a function that builds a test table, runs a set of formulas through grouping_agg, and reports the elapsed time per row:

def run_it(label: str, formulas: List[str], n_row: int, n_group: int):
    # Build the test table; "Id=ii%n_group" uses the n_group argument inside the query string
    t = empty_table(n_row).update(["Id=ii%n_group", "Offset=ii%5", "Value1=random()", "Value2=random()", "OtherCol=1"])

    # Time only the grouping and aggregation, not the table creation
    start = datetime.now()
    t1 = grouping_agg(t, by=["Id"], formulas=formulas)
    stop = datetime.now()
    dt = stop - start
    sec_per_eval = dt.total_seconds()
    ns_per_row = sec_per_eval / n_row * 1e9
    print(f"TIME: {n_row} {n_group} {label}:\t{ns_per_row:.2f} ns/row")
    return (label, n_row, n_group, ns_per_row)
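One possible refinement to the timing in run_it is Python’s time.perf_counter, a monotonic, high-resolution clock intended for measuring short intervals; datetime.now() reads the wall clock, which can jump if the system time is adjusted. The context manager below is a hypothetical helper, not part of the original benchmark. It applies the same nanoseconds-per-row formula as run_it and is shown here timing a stand-in workload.

```python
import time
from contextlib import contextmanager
from typing import Iterator, List


@contextmanager
def ns_per_row_timer(n_row: int, results: List[float]) -> Iterator[None]:
    """Time the enclosed block and append its cost in nanoseconds per row to `results`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        results.append(elapsed / n_row * 1e9)


# Stand-in workload; in the benchmark, the body would call grouping_agg(...) instead.
timings: List[float] = []
n_row = 100_000
with ns_per_row_timer(n_row, timings):
    total = sum(range(n_row))
print(f"{timings[0]:.2f} ns/row")
```

Because the measurement is recorded in a finally clause, a failing query still leaves a timing entry, which can be convenient when comparing many variants in one loop.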

Now, we need to define a couple of parameters to feed the function we just made:

  • n_rows is a list of the numbers of rows to use in our test.
  • n_groups is a list of the numbers of groups to use in our test.

We’ll also create an empty list to hold our result data:

n_rows = [10_000_000]
n_groups = [1000]

data = []

It’s time to put everything together. In the next code block, we loop over our test cases and run each version of the formula on each one. Finally, we collect the performance data into a table, perf. Formatting that table with format_columns turns it into a heatmap, with the fastest results in green and the slowest in dark gray.

for n_group in n_groups:
    for n_row in n_rows:
        data.append(run_it("Java+BuiltIn", ["F=sum(Value1)+sqrt(Value2.size())"], n_row=n_row, n_group=n_group))
        data.append(run_it("Custom+Py+Cast", ["F = (double) custom_func_python(Value1,Value2)"], n_row=n_row, n_group=n_group))
        data.append(run_it("Custom+Py", ["F = custom_func_python(Value1,Value2)"], n_row=n_row, n_group=n_group))
        data.append(run_it("Custom+Numpy", ["F = custom_func_numpy(Value1,Value2)"], n_row=n_row, n_group=n_group))
        data.append(run_it("Custom+Numba", ["F = custom_func_numba(Value1,Value2)"], n_row=n_row, n_group=n_group))
        data.append(run_it("Custom+NumbaNumpy", ["F = custom_func_numbanumpy(Value1,Value2)"], n_row=n_row, n_group=n_group))

perf = new_table([
    string_col("Label", [x[0] for x in data]),
    long_col("NRow", [x[1] for x in data]),
    long_col("NGroup", [x[2] for x in data]),
    double_col("NSperRow", [x[3] for x in data]),
])

We repeated the above tests ten times each, again with a fresh Deephaven worker each time, and averaged the results.

In these averaged results, "Custom+Numpy" is the fastest, followed closely by the query that uses Java and Deephaven’s built-in methods. In general, however, queries that use a combination of Java and built-in methods are almost always the fastest.

Our Community documentation has all of the resources you need to become a Deephaven power user. Our Slack community continues to grow, and we’d love to have you join us! If you have any questions, comments, or suggestions, please reach out to us there.




By stp2y
