Matrices en 3 colonnes

Représentation d’une matrice avec Spark / Map / Reduce.

Ce notebook propose d’implémenter un produit matriciel sous Spark. Spark comme SQL n’aime pas trop avoir un nombre de colonnes variables. La première étape consiste à transformer les matrices I\times J en tableau de trois colonnes (i,j,coefficient).

Session spark with no cluster

Spark est censé tourner sur un cluster. Mais ce n’est pas essentielle pour comprendre la logique. Le notebook tourne donc en local.

[16]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("MySparkApp").master("local[*]").getOrCreate()
sc = spark.sparkContext

Création d’une matrice aléatoire

[17]:
from numpy.random import rand

rnd1 = rand(10, 10)
rnd2 = rand(10, 2)
rnd1 @ rnd2
[17]:
array([[1.2962659 , 2.75236533],
       [2.26735872, 2.89961464],
       [1.29025917, 2.34056096],
       [1.82876448, 3.42098919],
       [1.91448985, 3.37298335],
       [1.84269033, 1.98821207],
       [2.28212544, 3.05316399],
       [1.88631937, 3.06186776],
       [2.67976259, 3.61823182],
       [1.70446473, 2.71078996]])
[18]:
import pandas

df1 = pandas.DataFrame(rnd1)
df2 = pandas.DataFrame(rnd2)
df2
[18]:
0 1
0 0.425791 0.508217
1 0.444969 0.926192
2 0.078127 0.349568
3 0.707894 0.845050
4 0.179368 0.555457
5 0.571995 0.419750
6 0.782654 0.712389
7 0.165768 0.830360
8 0.043705 0.759277
9 0.502934 0.110957
[19]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=False)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=False)
[20]:
mat1 = spark.sparkContext.textFile("rnd1.txt")

Conversion d’une matrice au format Spark

Lorsqu’un traitement est distribué en Map/Reduce, il n’est pas possible de s’appuyer sur l’ordre dans lequel sont traitées les lignes. Le plus est d’ajouter cette information sur chaque ligne plutôt que de chercher à la récupérer.

[21]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=True)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=True)
[22]:
def process_mat_row(row):
    values = row.split("\t")
    index = int(values[0])
    values = [float(_) for _ in values[1:]]
    return [[index, j, v] for j, v in enumerate(values)]
[23]:
mat1 = sc.textFile("rnd1.txt")
new_mat1 = mat1.flatMap(process_mat_row)
new_mat1.take(12)
[23]:
[[0, 0, 0.03925624285714491],
 [0, 1, 0.17938143471086276],
 [0, 2, 0.9271561615741587],
 [0, 3, 0.13497740334270003],
 [0, 4, 0.20256379268518632],
 [0, 5, 0.15623553510014287],
 [0, 6, 0.7926007655892027],
 [0, 7, 0.9953375114509172],
 [0, 8, 0.7100110433596362],
 [0, 9, 0.1783301416123766],
 [1, 0, 0.6661128314162409],
 [1, 1, 0.16617620104743758]]
[24]:
mat2 = sc.textFile("rnd2.txt")
new_mat2 = mat2.flatMap(process_mat_row)
new_mat2.take(12)
[24]:
[[0, 0, 0.4257910374269508],
 [0, 1, 0.5082167960207694],
 [1, 0, 0.4449691717763494],
 [1, 1, 0.9261922479892456],
 [2, 0, 0.07812708982401129],
 [2, 1, 0.34956810336320765],
 [3, 0, 0.7078936713530861],
 [3, 1, 0.8450500475013194],
 [4, 0, 0.17936816503487407],
 [4, 1, 0.5554570108793752],
 [5, 0, 0.5719951163381093],
 [5, 1, 0.41975047374547725]]

Produit matriciel

Il faut d’abord faire la jointure avec la méthode join. Il faut que la clé soit sur la première colonne.

[25]:
def key_ij(row):
    return row[0], (row[1], row[2])


def key_ji(row):
    return row[1], (row[0], row[2])


mat_join = new_mat1.map(key_ji).join(new_mat2.map(key_ij))
mat_join.take(12)
[25]:
[(0, ((0, 0.03925624285714491), (0, 0.4257910374269508))),
 (0, ((0, 0.03925624285714491), (1, 0.5082167960207694))),
 (0, ((1, 0.6661128314162409), (0, 0.4257910374269508))),
 (0, ((1, 0.6661128314162409), (1, 0.5082167960207694))),
 (0, ((2, 0.8809481084845031), (0, 0.4257910374269508))),
 (0, ((2, 0.8809481084845031), (1, 0.5082167960207694))),
 (0, ((3, 0.09489201762862454), (0, 0.4257910374269508))),
 (0, ((3, 0.09489201762862454), (1, 0.5082167960207694))),
 (0, ((4, 0.9071785348453754), (0, 0.4257910374269508))),
 (0, ((4, 0.9071785348453754), (1, 0.5082167960207694))),
 (0, ((5, 0.8535870359856828), (0, 0.4257910374269508))),
 (0, ((5, 0.8535870359856828), (1, 0.5082167960207694)))]

On effectue le produit matriciel.

[26]:
def produit_matriciel(row):
    index, ((i, v1), (j, v2)) = row
    return i, j, v1 * v2


produit = mat_join.map(produit_matriciel)
produit.take(12)
[26]:
[(0, 0, 0.016714956371628058),
 (0, 1, 0.019950681968671398),
 (1, 0, 0.2836248735321248),
 (1, 1, 0.33852972897068484),
 (2, 0, 0.37509980903092655),
 (2, 1, 0.44771262515455135),
 (3, 0, 0.04040417062962855),
 (3, 1, 0.04822571716716593),
 (4, 0, 0.3862684894832736),
 (4, 1, 0.4610433683979326),
 (5, 0, 0.3634497095865398),
 (5, 1, 0.4338072685535089)]

Il ne reste plus qu’à agréger reduceByKey. La documentation fournit un exemple facilement transposable. Elle indique aussi : Merge the values for each key using an associative and commutative reduce function. Pourquoi précise-t-elle associative et commutative ? Cela signifie que le résultat ne dépend pas de l’ordre dans lequel l’agrégation est réalisée et qu’on peut commencer à agréger sans attendre d’avoir regroupé toutes les valeurs associées à une clé.

  • Cas 1 : groupBy + agrégation qui commence une fois les valeurs regroupées

  • Cas 2 : reduceByKey + agrégation qui commence dès les premières valeurs regroupées

Le cas 2 est moins consommateur en terme de données. Le cas 1 n’est possible que si les valeurs agrégées ne sont pas trop nombreuses. Ca tombe bien, dans notre cas, le cas 2 convient.

[27]:
from operator import add

final = produit.map(lambda row: ((row[0], row[1]), row[2])).reduceByKey(add)
aslist = final.collect()
aslist.sort()
aslist
[27]:
[((0, 0), 1.2962658962226397),
 ((0, 1), 2.7523653340144056),
 ((1, 0), 2.267358716094168),
 ((1, 1), 2.899614641397404),
 ((2, 0), 1.2902591694424805),
 ((2, 1), 2.3405609608679425),
 ((3, 0), 1.8287644824176785),
 ((3, 1), 3.420989188235977),
 ((4, 0), 1.9144898451263708),
 ((4, 1), 3.372983349186469),
 ((5, 0), 1.842690333964681),
 ((5, 1), 1.9882120730740667),
 ((6, 0), 2.282125435665258),
 ((6, 1), 3.053163988860857),
 ((7, 0), 1.8863193688285897),
 ((7, 1), 3.061867764510199),
 ((8, 0), 2.6797625884756293),
 ((8, 1), 3.6182318180423017),
 ((9, 0), 1.7044647282526524),
 ((9, 1), 2.710789957326838)]

Résultat initial :

[28]:
rnd1 @ rnd2
[28]:
array([[1.2962659 , 2.75236533],
       [2.26735872, 2.89961464],
       [1.29025917, 2.34056096],
       [1.82876448, 3.42098919],
       [1.91448985, 3.37298335],
       [1.84269033, 1.98821207],
       [2.28212544, 3.05316399],
       [1.88631937, 3.06186776],
       [2.67976259, 3.61823182],
       [1.70446473, 2.71078996]])

Même algorithme avec les Spark DataFrame

On a besoin de réaliser un flatMap. Une façon de faire est de créer des colonnes qui sont de type composé : un tableau, une structure. La multiplication des lignes est obtenue avec la fonction explode.

[29]:
schema = ["index"] + ["c%d" % i for i in range(1, 11)]
mat1 = spark.createDataFrame(
    pandas.read_csv("rnd1.txt", header=None, sep="\t"), schema=schema
)
/home/xadupre/.local/lib/python3.10/site-packages/pyspark/sql/pandas/conversion.py:485: FutureWarning: is_datetime64tz_dtype is deprecated and will be removed in a future version. Check `isinstance(dtype, pd.DatetimeTZDtype)` instead.
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
[30]:
mat1.printSchema()
root
 |-- index: long (nullable = true)
 |-- c1: double (nullable = true)
 |-- c2: double (nullable = true)
 |-- c3: double (nullable = true)
 |-- c4: double (nullable = true)
 |-- c5: double (nullable = true)
 |-- c6: double (nullable = true)
 |-- c7: double (nullable = true)
 |-- c8: double (nullable = true)
 |-- c9: double (nullable = true)
 |-- c10: double (nullable = true)

[31]:
schema = ["index"] + ["c%d" % i for i in range(1, 3)]
mat2 = spark.createDataFrame(
    pandas.read_csv("rnd2.txt", header=None, sep="\t"), schema=schema
)
/home/xadupre/.local/lib/python3.10/site-packages/pyspark/sql/pandas/conversion.py:485: FutureWarning: is_datetime64tz_dtype is deprecated and will be removed in a future version. Check `isinstance(dtype, pd.DatetimeTZDtype)` instead.
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
[32]:
mat2.printSchema()
root
 |-- index: long (nullable = true)
 |-- c1: double (nullable = true)
 |-- c2: double (nullable = true)

Nous allons avoir besoin de quelques-uns des fonctions et types suivant :

Je recommande le type FloatType qui prend deux fois moins de place pour une précision moindre mais suffisante dans la plupart des cas.

[33]:
from pyspark.sql.types import (
    ArrayType,
    StructField,
    StructType,
    DoubleType,
    IntegerType,
)
from pyspark.sql.functions import explode, posexplode, array
from pyspark.sql import Row
[34]:
cols = ["c%d" % i for i in range(1, 11)]
mat1_array = mat1.select(mat1.index, array(*cols).alias("x"))
mat1_array.printSchema()
root
 |-- index: long (nullable = true)
 |-- x: array (nullable = false)
 |    |-- element: double (containsNull = true)

[35]:
mat1_exploded = mat1_array.select("index", posexplode("x"))
mat1_exploded.printSchema()
root
 |-- index: long (nullable = true)
 |-- pos: integer (nullable = false)
 |-- col: double (nullable = true)

[36]:
mat1.toPandas().shape, mat1_exploded.toPandas().shape
[36]:
((10, 11), (100, 3))

On recommence le même procédé pour l’autre matrice.

[37]:
cols = ["c%d" % i for i in range(1, 3)]
mat2_array = mat2.select(mat2.index, array(*cols).alias("x"))
mat2_exploded = mat2_array.select("index", posexplode("x"))

Il ne reste plus qu’à faire le produit avec la méthode join après avoir renommé les colonnes avant la jointure pour éviter les ambiguïtés.

[38]:
mat2_exp2 = (
    mat2_exploded.withColumnRenamed("index", "index2")
    .withColumnRenamed("pos", "pos2")
    .withColumnRenamed("col", "col2")
)
produit = mat1_exploded.join(mat2_exp2, mat1_exploded.pos == mat2_exp2.index2)
[39]:
produit.printSchema()
root
 |-- index: long (nullable = true)
 |-- pos: integer (nullable = false)
 |-- col: double (nullable = true)
 |-- index2: long (nullable = true)
 |-- pos2: integer (nullable = false)
 |-- col2: double (nullable = true)

[40]:
produit.toPandas().head()

[40]:
index pos col index2 pos2 col2
0 0 0 0.039256 0 0 0.425791
1 0 0 0.039256 0 1 0.508217
2 1 0 0.666113 0 0 0.425791
3 1 0 0.666113 0 1 0.508217
4 2 0 0.880948 0 0 0.425791
[41]:
prod = produit.select(
    produit.index.alias("i"),
    produit.pos2.alias("j"),
    (produit.col * produit.col2).alias("val"),
)
final = prod.groupby("i", "j").sum("val")
[42]:
final.printSchema()
root
 |-- i: long (nullable = true)
 |-- j: integer (nullable = false)
 |-- sum(val): double (nullable = true)

[43]:
df = final.toPandas()

[44]:
df.sort_values(["i", "j"]).head()
[44]:
i j sum(val)
7 0 0 1.296266
10 0 1 2.752365
18 1 0 2.267359
3 1 1 2.899615
6 2 0 1.290259
[45]:
df.shape
[45]:
(20, 3)

Fin

[ ]:
spark.stop()

Notebook on github