Overview
In the previous article, "PySpark Notes 3: lightGBM tuning with PySpark + Grid Search", we covered two ways to pick the best model: PySpark's built-in CrossValidator, and the more general approach of building several classifiers and training and evaluating them in parallel. CrossValidator, however, does not give you the classifier that performs best on a separate validation set; it only selects the best performer within the training data. mmlspark offers a simpler alternative that both selects the model that does best on the validation set and makes it easy to record the result for every parameter combination.
1. Load the PySpark and mmlspark packages
import numpy as np
import pyspark

spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from mmlspark.automl import *
from mmlspark.train import TrainClassifier, ComputeModelStatistics
2. Load the data
# The original post omits the file paths; the .load() paths below are placeholders.
df_train = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("train.csv")

df_val = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("val.csv")

df_test = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("test.csv")
Assemble the features for the training, validation, and test sets:
feature_cols = list(df_train.columns)
feature_cols.remove("label")
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_train = assembler.transform(df_train)
df_val = assembler.transform(df_val)
df_test = assembler.transform(df_test)
3. Build the model
lgb = LightGBMClassifier(
    objective="binary",
    boostingType='gbdt',
    isUnbalance=True,
    featuresCol='features',
    labelCol='label',
    maxBin=60,
    baggingFreq=1,
    baggingSeed=696,
    earlyStoppingRound=20,
    learningRate=0.1,
    # lambdaL1=1.0,
    # lambdaL2=45.0,
    maxDepth=3,
    numLeaves=128,
    baggingFraction=0.7,
    featureFraction=0.7,
    minSumHessianInLeaf=0.001,
    numIterations=800,
    verbosity=1)
Taking lambdaL1 and lambdaL2 as an example, set up four different parameter combinations. You can also keep your own record of each combination so that it can be matched against the corresponding model's results later (a minimal sketch of such bookkeeping follows the itertools variant below):
lightGBMs = list()
for lambdaL1 in list(np.arange(1.0, 3.0, 1.0)):
    for lambdaL2 in list(np.arange(1.0, 3.0, 1.0)):
        # copy() gives each classifier its own parameter values; without it,
        # every list entry would point to the same estimator object.
        lightGBMs.append(lgb.copy().setLambdaL1(lambdaL1).setLambdaL2(lambdaL2))
Of course, itertools makes building the parameter grid even more convenient:
import itertools

lightGBMs = list()
params = itertools.product([1.0, 2.0], [1.0, 2.0])
for param in params:
    lightGBMs.append(lgb.copy().setLambdaL1(param[0]).setLambdaL2(param[1]))
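As mentioned above, it helps to record each parameter combination alongside its classifier so that the metrics returned later can be traced back to the parameters. A minimal sketch of such bookkeeping; param_records is a plain Python list of my own, not an mmlspark feature:

import itertools

lightGBMs = list()
param_records = list()  # hypothetical bookkeeping list, not part of mmlspark
for lambdaL1, lambdaL2 in itertools.product([1.0, 2.0], [1.0, 2.0]):
    lightGBMs.append(lgb.copy().setLambdaL1(lambdaL1).setLambdaL2(lambdaL2))
    param_records.append({"lambdaL1": lambdaL1, "lambdaL2": lambdaL2})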
Train the models with the TrainClassifier class from the mmlspark.train module:
lgb_models = [TrainClassifier(model=lgb, labelCol="label").fit(df_train) for lgb in lightGBMs]
Use the FindBestModel class from mmlspark.automl to find the model that performs best on the validation set:
best_model = FindBestModel(evaluationMetric='AUC', models=lgb_models).fit(df_val)
We can take a look at the best model's metrics:
best_model.getBestModelMetrics().collect()
The result is a list of length 1:
[Row(evaluation_type='Classification', confusion_matrix=DenseMatrix(2, 2, [6647.0, 578.0, 2774.0, 1219.0], False), accuracy=0.7011945088251025, precision=0.30528424743300775, recall=0.6783528102392877, AUC=0.7554637200060459)]
We can also look at how all four models perform on the validation set:
best_model.getAllModelMetrics().collect()
The result is a list of length 4:
[Row(model_name='TrainClassifier_64edf29511fb', metric=0.7544038268737058, parameters='featuresCol: TrainClassifier_31b56e2ce33a_features, labelCol: label, predictionCol: prediction, probabilityCol: probability, rawPredictionCol: rawPrediction'),
 Row(model_name='TrainClassifier_b09abcd6c7e5', metric=0.7541219526558816, parameters='featuresCol: TrainClassifier_9c85465a6f6d_features, labelCol: label, predictionCol: prediction, probabilityCol: probability, rawPredictionCol: rawPrediction'),
 Row(model_name='TrainClassifier_5a1bd3015154', metric=0.7528073862858738, parameters='featuresCol: TrainClassifier_ba6c57087959_features, labelCol: label, predictionCol: prediction, probabilityCol: probability, rawPredictionCol: rawPrediction'),
 Row(model_name='TrainClassifier_323901015bd2', metric=0.7554575473623404, parameters='featuresCol: TrainClassifier_59842f00f628_features, labelCol: label, predictionCol: prediction, probabilityCol: probability, rawPredictionCol: rawPrediction')]
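To compare the runs more comfortably, the metrics can be pulled into pandas and lined up with the param_records bookkeeping from the sketch above. This assumes getAllModelMetrics() returns its rows in the same order as the models list, which is worth double-checking on your own run:

# Hypothetical comparison table; assumes the row order matches the models list.
metrics_pdf = best_model.getAllModelMetrics().toPandas()
metrics_pdf["lambdaL1"] = [p["lambdaL1"] for p in param_records]
metrics_pdf["lambdaL2"] = [p["lambdaL2"] for p in param_records]
print(metrics_pdf[["model_name", "metric", "lambdaL1", "lambdaL2"]])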
4. Predict on the test set
predictions = best_model.transform(df_test)
metrics = ComputeModelStatistics().transform(predictions)
print("Best model's AUC on test set = "
      + "{0:.2f}%".format(metrics.first()["AUC"] * 100))
Let's see what metrics contains:
metrics.collect()
The result is again a list, containing the confusion matrix, accuracy, precision, recall, and AUC:
[Row(evaluation_type='Classification', confusion_matrix=DenseMatrix(2, 2, [11323.0, 1777.0, 83.0, 115.0], False), accuracy=0.8601293427583095, precision=0.5808080808080808, recall=0.060782241014799156, AUC=0.7411781436942614)]
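As a cross-check, the BinaryClassificationEvaluator imported at the top (and otherwise unused) can compute the same AUC with PySpark's own API. This is only a sketch, and it assumes the scored DataFrame exposes the standard rawPrediction column, as the parameters listed above suggest:

# Sketch only: assumes "rawPrediction" is present in predictions.columns.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC")
print("PySpark evaluator AUC on test set = {0:.2f}%".format(
    evaluator.evaluate(predictions) * 100))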
Overall, this way of running distributed training is a bit more convenient than PySpark's built-in API, so the mmlspark approach is recommended.
This article references the following:
Simplifying Machine Learning Pipelines with mmlspark