Overview
PySpark
是提供了Python
语言API
接口的Spark
,经过我的初步使用,未发现和Scala API
的Spark
有太大差别。且我们服务器上已经配置好了PySpark
,正好配合Jupyter notebook
使用来进行机器学习离线训练模型。
从这篇文章开始,将从一个算法工程师的视角去记录一下Spark
的使用。
Spark
在我看来就是一个计算工具,用来处理单机计算不了的问题。单机解决不了的问题,无非就是数据太大,内存,磁盘以及处理器不够用,然后把数据拆开给多台机器一起计算。Spark
相比较于Hadoop
,有一个长处就在于计算。而在机器学习过程当中,从数据预处理到模型训练都需要运算,尤其是数据量比较大的时候。我们尝试对比单机机器学习和分布式机器学习的方式,来加强对Spark
的理解。
1. 建立Spark
会话并读取文件
1 2 3 4 5 6 7 8 9 10 | from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.appName( "read data" ) \ #给任务命名 .config( "spark.cores.max" , "10" ) \ #给任务分配CPU .config( "spark.driver.memory" , "2G" ) \ #给每个driver分配内存 .config( "spark.executor.memory" , "2G" ) \ #给每个executor分配内存 .config( "spark.executor.cores" , "2" )\ #给每个executor分配CPU .getOrCreate() #如果当前有那么就复用,如果没有就新建 |
我们在单机上是这样读数据的(一般是从csv
文件中读取):
1 2 | import pandas as pd df_single = pd.read_csv( 'data_total.csv' ) |
Pandas
会把csv
文件读取成DataFrame
。
而Spark
的处理逻辑也是一样的,会将数据从csv/json/hive/数据库/Hadoop
读取为DataFrame
,让分析师和算法工程师无缝对接。而语法也是比较简单。
1 | df_spark = spark.read. format ( "csv" ).option( "header" , true").load( 'hdfs://**.**.**.***:39000/users.csv' ) |
spark.read
会返回一个DataFrameReader
的实例,通过format
指定DataFrameReader
读取的数据的格式。
当然,也可以这样读取:
1 2 | df_spark.dtypes |
数据类型如下:
1 2 3 4 5 6 7 8 9 | [('order_sn', 'string'), ('apply_time', 'string'), ('label', 'string'), ... ('customer_blood_type_a', 'string'), ('customer_blood_type_ab', 'string'), ('customer_blood_type_b', 'string'), ('customer_blood_type_o', 'string'), ('customer_blood_type_other', 'string')] |
我们可以看到,每一个字段的数据类型都被识别为string
,这是因为我们load
文件时,没有加inferSchema=True
。加上这个条件就正常了。
1 2 | df_spark = spark.read.csv(path = 'hdfs://**.**.**.***:39000/users.csv' , sep = ',' , header = True , inferSchema = True ) df_spark.dtypes |
2. 划分数据集和筛选特征
我们根据时间来划分数据集:
1 2 3 4 5 6 | df_train = df_total. filter (df_total.apply_time < '2019-10-21 00:00:00' ) df_train.count() df_val = df_total. filter ((df_total.apply_time > = '2019-10-21 00:00:00' ) & (df_total.apply_time < '2019-10-27 00:00:00' )) df_val.count() df_test = df_total. filter (df_total.apply_time > = '2019-10-27 00:00:00' ) df_test.count() |
填充空值:
1 2 3 | df_train = df_train.fillna( 0.0 ) df_val = df_val.fillna( 0.0 ) df_test = df_test.fillna( 0.0 ) |
选择自己需要的列,首先加载特征列表:
1 2 3 | feature_file = os.path.join( "feature_list.txt" ) feature_cols = [line.rstrip( '\n' ) for line in open (feature_file)] feature_cols.append( "label" ) |
然后选择需要的列,包括label
:
1 2 3 | df_train = df_train.select(feature_cols) df_val = df_val.select(feature_cols) df_test = df_test.select(feature_cols) |
3. 处理数据为double
类型
我们如果用spark
进行机器学习,除了label
列之外,其他的列最好都处理为double
,那么需要用cast
和withColumn
函数:
1 2 3 4 5 6 7 8 9 | for col, t in df_train.dtypes: if t ! = "double" and col ! = 'label' : df_train = df_train.withColumn(col, df_train[col].cast( "double" )) for col, t in df_val.dtypes: if t ! = "double" and col ! = 'label' : df_val = df_val.withColumn(col, df_val[col].cast( "double" )) for col, t in df_test.dtypes: if t ! = "double" and col ! = 'label' : df_test = df_test.withColumn(col, df_test[col].cast( "double" )) |
4. 保存处理好的数据
1 2 3 | df_train.toPandas().to_csv( "训练集特征.csv" , index = False ) df_val.toPandas().to_csv( "验证集特征.csv" , index = False ) df_test.toPandas().to_csv( "测试集特征.csv" , index = False ) |
5. 总结
总体来说,Spark
在读取数据的时候,和Pandas
的体验是很接近的。可以认为是一个分布式版本的Pandas
。
更多可以参考官方文档。