Matrices en 3 colonnes#
Représentation d’une matrice avec Spark / Map / Reduce.
Ce notebook propose d’implémenter un produit matriciel sous Spark. Spark comme SQL n’aime pas trop avoir un nombre de colonnes variables. La première étape consiste à transformer les matrices en tableau de trois colonnes .
Session spark with no cluster#
Spark est censé tourner sur un cluster. Mais ce n’est pas essentielle pour comprendre la logique. Le notebook tourne donc en local.
[16]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("MySparkApp").master("local[*]").getOrCreate()
sc = spark.sparkContext
Création d’une matrice aléatoire#
[17]:
from numpy.random import rand
rnd1 = rand(10, 10)
rnd2 = rand(10, 2)
rnd1 @ rnd2
[17]:
array([[1.2962659 , 2.75236533],
[2.26735872, 2.89961464],
[1.29025917, 2.34056096],
[1.82876448, 3.42098919],
[1.91448985, 3.37298335],
[1.84269033, 1.98821207],
[2.28212544, 3.05316399],
[1.88631937, 3.06186776],
[2.67976259, 3.61823182],
[1.70446473, 2.71078996]])
[18]:
import pandas
df1 = pandas.DataFrame(rnd1)
df2 = pandas.DataFrame(rnd2)
df2
[18]:
0 | 1 | |
---|---|---|
0 | 0.425791 | 0.508217 |
1 | 0.444969 | 0.926192 |
2 | 0.078127 | 0.349568 |
3 | 0.707894 | 0.845050 |
4 | 0.179368 | 0.555457 |
5 | 0.571995 | 0.419750 |
6 | 0.782654 | 0.712389 |
7 | 0.165768 | 0.830360 |
8 | 0.043705 | 0.759277 |
9 | 0.502934 | 0.110957 |
[19]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=False)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=False)
[20]:
mat1 = spark.sparkContext.textFile("rnd1.txt")
Conversion d’une matrice au format Spark#
Lorsqu’un traitement est distribué en Map/Reduce, il n’est pas possible de s’appuyer sur l’ordre dans lequel sont traitées les lignes. Le plus est d’ajouter cette information sur chaque ligne plutôt que de chercher à la récupérer.
[21]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=True)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=True)
[22]:
def process_mat_row(row):
values = row.split("\t")
index = int(values[0])
values = [float(_) for _ in values[1:]]
return [[index, j, v] for j, v in enumerate(values)]
[23]:
mat1 = sc.textFile("rnd1.txt")
new_mat1 = mat1.flatMap(process_mat_row)
new_mat1.take(12)
[23]:
[[0, 0, 0.03925624285714491],
[0, 1, 0.17938143471086276],
[0, 2, 0.9271561615741587],
[0, 3, 0.13497740334270003],
[0, 4, 0.20256379268518632],
[0, 5, 0.15623553510014287],
[0, 6, 0.7926007655892027],
[0, 7, 0.9953375114509172],
[0, 8, 0.7100110433596362],
[0, 9, 0.1783301416123766],
[1, 0, 0.6661128314162409],
[1, 1, 0.16617620104743758]]
[24]:
mat2 = sc.textFile("rnd2.txt")
new_mat2 = mat2.flatMap(process_mat_row)
new_mat2.take(12)
[24]:
[[0, 0, 0.4257910374269508],
[0, 1, 0.5082167960207694],
[1, 0, 0.4449691717763494],
[1, 1, 0.9261922479892456],
[2, 0, 0.07812708982401129],
[2, 1, 0.34956810336320765],
[3, 0, 0.7078936713530861],
[3, 1, 0.8450500475013194],
[4, 0, 0.17936816503487407],
[4, 1, 0.5554570108793752],
[5, 0, 0.5719951163381093],
[5, 1, 0.41975047374547725]]
Produit matriciel#
Il faut d’abord faire la jointure avec la méthode join. Il faut que la clé soit sur la première colonne.
[25]:
def key_ij(row):
return row[0], (row[1], row[2])
def key_ji(row):
return row[1], (row[0], row[2])
mat_join = new_mat1.map(key_ji).join(new_mat2.map(key_ij))
mat_join.take(12)
[25]:
[(0, ((0, 0.03925624285714491), (0, 0.4257910374269508))),
(0, ((0, 0.03925624285714491), (1, 0.5082167960207694))),
(0, ((1, 0.6661128314162409), (0, 0.4257910374269508))),
(0, ((1, 0.6661128314162409), (1, 0.5082167960207694))),
(0, ((2, 0.8809481084845031), (0, 0.4257910374269508))),
(0, ((2, 0.8809481084845031), (1, 0.5082167960207694))),
(0, ((3, 0.09489201762862454), (0, 0.4257910374269508))),
(0, ((3, 0.09489201762862454), (1, 0.5082167960207694))),
(0, ((4, 0.9071785348453754), (0, 0.4257910374269508))),
(0, ((4, 0.9071785348453754), (1, 0.5082167960207694))),
(0, ((5, 0.8535870359856828), (0, 0.4257910374269508))),
(0, ((5, 0.8535870359856828), (1, 0.5082167960207694)))]
On effectue le produit matriciel.
[26]:
def produit_matriciel(row):
index, ((i, v1), (j, v2)) = row
return i, j, v1 * v2
produit = mat_join.map(produit_matriciel)
produit.take(12)
[26]:
[(0, 0, 0.016714956371628058),
(0, 1, 0.019950681968671398),
(1, 0, 0.2836248735321248),
(1, 1, 0.33852972897068484),
(2, 0, 0.37509980903092655),
(2, 1, 0.44771262515455135),
(3, 0, 0.04040417062962855),
(3, 1, 0.04822571716716593),
(4, 0, 0.3862684894832736),
(4, 1, 0.4610433683979326),
(5, 0, 0.3634497095865398),
(5, 1, 0.4338072685535089)]
Il ne reste plus qu’à agréger reduceByKey. La documentation fournit un exemple facilement transposable. Elle indique aussi : Merge the values for each key using an associative and commutative reduce function. Pourquoi précise-t-elle associative et commutative ? Cela signifie que le résultat ne dépend pas de l’ordre dans lequel l’agrégation est réalisée et qu’on peut commencer à agréger sans attendre d’avoir regroupé toutes les valeurs associées à une clé.
Cas 1 : groupBy + agrégation qui commence une fois les valeurs regroupées
Cas 2 : reduceByKey + agrégation qui commence dès les premières valeurs regroupées
Le cas 2 est moins consommateur en terme de données. Le cas 1 n’est possible que si les valeurs agrégées ne sont pas trop nombreuses. Ca tombe bien, dans notre cas, le cas 2 convient.
[27]:
from operator import add
final = produit.map(lambda row: ((row[0], row[1]), row[2])).reduceByKey(add)
aslist = final.collect()
aslist.sort()
aslist
[27]:
[((0, 0), 1.2962658962226397),
((0, 1), 2.7523653340144056),
((1, 0), 2.267358716094168),
((1, 1), 2.899614641397404),
((2, 0), 1.2902591694424805),
((2, 1), 2.3405609608679425),
((3, 0), 1.8287644824176785),
((3, 1), 3.420989188235977),
((4, 0), 1.9144898451263708),
((4, 1), 3.372983349186469),
((5, 0), 1.842690333964681),
((5, 1), 1.9882120730740667),
((6, 0), 2.282125435665258),
((6, 1), 3.053163988860857),
((7, 0), 1.8863193688285897),
((7, 1), 3.061867764510199),
((8, 0), 2.6797625884756293),
((8, 1), 3.6182318180423017),
((9, 0), 1.7044647282526524),
((9, 1), 2.710789957326838)]
Résultat initial :
[28]:
rnd1 @ rnd2
[28]:
array([[1.2962659 , 2.75236533],
[2.26735872, 2.89961464],
[1.29025917, 2.34056096],
[1.82876448, 3.42098919],
[1.91448985, 3.37298335],
[1.84269033, 1.98821207],
[2.28212544, 3.05316399],
[1.88631937, 3.06186776],
[2.67976259, 3.61823182],
[1.70446473, 2.71078996]])
Même algorithme avec les Spark DataFrame#
On a besoin de réaliser un flatMap. Une façon de faire est de créer des colonnes qui sont de type composé : un tableau, une structure. La multiplication des lignes est obtenue avec la fonction explode.
[29]:
schema = ["index"] + ["c%d" % i for i in range(1, 11)]
mat1 = spark.createDataFrame(
pandas.read_csv("rnd1.txt", header=None, sep="\t"), schema=schema
)
/home/xadupre/.local/lib/python3.10/site-packages/pyspark/sql/pandas/conversion.py:485: FutureWarning: is_datetime64tz_dtype is deprecated and will be removed in a future version. Check `isinstance(dtype, pd.DatetimeTZDtype)` instead.
if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
[30]:
mat1.printSchema()
root
|-- index: long (nullable = true)
|-- c1: double (nullable = true)
|-- c2: double (nullable = true)
|-- c3: double (nullable = true)
|-- c4: double (nullable = true)
|-- c5: double (nullable = true)
|-- c6: double (nullable = true)
|-- c7: double (nullable = true)
|-- c8: double (nullable = true)
|-- c9: double (nullable = true)
|-- c10: double (nullable = true)
[31]:
schema = ["index"] + ["c%d" % i for i in range(1, 3)]
mat2 = spark.createDataFrame(
pandas.read_csv("rnd2.txt", header=None, sep="\t"), schema=schema
)
/home/xadupre/.local/lib/python3.10/site-packages/pyspark/sql/pandas/conversion.py:485: FutureWarning: is_datetime64tz_dtype is deprecated and will be removed in a future version. Check `isinstance(dtype, pd.DatetimeTZDtype)` instead.
if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
[32]:
mat2.printSchema()
root
|-- index: long (nullable = true)
|-- c1: double (nullable = true)
|-- c2: double (nullable = true)
Nous allons avoir besoin de quelques-uns des fonctions et types suivant :
Je recommande le type FloatType qui prend deux fois moins de place pour une précision moindre mais suffisante dans la plupart des cas.
[33]:
from pyspark.sql.types import (
ArrayType,
StructField,
StructType,
DoubleType,
IntegerType,
)
from pyspark.sql.functions import explode, posexplode, array
from pyspark.sql import Row
[34]:
cols = ["c%d" % i for i in range(1, 11)]
mat1_array = mat1.select(mat1.index, array(*cols).alias("x"))
mat1_array.printSchema()
root
|-- index: long (nullable = true)
|-- x: array (nullable = false)
| |-- element: double (containsNull = true)
[35]:
mat1_exploded = mat1_array.select("index", posexplode("x"))
mat1_exploded.printSchema()
root
|-- index: long (nullable = true)
|-- pos: integer (nullable = false)
|-- col: double (nullable = true)
[36]:
mat1.toPandas().shape, mat1_exploded.toPandas().shape
[36]:
((10, 11), (100, 3))
On recommence le même procédé pour l’autre matrice.
[37]:
cols = ["c%d" % i for i in range(1, 3)]
mat2_array = mat2.select(mat2.index, array(*cols).alias("x"))
mat2_exploded = mat2_array.select("index", posexplode("x"))
Il ne reste plus qu’à faire le produit avec la méthode join après avoir renommé les colonnes avant la jointure pour éviter les ambiguïtés.
[38]:
mat2_exp2 = (
mat2_exploded.withColumnRenamed("index", "index2")
.withColumnRenamed("pos", "pos2")
.withColumnRenamed("col", "col2")
)
produit = mat1_exploded.join(mat2_exp2, mat1_exploded.pos == mat2_exp2.index2)
[39]:
produit.printSchema()
root
|-- index: long (nullable = true)
|-- pos: integer (nullable = false)
|-- col: double (nullable = true)
|-- index2: long (nullable = true)
|-- pos2: integer (nullable = false)
|-- col2: double (nullable = true)
[40]:
produit.toPandas().head()
[40]:
index | pos | col | index2 | pos2 | col2 | |
---|---|---|---|---|---|---|
0 | 0 | 0 | 0.039256 | 0 | 0 | 0.425791 |
1 | 0 | 0 | 0.039256 | 0 | 1 | 0.508217 |
2 | 1 | 0 | 0.666113 | 0 | 0 | 0.425791 |
3 | 1 | 0 | 0.666113 | 0 | 1 | 0.508217 |
4 | 2 | 0 | 0.880948 | 0 | 0 | 0.425791 |
[41]:
prod = produit.select(
produit.index.alias("i"),
produit.pos2.alias("j"),
(produit.col * produit.col2).alias("val"),
)
final = prod.groupby("i", "j").sum("val")
[42]:
final.printSchema()
root
|-- i: long (nullable = true)
|-- j: integer (nullable = false)
|-- sum(val): double (nullable = true)
[43]:
df = final.toPandas()
[44]:
df.sort_values(["i", "j"]).head()
[44]:
i | j | sum(val) | |
---|---|---|---|
7 | 0 | 0 | 1.296266 |
10 | 0 | 1 | 2.752365 |
18 | 1 | 0 | 2.267359 |
3 | 1 | 1 | 2.899615 |
6 | 2 | 0 | 1.290259 |
[45]:
df.shape
[45]:
(20, 3)
[ ]: