diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index f79a6d466..8ebdeb639 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -688,10 +688,24 @@ def test_reader_catalog_shares_device_metadata_and_resolves_paths(tmp_path): def test_reader_read_series_by_row_retries_across_native_row_query_boundaries(): + class _FakeColumn: + def __init__(self, values): + self._values = values + + def to_numpy(self): + return np.asarray(self._values) + + class _FakeArrowTable: + def __init__(self, rows): + self._rows = rows + self.num_rows = len(rows) + + def column(self, name): + return _FakeColumn([row[name] for row in self._rows]) + class _FakeResultSet: def __init__(self, rows): self._rows = rows - self._index = -1 def __enter__(self): return self @@ -699,12 +713,12 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): return False - def next(self): - self._index += 1 - return self._index < len(self._rows) - - def get_value_by_name(self, name): - return self._rows[self._index][name] + def read_arrow_batch(self): + if self._rows is None: + return None + rows = self._rows + self._rows = None + return _FakeArrowTable(rows) class _FakeNativeReader: def __init__(self, timestamps, values, boundary): @@ -713,11 +727,18 @@ def __init__(self, timestamps, values, boundary): self._boundary = boundary def query_table_by_row( - self, table_name, column_names, offset=0, limit=-1, tag_filter=None + self, + table_name, + column_names, + offset=0, + limit=-1, + tag_filter=None, + batch_size=0, ): assert table_name == "pvf" assert column_names == ["totalcloudcover"] assert tag_filter is None + assert batch_size > 0 if limit < 0: stop = len(self._timestamps) else: @@ -748,6 +769,9 @@ def query_table_by_row( np.testing.assert_array_equal(ts_arr, np.arange(5, 17, dtype=np.int64)) np.testing.assert_array_equal(values, np.arange(5, 17, dtype=np.float64)) + values_only = reader.read_series_values_by_row(device_id, 0, 5, 12) + np.testing.assert_array_equal(values_only, np.arange(5, 17, dtype=np.float64)) + def test_series_path_resolution_allows_prefix_tag_values(): catalog = MetadataCatalog() diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py index 40149102a..aff4f4f91 100644 --- a/python/tsfile/dataset/dataframe.py +++ b/python/tsfile/dataset/dataframe.py @@ -210,32 +210,6 @@ def _build_device_entry(refs: List[DeviceRef]) -> dict: } -def _build_runtime_series_stats(refs: List[SeriesRef]) -> dict: - """Build shared-timeline series stats from native timeline metadata.""" - min_time = None - max_time = None - count = 0 - - for reader, device_id, field_idx in refs: - info = reader.get_series_info_by_ref(device_id, field_idx) - shard_min = info["timeline_min_time"] - shard_max = info["timeline_max_time"] - shard_count = info["timeline_length"] - - if shard_count == 0: - continue - - count += shard_count - min_time = shard_min if min_time is None else min(min_time, shard_min) - max_time = shard_max if max_time is None else max(max_time, shard_max) - - return { - "min_time": min_time, - "max_time": max_time, - "count": count, - } - - def _merge_field_timestamps(series_name: str, refs: List[SeriesRef]) -> np.ndarray: """Load and merge the full timestamp axis for one logical series on demand.""" # This is intentionally lazy because it is one of the most expensive dataset @@ -269,41 +243,53 @@ def _merge_field_timestamps(series_name: str, refs: List[SeriesRef]) -> np.ndarr return merged_timestamps -def _read_field_by_position( +def _build_position_read_info(reader, device_id: int, field_idx: int): + series_info = reader.get_series_info_by_ref(device_id, field_idx) + return { + "length": series_info["timeline_length"], + "min_time": series_info["timeline_min_time"], + "max_time": series_info["timeline_max_time"], + "table_name": series_info["table_name"], + "column_name": series_info["column_name"], + "device_id": series_info["device_id"], + "field_idx": series_info["field_idx"], + "tag_columns": series_info["tag_columns"], + "tag_values": series_info["tag_values"], + } + + +def _read_field_values_by_position( series_name: str, refs: List[SeriesRef], offset: int, limit: int, -) -> Tuple[np.ndarray, np.ndarray]: - """Read one logical series by global position without materializing timestamps for non-overlapping shards.""" +) -> np.ndarray: + """Read one logical series by global position and return values only.""" if limit <= 0: - return np.array([], dtype=np.int64), np.array([], dtype=np.float64) + return np.array([], dtype=np.float64) + + if len(refs) == 1: + reader, device_id, field_idx = refs[0] + return reader.read_series_values_by_row(device_id, field_idx, offset, limit) - infos = [] + ref_infos = [] for reader, device_id, field_idx in refs: - series_info = reader.get_series_info_by_ref(device_id, field_idx) - infos.append( - { - "length": series_info["timeline_length"], - "min_time": series_info["timeline_min_time"], - "max_time": series_info["timeline_max_time"], - "table_name": series_info["table_name"], - "column_name": series_info["column_name"], - "device_id": series_info["device_id"], - "field_idx": series_info["field_idx"], - "tag_columns": series_info["tag_columns"], - "tag_values": series_info["tag_values"], - } - ) + info = _build_position_read_info(reader, device_id, field_idx) + if info["length"] <= 0: + continue + ref_infos.append(((reader, device_id, field_idx), info)) + if not ref_infos: + return np.array([], dtype=np.float64) + ordered = sorted( - zip(refs, infos), key=lambda item: (item[1]["min_time"], item[1]["max_time"]) + ref_infos, key=lambda item: (item[1]["min_time"], item[1]["max_time"]) ) if _has_time_range_overlap([info for _, info in ordered]): - return _read_field_by_position_overlap(series_name, ordered, offset, limit) + _, values = _read_field_by_position_overlap(series_name, ordered, offset, limit) + return values remaining_offset = offset remaining_limit = limit - time_parts = [] value_parts = [] for (reader, device_id, field_idx), info in ordered: shard_count = info["length"] @@ -311,20 +297,19 @@ def _read_field_by_position( remaining_offset -= shard_count continue local_limit = min(remaining_limit, shard_count - remaining_offset) - ts_arr, values = reader.read_series_by_row( + values = reader.read_series_values_by_row( device_id, field_idx, remaining_offset, local_limit ) - if len(ts_arr) > 0: - time_parts.append(ts_arr) + if len(values) > 0: value_parts.append(values) remaining_limit -= local_limit remaining_offset = 0 if remaining_limit <= 0: break - if not time_parts: - return np.array([], dtype=np.int64), np.array([], dtype=np.float64) - return np.concatenate(time_parts), np.concatenate(value_parts) + if not value_parts: + return np.array([], dtype=np.float64) + return np.concatenate(value_parts) def _has_time_range_overlap(infos: List[dict]) -> bool: @@ -815,13 +800,16 @@ def _get_timeseries(self, series_ref: SeriesRefKey) -> Timeseries: return Timeseries( series_name, self._index.series_ref_map[series_ref], - _build_runtime_series_stats(self._index.series_ref_map[series_ref]), + self._cache.field_stats[series_ref], self._assert_open, lambda: _merge_field_timestamps( series_name, self._index.series_ref_map[series_ref] ), - lambda offset, limit: _read_field_by_position( - series_name, self._index.series_ref_map[series_ref], offset, limit + lambda offset, limit: _read_field_values_by_position( + series_name, + self._index.series_ref_map[series_ref], + offset, + limit, ), ) diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py index 4899b2bf9..370af92e7 100644 --- a/python/tsfile/dataset/reader.py +++ b/python/tsfile/dataset/reader.py @@ -20,7 +20,7 @@ import os import sys -from typing import Dict, Iterator, List, Tuple +from typing import Dict, Iterator, List, Optional, Tuple import numpy as np @@ -43,6 +43,10 @@ TSDataType.TIMESTAMP, } +# Use by-row batch mode so position reads consume TsBlocks/Arrow batches +# instead of iterating one row at a time through ResultSet.next(). +_ROW_BATCH_SIZE = 8192 + def _to_python_scalar(value): return value.item() if hasattr(value, "item") else value @@ -374,25 +378,29 @@ def read_series_by_row( next_offset = offset while remaining > 0: - batch_timestamps = [] - batch_values = [] with self._reader.query_table_by_row( table_entry.table_name, [field_name], offset=next_offset, limit=remaining, tag_filter=tag_filter, + batch_size=min(remaining, _ROW_BATCH_SIZE), ) as result_set: - while result_set.next(): - batch_timestamps.append(result_set.get_value_by_name("time")) - value = result_set.get_value_by_name(field_name) - batch_values.append(np.nan if value is None else float(value)) + batch_timestamps, batch_field_values = ( + self._collect_arrow_numeric_batches( + result_set, + [field_name], + include_timestamps=True, + table_name=table_entry.table_name, + ) + ) + batch_values = batch_field_values[field_name] - if not batch_timestamps: + if len(batch_timestamps) == 0: break - timestamp_parts.append(np.asarray(batch_timestamps, dtype=np.int64)) - value_parts.append(np.asarray(batch_values, dtype=np.float64)) + timestamp_parts.append(batch_timestamps) + value_parts.append(batch_values) read_count = len(batch_timestamps) next_offset += read_count remaining -= read_count @@ -403,6 +411,53 @@ def read_series_by_row( return timestamp_parts[0], value_parts[0] return np.concatenate(timestamp_parts), np.concatenate(value_parts) + def read_series_values_by_row( + self, device_id: int, field_idx: int, offset: int, limit: int + ) -> np.ndarray: + """Read one logical series by device-local row offset/limit and return values only.""" + if limit <= 0: + return np.array([], dtype=np.float64) + + table_entry, device_entry, field_name = self._resolve_series_ref( + device_id, field_idx + ) + tag_values = dict(zip(table_entry.tag_columns, device_entry.tag_values)) + tag_filter = _build_exact_tag_filter(tag_values) if tag_values else None + value_parts = [] + remaining = limit + next_offset = offset + + while remaining > 0: + with self._reader.query_table_by_row( + table_entry.table_name, + [field_name], + offset=next_offset, + limit=remaining, + tag_filter=tag_filter, + batch_size=min(remaining, _ROW_BATCH_SIZE), + ) as result_set: + _, field_values = self._collect_arrow_numeric_batches( + result_set, + [field_name], + include_timestamps=False, + table_name=table_entry.table_name, + ) + batch_values = field_values[field_name] + + if len(batch_values) == 0: + break + + value_parts.append(batch_values) + read_count = len(batch_values) + next_offset += read_count + remaining -= read_count + + if not value_parts: + return np.array([], dtype=np.float64) + if len(value_parts) == 1: + return value_parts[0] + return np.concatenate(value_parts) + def read_device_fields_by_time_range( self, device_id: int, field_indices: List[int], start_time: int, end_time: int ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: @@ -435,8 +490,6 @@ def _read_arrow( tag_columns = list(tag_columns) field_columns = list(field_columns) query_columns = list(field_columns) - timestamp_parts = [] - field_parts = {field_column: [] for field_column in field_columns} tag_filter = _build_exact_tag_filter(tag_values) if tag_values else None with self._reader.query_table( @@ -447,47 +500,83 @@ def _read_arrow( tag_filter=tag_filter, batch_size=65536, ) as result_set: - while True: - arrow_table = result_set.read_arrow_batch() - if arrow_table is None: - break + timestamps, field_values = self._collect_arrow_numeric_batches( + result_set, + field_columns, + include_timestamps=True, + table_name=table_name, + ) - if arrow_table.num_rows == 0: - continue + if len(timestamps) == 0: + return timestamps, field_values - timestamp_parts.append(arrow_table.column("time").to_numpy()) - for field_column in field_columns: - raw_values = arrow_table.column(field_column).to_numpy() - try: - field_parts[field_column].append( - np.asarray(raw_values, dtype=np.float64) - ) - except (TypeError, ValueError) as e: - raise TypeError( - f"Field column '{field_column}' in table '{table_name}' is not numeric-compatible." - ) from e + # Keep the dataset layer strict about the requested time window even if + # the underlying query path returns boundary-adjacent null rows. + if timestamps[0] < start_time or timestamps[-1] > end_time: + mask = (timestamps >= start_time) & (timestamps <= end_time) + timestamps = timestamps[mask] + field_values = { + field_column: values[mask] + for field_column, values in field_values.items() + } - if not timestamp_parts: + return timestamps, field_values + + def _collect_arrow_numeric_batches( + self, + result_set, + field_columns: List[str], + *, + include_timestamps: bool, + table_name: Optional[str] = None, + ) -> Tuple[Optional[np.ndarray], Dict[str, np.ndarray]]: + """Consume a batch-mode ResultSet and return owning numpy arrays.""" + arrow_batches = [] + total_rows = 0 + while True: + arrow_table = result_set.read_arrow_batch() + if arrow_table is None: + break + if arrow_table.num_rows == 0: + continue + arrow_batches.append(arrow_table) + total_rows += arrow_table.num_rows + + if total_rows == 0: return ( - np.array([], dtype=np.int64), + np.array([], dtype=np.int64) if include_timestamps else None, { field_column: np.array([], dtype=np.float64) for field_column in field_columns }, ) - timestamps = np.concatenate(timestamp_parts).astype(np.int64) + timestamps = ( + np.empty(total_rows, dtype=np.int64) if include_timestamps else None + ) field_values = { - field_column: np.concatenate(field_parts[field_column]) + field_column: np.empty(total_rows, dtype=np.float64) for field_column in field_columns } - # Keep the dataset layer strict about the requested time window even if - # the underlying query path returns boundary-adjacent null rows. - mask = (timestamps >= start_time) & (timestamps <= end_time) - timestamps = timestamps[mask] - field_values = { - field_column: values[mask] for field_column, values in field_values.items() - } + offset = 0 + for arrow_table in arrow_batches: + batch_rows = arrow_table.num_rows + if include_timestamps: + timestamps[offset : offset + batch_rows] = arrow_table.column( + "time" + ).to_numpy() + for field_column in field_columns: + try: + field_values[field_column][offset : offset + batch_rows] = ( + arrow_table.column(field_column).to_numpy() + ) + except (TypeError, ValueError) as e: + target = table_name or "" + raise TypeError( + f"Field column '{field_column}' in table '{target}' " + "is not numeric-compatible." + ) from e + offset += batch_rows return timestamps, field_values diff --git a/python/tsfile/dataset/timeseries.py b/python/tsfile/dataset/timeseries.py index 39d43b442..2127384d3 100644 --- a/python/tsfile/dataset/timeseries.py +++ b/python/tsfile/dataset/timeseries.py @@ -78,16 +78,26 @@ def __init__( stats: dict, ensure_open: Callable[[], None], load_timestamps: Callable[[], np.ndarray], - read_by_position: Callable[[int, int], Tuple[np.ndarray, np.ndarray]], + read_values_by_position: Callable[[int, int], np.ndarray], ): self._name = name self._series_refs = series_refs self._stats = dict(stats) self._ensure_open = ensure_open self._load_timestamps = load_timestamps - self._read_by_position = read_by_position + self._read_values_by_position = read_values_by_position self._timestamps = None + def _read_values_only(self, start: int, limit: int) -> np.ndarray: + if limit <= 0: + return np.array([], dtype=np.float64) + return self._read_values_by_position(start, limit) + + def _read_contiguous_slice(self, start: int, stop: int) -> np.ndarray: + if start >= stop: + return np.array([], dtype=np.float64) + return self._read_values_only(start, stop - start) + @property def name(self) -> str: return self._name @@ -119,7 +129,7 @@ def __getitem__(self, key): key += length if key < 0 or key >= length: raise IndexError(f"Index {key} out of range [0, {length})") - _, values = self._read_by_position(key, 1) + values = self._read_values_only(key, 1) return float(values[0]) if len(values) > 0 else None if isinstance(key, slice): @@ -131,13 +141,12 @@ def __getitem__(self, key): # [a:b:1]. Avoid materializing the full position list for large # series; read the window directly. if step == 1: - _, values = self._read_by_position(start, stop - start) - return values + return self._read_contiguous_slice(start, stop) positions = np.arange(start, stop, step, dtype=np.int64) min_pos = int(positions.min()) max_pos = int(positions.max()) - _, values = self._read_by_position(min_pos, max_pos - min_pos + 1) + values = self._read_contiguous_slice(min_pos, max_pos + 1) if len(values) == 0: return np.array([], dtype=np.float64) relative = positions - min_pos