
PySpark Notes, Part 1: Reading and Writing Data

Overview

PySpark is Spark with a Python API. In my initial use I have not found major differences from the Scala API. Our servers already have PySpark configured, which pairs nicely with Jupyter notebooks for offline training of machine learning models.
Starting with this post, I will record my use of Spark from an algorithm engineer's perspective.
To me, Spark is simply a computation tool for problems a single machine cannot handle. When one machine is not enough, it is almost always because the data is too large: memory, disk, or CPU runs out, and the data has to be split across several machines and processed together. Compared with Hadoop, one of Spark's strengths is computation, and machine learning needs computation everywhere from data preprocessing to model training, especially on large datasets. Throughout these notes I will compare single-machine and distributed machine learning workflows to build a better understanding of Spark.

1. Create a Spark session

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create (or reuse) a Spark session: name the job, point it at the cluster
# master, and cap the resources this application may use.
spark = (SparkSession.builder
    .appName("read data")                    # job name shown in the Spark UI
    .master("spark://**.**.**.***:7077")     # IP and port of the Spark master
    .config("spark.cores.max", "10")         # total CPU cores for this application
    .config("spark.driver.memory", "2G")     # memory for the driver
    .config("spark.executor.memory", "2G")   # memory per executor
    .config("spark.executor.cores", "2")     # CPU cores per executor
    .getOrCreate())                          # reuse an existing session if there is one

On a single machine we usually read data like this (typically from a CSV file):

import pandas as pd
df_single = pd.read_csv('data_total.csv')

Pandas reads the CSV file into a DataFrame.
Spark follows the same logic: it reads data from CSV, JSON, Hive, relational databases, or Hadoop into a DataFrame, so analysts and algorithm engineers can switch over almost seamlessly. The syntax is also fairly simple.

df_spark = spark.read.format("csv").option("header", "true").load('hdfs://**.**.**.***:39000/users.csv')

spark.read returns a DataFrameReader instance, and format tells that reader which data format to load.
Alternatively, you can read the file like this:

df_spark = spark.read.csv(path='hdfs://**.**.**.***:39000/users.csv', sep=',',  header=True)
df_spark.dtypes

The data types are as follows:

[('name', 'string'),
 ('sex', 'string'),
 ('mail', 'string'),
 ('country', 'string'),
 ('date', 'string'),
 ('blood_group', 'string')]
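
Note that every column has come back as a string: spark.read.csv does not infer column types unless asked to. A minimal sketch of two common fixes, assuming the same users.csv (the 'yyyy-MM-dd' date format is an assumption about this sample data):

# Option 1: let Spark infer the column types while reading (costs an extra pass over the data)
df_typed = spark.read.csv(path='hdfs://**.**.**.***:39000/users.csv', sep=',', header=True, inferSchema=True)

# Option 2: keep the string read and cast only the columns you need
df_typed = df_spark.withColumn('date', F.to_date('date', 'yyyy-MM-dd'))
df_typed.dtypes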

Count the users with blood type A+:

print(df_spark.filter(df_spark.blood_group == 'A+').count())

This line is equivalent to the following in single-machine Pandas:

len(df_single[df_single.blood_group == 'A+'])

Count the number of users per country:

country_num = df_spark.groupby(df_spark.country).agg(F.count('*').alias('num')).cache()
print(country_num.collect())

The output is as follows:

[Row(country='Chad', num=4060), Row(country='Anguilla', num=4015), Row(country='Paraguay', num=4080), Row(country='Macao', num=4071), Row(country='Heard Island and McDonald Islands', num=4042), Row(country='Yemen', num=4102), Row(country='Senegal', num=4188), Row(country='Sweden', num=4092), Row(country='Tokelau', num=4171), Row(country='French Southern Territories', num=4032),
...,
Row(country='Kiribati', num=4058), Row(country='Guyana', num=4134), Row(country='Eritrea', num=4032), Row(country='Jersey', num=3931), Row(country='Philippines', num=4012), Row(country='Tonga', num=3988), Row(country='Norfolk Island', num=4074), Row(country='Djibouti', num=4164), Row(country='Malaysia', num=4138), Row(country='Singapore', num=4087)]
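
The title of this note also promises writing data, which the examples above do not show. A minimal sketch of writing the aggregated DataFrame back to HDFS (the output paths are hypothetical):

# Write the per-country counts back out as CSV
country_num.write.format("csv").option("header", "true").mode("overwrite").save('hdfs://**.**.**.***:39000/country_num_csv')

# Parquet keeps the schema and is usually the better choice for Spark-to-Spark pipelines
country_num.write.mode("overwrite").parquet('hdfs://**.**.**.***:39000/country_num_parquet')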

2. Summary

Overall, reading data with Spark feels very close to working with Pandas; you can think of it as a distributed version of Pandas.

For more, see the official documentation.

Training on Structured Data with TensorFlow 2.0

Overview

I have recently been updating the new-user model, still with XGBoost. Since the training set has grown to about 200,000 rows, I decided to also train a neural network and see how it compares.
TensorFlow 2.0 ships with Keras built in, which makes it very approachable; standalone Keras is no longer updated separately and now lives on as a TensorFlow module. Here we use tf.keras from TensorFlow 2.0 to train on our structured data.

1. Import the feature list and data

import numpy as np
import pandas as pd
import sklearn
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras import models, layers, losses, metrics

# Feature list
name_list = pd.read_csv('feature_list.txt', header=None, index_col=0)
my_feature_names = list(name_list.transpose())

# Load the data
df_total = pd.read_csv('data_total.csv')
df_total.head()

2. Data processing and dataset splitting

# Fill missing values with 0
df_total = df_total.fillna(0)

# Split the dataset by application time
df_train = df_total[df_total.apply_time < '2020-01-21 00:00:00']
df_val = df_total[(df_total.apply_time >= '2020-01-21 00:00:00') & (df_total.apply_time < '2020-02-01 00:00:00')]
df_test = df_total[df_total.apply_time >= '2020-02-01 00:00:00']

# Select the feature columns and labels we need
train_x = df_train[my_feature_names]
train_y = df_train['label']
val_x = df_val[my_feature_names]
val_y = df_val['label']
test_x = df_test[my_feature_names]
test_y = df_test['label']

# Standardize the features (fit on the training set only, then transform val/test)
scaler = StandardScaler()
train_x = scaler.fit_transform(train_x)
val_x = scaler.transform(val_x)
test_x = scaler.transform(test_x)
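
Before building the model it can help to sanity-check the split sizes and the positive rate, since the class weights defined later depend on them; a small optional sketch:

# Quick sanity check: size and positive-class ratio of each split
for name, y in [('train', train_y), ('val', val_y), ('test', test_y)]:
    print(name, len(y), 'positive rate: %.4f' % y.mean())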

3. Model construction

# Reset the Keras global state so repeated runs in the notebook start fresh
tf.keras.backend.clear_session()
METRICS = [
      tf.keras.metrics.AUC(name='auc'),
]

def make_model(metrics = METRICS, output_bias=None):
    # Optionally initialize the output-layer bias (useful for imbalanced labels)
    if output_bias is not None:
        output_bias = tf.keras.initializers.Constant(output_bias)
    model = tf.keras.Sequential([
        layers.Dense(
          64, activation='relu',
          input_shape=(train_x.shape[-1],)),
        layers.Dropout(0.2),
        layers.Dense(
          128, activation='relu'),
        layers.Dropout(0.2),
        layers.Dense(
          32, activation='relu'),
        layers.Dense(1, activation='sigmoid',
                         bias_initializer=output_bias),
    ])

    model.compile(
        optimizer=tf.keras.optimizers.Adam(lr=1e-3),
        loss=losses.BinaryCrossentropy(),
        metrics=metrics)

    return model
# Set up early stopping
EPOCHS = 100
BATCH_SIZE = 2000

early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_auc', 
    verbose=1,
    patience=20,
    mode='max',
    restore_best_weights=True)

# Handle class imbalance with class weights
neg = len(train_y) - sum(train_y)
pos = sum(train_y)
total = len(train_y)
weight_for_0 = (1 / neg)*(total)/2.0 
weight_for_1 = (1 / pos)*(total)/2.0

class_weight = {0: weight_for_0, 1: weight_for_1}

# Build the model
model = make_model()
model.summary()

The output is as follows:

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
dense (Dense)                (None, 64)                16384     
_________________________________________________________________
dropout (Dropout)            (None, 64)                0         
_________________________________________________________________
dense_1 (Dense)              (None, 128)               8320      
_________________________________________________________________
dropout_1 (Dropout)          (None, 128)               0         
_________________________________________________________________
dense_2 (Dense)              (None, 32)                4128      
_________________________________________________________________
dense_3 (Dense)              (None, 1)                 33        
=================================================================
Total params: 28,865
Trainable params: 28,865
Non-trainable params: 0
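
make_model accepts an output_bias argument that the code above never passes. The imbalanced-data tutorial referenced at the end of this post also initializes the output-layer bias to the log odds of the positive class, which tends to steady the first few epochs; a hedged sketch of that variant (it would replace the model = make_model() call above, and the training log below was produced without it):

# Initialize the final-layer bias to log(pos/neg), following the referenced tutorial
initial_bias = np.log(pos / neg)
model = make_model(output_bias=initial_bias)
model.summary()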

4. Model training

weighted_history = model.fit(
    train_x,
    train_y,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    callbacks = [early_stopping],
    validation_data=(val_x, val_y),
    # Apply the class weights computed above
    class_weight=class_weight) 

The output is as follows:

Train on 206917 samples, validate on 15830 samples
Epoch 1/100
206917/206917 [==============================] - 3s 12us/sample - loss: 0.6584 - auc: 0.6498 - val_loss: 0.6108 - val_auc: 0.6729
Epoch 2/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6305 - auc: 0.6974 - val_loss: 0.6042 - val_auc: 0.6840
Epoch 3/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6238 - auc: 0.7075 - val_loss: 0.6018 - val_auc: 0.6895
Epoch 4/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6190 - auc: 0.7142 - val_loss: 0.5987 - val_auc: 0.6940
Epoch 5/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6157 - auc: 0.7190 - val_loss: 0.5978 - val_auc: 0.6961
Epoch 6/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6126 - auc: 0.7230 - val_loss: 0.5957 - val_auc: 0.6989
Epoch 7/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6104 - auc: 0.7257 - val_loss: 0.5951 - val_auc: 0.7007
Epoch 8/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6082 - auc: 0.7284 - val_loss: 0.5947 - val_auc: 0.7019
Epoch 9/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6067 - auc: 0.7301 - val_loss: 0.5937 - val_auc: 0.7034
Epoch 10/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6043 - auc: 0.7335 - val_loss: 0.5937 - val_auc: 0.7038
Epoch 11/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6035 - auc: 0.7344 - val_loss: 0.5934 - val_auc: 0.7036
Epoch 12/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6016 - auc: 0.7365 - val_loss: 0.5924 - val_auc: 0.7046
Epoch 13/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.6013 - auc: 0.7367 - val_loss: 0.5930 - val_auc: 0.7041
Epoch 14/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5996 - auc: 0.7390 - val_loss: 0.5925 - val_auc: 0.7042
Epoch 15/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5984 - auc: 0.7403 - val_loss: 0.5930 - val_auc: 0.7045
Epoch 16/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5976 - auc: 0.7412 - val_loss: 0.5937 - val_auc: 0.7034
Epoch 17/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5961 - auc: 0.7430 - val_loss: 0.5942 - val_auc: 0.7034
Epoch 18/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5948 - auc: 0.7444 - val_loss: 0.5946 - val_auc: 0.7027
Epoch 19/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5938 - auc: 0.7455 - val_loss: 0.5949 - val_auc: 0.7023
Epoch 20/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5924 - auc: 0.7472 - val_loss: 0.5944 - val_auc: 0.7024
Epoch 21/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5925 - auc: 0.7471 - val_loss: 0.5953 - val_auc: 0.7028
Epoch 22/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5915 - auc: 0.7482 - val_loss: 0.5944 - val_auc: 0.7022
Epoch 23/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5906 - auc: 0.7488 - val_loss: 0.5964 - val_auc: 0.7008
Epoch 24/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5900 - auc: 0.7496 - val_loss: 0.5947 - val_auc: 0.7025
Epoch 25/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5894 - auc: 0.7503 - val_loss: 0.5956 - val_auc: 0.7031
Epoch 26/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5882 - auc: 0.7517 - val_loss: 0.5944 - val_auc: 0.7028
Epoch 27/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5870 - auc: 0.7532 - val_loss: 0.5975 - val_auc: 0.7001
Epoch 28/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5869 - auc: 0.7530 - val_loss: 0.5965 - val_auc: 0.7022
Epoch 29/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5861 - auc: 0.7537 - val_loss: 0.5970 - val_auc: 0.7011
Epoch 30/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5854 - auc: 0.7543 - val_loss: 0.5960 - val_auc: 0.7015
Epoch 31/100
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5844 - auc: 0.7559 - val_loss: 0.5994 - val_auc: 0.6989
Epoch 32/100
206000/206917 [============================>.] - ETA: 0s - loss: 0.5835 - auc: 0.7568Restoring model weights from the end of the best epoch.
206917/206917 [==============================] - 1s 4us/sample - loss: 0.5836 - auc: 0.7568 - val_loss: 0.5982 - val_auc: 0.6992
Epoch 00032: early stopping

The best validation AUC is 0.7046, which still falls somewhat short of the XGBoost model; with some hyperparameter tuning the gap should narrow.
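
Since early stopping restored the weights from the best epoch, the untouched test split can be scored right away; a minimal sketch:

# Evaluate the restored best model on the held-out test split
results = model.evaluate(test_x, test_y, batch_size=BATCH_SIZE, verbose=0)
for name, value in zip(model.metrics_names, results):
    print(name, ':', value)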

This post mainly draws on the following parts of the official documentation: Classify structured data and Classification on imbalanced data.

Connecting to PostgreSQL from Python 3

Overview

Data left behind by the previous project team is scattered across several databases: MySQL and MongoDB, as well as Cassandra and PostgreSQL. Connecting to PostgreSQL from a Python 3 Jupyter environment requires installing psycopg2, whereas our Python 2 environment already had it set up.

1. Install python3-psycopg2 and libpq-dev

First install these two packages on Linux:

sudo apt-get install python3-psycopg2
sudo apt-get install libpq-dev

Then install psycopg2:

pip3 install psycopg2

If you skip the two system packages, pip fails with an error like the following:

ERROR: Command errored out with exit status 1:
     command: /home/yangbingjiao/anaconda3/bin/python -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-wxj7s4lx/psycopg2/setup.py'"'"'; __file__='"'"'/tmp/pip-install-wxj7s4lx/psycopg2/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base pip-egg-info
         cwd: /tmp/pip-install-wxj7s4lx/psycopg2/
    Complete output (23 lines):
    running egg_info
    creating pip-egg-info/psycopg2.egg-info
    writing pip-egg-info/psycopg2.egg-info/PKG-INFO
    writing dependency_links to pip-egg-info/psycopg2.egg-info/dependency_links.txt
    writing top-level names to pip-egg-info/psycopg2.egg-info/top_level.txt
    writing manifest file 'pip-egg-info/psycopg2.egg-info/SOURCES.txt'

    Error: pg_config executable not found.

    pg_config is required to build psycopg2 from source.  Please add the directory
    containing pg_config to the $PATH or specify the full executable path with the
    option:

        python setup.py build_ext --pg-config /path/to/pg_config build ...

    or with the pg_config option in 'setup.cfg'.

    If you prefer to avoid building psycopg2 from source, please install the PyPI
    'psycopg2-binary' package instead.

    For further information please check the 'doc/src/install.rst' file (also at
    <http://initd.org/psycopg/docs/install.html>).

    ----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
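
As the error message itself points out, an alternative that avoids the system packages is the prebuilt wheel (the import name stays psycopg2):

pip3 install psycopg2-binary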

2. Connect to the PostgreSQL database

import psycopg2
import pandas as pd

# Open a connection and a cursor (connection parameters here are placeholders)
conn = psycopg2.connect(database="database", user="user", password="password", host="192.168.1.230", port="5432")
cur = conn.cursor()
pql = """
SELECT * 
FROM database.table
WHERE  create_time >= '2019-09-23' AND create_time < '2019-09-30'
order by create_time
limit 10
"""
cur.execute(pql)
rows = cur.fetchall()
df = pd.DataFrame(rows)
conn.close()
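
One small improvement: cur.fetchall() carries no column names, so the DataFrame above ends up with numeric column labels. A hedged sketch of two alternatives, both of which must run before conn.close():

# Option 1: take the column names from the cursor description
df = pd.DataFrame(rows, columns=[col[0] for col in cur.description])

# Option 2: let pandas run the query and name the columns itself
# (pandas accepts a raw DBAPI connection here, though SQLAlchemy is the officially supported route)
df = pd.read_sql(pql, conn)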

This post mainly references the following article:
Python3连接PostgreSQL(10.5)数据库

Installing Jupyter on an Alibaba Cloud Ubuntu 16.04 Server

Overview

Jupyter needs no introduction by now. The domestic side of the business is stable this year and I can hand it over to others, so my focus has shifted to Southeast Asia. That means offline model training needs a Jupyter environment in the cloud. This time I install it with Anaconda.

1. Install Anaconda

First, on the Anaconda download page find the Linux, Python 3.7 build of Anaconda and copy its link. Then ssh into the cloud server and download it with the command below:

wget https://repo.anaconda.com/archive/Anaconda3-2019.07-Linux-x86_64.sh

After the download finishes, run the installer:

bash Anaconda3-2019.07-Linux-x86_64.sh

Answer yes and press Enter through the prompts; it installs into your home directory.

2. Start Jupyter

With Anaconda in place, Jupyter is already installed; next we start it.

jupyter notebook

It fails with the following error:

Jupyter Notebook won't start due to ports being already in use.

This happens because another Jupyter instance already runs on this server and occupies port 8888, so mine cannot start. That instance lives under another user's home directory, and touching it would mean annoying permission issues, so I simply run my Jupyter on a different port.

2.1 Configure the Jupyter port

Run the following command in the terminal:

jupyter notebook --generate-config

Then open the configuration file:

vim  ~/.jupyter/jupyter_notebook_config.py

Press Shift+G to jump to the end of the file and add these lines:

c.NotebookApp.ip = '*'  # which IPs may connect to this server; '*' means any IP
c.NotebookApp.open_browser = False  # do not open a local browser on startup
c.NotebookApp.port = 8080  # port to listen on

We set the port to 8080 to keep it apart from the other Jupyter instance, then save the file and quit.

Then start it again:

jupyter notebook

Now it starts normally and prints something like:

[I 17:49:13.763 NotebookApp] The Jupyter Notebook is running at:
[I 17:49:13.763 NotebookApp] http://****.**.id:8080/?token=555aa7ce6bbddb*********c1c716b3e4744626d
[I 17:49:13.764 NotebookApp]  or http://127.0.0.1:8080/?token=555aa7ce6bbddb*********c1c716b3e4744626d

The string after token= is the password; keep it somewhere.
Then open http://<server-ip>:8080 in a local browser; it will ask for a password, and the token saved above is what you enter.
One problem remains: the terminal session is not stable, and as soon as it drops, Jupyter dies with it. Launching Jupyter inside screen lets us close the window safely:

screen jupyter notebook 

After that, offline model training on the remote server becomes very convenient to do from the local machine.
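
If you would rather not paste the token every time, a hedged sketch of setting a fixed password instead (this assumes the classic notebook server used above):

# In a Python shell: generate a password hash, then put it in the config file
from notebook.auth import passwd
passwd()  # prompts for a password and prints a hash like 'sha1:...'

# In ~/.jupyter/jupyter_notebook_config.py:
# c.NotebookApp.password = 'sha1:...'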

This post mainly references the following articles, with thanks:
远程服务器上开启jupyter notebook
Jupyter Notebook won't start due to ports being already in use

Bayesian Hyperparameter Tuning for CatBoost

Overview

We previously wrote up a CatBoost training example; this time we add an example of tuning CatBoost's hyperparameters with the widely used Bayesian optimization approach.

1. Import dependencies and load data

import pandas as pd
import numpy as np
from catboost import CatBoostClassifier, CatBoost, Pool, cv
from bayes_opt import BayesianOptimization

# Train / validation / test splits (file names: 训练集 = train, 验证集 = validation, 测试集 = test)
data_train = pd.read_csv('data/训练集.csv')
data_val = pd.read_csv('data/验证集.csv')
data_test = pd.read_csv('data/测试集.csv')

2. Load the feature list and prepare the data

name_list = pd.read_csv('特征列表_20190705.txt', header=None, index_col=0)
my_feature_names = list(name_list.transpose())
len(my_feature_names)

data_train_X = data_train[my_feature_names]
data_val_X = data_val[my_feature_names]
data_test_X = data_test[my_feature_names]

data_train_y = data_train['label']
data_val_y  = data_val['label']
data_test_y  = data_test['label']

3. Bayesian hyperparameter tuning

def cat_train(bagging_temperature, reg_lambda, learning_rate):
    params = {
        'iterations':800,
        'depth':3,
        'bagging_temperature':bagging_temperature,
        'reg_lambda':reg_lambda,
        'learning_rate':learning_rate,
        'loss_function':'Logloss',
        'eval_metric':'AUC',
        'random_seed':696,
        'verbose':30
    }

    model = CatBoost(params)
    # Evaluate on the validation set; the metric being maximized is AUC
    model.fit(data_train_X, data_train_y, eval_set=(data_val_X, data_val_y), plot=False, early_stopping_rounds=20) 
    
    print(params)
    score_max = model.best_score_.get('validation').get('AUC')
    return score_max

cat_opt = BayesianOptimization(cat_train, 
                           {
                              'bagging_temperature': (1, 50),  
                              'reg_lambda': (1, 200),
                              'learning_rate':(0.05, 0.2)
                            })

cat_opt.maximize(n_iter=15, init_points=5)

With the best parameters in hand, the final model can be trained with them, as sketched below.
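
A minimal sketch of reading the best parameters back from the optimizer and fitting that final model (cat_opt.max is the bayes_opt result interface; the fixed parameters simply repeat those in cat_train):

# Best score and parameters found by the Bayesian search
print(cat_opt.max)

best = cat_opt.max['params']
final_params = {
    'iterations': 800,
    'depth': 3,
    'bagging_temperature': best['bagging_temperature'],
    'reg_lambda': best['reg_lambda'],
    'learning_rate': best['learning_rate'],
    'loss_function': 'Logloss',
    'eval_metric': 'AUC',
    'random_seed': 696,
    'verbose': 30
}
final_model = CatBoost(final_params)
final_model.fit(data_train_X, data_train_y, eval_set=(data_val_X, data_val_y), early_stopping_rounds=20)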

For the Bayesian tuning part we referred to the following articles:
Bayesian methods of hyperparameter optimization

Hyperparameter Optimization using bayesian optimization

as well as the GitHub source: BayesianOptimization