Overview

上一篇文章PySpark笔记之三:lightGBM调参之PySpark + Grid Search,我们记录了分别用PySpark中自带的CrossValidator和更通用的生成多个分类器同时执行训练预测的方式选取最好的模型。其中CrossValidator并不能得到验证集上最佳的分类器,而是得到训练集上最佳的效果。而mmlspark当中却有更为简单的方式,既可以得到验证集上最佳的效果,也可以方便地记录我们每一组参数对应的结果,是一种很好的方式。

1. 加载PySparkmmlspark相关包

import numpy as np
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .master("spark://***.***.***.***:7077") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from mmlspark.automl import *
from mmlspark.train import TrainClassifier, ComputeModelStatistics

2.加载数据

df_train = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/训练集特征.csv")
df_val = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/验证集特征.csv")
df_test = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/测试集特征.csv")

处理训练集、验证集和测试集数据:

feature_cols = list(df_train.columns)
feature_cols.remove("label")
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_train = assembler.transform(df_train)
df_val = assembler.transform(df_val)
df_test = assembler.transform(df_test)

3.建模

lgb = LightGBMClassifier(
    objective="binary",
    boostingType='gbdt',
    isUnbalance=True,
    featuresCol='features',
    labelCol='label',
    maxBin=60,
    baggingFreq=1,
    baggingSeed=696,
    earlyStoppingRound=20,
    learningRate=0.1,
    #lambdaL1=1.0,
    #lambdaL2=45.0,
    maxDepth=3,
    numLeaves=128,
    baggingFraction=0.7,
    featureFraction=0.7,
    minSumHessianInLeaf=0.001,
    numIterations=800,
    verbosity=1
)

lambdaL1lambdaL2为例,设置4组不同的参数,这里可以自己记录每组参数,方便和后面各模型效果对应,就不再赘述了:

lightGBMs = list()
for lambdaL1 in list(np.arange(1.0, 3.0, 1.0)):
    for lambdaL2 in list(np.arange(1.0, 3.0, 1.0)):
        lightGBMs.append(lgb.setLambdaL1(lambdaL1).setLambdaL2(lambdaL2))

当然,这里可以用这种方式来设置参数,更方便:

import itertools
lightGBMs = list()
params = itertools.product([1.0, 2.0], [1.0, 2.0])
for param in params:
    lightGBMs.append(lgb.setLambdaL1(param[0]).setLambdaL2(param[1]))

利用mmlspark.train模块当中的TrainClassifier类训练模型:

lgb_models = [TrainClassifier(model=lgb, labelCol="label").fit(df_train) for lgb in lightGBMs]

利用mmlspark.automl当中的FindBestModel类,寻找在验证集上效果最好的模型:

best_model = FindBestModel(evaluationMetric='AUC', models=lgb_models).fit(df_val)

我们可以看看最好的模型效果:

best_model.getBestModelMetrics().collect()

结果如下,是一个长度为1list

[Row(evaluation_type='Classification', confusion_matrix=DenseMatrix(2, 2, [6647.0, 578.0, 2774.0, 1219.0], False), accuracy=0.7011945088251025, precision=0.30528424743300775, recall=0.6783528102392877, AUC=0.7554637200060459)]

也可以看看所有4个模型在验证集上的效果:

best_model.getAllModelMetrics().collect()

结果是个list,长度为4

[Row(model_name='TrainClassifier_64edf29511fb', metric=0.7544038268737058, parameters='featuresCol: TrainClassifier_31b56e2ce33a_features, labelCol: label, predictionCol: prediction, probabilityCol: probability, rawPredictionCol: rawPrediction'),
 Row(model_name='TrainClassifier_b09abcd6c7e5', metric=0.7541219526558816, parameters='featuresCol: TrainClassifier_9c85465a6f6d_features, labelCol: label, predictionCol: prediction, probabilityCol: probability, rawPredictionCol: rawPrediction'),
 Row(model_name='TrainClassifier_5a1bd3015154', metric=0.7528073862858738, parameters='featuresCol: TrainClassifier_ba6c57087959_features, labelCol: label, predictionCol: prediction, probabilityCol: probability, rawPredictionCol: rawPrediction'),
 Row(model_name='TrainClassifier_323901015bd2', metric=0.7554575473623404, parameters='featuresCol: TrainClassifier_59842f00f628_features, labelCol: label, predictionCol: prediction, probabilityCol: probability, rawPredictionCol: rawPrediction')]

4.预测测试集

predictions = bestModel.transform(df_test)
metrics = ComputeModelStatistics().transform(predictions)
print("Best model's AUC on test set = "
      + "{0:.2f}%".format(metrics.first()["AUC"] * 100))

我们可以看看metrics里面都是什么:

metrics.collect()

结果也是一个list,包含混淆矩阵,accuracy,precisionrecall

[Row(evaluation_type='Classification', confusion_matrix=DenseMatrix(2, 2, [11323.0, 1777.0, 83.0, 115.0], False), accuracy=0.8601293427583095, precision=0.5808080808080808, recall=0.060782241014799156, AUC=0.7411781436942614)]

总体来说,这种方式来进行分布式训练,比PySpark自带的api更方便一些,推荐mmlspark方式。

本文参考了以下文章:
Simplifying Machine Learning Pipelines with mmlspark