Examples

About array

  1. Saves and reads a numpy array in a zip file

Saves and reads a numpy array in a zip file

This example shows how to save a numpy.ndarray directly into a zip file and read it back.

<<<

import numpy
from pandas_streaming.df import to_zip, read_zip

arr = numpy.array([[0.5, 1.5], [0.4, 1.6]])

name = "dfsa.zip"
to_zip(arr, name, "arr.npy")
arr2 = read_zip(name, "arr.npy")
print(arr2)

>>>

    [[0.5 1.5]
     [0.4 1.6]]

(original entry : dataframe_io.py:docstring of pandas_streaming.df.dataframe_io.to_zip, line 32)
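
For readers curious about the mechanics, the same round trip can be approximated with numpy and the standard library alone. This is only a sketch of the general technique, not the implementation behind to_zip and read_zip; the helper names array_to_zip and array_from_zip are hypothetical, and a temporary folder is used so that nothing is left on disk.

```python
import io
import os
import tempfile
import zipfile

import numpy


def array_to_zip(arr, zip_name, entry_name):
    """Hypothetical helper: store one array as a .npy entry in a zip file."""
    buffer = io.BytesIO()
    numpy.save(buffer, arr)  # serialize the array in the .npy binary format
    with zipfile.ZipFile(zip_name, "w") as zf:
        zf.writestr(entry_name, buffer.getvalue())


def array_from_zip(zip_name, entry_name):
    """Hypothetical helper: read one .npy entry back into an array."""
    with zipfile.ZipFile(zip_name, "r") as zf:
        return numpy.load(io.BytesIO(zf.read(entry_name)))


arr = numpy.array([[0.5, 1.5], [0.4, 1.6]])
with tempfile.TemporaryDirectory() as folder:
    name = os.path.join(folder, "sketch.zip")
    array_to_zip(arr, name, "arr.npy")
    restored = array_from_zip(name, "arr.npy")
print(restored)
```

Going through an in-memory buffer avoids writing an intermediate .npy file next to the archive.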

About DataFrame

  1. Group a dataframe by one column including nan values

  2. Hashes a set of columns in a dataframe

  3. Saves and reads a dataframe in a zip file

  4. Shuffles the rows of a dataframe

  5. Splits a dataframe, keep ids in separate partitions

  6. Unfolds a column of a dataframe.

Group a dataframe by one column including nan values

By default, pandas.DataFrame.groupby drops every row whose key is nan, so nan never appears in the index of the result.

<<<

from pandas import DataFrame

data = [dict(a=2, ind="a", n=1), dict(a=2, ind="a"), dict(a=3, ind="b"), dict(a=30)]
df = DataFrame(data)
print(df)
gr = df.groupby(["ind"]).sum()
print(gr)

>>>

        a  ind    n
    0   2    a  1.0
    1   2    a  NaN
    2   3    b  NaN
    3  30  NaN  NaN
         a    n
    ind        
    a    4  1.0
    b    3  0.0

Function pandas_groupby_nan keeps them.

<<<

from pandas import DataFrame
from pandas_streaming.df import pandas_groupby_nan

data = [dict(a=2, ind="a", n=1), dict(a=2, ind="a"), dict(a=3, ind="b"), dict(a=30)]
df = DataFrame(data)
gr2 = pandas_groupby_nan(df, ["ind"]).sum()
print(gr2)

>>>

    /home/xadupre/github/pandas-streaming/pandas_streaming/df/dataframe_helpers.py:394: FutureWarning: The 'axis' keyword in DataFrame.groupby is deprecated and will be removed in a future version.
      res = df.groupby(by, axis=axis, as_index=as_index, dropna=False, **kwargs)
       ind   a    n
    0    a   4  1.0
    1    b   3  0.0
    2  NaN  30  0.0

(original entry : dataframe_helpers.py:docstring of pandas_streaming.df.dataframe_helpers.pandas_groupby_nan, line 21)
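
The warning in the output above shows pandas_groupby_nan forwarding dropna=False to DataFrame.groupby. With pandas 1.1 or later, that option can also be passed directly. The sketch below assumes such a version and only illustrates the option; it is not a replacement for the helper.

```python
from pandas import DataFrame

data = [dict(a=2, ind="a", n=1), dict(a=2, ind="a"), dict(a=3, ind="b"), dict(a=30)]
df = DataFrame(data)

# dropna=False keeps the rows whose key is NaN in a group of their own.
gr = df.groupby("ind", dropna=False).sum()
print(gr)
```

The row with a=30 has no value for ind, so it ends up in the group whose key is NaN.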

Hashes a set of columns in a dataframe

<<<

import pandas
from pandas_streaming.df import dataframe_hash_columns

df = pandas.DataFrame(
    [
        dict(a=1, b="e", c=5.6, ind="a1", ai=1),
        dict(b="f", c=5.7, ind="a2", ai=2),
        dict(a=4, b="g", ind="a3", ai=3),
        dict(a=8, b="h", c=5.9, ai=4),
        dict(a=16, b="i", c=6.2, ind="a5", ai=5),
    ]
)
print(df)
print("--------------")
df2 = dataframe_hash_columns(df)
print(df2)

>>>

          a  b    c  ind  ai
    0   1.0  e  5.6   a1   1
    1   NaN  f  5.7   a2   2
    2   4.0  g  NaN   a3   3
    3   8.0  h  5.9  NaN   4
    4  16.0  i  6.2   a5   5
    --------------
                  a           b             c         ind        ai
    0  4.648669e+11  3f79bb7b43  3.355454e+11  f55ff16f66  65048080
    1           NaN  252f10c836  5.803745e+11  2c3a4249d7   1214325
    2  2.750847e+11  cd0aa98561           NaN  f46dd28a54  80131111
    3  1.940968e+11  aaa9402664  9.635096e+10         NaN  19167269
    4  1.083806e+12  de7d1b721a  3.183198e+11  66220e7159   8788782

(original entry : dataframe_helpers.py:docstring of pandas_streaming.df.dataframe_helpers.dataframe_hash_columns, line 13)
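
The hashed strings in column b above are consistent with SHA-256 digests truncated to ten hexadecimal characters. The sketch below reproduces that idea for strings only. The helper hash_text and its size parameter are hypothetical, and nothing here is claimed about how numeric columns are hashed.

```python
import hashlib


def hash_text(value, size=10):
    """Hypothetical helper: truncated SHA-256 hex digest of a string."""
    digest = hashlib.sha256(value.encode("utf-8")).hexdigest()
    return digest[:size]


hashed = {value: hash_text(value) for value in ["e", "f", "g"]}
for value, short in hashed.items():
    print(value, short)
```

Truncating the digest keeps the values short, at the cost of a higher risk of collisions on large columns.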

Saves and reads a dataframe in a zip file

This example shows how to save a pandas.DataFrame directly into a zip file and read it back.

<<<

import pandas
from pandas_streaming.df import to_zip, read_zip

df = pandas.DataFrame([dict(a=1, b="e"), dict(b="f", a=5.7)])

name = "dfs.zip"
to_zip(df, name, encoding="utf-8", index=False)
df2 = read_zip(name, encoding="utf-8")
print(df2)

>>>

         a  b
    0  1.0  e
    1  5.7  f

(original entry : dataframe_io.py:docstring of pandas_streaming.df.dataframe_io.to_zip, line 11)

Shuffles the rows of a dataframe

<<<

import pandas
from pandas_streaming.df import dataframe_shuffle

df = pandas.DataFrame(
    [
        dict(a=1, b="e", c=5.6, ind="a1"),
        dict(a=2, b="f", c=5.7, ind="a2"),
        dict(a=4, b="g", c=5.8, ind="a3"),
        dict(a=8, b="h", c=5.9, ind="a4"),
        dict(a=16, b="i", c=6.2, ind="a5"),
    ]
)
print(df)
print("----------")

shuffled = dataframe_shuffle(df, random_state=0)
print(shuffled)

>>>

        a  b    c ind
    0   1  e  5.6  a1
    1   2  f  5.7  a2
    2   4  g  5.8  a3
    3   8  h  5.9  a4
    4  16  i  6.2  a5
    ----------
        a  b    c ind
    2   4  g  5.8  a3
    0   1  e  5.6  a1
    1   2  f  5.7  a2
    3   8  h  5.9  a4
    4  16  i  6.2  a5

(original entry : dataframe_helpers.py:docstring of pandas_streaming.df.dataframe_helpers.dataframe_shuffle, line 7)

Splits a dataframe, keep ids in separate partitions

In some data science problems, rows are not independent and share common values, most of the time ids. In some specific cases, multiple ids from different columns are connected and must appear in the same partition. Testing that each id column is evenly split and does not appear in both sets is not enough. Connected components are needed.

<<<

from pandas import DataFrame
from pandas_streaming.df import train_test_connex_split

df = DataFrame(
    [
        dict(user="UA", prod="PAA", card="C1"),
        dict(user="UA", prod="PB", card="C1"),
        dict(user="UB", prod="PC", card="C2"),
        dict(user="UB", prod="PD", card="C2"),
        dict(user="UC", prod="PAA", card="C3"),
        dict(user="UC", prod="PF", card="C4"),
        dict(user="UD", prod="PG", card="C5"),
    ]
)

train, test = train_test_connex_split(
    df, test_size=0.5, groups=["user", "prod", "card"], fail_imbalanced=0.6
)

print(train)
print(test)

>>>

      user prod card  connex  weight
    0   UB   PD   C2       1       1
    1   UB   PC   C2       1       1
      user prod card  connex  weight
    0   UC  PAA   C3       0       1
    1   UD   PG   C5       2       1
    2   UA   PB   C1       0       1
    3   UC   PF   C4       0       1
    4   UA  PAA   C1       0       1

(original entry : connex_split.py:docstring of pandas_streaming.df.connex_split.train_test_connex_split, line 40)
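
To make the notion of connected components concrete, here is a minimal union-find sketch over the same rows, using only the standard library. It is an illustration under stated assumptions, not the algorithm used by train_test_connex_split: the function connected_components is hypothetical, and each id is paired with its column name so that equal strings in different columns are not merged.

```python
def connected_components(rows, keys):
    """Label each row with a component id; rows sharing any id are linked."""
    parent = {}

    def find(x):
        # Walk up to the root, shortening the path on the way.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(x, y):
        parent[find(x)] = find(y)

    for row in rows:
        nodes = [(key, row[key]) for key in keys]
        for node in nodes:
            parent.setdefault(node, node)
        # All ids that appear on the same row belong to the same component.
        for node in nodes[1:]:
            union(nodes[0], node)

    labels = {}
    result = []
    for row in rows:
        root = find((keys[0], row[keys[0]]))
        # Number the components in order of first appearance.
        result.append(labels.setdefault(root, len(labels)))
    return result


rows = [
    dict(user="UA", prod="PAA", card="C1"),
    dict(user="UA", prod="PB", card="C1"),
    dict(user="UB", prod="PC", card="C2"),
    dict(user="UB", prod="PD", card="C2"),
    dict(user="UC", prod="PAA", card="C3"),
    dict(user="UC", prod="PF", card="C4"),
    dict(user="UD", prod="PG", card="C5"),
]
components = connected_components(rows, ["user", "prod", "card"])
print(components)
```

Users UA and UC fall in the same component because they share product PAA, which agrees with the connex column printed above. A split then has to send each component as a whole to either the train set or the test set.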

Unfolds a column of a dataframe.

<<<

import pandas
import numpy
from pandas_streaming.df import dataframe_unfold

df = pandas.DataFrame([dict(a=1, b="e,f"), dict(a=2, b="g"), dict(a=3)])
print(df)
df2 = dataframe_unfold(df, "b")
print("----------")
print(df2)

# To fold:
folded = df2.groupby("a").apply(
    lambda row: (
        ",".join(row["b_unfold"].dropna())
        if len(row["b_unfold"].dropna()) > 0
        else numpy.nan
    )
)
print("----------")
print(folded)

>>>

       a    b
    0  1  e,f
    1  2    g
    2  3  NaN
    ----------
       a    b b_unfold
    0  1  e,f        e
    1  1  e,f        f
    2  2    g        g
    3  3  NaN      NaN
    :18: DeprecationWarning: DataFrameGroupBy.apply operated on the grouping columns. This behavior is deprecated, and in a future version of pandas the grouping columns will be excluded from the operation. Either pass `include_groups=False` to exclude the groupings or explicitly select the grouping columns after groupby to silence this warning.
    ----------
    a
    1    e,f
    2      g
    3    NaN
    dtype: object

(original entry : dataframe_helpers.py:docstring of pandas_streaming.df.dataframe_helpers.dataframe_unfold, line 11)
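
A similar result can be obtained with plain pandas by splitting the strings and calling DataFrame.explode (available since pandas 0.25). This sketch is an alternative shown for comparison, not the implementation of dataframe_unfold, and it assumes the separator is a comma.

```python
import pandas

df = pandas.DataFrame([dict(a=1, b="e,f"), dict(a=2, b="g"), dict(a=3)])

# Split each string into a list, then emit one row per list element.
unfolded = (
    df.assign(b_unfold=df["b"].str.split(","))
    .explode("b_unfold")
    .reset_index(drop=True)
)
print(unfolded)
```

Rows whose value is missing are kept, with a missing value in the new column.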

About StreamingDataFrame

  1. Add a new column to a StreamingDataFrame

  2. StreamingDataFrame and groupby

  3. StreamingDataFrame and groupby

Add a new column to a StreamingDataFrame

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(data=dict(X=[4.5, 6, 7], Y=["a", "b", "c"]))
sdf = StreamingDataFrame.read_df(df)
sdf2 = sdf.add_column("d", lambda row: int(1))
print(sdf2.to_dataframe())

>>>

         X  Y  d
    0  4.5  a  1
    1  6.0  b  1
    2  7.0  c  1

(original entry : dataframe.py:docstring of pandas_streaming.df.dataframe.StreamingDataFrame.add_column, line 16)

StreamingDataFrame and groupby

This example shows how to write a simple groupby with pandas and with StreamingDataFrame.

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[3, 4, 3], B=[5, 6, 7]))
sdf = StreamingDataFrame.read_df(df)

# The following:
print(sdf.groupby("A", lambda gr: gr.sum()))

# Is equivalent to:
print(df.groupby("A").sum())

>>>

        B
    A    
    3  12
    4   6
        B
    A    
    3  12
    4   6

(original entry : dataframe.py:docstring of pandas_streaming.df.dataframe.StreamingDataFrame.groupby, line 28)

StreamingDataFrame and groupby

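This example shows a streaming groupby with StreamingDataFrame: the dataframe is read in chunks of five rows and, with strategy="cum", each iteration returns the aggregation accumulated over the chunks seen so far.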

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame
from pandas_streaming.data import dummy_streaming_dataframe

df20 = dummy_streaming_dataframe(20).to_dataframe()
df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0)
sdf20 = StreamingDataFrame.read_df(df20, chunksize=5)
sgr = sdf20.groupby_streaming(
    "key", lambda gr: gr.sum(), strategy="cum", as_index=False
)
for gr in sgr:
    print()
    print(gr)

>>>

    
         key  cint    cstr
    0  False     7  s1s2s4
    1   True     3    s0s3
    
         key  cint          cstr
    0  False    27  s1s2s4s5s7s8
    1   True    18      s0s3s6s9
    
         key  cint                      cstr
    0  False    75  s1s2s4s5s7s8s10s11s13s14
    1   True    30               s0s3s6s9s12
    
         key  cint                               cstr
    0  False   127  s1s2s4s5s7s8s10s11s13s14s16s17s19
    1   True    63                  s0s3s6s9s12s15s18

(original entry : dataframe.py:docstring of pandas_streaming.df.dataframe.StreamingDataFrame.groupby_streaming, line 41)
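
The cumulative behaviour shown above can be mimicked in pure Python to see where the numbers come from. The sketch below is not how groupby_streaming is implemented; the function cumulative_group_sums is hypothetical, and it assumes the cint column holds the integers 0 to 19, which is consistent with the sums printed above.

```python
from collections import defaultdict


def cumulative_group_sums(values, key, chunksize):
    """Yield the running per-group sums after each chunk of values."""
    totals = defaultdict(int)
    for start in range(0, len(values), chunksize):
        for value in values[start:start + chunksize]:
            totals[key(value)] += value
        # Yield a copy so that later chunks do not alter earlier snapshots.
        yield dict(totals)


snapshots = list(
    cumulative_group_sums(list(range(20)), lambda i: i % 3 == 0, chunksize=5)
)
for snapshot in snapshots:
    print(snapshot)
```

Only the running totals are kept in memory, never the chunks themselves, which is the point of processing the data as a stream.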