Spark
Spark是什么
Spark 是一个用来实现快速而通用的集群计算的平台。
Spark 适用于各种各样原先需要多种不同的分布式平台的场景,包括批处理、迭代算法、交互式查询、流处理。Spark通过在一个统一的框架下支持这些不同的计算,简单的把各种处理流程整合在一起。
所有的Spark 程序都遵循同样的结构:程序从输入数据创建一系列RDD,再使用转化操作派生出新的RDD,最后使用行动操作收集或存储结果RDD中的数据。
Spark的优势
1. 剥离对集群关注,专注于计算本身,可以在自己的笔记本上开发Spark。
2. Spark 基于内存的基本类型(primitive)为一些应用程序带来了100 倍的性能提升。Spark 允许用户程序将数据加载到集群内存中用于反复查询。(Spark 从一开始就是为交互式查询和迭代算法设计的,同时还支持内存式存储和高效的容错机制。)
3. 通用引擎,可以通过组合使用各种组件(SQL 机器学习等)完成各种各样的运算,无缝整合。
Spark结构

Spark SQL
它提供了在分布式数据处理中进行结构化数据处理和数据查询的功能。Spark SQL允许用户使用SQL语言来查询数据,这样可以简化数据分析和处理的过程。
MLlib
MLlib提供了一些更底层的机器学习原语,包括一个通用的梯度下降优化算法
GpaphX
GraphX 是用来操作图(比如社交网络的朋友关系图)的程序库,可以进行并行的图计算。与Spark Streaming 和Spark SQL 类似,GraphX 也扩展了Spark 的RDD API,能用来创建一个顶点和边都包含任意属性的有向图。还实现了一下常见图算法。
Spark Core
Spark Core 实现了Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core 中还包含了对弹性分布式数据集(resilient distributed dataset,简称RDD)的API 定义。
集群管理器
Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark 支持在各种集群管理器(clustermanager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark 自带的一个简易调度器,叫作独立调度器。如果要在没有预装任何集群管理器的机器上安装Spark,那么Spark自带的独立调度器可以让你轻松入门。
YARN
负责管理集群资源(如CPU、内存等)
Mesos
用于有效地管理集群资源和进行任务调度。它的设计目标是为了更好地支持大规模的计算集群,并提供资源隔离、弹性伸缩和高可用性等特性。
Spark作用
不需要关注在分布式系统上编程的复杂问题(包括网络通信,程序容错),通过已有的接口快速完成常见任务,以及对应用精选监视、审查和虚拟优化。
Spark数据来源
Spark 可以将任何Hadoop 分布式文件系统(HDFS)上的文件读取为分布式数据集,支持的Hadoop 输入格式包括文本文件、SequenceFile、Avro、Parquet 等。
Spark 编程组件
Driver:发起集群上各种并行操作(main),创建SparkContext。Spark Shell就是一个驱动器。
SparkContext:整个应用的上下文,控制应用的生命周期。代表对计算集群的一个连接,shell中自动创建了一个----叫sc的变量
RDD:不可变的数据集合,可由 SparkContext 创建,是 Spark 的基本计算单元。
驱动器程序一般要管理多个执行器(executor)节点。比如,如果我们在集群上运行count() 操作,那么不同的节点会统计文件的不同部分的行数。
Spark 会自动将函数(比如line.contains("Python"))发到各个执行器节点上。这样,你就可以在单一的驱动器程序中编程,并且让代码自动运行在多个节点上。
Scala 实现 单词数量统计
import org.apache.spark.sql.SparkSession
object WordCount {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession
.builder()
.appName("Word Count Example")
.getOrCreate()
// 读取文本文件,得到一个RDD
val textRDD = spark.sparkContext.textFile("path/to/your/textfile.txt")
// 将文本数据转换成单词的RDD
val wordsRDD = textRDD.flatMap(line => line.split("\\s+"))
// 对单词RDD进行转换和操作,统计每个单词出现的次数
val wordCounts = wordsRDD
.map(word => (word, 1))
.reduceByKey(_ + _)
// 输出结果
wordCounts.collect().foreach {
case (word, count) => println(s"$word: $count")
}
// 停止SparkSession
spark.stop()
}
}