我们的应用场景是分析用户使用手机App的行为,描述如下所示:
1、手机客户端会收集用户的行为事件(我们以点击事件为例),将数据发送到数据服务器,我们假设这里直接进入到Kafka消息队列
2、后端的实时服务会从Kafka消费数据,将数据读出来并进行实时分析,这里选择Spark Streaming
3、经过Spark Streaming实时计算程序分析,将结果写入Redis
本例子用模拟的Kafka生产者产生JSON格式的数据,通过Spark Streaming进行消费,然后将结果保存至Redis中。其中Spark Streaming消费Kafka有两种实现方式:基于Receiver的方式(KafkaUtils.createStream)和不使用Receiver的直连方式(KafkaUtils.createDirectStream)。
在开始本例子之前需要确保以下几件事情:
1、redis已开启密码认证,且密码是admin(可修改)。可以用下面的命令验证能否连接:
//admin是密码
/usr/local/redis/bin/redis-cli -a admin
2、确保zookeeper服务运行正常且使用默认端口,否则修改本实例代码中的端口即可
3、确保kafka服务运行正常且使用默认端口,否则修改本实例代码中的端口即可
运行环境:
centos7
jdk1.8
kafka: kafka_2.11-0.8.2.2
spark :spark-2.2.0-bin-hadoop2.7
scala :2.11.8
redis :redis-4.0.1
以下正文开始:
一、生产kafka数据
/**
 * Created by Administrator on 2017/9/13.
 *
 * Kafka producer that simulates user-behaviour events being written to Kafka
 * in real time. Every message is a JSON object with four fields:
 *  1. uid:         user id
 *  2. event_time:  event timestamp (System.currentTimeMillis, sent as a string)
 *  3. os_type:     operating system of the mobile app
 *  4. click_count: number of clicks
 */
object KafkaEventProducer {

  // Simulated user ids; getUserID cycles through them in order.
  private val users = Array(
    "user_01", "user_02", "user_03", "user_04", "user_05",
    "user_06", "user_07", "user_08", "user_09", "user_10")

  private val random = new Random()

  // Index of the most recently returned user; -1 means none has been returned yet.
  private var pointer = -1

  /**
   * Returns the next user id, walking the `users` array round-robin and
   * wrapping back to the first entry after the last one.
   */
  def getUserID(): String = {
    pointer = (pointer + 1) % users.length
    users(pointer)
  }

  /**
   * Returns a random click count: an integer in [0, 10) widened to Double.
   */
  def click(): Double = {
    random.nextInt(10)
  }

  /**
   * Sends one simulated click event to the topic every 200 ms until the
   * process is stopped.
   */
  def main(args: Array[String]): Unit = {
    val topic = "kafka_spark_redis_T"
    // Kafka cluster
    val brokers = "hadoop2:9092,hadoop3:9092,hadoop4:9092"

    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // "group.id" is a consumer setting and was marked optional in the original,
    // so it is no longer passed to the producer.

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    try {
      while (true) {
        // prepare event data
        val event = new JSONObject()
        event
          .put("uid", getUserID())
          .put("event_time", System.currentTimeMillis.toString)
          .put("os_type", "ios")
          .put("click_count", click())

        // produce event message
        producer.send(new KeyedMessage[String, String](topic, event.toString))
        println("Message sent: " + event)

        // control produce rate
        Thread.sleep(200)
      }
    } finally {
      // Release the producer's connections if the loop exits with an exception
      // (for example when the thread is interrupted).
      producer.close()
    }
  }
}
二、建立redis连接池
/**
 * Holder for a shared Jedis connection pool.
 *
 * The pool is a `lazy val`, so it is built the first time `pool` is accessed.
 * A JVM shutdown hook that destroys the pool is registered when this object
 * is initialised.
 */
object RedisClient extends Serializable {

  // Connection settings.
  val redisHost = "hadoop4" // Redis server
  val redisPort = 6379
  val redisTimeout = 30000
  val AUTH = "admin" // authorization password

  // Pool settings, applied to `config` below.
  val MAX_ACTIVE: Int = 1024
  val MAX_IDLE: Int = 200
  val MAX_WAIT: Int = 10000
  val TEST_ON_BORROW: Boolean = true

  val config: JedisPoolConfig = {
    val poolConfig = new JedisPoolConfig
    poolConfig.setMaxTotal(MAX_ACTIVE)
    poolConfig.setMaxIdle(MAX_IDLE)
    poolConfig.setMaxWaitMillis(MAX_WAIT)
    poolConfig.setTestOnBorrow(TEST_ON_BORROW)
    poolConfig
  }

  // Created on first access.
  lazy val pool = new JedisPool(config, redisHost, redisPort, redisTimeout, AUTH)

  // Cleanup logic. Only its run method is invoked (from the shutdown hook
  // below); the thread object itself is never started.
  lazy val hook = new Thread {
    override def run = {
      println("Execute hook thread: " + this)
      pool.destroy()
    }
  }

  // Destroy the pool when the JVM shuts down.
  sys.addShutdownHook {
    hook.run
  }
}
三、spark streaming消费数据,并存往redis中。
方式一:基于Receiver,使用 KafkaUtils.createStream
/**
 * Receiver-based Spark Streaming job: reads click events from Kafka with
 * KafkaUtils.createStream, sums click_count per uid in every 5-second batch
 * and adds the sums to a Redis hash.
 */
object Ka_spark_redis {

  /**
   * Adds every (uid, clickCount) pair of one partition to the Redis hash `hashKey`.
   *
   * One connection is borrowed for the whole partition rather than one per
   * record, and it is always handed back through close(), even when a Redis
   * command throws.
   */
  private def saveClicks(hashKey: String, records: Iterator[(String, Int)]): Unit = {
    // Skip empty partitions so they do not borrow a connection.
    if (records.nonEmpty) {
      val jedis = RedisClient.pool.getResource
      try {
        records.foreach { case (uid, clickCount) =>
          jedis.hincrBy(hashKey, uid, clickCount.toLong)
        }
      } finally {
        jedis.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val topics = "kafka_spark_redis_T" // must match the topic used by the producer
    val numThreads = 3
    val zkQuorum = "hadoop2:2181" // ZooKeeper address; may be a cluster
    val group = "sparkTest" // same group name as in the producer example (was misspelled "spaekTest")

    val sparkConf = new SparkConf().setAppName("Ka_spark_redis_T").setMaster("local[2]")
    Logger.getLogger("spark").setLevel(Level.WARN)
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val clickHashKey = "app_users_click" // name of the Redis hash; stored as <uid, total clicks>

    // topic -> number of consumer threads
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)

    // Example message value:
    // {"uid":"user_02",
    //  "event_time":"1505270531256",
    //  "os_type":"Android",
    //  "click_count":4}
    // The second tuple element is the message value; parse it as JSON.
    val events = data.map(record => JSONObject.fromObject(record._2))

    // Compute user click times
    val userClicks = events
      .map(event => (event.getString("uid"), event.getInt("click_count")))
      .reduceByKey(_ + _)

    userClicks.foreachRDD { rdd =>
      rdd.foreachPartition(partitionOfRecords => saveClicks(clickHashKey, partitionOfRecords))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
方式二:不使用Receiver(直连),使用 KafkaUtils.createDirectStream
/**
 * Direct (receiver-less) Spark Streaming job: reads click events from Kafka
 * with KafkaUtils.createDirectStream, sums click_count per uid in every
 * 5-second batch and adds the sums to a Redis hash.
 */
object UserClickCountAnalytics {

  /**
   * Adds every (uid, clickCount) pair of one partition to the Redis hash `hashKey`.
   *
   * One connection is borrowed for the whole partition rather than one per
   * record, and it is always handed back through close(), even when a Redis
   * command throws.
   */
  private def saveClicks(hashKey: String, records: Iterator[(String, Int)]): Unit = {
    // Skip empty partitions so they do not borrow a connection.
    if (records.nonEmpty) {
      val jedis = RedisClient.pool.getResource
      try {
        records.foreach { case (uid, clickCount) =>
          jedis.hincrBy(hashKey, uid, clickCount.toLong)
        }
      } finally {
        jedis.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val masterUrl = "local[1]"

    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configurations
    val topics = Set("kafka_spark_redis_T")
    val brokers = "hadoop2:9092,hadoop3:9092,hadoop4:9092"
    val groupId = "sparkTest"
    // "serializer.class" is a producer setting, so it is not passed to the consumer.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId)

    val clickHashKey = "app_users_click"

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // The second tuple element is the message value; parse it as JSON.
    val events = kafkaStream.map(record => JSONObject.fromObject(record._2))

    // Compute user click times
    val userClicks = events
      .map(event => (event.getString("uid"), event.getInt("click_count")))
      .reduceByKey(_ + _)

    userClicks.foreachRDD { rdd =>
      rdd.foreachPartition(partitionOfRecords => saveClicks(clickHashKey, partitionOfRecords))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
四、运行环境依赖
本实例依赖spark和scala,版本已经在上面有列出,以下是部分依赖。
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jettison</groupId>
    <artifactId>jettison</artifactId>
    <version>1.3.8</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.43</version>
</dependency>
<dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.3</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.2</version>
</dependency>