from deephaven.experimental.table_data_service import (
TableDataServiceBackend,
TableKey,
TableLocationKey,
TableDataService,
)
from typing import Callable, Optional, Dict
import pyarrow as pa
class TableKeyImpl(TableKey):
    """
    A minimal TableKey that wraps a single identifying value.

    Instances hash and compare by their ``key`` attribute, so two instances
    built from equal keys can be used interchangeably as dictionary keys.
    """

    def __init__(self, key: str):
        # The wrapped identifier; it drives hashing, equality and str().
        self.key = key

    def __str__(self):
        return f"TableKeyImpl{{{self.key}}}"

    def __eq__(self, other):
        if isinstance(other, TableKeyImpl):
            return self.key == other.key
        # Let Python try the reflected comparison for unrelated types.
        return NotImplemented

    def __hash__(self):
        # Kept in step with __eq__: equal keys produce equal hashes.
        return hash(self.key)
class TableLocationKeyImpl(TableLocationKey):
    """
    A minimal TableLocationKey that wraps a single identifying value.

    Instances hash and compare by their ``key`` attribute, so two instances
    built from equal keys can be used interchangeably as dictionary keys.
    """

    def __init__(self, key: str):
        # The wrapped identifier; it drives hashing, equality and str().
        self.key = key

    def __str__(self):
        return f"TableLocationKeyImpl{{{self.key}}}"

    def __eq__(self, other):
        if isinstance(other, TableLocationKeyImpl):
            return self.key == other.key
        # Let Python try the reflected comparison for unrelated types.
        return NotImplemented

    def __hash__(self):
        # Kept in step with __eq__: equal keys produce equal hashes.
        return hash(self.key)
class TestTable:
    """
    A custom table implementation for the backend.

    A table is a mapping from location keys to ``TestTableLocation`` objects,
    plus the data schema and the optional partitioning-column schema. It also
    holds the callbacks that a location subscriber installs on it.
    """

    class TestTableLocation:
        """
        A custom table location implementation for the backend.

        Holds the pyarrow data of one location, its optional partitioning
        values and the callbacks that a size subscriber installs on it.
        """

        def __init__(
            self, data_schema: pa.Schema, partitioning_values: Optional[pa.Table]
        ):
            """
            Create an empty location.

            Args:
                data_schema: schema used to create the initial empty data table.
                partitioning_values: partitioning values of this location, or None.
            """
            self.partitioning_values = partitioning_values
            # No-op placeholders until a subscriber installs real callbacks.
            self.size_cb: Callable[[int], None] = lambda *x: x
            self.size_failure_cb: Callable[[str], None] = lambda *x: x
            self.data: pa.Table = data_schema.empty_table()

        def append_data(self, new_data: pa.Table):
            """
            Append data to the table in batches.

            The current size callback is invoked with the resulting row count.

            Args:
                new_data: the rows to append.
            """
            batches = self.data.to_batches()
            batches.extend(new_data.to_batches())
            # Table.from_batches cannot infer a schema from an empty list, so
            # when there is nothing to combine the current table is kept.
            if batches:
                self.data = pa.Table.from_batches(batches)
            self.size_cb(self.data.num_rows)

    def __init__(
        self, data_schema: pa.Schema, partitioning_column_schema: Optional[pa.Schema]
    ):
        """
        Create a table with no locations.

        Args:
            data_schema: schema of the data held by every location.
            partitioning_column_schema: schema of the partitioning columns, or None.
        """
        self.data_schema = data_schema
        self.partitioning_column_schema = partitioning_column_schema
        self.locations: Dict[TableLocationKey, "TestTable.TestTableLocation"] = {}
        # No-op placeholders until a subscriber installs real callbacks.
        self.location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None] = (
            lambda *x: x
        )
        self.location_failure_cb: Callable[[str], None] = lambda *x: x

    def add_table_location(
        self,
        table_location_key: TableLocationKeyImpl,
        partitioning_column_values: Optional[pa.Table],
        data_values: pa.Table,
    ):
        """
        Add a new location to the table.

        Args:
            table_location_key: key of the new location.
            partitioning_column_values: partitioning values of the location, or None.
            data_values: initial data of the location.

        Raises:
            ValueError: if a location with this key already exists.
        """
        if table_location_key in self.locations:
            raise ValueError(
                f"Cannot add table location {table_location_key} already exists"
            )
        new_location = self.TestTableLocation(
            self.data_schema, partitioning_column_values
        )
        new_location.append_data(data_values)
        self.locations[table_location_key] = new_location
        # Notify after registering, so the location is already present in
        # self.locations when the callback runs. Without a subscriber this
        # is the no-op placeholder.
        self.location_cb(table_location_key, partitioning_column_values)

    def append_table_location(
        self, table_location_key: TableLocationKeyImpl, data_values: pa.Table
    ):
        """
        Append data to an existing location in the table.

        Args:
            table_location_key: key of the location to extend.
            data_values: the rows to append.

        Raises:
            ValueError: if no location with this key exists.
        """
        if table_location_key not in self.locations:
            raise ValueError(
                f"Cannot append to non-existent table location {table_location_key}"
            )
        self.locations[table_location_key].append_data(data_values)
class TestBackend(TableDataServiceBackend):
    """
    A custom implementation of the TableDataServiceBackend for testing.

    Tables are kept in an in-memory dictionary keyed by table key. Every
    request reports its outcome through the callbacks it is given: a failure
    callback receives a message when the table or location is unknown,
    otherwise the result callback(s) are invoked.
    """

    def __init__(self):
        self.tables: Dict[TableKey, TestTable] = {}

    def add_table(self, table_key: TableKeyImpl, table: TestTable):
        """
        Add a new table to the backend.

        Raises:
            ValueError: if a table is already registered under ``table_key``.
        """
        if table_key in self.tables:
            raise ValueError(f"{table_key} already exists")
        self.tables[table_key] = table

    def table_schema(
        self,
        table_key: TableKeyImpl,
        schema_cb: Callable[[pa.Schema, Optional[pa.Schema]], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the schema of a table with a callable.

        ``schema_cb`` receives the data schema and the partitioning column
        schema of the table; ``failure_cb`` is called if the table is unknown.
        """
        table = self.tables.get(table_key)
        if table is None:
            failure_cb(f"{table_key} does not exist")
            return
        schema_cb(table.data_schema, table.partitioning_column_schema)

    def table_locations(
        self,
        table_key: TableKeyImpl,
        location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the locations of a table with a callable.

        ``location_cb`` is called once per location with its key and its
        partitioning values, then ``success_cb`` is called; ``failure_cb`` is
        called instead if the table is unknown.
        """
        table = self.tables.get(table_key)
        if table is None:
            failure_cb(f"{table_key} does not exist")
            return
        # .items() is required to get (key, location) pairs, and the callback
        # takes two positional arguments rather than one list.
        for key, location in table.locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

    def table_location_size(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        size_cb: Callable[[int], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the size of a table location with a callable.

        ``size_cb`` receives the row count of the location's data;
        ``failure_cb`` is called if the table or the location is unknown.
        """
        table = self.tables.get(table_key)
        if table is None:
            failure_cb(f"{table_key} does not exist")
            return
        location = table.locations.get(table_location_key)
        if location is None:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return
        size_cb(location.data.num_rows)

    def column_values(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        col: str,
        offset: int,
        min_rows: int,
        max_rows: int,
        values_cb: Callable[[pa.Table], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch column values with a callable.

        ``values_cb`` receives a single-column table for ``col``, sliced with
        a start of ``offset`` and a length of ``min_rows``; ``failure_cb`` is
        called if the table or the location is unknown.
        """
        table = self.tables.get(table_key)
        if table is None:
            failure_cb(f"{table_key} does not exist")
            return
        location = table.locations.get(table_location_key)
        if location is None:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return
        # Only min_rows is used for the slice length; max_rows is accepted
        # but not consulted by this implementation.
        values_cb(location.data.select([col]).slice(offset, min_rows))

    def subscribe_to_table_locations(
        self,
        table_key: TableKeyImpl,
        location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> Callable[[], None]:
        """
        Subscribe to table locations with a callable.

        The callbacks are stored on the table, every existing location is
        reported through ``location_cb``, and then ``success_cb`` is called.

        Returns:
            A callable that resets the table's callbacks to no-ops. If the
            table is unknown, ``failure_cb`` is called and a no-op is returned.
        """
        table = self.tables.get(table_key)
        if table is None:
            failure_cb(f"{table_key} does not exist")
            return lambda *x: x

        table.location_cb = location_cb
        table.location_failure_cb = failure_cb

        for key, location in table.locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

        def unsubscribe():
            table.location_cb = lambda *x: x
            table.location_failure_cb = lambda *x: x

        return unsubscribe

    def subscribe_to_table_location_size(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        size_cb: Callable[[int], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> Callable[[], None]:
        """
        Subscribe to table location size with a callable.

        The callbacks are stored on the location, the current row count is
        reported through ``size_cb``, and then ``success_cb`` is called.

        Returns:
            A callable that resets the location's callbacks to no-ops. If the
            table or the location is unknown, ``failure_cb`` is called and a
            no-op is returned.
        """
        table = self.tables.get(table_key)
        if table is None:
            failure_cb(f"{table_key} does not exist")
            return lambda *x: x
        location = table.locations.get(table_location_key)
        if location is None:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return lambda *x: x

        location.size_cb = size_cb
        # The location object names this attribute size_failure_cb; writing
        # to a differently named attribute would leave the real one unset.
        location.size_failure_cb = failure_cb

        size_cb(location.data.num_rows)
        success_cb()

        def unsubscribe():
            location.size_cb = lambda *x: x
            location.size_failure_cb = lambda *x: x

        return unsubscribe
# Source link
# lol