简介
这是关于如何使用Python利用Hadoop(分布式计算框架)的系列文章的第一部分。
本系列文章的目的是集中讨论具体的工具和配方,以解决许多数据专家面临的经常性挑战,例如。
-
使用Python移动HDFS(Hadoop分布式文件系统)文件。
-
将数据从HDFS加载到一个数据结构中,如Spark或pandasDataFrame,以便进行计算。
-
将分析的结果****写回HDFS。
这个系列的第一个工具是Spark。一个框架,它把自己定义为一个统一的分析引擎,用于大规模数据处理。
Apache Spark
PySpark 和 findspark的安装
我鼓励你使用conda__虚拟环境。如果你不知道如何设置conda,请阅读这篇文章。
首先,安装findspark
,这个库将帮助你把Spark整合到你的Python工作流程中,同时也安装pyspark
,以防你在本地计算机中工作,而不是在一个合适的Hadoop集群中。
如果你是在Hadoop集群中学习本教程,可以跳过PySpark的安装。
conda install -c conda-forge findspark -y
# optional, for local setup
conda install -c conda-forge pyspark openjdk -y
复制代码
用findspark设置Spark
一旦你安装了findspark,就可以在你的Python代码中设置Spark的使用。
这里提供了本地和集群模式的代码,解开你需要的行,并根据你的特定基础设施和库的版本调整路径(cloudera Spark的路径应该和这里提供的很相似)。
import findspark
# Local Spark
findspark.init(‘/home/cloudera/miniconda3/envs/<your_environment_name>/lib/python3.7/site-packages/pyspark/’)
# Cloudera Cluster Spark
findspark.init(spark_home=’/opt/cloudera/parcels/SPARK2–2.3.0.cloudera4–1.cdh5.13.3.p0.611179/lib/spark2/’)
复制代码
本教程是使用Cloudera Quickstart VM(CentOS linux发行版,其用户名为
_cloudera_
),记得根据你的基础设施调整路径!
创建一个Spark应用程序
一旦Spark被初始化,我们必须创建一个Spark应用程序,执行以下代码,并确保你**指定你需要的主站,**如'yarn'
,在一个适当的Hadoop集群的情况下,或'local[*]'
,在一个完全本地设置的情况下。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(‘example_app’).master(‘yarn’).getOrCreate()
复制代码
PySpark配方和使用案例
一旦我们有了工作中的Spark,让我们开始与Hadoop互动,利用它的一些常见用例。
列出Hive数据库
让我们获取现有的数据库。我假设你已经熟悉了Spark DataFrame API和它的方法。
spark.sql("show databases").show()
复制代码
你应该得到类似这样的东西。
+------------+
|databaseName|
+------------+
| db1|
| default|
| fhadoop|
+------------+
复制代码
将pandas DataFrame转换成Spark DataFrame
第一个整合是关于如何将数据从pandas库(这是Python标准库,用于执行内存数据操作)转移到Spark。
首先,让我们加载一个pandas DataFrame。这个是关于**马德里的空气质量(**只是为了满足你的好奇心,但对于将数据从一个地方移动到另一个地方来说并不重要)。你可以在这里下载它。请确保你安装了库pytables
,以读取hdf5
格式的数据。
import pandas as pd
air_quality_df = pd.read_hdf(‘data/air_quality/air-quality-madrid/madrid.h5’, key=’28079008')
air_quality_df.head()
复制代码
这个数据是许多众所周知的污染物的时间序列,如氮氧化物、臭氧,等等。
让我们对这个DataFrame做一些改变,比如重设日期时间索引,以避免在加载到Spark时丢失信息。日期列也将被转换为字符串,因为Spark在处理日期时有一些问题(与系统地域、时区等有关),除非根据你的地域进一步配置。
air_quality_df.reset_index(inplace=True)
air_quality_df[‘date’] = air_quality_df[‘date’].dt.strftime(‘%Y-%m-%d %H:%M:%S’)
复制代码
我们可以简单地用createDataFrame
从pandas加载到Spark。
air_quality_sdf = spark.createDataFrame(air_quality_df)
复制代码
一旦DataFrame被加载到Spark(如air_quality_sdf
),就可以使用PySpark DataFrame API轻松操作。
air_quality_sdf.select('date', 'NOx').show(5)
复制代码
输出应该是这样的。
+— — — — — — — — — -+ — — — — — — — — — +
| date| NOx|
+ — — — — — — — — — + — — — — — — — — — +
|2001–07–01 01:00:00| 1017.0|
|2001–07–01 02:00:00| 409.20001220703125|
|2001–07–01 03:00:00| 143.39999389648438|
|2001–07–01 04:00:00| 149.3000030517578|
|2001–07–01 05:00:00| 124.80000305175781|
+ — — — — — — — — — + — — — — — — — — — +
only showing top 5 rows
复制代码
从Spark DataFrame创建Hive表
为了将Spark DataFrame持久化到HDFS中,在那里可以使用默认的Hadoop SQL引擎(Hive)进行查询,一个直接的策略(不是唯一的策略)是从该DataFrame中创建一个时态视图。
air_quality_sdf.createOrReplaceTempView("air_quality_sdf")
复制代码
一旦创建了时间视图,就可以从Spark SQL引擎中使用create table as select
来创建一个真正的表。在创建这个表之前,我将创建一个名为analytics
的新数据库来存储它。
sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""
result_create_db = spark.sql(sql_create_database)
复制代码
然后,我们可以在那里创建一个新的表。
sql_create_table = """
create table if not exists analytics.pandas_spark_hive
using parquet as select
to_timestamp(date) as date_parsed, *
from air_quality_sdf
"""
result_create_table = spark.sql(sql_create_table)
复制代码
使用PySpark从Hive表中读取数据
一旦我们创建了我们的Hive表,可以使用Spark SQL引擎检查结果,将结果加载回来,例如,选择臭氧污染物浓度随时间的变化。
spark.sql("select * from analytics.pandas_spark_hive") \ .select("date_parsed", "O_3").show(5)
复制代码
输出 :
+ — — — — — — — — — + — — — — — — — — — +
| date_parsed | O_3|
+ — — — — — — — — — + — — — — — — — — — +
|2001–07–01 01:00:00| 9.010000228881836|
|2001–07–01 02:00:00| 23.81999969482422|
|2001–07–01 03:00:00| 31.059999465942383|
|2001–07–01 04:00:00| 23.780000686645508|
|2001–07–01 05:00:00| 29.530000686645508|
+ — — — — — — — — — + — — — — — — — — — +
only showing top 5 rows
复制代码
希望你喜欢这个帖子。在接下来的几周里,我们将发布一系列的文章,介绍你可以用Python掌握Hadoop的其他工具。