Spark and MLlib - ML

Logistic regression with Spark.

MLlib is the distributed machine learning library implemented on top of Spark, and it partly explains Spark's success. The first incarnation of such a library was Mahout, implemented on Hadoop; MLlib has since become the standard. ML is the latest version and relies on DataFrames. It reuses the same concepts as scikit-learn, such as Pipelines.
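
The ML API mirrors scikit-learn's design. Below is a minimal sketch of a pyspark.ml Pipeline chaining a feature transformer and an estimator; the column names are placeholders, not tied to the dataset used later:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# VectorAssembler packs numeric columns into a single vector column,
# then LogisticRegression consumes that column, much like a scikit-learn
# Pipeline chains transformers and a final estimator.
assembler = VectorAssembler(inputCols=["age", "hours_per_week"], outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="Y")
pipeline = Pipeline(stages=[assembler, lr])
# model = pipeline.fit(some_dataframe)  # some_dataframe: a Spark DataFrame with these columns (hypothetical)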

Data

[1]:
import os
import urllib.request as ur
import zipfile

if not os.path.exists("data_adult.txt"):
    url = "https://github.com/sdpython/teachcompute/raw/main/_data/data_adult.zip"
    with ur.urlopen(url) as u:
        content = u.read()
    with open("data_adult.zip", "wb") as f:
        f.write(content)
    with zipfile.ZipFile("data_adult.zip", "r") as zip_ref:
        zip_ref.extractall(".")
assert os.path.exists("data_adult.txt")
[2]:
import pandas

df = pandas.read_csv("data_adult.txt", sep="\t", encoding="utf-8")
df.head()
[2]:
age workclass fnlwgt education education_num marital_status occupation relationship race sex capital_gain capital_loss hours_per_week native_country target
0 39 State-gov 77516 Bachelors 13 Never-married Adm-clerical Not-in-family White Male 2174 0 40 United-States <=50K
1 50 Self-emp-not-inc 83311 Bachelors 13 Married-civ-spouse Exec-managerial Husband White Male 0 0 13 United-States <=50K
2 38 Private 215646 HS-grad 9 Divorced Handlers-cleaners Not-in-family White Male 0 0 40 United-States <=50K
3 53 Private 234721 11th 7 Married-civ-spouse Handlers-cleaners Husband Black Male 0 0 40 United-States <=50K
4 28 Private 338409 Bachelors 13 Married-civ-spouse Prof-specialty Wife Black Female 0 0 40 Cuba <=50K
[3]:
df.dtypes
[3]:
age                int64
workclass         object
fnlwgt             int64
education         object
education_num      int64
marital_status    object
occupation        object
relationship      object
race              object
sex               object
capital_gain       int64
capital_loss       int64
hours_per_week     int64
native_country    object
target            object
dtype: object
[4]:
cols = [(i, dtype) for i, dtype in enumerate(df.dtypes) if dtype != object]
cols
[4]:
[(0, dtype('int64')),
 (2, dtype('int64')),
 (4, dtype('int64')),
 (10, dtype('int64')),
 (11, dtype('int64')),
 (12, dtype('int64'))]
[5]:
column_keep = set(_[0] for _ in cols)
column_keep
[5]:
{0, 2, 4, 10, 11, 12}
[6]:
df.to_csv("adult.txt", sep="\t", index=False, header=False)
[7]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("MySparkApp").master("local[*]").getOrCreate()
sc = spark.sparkContext
24/11/11 15:04:11 WARN Utils: Your hostname, xavier2024 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/11 15:04:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 15:04:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[8]:
data = sc.textFile("adult.txt")
[9]:
col = data.take(2)

[10]:
col
[10]:
['39\t State-gov\t77516\t Bachelors\t13\t Never-married\t Adm-clerical\t Not-in-family\t White\t Male\t2174\t0\t40\t United-States\t <=50K',
 '50\t Self-emp-not-inc\t83311\t Bachelors\t13\t Married-civ-spouse\t Exec-managerial\t Husband\t White\t Male\t0\t0\t13\t United-States\t <=50K']

Logistic regression (RDD)

We follow the example from the documentation: Linear Methods - RDD-based API. Categorical variables are left out to keep the example concise.

[11]:
from pyspark.mllib.regression import LabeledPoint


def parsePoint(line):
    spl = line.split("\t")
    values = [float(x) for i, x in enumerate(spl) if i in column_keep]
    target = float(spl[-1].strip() == "<=50K")
    return LabeledPoint(target, values)


# We prepare the training data
parsedData = data.map(parsePoint)
parsedData.collect()[:2]

[11]:
[LabeledPoint(1.0, [39.0,77516.0,13.0,2174.0,0.0,40.0]),
 LabeledPoint(1.0, [50.0,83311.0,13.0,0.0,0.0,13.0])]
[12]:
from pyspark.mllib.classification import (
    LogisticRegressionWithLBFGS,
    LogisticRegressionModel,
)
from pyspark.mllib.regression import LabeledPoint


# Load and parse the data
def parsePoint(line):
    spl = line.split("\t")
    values = [float(x) for i, x in enumerate(spl) if i in column_keep]
    target = float(spl[-1].strip() == "<=50K")
    return LabeledPoint(target, values)


# We prepare the training data
parsedData = data.map(parsePoint)

# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)
24/11/11 15:04:26 WARN Instrumentation: [4427c4d8] Initial coefficients will be ignored! Its dimensions (1, 6) did not match the expected size (1, 6)
24/11/11 15:04:26 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS

While it runs, keep an eye on the terminal window that displays the notebook server's messages.

[13]:
model.numClasses, model.numFeatures, model.weights
[13]:
(2, 6, DenseVector([0.0045, 0.0, 0.0086, -0.0003, -0.0008, 0.009]))
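
As a quick check, the RDD-based model can score a single feature vector. A hedged sketch; the values follow the column order kept in column_keep:

# Predict the class of the first observation of the dataset.
example = [39.0, 77516.0, 13.0, 2174.0, 0.0, 40.0]
print(model.predict(example))  # hard label, 0 or 1

# Removing the decision threshold makes predict return the probability of class 1.
model.clearThreshold()
print(model.predict(example))
model.setThreshold(0.5)  # restore the default threshold before the next cells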
[14]:
def remove_folder(
    top: str, remove_also_top: bool = True, raise_exception: bool = True
) -> list[str]:
    """
    Removes everything in folder *top*.

    :param top: path to remove
    :param remove_also_top: remove also root
    :param raise_exception: raise an exception if a file cannot be removed
    :return: list of removed files and folders as tuples ``(name, "file" or "dir")``
    """
    if top in {"", "C:", "c:", "C:\\", "c:\\", "d:", "D:", "D:\\", "d:\\"}:
        raise RuntimeError(  # pragma: no cover
            "top is a root (c: for example), this is not safe"
        )

    res = []
    first_root = None
    for root, dirs, files in os.walk(top, topdown=False):
        for name in files:
            t = os.path.join(root, name)
            try:
                os.remove(t)
            except PermissionError as e:  # pragma: no cover
                if raise_exception:
                    raise PermissionError(f"unable to remove file {t}") from e
                remove_also_top = False
                continue
            res.append((t, "file"))
        for name in dirs:
            t = os.path.join(root, name)
            try:
                os.rmdir(t)
            except OSError as e:
                if raise_exception:
                    raise OSError(f"unable to remove folder {t}") from e
                remove_also_top = False  # pragma: no cover
                continue  # pragma: no cover
            res.append((t, "dir"))
        if first_root is None:
            first_root = root

    if top is not None and remove_also_top:
        res.append((top, "dir"))
        os.rmdir(top)

    return res
[15]:
def clean(folder):
    if os.path.exists(folder):
        return remove_folder(folder)
    return []


clean("target/pythonLogisticRegressionWithLBFGSModel")

# Save and load model
model.save(sc, "target/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(
    sc, "target/pythonLogisticRegressionWithLBFGSModel"
)

[16]:
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))


def filter_error(ys):
    return ys[0] != ys[1]


trainErr = labelsAndPreds.filter(filter_error).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
Training Error = 0.20217438039372254
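
The same (label, prediction) pairs also give a quick confusion matrix; a hedged sketch using the RDD method countByValue:

# Count every (label, prediction) combination over the training set.
confusion = labelsAndPreds.map(lambda lp: (int(lp[0]), int(lp[1]))).countByValue()
print(confusion)  # e.g. {(1, 1): ..., (0, 0): ..., (1, 0): ..., (0, 1): ...}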

Logistic regression (DataFrame)

This follows the example: Logistic Regression. The code changes, and so does the data preparation. The models take as input a single vector column created by a VectorAssembler.

[17]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

training = spark.createDataFrame(df)
training = training.withColumn("Y", training.target == " <=50K")
training = training.withColumn("Y", training.Y.astype("float"))
training = training.select(
    [
        "age",
        "fnlwgt",
        "education_num",
        "capital_gain",
        "capital_loss",
        "hours_per_week",
        "Y",
    ]
)
assembler = VectorAssembler(
    inputCols=[
        "age",
        "fnlwgt",
        "education_num",
        "capital_gain",
        "capital_loss",
        "hours_per_week",
    ],
    outputCol="features",
)
training = assembler.transform(training)
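
The categorical columns were dropped to keep the example short. They could be brought back with a StringIndexer followed by a OneHotEncoder placed before the VectorAssembler; a hedged sketch, not run here:

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Map the string categories to integer indices, then one-hot encode them.
indexer = StringIndexer(inputCol="workclass", outputCol="workclass_idx")
encoder = OneHotEncoder(inputCols=["workclass_idx"], outputCols=["workclass_vec"])
# Both stages would feed an extra "workclass_vec" column into the VectorAssembler.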
[18]:
training.explain()
== Physical Plan ==
*(1) Project [age#89L, fnlwgt#91L, education_num#93L, capital_gain#99L, capital_loss#100L, hours_per_week#101L, cast((target#103 =  <=50K) as float) AS Y#136, UDF(struct(age_double_VectorAssembler_5cbe6d4fd991, cast(age#89L as double), fnlwgt_double_VectorAssembler_5cbe6d4fd991, cast(fnlwgt#91L as double), education_num_double_VectorAssembler_5cbe6d4fd991, cast(education_num#93L as double), capital_gain_double_VectorAssembler_5cbe6d4fd991, cast(capital_gain#99L as double), capital_loss_double_VectorAssembler_5cbe6d4fd991, cast(capital_loss#100L as double), hours_per_week_double_VectorAssembler_5cbe6d4fd991, cast(hours_per_week#101L as double))) AS features#167]
+- *(1) Scan ExistingRDD[age#89L,workclass#90,fnlwgt#91L,education#92,education_num#93L,marital_status#94,occupation#95,relationship#96,race#97,sex#98,capital_gain#99L,capital_loss#100L,hours_per_week#101L,native_country#102,target#103]


[19]:
head = training.take(2)
head
[19]:
[Row(age=39, fnlwgt=77516, education_num=13, capital_gain=2174, capital_loss=0, hours_per_week=40, Y=1.0, features=DenseVector([39.0, 77516.0, 13.0, 2174.0, 0.0, 40.0])),
 Row(age=50, fnlwgt=83311, education_num=13, capital_gain=0, capital_loss=0, hours_per_week=13, Y=1.0, features=DenseVector([50.0, 83311.0, 13.0, 0.0, 0.0, 13.0]))]
[20]:
training.schema
[20]:
StructType([StructField('age', LongType(), True), StructField('fnlwgt', LongType(), True), StructField('education_num', LongType(), True), StructField('capital_gain', LongType(), True), StructField('capital_loss', LongType(), True), StructField('hours_per_week', LongType(), True), StructField('Y', FloatType(), True), StructField('features', VectorUDT(), True)])
[21]:
training.groupby("Y").count().collect()

[21]:
[Row(Y=1.0, count=24720), Row(Y=0.0, count=7841)]
[22]:
from pyspark.ml.classification import LogisticRegression
[23]:
lr = LogisticRegression(
    maxIter=10, regParam=0.3, elasticNetParam=0.8, labelCol="Y", featuresCol="features"
)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
Coefficients: (6,[],[])
Intercept: 1.1482462553407051
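
All six coefficients are shrunk to zero here: with regParam=0.3 and elasticNetParam=0.8 the L1 penalty dominates and only the intercept remains. A hedged sketch with a weaker penalty (the variable names are illustrative):

lr_weak = LogisticRegression(
    maxIter=10, regParam=0.01, elasticNetParam=0.0, labelCol="Y", featuresCol="features"
)
lrModel_weak = lr_weak.fit(training)
print(lrModel_weak.coefficients)  # expected to be non-zero with less regularization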
[24]:
prediction = lrModel.transform(training)
prediction.take(2)
[24]:
[Row(age=39, fnlwgt=77516, education_num=13, capital_gain=2174, capital_loss=0, hours_per_week=40, Y=1.0, features=DenseVector([39.0, 77516.0, 13.0, 2174.0, 0.0, 40.0]), rawPrediction=DenseVector([-1.1482, 1.1482]), probability=DenseVector([0.2408, 0.7592]), prediction=1.0),
 Row(age=50, fnlwgt=83311, education_num=13, capital_gain=0, capital_loss=0, hours_per_week=13, Y=1.0, features=DenseVector([50.0, 83311.0, 13.0, 0.0, 0.0, 13.0]), rawPrediction=DenseVector([-1.1482, 1.1482]), probability=DenseVector([0.2408, 0.7592]), prediction=1.0)]
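
The predictions carry a rawPrediction column, so a standard evaluator can summarize the model; a hedged sketch with BinaryClassificationEvaluator, whose default metric is the area under the ROC curve:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="Y", rawPredictionCol="rawPrediction")
print(evaluator.evaluate(prediction))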

End

[25]:
spark.stop()

Notebook on GitHub