Spark and MLlib - ML
Logistic regression with Spark.
MLlib is the distributed machine learning library implemented on Spark, and it partly explains Spark's success. The first incarnation of such a library was Mahout, implemented on Hadoop. MLlib has since become the standard. ML (the pyspark.ml package) is the most recent API and is built on DataFrames. It offers the same concepts as scikit-learn, such as Pipeline.
Data
[1]:
import os
import urllib.request as ur
import zipfile
if not os.path.exists("data_adult.txt"):
    url = "https://github.com/sdpython/teachcompute/raw/main/_data/data_adult.zip"
    with ur.urlopen(url) as u:
        content = u.read()
    with open("data_adult.zip", "wb") as f:
        f.write(content)
    with zipfile.ZipFile("data_adult.zip", "r") as zip_ref:
        zip_ref.extractall(".")
assert os.path.exists("data_adult.txt")
[2]:
import pandas
df = pandas.read_csv("data_adult.txt", sep="\t", encoding="utf-8")
df.head()
[2]:
| | age | workclass | fnlwgt | education | education_num | marital_status | occupation | relationship | race | sex | capital_gain | capital_loss | hours_per_week | native_country | target |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States | <=50K |
| 1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States | <=50K |
| 2 | 38 | Private | 215646 | HS-grad | 9 | Divorced | Handlers-cleaners | Not-in-family | White | Male | 0 | 0 | 40 | United-States | <=50K |
| 3 | 53 | Private | 234721 | 11th | 7 | Married-civ-spouse | Handlers-cleaners | Husband | Black | Male | 0 | 0 | 40 | United-States | <=50K |
| 4 | 28 | Private | 338409 | Bachelors | 13 | Married-civ-spouse | Prof-specialty | Wife | Black | Female | 0 | 0 | 40 | Cuba | <=50K |
[3]:
df.dtypes
[3]:
age int64
workclass object
fnlwgt int64
education object
education_num int64
marital_status object
occupation object
relationship object
race object
sex object
capital_gain int64
capital_loss int64
hours_per_week int64
native_country object
target object
dtype: object
[4]:
cols = list(filter(lambda tu: tu[1] != object, zip(range(len(df.columns)), df.dtypes)))
cols
[4]:
[(0, dtype('int64')),
(2, dtype('int64')),
(4, dtype('int64')),
(10, dtype('int64')),
(11, dtype('int64')),
(12, dtype('int64'))]
[5]:
column_keep = set(_[0] for _ in cols)
column_keep
[5]:
{0, 2, 4, 10, 11, 12}
[6]:
df.to_csv("adult.txt", sep="\t", index=False, header=False)
[7]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("MySparkApp").master("local[*]").getOrCreate()
sc = spark.sparkContext
24/11/11 15:04:11 WARN Utils: Your hostname, xavier2024 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/11 15:04:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 15:04:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[8]:
data = sc.textFile("adult.txt")
[9]:
col = data.take(2)
[10]:
col
[10]:
['39\t State-gov\t77516\t Bachelors\t13\t Never-married\t Adm-clerical\t Not-in-family\t White\t Male\t2174\t0\t40\t United-States\t <=50K',
'50\t Self-emp-not-inc\t83311\t Bachelors\t13\t Married-civ-spouse\t Exec-managerial\t Husband\t White\t Male\t0\t0\t13\t United-States\t <=50K']
Logistic regression (RDD)
We follow the example from the documentation: Linear Methods - RDD-based API. Categorical variables are excluded to keep the example concise.
[11]:
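from pyspark.mllib.regression import LabeledPoint


def parsePoint(line):
    spl = line.split("\t")
    # Keep only the numeric columns listed in column_keep
    values = [float(x) for i, x in enumerate(spl) if i in column_keep]
    # The label is 1.0 when the income is <=50K, 0.0 otherwise
    target = float(spl[-1].strip() == "<=50K")
    return LabeledPoint(target, values)


# We prepare the training data
parsedData = data.map(parsePoint)
# take(2) fetches two elements without collecting the whole RDD
parsedData.take(2)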
[11]:
[LabeledPoint(1.0, [39.0,77516.0,13.0,2174.0,0.0,40.0]),
LabeledPoint(1.0, [50.0,83311.0,13.0,0.0,0.0,13.0])]
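The parsing step can be checked without Spark. The sketch below applies the same logic to the first raw line shown earlier; `parse_line` is a hypothetical stand-in for `parsePoint` that returns a plain tuple instead of a `LabeledPoint`, and `column_keep` is hard-coded to the set computed above.

```python
# Indices of the numeric columns, as computed earlier in the notebook.
column_keep = {0, 2, 4, 10, 11, 12}


def parse_line(line):
    """Plain-Python stand-in for parsePoint: returns (label, features)."""
    spl = line.split("\t")
    features = [float(x) for i, x in enumerate(spl) if i in column_keep]
    # The raw target has a leading space (" <=50K"), hence the strip().
    label = float(spl[-1].strip() == "<=50K")
    return label, features


line = (
    "39\t State-gov\t77516\t Bachelors\t13\t Never-married\t Adm-clerical"
    "\t Not-in-family\t White\t Male\t2174\t0\t40\t United-States\t <=50K"
)
label, features = parse_line(line)
print(label, features)
```

Note that with this encoding the positive class (1.0) is the income bracket `<=50K`, which is also the majority class.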
[12]:
from pyspark.mllib.classification import (
    LogisticRegressionWithLBFGS,
    LogisticRegressionModel,
)
from pyspark.mllib.regression import LabeledPoint


# Load and parse the data
def parsePoint(line):
    spl = line.split("\t")
    values = [float(x) for i, x in enumerate(spl) if i in column_keep]
    target = float(spl[-1].strip() == "<=50K")
    return LabeledPoint(target, values)


# We prepare the training data
parsedData = data.map(parsePoint)
# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)
24/11/11 15:04:26 WARN Instrumentation: [4427c4d8] Initial coefficients will be ignored! Its dimensions (1, 6) did not match the expected size (1, 6)
24/11/11 15:04:26 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
While this runs, watch the terminal window that displays the notebook server's messages.
[13]:
model.numClasses, model.numFeatures, model.weights
[13]:
(2, 6, DenseVector([0.0045, 0.0, 0.0086, -0.0003, -0.0008, 0.009]))
[14]:
def remove_folder(
    top: str, remove_also_top: bool = True, raise_exception: bool = True
) -> list[tuple[str, str]]:
    """
    Removes everything in folder *top*.
    :param top: path to remove
    :param remove_also_top: remove also root
    :param raise_exception: raise an exception if a file cannot be removed
    :return: list of removed files and folders --> list of tuple ( (name, "file" or "dir") )
    """
    if top in {"", "C:", "c:", "C:\\", "c:\\", "d:", "D:", "D:\\", "d:\\"}:
        raise RuntimeError(  # pragma: no cover
            "top is a root (c: for example), this is not safe"
        )
    res = []
    first_root = None
    # topdown=False walks the leaves first so that folders are empty when removed
    for root, dirs, files in os.walk(top, topdown=False):
        for name in files:
            t = os.path.join(root, name)
            try:
                os.remove(t)
            except PermissionError as e:  # pragma: no cover
                if raise_exception:
                    raise PermissionError(f"unable to remove file {t}") from e
                remove_also_top = False
                continue
            res.append((t, "file"))
        for name in dirs:
            t = os.path.join(root, name)
            try:
                os.rmdir(t)
            except OSError as e:
                if raise_exception:
                    raise OSError(f"unable to remove folder {t}") from e
                remove_also_top = False  # pragma: no cover
                continue  # pragma: no cover
            res.append((t, "dir"))
        if first_root is None:
            first_root = root
    if top is not None and remove_also_top:
        res.append((top, "dir"))
        os.rmdir(top)
    return res
[15]:
def clean(folder):
    if os.path.exists(folder):
        return remove_folder(folder)
    return []


clean("target/pythonLogisticRegressionWithLBFGSModel")
# Save and load model
model.save(sc, "target/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(
    sc, "target/pythonLogisticRegressionWithLBFGSModel"
)
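The folder is cleaned before saving, presumably because saving fails when the target path already exists. When the list of removed files is not needed, the standard library's `shutil.rmtree` may be enough. The sketch below is an alternative under that assumption, not the notebook's method; `clean_with_rmtree` is a hypothetical helper, and it runs on a throwaway temporary directory rather than on the real model folder.

```python
import os
import shutil
import tempfile


def clean_with_rmtree(folder: str) -> bool:
    """Delete *folder* and its content; return True if something was removed."""
    if os.path.isdir(folder):
        shutil.rmtree(folder)
        return True
    return False


# Demonstration on a temporary directory that mimics a saved model layout.
root = tempfile.mkdtemp()
model_dir = os.path.join(root, "target", "model")
os.makedirs(model_dir)
with open(os.path.join(model_dir, "part-00000"), "w") as f:
    f.write("dummy")

removed = clean_with_rmtree(os.path.join(root, "target"))
removed_again = clean_with_rmtree(os.path.join(root, "target"))
shutil.rmtree(root)  # remove the temporary root itself
print(removed, removed_again)
```

Unlike `remove_folder`, this version does not report which files were deleted and has no guard against being called on a drive root.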
[16]:
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))


def filter_error(ys):
    return ys[0] != ys[1]


trainErr = labelsAndPreds.filter(filter_error).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
Training Error = 0.20217438039372254
Logistic regression (DataFrame)
This section is based on the example: Logistic Regression. The code changes, and so does the data preparation. Models take as input a vector column created by a VectorAssembler.
[17]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
training = spark.createDataFrame(df)
training = training.withColumn("Y", training.target == " <=50K")
training = training.withColumn("Y", training.Y.astype("float"))
training = training.select(
    [
        "age",
        "fnlwgt",
        "education_num",
        "capital_gain",
        "capital_loss",
        "hours_per_week",
        "Y",
    ]
)
assembler = VectorAssembler(
    inputCols=[
        "age",
        "fnlwgt",
        "education_num",
        "capital_gain",
        "capital_loss",
        "hours_per_week",
    ],
    outputCol="features",
)
training = assembler.transform(training)
[18]:
training.explain()
== Physical Plan ==
*(1) Project [age#89L, fnlwgt#91L, education_num#93L, capital_gain#99L, capital_loss#100L, hours_per_week#101L, cast((target#103 = <=50K) as float) AS Y#136, UDF(struct(age_double_VectorAssembler_5cbe6d4fd991, cast(age#89L as double), fnlwgt_double_VectorAssembler_5cbe6d4fd991, cast(fnlwgt#91L as double), education_num_double_VectorAssembler_5cbe6d4fd991, cast(education_num#93L as double), capital_gain_double_VectorAssembler_5cbe6d4fd991, cast(capital_gain#99L as double), capital_loss_double_VectorAssembler_5cbe6d4fd991, cast(capital_loss#100L as double), hours_per_week_double_VectorAssembler_5cbe6d4fd991, cast(hours_per_week#101L as double))) AS features#167]
+- *(1) Scan ExistingRDD[age#89L,workclass#90,fnlwgt#91L,education#92,education_num#93L,marital_status#94,occupation#95,relationship#96,race#97,sex#98,capital_gain#99L,capital_loss#100L,hours_per_week#101L,native_country#102,target#103]
[19]:
head = training.take(2)
head
[19]:
[Row(age=39, fnlwgt=77516, education_num=13, capital_gain=2174, capital_loss=0, hours_per_week=40, Y=1.0, features=DenseVector([39.0, 77516.0, 13.0, 2174.0, 0.0, 40.0])),
Row(age=50, fnlwgt=83311, education_num=13, capital_gain=0, capital_loss=0, hours_per_week=13, Y=1.0, features=DenseVector([50.0, 83311.0, 13.0, 0.0, 0.0, 13.0]))]
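The `features` column above is the input columns cast to double and concatenated in the order given by `inputCols`. The plain-Python sketch below mimics that behaviour on the first row; `assemble` is a hypothetical illustration, not Spark's implementation, and it ignores details such as sparse output or missing values.

```python
def assemble(row, input_cols):
    """Concatenate the selected columns of a row into one list of floats."""
    return [float(row[c]) for c in input_cols]


input_cols = [
    "age",
    "fnlwgt",
    "education_num",
    "capital_gain",
    "capital_loss",
    "hours_per_week",
]
# First row of the training DataFrame, written as a dictionary.
row = {
    "age": 39,
    "fnlwgt": 77516,
    "education_num": 13,
    "capital_gain": 2174,
    "capital_loss": 0,
    "hours_per_week": 40,
    "Y": 1.0,
}
features = assemble(row, input_cols)
print(features)
```

The label `Y` stays in its own column; only the columns listed in `input_cols` end up in the vector.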
[20]:
training.schema
[20]:
StructType([StructField('age', LongType(), True), StructField('fnlwgt', LongType(), True), StructField('education_num', LongType(), True), StructField('capital_gain', LongType(), True), StructField('capital_loss', LongType(), True), StructField('hours_per_week', LongType(), True), StructField('Y', FloatType(), True), StructField('features', VectorUDT(), True)])
[21]:
training.groupby("Y").count().collect()
[21]:
[Row(Y=1.0, count=24720), Row(Y=0.0, count=7841)]
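These counts give a useful reference point: a classifier that always predicts the majority class already reaches a fixed error rate. The short computation below uses only the two counts printed above and compares the result with the training error of about 0.2022 reported for the RDD model.

```python
# Class counts taken from the groupby output above.
counts = {1.0: 24720, 0.0: 7841}

total = sum(counts.values())
majority_label = max(counts, key=counts.get)
# Error made by always predicting the majority class.
baseline_error = 1 - counts[majority_label] / total

rdd_training_error = 0.20217438039372254  # value printed by the RDD section
print(total, majority_label, round(baseline_error, 4))
print("improvement over baseline:", round(baseline_error - rdd_training_error, 4))
```

The baseline error is about 0.24, so the RDD model improves on the constant prediction by roughly four points on the training data.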
[22]:
from pyspark.ml.classification import LogisticRegression
[23]:
lr = LogisticRegression(
    maxIter=10, regParam=0.3, elasticNetParam=0.8, labelCol="Y", featuresCol="features"
)
# Fit the model
lrModel = lr.fit(training)
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
Coefficients: (6,[],[])
Intercept: 1.1482462553407051
[24]:
prediction = lrModel.transform(training)
prediction.take(2)
[24]:
[Row(age=39, fnlwgt=77516, education_num=13, capital_gain=2174, capital_loss=0, hours_per_week=40, Y=1.0, features=DenseVector([39.0, 77516.0, 13.0, 2174.0, 0.0, 40.0]), rawPrediction=DenseVector([-1.1482, 1.1482]), probability=DenseVector([0.2408, 0.7592]), prediction=1.0),
Row(age=50, fnlwgt=83311, education_num=13, capital_gain=0, capital_loss=0, hours_per_week=13, Y=1.0, features=DenseVector([50.0, 83311.0, 13.0, 0.0, 0.0, 13.0]), rawPrediction=DenseVector([-1.1482, 1.1482]), probability=DenseVector([0.2408, 0.7592]), prediction=1.0)]
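The sparse vector `(6,[],[])` means that all six coefficients are zero, so the model reduces to its intercept and gives every row the same probability. A plausible reading is that the elastic-net penalty chosen here (`regParam=0.3`, `elasticNetParam=0.8`) is strong enough to shrink every coefficient to zero. The check below, which uses only numbers printed in this notebook, shows that the intercept matches the log-odds of the class frequencies and that its sigmoid gives the probability 0.7592 seen in both rows.

```python
import math

# Class counts from the groupby output and intercept printed by the model.
n_pos, n_neg = 24720, 7841
intercept = 1.1482462553407051

# Log-odds of the positive class in the training data.
log_odds = math.log(n_pos / n_neg)
# With zero coefficients, P(Y=1) = sigmoid(intercept) for every row.
p_pos = 1 / (1 + math.exp(-intercept))
prior = n_pos / (n_pos + n_neg)

print(round(log_odds, 4), round(p_pos, 4), round(prior, 4))
```

Lowering `regParam` would be the natural experiment to obtain non-zero coefficients; this is a suggestion, not something the notebook evaluates.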
End
[25]:
spark.stop()