1、实验描述
- 利用Scala语言开发Spark WordCount程序
- 实验时长:
- 45分钟
- 主要步骤:
- 创建spark项目
- 编写wordcount 示例程序
- 运行Scala 程序
- 查看实验结果
2、实验环境
- 虚拟机数量:3(一主两从,主机名分别为:master、slave01、slave02)
- 系统版本:Centos 7.5
- Hadoop版本:Apache Hadoop 2.7.3
- Spark版本:Apache Spark 2.1.1
- JDK 版本:1.8.0_131
- Scala版本: scala2.11.11
- IDEA版本:ideaIC-2017.2.7
3、相关技能
- Spark集成开发环境IDEA开发程序
- Scala编写Spark程序
4、知识点
- 常见linux命令的使用
- Scala IDEA编程
- Spark RDD
- Scala原理
5、实验效果
spark RDD 单词计数结果查看操作最终效果如下图:
6、实验步骤
6.1打开虚拟机并启动Hadoop集群
6.1.1在master启动Hadoop集群
[zkpk@master ~]$ cd hadoop-2.7.3/
[zkpk@master hadoop-2.7.3]$ sbin/start-all.sh
[zkpk@master hadoop-2.7.3]$ hdfs dfsadmin -safemode leave
6.1.2在master上运行jps,确认NameNode, SecondaryNameNode, ResourceManager进程启动
6.1.3在slave01上运行jps,确认DataNode, NodeManager进程启动
6.1.4在slave02上运行jps,确认DataNode, NodeManager进程启动
6.2打开IDEA,配置软件包依赖,创建工程
[zkpk@master ~]$ cd idea-IC-172.4574.19/
[zkpk@master idea-IC-172.4574.19]$ nohup bin/idea.sh &
6.2.1进入如下界面,点击 Create New Project
6.2.2进入如下图界面,按照图标依次点击,最后点击next
6.2.3依次输入GroupId和ArtifactId和Version的值,随后点击next
6.2.4进入如下界面,设置本地Maven项目的setting.xml文件和warehouse仓库,点击next按钮
6.2.4.1本地setting.xml文件在/home/zkpk/apache-maven-3.5.0/conf目录下
6.2.4.2本地仓库文件夹warehouse在/home/zkpk/apache-maven-3.5.0/warehouse
6.2.5进入如下界面,输入工程名称spark_test,然后点击next
6.2.6进入如下界面,即表示工程spark_test创建成功
6.2.7如5.1.6步骤所示,工程创建完成后会自动打开一个名为zkpk的xml文件,删除如下图标红部分的依赖
6.2.8在xml文件中找到properties配置项,修改scala版本号(此处对应scala安装版本),并添加spark版本号(此处对应spark安装版本)
6.2.9找到dependency配置项,添加如下图标红部分的配置,分别是scala依赖和spark依赖,${scala.version}表示上述配置的scala.version变量
6.2.10一般修改pom.xml文件后,会提示enable auto-import,点击即可,如果没有提示,则可以点击工程名,依次选择Maven—->Reimport,即可根据pom.xml文件导入依赖包
6.2.11设置语言环境language level,点击菜单栏中的file,选择Project Structure
6.2.12弹出如下对话框,选择Modules,选择Language level为8,然后点击Apply,点击OK
6.2.13设置Java Compiler环境,点击菜单栏中的file,选择Setting
6.2.14弹出如下对话框,依次选择Build,Execution—->Compiler—->Java Compiler,设置图中的Project bytecode version为1.8,设置图中的Target bytecode version为1.8,然后依次点击Apply和OK
6.2.15如下图删除测试环境test中的测试类
6.2.16如下图删除,main文件夹中,包名下的App文件
6.2.17至此,Spark Maven工程创建完毕
6.3编写Scala程序完成Spark单词计数
6.3.1如下图依次打开src—>main—>scala,在org.zkpk.lab上点击右键,创建Scala Class
6.3.2弹出如下对话框,输入类名ScalaWordCount,点击ok
6.3.3在类ScalaWordCount中新建伴生对象object ScalaWordCount
6.3.4在伴生对象object ScalaWordCount中创建main方法
6.3.5在main方法中创建列表List对象并赋值给常量list,列表中包含4个元素,分别是:“hello
hi hi spark”,“hello spark hello hi sparksql”,“hello hi hi sparkstreaming”,“hello hi sparkgraphx”
6.3.6创建SparkConf对象,对Spark运行属性进行配置,调用该对象的setAppName方法设置Spark程序的名称为“word-count”,调用setMaster方法设置Spark程序运行模式,一般分为两种:本地模式和yarn模式,这里我们采用本地模式,参数为“local[*]”,属性设置完成后赋值给常量sparkConf
6.3.7创建SparkContext,参数为sparkconf,赋值给常量sc,该对象是Spark程序的入口
6.3.8调用SparkContext对象sc的方法parallelize,参数为列表对象list,该方法使用现成的scala集合生成RDD
lines,类型为String(RDD为Spark计算中的最小数据单元),该RDD存储元素是字符串语句
6.3.9调用RDD对象lines的flatMap方法按照空格切分RDD中的字符串元素,并存入新的RDD对象words中,参数类型为String,该RDD存储的元素是每一个单词
6.3.10调用RDD对象words的map方法,将RDD中的每一个单词转换为kv对,key是String类型的单词,value是Int类型的1,并赋值给新的RDD对象wordAndOne,参数为(String,Int)类型键值对
6.3.11调用RDD对象wordAndOne的reduceByKey方法,传入的参数为两个Int类型变量,该方法将RDD中的元素按照Key进行分组,将同一组中的value值进行聚合操作,得到valueRet,最终返回(key,valueRet)键值对,并赋值给新的RDD对象wordAndNum,参数为(String,Int)类型键值对
6.3.12调用RDD对象wordAndNum的sortBy方法,第一个参数为kv对中的value,即单词出现次数,第二个参数为boolean类型,true表示升序,false表示降序
6.3.13调用ret对象的collect方法,获取集合中的元素,再调用mkString方法,参数为“,”,将集合中的元素用逗号连接成字符串,调用println方法打印输出在控制台
6.3.14调用ret对象的saveAsTextFile,该方法的参数为运行时指定的参数,此方法的用处是将Spark程序运行的结果保存到指定路径,一般是把结果保存到HDFS中,所以这里的参数定义为:
hdfs://master:9000/sparktest
6.3.14.1HDFS根目录中不能存在sparktest此目录,spark程序会自动创建该目录
6.3.15调用SparkContext对象sc的stop方法,释放spark程序所占用的资源
6.4运行程序,查看结果
6.4.1程序编辑完成后,点击菜单栏上的Run按钮,选择Run…
6.4.2弹出对话框,选择第一个Edit Configurations
6.4.3进入运行参数配置页面,点击+号按钮,选择Application,进入Application配置界面
6.4.4指定Application的名称Name,主函数Main Class,参数arguments和运行模式VM
options(由于在编写程序是已经指定了本地运行模式,所以这里的运行模式不指定),最后点击运行
6.4.5查看控制台输出
6.4.6查看HDFS输出
7、参考答案
代码清单ScalaWordCount
package org.zkpk.lab
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf,SparkContext}
class ScalaWordCount {
}
object ScalaWordCount{
def main(args:Array[String]):Unit={
val list =List("hello hi hi spark",
"hello spark hello hi sparksql",
"hello hi hi sparkstreaming",
"hello hi sparkgraphx")
val sparkConf=new SparkConf().setAppName("word-count").setMaster("local[*]")
val sc=new SparkContext(sparkConf)
val lines:RDD[String]=sc.parallelize(list)
val words:RDD[String]=lines.flatMap((line:String) => {line.split(" ")})
val wordAndOne:RDD[(String,Int)]=words.map((word:String) => {(word,1)})
val wordAndNum:RDD[(String,Int)]=wordAndOne.reduceByKey((count1:Int,count2:Int) => {count1+count2})
val ret=wordAndNum.sortBy(kv=>kv._2,false)
println(ret.collect().mkString(","))
ret.saveAsTextFile(args(0))
sc.stop()
}
}
8、总结
本实验是利用scala语言完成spark单词计数,快速掌握Spark程序Scala编程