Overview
In practical machine learning work, hyperparameter tuning is an important part of the job. PySpark implements the most common tuning method, Grid Search, and in this post we try out PySpark's tuning together with LightGBM. For how to install the dependencies this program needs, see the previous post.
1. Import Dependencies
```python
import numpy as np
import pyspark

spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()

import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
```
Here, pyspark.ml.tuning.ParamGridBuilder is the class that implements Grid Search.
2. Load the Data
```python
# The input paths were omitted in the original post; "train.csv" etc. are placeholders
df_train = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("train.csv")

df_val = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("val.csv")

df_test = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("test.csv")
```
Build the training-set features:
```python
feature_cols = list(df_train.columns)
feature_cols.remove("label")  # drop the label column to get the true feature list
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_train = assembler.transform(df_train)
```
3. Build the Model
```python
lgb = LightGBMClassifier(objective="binary",
                         boostingType='gbdt',
                         isUnbalance=True,
                         featuresCol='features',
                         labelCol='label',
                         maxBin=60,
                         baggingFreq=1,
                         baggingSeed=696,
                         earlyStoppingRound=30,
                         learningRate=0.1,
                         # lambdaL1=1.0,
                         # lambdaL2=45.0,
                         maxDepth=3,
                         numLeaves=128,
                         baggingFraction=0.7,
                         featureFraction=0.7,
                         minSumHessianInLeaf=0.001,
                         numIterations=800,
                         verbosity=1)
```
Set up the Grid Search parameter grid:
```python
paramGrid = ParamGridBuilder() \
    .addGrid(lgb.lambdaL1, list(np.arange(1.0, 3.0, 1.0))) \
    .addGrid(lgb.lambdaL2, list(np.arange(1.0, 4.0, 1.0))) \
    .build()
```
Once it is set up, we can inspect the parameter combinations:
```python
for param in paramGrid:
    print(param.values())
```
We can see 6 combinations in total; the left value is lambdaL1 and the right value is lambdaL2:
```
dict_values([1.0, 1.0])
dict_values([1.0, 2.0])
dict_values([1.0, 3.0])
dict_values([2.0, 1.0])
dict_values([2.0, 2.0])
dict_values([2.0, 3.0])
```
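The six combinations above are simply the Cartesian product of the two value lists; a plain-Python sketch of the same expansion (no Spark needed) makes this concrete:

```python
import itertools

import numpy as np

# the same value ranges passed to ParamGridBuilder above
lambda_l1_values = list(np.arange(1.0, 3.0, 1.0))  # [1.0, 2.0]
lambda_l2_values = list(np.arange(1.0, 4.0, 1.0))  # [1.0, 2.0, 3.0]

# every (lambdaL1, lambdaL2) pair, mirroring what ParamGridBuilder.build() produces
grid = [{"lambdaL1": l1, "lambdaL2": l2}
        for l1, l2 in itertools.product(lambda_l1_values, lambda_l2_values)]

print(len(grid))  # 6
```

This is why the grid size grows multiplicatively with every `addGrid` call: 2 × 3 = 6 models to fit here, before multiplying by the number of cross-validation folds.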
4. Model Selection via Cross-Validation
The official API offers two model-selection strategies, CrossValidator and TrainValidationSplit (see the official documentation). The difference is that CrossValidator repeatedly trains on one part of the training set and evaluates on the held-out part, producing K scores (K-fold cross-validation) whose average becomes the model's score, whereas TrainValidationSplit trains and evaluates only once. Here we give a CrossValidator example.
```python
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
cross_validator = CrossValidator(estimator=lgb,
                                 estimatorParamMaps=paramGrid,
                                 evaluator=evaluator,
                                 numFolds=3)
model = cross_validator.fit(df_train)
```
Finally, we can retrieve the best model and inspect its feature importances:
```python
model.bestModel
model.bestModel.getFeatureImportances()
```
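The importance scores come back as a flat list aligned with the assembled feature vector, so it is convenient to zip them with `feature_cols` and rank them. A small sketch with made-up names and scores (both are purely illustrative, not output from the model above):

```python
# hypothetical feature names and importance scores, for illustration only
feature_cols = ["amount", "age", "n_transactions"]
importances = [120.0, 35.0, 80.0]

# pair each feature with its score and sort from most to least important
ranked = sorted(zip(feature_cols, importances), key=lambda pair: pair[1], reverse=True)
for name, score in ranked:
    print(f"{name}: {score}")
```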
However, the official API makes it quite unfriendly to see which hyperparameters produced each score, so we have to work around it ourselves. The following helper is adapted from the blog post LightGBM Hyper Parameters Tuning in Spark:
```python
def params_extract(model):
    """Extract hyperparameter information from a CrossValidatorModel.

    input:  a CrossValidatorModel instance, fit by CrossValidator in pyspark.ml.tuning
    output: a dict mapping each hyperparameter setting to the evaluator's metric
            (r2, auc, ...), sorted ascending by metric
    """
    length = len(model.avgMetrics)
    res = {}
    for i in range(length):
        s = ""
        paraDict = model.extractParamMap()[model.estimatorParamMaps][i]
        for j in paraDict.keys():
            s += str(j).split("__")[1] + " "
            s += str(paraDict[j]) + " "
        res[s.strip()] = model.avgMetrics[i]
    return {k: v for k, v in sorted(res.items(), key=lambda item: item[1])}
```
Let's try it:
```python
params_extract(model)
```
The output is:
```
{'lambdaL1 1.0 lambdaL2 2.0': 0.7300710699287217,
 'lambdaL1 1.0 lambdaL2 1.0': 0.7307078416518147,
 'lambdaL1 1.0 lambdaL2 3.0': 0.7310860740685388,
 'lambdaL1 2.0 lambdaL2 1.0': 0.7312930665848859,
 'lambdaL1 2.0 lambdaL2 2.0': 0.7317563737747359,
 'lambdaL1 2.0 lambdaL2 3.0': 0.7327463775422981}
```
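Note that `params_extract` sorts its result ascending by metric value, which is why the best setting appears last. The sorting idiom on its own, applied to a subset of the metrics above:

```python
# a few of the metrics reported above, keyed by hyperparameter setting
res = {
    "lambdaL1 1.0 lambdaL2 2.0": 0.7300710699287217,
    "lambdaL1 2.0 lambdaL2 3.0": 0.7327463775422981,
    "lambdaL1 1.0 lambdaL2 1.0": 0.7307078416518147,
}

# sort ascending by AUC so the best combination comes last
ranked = {k: v for k, v in sorted(res.items(), key=lambda item: item[1])}
print(list(ranked)[-1])  # lambdaL1 2.0 lambdaL2 3.0
```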
5. Discussion of the Official Tuning API
CrossValidator gives us the best model, but there is a catch: "best" means best on folds of the training set, not on a validation set we supply. Even with TrainValidationSplit we cannot pass in our own validation set; the split is chosen at random. That matters little for data where sample order is irrelevant, such as images, but for transactional data we want to split training, validation, and test sets chronologically and pick the best parameters and model by validation performance. So we need a different approach.
```python
import numpy as np
import pyspark

spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()

import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Load the data (the original paths were omitted; "train.csv" etc. are placeholders)
df_train = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("train.csv")
df_val = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("val.csv")
df_test = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("test.csv")

feature_cols = list(df_train.columns)
feature_cols.remove("label")
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_train = assembler.transform(df_train)
df_val = assembler.transform(df_val)
df_test = assembler.transform(df_test)

lgb = LightGBMClassifier(objective="binary",
                         boostingType='gbdt',
                         isUnbalance=True,
                         featuresCol='features',
                         labelCol='label',
                         maxBin=60,
                         baggingFreq=1,
                         baggingSeed=696,
                         earlyStoppingRound=20,
                         learningRate=0.1,
                         # lambdaL1=1.0,
                         # lambdaL2=45.0,
                         maxDepth=3,
                         numLeaves=128,
                         baggingFraction=0.7,
                         featureFraction=0.7,
                         minSumHessianInLeaf=0.001,
                         numIterations=800,
                         verbosity=1)

# Build one candidate estimator per parameter combination.
# Note the copy(): the set* methods mutate the estimator in place, so without it
# every list entry would end up sharing the last combination's values.
lightGBMs = list()
for lambdaL1 in list(np.arange(1.0, 3.0, 1.0)):
    for lambdaL2 in list(np.arange(1.0, 4.0, 1.0)):
        lightGBMs.append(lgb.copy().setLambdaL1(lambdaL1).setLambdaL2(lambdaL2))

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          metricName="areaUnderROC")

metrics = []
models = []
# pick the model that performs best on our own validation set
for learner in lightGBMs:
    model = learner.fit(df_train)
    models.append(model)
    scoredData = model.transform(df_val)
    metrics.append(evaluator.evaluate(scoredData))

best_metric = max(metrics)
best_model = models[metrics.index(best_metric)]

# AUC on the test set
scored_test = best_model.transform(df_test)
print(evaluator.evaluate(scored_test))
```
Of course, with this approach it is easy to record the result for every parameter combination, and when evaluating the validation set we can also use metrics such as KS or Gini; we won't go into further detail here.
This post mainly drew on: How to build machine learning model at large scale with Apache Spark and LightGBM for credit card fraud detection?
Simplifying Machine Learning Pipelines with mmlspark