Source code for pandas_streaming.df.dataframe_split

import hashlib
import pickle
import random
import warnings
from io import StringIO
import pandas


def sklearn_train_test_split(
    self, path_or_buf=None, export_method="to_csv", names=None, **kwargs
):
    """
    Randomly splits a streaming dataframe into a train and a test partition.

    Every chunk of the stream is split with
    :func:`sklearn.model_selection.train_test_split` and each half is
    appended to its own destination (file or buffer).
    The stratified version is not handled.

    :param self: see :class:`StreamingDataFrame
        <pandas_streaming.df.dataframe.StreamingDataFrame>`
    :param path_or_buf: a string, a list of strings or buffers, or None;
        if it is a string, it must contain ``{}`` like ``partition{}.txt``,
        the placeholder is replaced by every name in *names*;
        if None, both partitions are written into in-memory buffers
    :param export_method: name of the :class:`pandas.DataFrame` method used
        to store the partitions, by default :meth:`pandas.DataFrame.to_csv`
    :param names: the two partition names, by default ``('train', 'test')``,
        only used when *path_or_buf* is a string
    :param kwargs: parameters for the export function and
        :func:`sklearn.model_selection.train_test_split`
        (*test_size*, *train_size*, *random_state*, *shuffle*)
    :return: one item per destination: the text written into it when the
        destination is a :class:`io.StringIO`, otherwise the destination
        as given (file name or user buffer)
    :raises NotImplementedError: if *stratify* is specified
    :raises ValueError: if *path_or_buf* is a string without ``{}``,
        if *names* does not contain exactly two names or if fewer than
        two destinations are given
    :raises TypeError: if *path_or_buf* is neither a string, a list nor None

    The function cannot return two iterators or two
    :class:`StreamingDataFrame
    <pandas_streaming.df.dataframe.StreamingDataFrame>`
    because running through one means running through the other.
    We can assume both splits do not hold in memory and we cannot run
    through the same iterator again as random draws would be different.
    We need to store the results into files or buffers.

    .. warning::
        The method *export_method* must write the data in mode *append*
        and allows stream.
    """
    if kwargs.get("stratify") is not None:
        raise NotImplementedError(  # pragma: no cover
            "No implementation yet for the stratified version."
        )

    # Destinations are validated first so that a wrong argument fails
    # before anything is imported or any file is created.
    if isinstance(path_or_buf, str):
        if "{}" not in path_or_buf:
            raise ValueError("path_or_buf must contain {} to insert the partition name")
        if names is None:
            names = ["train", "test"]
        elif len(names) != 2:
            # One name per partition (train, test); the length of the
            # pattern string itself is irrelevant.
            raise ValueError(
                "names must contain exactly two partition names (train, test)"
            )
        path_or_buf = [path_or_buf.format(n) for n in names]
    elif path_or_buf is None:
        path_or_buf = [None, None]
    elif not isinstance(path_or_buf, list):
        raise TypeError(  # pragma: no cover
            "path_or_buf must be a list or a string"
        )
    elif len(path_or_buf) < 2:
        raise ValueError("path_or_buf must contain two destinations (train, test)")

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=ImportWarning)
        from sklearn.model_selection import train_test_split

    # Options meant for train_test_split are removed from kwargs,
    # whatever remains is forwarded to the export function.
    opts = ["test_size", "train_size", "random_state", "shuffle", "stratify"]
    split_ops = {o: kwargs.pop(o) for o in opts if o in kwargs}

    exportf_ = getattr(pandas.DataFrame, export_method)
    if export_method == "to_csv" and "mode" not in kwargs:

        def exportf(*a, **kw):
            "to_csv must append, every chunk is written after the previous one"
            return exportf_(*a, mode="a", **kw)

    else:
        exportf = exportf_

    bufs = []
    opened = []  # only the files opened here are closed here
    try:
        for p in path_or_buf:
            if p is None:
                st = StringIO()
            elif isinstance(p, str):
                st = open(p, "w", encoding=kwargs.get("encoding"))  # noqa: SIM115
                opened.append(st)
            else:
                st = p
            bufs.append(st)

        for df in self:
            train, test = train_test_split(df, **split_ops)
            exportf(train, bufs[0], **kwargs)
            exportf(test, bufs[1], **kwargs)
            # the header is only written with the first chunk
            kwargs["header"] = False
    finally:
        # files are released even if the split or the export fails
        for st in opened:
            st.close()

    return [
        st.getvalue() if isinstance(st, StringIO) else p
        for st, p in zip(bufs, path_or_buf)
    ]
def sklearn_train_test_split_streaming(
    self, test_size=0.25, train_size=None, stratify=None, hash_size=9, unique_rows=False
):
    """
    Randomly splits a streaming dataframe into a train and a test stream.

    Unlike the file-based split, nothing is written on disk and the
    stratified version is handled.

    :param self: see :class:`StreamingDataFrame
        <pandas_streaming.df.dataframe.StreamingDataFrame>`
    :param test_size: ratio for the test partition
        (if *train_size* is not specified)
    :param train_size: ratio for the train partition,
        it takes precedence over *test_size*
    :param stratify: column holding the stratification
    :param hash_size: size of the hash to cache information about partition
    :param unique_rows: ensures that rows are unique
    :return: two :class:`StreamingDataFrame
        <pandas_streaming.df.dataframe.StreamingDataFrame>`,
        one for train, one for test
    :raises ValueError: if the train ratio deduced from *test_size* and
        *train_size* is not strictly between 0 and 1; while iterating,
        if *unique_rows* is True and a row (or its hash) is met twice,
        or if a row of a later iteration was never met in the first one

    The function tries to do everything without writing anything on disk
    but it requires to store the repartition somehow. It hashes every row
    and maps the hash with a part (train or test). This cache must hold in
    memory otherwise the function fails. The two returned iterators must
    not be used for the first time at the same time: the first complete
    iteration builds the cache. The function changes the order of rows if
    the parameter *stratify* is not null. The cache has a side effect:
    every exact same row will be put in the same partition. If that is not
    what you want, you should add an index column or a random one.
    """
    if train_size is not None:
        p = train_size
    elif test_size:
        p = 1 - test_size
    else:
        p = None
    if p is None or not 0 < p < 1:
        # 1 / p and 1 / (1 - p) below are undefined otherwise
        raise ValueError(
            f"The train ratio must be strictly between 0 and 1, "
            f"got test_size={test_size!r}, train_size={train_size!r}."
        )
    # number of buffered rows of a stratum needed before it is distributed
    n = 2 * max(1 / p, 1 / (1 - p))

    # filled with the first chunk: [column names, dtypes, number of rows]
    static_schema = []

    def iterator_rows():
        "iterates on rows, yields ``(row, part)``, part is 0 (train) or 1 (test)"
        counts = {}  # (part, stratum) -> number of rows already distributed
        memory = {}  # stratum -> rows waiting to be distributed
        pos_col = None

        def dispatch(k, v):
            "distributes the buffered rows *v* of stratum *k*, updates the counts"
            # random permutation of the positions
            vr = list(range(len(v)))
            random.shuffle(vr)
            done_train = counts.get((0, k), 0)
            done_test = counts.get((1, k), 0)
            # corrects the rounding drift accumulated by previous batches
            delta = -int(done_train - (done_train + done_test) * p + 0.5)
            i = int(len(v) * p + 0.5) + delta
            i = max(0, min(len(v), i))
            one = set(vr[:i])
            for d, row in enumerate(v):
                yield row, 0 if d in one else 1
            counts[0, k] = done_train + i
            counts[1, k] = done_test + len(v) - i

        for df in self:
            if pos_col is None:
                static_schema.append(list(df.columns))
                static_schema.append(list(df.dtypes))
                static_schema.append(df.shape[0])
                if stratify is not None:
                    pos_col = list(df.columns).index(stratify)
                else:
                    pos_col = -1

            for obs in df.itertuples(index=False, name=None):
                strat = 0 if stratify is None else obs[pos_col]
                memory.setdefault(strat, []).append(obs)

            for k, v in memory.items():
                # the threshold gets a random jitter between 0 and 10 rows
                if len(v) >= n + random.randint(0, 10):
                    yield from dispatch(k, v)
                    # distributed rows are removed from memory
                    v.clear()

        # Once the stream is exhausted, the rows still buffered
        # must be distributed as well.
        for k, v in memory.items():
            yield from dispatch(k, v)

    def h11(w):
        "pickle and hash, the hash is only a cache key, not a security feature"
        b = pickle.dumps(w)
        return hashlib.md5(b).hexdigest()[:hash_size]

    # We store the repartition in a cache: row hash -> part.
    cache = {}

    def make_frame(rows):
        "builds a dataframe from rows with the schema of the first chunk"
        dfo = pandas.DataFrame(rows, columns=static_schema[0])
        self.ensure_dtype(dfo, static_schema[1])
        return dfo

    def labelled_rows():
        "yields ``(row, part)``, builds the cache if empty, reads it otherwise"
        if len(cache) == 0:
            for obs, part in iterator_rows():
                h = h11(obs)
                if h in cache:
                    if unique_rows:
                        raise ValueError(
                            "A row or at least its hash is already cached. "
                            "Increase hash_size or check for duplicates "
                            f"('{h}')\n{obs}."
                        )
                    # identical rows always go to the same part
                    part = cache[h]
                else:
                    cache[h] = part
                yield obs, part
        else:
            for df in self:
                for obs in df.itertuples(index=False, name=None):
                    part = cache.get(h11(obs))
                    if part is None:
                        raise ValueError(  # pragma: no cover
                            f"Second iteration. A row was "
                            f"never met in the first one\n{obs}"
                        )
                    yield obs, part

    def iterator_internal(part_requested):
        "internal iterator on dataframes"
        accumul = []
        for obs, part in labelled_rows():
            if part != part_requested:
                continue
            accumul.append(obs)
            # chunks have the same number of rows as the first input chunk
            if len(accumul) >= static_schema[2]:
                dfo = make_frame(accumul)
                accumul.clear()
                yield dfo
        if len(accumul) > 0:
            yield make_frame(accumul)

    return (
        self.__class__(lambda: iterator_internal(0)),
        self.__class__(lambda: iterator_internal(1)),
    )