import hashlib
import struct
import warnings
import numpy
from pandas import DataFrame, Index, Series
def numpy_types():
    """
    Returns the list of :epkg:`numpy` available types.

    :return: list of types
    """
    # Resolve the type objects by name so the canonical order is obvious.
    names = [
        "bool_", "int_", "intc", "intp",
        "int8", "int16", "int32", "int64",
        "uint8", "uint16", "uint32", "uint64",
        "float16", "float32", "float64",
        "complex64", "complex128",
    ]
    return [getattr(numpy, name) for name in names]
def hash_str(c, hash_length):
    """
    Hashes a string.

    :param c: value to hash; ``numpy.nan`` is returned unchanged
    :param hash_length: maximum length of the returned hash
    :return: hashed string (or NaN if the input is NaN)
    :raises ValueError: if *c* is a float other than NaN
    """
    if isinstance(c, float):
        # NaN marks a missing value and goes through untouched.
        if not numpy.isnan(c):
            raise ValueError(f"numpy.nan expected, not {c}")
        return c
    digest = hashlib.sha256(c.encode("utf-8")).hexdigest()
    # Slicing also handles the case where the digest is shorter than
    # hash_length (it then returns the whole digest).
    return digest[:hash_length]
def hash_int(c, hash_length):
    """
    Hashes an integer into an integer.

    :param c: value to hash; ``numpy.nan`` is returned unchanged
    :param hash_length: number of hexadecimal digits kept from the digest
    :return: int (or NaN if the input is NaN)
    :raises ValueError: if *c* is a float other than NaN
    """
    if isinstance(c, float):
        # NaN marks a missing value and goes through untouched.
        if not numpy.isnan(c):
            raise ValueError(f"numpy.nan expected, not {c}")
        return c
    # Hash the 4-byte little-endian representation of the integer,
    # keep hash_length hex digits and fold into 8 decimal digits.
    digest = hashlib.sha256(struct.pack("i", c)).hexdigest()[:hash_length]
    return int(digest, 16) % (10**8)
def hash_float(c, hash_length):
    """
    Hashes a float into a float.

    :param c: value to hash; ``numpy.nan`` is returned unchanged
    :param hash_length: number of hexadecimal digits kept from the digest
    :return: hashed float (or NaN if the input is NaN)
    """
    # NaN marks a missing value and goes through untouched.
    if numpy.isnan(c):
        return c
    digest = hashlib.sha256(struct.pack("d", c)).hexdigest()[:hash_length]
    # 2**53 keeps the result within the range of integers a float
    # can represent exactly.
    return float(int(digest, 16) % (2**53))
def dataframe_hash_columns(df, cols=None, hash_length=10, inplace=False):
    """
    Hashes a set of columns in a dataframe.
    Keeps the same type. Skips missing values.

    @param      df              dataframe
    @param      cols            columns to hash or None for alls.
    @param      hash_length     for strings only, length of the hash
    @param      inplace         modifies inplace
    @return                     new dataframe

    This might be useful to anonimized data before
    making it public.

    .. exref::
        :title: Hashes a set of columns in a dataframe
        :tag: dataframe

        .. runpython::
            :showcode:

            import pandas
            from pandas_streaming.df import dataframe_hash_columns
            df = pandas.DataFrame([dict(a=1, b="e", c=5.6, ind="a1", ai=1),
                                   dict(b="f", c=5.7, ind="a2", ai=2),
                                   dict(a=4, b="g", ind="a3", ai=3),
                                   dict(a=8, b="h", c=5.9, ai=4),
                                   dict(a=16, b="i", c=6.2, ind="a5", ai=5)])
            print(df)
            print('--------------')
            df2 = dataframe_hash_columns(df)
            print(df2)
    """
    if cols is None:
        cols = list(df.columns)
    if not inplace:
        df = df.copy()

    def hash_intl(c):
        "hash int"
        return hash_int(c, hash_length)

    def hash_strl(c):
        "hash string"
        return hash_str(c, hash_length)

    def hash_floatl(c):
        "hash float"
        return hash_float(c, hash_length)

    coltype = dict(zip(df.columns, df.dtypes))
    for c in cols:
        t = coltype[c]
        if t == object:  # noqa: E721
            df[c] = df[c].apply(hash_strl)
        elif numpy.issubdtype(t, numpy.integer):
            # Covers every integer dtype (int32 on Windows, uint*, ...),
            # not only int64. The modulo keeps the hashed value inside the
            # column dtype range; for int64/int32 (max > 10**8) the value
            # is unchanged, so previous results are preserved.
            df[c] = df[c].apply(
                lambda x, ty=t.type, mx=numpy.iinfo(t).max: ty(hash_intl(x) % mx)
            )
        elif numpy.issubdtype(t, numpy.floating):
            # hash_float leaves NaN (missing values) untouched.
            df[c] = df[c].apply(hash_floatl)
        else:
            raise NotImplementedError(  # pragma: no cover
                f"Conversion of type {t} in column '{c}' is not implemented"
            )
    return df
def dataframe_unfold(df, col, new_col=None, sep=","):
    """
    One column may contain concatenated values.
    This function splits these values and multiplies the
    rows for each split value.

    :param df: dataframe
    :param col: column with the concatenated values (strings)
    :param new_col: new column name, if None, ``<col>_unfold`` is used
    :param sep: separator
    :return: a new dataframe
    """
    col_name = new_col if new_col is not None else col + "_unfold"
    # Choose a helper column name absent from the dataframe; it keeps
    # track of the original row position for the merge below.
    temp_col = "__index__"
    while temp_col in df.columns:
        temp_col += "_"
    rows = []
    for pos, value in enumerate(df[col]):
        if isinstance(value, str):
            # One output row per separated piece.
            rows.extend(
                {col: value, col_name: piece, temp_col: pos}
                for piece in value.split(sep)
            )
        else:
            # Non-string values (e.g. NaN) are kept as a single row.
            rows.append({col: value, col_name: value, temp_col: pos})
    df = df.copy()
    df[temp_col] = list(range(df.shape[0]))
    res = df.merge(DataFrame(rows), on=[col, temp_col])
    return res.drop(temp_col, axis=1).copy()
def dataframe_shuffle(df, random_state=None):
    """
    Shuffles a dataframe.

    :param df: :epkg:`pandas:DataFrame`
    :param random_state: seed, None for a non-reproducible shuffle
    :return: new :epkg:`pandas:DataFrame`
    """
    if random_state is None:
        permutation = numpy.random.permutation
    else:
        permutation = numpy.random.RandomState(random_state).permutation
    ori_cols = list(df.columns)
    known = set(ori_cols)
    # Move the index into regular columns so it gets shuffled with the data.
    flat = df.reset_index(drop=False)
    index_cols = [name for name in flat.columns if name not in known]
    shuffled = flat.iloc[permutation(flat.index), :]
    # Restore the original index and the original column order.
    res = shuffled.set_index(index_cols)[ori_cols]
    res.index.names = df.index.names
    return res
def pandas_fillna(df, by, hasna=None, suffix=None):
    """
    Replaces the :epkg:`nan` values for something not :epkg:`nan`.
    Mostly used by @see fn pandas_groupby_nan.

    :param df: dataframe
    :param by: list of columns for which we need to replace nan
    :param hasna: None or list of columns for which we need to replace NaN
    :param suffix: use a prefix for the NaN value
    :return: dictionary ``{column: replacement value}``,
        new dataframe (new copy)
    """
    suffix = suffix if suffix else "²nan"
    df = df.copy()
    rep = {}
    for c in by:
        if hasna is not None and c not in hasna:
            continue
        if df[c].dtype in (str, bytes, object):
            se = set(df[c].dropna())
            # Sample one value to determine the column content type.
            # Do not use set.pop(): it removes the value from *se* and the
            # collision check below would then miss that value.
            # An all-NaN column (empty *se*) defaults to the str sentinel.
            val = next(iter(se)) if se else ""
            if isinstance(val, str):
                val = suffix
            elif isinstance(val, bytes):
                val = val + b"_"
            else:
                raise TypeError(  # pragma: no cover
                    "Unable to determine a constant for type='{0}' dtype='{1}'".format(  # noqa: UP030
                        val, df[c].dtype
                    )
                )
            # Grow the sentinel until it collides with no existing value.
            while val in se:
                val += suffix if isinstance(val, str) else suffix.encode("utf-8")
            # Plain assignment instead of fillna(inplace=True): chained
            # assignment is deprecated and a no-op under pandas Copy-on-Write.
            df[c] = df[c].fillna(val)
            rep[c] = val
        else:
            # Numeric column: |min| + |max| is guaranteed to differ from
            # every existing value as long as it exceeds the maximum.
            dr = df[c].dropna()
            mi = abs(dr.min())
            ma = abs(dr.max())
            val = ma + mi
            if val == ma and not isinstance(val, str):
                # happens when min is 0: push the sentinel above the max
                val += ma + 1.0
            if val <= ma:
                raise ValueError(  # pragma: no cover
                    "Unable to find a different value for column '{}' v='{}: "
                    "min={} max={}".format(c, val, mi, ma)
                )
            df[c] = df[c].fillna(val)
            rep[c] = val
    return rep, df
def pandas_groupby_nan(
    df, by, axis=0, as_index=False, suffix=None, nanback=True, **kwargs
):
    """
    Does a *groupby* including keeping missing values (:epkg:`nan`).

    :param df: dataframe
    :param by: column or list of columns
    :param axis: only 0 is allowed
    :param as_index: should be False
    :param suffix: None or a string
    :param nanback: put :epkg:`nan` back in the index,
        otherwise it leaves a replacement for :epkg:`nan`.
        (does not work when grouping by multiple columns)
    :param kwargs: other parameters sent to
        `groupby <http://pandas.pydata.org/pandas-docs/stable/
        generated/pandas.DataFrame.groupby.html>`_
    :return: groupby results

    See :epkg:`groupby and missing values`.
    If no :epkg:`nan` is detected, the function falls back in regular
    :epkg:`pandas:DataFrame:groupby` which has the following
    behavior.

    .. exref::
        :title: Group a dataframe by one column including nan values
        :tag: dataframe

        The regular :epkg:`pandas:dataframe:GroupBy` of a
        :epkg:`pandas:DataFrame` removes every :epkg:`nan`
        values from the index.

        .. runpython::
            :showcode:

            from pandas import DataFrame

            data = [dict(a=2, ind="a", n=1),
                    dict(a=2, ind="a"),
                    dict(a=3, ind="b"),
                    dict(a=30)]
            df = DataFrame(data)
            print(df)
            gr = df.groupby(["ind"]).sum()
            print(gr)

        Function @see fn pandas_groupby_nan modifies keeps them.

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import pandas_groupby_nan

            data = [dict(a=2, ind="a", n=1),
                    dict(a=2, ind="a"),
                    dict(a=3, ind="b"),
                    dict(a=30)]
            df = DataFrame(data)
            gr2 = pandas_groupby_nan(df, ["ind"]).sum()
            print(gr2)
    """
    # Fast path: recent pandas supports dropna=False natively.
    if nanback and suffix is None:
        try:
            res = df.groupby(by, axis=axis, as_index=as_index, dropna=False, **kwargs)
        except TypeError:
            # old version of pandas
            res = None
        if res is not None:
            # NOTE(review): suffix is None whenever this branch is entered,
            # so the replace() below looks unreachable — confirm before relying on it.
            if suffix is None:
                return res
            res.index = Series(res.index).replace(numpy.nan, suffix)
            return res
    if axis != 0:
        raise NotImplementedError("axis should be 0")
    if as_index:
        raise NotImplementedError("as_index must be False")
    if isinstance(by, tuple):
        raise TypeError("by should be of list not tuple")
    if not isinstance(by, list):
        by = [by]
    # Detect which grouping columns actually contain NaN.
    hasna = {}
    for b in by:
        h = df[b].isnull().values.any()
        if h:
            hasna[b] = True
    if len(hasna) > 0:
        # Replace NaN by a per-column sentinel value (see pandas_fillna),
        # group on the patched copy, then optionally swap the sentinel
        # back to NaN inside the GroupBy object.
        rep, df_copy = pandas_fillna(df, by, hasna, suffix=suffix)
        res = df_copy.groupby(by, axis=axis, as_index=as_index, **kwargs)
        if len(by) == 1:
            if not nanback:
                # Caller keeps the sentinel; warn when the grouping column
                # is not of object dtype (the sentinel changed its dtype).
                dummy = DataFrame([{"a": "a"}])
                do = dummy.dtypes[0]
                typ = dict(zip(df.columns, df.dtypes))
                if typ[by[0]] != do:
                    warnings.warn(  # pragma: no cover
                        f"[pandas_groupby_nan] NaN value: {rep}", stacklevel=0
                    )
                return res
            # nanback=True: patch private GroupBy internals so the sentinel
            # appears as NaN again. This touches non-public pandas API
            # (res.grouper, _cache, _group_index) and is version-dependent.
            for b in by:
                fnan = rep[b]
                if fnan in res.grouper.groups:
                    res.grouper.groups[numpy.nan] = res.grouper.groups[fnan]
                    del res.grouper.groups[fnan]
                new_val = [
                    (numpy.nan if b == fnan else b) for b in res.grouper.result_index
                ]
                res.grouper.groupings[0]._group_index = Index(new_val)
                res.grouper.groupings[0].obj[b].replace(fnan, numpy.nan, inplace=True)
                if hasattr(res.grouper, "grouping"):
                    # older pandas internals layout
                    if isinstance(res.grouper.groupings[0].grouper, numpy.ndarray):
                        arr = numpy.array(new_val)
                        res.grouper.groupings[0].grouper = arr
                        if (
                            hasattr(res.grouper.groupings[0], "_cache")
                            and "result_index" in res.grouper.groupings[0]._cache
                        ):
                            # drop the cached index so it is rebuilt with NaN
                            del res.grouper.groupings[0]._cache["result_index"]
                    else:
                        raise NotImplementedError(
                            "Not implemented for type: {0}".format(  # noqa: UP030
                                type(res.grouper.groupings[0].grouper)
                            )
                        )
                else:
                    # newer pandas internals layout (grouping_vector)
                    grouper = res.grouper._get_grouper()
                    if isinstance(grouper, numpy.ndarray):
                        arr = numpy.array(new_val)
                        res.grouper.groupings[0].grouping_vector = arr
                        if (
                            hasattr(res.grouper.groupings[0], "_cache")
                            and "result_index" in res.grouper.groupings[0]._cache
                        ):
                            index = res.grouper.groupings[0]._cache["result_index"]
                            if len(rep) == 1:
                                # rewrite the cached index, replacing the
                                # sentinel value by NaN
                                key = list(rep.values())[0]  # noqa: RUF015
                                new_index = numpy.array(index)
                                for i in range(len(new_index)):
                                    if new_index[i] == key:
                                        new_index[i] = numpy.nan
                                res.grouper.groupings[0]._cache["result_index"] = (
                                    index.__class__(new_index)
                                )
                            else:
                                raise NotImplementedError(  # pragma: no cover
                                    "NaN values not implemented for multiindex."
                                )
                    else:
                        raise NotImplementedError(  # pragma: no cover
                            "Not implemented for type: {0}".format(  # noqa: UP030
                                type(res.grouper.groupings[0].grouper)
                            )
                        )
                    res.grouper._cache["result_index"] = res.grouper.groupings[
                        0
                    ]._group_index
        else:
            # Multiple grouping columns.
            if not nanback:
                dummy = DataFrame([{"a": "a"}])
                do = dummy.dtypes[0]
                typ = dict(zip(df.columns, df.dtypes))
                for b in by:
                    if typ[b] != do:
                        warnings.warn(  # pragma: no cover
                            f"[pandas_groupby_nan] NaN values: {rep}", stacklevel=0
                        )
                        break
                return res
            raise NotImplementedError(
                "Not yet implemented. Replacing pseudo nan values by real nan "
                "values is not as easy as it looks. Use nanback=False"
            )
            # NOTE(review): a previous, non-working implementation that
            # renamed keys inside res.grouper.groups was removed here.
        return res
    # No NaN anywhere in the grouping columns: plain groupby.
    return df.groupby(by, axis=axis, **kwargs)