import pickle
import os
from io import StringIO, BytesIO
from inspect import isfunction
import numpy
import numpy.random as nrandom
import pandas
from pandas.testing import assert_frame_equal

    from pandas import json_normalize
except ImportError:
    from import json_normalize
from .dataframe_split import (
from .dataframe_io_helpers import enumerate_json_items, JsonIterator2Stream

[docs] class StreamingDataFrameSchemaError(Exception): """ Reveals an issue with inconsistant schemas. """
[docs] class StreamingDataFrame: """ Defines a streaming dataframe. The goal is to reduce the memory footprint. The class takes a function which creates an iterator on :epkg:`dataframe`. We assume this function can be called multiple time. As a matter of fact, the function is called every time the class needs to walk through the stream with the following loop: :: for df in self: # self is a StreamingDataFrame # ... The constructor cannot receive an iterator otherwise this class would be able to walk through the data only once. The main reason is it is impossible to :mod:`pickle` (or :epkg:`dill`) an iterator: it cannot be replicated. Instead, the class takes a function which generates an iterator on :epkg:`DataFrame`. Most of the methods returns either a :epkg:`DataFrame` either a see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. In the second case, methods can be chained. By default, the object checks that the schema remains the same between two chunks. This can be disabled by setting *check_schema=False* in the constructor. The user should expect the data to remain stable. Every loop should produce the same data. However, in some situations, it is more efficient not to keep that constraints. Draw a random @see me sample is one of these cases. :param iter_creation: function which creates an iterator or an instance of see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` :param check_schema: checks that the schema is the same for every :epkg:`dataframe` :param stable: indicates if the :epkg:`dataframe` remains the same whenever it is walked through """
[docs] def __init__(self, iter_creation, check_schema=True, stable=True): self._delete_ = [] if isinstance(iter_creation, (pandas.DataFrame, dict, numpy.ndarray, str)): raise TypeError( "Unexpected type %r for iter_creation. It must " "be an iterator." % type(iter_creation) ) if isinstance(iter_creation, StreamingDataFrame): self.iter_creation = iter_creation.iter_creation self.stable = iter_creation.stable else: self.iter_creation = iter_creation self.stable = stable self.check_schema = check_schema
[docs] def is_stable(self, do_check=False, n=10): """ Tells if the :epkg:`dataframe` is supposed to be stable. :param do_check: do not trust the value sent to the constructor :param n: number of rows used to check the stability, None for all rows :return: boolean *do_check=True* means the methods checks the first *n* rows remains the same for two iterations. """ if do_check: for i, (a, b) in enumerate(zip(self, self)): if n is not None and i >= n: break try: assert_frame_equal(a, b) except AssertionError: # pragma: no cover return False return True else: return self.stable
[docs] def get_kwargs(self): """ Returns the parameters used to call the constructor. """ return dict(check_schema=self.check_schema)
[docs] def train_test_split( self, path_or_buf=None, export_method="to_csv", names=None, streaming=True, partitions=None, **kwargs, ): """ Randomly splits a :epkg:`dataframe` into smaller pieces. The function returns streams of file names. It chooses one of the options from module :mod:`dataframe_split <pandas_streaming.df.dataframe_split>`. :param path_or_buf: a string, a list of strings or buffers, if it is a string, it must contain ``{}`` like ``partition{}.txt``, if None, the function returns strings. :param export_method: method used to store the partitions, by default :meth:`pandas.DataFrame.to_csv`, additional parameters will be given to that function :param names: partitions names, by default ``('train', 'test')`` :param kwargs: parameters for the export function and :func:`sklearn.model_selection.train_test_split`. :param streaming: the function switches to a streaming version of the algorithm. :param partitions: splitting partitions :return: outputs of the exports functions or two see class `StreamingDataFrame` if *path_or_buf* is None. The streaming version of this algorithm is implemented by function :func:`sklearn_train_test_split_streaming <pandas_streaming.df.dataframe_split.sklearn_train_test_split_streaming>`. Its documentation indicates the limitation of the streaming version and gives some insights about the additional parameters. """ if streaming: if partitions is not None: if len(partitions) != 2: raise NotImplementedError( # pragma: no cover "Only train and test split is allowed, *partitions* " "must be of length 2." ) kwargs = kwargs.copy() kwargs["train_size"] = partitions[0] kwargs["test_size"] = partitions[1] return sklearn_train_test_split_streaming(self, **kwargs) return sklearn_train_test_split( self, path_or_buf=path_or_buf, export_method=export_method, names=names, **kwargs, )
@staticmethod def _process_kwargs(kwargs): """ Filters out parameters for the constructor of this class. """ kw = {} for k in ["check_schema"]: if k in kwargs: kw[k] = kwargs[k] del kwargs[k] return kw
[docs] @staticmethod def read_json( *args, chunksize=100000, flatten=False, **kwargs ) -> "StreamingDataFrame": """ Reads a :epkg:`json` file or buffer as an iterator on :epkg:`DataFrame`. The signature is the same as :epkg:`pandas:read_json`. The important parameter is *chunksize* which defines the number of rows to parse in a single bloc and it must be defined to return an iterator. If *lines* is True, the function falls back into :epkg:`pandas:read_json`, otherwise it used @see fn enumerate_json_items. If *lines* is ``'stream'``, *enumerate_json_items* is called with parameter ``lines=True``. Parameter *flatten* uses the trick described at `Flattening JSON objects in Python <>`_. Examples: .. runpython:: :showcode: from io import BytesIO from pandas_streaming.df import StreamingDataFrame data = b'''{"a": 1, "b": 2} {"a": 3, "b": 4}''' it = StreamingDataFrame.read_json(BytesIO(data), lines=True) dfs = list(it) print(dfs) .. runpython:: :showcode: from io import BytesIO from pandas_streaming.df import StreamingDataFrame data = b'''[{"a": 1, "b": 2}, {"a": 3, "b": 4}]''' it = StreamingDataFrame.read_json(BytesIO(data)) dfs = list(it) print(dfs) The parsed json must have an empty line at the end otherwise the following exception is raised: `ijson.common.IncompleteJSONError`: `parse error: unallowed token at this point in JSON text`. """ if not isinstance(chunksize, int) or chunksize <= 0: raise ValueError("chunksize must be a positive integer") # pragma: no cover kwargs_create = StreamingDataFrame._process_kwargs(kwargs) if isinstance(args[0], (list, dict)): if flatten: return StreamingDataFrame.read_df( json_normalize(args[0]), **kwargs_create ) return StreamingDataFrame.read_df(args[0], **kwargs_create) if kwargs.get("lines", None) == "stream": del kwargs["lines"] def localf(a0=args[0]): if hasattr(a0, "seek"): return enumerate_json_items( a0, encoding=kwargs.get("encoding", None), lines=True, flatten=flatten, ) st = JsonIterator2Stream(localf) args = args[1:] if chunksize is None: return StreamingDataFrame( lambda: pandas.read_json( st, *args, chunksize=None, lines=True, **kwargs ), **kwargs_create, ) def fct1( st=st, args=args, chunksize=chunksize, kw=kwargs.copy() # noqa: B008 ): for r in pandas.read_json( # noqa: UP028 st, *args, chunksize=chunksize, nrows=chunksize, lines=True, **kw ): yield r return StreamingDataFrame(fct1, **kwargs_create) if kwargs.get("lines", False): if flatten: raise NotImplementedError( "flatten==True is implemented with option lines='stream'" ) if chunksize is None: return StreamingDataFrame( lambda: pandas.read_json(*args, chunksize=None, **kwargs), **kwargs_create, ) def fct2(args=args, chunksize=chunksize, kw=kwargs.copy()): # noqa: B008 for r in pandas.read_json( # noqa: UP028 *args, chunksize=chunksize, nrows=chunksize, **kw ): yield r return StreamingDataFrame(fct2, **kwargs_create) st = JsonIterator2Stream( lambda a0=args[0]: enumerate_json_items( a0, encoding=kwargs.get("encoding", None), flatten=flatten ) ) args = args[1:] if "lines" in kwargs: del kwargs["lines"] if chunksize is None: return StreamingDataFrame( lambda: pandas.read_json( st, *args, chunksize=chunksize, lines=True, **kwargs ), **kwargs_create, ) def fct3(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()): # noqa: B008 if hasattr(st, "seek"): for r in pandas.read_json( # noqa: UP028 st, *args, chunksize=chunksize, nrows=chunksize, lines=True, **kw ): yield r return StreamingDataFrame(fct3, **kwargs_create)
[docs] @staticmethod def read_csv(*args, **kwargs) -> "StreamingDataFrame": """ Reads a :epkg:`csv` file or buffer as an iterator on :epkg:`DataFrame`. The signature is the same as :epkg:`pandas:read_csv`. The important parameter is *chunksize* which defines the number of rows to parse in a single bloc. If not specified, it will be equal to 100000. """ if not kwargs.get("iterator", True): raise ValueError("If specified, iterator must be True.") if not kwargs.get("chunksize", 100000): raise ValueError("If specified, chunksize must not be None.") kwargs_create = StreamingDataFrame._process_kwargs(kwargs) kwargs["iterator"] = True if "chunksize" not in kwargs: kwargs["chunksize"] = 100000 return StreamingDataFrame( lambda: pandas.read_csv(*args, **kwargs), **kwargs_create )
[docs] @staticmethod def read_str(text, **kwargs) -> "StreamingDataFrame": """ Reads a :epkg:`DataFrame` as an iterator on :epkg:`DataFrame`. The signature is the same as :epkg:`pandas:read_csv`. The important parameter is *chunksize* which defines the number of rows to parse in a single bloc. """ if not kwargs.get("iterator", True): raise ValueError("If specified, iterator must be True.") if not kwargs.get("chunksize", 100000): raise ValueError("If specified, chunksize must not be None.") kwargs_create = StreamingDataFrame._process_kwargs(kwargs) kwargs["iterator"] = True if "chunksize" not in kwargs: kwargs["chunksize"] = 100000 if isinstance(text, str): buffer = StringIO(text) else: buffer = BytesIO(text) return StreamingDataFrame( lambda: pandas.read_csv(buffer, **kwargs), **kwargs_create )
[docs] @staticmethod def read_df(df, chunksize=None, check_schema=True) -> "StreamingDataFrame": """ Splits a :epkg:`DataFrame` into small chunks mostly for unit testing purposes. :param df: :class:`pandas.DataFrame` :param chunksize: number rows per chunks (// 10 by default) :param check_schema: check schema between two iterations :return: iterator on see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` """ if chunksize is None: if hasattr(df, "shape"): chunksize = df.shape[0] else: raise NotImplementedError( f"Cannot retrieve size to infer chunksize for type={type(df)}." ) if hasattr(df, "shape"): size = df.shape[0] else: raise NotImplementedError( # pragma: no cover f"Cannot retrieve size for type={type(df)}." ) def local_iterator(): "local iterator" for i in range(0, size, chunksize): end = min(size, i + chunksize) yield df[i:end].copy() return StreamingDataFrame(local_iterator, check_schema=check_schema)
[docs] def __iter__(self): """ Iterator on a large file with a sliding window. Each windows is a :epkg:`DataFrame`. The method stores a copy of the initial iterator and restores it after the end of the iterations. If *check_schema* was enabled when calling the constructor, the method checks that every :epkg:`DataFrame` follows the same schema as the first chunck. Even with a big chunk size, it might happen that consecutive chunks might detect different type for one particular column. An error message shows up saying ``Column types are different after row`` with more information about the column which failed. In that case, :epkg:`pandas:DataFrame.read_csv` can overwrite the type on one column by specifying ``dtype={column_name: new_type}``. It frequently happens when a string column has many missing values. """ iters = self.iter_creation() sch = None rows = 0 for it in iters: if sch is None: sch = (list(it.columns), list(it.dtypes)) elif self.check_schema: if list(it.columns) != sch[0]: # pylint: disable=E1136 raise StreamingDataFrameSchemaError( # pragma: no cover "Column names are different after row {0}\nFirst chunk: {1}" # noqa: UP030 "\nCurrent chunk: {2}".format(rows, sch[0], list(it.columns)) ) # pylint: disable=E1136 if list(it.dtypes) != sch[1]: # pylint: disable=E1136 errdf = pandas.DataFrame( dict( names=sch[0], schema1=sch[1], # pylint: disable=E1136 schema2=list(it.dtypes), ) ) # pylint: disable=E1136 tdf = StringIO() errdf["diff"] = errdf["schema2"] != errdf["schema1"] errdf = errdf[errdf["diff"]] errdf.to_csv(tdf, sep=",", index=False) raise StreamingDataFrameSchemaError( "Column types are different after row {0}. You may use option " # noqa: UP030 'dtype={{"column_name": str}} to force the type on this column.' "\n---\n{1}".format(rows, tdf.getvalue()) ) rows += it.shape[0] yield it
@property def shape(self): """ This is the kind of operations you do not want to do when a file is large because it goes through the whole stream just to get the number of rows. """ nl, nc = 0, 0 for it in self: nc = max(it.shape[1], nc) nl += it.shape[0] return nl, nc @property def columns(self): """ See :epkg:`pandas:DataFrame:columns`. """ for it in self: return it.columns # The dataframe is empty. return [] @property def dtypes(self): """ See :epkg:`pandas:DataFrame:dtypes`. """ for it in self: return it.dtypes
[docs] def to_csv(self, path_or_buf=None, **kwargs) -> "StreamingDataFrame": """ Saves the :epkg:`DataFrame` into string. See :epkg:`pandas:DataFrame.to_csv`. """ if path_or_buf is None: st = StringIO() close = False elif isinstance(path_or_buf, str): st = open(path_or_buf, "w", encoding=kwargs.get("encoding")) # noqa: SIM115 close = True else: st = path_or_buf close = False for df in self: df.to_csv(st, **kwargs) kwargs["header"] = False if close: st.close() if isinstance(st, StringIO): return st.getvalue() return path_or_buf
[docs] def to_dataframe(self) -> pandas.DataFrame: """ Converts everything into a single :epkg:`DataFrame`. """ return pandas.concat(self, axis=0)
[docs] def to_df(self) -> pandas.DataFrame: """ Converts everything into a single :epkg:`DataFrame`. """ return self.to_dataframe()
[docs] def iterrows(self): """ See :epkg:`pandas:DataFrame:iterrows`. """ for df in self: for it in df.iterrows(): # noqa: UP028 yield it
[docs] def head(self, n=5) -> pandas.DataFrame: """ Returns the first rows as a :epkg:`DataFrame`. """ st = [] total = 0 for df in self: h = df.head(n=n) total += h.shape[0] st.append(h) if total >= n: break n -= h.shape[0] if len(st) == 1: return st[0] if len(st) == 0: return None return pandas.concat(st, axis=0)
[docs] def tail(self, n=5) -> pandas.DataFrame: """ Returns the last rows as a :epkg:`DataFrame`. The size of chunks must be greater than ``n`` to get ``n`` lines. This method is not efficient because the whole dataset must be walked through. """ for df in self: h = df.tail(n=n) return h
[docs] def where(self, *args, **kwargs) -> "StreamingDataFrame": """ Applies :epkg:`pandas:DataFrame:where`. *inplace* must be False. This function returns a see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. """ kwargs["inplace"] = False return StreamingDataFrame( lambda: map(lambda df: df.where(*args, **kwargs), self), # noqa: C417 **self.get_kwargs(), )
[docs] def sample(self, reservoir=False, cache=False, **kwargs) -> "StreamingDataFrame": """ See :meth:`pandas.DataFrame.sample`. Only *frac* is available, otherwise choose :meth`reservoir_sampling`. This function returns a see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. :param reservoir: use `reservoir sampling <>`_ :param cache: cache the sample :param kwargs: additional parameters for :meth:`pandas.DataFrame.sample` If *cache* is True, the sample is cached (assuming it holds in memory). The second time an iterator walks through the """ if reservoir or "n" in kwargs: if "frac" in kwargs: raise ValueError("frac cannot be specified for reservoir sampling.") return self._reservoir_sampling( cache=cache, n=kwargs["n"], random_state=kwargs.get("random_state") ) if cache: sdf = self.sample(cache=False, **kwargs) df = sdf.to_df() return StreamingDataFrame.read_df(df, chunksize=df.shape[0]) return StreamingDataFrame( lambda: map(lambda df: df.sample(**kwargs), self), # noqa: C417 **self.get_kwargs(), stable=False, )
def _reservoir_sampling( self, cache=True, n=1000, random_state=None ) -> "StreamingDataFrame": """ Uses the `reservoir sampling <>`_ algorithm to draw a random sample with exactly *n* samples. :param cache: cache the sample :param n: number of observations to keep :param random_state: sets the random_state :return: see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` .. warning:: The sample is split by chunks of size 1000. This parameter is not yet exposed. """ if not cache: raise ValueError("cache=False is not available for reservoir sampling.") indices = [] seen = 0 for i, df in enumerate(self): for ir, _ in enumerate(df.iterrows()): seen += 1 if len(indices) < n: indices.append((i, ir)) else: x = nrandom.random() if x * n < (seen - n): k = nrandom.randint(0, len(indices) - 1) indices[k] = (i, ir) indices = set(indices) def reservoir_iterate(sdf, indices, chunksize): "iterator" buffer = [] for i, df in enumerate(self): for ir, row in enumerate(df.iterrows()): if (i, ir) in indices: buffer.append(row) if len(buffer) >= chunksize: yield pandas.DataFrame(buffer) buffer.clear() if len(buffer) > 0: yield pandas.DataFrame(buffer) return StreamingDataFrame( lambda: reservoir_iterate(sdf=self, indices=indices, chunksize=1000) )
[docs] def drop( self, labels=None, *, axis=0, index=None, columns=None, level=None, inplace=False, errors="raise", ) -> "StreamingDataFrame": """ Applies :epkg:`pandas:DataFrame:drop`. This function returns a see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. """ if axis == 0: raise NotImplementedError(f"drop is not implemented for axis={axis}.") if inplace: raise NotImplementedError(f"drop is not implemented for inplace={inplace}.") return StreamingDataFrame( lambda: map( # noqa: C417 lambda df: df.drop( labels, axis=axis, index=index, columns=columns, level=level, inplace=False, errors=errors, ), self, ), **self.get_kwargs(), )
[docs] def apply(self, *args, **kwargs) -> "StreamingDataFrame": """ Applies :epkg:`pandas:DataFrame:apply`. This function returns a see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. """ return StreamingDataFrame( lambda: map(lambda df: df.apply(*args, **kwargs), self), # noqa: C417 **self.get_kwargs(), )
[docs] def applymap(self, *args, **kwargs) -> "StreamingDataFrame": """ Applies :epkg:`pandas:DataFrame:applymap`. This function returns a see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. """ return StreamingDataFrame( lambda: map(lambda df: df.applymap(*args, **kwargs), self), # noqa: C417 **self.get_kwargs(), )
[docs] def merge(self, right, **kwargs) -> "StreamingDataFrame": """ Merges two see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` and returns see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. *right* can be either a see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` or simply a :epkg:`pandas:DataFrame`. It calls :epkg:`pandas:DataFrame:merge` in a double loop, loop on *self*, loop on *right*. """ if isinstance(right, pandas.DataFrame): return self.merge( StreamingDataFrame.read_df(right, chunksize=right.shape[0]), **kwargs ) def iterator_merge(sdf1, sdf2, **kw): "iterate on dataframes" for df1 in sdf1: for df2 in sdf2: df = df1.merge(df2, **kw) yield df return StreamingDataFrame( lambda: iterator_merge(self, right, **kwargs), **self.get_kwargs() )
[docs] def concat(self, others, axis=0) -> "StreamingDataFrame": """ Concatenates :epkg:`dataframes`. The function ensures all :epkg:`pandas:DataFrame` or see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` share the same columns (name and type). Otherwise, the function fails as it cannot guess the schema without walking through all :epkg:`dataframes`. :param others: list, enumeration, :epkg:`pandas:DataFrame` :param axis: concatenate by rows (0) or by columns (1) :return: see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` """ if axis == 1: return self._concath(others) if axis == 0: return self._concatv(others) raise ValueError("axis must be 0 or 1") # pragma: no cover
def _concath(self, others): if not isinstance(others, list): others = [others] def iterateh(self, others): cols = (self, *others) for dfs in zip(*cols): nrows = [_.shape[0] for _ in dfs] if min(nrows) != max(nrows): raise RuntimeError( "StreamingDataFram cannot merge DataFrame " "with different size or chunksize" ) yield pandas.concat(list(dfs), axis=1) return StreamingDataFrame(lambda: iterateh(self, others), **self.get_kwargs()) def _concatv(self, others): def iterator_concat(this, lothers): "iterator on dataframes" columns = None dtypes = None for df in this: if columns is None: columns = df.columns dtypes = df.dtypes yield df for obj in lothers: check = True for i, df in enumerate(obj): if check: if list(columns) != list(df.columns): raise ValueError( f"Frame others[{i}] do not have the " f"same column names or the same order." ) if list(dtypes) != list(df.dtypes): raise ValueError( f"Frame others[{i}] do not have the same column types." ) check = False yield df if isinstance(others, pandas.DataFrame): others = [others] elif isinstance(others, StreamingDataFrame): others = [others] def change_type(obj): "change column type" if isinstance(obj, pandas.DataFrame): return StreamingDataFrame.read_df(obj, obj.shape[0]) else: return obj others = list(map(change_type, others)) return StreamingDataFrame( lambda: iterator_concat(self, others), **self.get_kwargs() )
[docs] def groupby( self, by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, **kwargs ) -> pandas.DataFrame: """ Implements the streaming :epkg:`pandas:DataFrame:groupby`. We assume the result holds in memory. The out-of-memory is not implemented yet. :param by: see :epkg:`pandas:DataFrame:groupby` :param in_memory: in-memory algorithm :param lambda_agg: aggregation function, *sum* by default :param lambda_agg_agg: to aggregate the aggregations, *sum* by default :param kwargs: additional parameters for :epkg:`pandas:DataFrame:groupby` :return: :epkg:`pandas:DataFrame` As the input see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated :epkg:`dataframe`, another one to combine all the reduced :epkg:`dataframes`. This second one is always a **sum**. As a consequence, this function should not compute any *mean* or *count*, only *sum* because we do not know the size of each iterated :epkg:`dataframe`. To compute an average, sum and weights must be aggregated. Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default. It could also be ``lambda gr: gr.max()`` or ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()`` as it would lead to incoherent results. .. exref:: :title: StreamingDataFrame and groupby :tag: streaming Here is an example which shows how to write a simple *groupby* with :epkg:`pandas` and see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. .. runpython:: :showcode: from pandas import DataFrame from pandas_streaming.df import StreamingDataFrame df = DataFrame(dict(A=[3, 4, 3], B=[5,6, 7])) sdf = StreamingDataFrame.read_df(df) # The following: print(sdf.groupby("A", lambda gr: gr.sum())) # Is equivalent to: print(df.groupby("A").sum()) """ if not in_memory: raise NotImplementedError("Out-of-memory group by is not implemented.") if lambda_agg is None: def lambda_agg_(gr): "sum" return gr.sum() lambda_agg = lambda_agg_ if lambda_agg_agg is None: def lambda_agg_agg_(gr): "sum" return gr.sum() lambda_agg_agg = lambda_agg_agg_ ckw = kwargs.copy() ckw["as_index"] = False agg = [] for df in self: gr = df.groupby(by=by, **ckw) agg.append(lambda_agg(gr)) conc = pandas.concat(agg, sort=False) return lambda_agg_agg(conc.groupby(by=by, **kwargs))
[docs] def groupby_streaming( self, by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, strategy="cum", **kwargs, ) -> pandas.DataFrame: """ Implements the streaming :epkg:`pandas:DataFrame:groupby`. We assume the result holds in memory. The out-of-memory is not implemented yet. :param by: see :epkg:`pandas:DataFrame:groupby` :param in_memory: in-memory algorithm :param lambda_agg: aggregation function, *sum* by default :param lambda_agg_agg: to aggregate the aggregations, *sum* by default :param kwargs: additional parameters for :epkg:`pandas:DataFrame:groupby` :param strategy: ``'cum'``, or ``'streaming'``, see below :return: :epkg:`pandas:DataFrame` As the input see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated :epkg:`dataframe`, another one to combine all the reduced :epkg:`dataframes`. This second one is always a **sum**. As a consequence, this function should not compute any *mean* or *count*, only *sum* because we do not know the size of each iterated :epkg:`dataframe`. To compute an average, sum and weights must be aggregated. Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default. It could also be ``lambda gr: gr.max()`` or ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()`` as it would lead to incoherent results. Parameter *strategy* allows three scenarios. First one if ``strategy is None`` goes through the whole datasets to produce a final :epkg:`DataFrame`. Second if ``strategy=='cum'`` returns a see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`, each iteration produces the current status of the *group by*. Last case, ``strategy=='streaming'`` produces :epkg:`DataFrame` which must be concatenated into a single :epkg:`DataFrame` and grouped again to get the results. .. exref:: :title: StreamingDataFrame and groupby :tag: streaming Here is an example which shows how to write a simple *groupby* with :epkg:`pandas` and see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. .. runpython:: :showcode: from pandas import DataFrame from pandas_streaming.df import StreamingDataFrame from import dummy_streaming_dataframe df20 = dummy_streaming_dataframe(20).to_dataframe() df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0) sdf20 = StreamingDataFrame.read_df(df20, chunksize=5) sgr = sdf20.groupby_streaming("key", lambda gr: gr.sum(), strategy='cum', as_index=False) for gr in sgr: print() print(gr) """ if not in_memory: raise NotImplementedError("Out-of-memory group by is not implemented.") if lambda_agg is None: def lambda_agg_(gr): "sum" return gr.sum() lambda_agg = lambda_agg_ if lambda_agg_agg is None: def lambda_agg_agg_(gr): "sum" return gr.sum() lambda_agg_agg = lambda_agg_agg_ ckw = kwargs.copy() ckw["as_index"] = False if strategy == "cum": def iterate_cum(): agg = None for df in self: gr = df.groupby(by=by, **ckw) gragg = lambda_agg(gr) if agg is None: yield lambda_agg_agg(gragg.groupby(by=by, **kwargs)) agg = gragg else: lagg = pandas.concat([agg, gragg], sort=False) yield lambda_agg_agg(lagg.groupby(by=by, **kwargs)) agg = lagg return StreamingDataFrame(lambda: iterate_cum(), **self.get_kwargs()) if strategy == "streaming": def iterate_streaming(): for df in self: gr = df.groupby(by=by, **ckw) gragg = lambda_agg(gr) yield lambda_agg(gragg.groupby(by=by, **kwargs)) return StreamingDataFrame(lambda: iterate_streaming(), **self.get_kwargs()) raise ValueError(f"Unknown strategy '{strategy}'") # pragma: no cover
[docs] def ensure_dtype(self, df, dtypes): """ Ensures the :epkg:`dataframe` *df* has types indicated in dtypes. Changes it if not. :param df: dataframe :param dtypes: list of types :return: updated? """ ch = False cols = df.columns for i, (has, exp) in enumerate(zip(df.dtypes, dtypes)): if has != exp: name = cols[i] df[name] = df[name].astype(exp) ch = True return ch
[docs] def __getitem__(self, *args): """ Implements some of the functionalities :epkg:`pandas` offers for the operator ``[]``. """ if len(args) != 1: raise NotImplementedError( # pragma: no cover "Only a list of columns is supported." ) cols = args[0] if isinstance(cols, str): # One column. iter_creation = self.iter_creation def iterate_col(): "iterate on one column" one_col = [cols] for df in iter_creation(): yield df[one_col] return StreamingSeries(iterate_col, **self.get_kwargs()) if not isinstance(cols, list): raise NotImplementedError("Only a list of columns is supported.") def iterate_cols(sdf): """Iterate on columns.""" for df in sdf: yield df[cols] return StreamingDataFrame(lambda: iterate_cols(self), **self.get_kwargs())
[docs] def __setitem__(self, index, value): """ Limited set of operators are supported. """ if not isinstance(index, str): raise ValueError(f"Only column affected are supported but index={index!r}.") if isinstance(value, (int, float, numpy.number, str)): # Is is equivalent to add_column. iter_creation = self.iter_creation def iterate_fct(): "iterate on rows" iters = iter_creation() for df in iters: dfc = df.copy() dfc[index] = value yield dfc self.iter_creation = iterate_fct elif isinstance(value, StreamingSeries): iter_creation = self.iter_creation def iterate_fct(): "iterate on rows" iters = iter_creation() for df, dfs in zip(iters, value): if df.shape[0] != dfs.shape[0]: raise RuntimeError( "Chunksize or shape are different when " "iterating on two StreamDataFrame at the same " "time: %r != %r." % (df.shape[0], dfs.shape[0]) ) dfc = df.copy() dfc[index] = dfs yield dfc self.iter_creation = iterate_fct else: raise NotImplementedError( "Not implemented for type(index)=%r and type(value)=%r." % (type(index), type(value)) )
[docs] def add_column(self, col, value): """ Implements some of the functionalities :epkg:`pandas` offers for the operator ``[]``. :param col: new column :param value: see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` or a lambda function :return: see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` ..note:: If value is a see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`, *chunksize* must be the same for both. .. exref:: :title: Add a new column to a StreamingDataFrame :tag: streaming .. runpython:: :showcode: from pandas import DataFrame from pandas_streaming.df import StreamingDataFrame df = DataFrame(data=dict(X=[4.5, 6, 7], Y=["a", "b", "c"])) sdf = StreamingDataFrame.read_df(df) sdf2 = sdf.add_column("d", lambda row: int(1)) print(sdf2.to_dataframe()) sdf2 = sdf.add_column("d", lambda row: int(1)) print(sdf2.to_dataframe()) """ if not isinstance(col, str): raise NotImplementedError( # pragma: no cover "Only a column as a string is supported." ) if isfunction(value): def iterate_fct(self, value, col): "iterate on rows" for df in self: dfc = df.copy() dfc.insert(dfc.shape[1], col, dfc.apply(value, axis=1)) yield dfc return StreamingDataFrame( lambda: iterate_fct(self, value, col), **self.get_kwargs() ) if isinstance(value, (pandas.Series, pandas.DataFrame, StreamingDataFrame)): raise NotImplementedError("Unable set a new column based on a datadframe.") def iterate_cst(self, value, col): "iterate on rows" for df in self: dfc = df.copy() dfc[col] = value yield dfc return StreamingDataFrame( lambda: iterate_cst(self, value, col), **self.get_kwargs() )
[docs] def fillna(self, **kwargs): """ Replaces the missing values, calls :epkg:`pandas:DataFrame:fillna`. :param kwargs: see :meth:`pandas.DataFrame.fillna` :return: see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` .. warning:: The function does not check what happens at the limit of every chunk of data. Anything but a constant value will probably have an inconsistent behaviour. """ def iterate_na(self, **kwargs): "iterate on rows" if kwargs.get("inplace", True): kwargs["inplace"] = True for df in self: df.fillna(**kwargs) yield df else: for df in self: yield df.fillna(**kwargs) return StreamingDataFrame( lambda: iterate_na(self, **kwargs), **self.get_kwargs() )
[docs] def describe(self, percentiles=None, include=None, exclude=None): """ Calls :epkg:`pandas:DataFrame:describe` on every piece of the datasets. *percentiles* are not really accurate but just an indication. :param percentiles: see :epkg:`pandas:DataFrame:describe` :param include: see :epkg:`pandas:DataFrame:describe` :param exclude: see :epkg:`pandas:DataFrame:describe` :return: :epkg:`pandas:DataFrame:describe` .. versionchanged:: 0.3.219 Parameter *datetime_is_numeric* was removed (see :epkg:`pandas:DataFrame:describe`). """ merged = None stack = [] notper = ["count", "mean", "std"] for df in self: desc = df.describe( percentiles=percentiles, include=include, exclude=exclude ) count = desc.loc["count", :] rows = [name for name in desc.index if name not in notper] stack.append(desc.loc[rows, :]) if merged is None: merged = desc merged.loc["std", :] = ( merged.loc["std", :] ** 2 + merged.loc["mean", :] ** 2 ) * count merged.loc["mean", :] *= count else: merged.loc["count", :] += desc.loc["count", :] merged.loc["mean", :] += desc.loc["mean", :] * count merged.loc["std", :] += ( desc.loc["std", :] ** 2 + desc.loc["mean", :] ** 2 ) * count merged.loc["max", :] = numpy.maximum( merged.loc["max", :], desc.loc["max", :] ) merged.loc["min", :] = numpy.maximum( merged.loc["min", :], desc.loc["min", :] ) merged.loc["mean", :] /= merged.loc["count", :] merged.loc["std", :] = ( merged.loc["std", :] / merged.loc["count", :] - merged.loc["mean", :] ** 2 ) ** 0.5 values = pandas.concat(stack) summary = values.describe(percentiles=percentiles) merged = merged.loc[notper, :] rows = [name for name in summary.index if name not in notper] summary = summary.loc[rows, :] return pandas.concat([merged, summary])
[docs] def sort_values( self, by, axis=0, ascending=True, kind="quicksort", na_position="last", temp_file="_pandas_streaming_sort_values_", ): """ Sorts the streaming dataframe by values. :param by: one column :param ascending: order :param kind: see :meth:`pandas.DataFrame.sort_values` :param na_position: see :meth:`pandas.DataFrame.sort_values` :param temp_file: sorting a whole database is impossible without storing intermediate results on disk unless it can fit into the memory, but in that case, it is easier to convert the streaming database into a dataframe and sort it :return: streaming database """ if not isinstance(by, str): raise NotImplementedError( # pragma: no cover f"Only one column can be used to sort not {by!r}." ) keys = {} nans = [] indices = [] with open(temp_file, "wb") as f: for df in self: dfs = df.sort_values( by, ascending=ascending, kind=kind, na_position=na_position ) for tu in dfs[by]: if isinstance(tu, float) and numpy.isnan(tu): nans.append(len(indices)) else: if tu not in keys: keys[tu] = [] keys[tu].append(len(indices)) indices.append(f.tell()) st = BytesIO() pickle.dump(dfs, st) f.write(st.getvalue()) indices.append(f.tell()) values = list(keys.items()) values.sort(reverse=not ascending) def iterate(): with open(temp_file, "rb") as f: if na_position == "first": for p in nans:[p]) length = indices[p + 1] - indices[p] pkl = dfs = pickle.load(BytesIO(pkl)) sub = dfs[numpy.isnan(dfs[by])] yield sub for key, positions in values: for p in positions:[p]) length = indices[p + 1] - indices[p] pkl = dfs = pickle.load(BytesIO(pkl)) sub = dfs[dfs[by] == key] yield sub if na_position == "last": for p in nans:[p]) length = indices[p + 1] - indices[p] pkl = dfs = pickle.load(BytesIO(pkl)) sub = dfs[numpy.isnan(dfs[by])] yield sub res = StreamingDataFrame(iterate, **self.get_kwargs()) res._delete_.append(lambda: os.remove(temp_file)) return res
[docs] def __del__(self): """ Calls every function in `_delete_`. """ for f in self._delete_: f()
[docs] class StreamingSeries(StreamingDataFrame): """ Seens as a see :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` of one column. """ def __init__(self, iter_creation, check_schema=True, stable=True): StreamingDataFrame.__init__( self, iter_creation, check_schema=check_schema, stable=stable ) if len(self.columns) != 1: raise RuntimeError( # pragma: no cover f"A series can contain only one column not {len(self.columns)!r}." )
[docs] def apply(self, *args, **kwargs) -> "StreamingDataFrame": """ Applies :epkg:`pandas:Series:apply`. This function returns a @see cl StreamingSeries. """ return StreamingSeries( lambda: map(lambda df: df.apply(*args, **kwargs), self), # noqa: C417 **self.get_kwargs(), )
def __add__(self, value): """ Does an addition on every value hoping that has a meaning. :param value: any value which makes sense :return: a new series """ def iterate(): for df in self: yield df + value return StreamingSeries(iterate, **self.get_kwargs())