Overview

Hyperparameter tuning is an important part of practical machine learning work. PySpark implements the most common tuning method, grid search, so in this post we tune a LightGBM model with PySpark. For how to install the dependencies this program needs, see the previous post.

1. Import dependencies

import numpy as np
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .master("spark://***.***.***.***:7077") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()
import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

Here, pyspark.ml.tuning.ParamGridBuilder is the class that implements grid search.

2. Load the data

df_train = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/训练集特征.csv")
df_val = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/验证集特征.csv")
df_test = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/测试集特征.csv")

Assemble the training-set features:

feature_cols = list(df_train.columns)
feature_cols.remove("label")  # only after removing "label" do we have the real feature list
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_train = assembler.transform(df_train)
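VectorAssembler simply concatenates the listed input columns into one vector column per row. Conceptually, what it does per row can be sketched in plain Python (illustrative only, not the PySpark API):

```python
# A minimal sketch of what VectorAssembler does per row:
# concatenate the selected feature columns into a single vector,
# leaving the label column out.
row = {"f1": 0.5, "f2": 1.2, "f3": -0.3, "label": 1}

feature_cols = [c for c in row if c != "label"]   # ["f1", "f2", "f3"]
features = [row[c] for c in feature_cols]          # the assembled "features" vector

print(features)  # [0.5, 1.2, -0.3]
```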

3. Build the model

lgb = LightGBMClassifier(
    objective="binary",
    boostingType='gbdt',
    isUnbalance=True,
    featuresCol='features',
    labelCol='label',
    maxBin=60,
    baggingFreq=1,
    baggingSeed=696,
    earlyStoppingRound=30,
    learningRate=0.1,
    # lambdaL1=1.0,
    # lambdaL2=45.0,
    maxDepth=3,
    numLeaves=128,
    baggingFraction=0.7,
    featureFraction=0.7,
    minSumHessianInLeaf=0.001,
    numIterations=800,
    verbosity=1
)

Set up the grid-search parameter grid:

paramGrid = ParamGridBuilder() \
    .addGrid(lgb.lambdaL1, list(np.arange(1.0, 3.0, 1.0))) \
    .addGrid(lgb.lambdaL2, list(np.arange(1.0, 4.0, 1.0))) \
    .build()

Once it is set up, we can inspect the parameter combinations:

for param in paramGrid:
    print(param.values())

There are six parameter combinations in total; the left value is lambdaL1 and the right is lambdaL2:

dict_values([1.0, 1.0])
dict_values([1.0, 2.0])
dict_values([1.0, 3.0])
dict_values([2.0, 1.0])
dict_values([2.0, 2.0])
dict_values([2.0, 3.0])
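ParamGridBuilder.build() enumerates the Cartesian product of the values added for each parameter. The six combinations above can be reproduced with itertools.product (a sketch of the semantics, not the PySpark API itself):

```python
import itertools
import numpy as np

lambda_l1_values = list(np.arange(1.0, 3.0, 1.0))  # [1.0, 2.0]
lambda_l2_values = list(np.arange(1.0, 4.0, 1.0))  # [1.0, 2.0, 3.0]

# Cartesian product: every lambdaL1 value paired with every lambdaL2 value
grid = list(itertools.product(lambda_l1_values, lambda_l2_values))
print(len(grid))   # 6
print(grid[0])     # (1.0, 1.0)
```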

4. Model selection with cross-validation

Spark provides two model-selection tools: CrossValidator and TrainValidationSplit (see the official documentation). The difference between them is that CrossValidator repeatedly trains on one part of the training set and scores the held-out part, giving K scores (K-fold cross-validation) whose average is the model's final score, whereas TrainValidationSplit trains and evaluates only once. Here we give a CrossValidator example.
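The K-fold logic that CrossValidator applies can be sketched in plain Python (illustrative only; fold assignment here is by position, while Spark samples folds randomly):

```python
# Sketch of K-fold cross-validation: split the data into K folds,
# train on K-1 folds, score the held-out fold, and average the K scores.
def kfold_indices(n, k):
    """Assign each of n rows to one of k folds by position."""
    fold_size = n // k
    folds = []
    for i in range(k):
        start = i * fold_size
        end = start + fold_size if i < k - 1 else n
        folds.append(list(range(start, end)))
    return folds

folds = kfold_indices(9, 3)
# Each fold serves once as the held-out set; the rest is the training set
for held_out in folds:
    train = [i for i in range(9) if i not in held_out]
    assert len(train) + len(held_out) == 9

scores = [0.72, 0.74, 0.73]      # one metric per fold (hypothetical values)
avg = sum(scores) / len(scores)  # CrossValidator reports this average
print(round(avg, 3))  # 0.73
```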

evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
cross_validator = CrossValidator(estimator=lgb,
                                 estimatorParamMaps=paramGrid,
                                 evaluator=evaluator,
                                 numFolds=3)
model = cross_validator.fit(df_train)

Finally, we can get the best model and inspect its feature importances:

model.bestModel
model.bestModel.getFeatureImportances()

However, the official API makes it awkward to see which hyperparameters produced each score, so we have to work around it ourselves; here we follow this post: LightGBM Hyper Parameters Tuning in Spark

def params_extract(model):
    """
    Extract hyperparameter information from a CrossValidatorModel.
    input: a CrossValidatorModel instance, fit by CrossValidator in pyspark.ml.tuning
    output: a dict mapping each hyperparameter setting to the evaluator's metric (r2, auc, ...)
    """
    length = len(model.avgMetrics)
    res = {}
    for i in range(length):
        s = ""
        paraDict = model.extractParamMap()[model.estimatorParamMaps][i]
        for j in paraDict.keys():
            s += str(j).split("__")[1] + "  "
            s += str(paraDict[j]) + "  "
        res[s.strip()] = model.avgMetrics[i]
    return {k: v for k, v in sorted(res.items(), key=lambda item: item[1])}

Let's try it:

params_extract(model)

The output is:

{'lambdaL1  1.0  lambdaL2  2.0': 0.7300710699287217,
 'lambdaL1  1.0  lambdaL2  1.0': 0.7307078416518147,
 'lambdaL1  1.0  lambdaL2  3.0': 0.7310860740685388,
 'lambdaL1  2.0  lambdaL2  1.0': 0.7312930665848859,
 'lambdaL1  2.0  lambdaL2  2.0': 0.7317563737747359,
 'lambdaL1  2.0  lambdaL2  3.0': 0.7327463775422981}
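Since params_extract returns the dictionary sorted by metric in ascending order, the best setting is the last entry; it can also be picked directly with max, as this small sketch over the output above shows:

```python
# The dictionary returned by params_extract (values from the run above)
results = {
    'lambdaL1  1.0  lambdaL2  2.0': 0.7300710699287217,
    'lambdaL1  1.0  lambdaL2  1.0': 0.7307078416518147,
    'lambdaL1  1.0  lambdaL2  3.0': 0.7310860740685388,
    'lambdaL1  2.0  lambdaL2  1.0': 0.7312930665848859,
    'lambdaL1  2.0  lambdaL2  2.0': 0.7317563737747359,
    'lambdaL1  2.0  lambdaL2  3.0': 0.7327463775422981,
}

# Pick the hyperparameter string with the highest AUC
best_params = max(results, key=results.get)
print(best_params)  # lambdaL1  2.0  lambdaL2  3.0
```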

5. Discussion of the official tuning APIs

CrossValidator gives us the best model, but there is a catch: "best" means best on splits of the training set, not on a validation set we supply; even TrainValidationSplit does not let us pass in our own validation set, only a random split. That matters little for data where sample order is irrelevant, such as images, but for transactional data we want to define the training, validation, and test sets by time order ourselves, and pick the best parameters and model by validation-set performance. So we need a different approach.

import numpy as np
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .master("spark://***.***.***.***:7077") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()
import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
 
# 加载数据
df_train = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/训练集特征.csv")
df_val = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/验证集特征.csv")
df_test = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("hdfs://***.***.***.***:39000/young/测试集特征.csv")
 
feature_cols = list(df_train.columns)
feature_cols.remove("label")
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
 
df_train = assembler.transform(df_train)
df_val = assembler.transform(df_val)
df_test = assembler.transform(df_test)
 
lgb = LightGBMClassifier(
    objective="binary",
    boostingType='gbdt',
    isUnbalance=True,
    featuresCol='features',
    labelCol='label',
    maxBin=60,
    baggingFreq=1,
    baggingSeed=696,
    earlyStoppingRound=20,
    learningRate=0.1,
    #lambdaL1=1.0,
    #lambdaL2=45.0,
    maxDepth=3,
    numLeaves=128,
    baggingFraction=0.7,
    featureFraction=0.7,
    minSumHessianInLeaf=0.001,
    numIterations=800,
    verbosity=1
)
 
lightGBMs = list()
for lambdaL1 in list(np.arange(1.0, 3.0, 1.0)):
    for lambdaL2 in list(np.arange(1.0, 4.0, 1.0)):
        # copy() makes each entry an independent estimator; the setters mutate
        # in place, so without copy() every list element would share the same
        # object and end up with the last parameter values
        lightGBMs.append(lgb.copy().setLambdaL1(lambdaL1).setLambdaL2(lambdaL2))
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
metrics = []
models = []
 
# 选择验证集效果最好的模型
for learner in lightGBMs:
    model = learner.fit(df_train)
    models.append(model)
    scoredData = model.transform(df_val)
    metrics.append(evaluator.evaluate(scoredData))
best_metric = max(metrics)
best_model = models[metrics.index(best_metric)]
 
# 得到测试集上AUC
scored_test = best_model.transform(df_test)
print(evaluator.evaluate(scored_test))

Of course, with this approach it becomes easy to record the result for each parameter combination, and when scoring the validation set we can also use metrics such as KS or Gini; we won't belabor that here.
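For instance, the KS statistic mentioned above can be computed from the scored labels with plain NumPy (a sketch; extracting the label and score columns from the Spark DataFrame is assumed to have happened already):

```python
import numpy as np

def ks_statistic(labels, scores):
    """KS = max gap between the cumulative TPR and FPR curves."""
    order = np.argsort(-np.asarray(scores))        # sort by score, descending
    y = np.asarray(labels)[order]
    tpr = np.cumsum(y) / y.sum()                   # cumulative true positive rate
    fpr = np.cumsum(1 - y) / (len(y) - y.sum())    # cumulative false positive rate
    return float(np.max(np.abs(tpr - fpr)))

# Toy example: two positives, two negatives
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(ks_statistic(labels, scores))  # 0.5
```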

This post mainly draws on: How to build machine learning model at large scale with Apache Spark and LightGBM for credit card fraud detection?
Simplifying Machine Learning Pipelines with mmlspark