pandas_streaming.df.dataframe¶
StreamingDataFrameSchemaError¶
StreamingDataFrame¶
- class pandas_streaming.df.dataframe.StreamingDataFrame(iter_creation, check_schema=True, stable=True)[source]¶
Defines a streaming dataframe. The goal is to reduce the memory footprint. The class takes a function which creates an iterator on dataframes. We assume this function can be called multiple times. As a matter of fact, the function is called every time the class needs to walk through the stream with the following loop:
for df in self:  # self is a StreamingDataFrame
    # ...
The constructor cannot receive an iterator, otherwise this class would be able to walk through the data only once. The main reason is that it is impossible to pickle (or dill) an iterator: it cannot be replicated. Instead, the class takes a function which generates an iterator on DataFrame. Most of the methods return either a DataFrame or a StreamingDataFrame. In the second case, methods can be chained.
By default, the object checks that the schema remains the same between two chunks. This can be disabled by setting check_schema=False in the constructor.
The user should expect the data to remain stable. Every loop should produce the same data. However, in some situations, it is more efficient not to keep that constraint. Drawing a random sample is one of these cases.
- Parameters:
iter_creation – function which creates an iterator or an instance of StreamingDataFrame
check_schema – checks that the schema is the same for every dataframe
stable – indicates if the dataframe remains the same whenever it is walked through
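For illustration, here is a minimal sketch of such a function; the generator and its columns are invented for the example:
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

def iterate_chunks():
    # called again every time the stream needs to be walked through
    for i in range(0, 3):
        yield DataFrame(dict(X=[i, i + 1], Y=["a", "b"]))

sdf = StreamingDataFrame(iterate_chunks)
for chunk in sdf:
    print(chunk.shape)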
- __getitem__(*args)[source]¶
Implements some of the functionalities pandas offers for the operator [].
- __iter__()[source]¶
Iterator on a large file with a sliding window. Each window is a DataFrame. The method stores a copy of the initial iterator and restores it after the end of the iterations. If check_schema was enabled when calling the constructor, the method checks that every DataFrame follows the same schema as the first chunk.
Even with a big chunk size, it might happen that consecutive chunks detect different types for one particular column. An error message shows up saying Column types are different after row with more information about the column which failed. In that case, pandas.read_csv can overwrite the type of one column by specifying dtype={column_name: new_type}. It frequently happens when a string column has many missing values.
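As an illustration, forcing the dtype when reading usually avoids that error; the file name and column name below are placeholders:
from pandas_streaming.df import StreamingDataFrame

# "data.csv" and column "city" are hypothetical names; forcing the column
# to str avoids a schema mismatch when early chunks only contain missing values
sdf = StreamingDataFrame.read_csv("data.csv", dtype={"city": str})
for chunk in sdf:
    pass  # process each chunk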
- __weakref__¶
list of weak references to the object (if defined)
- add_column(col, value)[source]¶
Implements some of the functionalities pandas offers for the operator [].
- Parameters:
col – new column
value – StreamingDataFrame or a lambda function
- Returns:
StreamingDataFrame
Note: if value is a StreamingDataFrame, chunksize must be the same for both.
Add a new column to a StreamingDataFrame
<<<
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(data=dict(X=[4.5, 6, 7], Y=["a", "b", "c"]))
sdf = StreamingDataFrame.read_df(df)
sdf2 = sdf.add_column("d", lambda row: int(1))
print(sdf2.to_dataframe())
sdf2 = sdf.add_column("d", lambda row: int(1))
print(sdf2.to_dataframe())
>>>
     X  Y  d
0  4.5  a  1
1  6.0  b  1
2  7.0  c  1
     X  Y  d
0  4.5  a  1
1  6.0  b  1
2  7.0  c  1
- apply(*args, **kwargs) → StreamingDataFrame[source]¶
Applies pandas.DataFrame.apply. This function returns a StreamingDataFrame.
- applymap(*args, **kwargs) → StreamingDataFrame[source]¶
Applies pandas.DataFrame.applymap. This function returns a StreamingDataFrame.
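A minimal sketch of a chunk-wise application (the data is invented for the example):
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(X=[4.5, 6, 7], Y=[1, 2, 3]))
sdf = StreamingDataFrame.read_df(df, chunksize=2)
# the function is applied to every chunk, as pandas would do in memory
sdf2 = sdf.applymap(str)
print(sdf2.to_dataframe())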
- property columns¶
- concat(others, axis=0) → StreamingDataFrame[source]¶
Concatenates dataframes. The function ensures all pandas.DataFrame or StreamingDataFrame share the same columns (name and type). Otherwise, the function fails as it cannot guess the schema without walking through all dataframes.
- Parameters:
others – list, enumeration, pandas.DataFrame
axis – concatenate by rows (0) or by columns (1)
- Returns:
StreamingDataFrame
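A minimal sketch of a row-wise concatenation, assuming both inputs share the same schema (data invented for the example):
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

sdf1 = StreamingDataFrame.read_df(DataFrame(dict(A=[0, 1], B=["a", "b"])))
sdf2 = StreamingDataFrame.read_df(DataFrame(dict(A=[2, 3], B=["c", "d"])))
# both streams must share the same columns (name and type)
sdf3 = sdf1.concat([sdf2], axis=0)
print(sdf3.to_dataframe())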
- describe(percentiles=None, include=None, exclude=None)[source]¶
Calls pandas.DataFrame.describe on every piece of the dataset. The percentiles are not really accurate, they are only an indication.
- Parameters:
percentiles – see pandas.DataFrame.describe
include – see pandas.DataFrame.describe
exclude – see pandas.DataFrame.describe
- Returns:
Changed in version 0.3.219: Parameter datetime_is_numeric was removed (see pandas.DataFrame.describe).
- drop(labels=None, *, axis=0, index=None, columns=None, level=None, inplace=False, errors='raise') → StreamingDataFrame[source]¶
Applies pandas.DataFrame.drop. This function returns a StreamingDataFrame.
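A minimal sketch (columns are invented for the example):
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

sdf = StreamingDataFrame.read_df(DataFrame(dict(A=[3, 4, 3], B=[5, 6, 7])))
# column B is removed from every chunk
print(sdf.drop(columns=["B"]).to_dataframe())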
- property dtypes¶
- ensure_dtype(df, dtypes)[source]¶
Ensures the dataframe df has types indicated in dtypes. Changes it if not.
- Parameters:
df – dataframe
dtypes – list of types
- Returns:
updated dataframe
- fillna(**kwargs)[source]¶
Replaces the missing values, calls pandas.DataFrame.fillna.
- Parameters:
kwargs – see pandas.DataFrame.fillna()
- Returns:
Warning
The function does not check what happens at the limit of every chunk of data. Anything but a constant value will probably have an inconsistent behaviour.
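A sketch of the safe case, filling with a constant value (data invented for the example):
from numpy import nan
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

sdf = StreamingDataFrame.read_df(DataFrame(dict(X=[1.0, nan, 3.0])))
# a constant value does not depend on neighbouring chunks,
# so the result is the same as with a regular DataFrame
print(sdf.fillna(value=0.0).to_dataframe())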
- groupby(by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, **kwargs) → DataFrame[source]¶
Implements the streaming pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory version is not implemented yet.
- Parameters:
by – see pandas.DataFrame.groupby
in_memory – in-memory algorithm
lambda_agg – aggregation function, sum by default
lambda_agg_agg – to aggregate the aggregations, sum by default
kwargs – additional parameters for pandas.DataFrame.groupby
- Returns:
DataFrame
As the input StreamingDataFrame does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated dataframe, another one to combine all the reduced dataframes. This second one is always a sum. As a consequence, this function should not compute any mean or count, only sums, because we do not know the size of each iterated dataframe. To compute an average, sum and weights must be aggregated.
Parameter lambda_agg is lambda gr: gr.sum() by default. It could also be lambda gr: gr.max() or lambda gr: gr.min() but not lambda gr: gr.mean() as it would lead to incoherent results.
StreamingDataFrame and groupby
Here is an example which shows how to write a simple groupby with pandas and a StreamingDataFrame.
<<<
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[3, 4, 3], B=[5, 6, 7]))
sdf = StreamingDataFrame.read_df(df)

# The following:
print(sdf.groupby("A", lambda gr: gr.sum()))

# Is equivalent to:
print(df.groupby("A").sum())
>>>
    B
A
3  12
4   6
    B
A
3  12
4   6
- groupby_streaming(by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, strategy='cum', **kwargs) → DataFrame[source]¶
Implements the streaming pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory version is not implemented yet.
- Parameters:
by – see pandas.DataFrame.groupby
in_memory – in-memory algorithm
lambda_agg – aggregation function, sum by default
lambda_agg_agg – to aggregate the aggregations, sum by default
kwargs – additional parameters for pandas.DataFrame.groupby
strategy – 'cum' or 'streaming', see below
- Returns:
DataFrame
As the input StreamingDataFrame does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated dataframe, another one to combine all the reduced dataframes. This second one is always a sum. As a consequence, this function should not compute any mean or count, only sums, because we do not know the size of each iterated dataframe. To compute an average, sum and weights must be aggregated.
Parameter lambda_agg is lambda gr: gr.sum() by default. It could also be lambda gr: gr.max() or lambda gr: gr.min() but not lambda gr: gr.mean() as it would lead to incoherent results.
Parameter strategy allows three scenarios. The first one, if strategy is None, goes through the whole dataset to produce a final DataFrame. The second one, if strategy=='cum', returns a StreamingDataFrame where each iteration produces the current status of the group by. In the last case, strategy=='streaming' produces DataFrames which must be concatenated into a single DataFrame and grouped again to get the results.
StreamingDataFrame and groupby
Here is an example which shows how to write a simple groupby with pandas and a StreamingDataFrame.
<<<
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame
from pandas_streaming.data import dummy_streaming_dataframe

df20 = dummy_streaming_dataframe(20).to_dataframe()
df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0)
sdf20 = StreamingDataFrame.read_df(df20, chunksize=5)
sgr = sdf20.groupby_streaming(
    "key", lambda gr: gr.sum(), strategy="cum", as_index=False
)
for gr in sgr:
    print()
    print(gr)
>>>
     key  cint    cstr
0  False     7  s1s2s4
1   True     3    s0s3

     key  cint          cstr
0  False    27  s1s2s4s5s7s8
1   True    18      s0s3s6s9

     key  cint                      cstr
0  False    75  s1s2s4s5s7s8s10s11s13s14
1   True    30               s0s3s6s9s12

     key  cint                               cstr
0  False   127  s1s2s4s5s7s8s10s11s13s14s16s17s19
1   True    63                  s0s3s6s9s12s15s18
- is_stable(do_check=False, n=10)[source]¶
Tells if the dataframe is supposed to be stable.
- Parameters:
do_check – do not trust the value sent to the constructor
n – number of rows used to check the stability, None for all rows
- Returns:
boolean
do_check=True means the method checks that the first n rows remain the same over two iterations.
- merge(right, **kwargs) → StreamingDataFrame[source]¶
Merges two StreamingDataFrame and returns a StreamingDataFrame. right can be either a StreamingDataFrame or simply a pandas.DataFrame. It calls pandas.DataFrame.merge in a double loop, a loop on self and a loop on right.
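A minimal sketch joining a stream with a regular DataFrame (keys and columns are invented for the example):
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

left = StreamingDataFrame.read_df(DataFrame(dict(key=[1, 2], L=["l1", "l2"])))
right = DataFrame(dict(key=[1, 2], R=["r1", "r2"]))
# kwargs are forwarded to pandas.DataFrame.merge for every pair of chunks
merged = left.merge(right, on="key", how="inner")
print(merged.to_dataframe())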
- static read_csv(*args, **kwargs) → StreamingDataFrame[source]¶
Reads a csv file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_csv. The important parameter is chunksize, which defines the number of rows to parse in a single block. If not specified, it will be equal to 100000.
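A typical call may look like the following; the file name is a placeholder:
from pandas_streaming.df import StreamingDataFrame

# "data.csv" is a hypothetical file; chunksize bounds how many rows
# are loaded in memory at any time
sdf = StreamingDataFrame.read_csv("data.csv", chunksize=10000)
for chunk in sdf:
    print(chunk.shape)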
- static read_df(df, chunksize=None, check_schema=True) → StreamingDataFrame[source]¶
Splits a DataFrame into small chunks, mostly for unit testing purposes.
- Parameters:
df – pandas.DataFrame
chunksize – number of rows per chunk (the dataframe length // 10 by default)
check_schema – check schema between two iterations
- Returns:
iterator on StreamingDataFrame
- static read_json(*args, chunksize=100000, flatten=False, **kwargs) → StreamingDataFrame[source]¶
Reads a json file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_json. The important parameter is chunksize, which defines the number of rows to parse in a single block, and it must be defined to return an iterator. If lines is True, the function falls back into pandas.read_json, otherwise it uses enumerate_json_items. If lines is 'stream', enumerate_json_items is called with parameter lines=True. Parameter flatten uses the trick described at Flattening JSON objects in Python. Examples:
<<<
from io import BytesIO
from pandas_streaming.df import StreamingDataFrame

data = b"""{"a": 1, "b": 2}
{"a": 3, "b": 4}"""

it = StreamingDataFrame.read_json(BytesIO(data), lines=True)
dfs = list(it)
print(dfs)
>>>
[   a  b
0  1  2
1  3  4]
<<<
from io import BytesIO
from pandas_streaming.df import StreamingDataFrame

data = b"""[{"a": 1, "b": 2}, {"a": 3, "b": 4}]"""

it = StreamingDataFrame.read_json(BytesIO(data))
dfs = list(it)
print(dfs)
>>>
[   a  b
0  1  2
1  3  4]
The parsed json must have an empty line at the end otherwise the following exception is raised: ijson.common.IncompleteJSONError: parse error: unallowed token at this point in JSON text.
- static read_str(text, **kwargs) → StreamingDataFrame[source]¶
Reads a DataFrame as an iterator on DataFrame. The signature is the same as pandas.read_csv. The important parameter is chunksize, which defines the number of rows to parse in a single block.
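A minimal sketch, assuming the text follows the csv format expected by pandas.read_csv:
from pandas_streaming.df import StreamingDataFrame

text = "A,B\n1,a\n2,b\n3,c"
sdf = StreamingDataFrame.read_str(text, chunksize=2)
print(sdf.to_dataframe())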
- sample(reservoir=False, cache=False, **kwargs) → StreamingDataFrame[source]¶
See pandas.DataFrame.sample(). Only frac is available, otherwise choose reservoir_sampling(). This function returns a StreamingDataFrame.
- Parameters:
reservoir – use reservoir sampling
cache – cache the sample
kwargs – additional parameters for pandas.DataFrame.sample()
If cache is True, the sample is cached (assuming it holds in memory). The second time an iterator walks through the data, the same cached sample is returned.
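A minimal sketch drawing half of the rows (data invented for the example):
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

sdf = StreamingDataFrame.read_df(DataFrame(dict(X=range(10))), chunksize=2)
# frac is applied chunk by chunk; cache=True keeps the drawn sample
# so that every new iteration returns the same rows
sample = sdf.sample(frac=0.5, cache=True)
print(sample.to_dataframe())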
- property shape¶
This is the kind of operation you do not want to do when a file is large because it goes through the whole stream just to get the number of rows.
- sort_values(by, axis=0, ascending=True, kind='quicksort', na_position='last', temp_file='_pandas_streaming_sort_values_')[source]¶
Sorts the streaming dataframe by values.
- Parameters:
by – one column
ascending – order
kind – see pandas.DataFrame.sort_values()
na_position – see pandas.DataFrame.sort_values()
temp_file – sorting a whole dataset is impossible without storing intermediate results on disk, unless it fits into memory; in that case, it is easier to convert the streaming dataframe into a regular dataframe and sort it
- Returns:
streaming dataframe
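A minimal sketch (data invented for the example; a temporary file prefixed by temp_file may be created in the working directory):
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[3, 1, 2], B=["c", "a", "b"]))
sdf = StreamingDataFrame.read_df(df, chunksize=2)
# by must be a single column
print(sdf.sort_values(by="A").to_dataframe())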
- tail(n=5) → DataFrame[source]¶
Returns the last rows as a DataFrame. The size of chunks must be greater than n to get n lines. This method is not efficient because the whole dataset must be walked through.
- to_csv(path_or_buf=None, **kwargs) → StreamingDataFrame[source]¶
Saves the DataFrame into a string or a file. See pandas.DataFrame.to_csv.
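A minimal sketch, assuming path_or_buf accepts a file name as pandas.DataFrame.to_csv does; the file name is a placeholder:
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

sdf = StreamingDataFrame.read_df(DataFrame(dict(A=[3, 4], B=["a", "b"])))
# "streamed.csv" is a hypothetical file; chunks are written one after another
sdf.to_csv("streamed.csv", index=False)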
- train_test_split(path_or_buf=None, export_method='to_csv', names=None, streaming=True, partitions=None, **kwargs)[source]¶
Randomly splits a dataframe into smaller pieces. The function returns streams of file names. It chooses one of the options from module dataframe_split.
- Parameters:
path_or_buf – a string, a list of strings or buffers; if it is a string, it must contain {} like partition{}.txt; if None, the function returns strings
export_method – method used to store the partitions, by default pandas.DataFrame.to_csv(), additional parameters will be given to that function
names – partitions names, by default ('train', 'test')
kwargs – parameters for the export function and sklearn.model_selection.train_test_split()
streaming – the function switches to a streaming version of the algorithm
partitions – splitting partitions
- Returns:
outputs of the export functions, or two StreamingDataFrame if path_or_buf is None
The streaming version of this algorithm is implemented by function sklearn_train_test_split_streaming. Its documentation indicates the limitations of the streaming version and gives some insights about the additional parameters.
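A minimal sketch returning two streams, assuming test_size is forwarded to the underlying split (data invented for the example):
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(X=range(10), Y=range(10)))
sdf = StreamingDataFrame.read_df(df, chunksize=2)
# with path_or_buf=None, two StreamingDataFrame are returned
train, test = sdf.train_test_split(test_size=0.25)
print(train.to_dataframe().shape, test.to_dataframe().shape)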
- where(*args, **kwargs) → StreamingDataFrame[source]¶
Applies pandas.DataFrame.where. inplace must be False. This function returns a StreamingDataFrame.
StreamingSeries¶
- class pandas_streaming.df.dataframe.StreamingSeries(iter_creation, check_schema=True, stable=True)[source]¶
Seen as a StreamingDataFrame of one column.
- apply(*args, **kwargs) → StreamingDataFrame[source]¶
Applies pandas.Series.apply. This function returns a StreamingSeries.