Overview
As of PySpark 2.4.5, the pyspark.ml module does provide machine learning algorithms, but mostly ones that are rarely used in industry; commonly used algorithms such as XGBoost and LightGBM are not included. Fortunately, Microsoft released the mmlspark package a few years ago, which bundles deep learning, LightGBM, and other algorithms and integrates seamlessly with PySpark. Below we look at how to run LightGBM with PySpark and mmlspark.
1. Installing mmlspark

First, we assume PySpark is already installed. If not, install it with:

```bash
pip3 install pyspark
```

Then install mmlspark by running the following on the command line:

```bash
pyspark --packages com.microsoft.ml.spark:mmlspark_2.11:0.18.1
```

Or, when using PySpark from a Jupyter notebook, start the session like this:

```python
import pyspark

spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1") \
    .getOrCreate()

import mmlspark
```
The package is fairly large, so the first installation takes a while. The Maven repository on our server carries mmlspark 0.18.1, and that version has a bug that shows up during training; it is described below.

For detailed installation instructions, see the official documentation: Azure / mmlspark. The latest mmlspark version is 1.0.0-rc1.
2. Importing dependencies
```python
import numpy as np
import pyspark

spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()

import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
```
Here, VectorAssembler combines the feature columns into a single vector that serves as the classifier's input, BinaryClassificationEvaluator evaluates the prediction quality, and Pipeline is Spark's pipeline abstraction: the individual stages are chained together and then run by Spark as a single workflow.
3. Loading the data
```python
# Note: the .load() paths below are an assumption, matching the HDFS upload
# command shown next (files copied to /young); adjust them to your environment.
df_train = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("/young/训练集特征.csv")

df_val = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("/young/验证集特征.csv")

df_test = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("/young/测试集特征.csv")
```
First, make sure the dataset files we need are on Hadoop. Uploading local files to HDFS is simple:

```bash
hdfs dfs -copyFromLocal 训练集特征.csv 验证集特征.csv 测试集特征.csv /young
```

(The three CSVs hold the training, validation, and test set features, respectively.)
Processing the training features:

```python
feature_cols = list(df_train.columns)
feature_cols.remove("label")  # dropping the label column leaves the actual feature list
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
```
After the VectorAssembler step, the original DataFrame gains an extra column named features; for each row it holds a vector containing every feature of that sample.
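As a quick illustration (not part of the original pipeline), the assembler can be applied on its own to inspect the new column; a minimal sketch:

```python
# Illustrative only: run the assembler outside the Pipeline to peek at the
# generated "features" vector column next to the label.
assembled = assembler.transform(df_train)
assembled.select("features", "label").show(5, truncate=False)
```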
4. Building the model
```python
lgb = LightGBMClassifier(
    objective="binary",
    boostingType='gbdt',
    isUnbalance=True,
    featuresCol='features',
    labelCol='label',
    maxBin=60,
    baggingFreq=1,
    baggingSeed=696,
    earlyStoppingRound=30,
    learningRate=0.1,
    lambdaL1=1.0,
    lambdaL2=45.0,
    maxDepth=3,
    numLeaves=128,
    baggingFraction=0.7,
    featureFraction=0.7,
    # minSumHessianInLeaf=1,
    numIterations=800,
    verbosity=30
)
```
Note that setting the minSumHessianInLeaf parameter to an integer value throws a ClassCastException:
```
Py4JJavaError: An error occurred while calling o1592.w.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
    at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)
    at org.apache.spark.ml.param.DoubleParam.w(params.scala:330)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
```
This is a bug in mmlspark, and it should already be fixed in the latest version. On an older version such as 0.18.1, the workaround is to pass the value as a double/float, e.g. write 1 as 1.0, and the exception no longer appears.
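For example, a minimal sketch of the workaround on 0.18.1, reusing a few of the parameters from the classifier above (the rest keep their defaults):

```python
# Passing a float avoids the Integer -> Double unboxing error on mmlspark 0.18.1;
# an int such as 1 would raise the ClassCastException shown above.
lgb_safe = LightGBMClassifier(
    objective="binary",
    featuresCol="features",
    labelCol="label",
    minSumHessianInLeaf=1.0,
    numIterations=800,
)
```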
Load the stages into a Pipeline and run the training:

```python
stages = [assembler, lgb]
pipeline_model = Pipeline(stages=stages)
model = pipeline_model.fit(df_train)
```
Predict on the three datasets:

```python
train_preds = model.transform(df_train)
val_preds = model.transform(df_val)
test_preds = model.transform(df_test)
```
Let's look at how well the model does:

```python
binaryEvaluator = BinaryClassificationEvaluator()
print("Train AUC: " + str(binaryEvaluator.evaluate(train_preds, {binaryEvaluator.metricName: "areaUnderROC"})))
print("Val AUC: " + str(binaryEvaluator.evaluate(val_preds, {binaryEvaluator.metricName: "areaUnderROC"})))
print("Test AUC: " + str(binaryEvaluator.evaluate(test_preds, {binaryEvaluator.metricName: "areaUnderROC"})))
```
This prints the three AUC values:

```
Train AUC: 0.8236477163798177
Val AUC: 0.7548691969544107
Test AUC: 0.749396806843621
```
Of course, we can also pull the predicted probabilities and the true labels out of the DataFrames, which makes it convenient to compute other custom metrics such as KS.

```python
train_prob_list = [row.probability[0] for row in train_preds.select('probability').collect()]
train_label_list = [row.label for row in train_preds.select('label').collect()]

val_prob_list = [row.probability[0] for row in val_preds.select('probability').collect()]
val_label_list = [row.label for row in val_preds.select('label').collect()]

test_prob_list = [row.probability[0] for row in test_preds.select('probability').collect()]
test_label_list = [row.label for row in test_preds.select('label').collect()]
```
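With those lists collected, a custom metric such as KS is straightforward. Below is a minimal sketch; it assumes scipy is available on the driver and that the labels are 0/1 as in the binary setup above:

```python
from scipy.stats import ks_2samp

def ks_stat(prob_list, label_list):
    # KS: maximum distance between the score distributions of the two classes
    pos_scores = [p for p, y in zip(prob_list, label_list) if y == 1]
    neg_scores = [p for p, y in zip(prob_list, label_list) if y == 0]
    return ks_2samp(pos_scores, neg_scores).statistic

print("Train KS:", ks_stat(train_prob_list, train_label_list))
print("Val KS:", ks_stat(val_prob_list, val_label_list))
print("Test KS:", ks_stat(test_prob_list, test_label_list))
```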
After this series of transforms, the DataFrame gains four extra columns: features | rawPrediction | probability | prediction. rawPrediction holds the tree model's pair of final scores, which sum to 0; passing them through a sigmoid yields the pair of values in probability, which sum to 1 and represent how likely the model thinks the sample belongs to each of the two classes; prediction is the class the model finally assigns to the sample.
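To see these columns concretely, they can be selected from any of the prediction DataFrames; a small illustrative sketch:

```python
# Illustrative: each row shows the pair of raw scores (summing to 0), the pair
# of class probabilities (summing to 1), and the predicted class label.
train_preds.select("rawPrediction", "probability", "prediction").show(5, truncate=False)
```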
This article drew on the following posts:

- 在集群上训练一个机器学习模型(以lightGBM为例)
- 利用pyspark.ml训练lightgbm模型的流程
- Get Started with PySpark and LightGBM
- Build XGBoost / LightGBM models on large datasets — what are the possible solutions?
In a setup where the versions are not the issue, after installing mmlspark and related packages the imports can still fail, for example:
```python
from mmlspark.train import ComputeModelStatistics
from mmlspark.vw import VowpalWabbitRegressor, VowpalWabbitFeaturizer
from mmlspark.lightgbm import LightGBMRegressor
```

The errors:
```
>>> from mmlspark.train import ComputeModelStatistics
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ImportError: cannot import name 'ComputeModelStatistics'
>>> from mmlspark.vw import VowpalWabbitRegressor, VowpalWabbitFeaturizer
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hive/.local/lib/python3.6/site-packages/mmlspark/vw/VowpalWabbitRegressor.py", line 4, in <module>
    from mmlspark.vw._VowpalWabbitRegressor import _VowpalWabbitRegressor, _VowpalWabbitRegressionModel
ModuleNotFoundError: No module named 'mmlspark.vw._VowpalWabbitRegressor'
>>> from mmlspark.lightgbm import LightGBMRegressor
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hive/.local/lib/python3.6/site-packages/mmlspark/lightgbm/LightGBMRegressor.py", line 11, in <module>
    from mmlspark.lightgbm._LightGBMRegressor import _LightGBMRegressor
ModuleNotFoundError: No module named 'mmlspark.lightgbm._LightGBMRegressor'
>>> import pyspark
>>> spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
...     .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1") \
...     .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
...     .getOrCreate()
>>> import mmlspark
>>> from mmlspark.lightgbm import LightGBMRegressor
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hive/.local/lib/python3.6/site-packages/mmlspark/lightgbm/LightGBMRegressor.py", line 11, in <module>
    from mmlspark.lightgbm._LightGBMRegressor import _LightGBMRegressor
ModuleNotFoundError: No module named 'mmlspark.lightgbm._LightGBMRegressor'
```
Could anyone advise what causes this?