使用 Spark DataFrames 整理数据的简介
介绍
近年来,Spark 已成为分布式大数据处理和机器学习最常用的工具之一,由于其灵活性、速度和易用性,在很大程度上取代了较旧的 Hadoop MapReduce 框架。它主要用 Scala 编写,但与其他语言有各种绑定,包括 Java、R,最重要的是 Python。这些绑定使您可以将 Python 作为查询/脚本语言的表现力和熟悉度与 Scala 作为编译语言的原始功能和效率结合起来。
如上所述,Spark 是一个分布式处理框架,这意味着它将工作委托给多个工作节点或“执行器”。每个执行器都是一个独立的处理器;例如,数据处理中心的一台机器。执行器执行分配给它们的计算,然后将结果传输回单个主节点,即“驱动程序”。Spark 处理较低级别的细节,例如将哪个部分分配给哪个执行器、内存分配和监视超时,因此您可以专注于高级目标。
通常,在实际场景中,您将与驱动程序和执行程序(它们共同构成“集群”)物理分离,并通过 Internet 或 VPN 连接到它们。不过,就本教程而言,Spark 还可以使用本地计算机模拟集群,这很方便;每个核心都将是一个执行程序。通常,与集群的所有通信都是通过SparkSession对象执行的。因此,我们的第一步是创建一个。
注意:本指南假设您已经成功安装了 PySpark。
如果您使用命令行工作,命令pyspark应该实例化一个 Python shell,其中已创建SparkSession并将其分配给变量spark。另一方面,如果您更喜欢在 Jupyter 笔记本中工作,则可以运行以下代码来创建一个位于笔记本中的SparkSession 。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
太棒了!现在我们有一个活跃的SparkSession。下一步是实际获取一些要使用的数据。
创建 DataFrames
在 Spark 中,数据由DataFrame对象表示,可以将其视为遵循整洁数据格式的 2D 结构。这意味着每行代表一个观察值,每列代表一个变量;因此,列必须具有名称和类型。让我们从 Python 对象创建一个DataFrame作为具体示例来充实这个想法,并使用show方法显示它:
df = spark.createDataFrame([['Japan', 'Asia', 126.8],
['Portugal', 'Europe',10.31],
['Germany', 'Europe', 82.79],
['China', 'Asia', 1386.0],
['Pakistan', 'Asia', 197.0],
['Brazil', 'South America', 209.3],
['Spain', 'Europe', 46.72]],
['name', 'continent', 'population'])
df.show()
我们可以看到,这是一个包含国家/地区信息的DataFrame。每一行代表一个国家/地区,存储其名称、所在大洲及其人口。另一方面,每列代表相同类型的信息:例如,Name列包含数据中所有条目的名称。理解此结构至关重要,因为您可以对行和列执行不同的操作;因此,以正确的格式存储数据是处理存储为 Spark DataFrames的数据的关键。
如果您的数据仅能容纳在一台机器上(无论是笔记本电脑还是集群的驱动器),那么从本地 Python 对象创建 Spark DataFrame是没问题的。但是,在处理大数据时,您通常会有一个数据仓库或其他形式的存储,您需要从中加载数据。Spark 提供了一组丰富的 API 来实现这一点;请考虑以下两个示例:
# Reading from a Parquet archive stored at path/to/my_parquet_data
parquet_df = spark.read.parquet('path/to/my_parquet_data')
# Reading from a Hive table mytable stored in the database mydatabase
spark.sql('use mydatabase')
hive_df = spark.read.table('mytable')
此外,Spark 支持更适合本地数据的格式,例如 CSV 和 JSON,并且可以轻松扩展以从其他类型的数据存储(包括 Apache Cassandra)读取数据。不过,就目前而言,我们的玩具数据集应该足够了!
变换和动作
让我们尝试使用select方法从DataFrame中单独列出国家名称,如下所示:df.select('name')。您可能会得到类似这样的结果:
DataFrame[name: string]
这是什么?为什么我们得到的不是我们期望的名称列表,而只是 DataFrame 的描述?
要回答这个问题,我们需要了解 Spark 的求值策略。Spark 高效的因素之一是惰性求值:它将计算推迟到实际需要结果时。处理大量数据时,操作可能非常昂贵。如果计算结果实际上没有使用,那么计算结果将浪费计算能力和内存。对select的调用就是这样一个可以推迟的操作的例子;在 Spark 术语中,它被称为转换。对列和行的大多数操作都是转换。
这就是为什么上面我们必须调用df.show()而不是print(df)或类似的东西。当调用show时,必须向用户显示输出,这必然需要执行所有延迟计算,否则就没有什么可显示的!show和类似的命令强制评估所有必要的转换;因此,它们被称为动作。其他动作示例包括:
- collect方法,将数据从执行器传输到驱动程序
- count方法,计算DataFrame中的行数
- 将数据写入磁盘
除了不计算无关的结果之外,惰性求值还允许 Spark 整体优化您的查询,这可能会更有效率,因为它可以访问有关整个过程的信息,而不是一次只访问一个步骤。然而,缺点是,除了一些可以在转换时识别的易于捕捉的错误之外,错误也只会在评估操作时弹出。因此,在进行实验时,最好定期调用show来检查您的数据是否符合您的预期。
让我们通过以下命令获取这些国家的平均人口来快速尝试一下:
df.groupby().mean('population').show()
这应该会给你以下结果(或接近的结果):
+------------------+
| avg(population)|
+------------------+
|294.13142857142856|
+------------------+
结论
恭喜!您刚刚使用 Spark 执行了第一个操作!
最后,请注意show类似于print语句。它非常适合检查结果,但有时您需要实际值;例如,我们刚刚计算的平均人口为double。为此,只需将上面的show替换为collect,它将返回Row对象的列表。
没有什么可以阻止你对原始数据运行collect ;你可以在这里使用df.collect()来执行此操作。在这里,这样做是可行的,因为df非常小。然而,在有数亿行数据的情况下,尝试将所有数据拉到你的驱动程序很可能会使其崩溃,所以要小心!
免责声明:本内容来源于第三方作者授权、网友推荐或互联网整理,旨在为广大用户提供学习与参考之用。所有文本和图片版权归原创网站或作者本人所有,其观点并不代表本站立场。如有任何版权侵犯或转载不当之情况,请与我们取得联系,我们将尽快进行相关处理与修改。感谢您的理解与支持!
请先 登录后发表评论 ~