pandas_streaming.df.dataframe

StreamingDataFrameSchemaError

class pandas_streaming.df.dataframe.StreamingDataFrameSchemaError[source]

Reveals an issue with inconsistent schemas.

StreamingDataFrame

class pandas_streaming.df.dataframe.StreamingDataFrame(iter_creation, check_schema=True, stable=True)[source]

Defines a streaming dataframe. The goal is to reduce the memory footprint. The class takes a function which creates an iterator on DataFrame. We assume this function can be called multiple times. As a matter of fact, the function is called every time the class needs to walk through the stream with the following loop:

for df in self:  # self is a StreamingDataFrame
    # ...

The constructor cannot receive an iterator, otherwise this class would be able to walk through the data only once. The main reason is that it is impossible to pickle (or dill) an iterator: it cannot be replicated. Instead, the class takes a function which generates an iterator on DataFrame. Most methods return either a DataFrame or a StreamingDataFrame. In the second case, methods can be chained.

By default, the object checks that the schema remains the same between two chunks. This can be disabled by setting check_schema=False in the constructor.

The user should expect the data to remain stable: every loop should produce the same data. However, in some situations, it is more efficient not to keep that constraint. Drawing a random sample (see sample) is one of these cases.

Parameters:
  • iter_creation – function which creates an iterator or an instance of StreamingDataFrame

  • check_schema – checks that the schema is the same for every dataframe

  • stable – indicates if the dataframe remains the same whenever it is walked through
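
A minimal sketch of a constructor call, assuming a small generator function iter_chunks (hypothetical name) which yields DataFrame chunks with a constant schema:

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

def iter_chunks():
    # calling the function again creates a fresh iterator,
    # so the stream can be walked through several times
    for i in range(3):
        yield DataFrame(dict(X=[float(i), float(i) + 1], Y=["a", "b"]))

sdf = StreamingDataFrame(iter_chunks)
for df in sdf:
    # every chunk shares the same schema, so check_schema passes
    print(df.shape)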

__del__()[source]

Calls every function in _delete_.

__getitem__(*args)[source]

Implements some of the functionalities pandas offers for the operator [].

__init__(iter_creation, check_schema=True, stable=True)[source]
__iter__()[source]

Iterator on a large file with a sliding window. Each window is a DataFrame. The method stores a copy of the initial iterator and restores it after the end of the iterations. If check_schema was enabled when calling the constructor, the method checks that every DataFrame follows the same schema as the first chunk.

Even with a big chunk size, it might happen that consecutive chunks detect different types for one particular column. An error message shows up saying Column types are different after row, with more information about the column which failed. In that case, pandas.read_csv can overwrite the type of one column by specifying dtype={column_name: new_type}. It frequently happens when a string column has many missing values.
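
For instance, a hedged sketch with a hypothetical file data.csv and a hypothetical column comment that holds many missing values:

from pandas_streaming.df import StreamingDataFrame

# force the column to be parsed as a string in every chunk so that
# the schema check does not fail on chunks where it is entirely empty
sdf = StreamingDataFrame.read_csv(
    "data.csv", chunksize=100000, dtype={"comment": str}
)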

__setitem__(index, value)[source]

Only a limited set of operators is supported.

__weakref__

list of weak references to the object (if defined)

add_column(col, value)[source]

Implements some of the functionalities pandas offers for the operator [].

Parameters:
  • col – new column name

  • value – a constant or a function applied to every row

Returns:

StreamingDataFrame

Note

If value is a StreamingDataFrame, chunksize must be the same for both.

Add a new column to a StreamingDataFrame

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(data=dict(X=[4.5, 6, 7], Y=["a", "b", "c"]))
sdf = StreamingDataFrame.read_df(df)
sdf2 = sdf.add_column("d", lambda row: int(1))
print(sdf2.to_dataframe())

sdf2 = sdf.add_column("d", lambda row: int(1))
print(sdf2.to_dataframe())

>>>

         X  Y  d
    0  4.5  a  1
    1  6.0  b  1
    2  7.0  c  1
         X  Y  d
    0  4.5  a  1
    1  6.0  b  1
    2  7.0  c  1
apply(*args, **kwargs) → StreamingDataFrame[source]

Applies pandas.DataFrame.apply. This function returns a StreamingDataFrame.

applymap(*args, **kwargs) → StreamingDataFrame[source]

Applies pandas.DataFrame.applymap. This function returns a StreamingDataFrame.

property columns

See pandas.DataFrame.columns.

concat(others, axis=0) → StreamingDataFrame[source]

Concatenates dataframes. The function ensures all pandas.DataFrame or StreamingDataFrame share the same columns (name and type). Otherwise, the function fails as it cannot guess the schema without walking through all the dataframes.

Parameters:
  • others – list, enumeration, pandas.DataFrame

  • axis – concatenate by rows (0) or by columns (1)

Returns:

StreamingDataFrame
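
A minimal sketch, assuming two small dataframes sharing the same columns:

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df1 = DataFrame(dict(A=[1, 2], B=["a", "b"]))
df2 = DataFrame(dict(A=[3, 4], B=["c", "d"]))
sdf = StreamingDataFrame.read_df(df1)
# both pieces share the same columns (name and type), so the schema is known
both = sdf.concat([df2], axis=0)
print(both.to_dataframe())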

describe(percentiles=None, include=None, exclude=None)[source]

Calls pandas.DataFrame.describe on every piece of the dataset. The percentiles are not really accurate, just an indication.

Parameters:
  • percentiles, include, exclude – see pandas.DataFrame.describe

Returns:

a pandas.DataFrame, see pandas.DataFrame.describe

Changed in version 0.3.219: Parameter datetime_is_numeric was removed (see pandas.DataFrame.describe).

drop(labels=None, *, axis=0, index=None, columns=None, level=None, inplace=False, errors='raise') → StreamingDataFrame[source]

Applies pandas.DataFrame.drop. This function returns a StreamingDataFrame.

property dtypes

See pandas.DataFrame.dtypes.

ensure_dtype(df, dtypes)[source]

Ensures the dataframe df has the types indicated in dtypes. Changes them if not.

Parameters:
  • df – dataframe

  • dtypes – list of types

Returns:

the updated dataframe

fillna(**kwargs)[source]

Replaces the missing values, calls pandas.DataFrame.fillna.

Parameters:

kwargs – see pandas.DataFrame.fillna()

Returns:

StreamingDataFrame

Warning

The function does not check what happens at the boundaries between chunks of data. Anything but a constant value will probably lead to inconsistent behaviour.

get_kwargs()[source]

Returns the parameters used to call the constructor.

groupby(by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, **kwargs) → DataFrame[source]

Implements a streaming version of pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory version is not implemented yet.

Parameters:
  • by – see pandas.DataFrame.groupby

  • in_memory – in-memory algorithm

  • lambda_agg – aggregation function, sum by default

  • lambda_agg_agg – to aggregate the aggregations, sum by default

  • kwargs – additional parameters for pandas.DataFrame.groupby

Returns:

pandas.DataFrame

As the input StreamingDataFrame does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated dataframe, another one to combine all the reduced dataframes. This second one is always a sum. As a consequence, this function should not compute any mean or count, only sums, because we do not know the size of each iterated dataframe. To compute an average, sums and weights must be aggregated.

Parameter lambda_agg is lambda gr: gr.sum() by default. It could also be lambda gr: gr.max() or lambda gr: gr.min() but not lambda gr: gr.mean() as it would lead to incoherent results.

StreamingDataFrame and groupby

Here is an example which shows how to write a simple groupby with pandas and StreamingDataFrame.

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[3, 4, 3], B=[5, 6, 7]))
sdf = StreamingDataFrame.read_df(df)

# The following:
print(sdf.groupby("A", lambda gr: gr.sum()))

# Is equivalent to:
print(df.groupby("A").sum())

>>>

        B
    A    
    3  12
    4   6
        B
    A    
    3  12
    4   6
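
As mentioned above, a mean cannot be aggregated across chunks directly. A possible sketch, assuming a helper column of ones used as the weight:

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[3, 4, 3, 4], X=[5.0, 6.0, 7.0, 8.0]))
sdf = StreamingDataFrame.read_df(df, chunksize=2)

# sums and counts can be added across chunks, a mean cannot
sdf2 = sdf.add_column("one", lambda row: 1)
gr = sdf2.groupby("A", lambda gr: gr.sum())
gr["mean_X"] = gr["X"] / gr["one"]
print(gr)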
groupby_streaming(by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, strategy='cum', **kwargs) → DataFrame[source]

Implements a streaming version of pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory version is not implemented yet.

Parameters:
  • by – see pandas.DataFrame.groupby

  • in_memory – in-memory algorithm

  • lambda_agg – aggregation function, sum by default

  • lambda_agg_agg – to aggregate the aggregations, sum by default

  • kwargs – additional parameters for pandas.DataFrame.groupby

  • strategy – 'cum' or 'streaming', see below

Returns:

pandas.DataFrame

As the input StreamingDataFrame does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated dataframe, another one to combine all the reduced dataframes. This second one is always a sum. As a consequence, this function should not compute any mean or count, only sums, because we do not know the size of each iterated dataframe. To compute an average, sums and weights must be aggregated.

Parameter lambda_agg is lambda gr: gr.sum() by default. It could also be lambda gr: gr.max() or lambda gr: gr.min() but not lambda gr: gr.mean() as it would lead to incoherent results.

Parameter strategy allows three scenarios. If strategy is None, the function goes through the whole dataset and produces a final DataFrame. If strategy == 'cum', it returns a StreamingDataFrame; each iteration produces the current status of the group by. If strategy == 'streaming', it produces DataFrames which must be concatenated into a single DataFrame and grouped again to get the final result.

StreamingDataFrame and groupby

Here is an example which shows how to write a simple groupby with pandas and StreamingDataFrame.

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame
from pandas_streaming.data import dummy_streaming_dataframe

df20 = dummy_streaming_dataframe(20).to_dataframe()
df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0)
sdf20 = StreamingDataFrame.read_df(df20, chunksize=5)
sgr = sdf20.groupby_streaming(
    "key", lambda gr: gr.sum(), strategy="cum", as_index=False
)
for gr in sgr:
    print()
    print(gr)

>>>

    
         key  cint    cstr
    0  False     7  s1s2s4
    1   True     3    s0s3
    
         key  cint          cstr
    0  False    27  s1s2s4s5s7s8
    1   True    18      s0s3s6s9
    
         key  cint                      cstr
    0  False    75  s1s2s4s5s7s8s10s11s13s14
    1   True    30               s0s3s6s9s12
    
         key  cint                               cstr
    0  False   127  s1s2s4s5s7s8s10s11s13s14s16s17s19
    1   True    63                  s0s3s6s9s12s15s18
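
A possible sketch of the strategy='streaming' case, in which every iterated piece only aggregates one chunk and the partial results must be grouped a second time; this mirrors the example above and is a sketch, not a verified transcript:

from pandas import DataFrame, concat
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[3, 4, 3, 4, 3], B=[5, 6, 7, 8, 9]))
sdf = StreamingDataFrame.read_df(df, chunksize=2)

sgr = sdf.groupby_streaming(
    "A", lambda gr: gr.sum(), strategy="streaming", as_index=False
)
# every iterated piece is a partial aggregation of a single chunk
pieces = list(sgr)
final = concat(pieces).groupby("A", as_index=False).sum()
print(final)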
head(n=5) → DataFrame[source]

Returns the first rows as a DataFrame.

is_stable(do_check=False, n=10)[source]

Tells if the dataframe is supposed to be stable.

Parameters:
  • do_check – do not trust the value sent to the constructor

  • n – number of rows used to check the stability, None for all rows

Returns:

boolean

do_check=True means the method checks that the first n rows remain the same across two iterations.

iterrows()[source]

See pandas.DataFrame.iterrows.

merge(right, **kwargs) → StreamingDataFrame[source]

Merges two StreamingDataFrame and returns a StreamingDataFrame. right can be either a StreamingDataFrame or simply a pandas.DataFrame. It calls pandas.DataFrame.merge in a double loop: a loop on self and a loop on right.
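
A minimal sketch, assuming a hypothetical join column key; the keyword arguments are forwarded to pandas.DataFrame.merge:

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

left = DataFrame(dict(key=[1, 2, 3], X=[4.5, 6.0, 7.0]))
right = DataFrame(dict(key=[1, 2, 3], Y=["a", "b", "c"]))

sdf = StreamingDataFrame.read_df(left, chunksize=2)
merged = sdf.merge(right, on="key", how="left")
print(merged.to_dataframe())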

static read_csv(*args, **kwargs) → StreamingDataFrame[source]

Reads a csv file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_csv. The important parameter is chunksize, which defines the number of rows to parse in a single block. If not specified, it is set to 100000.
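
A hedged sketch with a hypothetical file data.csv; any other pandas.read_csv argument can be added:

from pandas_streaming.df import StreamingDataFrame

sdf = StreamingDataFrame.read_csv("data.csv", chunksize=50000, sep=",")
for df in sdf:
    # each chunk is a pandas.DataFrame with at most 50000 rows
    print(df.shape)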

static read_df(df, chunksize=None, check_schema=True) → StreamingDataFrame[source]

Splits a DataFrame into small chunks mostly for unit testing purposes.

Parameters:
  • df – pandas.DataFrame

  • chunksize – number of rows per chunk (a tenth of the number of rows by default)

  • check_schema – check schema between two iterations

Returns:

a StreamingDataFrame (an iterator on DataFrame)

static read_json(*args, chunksize=100000, flatten=False, **kwargs) → StreamingDataFrame[source]

Reads a json file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_json. The important parameter is chunksize, which defines the number of rows to parse in a single block, and it must be defined to return an iterator. If lines is True, the function falls back on pandas.read_json, otherwise it uses enumerate_json_items. If lines is 'stream', enumerate_json_items is called with parameter lines=True. Parameter flatten uses the trick described at Flattening JSON objects in Python. Examples:

<<<

from io import BytesIO
from pandas_streaming.df import StreamingDataFrame

data = b"""{"a": 1, "b": 2}
           {"a": 3, "b": 4}"""
it = StreamingDataFrame.read_json(BytesIO(data), lines=True)
dfs = list(it)
print(dfs)

>>>

    [   a  b
    0  1  2
    1  3  4]

<<<

from io import BytesIO
from pandas_streaming.df import StreamingDataFrame

data = b"""[{"a": 1,
             "b": 2},
            {"a": 3,
             "b": 4}]"""

it = StreamingDataFrame.read_json(BytesIO(data))
dfs = list(it)
print(dfs)

>>>

    [   a  b
    0  1  2
    1  3  4]

The parsed JSON must have an empty line at the end, otherwise the following exception is raised: ijson.common.IncompleteJSONError: parse error: unallowed token at this point in JSON text.

static read_str(text, **kwargs) → StreamingDataFrame[source]

Reads a string holding CSV data as an iterator on DataFrame. The signature is the same as pandas.read_csv. The important parameter is chunksize, which defines the number of rows to parse in a single block.
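
A minimal sketch, assuming a small CSV string:

from pandas_streaming.df import StreamingDataFrame

text = "A,B\n1,4\n2,5\n3,6\n"
sdf = StreamingDataFrame.read_str(text, chunksize=2)
print(sdf.to_dataframe())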

sample(reservoir=False, cache=False, **kwargs) → StreamingDataFrame[source]

See pandas.DataFrame.sample. Only frac is available, otherwise choose reservoir sampling with reservoir=True. This function returns a StreamingDataFrame.

Parameters:
  • reservoir – use reservoir sampling

  • cache – cache the sampled rows

  • kwargs – additional parameters for pandas.DataFrame.sample

If cache is True, the sample is cached (assuming it holds in memory). The second time an iterator walks through the data, the cached sample is returned instead of a new draw.
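
A minimal sketch; frac and cache are the parameters documented above:

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(X=list(range(10))))
sdf = StreamingDataFrame.read_df(df, chunksize=3)

# cache=True keeps the drawn rows so that every new iteration
# returns the same sample
sample = sdf.sample(frac=0.5, cache=True)
print(sample.to_dataframe())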

property shape

This is the kind of operation you do not want to do when a file is large because it goes through the whole stream just to get the number of rows.

sort_values(by, axis=0, ascending=True, kind='quicksort', na_position='last', temp_file='_pandas_streaming_sort_values_')[source]

Sorts the streaming dataframe by values.

Parameters:
  • by – one column

  • ascending – order

  • kind – see pandas.DataFrame.sort_values()

  • na_position – see pandas.DataFrame.sort_values()

  • temp_file – sorting a whole dataset is impossible without storing intermediate results on disk, unless it fits into memory; in that case, it is easier to convert the streaming dataframe into a DataFrame and sort it

Returns:

streaming dataframe
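
A minimal sketch; intermediate results are written to temp_file while sorting:

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[3, 1, 2], B=["x", "y", "z"]))
sdf = StreamingDataFrame.read_df(df, chunksize=2)

sorted_sdf = sdf.sort_values(by="A")
print(sorted_sdf.to_dataframe())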

tail(n=5) → DataFrame[source]

Returns the last rows as a DataFrame. The size of chunks must be greater than n to get n lines. This method is not efficient because the whole dataset must be walked through.

to_csv(path_or_buf=None, **kwargs) → StreamingDataFrame[source]

Saves the DataFrame into a string or a file. See pandas.DataFrame.to_csv.

to_dataframe() → DataFrame[source]

Converts everything into a single DataFrame.

to_df() → DataFrame[source]

Converts everything into a single DataFrame.

train_test_split(path_or_buf=None, export_method='to_csv', names=None, streaming=True, partitions=None, **kwargs)[source]

Randomly splits a dataframe into smaller pieces. The function returns streams of file names. It chooses one of the options from module dataframe_split.

Parameters:
  • path_or_buf – a string, a list of strings or buffers; if it is a string, it must contain {}, like partition{}.txt; if None, the function returns strings.

  • export_method – method used to store the partitions, by default pandas.DataFrame.to_csv(), additional parameters will be given to that function

  • names – partitions names, by default ('train', 'test')

  • kwargs – parameters for the export function and sklearn.model_selection.train_test_split().

  • streaming – the function switches to a streaming version of the algorithm.

  • partitions – splitting partitions

Returns:

outputs of the export functions, or two StreamingDataFrame if path_or_buf is None.

The streaming version of this algorithm is implemented by function sklearn_train_test_split_streaming. Its documentation indicates the limitation of the streaming version and gives some insights about the additional parameters.
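
A minimal sketch of the case path_or_buf=None, which returns two StreamingDataFrame corresponding to the default partition names ('train', 'test'):

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(X=list(range(10)), Y=[i % 2 for i in range(10)]))
sdf = StreamingDataFrame.read_df(df, chunksize=3)

# with path_or_buf=None, two StreamingDataFrame are returned
train, test = sdf.train_test_split(path_or_buf=None)
print(train.to_dataframe().shape, test.to_dataframe().shape)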

where(*args, **kwargs) → StreamingDataFrame[source]

Applies pandas.DataFrame.where. inplace must be False. This function returns a StreamingDataFrame.

StreamingSeries

class pandas_streaming.df.dataframe.StreamingSeries(iter_creation, check_schema=True, stable=True)[source]

Seen as a StreamingDataFrame with a single column.

apply(*args, **kwargs) → StreamingDataFrame[source]

Applies pandas.Series.apply. This function returns a StreamingSeries.