Matrices en 3 colonnes¶
Représentation d’une matrice avec Spark / Map / Reduce.
Ce notebook propose d’implémenter un produit matriciel sous Spark. Spark comme SQL n’aime pas trop avoir un nombre de colonnes variables. La première étape consiste à transformer les matrices en tableau de trois colonnes
.
Session spark with no cluster¶
Spark est censé tourner sur un cluster. Mais ce n’est pas essentielle pour comprendre la logique. Le notebook tourne donc en local.
[1]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("MySparkApp").master("local[*]").getOrCreate()
sc = spark.sparkContext
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/04 22:03:48 WARN Utils: Your hostname, xadupre2025, resolves to a loopback address: 127.0.1.1; using 172.17.197.78 instead (on interface eth0)
25/11/04 22:03:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/04 22:03:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Création d’une matrice aléatoire¶
[2]:
from numpy.random import rand
rnd1 = rand(10, 10)
rnd2 = rand(10, 2)
rnd1 @ rnd2
[2]:
array([[2.82769564, 4.3446237 ],
[2.70903159, 4.28609675],
[2.3197026 , 4.01486866],
[2.61458882, 4.66117542],
[1.99321382, 3.49690316],
[3.08580871, 3.67840109],
[2.99800173, 4.73972669],
[3.55996446, 4.85961952],
[2.51579451, 3.26530604],
[2.21882233, 3.76041136]])
[3]:
import pandas
df1 = pandas.DataFrame(rnd1)
df2 = pandas.DataFrame(rnd2)
df2
[3]:
| 0 | 1 | |
|---|---|---|
| 0 | 0.452410 | 0.818571 |
| 1 | 0.384679 | 0.965623 |
| 2 | 0.646845 | 0.539604 |
| 3 | 0.138227 | 0.917017 |
| 4 | 0.422069 | 0.801644 |
| 5 | 0.238986 | 0.595039 |
| 6 | 0.895846 | 0.681612 |
| 7 | 0.410572 | 0.862415 |
| 8 | 0.869270 | 0.509816 |
| 9 | 0.588944 | 0.955183 |
[4]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=False)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=False)
[5]:
mat1 = spark.sparkContext.textFile("rnd1.txt")
Conversion d’une matrice au format Spark¶
Lorsqu’un traitement est distribué en Map/Reduce, il n’est pas possible de s’appuyer sur l’ordre dans lequel sont traitées les lignes. Le plus est d’ajouter cette information sur chaque ligne plutôt que de chercher à la récupérer.
[6]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=True)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=True)
[7]:
def process_mat_row(row):
values = row.split("\t")
index = int(values[0])
values = [float(_) for _ in values[1:]]
return [[index, j, v] for j, v in enumerate(values)]
[8]:
mat1 = sc.textFile("rnd1.txt")
new_mat1 = mat1.flatMap(process_mat_row)
new_mat1.take(12)
[8]:
[[0, 0, 0.03659454680020524],
[0, 1, 0.767607811315502],
[0, 2, 0.5733073849596189],
[0, 3, 0.8757599092593937],
[0, 4, 0.9367553854443735],
[0, 5, 0.41209851669976916],
[0, 6, 0.5540866293161243],
[0, 7, 0.4665984547370805],
[0, 8, 0.7567418978844924],
[0, 9, 0.3129986409413059],
[1, 0, 0.49628400193691236],
[1, 1, 0.4944250595188363]]
[9]:
mat2 = sc.textFile("rnd2.txt")
new_mat2 = mat2.flatMap(process_mat_row)
new_mat2.take(12)
[9]:
[[0, 0, 0.452409743518958],
[0, 1, 0.818571020089081],
[1, 0, 0.3846793223675351],
[1, 1, 0.9656232217356503],
[2, 0, 0.6468448619803056],
[2, 1, 0.5396040456507285],
[3, 0, 0.13822741516437753],
[3, 1, 0.9170173659401171],
[4, 0, 0.4220691346862584],
[4, 1, 0.8016438317882735],
[5, 0, 0.23898597387372977],
[5, 1, 0.5950391411819209]]
Produit matriciel¶
Il faut d’abord faire la jointure avec la méthode join. Il faut que la clé soit sur la première colonne.
[10]:
def key_ij(row):
return row[0], (row[1], row[2])
def key_ji(row):
return row[1], (row[0], row[2])
mat_join = new_mat1.map(key_ji).join(new_mat2.map(key_ij))
mat_join.take(12)
[10]:
[(0, ((0, 0.03659454680020524), (0, 0.452409743518958))),
(0, ((0, 0.03659454680020524), (1, 0.818571020089081))),
(0, ((1, 0.49628400193691236), (0, 0.452409743518958))),
(0, ((1, 0.49628400193691236), (1, 0.818571020089081))),
(0, ((2, 0.26823832256619184), (0, 0.452409743518958))),
(0, ((2, 0.26823832256619184), (1, 0.818571020089081))),
(0, ((3, 0.8516746510081805), (0, 0.452409743518958))),
(0, ((3, 0.8516746510081805), (1, 0.818571020089081))),
(0, ((4, 0.210077377768215), (0, 0.452409743518958))),
(0, ((4, 0.210077377768215), (1, 0.818571020089081))),
(0, ((5, 0.0024981117896399896), (0, 0.452409743518958))),
(0, ((5, 0.0024981117896399896), (1, 0.818571020089081)))]
On effectue le produit matriciel.
[11]:
def produit_matriciel(row):
index, ((i, v1), (j, v2)) = row
return i, j, v1 * v2
produit = mat_join.map(produit_matriciel)
produit.take(12)
[11]:
[(0, 0, 0.01655572953207336),
(0, 1, 0.029955235503941618),
(1, 0, 0.22452371802884058),
(1, 1, 0.4062437017193898),
(2, 0, 0.12135363071412639),
(2, 1, 0.21957211732999163),
(3, 0, 0.38530591042420903),
(3, 1, 0.6971561878597784),
(4, 0, 0.0950410525952534),
(4, 1, 0.17196325341736698),
(5, 0, 0.001130170114032713),
(5, 1, 0.002044881915942166)]
Il ne reste plus qu’à agréger reduceByKey. La documentation fournit un exemple facilement transposable. Elle indique aussi : Merge the values for each key using an associative and commutative reduce function. Pourquoi précise-t-elle associative et commutative ? Cela signifie que le résultat ne dépend pas de l’ordre dans lequel l’agrégation est réalisée et qu’on peut commencer à agréger sans attendre d’avoir regroupé toutes les valeurs associées à une clé.
Cas 1 : groupBy + agrégation qui commence une fois les valeurs regroupées
Cas 2 : reduceByKey + agrégation qui commence dès les premières valeurs regroupées
Le cas 2 est moins consommateur en terme de données. Le cas 1 n’est possible que si les valeurs agrégées ne sont pas trop nombreuses. Ca tombe bien, dans notre cas, le cas 2 convient.
[12]:
from operator import add
final = produit.map(lambda row: ((row[0], row[1]), row[2])).reduceByKey(add)
aslist = final.collect()
aslist.sort()
aslist
[12]:
[((0, 0), 2.8276956385710266),
((0, 1), 4.344623698461026),
((1, 0), 2.709031589561176),
((1, 1), 4.286096753861166),
((2, 0), 2.319702595813269),
((2, 1), 4.014868661312692),
((3, 0), 2.614588815326311),
((3, 1), 4.66117541697328),
((4, 0), 1.9932138244874387),
((4, 1), 3.4969031578212193),
((5, 0), 3.0858087055402157),
((5, 1), 3.678401085869831),
((6, 0), 2.9980017336305025),
((6, 1), 4.739726686558273),
((7, 0), 3.5599644578749863),
((7, 1), 4.859619515198687),
((8, 0), 2.515794511441488),
((8, 1), 3.26530604000568),
((9, 0), 2.218822328336375),
((9, 1), 3.7604113556777747)]
Résultat initial :
[13]:
rnd1 @ rnd2
[13]:
array([[2.82769564, 4.3446237 ],
[2.70903159, 4.28609675],
[2.3197026 , 4.01486866],
[2.61458882, 4.66117542],
[1.99321382, 3.49690316],
[3.08580871, 3.67840109],
[2.99800173, 4.73972669],
[3.55996446, 4.85961952],
[2.51579451, 3.26530604],
[2.21882233, 3.76041136]])
Même algorithme avec les Spark DataFrame¶
On a besoin de réaliser un flatMap. Une façon de faire est de créer des colonnes qui sont de type composé : un tableau, une structure. La multiplication des lignes est obtenue avec la fonction explode.
[14]:
schema = ["index"] + ["c%d" % i for i in range(1, 11)]
mat1 = spark.createDataFrame(
pandas.read_csv("rnd1.txt", header=None, sep="\t"), schema=schema
)
[15]:
mat1.printSchema()
root
|-- index: long (nullable = true)
|-- c1: double (nullable = true)
|-- c2: double (nullable = true)
|-- c3: double (nullable = true)
|-- c4: double (nullable = true)
|-- c5: double (nullable = true)
|-- c6: double (nullable = true)
|-- c7: double (nullable = true)
|-- c8: double (nullable = true)
|-- c9: double (nullable = true)
|-- c10: double (nullable = true)
[16]:
schema = ["index"] + ["c%d" % i for i in range(1, 3)]
mat2 = spark.createDataFrame(
pandas.read_csv("rnd2.txt", header=None, sep="\t"), schema=schema
)
[17]:
mat2.printSchema()
root
|-- index: long (nullable = true)
|-- c1: double (nullable = true)
|-- c2: double (nullable = true)
Nous allons avoir besoin de quelques-uns des fonctions et types suivant :
Je recommande le type FloatType qui prend deux fois moins de place pour une précision moindre mais suffisante dans la plupart des cas.
[18]:
from pyspark.sql.types import (
ArrayType,
StructField,
StructType,
DoubleType,
IntegerType,
)
from pyspark.sql.functions import explode, posexplode, array
from pyspark.sql import Row
[19]:
cols = ["c%d" % i for i in range(1, 11)]
mat1_array = mat1.select(mat1.index, array(*cols).alias("x"))
mat1_array.printSchema()
root
|-- index: long (nullable = true)
|-- x: array (nullable = false)
| |-- element: double (containsNull = true)
[20]:
mat1_exploded = mat1_array.select("index", posexplode("x"))
mat1_exploded.printSchema()
root
|-- index: long (nullable = true)
|-- pos: integer (nullable = false)
|-- col: double (nullable = true)
[21]:
mat1.toPandas().shape, mat1_exploded.toPandas().shape
[21]:
((10, 11), (100, 3))
On recommence le même procédé pour l’autre matrice.
[22]:
cols = ["c%d" % i for i in range(1, 3)]
mat2_array = mat2.select(mat2.index, array(*cols).alias("x"))
mat2_exploded = mat2_array.select("index", posexplode("x"))
Il ne reste plus qu’à faire le produit avec la méthode join après avoir renommé les colonnes avant la jointure pour éviter les ambiguïtés.
[23]:
mat2_exp2 = (
mat2_exploded.withColumnRenamed("index", "index2")
.withColumnRenamed("pos", "pos2")
.withColumnRenamed("col", "col2")
)
produit = mat1_exploded.join(mat2_exp2, mat1_exploded.pos == mat2_exp2.index2)
[24]:
produit.printSchema()
root
|-- index: long (nullable = true)
|-- pos: integer (nullable = false)
|-- col: double (nullable = true)
|-- index2: long (nullable = true)
|-- pos2: integer (nullable = false)
|-- col2: double (nullable = true)
[25]:
produit.toPandas().head()
[25]:
| index | pos | col | index2 | pos2 | col2 | |
|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0.036595 | 0 | 0 | 0.452410 |
| 1 | 0 | 0 | 0.036595 | 0 | 1 | 0.818571 |
| 2 | 1 | 0 | 0.496284 | 0 | 0 | 0.452410 |
| 3 | 1 | 0 | 0.496284 | 0 | 1 | 0.818571 |
| 4 | 2 | 0 | 0.268238 | 0 | 0 | 0.452410 |
[26]:
prod = produit.select(
produit.index.alias("i"),
produit.pos2.alias("j"),
(produit.col * produit.col2).alias("val"),
)
final = prod.groupby("i", "j").sum("val")
[27]:
final.printSchema()
root
|-- i: long (nullable = true)
|-- j: integer (nullable = false)
|-- sum(val): double (nullable = true)
[28]:
df = final.toPandas()
[29]:
df.sort_values(["i", "j"]).head()
[29]:
| i | j | sum(val) | |
|---|---|---|---|
| 7 | 0 | 0 | 2.827696 |
| 10 | 0 | 1 | 4.344624 |
| 18 | 1 | 0 | 2.709032 |
| 3 | 1 | 1 | 4.286097 |
| 6 | 2 | 0 | 2.319703 |
[30]:
df.shape
[30]:
(20, 3)
Alternatives¶
Plutôt que d’avoir un table où chaque ligne représente trois coefficients, pourrions-nous considérer une matrice par bloc de 16x16 ? Que gagnerait-on ?
Fin¶
[31]:
spark.stop()