Overview

在实际机器学习工作当中,调参是我们一个重要的内容。PySpark当中就实现了一个最常用的调参方法Grid Search,我们结合lightGBM使用一下PySpark的调参。这个程序需要安装的依赖的安装方式,可以参考上一篇博客

1. 引入依赖包

import numpy as np
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .master("spark://***.***.***.***:7077") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()
import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

其中,pyspark.ml.tuning.ParamGridBuilder就是用以实现Grid Search的包。

2. 加载数据

df_train = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/训练集特征.csv")
df_val = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/验证集特征.csv")
df_test = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/测试集特征.csv")

处理训练集特征:

feature_cols = list(df_train.columns)
feature_cols.remove("label")  # 从列名当中删除label才是真正的特征列表 
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features") 
df_train = assembler.transform(df_train)

3. 模型构建

lgb = LightGBMClassifier(
    objective="binary",
    boostingType='gbdt',
    isUnbalance=True,
    featuresCol='features',
    labelCol='label',
    maxBin=60,
    baggingFreq=1,
    baggingSeed=696,
    earlyStoppingRound=30,
    learningRate=0.1,
    # lambdaL1=1.0,
    # lambdaL2=45.0,
    maxDepth=3,
    numLeaves=128,
    baggingFraction=0.7,
    featureFraction=0.7,
    minSumHessianInLeaf=0.001,
    numIterations=800,
    verbosity=1
)

设置Grid Search参数组:

paramGrid = ParamGridBuilder() \
    .addGrid(lgb.lambdaL1, list(np.arange(1.0, 3.0, 1.0))) \
    .addGrid(lgb.lambdaL2, list(np.arange(1.0, 4.0, 1.0))) \
    .build()

设置完成之后,我们可以看一下参数都是哪些:

for param in paramGrid:
    print(param.values())

可以看到共6组参数,左边为lambdaL1,右边为lambdaL2

dict_values([1.0, 1.0])
dict_values([1.0, 2.0])
dict_values([1.0, 3.0])
dict_values([2.0, 1.0])
dict_values([2.0, 2.0])
dict_values([2.0, 3.0])

4. 交叉验证选择模型

官方提供了两种模型选择的方式:CrossValidatorTrainValidationSplit,可以参考官方文档CrossValidatorTrainValidationSplit的区别在于:CrossValidator会每次选取一部分训练集建模,去预测另外一部分训练集,这样会有K个预测的分数(K折交叉验证),最后模型的预测分数为K个分数的平均;而TrainValidationSplit则只会训练预测一次。这里,我们试着给出一个CrossValidator的例子。

evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
cross_vallidator = CrossValidator(estimator=lgb,
                          estimatorParamMaps=paramGrid, 
                          evaluator=evaluator, 
                          numFolds=3)
model = cross_vallidator.fit(df_train)

最后,我们可以得到最好的模型,并看最好的模型的特征重要性:

model.bestModel
model.bestModel.getFeatureImportances()

但是,官方api查看每个结果对应的超参数却是非常不友好,我们只好自己想办法,这里我们参考了这篇博客LightGBM Hyper Parameters Tuning in Spark

def params_extract(model):
    """
    function extact hyperparameter information from a CrossValidatorModel
    input: a CrossValidatorModel instance, model fit by CrossValidator in pyspark.ml.tuning
    output: a dictionary with key(hyperparameters setting), value(evaluator's metrics, r2, auc,...)
    """
    length = len(model.avgMetrics)
    res = {}
    for i in range(length):
        s = ""
        paraDict = model.extractParamMap()[model.estimatorParamMaps][i]
        for j in paraDict.keys():
            s += str(j).split("__")[1] + "  "
            s += str(paraDict[j]) + "  "
        res[s.strip()] = model.avgMetrics[i]
    return {k: v for k, v in sorted(res.items(), key=lambda item: item[1])}

我们试一下:

params_extract(model)

输出如下:

{'lambdaL1  1.0  lambdaL2  2.0': 0.7300710699287217,
 'lambdaL1  1.0  lambdaL2  1.0': 0.7307078416518147,
 'lambdaL1  1.0  lambdaL2  3.0': 0.7310860740685388,
 'lambdaL1  2.0  lambdaL2  1.0': 0.7312930665848859,
 'lambdaL1  2.0  lambdaL2  2.0': 0.7317563737747359,
 'lambdaL1  2.0  lambdaL2  3.0': 0.7327463775422981}

5. 官方调参探讨

我们通过CrossValidator可以获得最佳的模型,但是会有一个问题:这个最佳是拟合训练集的最佳,而不是我们给出的验证集的最佳;即使是TrainValidationSplit,我们也不能自定义验证集并传入,只能随机选择验证集。这样对于那些样本时间先后顺序不敏感的数据是影响不大的,比如图像等,但是对于交易类数据,我们希望可以根据时间先后顺序自定义训练集,验证集和测试集,并且根据验证集的效果来确定最佳参数和模型。因此,我们就需要换种方式达到目的。

import numpy as np
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .master("spark://***.***.***.***:7077") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()
import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator


# 加载数据
df_train = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/训练集特征.csv")
df_val = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/验证集特征.csv")
df_test = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/测试集特征.csv")

feature_cols = list(df_train.columns)
feature_cols.remove("label")
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

df_train = assembler.transform(df_train)
df_val = assembler.transform(df_val)
df_test = assembler.transform(df_test)

lgb = LightGBMClassifier(
    objective="binary",
    boostingType='gbdt',
    isUnbalance=True,
    featuresCol='features',
    labelCol='label',
    maxBin=60,
    baggingFreq=1,
    baggingSeed=696,
    earlyStoppingRound=20,
    learningRate=0.1,
    #lambdaL1=1.0,
    #lambdaL2=45.0,
    maxDepth=3,
    numLeaves=128,
    baggingFraction=0.7,
    featureFraction=0.7,
    minSumHessianInLeaf=0.001,
    numIterations=800,
    verbosity=1
)

lightGBMs = list()
for lambdaL1 in list(np.arange(1.0, 3.0, 1.0)):
    for lambdaL2 in list(np.arange(1.0, 4.0, 1.0)):
        lightGBMs.append(lgb.setLambdaL1(lambdaL1).setLambdaL2(lambdaL2))
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
metrics = []
models = []

# 选择验证集效果最好的模型
for learner in lightGBMs:
    model = learner.fit(df_train)
    models.append(model)
    scoredData = model.transform(df_val)
    metrics.append(evaluator.evaluate(scoredData))
best_metric = max(metrics)
best_model = models[metrics.index(best_metric)]

# 得到测试集上AUC
scored_test = best_model.transform(df_test)
print(evaluator.evaluate(scored_test))

当然,用这种方式的话,我们就可以很方便地记录每组参数对应的结果了,在评价验证集效果时,也可以用KS或者Gini等指标了,这里就不再赘述。

本文主要参考了:How to build machine learning model at large scale with Apache Spark and LightGBM for credit card fraud detection?
Simplifying Machine Learning Pipelines with mmlspark