Matrices en 3 colonnes#

Représentation d’une matrice avec Spark / Map / Reduce.

Ce notebook propose d’implémenter un produit matriciel sous Spark. Spark comme SQL n’aime pas trop avoir un nombre de colonnes variables. La première étape consiste à transformer les matrices I\times J en tableau de trois colonnes (i,j,coefficient).

Session spark with no cluster#

Spark est censé tourner sur un cluster. Mais ce n’est pas essentielle pour comprendre la logique. Le notebook tourne donc en local.

[16]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("MySparkApp").master("local[*]").getOrCreate()
sc = spark.sparkContext

Création d’une matrice aléatoire#

[17]:
from numpy.random import rand

rnd1 = rand(10, 10)
rnd2 = rand(10, 2)
rnd1 @ rnd2
[17]:
array([[1.2962659 , 2.75236533],
       [2.26735872, 2.89961464],
       [1.29025917, 2.34056096],
       [1.82876448, 3.42098919],
       [1.91448985, 3.37298335],
       [1.84269033, 1.98821207],
       [2.28212544, 3.05316399],
       [1.88631937, 3.06186776],
       [2.67976259, 3.61823182],
       [1.70446473, 2.71078996]])
[18]:
import pandas

df1 = pandas.DataFrame(rnd1)
df2 = pandas.DataFrame(rnd2)
df2
[18]:
0 1
0 0.425791 0.508217
1 0.444969 0.926192
2 0.078127 0.349568
3 0.707894 0.845050
4 0.179368 0.555457
5 0.571995 0.419750
6 0.782654 0.712389
7 0.165768 0.830360
8 0.043705 0.759277
9 0.502934 0.110957
[19]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=False)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=False)
[20]:
mat1 = spark.sparkContext.textFile("rnd1.txt")

Conversion d’une matrice au format Spark#

Lorsqu’un traitement est distribué en Map/Reduce, il n’est pas possible de s’appuyer sur l’ordre dans lequel sont traitées les lignes. Le plus est d’ajouter cette information sur chaque ligne plutôt que de chercher à la récupérer.

[21]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=True)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=True)
[22]:
def process_mat_row(row):
    values = row.split("\t")
    index = int(values[0])
    values = [float(_) for _ in values[1:]]
    return [[index, j, v] for j, v in enumerate(values)]
[23]:
mat1 = sc.textFile("rnd1.txt")
new_mat1 = mat1.flatMap(process_mat_row)
new_mat1.take(12)
[23]:
[[0, 0, 0.03925624285714491],
 [0, 1, 0.17938143471086276],
 [0, 2, 0.9271561615741587],
 [0, 3, 0.13497740334270003],
 [0, 4, 0.20256379268518632],
 [0, 5, 0.15623553510014287],
 [0, 6, 0.7926007655892027],
 [0, 7, 0.9953375114509172],
 [0, 8, 0.7100110433596362],
 [0, 9, 0.1783301416123766],
 [1, 0, 0.6661128314162409],
 [1, 1, 0.16617620104743758]]
[24]:
mat2 = sc.textFile("rnd2.txt")
new_mat2 = mat2.flatMap(process_mat_row)
new_mat2.take(12)
[24]:
[[0, 0, 0.4257910374269508],
 [0, 1, 0.5082167960207694],
 [1, 0, 0.4449691717763494],
 [1, 1, 0.9261922479892456],
 [2, 0, 0.07812708982401129],
 [2, 1, 0.34956810336320765],
 [3, 0, 0.7078936713530861],
 [3, 1, 0.8450500475013194],
 [4, 0, 0.17936816503487407],
 [4, 1, 0.5554570108793752],
 [5, 0, 0.5719951163381093],
 [5, 1, 0.41975047374547725]]

Produit matriciel#

Il faut d’abord faire la jointure avec la méthode join. Il faut que la clé soit sur la première colonne.

[25]:
def key_ij(row):
    return row[0], (row[1], row[2])


def key_ji(row):
    return row[1], (row[0], row[2])


mat_join = new_mat1.map(key_ji).join(new_mat2.map(key_ij))
mat_join.take(12)
[25]:
[(0, ((0, 0.03925624285714491), (0, 0.4257910374269508))),
 (0, ((0, 0.03925624285714491), (1, 0.5082167960207694))),
 (0, ((1, 0.6661128314162409), (0, 0.4257910374269508))),
 (0, ((1, 0.6661128314162409), (1, 0.5082167960207694))),
 (0, ((2, 0.8809481084845031), (0, 0.4257910374269508))),
 (0, ((2, 0.8809481084845031), (1, 0.5082167960207694))),
 (0, ((3, 0.09489201762862454), (0, 0.4257910374269508))),
 (0, ((3, 0.09489201762862454), (1, 0.5082167960207694))),
 (0, ((4, 0.9071785348453754), (0, 0.4257910374269508))),
 (0, ((4, 0.9071785348453754), (1, 0.5082167960207694))),
 (0, ((5, 0.8535870359856828), (0, 0.4257910374269508))),
 (0, ((5, 0.8535870359856828), (1, 0.5082167960207694)))]

On effectue le produit matriciel.

[26]:
def produit_matriciel(row):
    index, ((i, v1), (j, v2)) = row
    return i, j, v1 * v2


produit = mat_join.map(produit_matriciel)
produit.take(12)
[26]:
[(0, 0, 0.016714956371628058),
 (0, 1, 0.019950681968671398),
 (1, 0, 0.2836248735321248),
 (1, 1, 0.33852972897068484),
 (2, 0, 0.37509980903092655),
 (2, 1, 0.44771262515455135),
 (3, 0, 0.04040417062962855),
 (3, 1, 0.04822571716716593),
 (4, 0, 0.3862684894832736),
 (4, 1, 0.4610433683979326),
 (5, 0, 0.3634497095865398),
 (5, 1, 0.4338072685535089)]

Il ne reste plus qu’à agréger reduceByKey. La documentation fournit un exemple facilement transposable. Elle indique aussi : Merge the values for each key using an associative and commutative reduce function. Pourquoi précise-t-elle associative et commutative ? Cela signifie que le résultat ne dépend pas de l’ordre dans lequel l’agrégation est réalisée et qu’on peut commencer à agréger sans attendre d’avoir regroupé toutes les valeurs associées à une clé.

  • Cas 1 : groupBy + agrégation qui commence une fois les valeurs regroupées

  • Cas 2 : reduceByKey + agrégation qui commence dès les premières valeurs regroupées

Le cas 2 est moins consommateur en terme de données. Le cas 1 n’est possible que si les valeurs agrégées ne sont pas trop nombreuses. Ca tombe bien, dans notre cas, le cas 2 convient.

[27]:
from operator import add

final = produit.map(lambda row: ((row[0], row[1]), row[2])).reduceByKey(add)
aslist = final.collect()
aslist.sort()
aslist
[27]:
[((0, 0), 1.2962658962226397),
 ((0, 1), 2.7523653340144056),
 ((1, 0), 2.267358716094168),
 ((1, 1), 2.899614641397404),
 ((2, 0), 1.2902591694424805),
 ((2, 1), 2.3405609608679425),
 ((3, 0), 1.8287644824176785),
 ((3, 1), 3.420989188235977),
 ((4, 0), 1.9144898451263708),
 ((4, 1), 3.372983349186469),
 ((5, 0), 1.842690333964681),
 ((5, 1), 1.9882120730740667),
 ((6, 0), 2.282125435665258),
 ((6, 1), 3.053163988860857),
 ((7, 0), 1.8863193688285897),
 ((7, 1), 3.061867764510199),
 ((8, 0), 2.6797625884756293),
 ((8, 1), 3.6182318180423017),
 ((9, 0), 1.7044647282526524),
 ((9, 1), 2.710789957326838)]

Résultat initial :

[28]:
rnd1 @ rnd2
[28]:
array([[1.2962659 , 2.75236533],
       [2.26735872, 2.89961464],
       [1.29025917, 2.34056096],
       [1.82876448, 3.42098919],
       [1.91448985, 3.37298335],
       [1.84269033, 1.98821207],
       [2.28212544, 3.05316399],
       [1.88631937, 3.06186776],
       [2.67976259, 3.61823182],
       [1.70446473, 2.71078996]])

Même algorithme avec les Spark DataFrame#

On a besoin de réaliser un flatMap. Une façon de faire est de créer des colonnes qui sont de type composé : un tableau, une structure. La multiplication des lignes est obtenue avec la fonction explode.

[29]:
schema = ["index"] + ["c%d" % i for i in range(1, 11)]
mat1 = spark.createDataFrame(
    pandas.read_csv("rnd1.txt", header=None, sep="\t"), schema=schema
)
/home/xadupre/.local/lib/python3.10/site-packages/pyspark/sql/pandas/conversion.py:485: FutureWarning: is_datetime64tz_dtype is deprecated and will be removed in a future version. Check `isinstance(dtype, pd.DatetimeTZDtype)` instead.
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
[30]:
mat1.printSchema()
root
 |-- index: long (nullable = true)
 |-- c1: double (nullable = true)
 |-- c2: double (nullable = true)
 |-- c3: double (nullable = true)
 |-- c4: double (nullable = true)
 |-- c5: double (nullable = true)
 |-- c6: double (nullable = true)
 |-- c7: double (nullable = true)
 |-- c8: double (nullable = true)
 |-- c9: double (nullable = true)
 |-- c10: double (nullable = true)

[31]:
schema = ["index"] + ["c%d" % i for i in range(1, 3)]
mat2 = spark.createDataFrame(
    pandas.read_csv("rnd2.txt", header=None, sep="\t"), schema=schema
)
/home/xadupre/.local/lib/python3.10/site-packages/pyspark/sql/pandas/conversion.py:485: FutureWarning: is_datetime64tz_dtype is deprecated and will be removed in a future version. Check `isinstance(dtype, pd.DatetimeTZDtype)` instead.
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
[32]:
mat2.printSchema()
root
 |-- index: long (nullable = true)
 |-- c1: double (nullable = true)
 |-- c2: double (nullable = true)

Nous allons avoir besoin de quelques-uns des fonctions et types suivant :

Je recommande le type FloatType qui prend deux fois moins de place pour une précision moindre mais suffisante dans la plupart des cas.

[33]:
from pyspark.sql.types import (
    ArrayType,
    StructField,
    StructType,
    DoubleType,
    IntegerType,
)
from pyspark.sql.functions import explode, posexplode, array
from pyspark.sql import Row
[34]:
cols = ["c%d" % i for i in range(1, 11)]
mat1_array = mat1.select(mat1.index, array(*cols).alias("x"))
mat1_array.printSchema()
root
 |-- index: long (nullable = true)
 |-- x: array (nullable = false)
 |    |-- element: double (containsNull = true)

[35]:
mat1_exploded = mat1_array.select("index", posexplode("x"))
mat1_exploded.printSchema()
root
 |-- index: long (nullable = true)
 |-- pos: integer (nullable = false)
 |-- col: double (nullable = true)

[36]:
mat1.toPandas().shape, mat1_exploded.toPandas().shape
[36]:
((10, 11), (100, 3))

On recommence le même procédé pour l’autre matrice.

[37]:
cols = ["c%d" % i for i in range(1, 3)]
mat2_array = mat2.select(mat2.index, array(*cols).alias("x"))
mat2_exploded = mat2_array.select("index", posexplode("x"))

Il ne reste plus qu’à faire le produit avec la méthode join après avoir renommé les colonnes avant la jointure pour éviter les ambiguïtés.

[38]:
mat2_exp2 = (
    mat2_exploded.withColumnRenamed("index", "index2")
    .withColumnRenamed("pos", "pos2")
    .withColumnRenamed("col", "col2")
)
produit = mat1_exploded.join(mat2_exp2, mat1_exploded.pos == mat2_exp2.index2)
[39]:
produit.printSchema()
root
 |-- index: long (nullable = true)
 |-- pos: integer (nullable = false)
 |-- col: double (nullable = true)
 |-- index2: long (nullable = true)
 |-- pos2: integer (nullable = false)
 |-- col2: double (nullable = true)

[40]:
produit.toPandas().head()

[40]:
index pos col index2 pos2 col2
0 0 0 0.039256 0 0 0.425791
1 0 0 0.039256 0 1 0.508217
2 1 0 0.666113 0 0 0.425791
3 1 0 0.666113 0 1 0.508217
4 2 0 0.880948 0 0 0.425791
[41]:
prod = produit.select(
    produit.index.alias("i"),
    produit.pos2.alias("j"),
    (produit.col * produit.col2).alias("val"),
)
final = prod.groupby("i", "j").sum("val")
[42]:
final.printSchema()
root
 |-- i: long (nullable = true)
 |-- j: integer (nullable = false)
 |-- sum(val): double (nullable = true)

[43]:
df = final.toPandas()

[44]:
df.sort_values(["i", "j"]).head()
[44]:
i j sum(val)
7 0 0 1.296266
10 0 1 2.752365
18 1 0 2.267359
3 1 1 2.899615
6 2 0 1.290259
[45]:
df.shape
[45]:
(20, 3)
[ ]:


Notebook on github