yobx.xtracing — dataframe_trace module#
DataFrame function tracer — convert a Python function operating on a
TracedDataFrame into an ONNX model via
yobx.sql.to_onnx().
The tracer provides a lightweight pandas-inspired API. When you call
dataframe_to_onnx() (or the lower-level trace_dataframe()), a
TracedDataFrame is created from the input_dtypes mapping and passed
to your function. Every operation performed on the frame — column access,
arithmetic, filtering, aggregation — is recorded as an AST node rather than
being executed. After the function returns, the accumulated nodes are assembled
into a ParsedQuery that is compiled to ONNX by the
existing SQL converter.
Supported operations#
- Column access: df["col"] or df.col
- Arithmetic: +, -, *, / between columns and/or scalars, or applied element-wise across all columns of a TracedDataFrame (e.g. df + 1, df * 2.0)
- Comparison: >, <, >=, <=, ==, !=
- Boolean combination: & (AND), | (OR) on TracedCondition
- Row filtering: TracedDataFrame.filter() or df[condition]
- Column projection: TracedDataFrame.select()
- New columns: TracedDataFrame.assign()
- Aggregation: .sum(), .mean(), .min(), .max(), .count() on a TracedSeries
- Column alias: .alias(name) on a TracedSeries
- Group-by: TracedDataFrame.groupby() + TracedGroupBy.agg()
- Function chaining: TracedDataFrame.pipe()
- Copy: TracedDataFrame.copy()
Example
import numpy as np
from yobx.xtracing.dataframe_trace import dataframe_to_onnx
from yobx.reference import ExtendedReferenceEvaluator
def transform(df):
    df = df.filter(df["a"] > 0)
    return df.select([(df["a"] + df["b"]).alias("total")])
dtypes = {"a": np.float32, "b": np.float32}
artifact = dataframe_to_onnx(transform, dtypes)
ref = ExtendedReferenceEvaluator(artifact)
a = np.array([1.0, -2.0, 3.0], dtype=np.float32)
b = np.array([4.0, 5.0, 6.0], dtype=np.float32)
(total,) = ref.run(None, {"a": a, "b": b})
# total == array([5., 9.], dtype=float32) (rows where a > 0)
- class yobx.xtracing.dataframe_trace.TracedCondition(condition: Condition)[source]#
Proxy for a boolean predicate produced by comparing TracedSeries.
Use & and | to combine conditions; Python’s and/or keywords cannot be used because they force boolean evaluation of the operands.
- Parameters:
condition – the underlying Condition AST node.
- class yobx.xtracing.dataframe_trace.TracedDataFrame(columns: Dict[ColumnRef, TracedSeries], ops: List | None = None, source_columns: List[str] | None = None)[source]#
Proxy for a DataFrame that records SQL-like operations for ONNX export.
Create a TracedDataFrame via trace_dataframe() (recommended) or construct it directly with a ColumnRef → TracedSeries mapping. Apply operations (filter, select, arithmetic on columns) to build up the computation graph. Convert to an ONNX model via dataframe_to_onnx().
- Parameters:
columns – mapping from ColumnRef to TracedSeries.
ops – accumulated list of SqlOperation objects (usually starts empty).
source_columns – ordered list of original source column names (those that will become ONNX inputs). Defaults to the column names extracted from the keys of columns.
- assign(**kwargs: TracedSeries) → TracedDataFrame[source]#
Record new computed columns without discarding existing ones.
Each keyword argument names a new (or overwritten) column; the value must be a TracedSeries.
- Returns:
a new TracedDataFrame with the extra columns present.
- copy(deep: bool = True) → TracedDataFrame[source]#
Return a copy of this TracedDataFrame.
Because TracedDataFrame holds immutable AST nodes rather than actual data, shallow and deep copies are equivalent: a new TracedDataFrame is returned with the same column expressions, recorded operations, and source-column list. This method exists so that functions written for real pandas.DataFrame objects (which routinely call .copy() to avoid unintentional mutation) can be traced without modification.
- Parameters:
deep – accepted for API compatibility with pandas.DataFrame.copy but has no effect.
- Returns:
a new TracedDataFrame representing the same query.
- filter(condition: TracedCondition) → TracedDataFrame[source]#
Record a row-filter (WHERE) operation.
- Parameters:
condition – a TracedCondition produced by comparing columns, e.g. df["a"] > 0.
- Returns:
a new TracedDataFrame with the filter recorded.
- groupby(by: str | List[str]) → TracedGroupBy[source]#
Begin a group-by aggregation.
- Parameters:
by – column name or list of column names to group by.
- Returns:
a TracedGroupBy on which .agg(...) can be called.
- join(right: TracedDataFrame, left_key: str | List[str], right_key: str | List[str], join_type: str = 'inner') → TracedDataFrame[source]#
Record an equi-join with another TracedDataFrame.
- Parameters:
right – the right-hand TracedDataFrame to join with.
left_key – column name (or list of names) from this (left) frame used in the join predicate.
right_key – column name (or list of names) from the right frame used in the join predicate. Must have the same length as left_key when both are lists.
join_type – join type — 'inner' (default), 'left', 'right', or 'full'.
- Returns:
a new TracedDataFrame containing columns from both sides, with a JoinOp recorded.
Example:
import numpy as np
from yobx.xtracing.dataframe_trace import dataframe_to_onnx

def transform(df1, df2):
    return df1.join(df2, left_key="cid", right_key="id")

dtypes1 = {"cid": np.int64, "a": np.float32}
dtypes2 = {"id": np.int64, "b": np.float32}
artifact = dataframe_to_onnx(transform, [dtypes1, dtypes2])
- pipe(func: Callable[[...], TracedDataFrame], *args: object, **kwargs: object) → TracedDataFrame[source]#
Apply func to this frame (pandas pipe idiom).
Equivalent to func(self, *args, **kwargs). Useful for chaining transformations written as standalone functions:

def preprocess(df):
    return df.filter(df["a"] > 0)

def add_feature(df):
    return df.assign(c=(df["a"] + df["b"]).alias("c"))

def pipeline(df):
    return df.pipe(preprocess).pipe(add_feature)
- Parameters:
func – callable that accepts a TracedDataFrame as its first argument and returns a TracedDataFrame.
args – additional positional arguments forwarded to func.
kwargs – additional keyword arguments forwarded to func.
- Returns:
the result of func(self, *args, **kwargs).
- pivot_table(values: str | List[str], index: str | List[str], columns: str | List[str], aggfunc: str = 'sum', fill_value: float = 0.0, column_values: List | None = None) → TracedDataFrame[source]#
Record a pivot-table aggregation for ONNX export.
Analogous to pandas.DataFrame.pivot_table(). For each distinct value in the columns column(s), the values column(s) are aggregated (according to aggfunc) per index group, producing one output column per distinct category value per values column.
- Parameters:
values – name of the column(s) to aggregate. A plain string names a single values column; a list names multiple values columns, each of which is independently pivoted.
index – column name(s) to use as the row index (group-by keys).
columns – name of the column(s) whose distinct values become the output column names. A plain string names a single category column; a list specifies a compound category key — in that case each entry in column_values must be a tuple/list with one scalar per category column.
aggfunc – aggregation function — 'sum' (default), 'mean', 'min', 'max', or 'count'.
fill_value – value inserted for (index, column) combinations that have no matching rows. Defaults to 0.
column_values – the known set of distinct values that the columns column(s) may take. Required for ONNX export since ONNX graphs have a static structure. Each entry yields one output column per values column, named "<values>_<cv>" (single category column) or "<values>_<cv1>_<cv2>…" (multiple category columns).
- Returns:
a new TracedDataFrame with one column per entry in index (unique row-key values), followed by one column per (values, column_values) pair.
Example:
import numpy as np
from yobx.xtracing.dataframe_trace import dataframe_to_onnx
from yobx.reference import ExtendedReferenceEvaluator

def transform(df):
    return df.pivot_table(
        values="v",
        index="k",
        columns="cat",
        aggfunc="sum",
        column_values=["X", "Y"],
    )

dtypes = {"k": np.int64, "cat": object, "v": np.float32}
artifact = dataframe_to_onnx(transform, dtypes)
- select(exprs: List[str | TracedSeries] | Dict[str, TracedSeries]) → TracedDataFrame[source]#
Record a column-projection (SELECT) operation.
- Parameters:
exprs – one of:
- a list of column-name strings — passes columns through unchanged;
- a list of TracedSeries — use .alias(name) to name each output;
- a {alias: TracedSeries} dict — explicit alias → expression mapping.
- Returns:
a new TracedDataFrame with the projection recorded.
- to_parsed_query() → ParsedQuery[source]#
Assemble a ParsedQuery from the recorded ops.
If no SelectOp has been recorded yet (e.g. only a filter was applied, or element-wise arithmetic was applied to the whole frame), a SELECT is generated from the current column expressions. This correctly captures any arithmetic embedded in the column expressions (e.g. from df + 1).
- Returns:
a ParsedQuery ready for ONNX conversion via parsed_query_to_onnx().
- class yobx.xtracing.dataframe_trace.TracedGroupBy(df: TracedDataFrame, by: List[str])[source]#
Result of TracedDataFrame.groupby().
Call agg() to specify the aggregation expressions and obtain a new TracedDataFrame representing the grouped result.
- Parameters:
df – the source TracedDataFrame.
by – list of column names to group by.
- agg(exprs: List[TracedSeries] | Dict[str, TracedSeries]) → TracedDataFrame[source]#
Record aggregation expressions and return a new TracedDataFrame.
- Parameters:
exprs – either a list of TracedSeries (each should carry an alias via .alias(name)), or a {alias: TracedSeries} dict.
- Returns:
a new TracedDataFrame with GroupByOp + SelectOp recorded.
- class yobx.xtracing.dataframe_trace.TracedSeries(expr: object, alias: str | None = None)[source]#
Proxy for a single column or computed expression in a traced DataFrame.
Arithmetic and comparison operators are overloaded to return new TracedSeries or TracedCondition objects that accumulate the operation graph without executing any actual computation.
- Parameters:
expr – the underlying AST expression node (a ColumnRef, BinaryExpr, AggExpr, etc.).
alias – optional output alias for this series.
- alias(name: str) → TracedSeries[source]#
Return a copy tagged with name as the output alias.
- Parameters:
name – the alias to assign.
- Returns:
a new TracedSeries carrying the alias.
- count() → TracedSeries[source]#
Record a COUNT(...) aggregation.
- max() → TracedSeries[source]#
Record a MAX(...) aggregation.
- mean() → TracedSeries[source]#
Record an AVG(...) aggregation.
- min() → TracedSeries[source]#
Record a MIN(...) aggregation.
- sum() → TracedSeries[source]#
Record a SUM(...) aggregation.
- to_select_item() → SelectItem[source]#
Convert to a SelectItem for use in a query.