Matrices en 3 colonnes

Représentation d’une matrice avec Spark / Map / Reduce.

Ce notebook propose d’implémenter un produit matriciel sous Spark. Spark comme SQL n’aime pas trop avoir un nombre de colonnes variables. La première étape consiste à transformer les matrices I\times J en tableau de trois colonnes (i,j,coefficient).

Session spark with no cluster

Spark est censé tourner sur un cluster. Mais ce n’est pas essentielle pour comprendre la logique. Le notebook tourne donc en local.

[1]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("MySparkApp").master("local[*]").getOrCreate()
sc = spark.sparkContext
your 131072x1 screen size is bogus. expect trouble
24/11/11 15:02:46 WARN Utils: Your hostname, xavier2024 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/11 15:02:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 15:02:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Création d’une matrice aléatoire

[2]:
from numpy.random import rand

rnd1 = rand(10, 10)
rnd2 = rand(10, 2)
rnd1 @ rnd2
[2]:
array([[2.87499059, 2.8278766 ],
       [2.50319361, 2.79105007],
       [2.47242828, 2.96505912],
       [3.5667529 , 3.62677158],
       [2.03659963, 1.96720702],
       [2.45931552, 2.69251358],
       [2.3570895 , 2.54491955],
       [2.42398656, 3.33332776],
       [3.2048007 , 3.60479473],
       [2.35964323, 2.75336408]])
[3]:
import pandas

df1 = pandas.DataFrame(rnd1)
df2 = pandas.DataFrame(rnd2)
df2
[3]:
0 1
0 0.534643 0.792275
1 0.868828 0.369334
2 0.015294 0.297075
3 0.509108 0.720137
4 0.674179 0.612696
5 0.398553 0.822608
6 0.321764 0.881242
7 0.173263 0.332271
8 0.953250 0.617029
9 0.843226 0.675595
[4]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=False)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=False)
[5]:
mat1 = spark.sparkContext.textFile("rnd1.txt")

Conversion d’une matrice au format Spark

Lorsqu’un traitement est distribué en Map/Reduce, il n’est pas possible de s’appuyer sur l’ordre dans lequel sont traitées les lignes. Le plus est d’ajouter cette information sur chaque ligne plutôt que de chercher à la récupérer.

[6]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=True)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=True)
[7]:
def process_mat_row(row):
    values = row.split("\t")
    index = int(values[0])
    values = [float(_) for _ in values[1:]]
    return [[index, j, v] for j, v in enumerate(values)]
[8]:
mat1 = sc.textFile("rnd1.txt")
new_mat1 = mat1.flatMap(process_mat_row)
new_mat1.take(12)

[8]:
[[0, 0, 0.05338376562534908],
 [0, 1, 0.7179621058657293],
 [0, 2, 0.8416816618000933],
 [0, 3, 0.5866463066118723],
 [0, 4, 0.2985761548971829],
 [0, 5, 0.17094084464701753],
 [0, 6, 0.22587546398891023],
 [0, 7, 0.8938702752578108],
 [0, 8, 0.7138005257890329],
 [0, 9, 0.8701336430078879],
 [1, 0, 0.9550985992178377],
 [1, 1, 0.26327652337825447]]
[9]:
mat2 = sc.textFile("rnd2.txt")
new_mat2 = mat2.flatMap(process_mat_row)
new_mat2.take(12)
[9]:
[[0, 0, 0.5346425002633528],
 [0, 1, 0.7922752739533419],
 [1, 0, 0.8688279747342449],
 [1, 1, 0.3693339071055035],
 [2, 0, 0.01529354702055541],
 [2, 1, 0.2970747253149303],
 [3, 0, 0.5091076165917049],
 [3, 1, 0.7201371902668229],
 [4, 0, 0.674178721198942],
 [4, 1, 0.6126955081354053],
 [5, 0, 0.398552599571223],
 [5, 1, 0.8226084971982536]]

Produit matriciel

Il faut d’abord faire la jointure avec la méthode join. Il faut que la clé soit sur la première colonne.

[10]:
def key_ij(row):
    return row[0], (row[1], row[2])


def key_ji(row):
    return row[1], (row[0], row[2])


mat_join = new_mat1.map(key_ji).join(new_mat2.map(key_ij))
mat_join.take(12)

[10]:
[(0, ((0, 0.05338376562534908), (0, 0.5346425002633528))),
 (0, ((0, 0.05338376562534908), (1, 0.7922752739533419))),
 (0, ((1, 0.9550985992178377), (0, 0.5346425002633528))),
 (0, ((1, 0.9550985992178377), (1, 0.7922752739533419))),
 (0, ((2, 0.16781505492071924), (0, 0.5346425002633528))),
 (0, ((2, 0.16781505492071924), (1, 0.7922752739533419))),
 (0, ((3, 0.4988373531569529), (0, 0.5346425002633528))),
 (0, ((3, 0.4988373531569529), (1, 0.7922752739533419))),
 (0, ((4, 0.2663703869883476), (0, 0.5346425002633528))),
 (0, ((4, 0.2663703869883476), (1, 0.7922752739533419))),
 (0, ((5, 0.8312264238066461), (0, 0.5346425002633528))),
 (0, ((5, 0.8312264238066461), (1, 0.7922752739533419)))]

On effectue le produit matriciel.

[11]:
def produit_matriciel(row):
    index, ((i, v1), (j, v2)) = row
    return i, j, v1 * v2


produit = mat_join.map(produit_matriciel)
produit.take(12)
[11]:
[(0, 0, 0.02854122992740946),
 (0, 1, 0.04229463753548444),
 (1, 0, 0.5106363030838507),
 (1, 1, 0.7567010043477654),
 (2, 0, 0.08972106054464521),
 (2, 1, 0.13295571861080793),
 (3, 0, 0.2666996497165864),
 (3, 1, 0.3952165006305848),
 (4, 0, 0.14241292969556701),
 (4, 1, 0.21103867132425078),
 (5, 0, 0.4444089735089506),
 (5, 1, 0.6585601426386672)]

Il ne reste plus qu’à agréger reduceByKey. La documentation fournit un exemple facilement transposable. Elle indique aussi : Merge the values for each key using an associative and commutative reduce function. Pourquoi précise-t-elle associative et commutative ? Cela signifie que le résultat ne dépend pas de l’ordre dans lequel l’agrégation est réalisée et qu’on peut commencer à agréger sans attendre d’avoir regroupé toutes les valeurs associées à une clé.

  • Cas 1 : groupBy + agrégation qui commence une fois les valeurs regroupées

  • Cas 2 : reduceByKey + agrégation qui commence dès les premières valeurs regroupées

Le cas 2 est moins consommateur en terme de données. Le cas 1 n’est possible que si les valeurs agrégées ne sont pas trop nombreuses. Ca tombe bien, dans notre cas, le cas 2 convient.

[12]:
from operator import add

final = produit.map(lambda row: ((row[0], row[1]), row[2])).reduceByKey(add)
aslist = final.collect()
aslist.sort()
aslist
[12]:
[((0, 0), 2.874990589654995),
 ((0, 1), 2.827876596857878),
 ((1, 0), 2.5031936080071473),
 ((1, 1), 2.7910500661755484),
 ((2, 0), 2.4724282791474432),
 ((2, 1), 2.9650591244671727),
 ((3, 0), 3.566752899363589),
 ((3, 1), 3.626771576425741),
 ((4, 0), 2.036599633115649),
 ((4, 1), 1.9672070245510558),
 ((5, 0), 2.459315517874097),
 ((5, 1), 2.692513578277047),
 ((6, 0), 2.3570894967638365),
 ((6, 1), 2.5449195463399703),
 ((7, 0), 2.423986556348134),
 ((7, 1), 3.333327755348771),
 ((8, 0), 3.204800695141252),
 ((8, 1), 3.6047947320158165),
 ((9, 0), 2.3596432321882945),
 ((9, 1), 2.753364084839743)]

Résultat initial :

[13]:
rnd1 @ rnd2
[13]:
array([[2.87499059, 2.8278766 ],
       [2.50319361, 2.79105007],
       [2.47242828, 2.96505912],
       [3.5667529 , 3.62677158],
       [2.03659963, 1.96720702],
       [2.45931552, 2.69251358],
       [2.3570895 , 2.54491955],
       [2.42398656, 3.33332776],
       [3.2048007 , 3.60479473],
       [2.35964323, 2.75336408]])

Même algorithme avec les Spark DataFrame

On a besoin de réaliser un flatMap. Une façon de faire est de créer des colonnes qui sont de type composé : un tableau, une structure. La multiplication des lignes est obtenue avec la fonction explode.

[14]:
schema = ["index"] + ["c%d" % i for i in range(1, 11)]
mat1 = spark.createDataFrame(
    pandas.read_csv("rnd1.txt", header=None, sep="\t"), schema=schema
)
[15]:
mat1.printSchema()
root
 |-- index: long (nullable = true)
 |-- c1: double (nullable = true)
 |-- c2: double (nullable = true)
 |-- c3: double (nullable = true)
 |-- c4: double (nullable = true)
 |-- c5: double (nullable = true)
 |-- c6: double (nullable = true)
 |-- c7: double (nullable = true)
 |-- c8: double (nullable = true)
 |-- c9: double (nullable = true)
 |-- c10: double (nullable = true)

[16]:
schema = ["index"] + ["c%d" % i for i in range(1, 3)]
mat2 = spark.createDataFrame(
    pandas.read_csv("rnd2.txt", header=None, sep="\t"), schema=schema
)
[17]:
mat2.printSchema()
root
 |-- index: long (nullable = true)
 |-- c1: double (nullable = true)
 |-- c2: double (nullable = true)

Nous allons avoir besoin de quelques-uns des fonctions et types suivant :

Je recommande le type FloatType qui prend deux fois moins de place pour une précision moindre mais suffisante dans la plupart des cas.

[18]:
from pyspark.sql.types import (
    ArrayType,
    StructField,
    StructType,
    DoubleType,
    IntegerType,
)
from pyspark.sql.functions import explode, posexplode, array
from pyspark.sql import Row
[19]:
cols = ["c%d" % i for i in range(1, 11)]
mat1_array = mat1.select(mat1.index, array(*cols).alias("x"))
mat1_array.printSchema()
root
 |-- index: long (nullable = true)
 |-- x: array (nullable = false)
 |    |-- element: double (containsNull = true)

[20]:
mat1_exploded = mat1_array.select("index", posexplode("x"))
mat1_exploded.printSchema()
root
 |-- index: long (nullable = true)
 |-- pos: integer (nullable = false)
 |-- col: double (nullable = true)

[21]:
mat1.toPandas().shape, mat1_exploded.toPandas().shape

[21]:
((10, 11), (100, 3))

On recommence le même procédé pour l’autre matrice.

[22]:
cols = ["c%d" % i for i in range(1, 3)]
mat2_array = mat2.select(mat2.index, array(*cols).alias("x"))
mat2_exploded = mat2_array.select("index", posexplode("x"))

Il ne reste plus qu’à faire le produit avec la méthode join après avoir renommé les colonnes avant la jointure pour éviter les ambiguïtés.

[23]:
mat2_exp2 = (
    mat2_exploded.withColumnRenamed("index", "index2")
    .withColumnRenamed("pos", "pos2")
    .withColumnRenamed("col", "col2")
)
produit = mat1_exploded.join(mat2_exp2, mat1_exploded.pos == mat2_exp2.index2)
[24]:
produit.printSchema()
root
 |-- index: long (nullable = true)
 |-- pos: integer (nullable = false)
 |-- col: double (nullable = true)
 |-- index2: long (nullable = true)
 |-- pos2: integer (nullable = false)
 |-- col2: double (nullable = true)

[25]:
produit.toPandas().head()

[25]:
index pos col index2 pos2 col2
0 0 0 0.053384 0 0 0.534643
1 0 0 0.053384 0 1 0.792275
2 1 0 0.955099 0 0 0.534643
3 1 0 0.955099 0 1 0.792275
4 2 0 0.167815 0 0 0.534643
[26]:
prod = produit.select(
    produit.index.alias("i"),
    produit.pos2.alias("j"),
    (produit.col * produit.col2).alias("val"),
)
final = prod.groupby("i", "j").sum("val")
[27]:
final.printSchema()
root
 |-- i: long (nullable = true)
 |-- j: integer (nullable = false)
 |-- sum(val): double (nullable = true)

[28]:
df = final.toPandas()
[29]:
df.sort_values(["i", "j"]).head()
[29]:
i j sum(val)
7 0 0 2.874991
10 0 1 2.827877
18 1 0 2.503194
3 1 1 2.791050
6 2 0 2.472428
[30]:
df.shape
[30]:
(20, 3)

Alternatives

Plutôt que d’avoir un table où chaque ligne représente trois coefficients, pourrions-nous considérer une matrice par bloc de 16x16 ? Que gagnerait-on ?

Fin

[31]:
spark.stop()

Notebook on github