1、实验描述
- 根据实际业务需求使用spark 完成对Apache格式的日志内容的分析。
- 实验时长:
- 45分钟
- 主要步骤:
- 启动spark-shell
- 自定义日志过滤函数
- 日志预处理
- 统计关于日志的相关指标
- 统计结果展示或保存到本地
2、实验环境
- 虚拟机数量:1
- 系统版本:Centos 7.5
- Scala版本:2.11.0
- Spark版本: Apache spark-2.1.1
3、相关技能
- Spark 常用算子
- apache日志的格式
- RDD 常见的操作
4、知识点
- 使用Scala 开发spark 应用
- 常见的日志格式
- Spark 日志分析的基本方式
5、实现效果
统计不同页面的访问操作效果如下图:
6、实验步骤
6.1打开终端,启动spark-shell。
6.1.1启动spark-shell,启动时指定启动模式
[zkpk@master ~]$ cd spark-2.1.1-bin-hadoop2.7/
[zkpk@master spark-2.1.1-bin-hadoop2.7]$ bin/spark-shell --master local[2]
6.2加载本地文件,使用textFile方法加载本地数据
scala> val logFile = "/home/zkpk/experiment/04/access.log"
scala> val logRDD = sc.textFile(logFile)
6.3Apache 日志的一般格式:
6.3.1日志内容从左到右依次是:远程IP地址,客户端记录,浏览者记录,
6.3.2请求的时间,包括三项内容:,日期,时间,时区,服务器收到的请求,包括三项内容:
6.3.2.1METHOD:请求的方法,GET/POST等
6.3.2.2RESOURCE:请求的目标链接地址
6.3.2.3PROTOCOL:HTTP版本号
6.3.3状态代码,表示请求是否成功
6.3.4发送的字节数
6.3.5发出请求时所在的URL
6.3.6客户端的详细信息:操作系统及浏览器等
6.4数据预处理:获取合法的日志数据,使用正则表达式做两件事情,一个是过滤掉非法的日志,一个是解析过滤后的日志来获得需要的数据元组。
6.4.1过滤无法解析的日志记录
scala > val LogPatterns= """^(\S+) (\S+) (\S+) \[([\w/]+)([\w:/]+)\s([+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r
def filterWithparse(s: String) = {
LogPatterns.findFirstIn(s) match {
case Some(LogPatterns (_*)) => truecase _ => false
}
}
6.4.2定义解析日志的函数:在Scala命令行键入如下内容,解析日志记录,得到需要的元组
def parseLog(s: String) = {
val m = LogPatterns.findAllIn(s)
if(m.hasNext){
val clientIP = m.group(1).toString
val requestDate = m.group(4).toString
val requestURL = m.group(8).toString
val status = m.group(10).toString
(clientIP, requestDate, requestURL, status) } else {
("null", "null", "null", "null")
}
}
6.4.3解析日志文件
scala > val tmp = logRDD.filter(filterWithparse)
scala > val logRDDv1 = sc.parallelize(tmp.take(tmp.count().toInt).map(parseLog))
6.4.4统计每日PV,使用count操作
scala > logRDDv1.count
6.4.5使用sortByKey,按照请求日期字段进行排序,并将结果保存到本地
scala > val logRDDv2 = logRDDv1.map(x => (x._2, 1)).reduceByKey(_ + _)
//元组中的元素的访问通过下划线点下标访问(下标从1开始计)
scala> logRDDv2.sortByKey().coalesce(1).saveAsTextFile("/tmp/log_date")
6.4.5.1查看结果:
[zkpk@master ~]$more /tmp/log_date/part-00000
6.4.6统计独立IP数
scala> val logRDDv3 = logRDDv1.map(x => (x._2, x._1))
scala> val logRDDv4=logRDDv3.distinct()
scala> val logRDDv5 = logRDDv4.map(x => (x._1, 1)).reduceByKey(_ + _)
scala> val distinct_ip = logRDDv5.collect()
6.5统计每种不同的HTTP状态对应的访问次数,并且以降序展示
scala > val logRDDv6 = logRDDv1.map(x => (x._4, 1)).reduceByKey(_ + _)
scala > val StatusPV = logRDDv6.sortByKey().collect()
6.6统计不同独立IP的访问量,按照降序排列并展示前10条
scala> val logRDDv7 = logRDDv1.map(x => (x._1, 1)).reduceByKey(_ + _)
scala> val disinctIP_count = logRDDv7.sortBy(x=>x._2, false).take(10)
6.7统计不同页面的访问量
scala> val logRDDv8 = logRDDv1.map(x => (x._3, 1)).reduceByKey(_ + _)
scala> val distinct_PV = logRDDv8.sortBy(x=>x._2, false).take(10).foreach(println)
6.7.1由于日志中有大量的js文件的访问,因此我们增加一个去除列表,过滤掉属于列表中后缀名的文件
scala> val stopList = List("jpg", "ico", "png", "gif", "css", "txt", "asp")
def filterWithStop(s: String): Boolean = {
for(c <- stopList){
if(s.endsWith("."+c)){
false}
}
true
}
scala> val logRDDv9 = logRDDv1.filter(x => filterWithStop(x._3))
6.7.2再对过滤后的logRDDv9执行统计操作
scala> logRDDv9.map(x=>(x._3, 1)).reduceByKey(_+_).sortByKey().foreach(println)
7、总结
本次实验,通过使用Spark统计分析apache格式的日志文件,达到熟练掌握spark相关算子的目的