Overview
PySpark
是提供了Python
语言API
接口的Spark
,经过我的初步使用,未发现和Scala API
的Spark
有太大差别。且我们服务器上已经配置好了PySpark
,正好配合Jupyter notebook
使用来进行机器学习离线训练模型。
从这篇文章开始,将从一个算法工程师的视角去记录一下Spark
的使用。
Spark
在我看来就是一个计算工具,用来处理单机计算不了的问题。单机解决不了的问题,无非就是数据太大,内存,磁盘以及处理器不够用,然后把数据拆开给多台机器一起计算。Spark
相比较于Hadoop
,有一个长处就在于计算。而在机器学习过程当中,从数据预处理到模型训练都需要运算,尤其是数据量比较大的时候。我们尝试对比单机机器学习和分布式机器学习的方式,来加强对Spark
的理解。
1. 建立Spark
会话并读取文件
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("read data") \ #给任务命名
.master("spark://**.**.**.***:7077") \ #指定spark集群的主节点ip和端口
.config("spark.cores.max", "10") \ #给任务分配CPU
.config("spark.driver.memory", "2G") \ #给每个driver分配内存
.config("spark.executor.memory", "2G") \ #给每个executor分配内存
.config("spark.executor.cores", "2")\ #给每个executor分配CPU
.getOrCreate() #如果当前有那么就复用,如果没有就新建
我们在单机上是这样读数据的(一般是从csv
文件中读取):
import pandas as pd
df_single = pd.read_csv('data_total.csv')
Pandas
会把csv
文件读取成DataFrame
。
而Spark
的处理逻辑也是一样的,会将数据从csv/json/hive/数据库/Hadoop
读取为DataFrame
,让分析师和算法工程师无缝对接。而语法也是比较简单。
df_spark = spark.read.format("csv").option("header", true").load('hdfs://**.**.**.***:39000/users.csv')
spark.read
会返回一个DataFrameReader
的实例,通过format
指定DataFrameReader
读取的数据的格式。
当然,也可以这样读取:
df_spark = spark.read.csv(path='hdfs://**.**.**.***:39000/users.csv', sep=',', header=True)
df_spark.dtypes
数据类型如下:
[('order_sn', 'string'),
('apply_time', 'string'),
('label', 'string'),
...
('customer_blood_type_a', 'string'),
('customer_blood_type_ab', 'string'),
('customer_blood_type_b', 'string'),
('customer_blood_type_o', 'string'),
('customer_blood_type_other', 'string')]
我们可以看到,每一个字段的数据类型都被识别为string
,这是因为我们load
文件时,没有加inferSchema=True
。加上这个条件就正常了。
df_spark = spark.read.csv(path='hdfs://**.**.**.***:39000/users.csv', sep=',', header=True, inferSchema=True)
df_spark.dtypes
2. 划分数据集和筛选特征
我们根据时间来划分数据集:
df_train = df_total.filter(df_total.apply_time < '2019-10-21 00:00:00')
df_train.count()
df_val = df_total.filter((df_total.apply_time >= '2019-10-21 00:00:00') & (df_total.apply_time < '2019-10-27 00:00:00'))
df_val.count()
df_test = df_total.filter(df_total.apply_time >= '2019-10-27 00:00:00')
df_test.count()
填充空值:
df_train = df_train.fillna(0.0)
df_val = df_val.fillna(0.0)
df_test = df_test.fillna(0.0)
选择自己需要的列,首先加载特征列表:
feature_file = os.path.join("feature_list.txt")
feature_cols = [line.rstrip('\n') for line in open(feature_file)]
feature_cols.append("label")
然后选择需要的列,包括label
:
df_train = df_train.select(feature_cols)
df_val = df_val.select(feature_cols)
df_test = df_test.select(feature_cols)
3. 处理数据为double
类型
我们如果用spark
进行机器学习,除了label
列之外,其他的列最好都处理为double
,那么需要用cast
和withColumn
函数:
for col, t in df_train.dtypes:
if t != "double" and col != 'label':
df_train = df_train.withColumn(col, df_train[col].cast("double"))
for col, t in df_val.dtypes:
if t != "double" and col != 'label':
df_val = df_val.withColumn(col, df_val[col].cast("double"))
for col, t in df_test.dtypes:
if t != "double" and col != 'label':
df_test = df_test.withColumn(col, df_test[col].cast("double"))
4. 保存处理好的数据
df_train.toPandas().to_csv("训练集特征.csv", index=False)
df_val.toPandas().to_csv("验证集特征.csv", index=False)
df_test.toPandas().to_csv("测试集特征.csv", index=False)
5. 总结
总体来说,Spark
在读取数据的时候,和Pandas
的体验是很接近的。可以认为是一个分布式版本的Pandas
。
更多可以参考官方文档。