40 changes: 32 additions & 8 deletions python/tests/test_tsfile_dataset.py
@@ -688,23 +688,37 @@ def test_reader_catalog_shares_device_metadata_and_resolves_paths(tmp_path):


def test_reader_read_series_by_row_retries_across_native_row_query_boundaries():
+class _FakeColumn:
+def __init__(self, values):
+self._values = values
+
+def to_numpy(self):
+return np.asarray(self._values)
+
+class _FakeArrowTable:
+def __init__(self, rows):
+self._rows = rows
+self.num_rows = len(rows)
+
+def column(self, name):
+return _FakeColumn([row[name] for row in self._rows])
+
class _FakeResultSet:
def __init__(self, rows):
self._rows = rows
self._index = -1

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
return False

-def next(self):
-self._index += 1
-return self._index < len(self._rows)
-
-def get_value_by_name(self, name):
-return self._rows[self._index][name]
+def read_arrow_batch(self):
+if self._rows is None:
+return None
+rows = self._rows
+self._rows = None
+return _FakeArrowTable(rows)

class _FakeNativeReader:
def __init__(self, timestamps, values, boundary):
@@ -713,11 +727,18 @@ def __init__(self, timestamps, values, boundary):
self._boundary = boundary

def query_table_by_row(
-self, table_name, column_names, offset=0, limit=-1, tag_filter=None
+self,
+table_name,
+column_names,
+offset=0,
+limit=-1,
+tag_filter=None,
+batch_size=0,
):
assert table_name == "pvf"
assert column_names == ["totalcloudcover"]
assert tag_filter is None
+assert batch_size > 0
if limit < 0:
stop = len(self._timestamps)
else:
@@ -748,6 +769,9 @@ def query_table_by_row(
np.testing.assert_array_equal(ts_arr, np.arange(5, 17, dtype=np.int64))
np.testing.assert_array_equal(values, np.arange(5, 17, dtype=np.float64))

+values_only = reader.read_series_values_by_row(device_id, 0, 5, 12)
+np.testing.assert_array_equal(values_only, np.arange(5, 17, dtype=np.float64))


def test_series_path_resolution_allows_prefix_tag_values():
catalog = MetadataCatalog()
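The fakes above pin down the batch contract the reader now depends on: `query_table_by_row` must be called with a positive `batch_size`, and each result set yields Arrow-style tables from `read_arrow_batch()` until it returns `None`, so crossing a native row-query boundary means reissuing the query at an advanced offset. A minimal sketch of that drain-and-retry loop, written against the fake classes defined in the test above (the helper name and structure are illustrative, not the reader's actual implementation):

```python
import numpy as np


def drain_rows(reader, table_name, columns, offset, limit, batch_size=1024):
    # Illustrative consumer of the query_table_by_row/read_arrow_batch
    # contract exercised by the fakes above; not the library's real code.
    parts = {name: [] for name in columns}
    remaining = limit
    while remaining > 0:
        got = 0
        with reader.query_table_by_row(
            table_name, columns, offset=offset, limit=remaining, batch_size=batch_size
        ) as result:
            while True:
                batch = result.read_arrow_batch()
                if batch is None:  # this result set is exhausted
                    break
                for name in columns:
                    parts[name].append(batch.column(name).to_numpy())
                got += batch.num_rows
        if got == 0:  # no rows left behind the boundary: stop retrying
            break
        offset += got  # reissue the query past the boundary
        remaining -= got
    return {n: np.concatenate(p) if p else np.array([]) for n, p in parts.items()}
```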
102 changes: 45 additions & 57 deletions python/tsfile/dataset/dataframe.py
@@ -210,32 +210,6 @@ def _build_device_entry(refs: List[DeviceRef]) -> dict:
}


-def _build_runtime_series_stats(refs: List[SeriesRef]) -> dict:
-"""Build shared-timeline series stats from native timeline metadata."""
-min_time = None
-max_time = None
-count = 0
-
-for reader, device_id, field_idx in refs:
-info = reader.get_series_info_by_ref(device_id, field_idx)
-shard_min = info["timeline_min_time"]
-shard_max = info["timeline_max_time"]
-shard_count = info["timeline_length"]
-
-if shard_count == 0:
-continue
-
-count += shard_count
-min_time = shard_min if min_time is None else min(min_time, shard_min)
-max_time = shard_max if max_time is None else max(max_time, shard_max)
-
-return {
-"min_time": min_time,
-"max_time": max_time,
-"count": count,
-}
-
-
def _merge_field_timestamps(series_name: str, refs: List[SeriesRef]) -> np.ndarray:
"""Load and merge the full timestamp axis for one logical series on demand."""
# This is intentionally lazy because it is one of the most expensive dataset
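Above, `_build_runtime_series_stats` is removed outright: it rescanned native timeline metadata on every `Timeseries` construction, while the `_get_timeseries` hunk further down now reads the same numbers from `self._cache.field_stats`. A hedged sketch of what a one-time cache population could look like — the cache's real population code is outside this diff, so the function name and call site are assumptions; only the aggregation logic is taken from the deleted helper:

```python
def build_field_stats(refs):
    # Same aggregation as the deleted _build_runtime_series_stats, but
    # assumed to run once when the dataset cache is populated instead of
    # on every Timeseries construction. Hypothetical helper name.
    min_time, max_time, count = None, None, 0
    for reader, device_id, field_idx in refs:
        info = reader.get_series_info_by_ref(device_id, field_idx)
        shard_count = info["timeline_length"]
        if shard_count == 0:
            continue  # empty shard contributes nothing to the stats
        count += shard_count
        shard_min, shard_max = info["timeline_min_time"], info["timeline_max_time"]
        min_time = shard_min if min_time is None else min(min_time, shard_min)
        max_time = shard_max if max_time is None else max(max_time, shard_max)
    return {"min_time": min_time, "max_time": max_time, "count": count}
```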
@@ -269,62 +243,73 @@ def _merge_field_timestamps(series_name: str, refs: List[SeriesRef]) -> np.ndarray:
return merged_timestamps


-def _read_field_by_position(
+def _build_position_read_info(reader, device_id: int, field_idx: int):
+series_info = reader.get_series_info_by_ref(device_id, field_idx)
+return {
+"length": series_info["timeline_length"],
+"min_time": series_info["timeline_min_time"],
+"max_time": series_info["timeline_max_time"],
+"table_name": series_info["table_name"],
+"column_name": series_info["column_name"],
+"device_id": series_info["device_id"],
+"field_idx": series_info["field_idx"],
+"tag_columns": series_info["tag_columns"],
+"tag_values": series_info["tag_values"],
+}
+
+
+def _read_field_values_by_position(
series_name: str,
refs: List[SeriesRef],
offset: int,
limit: int,
-) -> Tuple[np.ndarray, np.ndarray]:
-"""Read one logical series by global position without materializing timestamps for non-overlapping shards."""
+) -> np.ndarray:
+"""Read one logical series by global position and return values only."""
if limit <= 0:
-return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+return np.array([], dtype=np.float64)

+if len(refs) == 1:
+reader, device_id, field_idx = refs[0]
+return reader.read_series_values_by_row(device_id, field_idx, offset, limit)

-infos = []
+ref_infos = []
for reader, device_id, field_idx in refs:
-series_info = reader.get_series_info_by_ref(device_id, field_idx)
-infos.append(
-{
-"length": series_info["timeline_length"],
-"min_time": series_info["timeline_min_time"],
-"max_time": series_info["timeline_max_time"],
-"table_name": series_info["table_name"],
-"column_name": series_info["column_name"],
-"device_id": series_info["device_id"],
-"field_idx": series_info["field_idx"],
-"tag_columns": series_info["tag_columns"],
-"tag_values": series_info["tag_values"],
-}
-)
+info = _build_position_read_info(reader, device_id, field_idx)
+if info["length"] <= 0:
+continue
+ref_infos.append(((reader, device_id, field_idx), info))
+if not ref_infos:
+return np.array([], dtype=np.float64)

ordered = sorted(
-zip(refs, infos), key=lambda item: (item[1]["min_time"], item[1]["max_time"])
+ref_infos, key=lambda item: (item[1]["min_time"], item[1]["max_time"])
)
if _has_time_range_overlap([info for _, info in ordered]):
-return _read_field_by_position_overlap(series_name, ordered, offset, limit)
+_, values = _read_field_by_position_overlap(series_name, ordered, offset, limit)
+return values

remaining_offset = offset
remaining_limit = limit
-time_parts = []
value_parts = []
for (reader, device_id, field_idx), info in ordered:
shard_count = info["length"]
if remaining_offset >= shard_count:
remaining_offset -= shard_count
continue
local_limit = min(remaining_limit, shard_count - remaining_offset)
-ts_arr, values = reader.read_series_by_row(
+values = reader.read_series_values_by_row(
device_id, field_idx, remaining_offset, local_limit
)
-if len(ts_arr) > 0:
-time_parts.append(ts_arr)
if len(values) > 0:
value_parts.append(values)
remaining_limit -= local_limit
remaining_offset = 0
if remaining_limit <= 0:
break

-if not time_parts:
-return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
-return np.concatenate(time_parts), np.concatenate(value_parts)
+if not value_parts:
+return np.array([], dtype=np.float64)
+return np.concatenate(value_parts)


def _has_time_range_overlap(infos: List[dict]) -> bool:
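The non-overlapping branch of `_read_field_values_by_position` above is pure offset arithmetic: whole shards are skipped while the remaining offset still exceeds a shard's row count, and each subsequent shard contributes at most `shard_count - remaining_offset` rows. A self-contained sketch of that planning step (a hypothetical helper operating on plain shard lengths instead of `SeriesRef` tuples):

```python
def plan_positional_reads(shard_lengths, offset, limit):
    """Map a global (offset, limit) window onto per-shard
    (shard_index, local_offset, local_limit) reads, assuming shards are
    time-sorted and non-overlapping. Hypothetical helper mirroring the
    skipping logic in _read_field_values_by_position."""
    plan = []
    remaining_offset, remaining_limit = offset, limit
    for idx, count in enumerate(shard_lengths):
        if remaining_limit <= 0:
            break
        if remaining_offset >= count:
            remaining_offset -= count  # window starts beyond this shard
            continue
        local_limit = min(remaining_limit, count - remaining_offset)
        plan.append((idx, remaining_offset, local_limit))
        remaining_limit -= local_limit
        remaining_offset = 0  # later shards are read from their first row
    return plan


# Three shards of 10 rows each; the global window [15, 25) covers rows
# 5..9 of shard 1 and rows 0..4 of shard 2:
assert plan_positional_reads([10, 10, 10], offset=15, limit=10) == [
    (1, 5, 5),
    (2, 0, 5),
]
```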
@@ -815,13 +800,16 @@ def _get_timeseries(self, series_ref: SeriesRefKey) -> Timeseries:
return Timeseries(
series_name,
self._index.series_ref_map[series_ref],
-_build_runtime_series_stats(self._index.series_ref_map[series_ref]),
+self._cache.field_stats[series_ref],
self._assert_open,
lambda: _merge_field_timestamps(
series_name, self._index.series_ref_map[series_ref]
),
-lambda offset, limit: _read_field_by_position(
-series_name, self._index.series_ref_map[series_ref], offset, limit
+lambda offset, limit: _read_field_values_by_position(
+series_name,
+self._index.series_ref_map[series_ref],
+offset,
+limit,
),
)

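The `_get_timeseries` hunk above replaces the per-call `_build_runtime_series_stats` scan with stats precomputed in `self._cache.field_stats`, and passes timestamp merging and positional value reads as deferred callbacks, so constructing the handle reads no data. A stand-in sketch of that lazy wiring pattern (not the library's actual `Timeseries` class):

```python
from typing import Callable

import numpy as np


class LazySeries:
    """Stand-in for the Timeseries wiring above: stats arrive precomputed,
    while timestamps and values are only fetched on first use."""

    def __init__(
        self,
        name: str,
        stats: dict,
        load_timestamps: Callable[[], np.ndarray],
        read_values: Callable[[int, int], np.ndarray],
    ):
        self.name = name
        self.stats = stats  # e.g. {"min_time": ..., "max_time": ..., "count": ...}
        self._load_timestamps = load_timestamps
        self._read_values = read_values
        self._timestamps = None

    @property
    def timestamps(self) -> np.ndarray:
        if self._timestamps is None:  # merge the full axis only once
            self._timestamps = self._load_timestamps()
        return self._timestamps

    def values(self, offset: int, limit: int) -> np.ndarray:
        # Values-only positional read; no timestamp materialization needed.
        return self._read_values(offset, limit)
```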