Overview
As of the current PySpark release, 2.4.5, the pyspark.ml module does provide machine learning, but mostly with algorithms that see little industrial use; popular ones such as XGBoost and LightGBM are not yet integrated. Fortunately, a few years ago Microsoft released the mmlspark package, which includes deep learning, LightGBM, and other algorithms, and plugs seamlessly into PySpark. Below we look at how to run LightGBM with PySpark and mmlspark.
1. Installing mmlspark
First, we assume PySpark is already installed; if not, install it with:
pip3 install pyspark
Then install mmlspark by entering on the command line:
pyspark --packages com.microsoft.ml.spark:mmlspark_2.11:0.18.1
Or, when using PySpark from a Jupyter notebook, start the session like this:
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1") \
.config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
.getOrCreate()
import mmlspark
This package is fairly large, so the first installation takes a while. The mmlspark version in our server's Maven repository is 0.18.1, and that version triggers a bug during training, described below.
For detailed installation instructions, see the official documentation: Azure / mmlspark. The latest mmlspark version is 1.0.0-rc1.
2. Importing the dependencies
import numpy as np
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
.master("spark://***.***.***.***:7077") \
.config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
.config("spark.cores.max", "20") \
.config("spark.driver.memory", "6G") \
.config("spark.executor.memory", "6G") \
.config("spark.executor.cores", "6") \
.getOrCreate()
import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
Here, the VectorAssembler class combines the feature columns into a single vector that serves as the classifier's input. BinaryClassificationEvaluator measures prediction quality. Pipeline is Spark's assembly line: it chains the individual stages together so that Spark runs them as one job.
3. Loading the data
df_train = spark.read.format("csv") \
.option("inferSchema", "true") \
.option("header", "true") \
.option("sep", ",") \
.load("hdfs://***.***.***.***:39000/young/训练集特征.csv")
df_val = spark.read.format("csv") \
.option("inferSchema", "true") \
.option("header", "true") \
.option("sep", ",") \
.load("hdfs://***.***.***.***:39000/young/验证集特征.csv")
df_test = spark.read.format("csv") \
.option("inferSchema", "true") \
.option("header", "true") \
.option("sep", ",") \
.load("hdfs://***.***.***.***:39000/young/测试集特征.csv")
First, make sure the dataset files we need are on Hadoop. Uploading local files to Hadoop takes a single command:
hdfs dfs -copyFromLocal 训练集特征.csv 验证集特征.csv 测试集特征.csv /young
Processing the training-set features:
feature_cols = list(df_train.columns)
feature_cols.remove("label")  # dropping "label" from the column names leaves the actual feature list
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
After the VectorAssembler step, the original DataFrame gains an extra column named features; each value in this column is a row vector holding every feature of that sample.
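Conceptually, what VectorAssembler does per row can be sketched in plain Python (the column names f1/f2/f3 are made up for illustration; the real class operates on Spark DataFrames and produces ML Vector objects):

```python
# One row of the DataFrame, as a plain dict
row = {"f1": 0.5, "f2": 1.2, "f3": 3.0, "label": 1}

# Everything except the label is an input column...
feature_cols = [c for c in row if c != "label"]

# ...and VectorAssembler packs those values into one new "features" column
row["features"] = [row[c] for c in feature_cols]
```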
4. Building the model
lgb = LightGBMClassifier(
objective="binary",
boostingType='gbdt',
isUnbalance=True,
featuresCol='features',
labelCol='label',
maxBin=60,
baggingFreq=1,
baggingSeed=696,
earlyStoppingRound=30,
learningRate=0.1,
lambdaL1=1.0,
lambdaL2=45.0,
maxDepth=3,
numLeaves=128,
baggingFraction=0.7,
featureFraction=0.7,
# minSumHessianInLeaf=1,
numIterations=800,
verbosity=30
)
Note that when the minSumHessianInLeaf parameter is set to an integer value, it throws a ClassCastException:
Py4JJavaError: An error occurred while calling o1592.w.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)
at org.apache.spark.ml.param.DoubleParam.w(params.scala:330)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
This is a bug, and it should be fixed in the latest version. For an older release such as 0.18.1, however, we can pass the value as a double/float, i.e. write 1 as 1.0, and the exception no longer occurs.
Load the stages into a Pipeline and run the training:
stages = [assembler, lgb]
pipeline_model = Pipeline(stages=stages)
model = pipeline_model.fit(df_train)
Predict on the three datasets:
train_preds = model.transform(df_train)
val_preds = model.transform(df_val)
test_preds = model.transform(df_test)
Let's check how well the model trained:
binaryEvaluator = BinaryClassificationEvaluator()
print("Train AUC: " + str(binaryEvaluator.evaluate(train_preds, {binaryEvaluator.metricName: "areaUnderROC"})))
print("Val AUC: " + str(binaryEvaluator.evaluate(val_preds, {binaryEvaluator.metricName: "areaUnderROC"})))
print("Test AUC: " + str(binaryEvaluator.evaluate(test_preds, {binaryEvaluator.metricName: "areaUnderROC"})))
This prints the three AUC values:
Train AUC: 0.8236477163798177
Val AUC: 0.7548691969544107
Test AUC: 0.749396806843621
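The evaluator's areaUnderROC can also be sanity-checked offline. A minimal rank-based sketch in plain Python (assuming positive-class scores and 0/1 labels; unrelated to any mmlspark API, and O(n²), so only for small checks):

```python
# AUC as the probability that a randomly chosen positive sample scores
# higher than a randomly chosen negative one; ties count half.
def auc(probs, labels):
    pos = [p for p, y in zip(probs, labels) if y == 1]
    neg = [p for p, y in zip(probs, labels) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))
```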
Of course, we can also pull out the predicted probabilities and true labels, which makes it convenient to compute other custom metrics such as KS.
train_prob_list = [row.probability[0] for row in train_preds.select('probability').collect()]
train_label_list = [row.label for row in train_preds.select('label').collect()]
val_prob_list = [row.probability[0] for row in val_preds.select('probability').collect()]
val_label_list = [row.label for row in val_preds.select('label').collect()]
test_prob_list = [row.probability[0] for row in test_preds.select('probability').collect()]
test_label_list = [row.label for row in test_preds.select('label').collect()]
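With those lists in hand, KS can be computed in a few lines of plain Python. A sketch, not part of any mmlspark API; note that probability[0] above is the score for class 0, so pick the column that matches your positive class:

```python
# KS statistic: the maximum gap between the cumulative rates of positives
# and negatives when samples are ranked by score, highest first.
def ks_statistic(probs, labels):
    n_pos = sum(1 for y in labels if y == 1)
    n_neg = len(labels) - n_pos
    tp = fp = 0
    ks = 0.0
    for p, y in sorted(zip(probs, labels), reverse=True):
        if y == 1:
            tp += 1
        else:
            fp += 1
        ks = max(ks, abs(tp / n_pos - fp / n_neg))
    return ks
```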
After this series of transforms, the DataFrame gains four extra columns: features | rawPrediction | probability | prediction. rawPrediction is the tree model's final pair of scores, which sum to 0; passing them through a sigmoid yields the pair of probabilities in probability, which sum to 1 and give the model's estimated likelihood of the sample belonging to each of the two classes; prediction is the class the model predicts for the sample.
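That relationship between rawPrediction and probability can be verified in plain Python (the margin 1.2 below is an arbitrary illustrative value, not taken from the model):

```python
import math

def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))

margin = 1.2                  # hypothetical raw score for the positive class
raw_pair = [-margin, margin]  # the two rawPrediction scores sum to 0
prob_pair = [sigmoid(v) for v in raw_pair]
# sigmoid(-x) + sigmoid(x) == 1, so the two probabilities sum to 1
```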
This article draws on the following posts:
Training a machine learning model on a cluster (with LightGBM as the example)
The workflow of training a LightGBM model with pyspark.ml
Get Started with PySpark and LightGBM
Build XGBoost / LightGBM models on large datasets — what are the possible solutions?
A reader's question: even when the versions are fine, after installing mmlspark and related packages, the imports can still fail, for example:
from mmlspark.train import ComputeModelStatistics
from mmlspark.vw import VowpalWabbitRegressor, VowpalWabbitFeaturizer
from mmlspark.lightgbm import LightGBMRegressor
The errors:
>>> from mmlspark.train import ComputeModelStatistics
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ImportError: cannot import name 'ComputeModelStatistics'
>>> from mmlspark.vw import VowpalWabbitRegressor, VowpalWabbitFeaturizer
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hive/.local/lib/python3.6/site-packages/mmlspark/vw/VowpalWabbitRegressor.py", line 4, in <module>
    from mmlspark.vw._VowpalWabbitRegressor import _VowpalWabbitRegressor, _VowpalWabbitRegressionModel
ModuleNotFoundError: No module named 'mmlspark.vw._VowpalWabbitRegressor'
>>> from mmlspark.lightgbm import LightGBMRegressor
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hive/.local/lib/python3.6/site-packages/mmlspark/lightgbm/LightGBMRegressor.py", line 11, in <module>
    from mmlspark.lightgbm._LightGBMRegressor import _LightGBMRegressor
ModuleNotFoundError: No module named 'mmlspark.lightgbm._LightGBMRegressor'
>>> import pyspark
>>> spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
... .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1") \
... .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
... .getOrCreate()
>>> import mmlspark
>>> from mmlspark.lightgbm import LightGBMRegressor
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hive/.local/lib/python3.6/site-packages/mmlspark/lightgbm/LightGBMRegressor.py", line 11, in <module>
    from mmlspark.lightgbm._LightGBMRegressor import _LightGBMRegressor
ModuleNotFoundError: No module named 'mmlspark.lightgbm._LightGBMRegressor'
What could be causing this?