机器学习
这个示例的目的是使用Spark的MLlib库进行逻辑回归模型的训练和预测。我们将基于ElasticNet正则化的逻辑回归模型来处理输入数据,并输出模型的系数和截距。ElasticNet是L1(Lasso)和L2(Ridge)正则化的组合,有助于防止模型过拟合。
数据格式说明
数据为sample_libsvm_data.txt
输入数据是稀疏矩阵格式,每行代表一个样本。每行的第一个数值表示类别标签,后续的键值对表示特征索引和对应的特征值。例如:
0 128:51 129:159 130:253 ...
1 155:178 156:255 157:105 ...
其中,第一个数值(0或1)表示类别标签,后面的键值对(例如128:51)表示特征索引128的值为51。
实际应用示例
- 手写数字识别:
- 每个样本表示一张手写数字图片,标签表示数字类别(0-9),特征表示图片的像素值。
- 128:51表示第128个像素的值为51。
- 文本分类:
- 每个样本表示一篇文档,标签表示文档类别(例如,垃圾邮件或正常邮件),特征表示词频或TF-IDF值。
- 128:51表示第128个单词在文档中出现了51次。
数据上传到hdfs
cd /opt/spark
hadoop fs -put data/mllib/sample_libsvm_data.txt /test
准备脚本
原始脚本为Spark官方自带机器学习脚本
"examples/src/main/python/ml/logistic_regression_with_elastic_net.py"
修改里面代码,将文件路径修改为刚才上传的hdfs路径
training = spark.read.format("libsvm").load("hdfs:///test/sample_libsvm_data.txt")
以下是logistic_regression_with_elastic_net.py脚本的主要部分和解释:
from __future__ import print_function
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder.appName("LogisticRegressionWithElasticNet").getOrCreate()
# 加载训练数据
data_path = "data/data.txt"
training = spark.read.format("libsvm").load(data_path)
# 创建逻辑回归模型实例
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# 训练模型
lrModel = lr.fit(training)
# 打印模型的系数和截距
print("Multinomial coefficients: " + str(lrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(lrModel.interceptVector))
# 保存模型到hdfs上
model_path = "hdfs:///test/sample_libsvm_model"
lrModel.save(model_path)
# 停止SparkSession
spark.stop()
说明
- SparkSession: 创建一个新的SparkSession。
- 加载数据: 使用libsvm格式加载训练数据。
- LogisticRegression: 创建逻辑回归模型实例,并设置最大迭代次数、正则化参数和ElasticNet参数。
- 训练模型: 通过fit方法训练模型。
- 打印系数和截距: 输出模型的系数矩阵和截距向量。
运行机器学习脚本
cd /opt/spark
spark-submit /opt/spark/examples/src/main/python/ml/logistic_regression_with_elastic_net.py
训练结果解释
模型训练完成后,脚本将输出模型的系数和截距。例如:
Multinomial coefficients: 2 X 692 CSRMatrix
(0,272) 0.0001
(0,300) 0.0001
...
Multinomial intercepts: [0.2750587585718083,-0.2750587585718083]
这些输出表示训练得到的逻辑回归模型的参数,用于描述各个特征对分类结果的影响。
然后在hdfs上可以看到被报错的模型
root@hadoop-master1:/opt/spark# hadoop fs -ls hdfs:///test/sample_libsvm_model/data
Found 2 items
-rw-r--r-- 3 root supergroup 0 2024-05-16 06:09 hdfs:///test/sample_libsvm_model/data/_SUCCESS
-rw-r--r-- 3 root supergroup 4978 2024-05-16 06:09 hdfs:///test/sample_libsvm_model/data/part-00000-8859fc5d-5c4d-4519-9257-acb87db6e27b-c000.snappy.parquet
预测数据
处理预测数据源
使用Shell命令截取sample_libsvm_data.txt前20行,去除第一列的标签并保存为新的文件sample_libsvm_data_4_classification.txt:
sh
复制代码
# 截取前20行
head -n 20 data/mllib/sample_libsvm_data.txt > data/sample_libsvm_data_4_classification.txt
# 去除第一列的标签
# awk '{$1=""; print $0}' data/sample_libsvm_data_4_classification.txt > temp && mv temp data/sample_libsvm_data_4_classification.txt
# 截取前20行并将第一列设置为0
head -n 20 data/mllib/sample_libsvm_data.txt | awk '{$1=0; print $0}' > data/sample_libsvm_data_4_classification.txt
预测集上传到HDFS
hadoop fs -put data/sample_libsvm_data_4_classification.txt /test/sample_libsvm_data_4_classification.txt
预测脚本
以下是预测脚本logistic_regression_prediction.py
,从保存的模型路径/opt/spark/data/mllib/sample_libsvm_model读取模型,并使用处理后的预测数据进行预测,输出预测结果:
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder.appName("LogisticRegressionPrediction").getOrCreate()
# 加载训练好的模型
model_path = "hdfs:///test/sample_libsvm_model"
lrModel = LogisticRegressionModel.load(model_path)
# 加载新数据(处理后的前20行数据)
new_data_path = "hdfs:///test/sample_libsvm_data_4_classification.txt"
new_data = spark.read.format("libsvm").load(new_data_path)
# 使用模型进行预测
predictions = lrModel.transform(new_data)
# 显示预测结果
predictions.select("prediction", "probability").show()
# 停止SparkSession
spark.stop()
执行预测脚本:
spark-submit examples/src/main/python/ml/logistic_regression_prediction.py
最终成功输出预测结果