Premiers pas avec Spark¶
Introduction à Spark et aux RDD.
[1]:
%matplotlib inline
Deux ou trois petites choses à ne pas oublier¶
Local et cluster¶
Spark n’est pas un langage de programmation mais un environnement de calcul distribué. L’installation en locale reproduit ce que Spark donnerait à grande échelle sur un cluster mais ce n’est pas rigoureusement identique. En particulier cela veut dire que si votre script tourne en local sur un petit jeu de données, il est possible qu’il échoue sur le cluster :
Les dépendances du script sont installées en local mais pas sur chaque machine du cluster Spark. Cela peut se faire à l’installation du cluster pour des dépendances conséquentes ou juste avant l’exécution d’un job pour des dépendances ponctuelles.
Les données sur le cluster sont en plus grand nombre, il est fort probable que l’échantillon aléatoire local ne soit pas représentatif.
Les chemins locaux ne fonctionnent pas sur le cluster. Il faudra d’abord uploader les données sur le cluster pour faire tourner le script.
Débugger est compliqué : les print ne marchent pas souvent, surtout si c’est en distribué. Le print va s’exécuter sur une machine distance qui est à mille lieues de votre écran.
Quand ça plante sur une machine distante, il faut s’accrocher. Le pire, c’est quand l’erreur arrive pour une observation toute bizarre après cinq heures de calcul. Si le message d’erreur n’est pas trop incompréhensible, on sen tire. En fait, le plus agaçant, c’est quand le calcul est carrément interrompu par le cluster au bout de cinq heures car il décrète que les probabilités d’aboutir sont quasi nulles. Là, on connaît l’erreur (skewed dataset) et on sait qu’on va souffrir pour construire la contournante.
Spark et RDD¶
Spark ne manipule pas des fichiers mais des Resilient Distributed Dataset ou RDD. En particulier :
Les RDD sont organisés en ligne : ce sont des blocs qui ne seront jamais cassés ni modifié. Ces lignes ne peuvent pas excéder 2 Go (voir SPARK-6235) mais il est conseillé de ne pas aller au-delà de quelques Mo.
Sauf exception, il est impossible d’accéder à une partie du fichier. Il faut le parcourir en entier (il n’y a pas d’index).
Les RDD fonctionnent comme des flux ou stream. On peut soit les lire, soit les écrire mais jamais les deux en même temps. Par conséquent, on ne peut pas modifier un RDD, il faut toujours en créer un autre.
Les RDD sont distribués. L’ordre des lignes qui le composent n’est pas prévisible.
Comme l’ordre est imprévisible, on ne stocke jamais les noms des colonnes dans les RDD.
Les partitions¶
Il existe une exception au point 2 : les partitions. Une partition est un ensemble de lignes traitées par le même processus. La parallélisation ne peut excéder le nombre de partitions. Par défaut, c’est aléatoire (hash hash). Mais on peut tout-à-fait partionner selon une colonne, deux colonnes. D’ailleurs, c’est là-dessus qu’on joue pour optimiser la distribution. Si on réduit (ou grouper) selon une colonne, c’est d’autant plus rapide si le stream est déjà partitionnée sur cette colonne. Voir également pyspark.RDD.repartition.
Spark et Python¶
Spark est implémenté en Java. L’API Python permet de faire beaucoup de choses mais :
Librairies sur Spark¶
Un des succès de Spark est de proposer des extensions dédiées à certains usages comme MLlib qui implémente des algorihmes de machine learning distribués, GraphX pour des algorithmes sur des graphes. MLlib sera bientôt remplacé par ML qui s’appuie sur les DataFrame.
Vérifier que Spark en local fonctionne¶
On essaye le « hello world » en Spark qui consiste à compter les mots dans un fichier. On prend le fichier du notebook.
[2]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("MySparkApp").master("local[*]").getOrCreate()
sc = spark.sparkContext
your 131072x1 screen size is bogus. expect trouble
24/11/11 14:58:33 WARN Utils: Your hostname, xavier2024 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/11 14:58:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 14:58:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[3]:
text_file = sc.textFile("spark_first_steps.ipynb")
counts = (
text_file.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
)
counts.saveAsTextFile("fichier.out.txt")
[4]:
import os
os.listdir("fichier.out.txt/")
[4]:
['_SUCCESS',
'part-00000',
'part-00001',
'.part-00001.crc',
'._SUCCESS.crc',
'.part-00000.crc']
Sortie en plusieurs fichiers¶
Un job Spark est distribué. La sortie d’un job Spark s’effectue sous la forme de plusieurs stream dans un répertoire, un stream par processus. Cela explique la présence de part-00000, part-00001. Le fichier _SUCCESS
indique le statut du job.
Les opérations de bases¶
Documentation : programming-guide.html - transformations.
Dans cette section, on considère les données comme un ensemble de lignes de texte. Rien de plus. Donc, pas d’information de type, des conversions quasiment tout le temps. Bref, c’est utile pour comprendre. On y revient quand le reste ne marche pas. En général, on commence par Spark SQL. Ah oui j’oubliais, on s’en sert beaucoup quand les données ne sont pas structurées et sont décrites par du JSON, genre des logs d’un site internet. Chaque ligne est en fait un gros JSON.
On utilise un jeu de données de machine learning Adult légèrement pré-traités.
[5]:
import os
import urllib.request as ur
import zipfile
if not os.path.exists("data_adult.txt"):
url = "https://github.com/sdpython/teachcompute/raw/main/_data/data_adult.zip"
with ur.urlopen(url) as u:
content = u.read()
with open("data_adult.zip", "wb") as f:
f.write(content)
with zipfile.ZipFile("data_adult.zip", "r") as zip_ref:
zip_ref.extractall(".")
assert os.path.exists("data_adult.txt")
[6]:
import pandas
df = pandas.read_csv("data_adult.txt", sep="\t", encoding="utf-8")
df.head()
[6]:
age | workclass | fnlwgt | education | education_num | marital_status | occupation | relationship | race | sex | capital_gain | capital_loss | hours_per_week | native_country | target | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States | <=50K |
1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States | <=50K |
2 | 38 | Private | 215646 | HS-grad | 9 | Divorced | Handlers-cleaners | Not-in-family | White | Male | 0 | 0 | 40 | United-States | <=50K |
3 | 53 | Private | 234721 | 11th | 7 | Married-civ-spouse | Handlers-cleaners | Husband | Black | Male | 0 | 0 | 40 | United-States | <=50K |
4 | 28 | Private | 338409 | Bachelors | 13 | Married-civ-spouse | Prof-specialty | Wife | Black | Female | 0 | 0 | 40 | Cuba | <=50K |
On enlève le nom des colonnes.
[7]:
df.to_csv("adult.txt", sep="\t", encoding="utf-8", index=False, header=None)
[8]:
with open("adult.txt", "r", encoding="utf-8") as f:
print(f.read(1000))
39 State-gov 77516 Bachelors 13 Never-married Adm-clerical Not-in-family White Male 2174 0 40 United-States <=50K
50 Self-emp-not-inc 83311 Bachelors 13 Married-civ-spouse Exec-managerial Husband White Male 0 0 13 United-States <=50K
38 Private 215646 HS-grad 9 Divorced Handlers-cleaners Not-in-family White Male 0 0 40 United-States <=50K
53 Private 234721 11th 7 Married-civ-spouse Handlers-cleaners Husband Black Male 0 0 40 United-States <=50K
28 Private 338409 Bachelors 13 Married-civ-spouse Prof-specialty Wife Black Female 0 0 40 Cuba <=50K
37 Private 284582 Masters 14 Married-civ-spouse Exec-managerial Wife White Female 0 0 40 United-States <=50K
49 Private 160187 9th 5 Married-spouse-absent Other-service Not-in-family Black Female 0 0 16 Jamaica <=50K
52 Self-emp-not-inc 209642 HS-grad 9 Married-civ-spouse Exec-managerial Husband White Male 0 0 45 United-States >50K
31 Private 45781 Masters 14 Never-married
déclaration d’un RDD¶
La déclaration déclare l’existence d’un RDD comme on déclare un fichier. Pour l’instant aucune manipulation.
[9]:
rdd = sc.textFile("adult.txt")
enregistrement d’un RDD¶
[10]:
import os
if not os.path.exists("out"):
os.mkdir("out")
[11]:
if os.path.exists("out/copy_adult.txt"):
os.remove("out/copy_adult.txt")
rdd.saveAsTextFile(os.path.abspath("out/copy_adult.txt"))
[12]:
! ls out/copy_adult.txt
_SUCCESS part-00000 part-00001
[13]:
with open("out/copy_adult.txt/part-00000", "r", encoding="utf-8") as f:
print(f.read(1000))
39 State-gov 77516 Bachelors 13 Never-married Adm-clerical Not-in-family White Male 2174 0 40 United-States <=50K
50 Self-emp-not-inc 83311 Bachelors 13 Married-civ-spouse Exec-managerial Husband White Male 0 0 13 United-States <=50K
38 Private 215646 HS-grad 9 Divorced Handlers-cleaners Not-in-family White Male 0 0 40 United-States <=50K
53 Private 234721 11th 7 Married-civ-spouse Handlers-cleaners Husband Black Male 0 0 40 United-States <=50K
28 Private 338409 Bachelors 13 Married-civ-spouse Prof-specialty Wife Black Female 0 0 40 Cuba <=50K
37 Private 284582 Masters 14 Married-civ-spouse Exec-managerial Wife White Female 0 0 40 United-States <=50K
49 Private 160187 9th 5 Married-spouse-absent Other-service Not-in-family Black Female 0 0 16 Jamaica <=50K
52 Self-emp-not-inc 209642 HS-grad 9 Married-civ-spouse Exec-managerial Husband White Male 0 0 45 United-States >50K
31 Private 45781 Masters 14 Never-married
lecture locale d’un RDD avec pandas¶
On lit chaque morceaux avant de les concaténer.
[14]:
import glob
import pandas
def read_rdd(path, **options):
pat = os.path.join(path, "part*")
all_files = glob.glob(pat)
if len(all_files) == 0:
raise Exception("No file to read in '{0}'".format(path))
merge = []
for f in all_files:
try:
df = pandas.read_csv(f, header=None, **options)
except Exception as e:
raise Exception("Unable to read '{0}'".format(f)) from e
merge.append(df)
if len(merge) == 0:
raise Exception("No file to read in '{0}'".format(path))
concatenated_df = pandas.concat(merge, ignore_index=True)
return concatenated_df
data = read_rdd("out/copy_adult.txt", sep="\t", encoding="utf-8")
data.head(n=2)
[14]:
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States | <=50K |
1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States | <=50K |
collect¶
Cette opération regroupe les deux précédentes en une seule. Il faut toute de même faire attention de ne pas l’exécuter sur un grand fichier sous peine de faire exploser la mémoire.
[15]:
res = rdd.collect()
[16]:
res[:2]
[16]:
['39\t State-gov\t77516\t Bachelors\t13\t Never-married\t Adm-clerical\t Not-in-family\t White\t Male\t2174\t0\t40\t United-States\t <=50K',
'50\t Self-emp-not-inc\t83311\t Bachelors\t13\t Married-civ-spouse\t Exec-managerial\t Husband\t White\t Male\t0\t0\t13\t United-States\t <=50K']
[17]:
import pandas
df = pandas.DataFrame([_.split("\t") for _ in res])
df.head(2)
[17]:
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States | <=50K |
1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States | <=50K |
map¶
Transformer une ligne en une autre ligne. Chaque ligne est traitée indépendemment des autres.
[18]:
def extract_column(cols, row):
spl = row.split("\t")
return [spl[i].strip() for i in cols]
res = rdd.map(lambda row: extract_column([1, 3], row))
res.collect()[:2]
[18]:
[['State-gov', 'Bachelors'], ['Self-emp-not-inc', 'Bachelors']]
filter¶
Garder ou jeter une ligne. Chaque ligne est traitée indépendemment des autres.
[19]:
def filter_column(row):
spl = row.split("\t")
return spl[-1].strip() != "<=50K"
res = rdd.filter(lambda row: filter_column(row))
res.collect()[:2]
[19]:
['52\t Self-emp-not-inc\t209642\t HS-grad\t9\t Married-civ-spouse\t Exec-managerial\t Husband\t White\t Male\t0\t0\t45\t United-States\t >50K',
'31\t Private\t45781\t Masters\t14\t Never-married\t Prof-specialty\t Not-in-family\t White\t Female\t14084\t0\t50\t United-States\t >50K']
On combine souvent les deux :
[20]:
def filter_column_split(row):
return row[-1].strip() != "<=50K"
res = rdd.map(lambda row: extract_column([1, 3, -1], row)).filter(
lambda row: filter_column_split(row)
)
res.collect()[:2]
[20]:
[['Self-emp-not-inc', 'HS-grad', '>50K'], ['Private', 'Masters', '>50K']]
Il faut faire attention aux transformations successives des lignes.
flatMap¶
C’est la principale différence avec SQL. Une ligne peut devenir un nombre variable de lignes.
[21]:
def extract_column_and_multiply_row(n, row):
spl = row.split("\t")
return [tuple(_.strip() for _ in spl)] * n
res = rdd.flatMap(lambda row: extract_column_and_multiply_row(2, row))
res.collect()[:3]
[21]:
[('39',
'State-gov',
'77516',
'Bachelors',
'13',
'Never-married',
'Adm-clerical',
'Not-in-family',
'White',
'Male',
'2174',
'0',
'40',
'United-States',
'<=50K'),
('39',
'State-gov',
'77516',
'Bachelors',
'13',
'Never-married',
'Adm-clerical',
'Not-in-family',
'White',
'Male',
'2174',
'0',
'40',
'United-States',
'<=50K'),
('50',
'Self-emp-not-inc',
'83311',
'Bachelors',
'13',
'Married-civ-spouse',
'Exec-managerial',
'Husband',
'White',
'Male',
'0',
'0',
'13',
'United-States',
'<=50K')]
group / reduce + mapValues¶
Petite moyenne ?
[22]:
def extract_age_rich(row):
spl = row.split("\t")
target = spl[-1].strip()
age = float(spl[0])
return (age, target)
def custom_agg(aggset):
temp = list([_[0] for _ in aggset])
return len(temp), sum(temp)
ave = rdd.map(extract_age_rich).groupBy(lambda row: row[1]).mapValues(custom_agg)
fin = ave.collect()
fin
[22]:
[('>50K', (7841, 346963.0)), ('<=50K', (24720, 909294.0))]
sort¶
Je n’en parle pas. Trier un gros jeu de données est à proscrire. On peut trier au sein d’un groupe mais jamais un stream entier. Ca fait presque dix ans que j’écris des jobs map/reduce, je n’ai jamais écrit un sort sur tout un jeu de données. Ca s’appelle flinguer de la CPU pour rien.
join¶
Et on remet la moyenne dans le stream initial. Il vaut mieux regarder la documentation de la méthode join avant de commencer à lire le code qui suit.
[23]:
add_key = rdd.map(lambda row: row.split("\t")).map(lambda row: (row[-1].strip(), row))
join = add_key.join(ave)
join.collect()[:2]
[23]:
[('>50K',
(['52',
' Self-emp-not-inc',
'209642',
' HS-grad',
'9',
' Married-civ-spouse',
' Exec-managerial',
' Husband',
' White',
' Male',
'0',
'0',
'45',
' United-States',
' >50K'],
(7841, 346963.0))),
('>50K',
(['31',
' Private',
'45781',
' Masters',
'14',
' Never-married',
' Prof-specialty',
' Not-in-family',
' White',
' Female',
'14084',
'0',
'50',
' United-States',
' >50K'],
(7841, 346963.0)))]
On commence à comprendre pourquoi Spark SQL, ça risque d’être pas mal.
le choix existentiel du join : le petit join¶
On fait souvent une opération qui consiste à garder les lignes pour lesquelles une certaine valeur appartient à un ensemble. On peut faire un join classique ou alors l’ensemble est petit, traiter ce join comme un map. On broadcaste l’ensemble à chaque processus exécutant le map.
[24]:
ages = sc.broadcast([20, 30, 40])
ages.value
[24]:
[20, 30, 40]
[25]:
subset = rdd.filter(lambda row: int(row.split("\t")[0]) in ages.value)
subset.collect()[:2]
[25]:
['30\t State-gov\t141297\t Bachelors\t13\t Married-civ-spouse\t Prof-specialty\t Husband\t Asian-Pac-Islander\t Male\t0\t0\t40\t India\t >50K',
'40\t Private\t121772\t Assoc-voc\t11\t Married-civ-spouse\t Craft-repair\t Husband\t Asian-Pac-Islander\t Male\t0\t0\t40\t ?\t >50K']
les trucs qui servent parfois parce que … à l’usage ça sert¶
Ce que font les méthodes associées aux RDD, un peu comme les itérateurs, n’est pas toujours intuitif, mais il est à peu près sûr qu’elles vous serviront un jour (peut-être après avoir googlé ou bingé comme des fous).
[26]:
simple_rdd = sc.parallelize([2, 3, 4])
simple_rdd.collect()
[26]:
[2, 3, 4]
[27]:
simple_rdd.flatMap(lambda x: range(1, x)).collect()
[27]:
[1, 1, 2, 1, 2, 3]
le truc à retenir¶
collect, collect… qu’est-ce que je voulais dire déjà… Ah oui… Un job map/reduce c’est :
La déclaration des flux d’entrées.
Le traitement à proprement parler.
La déclaration des flux de sorties.
A moins d’écrire du java bas niveau, le job est transformé en un plan d’exécution qui n’est jamais exécuté si collect ou save machin chouette n’est jamais exécuté. Bref, c’est du lazy.
Spark DataFrame¶
Au début, ça commence par… créer un dataframe. Et comme pour pandas, ces objets retienennt les noms et les types.
[28]:
import pandas
data = pandas.read_csv("data_adult.txt", sep="\t", encoding="utf-8")
data.head(2)
[28]:
age | workclass | fnlwgt | education | education_num | marital_status | occupation | relationship | race | sex | capital_gain | capital_loss | hours_per_week | native_country | target | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States | <=50K |
1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States | <=50K |
[29]:
# sdf = spark.createDataFrame(data) # ça marche
sdf = spark.read.csv("data_adult.txt", sep="\t", encoding="utf-8")
[30]:
sdf.show()
+---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|_c0| _c1| _c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10| _c11| _c12| _c13| _c14|
+---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|age| workclass|fnlwgt| education|education_num| marital_status| occupation| relationship| race| sex|capital_gain|capital_loss|hours_per_week|native_country|target|
| 39| State-gov| 77516| Bachelors| 13| Never-married| Adm-clerical| Not-in-family| White| Male| 2174| 0| 40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311| Bachelors| 13| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 13| United-States| <=50K|
| 38| Private|215646| HS-grad| 9| Divorced| Handlers-cleaners| Not-in-family| White| Male| 0| 0| 40| United-States| <=50K|
| 53| Private|234721| 11th| 7| Married-civ-spouse| Handlers-cleaners| Husband| Black| Male| 0| 0| 40| United-States| <=50K|
| 28| Private|338409| Bachelors| 13| Married-civ-spouse| Prof-specialty| Wife| Black| Female| 0| 0| 40| Cuba| <=50K|
| 37| Private|284582| Masters| 14| Married-civ-spouse| Exec-managerial| Wife| White| Female| 0| 0| 40| United-States| <=50K|
| 49| Private|160187| 9th| 5| Married-spouse-a...| Other-service| Not-in-family| Black| Female| 0| 0| 16| Jamaica| <=50K|
| 52| Self-emp-not-inc|209642| HS-grad| 9| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 45| United-States| >50K|
| 31| Private| 45781| Masters| 14| Never-married| Prof-specialty| Not-in-family| White| Female| 14084| 0| 50| United-States| >50K|
| 42| Private|159449| Bachelors| 13| Married-civ-spouse| Exec-managerial| Husband| White| Male| 5178| 0| 40| United-States| >50K|
| 37| Private|280464| Some-college| 10| Married-civ-spouse| Exec-managerial| Husband| Black| Male| 0| 0| 80| United-States| >50K|
| 30| State-gov|141297| Bachelors| 13| Married-civ-spouse| Prof-specialty| Husband| Asian-Pac-Islander| Male| 0| 0| 40| India| >50K|
| 23| Private|122272| Bachelors| 13| Never-married| Adm-clerical| Own-child| White| Female| 0| 0| 30| United-States| <=50K|
| 32| Private|205019| Assoc-acdm| 12| Never-married| Sales| Not-in-family| Black| Male| 0| 0| 50| United-States| <=50K|
| 40| Private|121772| Assoc-voc| 11| Married-civ-spouse| Craft-repair| Husband| Asian-Pac-Islander| Male| 0| 0| 40| ?| >50K|
| 34| Private|245487| 7th-8th| 4| Married-civ-spouse| Transport-moving| Husband| Amer-Indian-Eskimo| Male| 0| 0| 45| Mexico| <=50K|
| 25| Self-emp-not-inc|176756| HS-grad| 9| Never-married| Farming-fishing| Own-child| White| Male| 0| 0| 35| United-States| <=50K|
| 32| Private|186824| HS-grad| 9| Never-married| Machine-op-inspct| Unmarried| White| Male| 0| 0| 40| United-States| <=50K|
| 38| Private| 28887| 11th| 7| Married-civ-spouse| Sales| Husband| White| Male| 0| 0| 50| United-States| <=50K|
+---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
only showing top 20 rows
Conversion à pandas¶
[31]:
df = sdf.toPandas()
[32]:
df.head()
[32]:
_c0 | _c1 | _c2 | _c3 | _c4 | _c5 | _c6 | _c7 | _c8 | _c9 | _c10 | _c11 | _c12 | _c13 | _c14 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | age | workclass | fnlwgt | education | education_num | marital_status | occupation | relationship | race | sex | capital_gain | capital_loss | hours_per_week | native_country | target |
1 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States | <=50K |
2 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States | <=50K |
3 | 38 | Private | 215646 | HS-grad | 9 | Divorced | Handlers-cleaners | Not-in-family | White | Male | 0 | 0 | 40 | United-States | <=50K |
4 | 53 | Private | 234721 | 11th | 7 | Married-civ-spouse | Handlers-cleaners | Husband | Black | Male | 0 | 0 | 40 | United-States | <=50K |
Retour aux RDD¶
[33]:
sdf.rdd
[33]:
MapPartitionsRDD[54] at javaToPython at NativeMethodAccessorImpl.java:0
Récuperer le schéma¶
[34]:
sdf.schema
[34]:
StructType([StructField('_c0', StringType(), True), StructField('_c1', StringType(), True), StructField('_c2', StringType(), True), StructField('_c3', StringType(), True), StructField('_c4', StringType(), True), StructField('_c5', StringType(), True), StructField('_c6', StringType(), True), StructField('_c7', StringType(), True), StructField('_c8', StringType(), True), StructField('_c9', StringType(), True), StructField('_c10', StringType(), True), StructField('_c11', StringType(), True), StructField('_c12', StringType(), True), StructField('_c13', StringType(), True), StructField('_c14', StringType(), True)])
[35]:
sdf.printSchema()
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
|-- _c4: string (nullable = true)
|-- _c5: string (nullable = true)
|-- _c6: string (nullable = true)
|-- _c7: string (nullable = true)
|-- _c8: string (nullable = true)
|-- _c9: string (nullable = true)
|-- _c10: string (nullable = true)
|-- _c11: string (nullable = true)
|-- _c12: string (nullable = true)
|-- _c13: string (nullable = true)
|-- _c14: string (nullable = true)
Utiliser pandas pour spécifer le format¶
On utilise pandas sur une partie du stream.
[36]:
import pandas
df = pandas.read_csv("data_adult.txt", sep="\t", encoding="utf-8")
df.head(n=2)
[36]:
age | workclass | fnlwgt | education | education_num | marital_status | occupation | relationship | race | sex | capital_gain | capital_loss | hours_per_week | native_country | target | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States | <=50K |
1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States | <=50K |
[37]:
sdf = spark.createDataFrame(df)
[38]:
sdf.printSchema()
root
|-- age: long (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: long (nullable = true)
|-- education: string (nullable = true)
|-- education_num: long (nullable = true)
|-- marital_status: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital_gain: long (nullable = true)
|-- capital_loss: long (nullable = true)
|-- hours_per_week: long (nullable = true)
|-- native_country: string (nullable = true)
|-- target: string (nullable = true)
[39]:
fullsdf = spark.createDataFrame(sdf.rdd, sdf.schema)
[40]:
fullsdf.printSchema()
root
|-- age: long (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: long (nullable = true)
|-- education: string (nullable = true)
|-- education_num: long (nullable = true)
|-- marital_status: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital_gain: long (nullable = true)
|-- capital_loss: long (nullable = true)
|-- hours_per_week: long (nullable = true)
|-- native_country: string (nullable = true)
|-- target: string (nullable = true)
Enregistrement au format parquet¶
[41]:
fullsdf.write.parquet("data_adult.schema.parquet")
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 54.29% for 14 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 50.67% for 15 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 47.50% for 16 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 44.71% for 17 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 42.22% for 18 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 40.00% for 19 writers
24/11/11 14:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 38.00% for 20 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 40.00% for 19 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 42.22% for 18 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 44.71% for 17 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 47.50% for 16 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 50.67% for 15 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 54.29% for 14 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/11/11 14:58:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
Relecture du format parquet¶
[42]:
newsdf = spark.read.parquet("data_adult.schema.parquet/")
[43]:
newsdf.printSchema()
root
|-- age: long (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: long (nullable = true)
|-- education: string (nullable = true)
|-- education_num: long (nullable = true)
|-- marital_status: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital_gain: long (nullable = true)
|-- capital_loss: long (nullable = true)
|-- hours_per_week: long (nullable = true)
|-- native_country: string (nullable = true)
|-- target: string (nullable = true)
Dataframe Spark VS Dataframe pandas¶
Spark a reproduit la même interface que pandas pour ses dataframes excepté que le résultat n’est pas calculé tant qu’on ne choisit pas de sauvegarder le résultat.
[44]:
fifty = fullsdf[fullsdf.age > 50]
[45]:
fifty.show()
+---+-----------------+------+-------------+-------------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+
|age| workclass|fnlwgt| education|education_num| marital_status| occupation| relationship| race| sex|capital_gain|capital_loss|hours_per_week|native_country|target|
+---+-----------------+------+-------------+-------------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 53| Private|234721| 11th| 7| Married-civ-spouse| Handlers-cleaners| Husband| Black| Male| 0| 0| 40| United-States| <=50K|
| 52| Self-emp-not-inc|209642| HS-grad| 9| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 45| United-States| >50K|
| 54| Private|302146| HS-grad| 9| Separated| Other-service| Unmarried| Black| Female| 0| 0| 20| United-States| <=50K|
| 59| Private|109015| HS-grad| 9| Divorced| Tech-support| Unmarried| White| Female| 0| 0| 40| United-States| <=50K|
| 56| Local-gov|216851| Bachelors| 13| Married-civ-spouse| Tech-support| Husband| White| Male| 0| 0| 40| United-States| >50K|
| 54| ?|180211| Some-college| 10| Married-civ-spouse| ?| Husband| Asian-Pac-Islander| Male| 0| 0| 60| South| >50K|
| 53| Self-emp-not-inc| 88506| Bachelors| 13| Married-civ-spouse| Prof-specialty| Husband| White| Male| 0| 0| 40| United-States| <=50K|
| 57| Federal-gov|337895| Bachelors| 13| Married-civ-spouse| Prof-specialty| Husband| Black| Male| 0| 0| 40| United-States| >50K|
| 53| Private|144361| HS-grad| 9| Married-civ-spouse| Machine-op-inspct| Husband| White| Male| 0| 0| 38| United-States| <=50K|
| 53| Private|169846| HS-grad| 9| Married-civ-spouse| Adm-clerical| Wife| White| Female| 0| 0| 40| United-States| >50K|
| 79| Private|124744| Some-college| 10| Married-civ-spouse| Prof-specialty| Other-relative| White| Male| 0| 0| 20| United-States| <=50K|
| 67| ?|212759| 10th| 6| Married-civ-spouse| ?| Husband| White| Male| 0| 0| 2| United-States| <=50K|
| 52| Private|276515| Bachelors| 13| Married-civ-spouse| Other-service| Husband| White| Male| 0| 0| 40| Cuba| <=50K|
| 59| Private|159937| HS-grad| 9| Married-civ-spouse| Sales| Husband| White| Male| 0| 0| 48| United-States| <=50K|
| 53| Private|346253| HS-grad| 9| Divorced| Sales| Own-child| White| Female| 0| 0| 35| United-States| <=50K|
| 57| Private|249977| Assoc-voc| 11| Married-civ-spouse| Prof-specialty| Husband| White| Male| 0| 0| 40| United-States| <=50K|
| 76| Private|124191| Masters| 14| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 40| United-States| >50K|
| 56| Self-emp-not-inc|335605| HS-grad| 9| Married-civ-spouse| Other-service| Husband| White| Male| 0| 1887| 50| Canada| >50K|
| 53| Private| 95647| 9th| 5| Married-civ-spouse| Handlers-cleaners| Husband| White| Male| 0| 0| 50| United-States| <=50K|
| 56| Self-emp-inc|303090| Some-college| 10| Married-civ-spouse| Sales| Husband| White| Male| 0| 0| 50| United-States| <=50K|
+---+-----------------+------+-------------+-------------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+
only showing top 20 rows
Fin¶
[46]:
spark.stop()