Mapper, Reducers customisés avec SQL

Ce notebook propose l’utilisation de SQL avec SQLite pour manipuler les données depuis un notebook (avec le module sqlite3).

[15]:
%matplotlib inline
import matplotlib.pyplot as plt
from IPython.display import Image

Représentation

Le module pandas manipule des tables et c’est la façon la plus commune de représenter les données. Lorsque les données sont multidimensionnelles, on distingue les coordonnées des valeurs :

[2]:
Image("cube1.png")
[2]:
../../_images/practice_mapreduce_sql_map_reduce_4_0.png

Dans cet exemple, il y a :

  • 3 coordonnées : Age, Profession, Annéee

  • 2 valeurs : Espérance de vie, Population

On peut représenter les donnés également comme ceci :

[4]:
Image("cube2.png")
[4]:
../../_images/practice_mapreduce_sql_map_reduce_6_0.png

C’est assez simple. Prenons un exemple : Tables de mortalité par génération en Frances qu’on récupère à l’aide de la fonction mortality_table. C’est assez long (4-5 minutes) sur l’ensemble des données car elles doivent être prétraitées (voir la documentation de la fonction). Pour écouter, il faut utiliser le paramètre stop_at.

[3]:
from teachcompute.datasets import mortality_table

filename = mortality_table(verbose=True)
filename
[3]:
'./mortality.txt'
[6]:
import os

size = os.stat(filename).st_size
print(f"size={size / 2**20:1.3f} Mb")
size=120.594 Mb
[8]:
import pandas

df = pandas.read_csv(filename, sep="\t", encoding="utf-8", low_memory=False)
df.head()
[8]:
annee valeur age age_num indicateur genre pays
0 2021 0.00015 Y01 1.0 DEATHRATE F AL
1 2020 0.00052 Y01 1.0 DEATHRATE F AL
2 2019 0.00021 Y01 1.0 DEATHRATE F AL
3 2018 0.00067 Y01 1.0 DEATHRATE F AL
4 2017 0.00046 Y01 1.0 DEATHRATE F AL

Les indicateurs pour deux âges différents :

[9]:
df[
    ((df.age == "Y60") | (df.age == "Y61"))
    & (df.annee == 2000)
    & (df.pays == "FR")
    & (df.genre == "F")
]
[9]:
annee valeur age age_num indicateur genre pays
105650 2000 5.020000e-03 Y60 60.0 DEATHRATE F FR
107522 2000 4.860000e-03 Y61 61.0 DEATHRATE F FR
587296 2000 2.580000e+01 Y60 60.0 LIFEXP F FR
589157 2000 2.490000e+01 Y61 61.0 LIFEXP F FR
1087844 2000 5.010000e-03 Y60 60.0 PROBDEATH F FR
1089716 2000 4.850000e-03 Y61 61.0 PROBDEATH F FR
1569977 2000 9.949900e-01 Y60 60.0 PROBSURV F FR
1571849 2000 9.951500e-01 Y61 61.0 PROBSURV F FR
2051560 2000 9.307600e+04 Y60 60.0 PYLIVED F FR
2053421 2000 9.261800e+04 Y61 61.0 PYLIVED F FR
2531170 2000 9.331000e+04 Y60 60.0 SURVIVORS F FR
2533031 2000 9.284300e+04 Y61 61.0 SURVIVORS F FR
3010828 2000 2.405594e+06 Y60 60.0 TOTPYLIVED F FR
3012689 2000 2.312517e+06 Y61 61.0 TOTPYLIVED F FR

Données trop grosses pour tenir en mémoire : SQLite

On charge une grosse base de données (assez petite pour que la séance ne soit pas trop longue).

[10]:
df.shape
[10]:
(3385344, 7)

Les données sont trop grosses pour tenir dans une feuille Excel et les consulter il n’y a pas d’autres moyens que d’en regarder des extraits. Que passe-t-il quand les données sont encore plus grosses et qu’elles ne tiennent pas en mémoire ? Quelques solutions :

  • augmenter la mémoire de l’ordinateur, avec 20 Go, on peut faire beaucoup de choses,

  • stocker les données dans un serveur SQL,

  • stocker les données sur un système distribué (cloud, Hadoop, …)

La seconde option n’est pas toujours simple, il faut installer un serveur SQL. Pour aller plus vite, on peut simplement utiliser SQLite qui est une façon de faire du SQL sans serveur (cela prend quelques minutes). On utilise la méthode to_sql.

[11]:
import sqlite3
from pandas.io import sql

cnx = sqlite3.connect("mortalite.db3")
try:
    df.to_sql(name="mortalite", con=cnx)
except ValueError as e:
    if "Table 'mortalite' already exists" not in str(e):
        # seulement si l'erreur ne vient pas du fait que cela
        # a déjà été fait
        raise e
# on peut ajouter d'autres dataframe à la table comme si elle était créée par morceau
# voir le paramètre if_exists de la fonction to_sql

On peut maintenant récupérer un morceau avec la fonction read_sql.

[12]:
import pandas

example = pandas.read_sql("SELECT * FROM mortalite WHERE age_num==50 LIMIT 5", cnx)
example
[12]:
index annee valeur age age_num indicateur genre pays
0 84185 2021 0.00234 Y50 50.0 DEATHRATE F AL
1 84186 2020 0.00237 Y50 50.0 DEATHRATE F AL
2 84187 2019 0.00188 Y50 50.0 DEATHRATE F AL
3 84188 2018 0.00195 Y50 50.0 DEATHRATE F AL
4 84189 2017 0.00170 Y50 50.0 DEATHRATE F AL

L’ensemble des données restent sur le disque, seul le résultat de la requête est chargé en mémoire. Si on ne peut pas faire tenir les données en mémoire, il faut soit en obtenir une vue partielle (un échantillon aléatoire, un vue filtrée), soit une vue agrégrée. Pour finir, il faut fermer la connexion pour laisser d’autres applications ou notebook modifier la base ou tout simplement supprimer le fichier.

[13]:
cnx.close()

Sous Windows, on peut consulter la base avec le logiciel SQLiteSpy.

[16]:
Image("sqlite.png")
[16]:
../../_images/practice_mapreduce_sql_map_reduce_22_0.png

Sous Linux ou Max, on peut utiliser une extension Firefox SQLite Manager.

Cas 1 : filtrer pour créer un échantillon aléatoire

Si on ne peut pas faire tenir les données en mémoire, on peut soit regarder les premières lignes soit prendre un échantillon aléatoire. Deux options :

La première fonction est simple :

[17]:
sample = df.sample(frac=0.1)
sample.shape, df.shape
[17]:
((338534, 7), (3385344, 7))

Je ne sais pas si cela peut être réalisé sans charger les données en mémoire. Si les données pèsent 20 Go, cette méthode n’aboutira pas. Pourtant, on veut juste un échantillon pour commencer à regarder les données. On utilise la seconde option avec create_function et la fonction suivante :

[18]:
import random  # loi uniforme


def echantillon(proportion):
    return 1 if random.random() < proportion else 0
[19]:
import sqlite3
from pandas.io import sql

cnx = sqlite3.connect("mortalite.db3")

On déclare la fonction à la base de données.

[20]:
cnx.create_function("echantillon", 1, echantillon)

On veut récupérer environ 1% de la table ? On écrit d’abord le filtre.

[21]:
sample = pandas.read_sql("SELECT * FROM mortalite WHERE echantillon(0.01)", cnx)
sample.shape
[21]:
(34027, 8)
[22]:
sample.head()
[22]:
index annee valeur age age_num indicateur genre pays
0 82 2021 0.00026 Y01 1.0 DEATHRATE F BE
1 147 2018 0.00073 Y01 1.0 DEATHRATE F BG
2 223 2012 0.00015 Y01 1.0 DEATHRATE F CH
3 294 2003 0.00000 Y01 1.0 DEATHRATE F CY
4 342 1984 0.00065 Y01 1.0 DEATHRATE F CZ

On ferme la connexion.

[23]:
cnx.close()

Pseudo Map/Reduce avec SQLite

La liste des mots-clés du langage SQL utilisés par SQLite n’est pas aussi riche que d’autres solutions de serveurs SQL. La médiane ne semble pas en faire partie. Cependant, pour une année, un genre, un âge donné, on voudrait calculer la médiane de l’espérance de vie sur l’ensembles des pays.

[24]:
import sqlite3, pandas
from pandas.io import sql

cnx = sqlite3.connect("mortalite.db3")
[25]:
pays = pandas.read_sql("SELECT pays, COUNT(*) FROM mortalite GROUP BY pays", cnx)
pays.head()
[25]:
pays COUNT(*)
0 AL 16758
1 AM 16254
2 AT 94428
3 AZ 21672
4 BE 112488

Il n’y a pas le même nombre de données selon les pays, il est probable que le nombre de pays pour lesquels il existe des données varie selon les âges et les années.

[26]:
query = """SELECT nb_country, COUNT(*) AS nb_rows FROM (
                SELECT annee,age,age_num, count(*) AS nb_country FROM mortalite
                WHERE indicateur=="LIFEXP" AND genre=="F"
                GROUP BY annee,age,age_num
            ) GROUP BY nb_country"""
df = pandas.read_sql(query, cnx)
[27]:
df.sort_values("nb_country", ascending=False).head(n=2)
[27]:
nb_country nb_rows
24 50 860
23 49 172
[28]:
ax = df.plot(x="nb_country", y="nb_rows")
ax.set_title("Nombre de données par pays");
../../_images/practice_mapreduce_sql_map_reduce_43_0.png

Soit un nombre inconstant de pays. Le fait qu’on est 100 pays suggère qu’on ait une erreur également.

[29]:
query = """SELECT annee,age,age_num, count(*) AS nb_country FROM mortalite
                WHERE indicateur=="LIFEXP" AND genre=="F"
                GROUP BY annee,age,age_num
                HAVING nb_country >= 100"""
df = pandas.read_sql(query, cnx)
[30]:
df.head()
[30]:
annee age age_num nb_country

Ce sont des valeurs manquantes. Le problème pour calculer la médiane pour chaque observation est qu’il faut d’abord regrouper les lignes de la table par indicateur puis choisir la médiane dans chaque de ces petits groupes. On s’inspire pour cela de la logique Map/Reduce et de la fonction create_aggregate.

Cas 2 : reducer customisé avec SQL

Le reducer se présente toujours sous la forme suivante :

[31]:
class ReducerMediane:
    def __init__(self):
        # ???
        pass

    def step(self, value):
        # ???
        #
        pass

    def finalize(self):
        # ???
        # return ... //2 ]
        pass

Qu’on renseigne de la sorte :

[32]:
class ReducerMediane:
    def __init__(self):
        self.indicateur = []

    def step(self, value):
        if value >= 0:
            self.indicateur.append(value)

    def finalize(self):
        self.indicateur.sort()
        return self.indicateur[len(self.indicateur) // 2]

On le déclare ensuite à sqllite3.

[33]:
cnx.create_aggregate("ReducerMediane", 1, ReducerMediane)
[34]:
query = """SELECT annee,age,age_num, ReducerMediane(valeur) AS mediane FROM mortalite
                WHERE indicateur=="LIFEXP" AND genre=="F"
                GROUP BY annee,age,age_num"""
df = pandas.read_sql(query, cnx)
[35]:
df.head()
[35]:
annee age age_num mediane
0 1960 Y01 1.0 73.7
1 1960 Y02 2.0 72.8
2 1960 Y03 3.0 71.9
3 1960 Y04 4.0 71.0
4 1960 Y05 5.0 70.0

Un reducer à deux entrées même si cela n’a pas beaucoup de sens ici :

[36]:
class ReducerMediane2:
    def __init__(self):
        self.indicateur = []

    def step(self, value, value2):
        if value >= 0:
            self.indicateur.append(value)
        if value2 >= 0:
            self.indicateur.append(value2)

    def finalize(self):
        self.indicateur.sort()
        return self.indicateur[len(self.indicateur) // 2]


cnx.create_aggregate("ReducerMediane2", 2, ReducerMediane2)
[37]:
query = """SELECT annee,age,age_num, ReducerMediane2(valeur, valeur+1) AS mediane2 FROM mortalite
                WHERE indicateur=="LIFEXP" AND genre=="F"
                GROUP BY annee,age,age_num"""
df = pandas.read_sql(query, cnx)
df.head()
[37]:
annee age age_num mediane2
0 1960 Y01 1.0 74.0
1 1960 Y02 2.0 73.2
2 1960 Y03 3.0 72.3
3 1960 Y04 4.0 71.3
4 1960 Y05 5.0 70.4

Il n’est apparemment pas possible de retourner deux résultats mais on peut utiliser une ruse qui consise à les concaténer dans une chaîne de caracères.

[38]:
class ReducerQuantile:
    def __init__(self):
        self.indicateur = []

    def step(self, value):
        if value >= 0:
            self.indicateur.append(value)

    def finalize(self):
        self.indicateur.sort()
        q1 = self.indicateur[len(self.indicateur) // 4]
        q2 = self.indicateur[3 * len(self.indicateur) // 4]
        n = len(self.indicateur)
        return "%f;%f;%s" % (q1, q2, n)


cnx.create_aggregate("ReducerQuantile", 1, ReducerQuantile)
[41]:
query = """SELECT annee,age,age_num, ReducerQuantile(valeur) AS quantiles FROM mortalite
                WHERE indicateur=="LIFEXP" AND genre=="F"
                GROUP BY annee,age,age_num"""
df = pandas.read_sql(query, cnx)
df.head()
[41]:
annee age age_num quantiles
0 1960 None NaN 4.400000;72.800000;20
1 1960 Y01 1.0 73.000000;74.000000;10
2 1960 Y02 2.0 72.100000;73.200000;10
3 1960 Y03 3.0 71.200000;72.300000;10
4 1960 Y04 4.0 70.300000;71.300000;10

On ferme la connexion.

[39]:
cnx.close()

Notion d’index

En SQL et pour de grandes tables, la notion d’index joue un rôle important pour accélérer les opérations de jointures (JOIN) ou de regroupement (GROUP BY). L’article A thorough guide to SQLite database operations in Python montre comment faire les principales opérations.

[43]:


Notebook on github