Matrices en 3 colonnes¶
Représentation d’une matrice avec Spark / Map / Reduce.
Ce notebook propose d’implémenter un produit matriciel sous Spark. Spark comme SQL n’aime pas trop avoir un nombre de colonnes variables. La première étape consiste à transformer les matrices en tableau de trois colonnes .
Session spark with no cluster¶
Spark est censé tourner sur un cluster. Mais ce n’est pas essentielle pour comprendre la logique. Le notebook tourne donc en local.
[1]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("MySparkApp").master("local[*]").getOrCreate()
sc = spark.sparkContext
your 131072x1 screen size is bogus. expect trouble
24/11/11 15:02:46 WARN Utils: Your hostname, xavier2024 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/11 15:02:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 15:02:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Création d’une matrice aléatoire¶
[2]:
from numpy.random import rand
rnd1 = rand(10, 10)
rnd2 = rand(10, 2)
rnd1 @ rnd2
[2]:
array([[2.87499059, 2.8278766 ],
[2.50319361, 2.79105007],
[2.47242828, 2.96505912],
[3.5667529 , 3.62677158],
[2.03659963, 1.96720702],
[2.45931552, 2.69251358],
[2.3570895 , 2.54491955],
[2.42398656, 3.33332776],
[3.2048007 , 3.60479473],
[2.35964323, 2.75336408]])
[3]:
import pandas
df1 = pandas.DataFrame(rnd1)
df2 = pandas.DataFrame(rnd2)
df2
[3]:
0 | 1 | |
---|---|---|
0 | 0.534643 | 0.792275 |
1 | 0.868828 | 0.369334 |
2 | 0.015294 | 0.297075 |
3 | 0.509108 | 0.720137 |
4 | 0.674179 | 0.612696 |
5 | 0.398553 | 0.822608 |
6 | 0.321764 | 0.881242 |
7 | 0.173263 | 0.332271 |
8 | 0.953250 | 0.617029 |
9 | 0.843226 | 0.675595 |
[4]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=False)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=False)
[5]:
mat1 = spark.sparkContext.textFile("rnd1.txt")
Conversion d’une matrice au format Spark¶
Lorsqu’un traitement est distribué en Map/Reduce, il n’est pas possible de s’appuyer sur l’ordre dans lequel sont traitées les lignes. Le plus est d’ajouter cette information sur chaque ligne plutôt que de chercher à la récupérer.
[6]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=True)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=True)
[7]:
def process_mat_row(row):
values = row.split("\t")
index = int(values[0])
values = [float(_) for _ in values[1:]]
return [[index, j, v] for j, v in enumerate(values)]
[8]:
mat1 = sc.textFile("rnd1.txt")
new_mat1 = mat1.flatMap(process_mat_row)
new_mat1.take(12)
[8]:
[[0, 0, 0.05338376562534908],
[0, 1, 0.7179621058657293],
[0, 2, 0.8416816618000933],
[0, 3, 0.5866463066118723],
[0, 4, 0.2985761548971829],
[0, 5, 0.17094084464701753],
[0, 6, 0.22587546398891023],
[0, 7, 0.8938702752578108],
[0, 8, 0.7138005257890329],
[0, 9, 0.8701336430078879],
[1, 0, 0.9550985992178377],
[1, 1, 0.26327652337825447]]
[9]:
mat2 = sc.textFile("rnd2.txt")
new_mat2 = mat2.flatMap(process_mat_row)
new_mat2.take(12)
[9]:
[[0, 0, 0.5346425002633528],
[0, 1, 0.7922752739533419],
[1, 0, 0.8688279747342449],
[1, 1, 0.3693339071055035],
[2, 0, 0.01529354702055541],
[2, 1, 0.2970747253149303],
[3, 0, 0.5091076165917049],
[3, 1, 0.7201371902668229],
[4, 0, 0.674178721198942],
[4, 1, 0.6126955081354053],
[5, 0, 0.398552599571223],
[5, 1, 0.8226084971982536]]
Produit matriciel¶
Il faut d’abord faire la jointure avec la méthode join. Il faut que la clé soit sur la première colonne.
[10]:
def key_ij(row):
return row[0], (row[1], row[2])
def key_ji(row):
return row[1], (row[0], row[2])
mat_join = new_mat1.map(key_ji).join(new_mat2.map(key_ij))
mat_join.take(12)
[10]:
[(0, ((0, 0.05338376562534908), (0, 0.5346425002633528))),
(0, ((0, 0.05338376562534908), (1, 0.7922752739533419))),
(0, ((1, 0.9550985992178377), (0, 0.5346425002633528))),
(0, ((1, 0.9550985992178377), (1, 0.7922752739533419))),
(0, ((2, 0.16781505492071924), (0, 0.5346425002633528))),
(0, ((2, 0.16781505492071924), (1, 0.7922752739533419))),
(0, ((3, 0.4988373531569529), (0, 0.5346425002633528))),
(0, ((3, 0.4988373531569529), (1, 0.7922752739533419))),
(0, ((4, 0.2663703869883476), (0, 0.5346425002633528))),
(0, ((4, 0.2663703869883476), (1, 0.7922752739533419))),
(0, ((5, 0.8312264238066461), (0, 0.5346425002633528))),
(0, ((5, 0.8312264238066461), (1, 0.7922752739533419)))]
On effectue le produit matriciel.
[11]:
def produit_matriciel(row):
index, ((i, v1), (j, v2)) = row
return i, j, v1 * v2
produit = mat_join.map(produit_matriciel)
produit.take(12)
[11]:
[(0, 0, 0.02854122992740946),
(0, 1, 0.04229463753548444),
(1, 0, 0.5106363030838507),
(1, 1, 0.7567010043477654),
(2, 0, 0.08972106054464521),
(2, 1, 0.13295571861080793),
(3, 0, 0.2666996497165864),
(3, 1, 0.3952165006305848),
(4, 0, 0.14241292969556701),
(4, 1, 0.21103867132425078),
(5, 0, 0.4444089735089506),
(5, 1, 0.6585601426386672)]
Il ne reste plus qu’à agréger reduceByKey. La documentation fournit un exemple facilement transposable. Elle indique aussi : Merge the values for each key using an associative and commutative reduce function. Pourquoi précise-t-elle associative et commutative ? Cela signifie que le résultat ne dépend pas de l’ordre dans lequel l’agrégation est réalisée et qu’on peut commencer à agréger sans attendre d’avoir regroupé toutes les valeurs associées à une clé.
Cas 1 : groupBy + agrégation qui commence une fois les valeurs regroupées
Cas 2 : reduceByKey + agrégation qui commence dès les premières valeurs regroupées
Le cas 2 est moins consommateur en terme de données. Le cas 1 n’est possible que si les valeurs agrégées ne sont pas trop nombreuses. Ca tombe bien, dans notre cas, le cas 2 convient.
[12]:
from operator import add
final = produit.map(lambda row: ((row[0], row[1]), row[2])).reduceByKey(add)
aslist = final.collect()
aslist.sort()
aslist
[12]:
[((0, 0), 2.874990589654995),
((0, 1), 2.827876596857878),
((1, 0), 2.5031936080071473),
((1, 1), 2.7910500661755484),
((2, 0), 2.4724282791474432),
((2, 1), 2.9650591244671727),
((3, 0), 3.566752899363589),
((3, 1), 3.626771576425741),
((4, 0), 2.036599633115649),
((4, 1), 1.9672070245510558),
((5, 0), 2.459315517874097),
((5, 1), 2.692513578277047),
((6, 0), 2.3570894967638365),
((6, 1), 2.5449195463399703),
((7, 0), 2.423986556348134),
((7, 1), 3.333327755348771),
((8, 0), 3.204800695141252),
((8, 1), 3.6047947320158165),
((9, 0), 2.3596432321882945),
((9, 1), 2.753364084839743)]
Résultat initial :
[13]:
rnd1 @ rnd2
[13]:
array([[2.87499059, 2.8278766 ],
[2.50319361, 2.79105007],
[2.47242828, 2.96505912],
[3.5667529 , 3.62677158],
[2.03659963, 1.96720702],
[2.45931552, 2.69251358],
[2.3570895 , 2.54491955],
[2.42398656, 3.33332776],
[3.2048007 , 3.60479473],
[2.35964323, 2.75336408]])
Même algorithme avec les Spark DataFrame¶
On a besoin de réaliser un flatMap. Une façon de faire est de créer des colonnes qui sont de type composé : un tableau, une structure. La multiplication des lignes est obtenue avec la fonction explode.
[14]:
schema = ["index"] + ["c%d" % i for i in range(1, 11)]
mat1 = spark.createDataFrame(
pandas.read_csv("rnd1.txt", header=None, sep="\t"), schema=schema
)
[15]:
mat1.printSchema()
root
|-- index: long (nullable = true)
|-- c1: double (nullable = true)
|-- c2: double (nullable = true)
|-- c3: double (nullable = true)
|-- c4: double (nullable = true)
|-- c5: double (nullable = true)
|-- c6: double (nullable = true)
|-- c7: double (nullable = true)
|-- c8: double (nullable = true)
|-- c9: double (nullable = true)
|-- c10: double (nullable = true)
[16]:
schema = ["index"] + ["c%d" % i for i in range(1, 3)]
mat2 = spark.createDataFrame(
pandas.read_csv("rnd2.txt", header=None, sep="\t"), schema=schema
)
[17]:
mat2.printSchema()
root
|-- index: long (nullable = true)
|-- c1: double (nullable = true)
|-- c2: double (nullable = true)
Nous allons avoir besoin de quelques-uns des fonctions et types suivant :
Je recommande le type FloatType qui prend deux fois moins de place pour une précision moindre mais suffisante dans la plupart des cas.
[18]:
from pyspark.sql.types import (
ArrayType,
StructField,
StructType,
DoubleType,
IntegerType,
)
from pyspark.sql.functions import explode, posexplode, array
from pyspark.sql import Row
[19]:
cols = ["c%d" % i for i in range(1, 11)]
mat1_array = mat1.select(mat1.index, array(*cols).alias("x"))
mat1_array.printSchema()
root
|-- index: long (nullable = true)
|-- x: array (nullable = false)
| |-- element: double (containsNull = true)
[20]:
mat1_exploded = mat1_array.select("index", posexplode("x"))
mat1_exploded.printSchema()
root
|-- index: long (nullable = true)
|-- pos: integer (nullable = false)
|-- col: double (nullable = true)
[21]:
mat1.toPandas().shape, mat1_exploded.toPandas().shape
[21]:
((10, 11), (100, 3))
On recommence le même procédé pour l’autre matrice.
[22]:
cols = ["c%d" % i for i in range(1, 3)]
mat2_array = mat2.select(mat2.index, array(*cols).alias("x"))
mat2_exploded = mat2_array.select("index", posexplode("x"))
Il ne reste plus qu’à faire le produit avec la méthode join après avoir renommé les colonnes avant la jointure pour éviter les ambiguïtés.
[23]:
mat2_exp2 = (
mat2_exploded.withColumnRenamed("index", "index2")
.withColumnRenamed("pos", "pos2")
.withColumnRenamed("col", "col2")
)
produit = mat1_exploded.join(mat2_exp2, mat1_exploded.pos == mat2_exp2.index2)
[24]:
produit.printSchema()
root
|-- index: long (nullable = true)
|-- pos: integer (nullable = false)
|-- col: double (nullable = true)
|-- index2: long (nullable = true)
|-- pos2: integer (nullable = false)
|-- col2: double (nullable = true)
[25]:
produit.toPandas().head()
[25]:
index | pos | col | index2 | pos2 | col2 | |
---|---|---|---|---|---|---|
0 | 0 | 0 | 0.053384 | 0 | 0 | 0.534643 |
1 | 0 | 0 | 0.053384 | 0 | 1 | 0.792275 |
2 | 1 | 0 | 0.955099 | 0 | 0 | 0.534643 |
3 | 1 | 0 | 0.955099 | 0 | 1 | 0.792275 |
4 | 2 | 0 | 0.167815 | 0 | 0 | 0.534643 |
[26]:
prod = produit.select(
produit.index.alias("i"),
produit.pos2.alias("j"),
(produit.col * produit.col2).alias("val"),
)
final = prod.groupby("i", "j").sum("val")
[27]:
final.printSchema()
root
|-- i: long (nullable = true)
|-- j: integer (nullable = false)
|-- sum(val): double (nullable = true)
[28]:
df = final.toPandas()
[29]:
df.sort_values(["i", "j"]).head()
[29]:
i | j | sum(val) | |
---|---|---|---|
7 | 0 | 0 | 2.874991 |
10 | 0 | 1 | 2.827877 |
18 | 1 | 0 | 2.503194 |
3 | 1 | 1 | 2.791050 |
6 | 2 | 0 | 2.472428 |
[30]:
df.shape
[30]:
(20, 3)
Alternatives¶
Plutôt que d’avoir un table où chaque ligne représente trois coefficients, pourrions-nous considérer une matrice par bloc de 16x16 ? Que gagnerait-on ?
Fin¶
[31]:
spark.stop()