import hashlib
import pickle
import random
import warnings
from io import StringIO
import pandas
[docs]
def sklearn_train_test_split(
    self, path_or_buf=None, export_method="to_csv", names=None, **kwargs
):
    """
    Randomly splits a dataframe into smaller pieces.
    The function returns streams of file names.
    The function relies on :func:`sklearn.model_selection.train_test_split`.
    It does not handle stratified version of it.
    :param self: see :class:`StreamingDataFrame
        <pandas_streaming.df.dataframe.StreamingDataFrame>`
    :param path_or_buf: a string, a list of strings or buffers, if it is a
        string, it must contain ``{}`` like ``partition{}.txt``
    :param export_method: method used to store the partitions, by default
        :meth:`pandas.DataFrame.to_csv`
    :param names: partitions names, by default ``('train', 'test')``
    :param kwargs: parameters for the export function and
        :func:`sklearn.model_selection.train_test_split`.
    :return: outputs of the exports functions
    The function cannot return two iterators or two
    see :class:`StreamingDataFrame
    <pandas_streaming.df.dataframe.StreamingDataFrame>`
    because running through one
    means running through the other. We can assume both
    splits do not hold in memory and we cannot run through
    the same iterator again as random draws would be different.
    We need to store the results into files or buffers.
    .. warning::
        The method *export_method* must write the data in
        mode *append* and allows stream.
    """
    if kwargs.get("stratify") is not None:
        raise NotImplementedError(  # pragma: no cover
            "No implementation yet for the stratified version."
        )
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=ImportWarning)
        from sklearn.model_selection import train_test_split
    opts = ["test_size", "train_size", "random_state", "shuffle", "stratify"]
    split_ops = {}
    for o in opts:
        if o in kwargs:
            split_ops[o] = kwargs[o]
            del kwargs[o]
    exportf_ = getattr(pandas.DataFrame, export_method)
    if export_method == "to_csv" and "mode" not in kwargs:
        exportf = lambda *a, **kw: exportf_(*a, mode="a", **kw)  # noqa: E731
    else:
        exportf = exportf_
    if isinstance(path_or_buf, str):
        if "{}" not in path_or_buf:
            raise ValueError("path_or_buf must contain {} to insert the partition name")
        if names is None:
            names = ["train", "test"]
        elif len(names) != len(path_or_buf):
            raise ValueError(  # pragma: no cover
                "names and path_or_buf must have the same length"
            )
        path_or_buf = [path_or_buf.format(n) for n in names]
    elif path_or_buf is None:
        path_or_buf = [None, None]
    else:
        if not isinstance(path_or_buf, list):
            raise TypeError(  # pragma: no cover
                "path_or_buf must be a list or a string"
            )
    bufs = []
    close = []
    for p in path_or_buf:
        if p is None:
            st = StringIO()
            cl = False
        elif isinstance(p, str):
            st = open(p, "w", encoding=kwargs.get("encoding"))  # noqa: SIM115
            cl = True
        else:
            st = p
            cl = False
        bufs.append(st)
        close.append(cl)
    for df in self:
        train, test = train_test_split(df, **split_ops)
        exportf(train, bufs[0], **kwargs)
        exportf(test, bufs[1], **kwargs)
        kwargs["header"] = False
    for b, c in zip(bufs, close):
        if c:
            b.close()
    return [
        st.getvalue() if isinstance(st, StringIO) else p
        for st, p in zip(bufs, path_or_buf)
    ] 
[docs]
def sklearn_train_test_split_streaming(
    self, test_size=0.25, train_size=None, stratify=None, hash_size=9, unique_rows=False
):
    """
    Randomly splits a dataframe into smaller pieces.
    The function returns streams of file names.
    The function relies on :func:`sklearn.model_selection.train_test_split`.
    It handles the stratified version of it.
    :param self: see :class:`StreamingDataFrame
        <pandas_streaming.df.dataframe.StreamingDataFrame>`
    :param test_size: ratio for the test partition
        (if *train_size* is not specified)
    :param train_size: ratio for the train partition
    :param stratify: column holding the stratification
    :param hash_size: size of the hash to cache information about partition
    :param unique_rows: ensures that rows are unique
    :return: Two see :class:`StreamingDataFrame
        <pandas_streaming.df.dataframe.StreamingDataFrame>`,
        one for train, one for test.
    The function returns two iterators or two
    see :class:`StreamingDataFrame
    <pandas_streaming.df.dataframe.StreamingDataFrame>`. It
    tries to do everything without writing anything on disk
    but it requires to store the repartition somehow.
    This function hashes every row and maps the hash with a part
    (train or test). This cache must hold in memory otherwise the
    function fails. The two returned iterators must not be used
    for the first time in the same time. The first time is used to
    build the cache. The function changes the order of rows if
    the parameter *stratify* is not null. The cache has a side effect:
    every exact same row will be put in the same partition.
    If that is not what you want, you should add an index column
    or a random one.
    """
    p = (1 - test_size) if test_size else None
    if train_size is not None:
        p = train_size
    n = 2 * max(1 / p, 1 / (1 - p))  # changement
    static_schema = []
    def iterator_rows():
        "iterates on rows"
        counts = {}
        memory = {}
        pos_col = None
        for df in self:
            if pos_col is None:
                static_schema.append(list(df.columns))
                static_schema.append(list(df.dtypes))
                static_schema.append(df.shape[0])
                if stratify is not None:
                    pos_col = list(df.columns).index(stratify)
                else:
                    pos_col = -1
            for obs in df.itertuples(index=False, name=None):
                strat = 0 if stratify is None else obs[pos_col]
                if strat not in memory:
                    memory[strat] = []
                memory[strat].append(obs)
                for k, v in memory.items():
                    if len(v) >= n + random.randint(0, 10):  # changement
                        vr = list(range(len(v)))
                        # on permute aléatoirement
                        random.shuffle(vr)
                        if (0, k) in counts:
                            tt = counts[1, k] + counts[0, k]
                            delta = -int(counts[0, k] - tt * p + 0.5)
                        else:
                            delta = 0
                        i = int(len(v) * p + 0.5)
                        i += delta
                        i = max(0, min(len(v), i))
                        one = set(vr[:i])
                        for d, obs_ in enumerate(v):
                            yield obs_, 0 if d in one else 1
                        if (0, k) not in counts:
                            counts[0, k] = i
                            counts[1, k] = len(v) - i
                        else:
                            counts[0, k] += i
                            counts[1, k] += len(v) - i
                        # on efface de la mémoire les informations produites
                        v.clear()
        # Lorsqu'on a fini, il faut tout de même répartir les
        # observations stockées.
        for k, v in memory.items():
            vr = list(range(len(v)))
            # on permute aléatoirement
            random.shuffle(vr)
            if (0, k) in counts:
                tt = counts[1, k] + counts[0, k]
                delta = -int(counts[0, k] - tt * p + 0.5)
            else:
                delta = 0
            i = int(len(v) * p + 0.5)
            i += delta
            i = max(0, min(len(v), i))
            one = set(vr[:i])
            for d, obs in enumerate(v):
                yield obs, 0 if d in one else 1
            if (0, k) not in counts:
                counts[0, k] = i
                counts[1, k] = len(v) - i
            else:
                counts[0, k] += i
                counts[1, k] += len(v) - i
    def h11(w):
        "pickle and hash"
        b = pickle.dumps(w)
        return hashlib.md5(b).hexdigest()[:hash_size]
    # We store the repartition in a cache.
    cache = {}
    def iterator_internal(part_requested):
        "internal iterator on dataframes"
        iy = 0
        accumul = []
        if len(cache) == 0:
            for obs, part in iterator_rows():
                h = h11(obs)
                if unique_rows and h in cache:
                    raise ValueError(
                        "A row or at least its hash is already cached. "  # noqa: UP030
                        "Increase hash_size or check for duplicates "
                        "('{0}')\n{1}.".format(h, obs)
                    )
                if h not in cache:
                    cache[h] = part
                else:
                    part = cache[h]
                if part == part_requested:
                    accumul.append(obs)
                    if len(accumul) >= static_schema[2]:
                        dfo = pandas.DataFrame(accumul, columns=static_schema[0])
                        self.ensure_dtype(dfo, static_schema[1])
                        iy += dfo.shape[0]
                        accumul.clear()
                        yield dfo
        else:
            for df in self:
                for obs in df.itertuples(index=False, name=None):
                    h = h11(obs)
                    part = cache.get(h)
                    if part is None:
                        raise ValueError(  # pragma: no cover
                            f"Second iteration. A row was "
                            f"never met in the first one\n{obs}"
                        )
                    if part == part_requested:
                        accumul.append(obs)
                        if len(accumul) >= static_schema[2]:
                            dfo = pandas.DataFrame(accumul, columns=static_schema[0])
                            self.ensure_dtype(dfo, static_schema[1])
                            iy += dfo.shape[0]
                            accumul.clear()
                            yield dfo
        if len(accumul) > 0:
            dfo = pandas.DataFrame(accumul, columns=static_schema[0])
            self.ensure_dtype(dfo, static_schema[1])
            iy += dfo.shape[0]
            yield dfo
    return (
        self.__class__(lambda: iterator_internal(0)),
        self.__class__(lambda: iterator_internal(1)),
    )