yobx.xtracing - dataframe_trace module

DataFrame function tracer — convert a Python function operating on a TracedDataFrame into an ONNX model via yobx.sql.to_onnx().

The tracer provides a lightweight pandas-inspired API. When you call dataframe_to_onnx() (or the lower-level trace_dataframe()), a TracedDataFrame is created from the input_dtypes mapping and passed to your function. Every operation performed on the frame — column access, arithmetic, filtering, aggregation — is recorded as an AST node rather than being executed. After the function returns, the accumulated nodes are assembled into a ParsedQuery that is compiled to ONNX by the existing SQL converter.
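The recording mechanism described above can be illustrated with a toy tracer. The sketch below is not yobx code: `Col`, `BinOp`, `ToySeries`, and `render` are hypothetical names invented for this illustration. It only shows how operator overloading lets arithmetic build an expression tree instead of computing a value.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Col:
    """Leaf node: a reference to an input column (hypothetical)."""
    name: str


@dataclass(frozen=True)
class BinOp:
    """Inner node: a binary operation on two sub-expressions (hypothetical)."""
    op: str
    left: object
    right: object


def _unwrap(value):
    # Series contribute their expression; plain Python scalars are kept as-is.
    return value.expr if isinstance(value, ToySeries) else value


class ToySeries:
    """Stand-in for a traced column: it holds an expression, never data."""

    def __init__(self, expr):
        self.expr = expr

    def __add__(self, other):
        return ToySeries(BinOp("+", self.expr, _unwrap(other)))

    def __mul__(self, other):
        return ToySeries(BinOp("*", self.expr, _unwrap(other)))


def render(expr):
    """Print the recorded tree in infix form."""
    if isinstance(expr, Col):
        return expr.name
    if isinstance(expr, BinOp):
        return f"({render(expr.left)} {expr.op} {render(expr.right)})"
    return repr(expr)


a, b = ToySeries(Col("a")), ToySeries(Col("b"))
total = (a + b) * 2           # nothing is computed here
print(render(total.expr))     # ((a + b) * 2)
```

A real tracer would hand such a tree to a compiler back end; in this module that role is played by the SQL-to-ONNX converter.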

Supported operations

Example

import numpy as np
from yobx.xtracing.dataframe_trace import dataframe_to_onnx
from yobx.reference import ExtendedReferenceEvaluator

def transform(df):
    df = df.filter(df["a"] > 0)
    return df.select([(df["a"] + df["b"]).alias("total")])

dtypes = {"a": np.float32, "b": np.float32}
artifact = dataframe_to_onnx(transform, dtypes)

ref = ExtendedReferenceEvaluator(artifact)
a = np.array([1.0, -2.0, 3.0], dtype=np.float32)
b = np.array([4.0,  5.0, 6.0], dtype=np.float32)
(total,) = ref.run(None, {"a": a, "b": b})
# total == array([5., 9.], dtype=float32)  (rows where a > 0)

class yobx.xtracing.dataframe_trace.TracedCondition(condition: Condition)

Proxy for a boolean predicate produced by comparing TracedSeries.

Use & and | to combine conditions; Python’s and/or keywords cannot be used because they force boolean evaluation of the operands.

Parameters:

condition – the underlying Condition AST node.
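Why `and`/`or` cannot work with a symbolic predicate can be shown with a small stand-alone sketch. `ToyCondition` is a hypothetical class written for this illustration, not the library's `TracedCondition`; in particular, raising `TypeError` from `__bool__` is an assumption of the sketch, and the source does not say how yobx reacts.

```python
class ToyCondition:
    """Stand-in for a traced predicate: it stores a description, not a bool."""

    def __init__(self, text):
        self.text = text

    def __and__(self, other):
        return ToyCondition(f"({self.text} AND {other.text})")

    def __or__(self, other):
        return ToyCondition(f"({self.text} OR {other.text})")

    def __bool__(self):
        # `x and y` / `x or y` call bool(x) first; a symbolic predicate
        # has no truth value until the model runs on real data.
        raise TypeError("symbolic condition has no truth value; use & or |")


positive = ToyCondition("a > 0")
small = ToyCondition("b < 10")

combined = positive & small
print(combined.text)  # (a > 0 AND b < 10)

try:
    positive and small
except TypeError as exc:
    print("rejected:", exc)
```

Because `&` and `|` bind more tightly than comparison operators in Python, each comparison needs its own parentheses, for example `(df["a"] > 0) & (df["b"] < 10)`.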

class yobx.xtracing.dataframe_trace.TracedDataFrame(columns: Dict[ColumnRef, TracedSeries], ops: List | None = None, source_columns: List[str] | None = None)

Proxy for a DataFrame that records SQL-like operations for ONNX export.

Create a TracedDataFrame via trace_dataframe() (recommended) or construct it directly with a ColumnRef → TracedSeries mapping. Apply operations (filter, select, arithmetic on columns) to build up the computation graph. Convert to an ONNX model via dataframe_to_onnx().

Parameters:
  • columns – mapping from ColumnRef to TracedSeries.

  • ops – accumulated list of SqlOperation objects (usually starts empty).

  • source_columns – ordered list of original source column names (those that will become ONNX inputs). Defaults to the column names extracted from the keys of columns.

assign(**kwargs: TracedSeries) → TracedDataFrame

Record new computed columns without discarding existing ones.

Each keyword argument names a new (or overwritten) column; the value must be a TracedSeries.

Returns:

a new TracedDataFrame with the extra columns present.
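As a hedged usage sketch built only from calls documented on this page (column access with `df[...]`, arithmetic on a TracedSeries, and `assign`), a traceable function might add a derived column as follows. The column names `a`, `b`, and `total` are placeholders, and the function is meant to be passed to dataframe_to_onnx() rather than called on real data.

```python
def add_total(df):
    # Keeps the existing columns and records one extra computed column
    # named by the keyword argument.
    return df.assign(total=df["a"] + df["b"])
```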

property columns: List[str]

Return the ordered list of column names in the current frame.

copy(deep: bool = True) → TracedDataFrame

Return a copy of this TracedDataFrame.

Because TracedDataFrame holds immutable AST nodes rather than actual data, both shallow and deep copies are equivalent: a new TracedDataFrame is returned with the same column expressions, recorded operations, and source-column list. This method exists so that functions written for real pandas.DataFrame objects (which routinely call .copy() to avoid unintentional mutation) can be traced without modification.

Parameters:

deep – accepted for API compatibility with pandas.DataFrame.copy but has no effect.

Returns:

a new TracedDataFrame representing the same query.

filter(condition: TracedCondition) → TracedDataFrame

Record a row-filter (WHERE) operation.

Parameters:

condition – a TracedCondition produced by comparing columns, e.g. df["a"] > 0.

Returns:

a new TracedDataFrame with the filter recorded.

groupby(by: str | List[str]) → TracedGroupBy

Begin a group-by aggregation.

Parameters:

by – column name or list of column names to group by.

Returns:

a TracedGroupBy on which .agg(...) can be called.

join(right: TracedDataFrame, left_key: str | List[str], right_key: str | List[str], join_type: str = 'inner') → TracedDataFrame

Record an equi-join with another TracedDataFrame.

Parameters:
  • right – the right-hand TracedDataFrame to join with.

  • left_key – column name (or list of names) from this (left) frame used in the join predicate.

  • right_key – column name (or list of names) from the right frame used in the join predicate. Must have the same length as left_key when both are lists.

  • join_type – kind of join to record: 'inner' (default), 'left', 'right', or 'full'.

Returns:

a new TracedDataFrame containing columns from both sides with a JoinOp recorded.
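For readers unfamiliar with the term, the row-matching rule of an inner equi-join can be sketched in plain Python. `toy_inner_join` is a hypothetical helper written for this illustration; it shows standard relational semantics on lists of dicts and says nothing about how the ONNX graph is built or how the other join types fill unmatched rows.

```python
def toy_inner_join(left_rows, right_rows, left_key, right_key):
    """Pair every left row with every right row whose key is equal."""
    # Index the right side by key so each left row is matched in one lookup.
    by_key = {}
    for right in right_rows:
        by_key.setdefault(right[right_key], []).append(right)

    joined = []
    for left in left_rows:
        for right in by_key.get(left[left_key], []):
            joined.append({**left, **right})  # columns from both sides
    return joined


left = [{"cid": 1, "a": 0.5}, {"cid": 2, "a": 1.5}, {"cid": 3, "a": 2.5}]
right = [{"id": 1, "b": 10.0}, {"id": 3, "b": 30.0}]

for row in toy_inner_join(left, right, "cid", "id"):
    print(row)
# {'cid': 1, 'a': 0.5, 'id': 1, 'b': 10.0}
# {'cid': 3, 'a': 2.5, 'id': 3, 'b': 30.0}
```

The left row with `cid == 2` has no partner on the right and is therefore dropped by an inner join.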

Example:

import numpy as np
from yobx.xtracing.dataframe_trace import dataframe_to_onnx

def transform(df1, df2):
    return df1.join(df2, left_key="cid", right_key="id")

dtypes1 = {"cid": np.int64, "a": np.float32}
dtypes2 = {"id": np.int64, "b": np.float32}
artifact = dataframe_to_onnx(transform, [dtypes1, dtypes2])

pipe(func: Callable[[...], TracedDataFrame], *args: object, **kwargs: object) → TracedDataFrame

Apply func to this frame (pandas pipe idiom).

Equivalent to func(self, *args, **kwargs). Useful for chaining transformations written as standalone functions:

def preprocess(df):
    return df.filter(df["a"] > 0)

def add_feature(df):
    return df.assign(c=(df["a"] + df["b"]).alias("c"))

def pipeline(df):
    return df.pipe(preprocess).pipe(add_feature)

Parameters:
  • func – callable that accepts a TracedDataFrame as its first argument and returns a TracedDataFrame.

  • args – additional positional arguments forwarded to func.

  • kwargs – additional keyword arguments forwarded to func.

Returns:

the result of func(self, *args, **kwargs).
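How the extra positional and keyword arguments are forwarded can be seen in a stand-alone sketch. `ToyFrame`, `keep_above`, and `add_column` are hypothetical names used only here; the `pipe` body mirrors the stated equivalence `func(self, *args, **kwargs)`.

```python
class ToyFrame:
    """Stand-in frame that just remembers the steps applied to it."""

    def __init__(self, steps=()):
        self.steps = tuple(steps)

    def record(self, step):
        # Return a new frame rather than mutating this one.
        return ToyFrame(self.steps + (step,))

    def pipe(self, func, *args, **kwargs):
        return func(self, *args, **kwargs)


def keep_above(frame, column, threshold=0):
    return frame.record(f"filter {column} > {threshold}")


def add_column(frame, name):
    return frame.record(f"assign {name}")


result = ToyFrame().pipe(keep_above, "a", threshold=1).pipe(add_column, name="c")
print(result.steps)  # ('filter a > 1', 'assign c')
```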

pivot_table(values: str | List[str], index: str | List[str], columns: str | List[str], aggfunc: str = 'sum', fill_value: float = 0.0, column_values: List | None = None) → TracedDataFrame

Record a pivot-table aggregation for ONNX export.

Analogous to pandas.DataFrame.pivot_table(). For each distinct value in the columns column(s) the values column(s) are aggregated (according to aggfunc) per index group, producing one output column per distinct category value per values column.

Parameters:
  • values – name of the column(s) to aggregate. A plain string names a single values column; a list names multiple values columns each of which is independently pivoted.

  • index – column name(s) to use as the row index (group-by keys).

  • columns – name of the column(s) whose distinct values become the output column names. A plain string names a single category column; a list specifies a compound category key — in that case each entry in column_values must be a tuple/list with one scalar per category column.

  • aggfunc – aggregation function — 'sum' (default), 'mean', 'min', 'max', or 'count'.

  • fill_value – value inserted for (index, column) combinations that have no matching rows. Defaults to 0.0.

  • column_values – the known set of distinct values that the columns column(s) may take. Required for ONNX export since ONNX graphs have a static structure. Each entry yields one output column per values column, named "<values>_<cv>" (single category column) or "<values>_<cv1>_<cv2>…" (multiple category columns).

Returns:

a new TracedDataFrame with one column per entry in index (unique row-key values) followed by one column per (values, column_values) pair.
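The output layout described above (one `"<values>_<cv>"` column per entry of column_values, with fill_value for empty cells) can be made concrete with a plain-Python reference for the single-column 'sum' case. `toy_pivot_sum` is a hypothetical helper, not part of yobx. Two of its behaviours are assumptions of this sketch rather than documented facts: categories outside column_values are ignored, and output rows are sorted by index key.

```python
from collections import defaultdict


def toy_pivot_sum(rows, values, index, columns, column_values, fill_value=0.0):
    """Sum `values` per (index, category) cell for a fixed list of categories."""
    sums = defaultdict(dict)  # index key -> {category: running sum}
    for row in rows:
        bucket = sums[row[index]]  # every index key gets an output row
        category = row[columns]
        if category in column_values:  # assumption: other categories are ignored
            bucket[category] = bucket.get(category, 0.0) + row[values]

    table = []
    for key in sorted(sums):  # assumption: rows ordered by index key
        record = {index: key}
        for cv in column_values:
            # Output columns are named "<values>_<cv>"; empty cells get fill_value.
            record[f"{values}_{cv}"] = sums[key].get(cv, fill_value)
        table.append(record)
    return table


rows = [
    {"k": 1, "cat": "X", "v": 1.0},
    {"k": 1, "cat": "X", "v": 2.0},
    {"k": 1, "cat": "Y", "v": 5.0},
    {"k": 2, "cat": "Y", "v": 4.0},
]
for record in toy_pivot_sum(rows, "v", "k", "cat", ["X", "Y"]):
    print(record)
# {'k': 1, 'v_X': 3.0, 'v_Y': 5.0}
# {'k': 2, 'v_X': 0.0, 'v_Y': 4.0}
```

Fixing the category list up front is what makes the number of output columns known before any data is seen, which is the reason column_values is required for export.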

Example:

import numpy as np
from yobx.xtracing.dataframe_trace import dataframe_to_onnx
from yobx.reference import ExtendedReferenceEvaluator

def transform(df):
    return df.pivot_table(
        values="v",
        index="k",
        columns="cat",
        aggfunc="sum",
        column_values=["X", "Y"],
    )

dtypes = {"k": np.int64, "cat": object, "v": np.float32}
artifact = dataframe_to_onnx(transform, dtypes)

select(exprs: List[str | TracedSeries] | Dict[str, TracedSeries]) → TracedDataFrame

Record a column-projection (SELECT) operation.

Parameters:

exprs – one of:

  • a list of column-name strings — passes columns through unchanged;

  • a list of TracedSeries — use .alias(name) to name each output;

  • a {alias: TracedSeries} dict — explicit alias → expression mapping.

Returns:

a new TracedDataFrame with the projection recorded.
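The three accepted argument shapes can be written side by side. This is a usage sketch that relies only on calls documented on this page (`df[...]`, `+`, `.alias`, `select`); the column names `a`, `b`, and `total` are placeholders, and each function is intended to be handed to dataframe_to_onnx() instead of being called on real data.

```python
def select_by_name(df):
    # List of strings: the named columns pass through unchanged.
    return df.select(["a", "b"])


def select_by_series(df):
    # List of TracedSeries: each expression names its output via .alias().
    return df.select([(df["a"] + df["b"]).alias("total")])


def select_by_mapping(df):
    # Dict: the key supplies the alias, so .alias() is not needed.
    return df.select({"total": df["a"] + df["b"]})
```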

to_parsed_query() → ParsedQuery

Assemble a ParsedQuery from the recorded ops.

If no SelectOp has been recorded yet (e.g. only a filter was applied, or element-wise arithmetic was applied to the whole frame), a SELECT is generated from the current column expressions. This correctly captures any arithmetic embedded in the column expressions (e.g. from df + 1).

Returns:

a ParsedQuery ready for ONNX conversion via parsed_query_to_onnx().

class yobx.xtracing.dataframe_trace.TracedGroupBy(df: TracedDataFrame, by: List[str])

Result of TracedDataFrame.groupby().

Call agg() to specify the aggregation expressions and obtain a new TracedDataFrame representing the grouped result.

Parameters:
  • df – the source TracedDataFrame.

  • by – list of column names to group by.

agg(exprs: List[TracedSeries] | Dict[str, TracedSeries]) → TracedDataFrame

Record aggregation expressions and return a new TracedDataFrame.

Parameters:

exprs – either a list of TracedSeries (each should carry an alias via .alias(name)), or a {alias: TracedSeries} dict.

Returns:

a new TracedDataFrame with GroupByOp + SelectOp recorded.
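A grouped aggregation might be written in either of the two accepted forms, as sketched below. Only calls documented on this page are used (`groupby`, `agg`, `sum`, `mean`, `alias`); the column names `k` and `v` and the output names are placeholders, and the functions are meant to be traced through dataframe_to_onnx() rather than run on real data.

```python
def stats_per_key(df):
    # List form: every aggregation carries its output name via .alias().
    return df.groupby("k").agg([
        df["v"].sum().alias("v_sum"),
        df["v"].mean().alias("v_mean"),
    ])


def stats_per_key_mapping(df):
    # Dict form: the keys provide the output names.
    return df.groupby("k").agg({
        "v_sum": df["v"].sum(),
        "v_mean": df["v"].mean(),
    })
```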

class yobx.xtracing.dataframe_trace.TracedSeries(expr: object, alias: str | None = None)

Proxy for a single column or computed expression in a traced DataFrame.

Arithmetic and comparison operators are overloaded to return new TracedSeries or TracedCondition objects that accumulate the operation graph without executing any actual computation.

Parameters:
  • expr – the underlying AST expression node (a ColumnRef, BinaryExpr, AggExpr, etc.).

  • alias – optional output alias for this series.

alias(name: str) → TracedSeries

Return a copy tagged with name as the output alias.

Parameters:

name – the alias to assign.

Returns:

a new TracedSeries carrying the alias.

count() → TracedSeries

Record a COUNT(...) aggregation.

max() → TracedSeries

Record a MAX(...) aggregation.

mean() → TracedSeries

Record an AVG(...) aggregation.

min() → TracedSeries

Record a MIN(...) aggregation.

sum() → TracedSeries

Record a SUM(...) aggregation.

to_select_item() → SelectItem

Convert to a SelectItem for use in a query.