Reducers récursifs

J’utilise volontiers une terminologie découverte chez Microsoft pour illustrer une façon d’écrire le même calcul qui a un impact sur la facilité avec laquelle on peut le distribution : utiliser des comptes plutôt que des moyennes.

Le notebook utilise des fonctions développées pour illustrer les notions, plus claires qu’efficaces.

Stream

Le map reduce s’applique à des jeux de données très grands. D’un point de vue mathématique, on écrit des algorithmes qui s’appliquent à des jeux de données infinis ou plutôt dont la taille n’est pas connu. Pour les distinguer des jeux de données, on les appelle des flux ou stream en anglais.

En aparté, écrits pour être parallélisés, ces traitements ont la particuliarité de ne pas conserver l’ordre dans lequel il traite les données. C’est particulièrement vrai lorsque le jeu de données est divisé sur plusieurs disques durs. Il est impossible de choisir un morceau en premier.

Mapper

Un mapper applique le même traitement à chaque observation du stream de façon indépendante.

[1]:
ens = [("a", 1), ("b", 4), ("a", 6), ("a", 3)]
[3]:
from teachcompute.fctmr import mapper

stream1 = mapper(lambda el: (el[0], el[1] + 1), ens)
stream1
[3]:
<map at 0x7f0e1b597fd0>

Le résultat n’existe pas tant qu’on ne demande explicitement que le calcul soit faut. Il faut parcourir le résultat.

[4]:
list(stream1)
[4]:
[('a', 2), ('b', 5), ('a', 7), ('a', 4)]

Et on ne peut le parcourir qu’une fois :

[5]:
list(stream1)
[5]:
[]

Coût du premier élément

Quand on a une infinité d’éléments à traiter, il est important de pouvoir regarder ce qu’un traitement donne sur les premiers éléments. Avec un mapper, cela correspond au coût d’un seul map.

[7]:
from teachcompute.fctmr import take

first = lambda it: take(it, count=1)
big_ens = ens * 100
[8]:
%timeit -n 1000 list(mapper(lambda el: (el[0], el[1]+1), big_ens))
45.5 µs ± 13.1 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
[9]:
%timeit -n 1000 first(mapper(lambda el: (el[0], el[1]+1), big_ens))
834 ns ± 574 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

Reducer

Un vrai reducer réduit les éléments d’un ensemble, il ne répartit pas les données. En pratique, on réduit rarement un ensemble qu’on n’a pas distribué au préalable, comme avec un groupby. On ne réduit pas toujours non plus un ensemble à une seule ligne. On empile les opérations de streaming, on repousse également le moment d’évaluer. La distribution s’effectue selon une clé qui est hashée (voir Hash et distribution). La première lambda fonction décrit ce qu’est cette clé, le premier élément du couple dans ce cas.

[10]:
from teachcompute.fctmr import reducer

stream1 = mapper(lambda el: (el[0], el[1] + 1), ens)
stream2 = reducer(lambda el: el[0], stream1, asiter=False)
stream2
[10]:
<generator object reducer at 0x7f0e1b5dba70>
[11]:
list(stream2)
[11]:
[('a', [('a', 2), ('a', 4), ('a', 7)]), ('b', [('b', 5)])]

Dans cet exemple, le reducer réduit chaque groupe à un seul résultat qui est l’ensemble des éléments. Quel est le coup du premier élément…

[12]:
def test2(ens, one=False):
    stream1 = mapper(lambda el: (el[0], el[1] + 1), ens)
    stream2 = reducer(lambda el: el[0], stream1, asiter=False)
    return list(stream2) if one else first(stream2)


%timeit -n 1000 test2(big_ens)
2.35 µs ± 841 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
[13]:
%timeit -n 1000 test2(big_ens, one=True)
186 µs ± 30.9 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

C’est plus court mais pas significativement plus court. Cela correspond au coût d’un tri de l’ensemble des observations et du coût de la construction du premier groupe.

Reducer et tri

Un stream est infini en théorie. En pratique il est fini mais on ne sait pas si un ou plusieurs groupes entiers tiendraient en mémoire. Une façon de faire est de limiter la présence des données en mémoire à un seul groupe et pour cela, il faut d’abord trier les données selon les clés. Ce n’est pas indispensable mais dans le pire des cas, c’est une bonne option. On pourrait avoir un stream comme suit :

[14]:
pas_cool = [(chr(int(c) + 96), i) for i, c in enumerate(str(11111111**2))]
pas_cool
[14]:
[('a', 0),
 ('b', 1),
 ('c', 2),
 ('d', 3),
 ('e', 4),
 ('f', 5),
 ('g', 6),
 ('h', 7),
 ('g', 8),
 ('f', 9),
 ('e', 10),
 ('d', 11),
 ('c', 12),
 ('b', 13),
 ('a', 14)]

Le groupe a est au début et à la fin, si on regroupe en mémoire, le groupe associé à a doit rester en mémoire du début à la fin. On ne sait jamais si un groupe ne va pas réapparaître plus tard. En triant, on est sûr.

Un autre map

On ajoute un dernier map qui fait la somme des éléments de chaque groupe.

[15]:
def sum_gr(key_gr):
    key, gr = key_gr
    return key, sum(e[1] for e in gr)


stream1 = mapper(lambda el: (el[0], el[1] + 1), ens)
stream2 = reducer(lambda el: el[0], stream1)
stream3 = map(sum_gr, stream2)
stream3
[15]:
<map at 0x7f0e1b597580>
[16]:
list(stream3)
[16]:
[('a', 13), ('b', 5)]

Combiner ou join

Un combiner ou join permet de fusionner deux bases de données qui ont en commun une clé.

[18]:
from teachcompute.fctmr import combiner

stream1 = mapper(lambda el: (el[0], el[1] + 1), ens)
stream2 = reducer(lambda el: el[0], stream1)
stream3 = map(sum_gr, stream2)
stream4 = mapper(lambda el: (el[0], el[1] + 10), pas_cool)
comb = combiner(lambda el: el[0], stream3, lambda el: el[0], stream4)
comb
[18]:
<generator object combiner at 0x7f0e1b5dba00>
[19]:
list(comb)
[19]:
[(('a', 13), ('a', 10)),
 (('a', 13), ('a', 24)),
 (('b', 5), ('b', 11)),
 (('b', 5), ('b', 23))]

Le coût du premier élément est un peu plus compliqué à inférer, cela dépend beaucoup des données.

[20]:
def job(ens, ens2, one=False, sens=True):
    stream1 = mapper(lambda el: (el[0], el[1] + 1), ens)
    stream2 = reducer(lambda el: el[0], stream1)
    stream3 = map(sum_gr, stream2)
    stream4 = mapper(lambda el: (el[0], el[1] + 10), ens2)
    if sens:
        comb = combiner(lambda el: el[0], stream3, lambda el: el[0], stream4)
    else:
        comb = combiner(lambda el: el[0], stream4, lambda el: el[0], stream3)
    return list(comb) if one else first(comb)


%timeit -n 1000 job(big_ens, pas_cool)
3.72 µs ± 1.26 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
[21]:
%timeit -n 1000 job(big_ens, pas_cool, sens=False)
4.41 µs ± 1.82 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
[22]:
%timeit -n 1000 job(big_ens, pas_cool, one=True)
247 µs ± 35.3 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
[23]:
%timeit -n 1000 job(big_ens, pas_cool, one=True, sens=False)
220 µs ± 22.5 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

Il y a différentes façons de coder un combiner, l’une d’elle consiste à réduire chacun des deux streams puis à faire le produit croisé de chaque groupe assemblé.

Reducers récursifs

C’est pas loin d’être un abus de langage, disons que cela réduit la dépendance au tri. Un exemple.

[24]:
def sum_gr(key_gr):
    key, gr = key_gr
    return key, sum(e[1] for e in gr)


def job_recursif(ens):
    stream2 = reducer(lambda el: el[0], ens)
    stream3 = map(sum_gr, stream2)
    return list(stream3)


job_recursif(ens)
[24]:
[('a', 10), ('b', 4)]

Et maintenant, on coupe en deux :

[25]:
n = len(ens) // 2
job_recursif(ens[:n])
[25]:
[('a', 1), ('b', 4)]
[26]:
job_recursif(ens[n:])
[26]:
[('a', 9)]

Et maintenant :

[27]:
job_recursif(job_recursif(ens[:n]) + job_recursif(ens[n:]))
[27]:
[('a', 10), ('b', 4)]

Le job ainsi écrit est associatif en quelque sorte. Cela laisse plus de liberté pour la distribution car on peut maintenant distribuer des clés identiques sur des machines différentes puis réappliquer le reducer sur les résultats de la première salve. C’est d’autant plus efficace que le reducer réduit beaucoup les données. Il reste à voir le cas d’un reducer non récursif.

[28]:
def mean(ens):
    s = 0.0
    for i, e in enumerate(ens):
        s += e
    return s / (i + 1)


def mean_gr(key_gr):
    key, gr = key_gr
    return key, mean(e[1] for e in gr)


def job_non_recursif(ens):
    stream2 = reducer(lambda el: el[0], ens)
    stream3 = map(mean_gr, stream2)
    return list(stream3)


job_non_recursif(ens)
[28]:
[('a', 3.3333333333333335), ('b', 4.0)]
[29]:
n = len(ens) // 2
job_non_recursif(ens[:n])
[29]:
[('a', 1.0), ('b', 4.0)]
[30]:
job_non_recursif(ens[n:])
[30]:
[('a', 4.5)]
[31]:
job_non_recursif(job_non_recursif(ens[:n]) + job_non_recursif(ens[n:]))
[31]:
[('a', 2.75), ('b', 4.0)]

Ce job ne doit pas être distribué n’importe comment.

[ ]:


Notebook on github