Overview

就目前的PySpark版本2.4.5来说,虽有pyspark.ml这个模块可以进行机器学习,但是都是一些工业界不太常用的算法,而XGBoostLightGBM这样的常用算法还没有集成。幸好微软前几年发布了mmlspark这个包,其中包含了深度学习和LightGBM等算法,可以和PySpark无缝对接。下面我们看看怎么用PySparkmmlspark来运行LightGBM

1. 安装mmlspark

首先,我们默认已经安装好了PySpark,如果没有安装,那么安装命令如下:

pip3 install pyspark

然后,安装mmlspark的方式如下,命令行输入:

pyspark --packages com.microsoft.ml.spark:mmlspark_2.11:0.18.1

或者直接在jupyter notebook中,使用PySpark时,这样启动:

import pyspark
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1") \
            .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
            .getOrCreate()
import mmlspark

这个包比较大,第一次安装需要较长时间。我们服务器上maven仓库当中,mmlspark版本是0.18.1,而这个版本训练时会导致一个bug,下面会提到。
详细安装方式可参考官方文档:Azure / mmlspark,最新的mmlspark版本是1.0.0-rc1

2. 引入依赖包

import numpy as np
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .master("spark://***.***.***.***:7077") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()
import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

其中,VectorAssembler这个类,可以将特征转换成一个向量,作为分类器的输入。BinaryClassificationEvaluator可以评价预测效果。Pipeline相当于Spark的流水线,将各个步骤连接在一起后,一起由Spark运行。

3. 加载数据

df_train = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/训练集特征.csv")
df_val = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/验证集特征.csv")
df_test = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/测试集特征.csv")

首先,要保证Hadoop上有我们需要的数据集文件。本地文件上传到Hadoop的命令很简单:
hdfs dfs -copyFromLocal 训练集特征.csv 验证集特征.csv 测试集特征.csv /young

处理训练集特征:

feature_cols = list(df_train.columns)
feature_cols.remove("label")  # 从列名当中删除label才是真正的特征列表 
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features") 

VectorAssembler之后,原数据DataFrame当中会多出一列,列名为features,这个字段内容其实就是一个行向量,里面是该样本的每一个特征。

4. 模型构建

lgb = LightGBMClassifier(
    objective="binary",
    boostingType='gbdt',
    isUnbalance=True,
    featuresCol='features',
    labelCol='label',
    maxBin=60,
    baggingFreq=1,
    baggingSeed=696,
    earlyStoppingRound=30,
    learningRate=0.1,
    lambdaL1=1.0,
    lambdaL2=45.0,
    maxDepth=3,
    numLeaves=128,
    baggingFraction=0.7,
    featureFraction=0.7,
    # minSumHessianInLeaf=1,
    numIterations=800,
    verbosity=30
)

其中,minSumHessianInLeaf这个参数,在设置为整型数值时,会抛ClassCastException异常,

Py4JJavaError: An error occurred while calling o1592.w.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
    at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)
    at org.apache.spark.ml.param.DoubleParam.w(params.scala:330)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

这是一个bug,在最新版本当中,这个bug应该已经改过来了。不过,对0.18.1这样比较老的版本,我们可以将这个变量设置为double或者float类型,例如1写成1.0,那么这个异常就不会出现了。
装载各个阶段到Pipeline流水线中,执行训练:

stages = [assembler, lgb]
pipeline_model = Pipeline(stages=stages)
model = pipeline_model.fit(df_train)

预测三个数据集:

train_preds = model.transform(df_train)
val_preds = model.transform(df_val)
val_preds = model.transform(df_val)

我们看看模型训练效果:

binaryEvaluator = BinaryClassificationEvaluator()
print ("Train AUC: " + str(binaryEvaluator.evaluate(train_preds, {binaryEvaluator.metricName: "areaUnderROC"})))
print ("Val AUC: " + str(binaryEvaluator.evaluate(val_preds, {binaryEvaluator.metricName: "areaUnderROC"})))
print ("Test AUC: " + str(binaryEvaluator.evaluate(test_preds, {binaryEvaluator.metricName: "areaUnderROC"})))

输出三个AUC

Train AUC: 0.8236477163798177
Val AUC: 0.7548691969544107
Test AUC: 0.749396806843621

当然,我们可以把预测概率结果和真实label取出来,方便进行计算其他自定义指标,例如KS等。

train_prob_list = [row.probability[0] for row in train_preds.select('probability').collect()]
train_label_list = [row.label for row in train_preds.select('label').collect()]

val_prob_list = [row.probability[0] for row in val_preds.select('probability').collect()]
val_label_list = [row.label for row in val_preds.select('label').collect()]

test_prob_list = [row.probability[0] for row in test_preds.select('probability').collect()]
test_label_list = [row.label for row in test_preds.select('label').collect()]

DataFrame在经过一系列transform之后,会多出4列,分别为features|rawPrediction|probability|prediction|rawPrediction是树模型最后的一对儿得分,和为0;在经过sigmoid之后,得到probability里的一对儿概率值,其和为1,分别表示模型判定该样本为两个分类的可能性;而prediction则是模型预测的样本类别。

本文参考了以下文章:
在集群上训练一个机器学习模型(以lightGBM为例)
利用pyspark.ml训练lightgbm模型的流程
Get Started with PySpark and LightGBM
Build XGBoost / LightGBM models on large datasets — what are the possible solutions?