绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Spark案例练习-UV的统计
2020-05-09 16:27:45

承接上一篇文档《Spark案例练习-PV的统计》

参数说明:



继续上面的PV代码编写即可

思路:UV的计算

1.数据进行过滤清洗,获取两个字段(时间、guid)

2.guid非空,时间非空,时间字符串的长度必须大于10

3.将同一天的数据放在一起,根据guid去重,统计去重的结果

代码:

val rdd2 = rdd.map(line => line.split("\t"))

.filter(arr => {

//保留正常数据

arr.length >=3 && arr(2).trim.nonEmpty && arr(0).trim.length > 10

})

.map(arr => {

val date = arr(0).trim.substring(0,10)

val guid = arr(2).trim

(date,guid) // (date,url)

})

继续编写代码



有两种方式:

  1. 基于groupByKey进行UV的统计
  2. 基于reduceByKey实现UV的统计

先看基于groupByKey进行UV的统计

val uvRdd = rdd2.groupByKey()

.map(t => {

val date = t._1

val guids = t._2

val uv = guids.toSet.size

(date,uv)

})

println("uv------------------" + uvRdd.collect().mkString(";"))





再看基于reduceByKey实现UV的统计

rdd2.map(t => {

((t._1,t._2),1)

})

.reduceByKey(_+_)

.map(_._1)

val uvRDD: RDD[(String, Int)] = rdd2.distinct()

.map(t => (t._1, 1))

.reduceByKey(_+_)

println("uv------------------" + uvRDD.collect().mkString(";"))






终指标的合并

val pvuvRdd = pvRdd.fullOuterJoin(uvRdd)

.map(t => {

val date = t._1

val pv = t._2._1.getOrElse(0) //如果有值则返回对应的值,如果无值则返回0

val uv = t._2._2.getOrElse(0)

//返回结果

(date,pv,uv)

})



打印一下,可以看到合并的数据



数据输出(Driver、保存HDFS上,保存到RDBMS中)

数据返回给Driver

val result = pvuvRdd.collect()



保存到HDFS上

pvuvRdd.saveAsTextFile(s"hdfs://master:9000/data/pv_uv/${System.currentTimeMillis()}")



端口注意下,如果想用域名(master)就要确保在本地hosts文件配置了(win环境下)

运行一下,可以看到hdfs上有了这个文件



保存到RDBMS中、保存到非关系型数据库中

建库建表

CREATE DATABASE spark_test;


USE spark_test;


CREATE TABLE pvuv(

`date` DATE NOT NULL,

`pv` INT(11) NOT NULL,

`uv` INT(11) NOT NULL

)ENGINE=MYISAM DEFAULT CHARSET=utf8;

编写代码

其中val conn = DriverManager.getConnection("","","")这句话是url、user和password

代码

pvuvRdd.foreachPartition(iter => {

//1. 创建数据库连接对象

//2. 创建数据输出prepareStatement对象

val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark_test","root","root")

val pstmt = conn.prepareStatement("insert into pvuv(date,pv,uv) values(?,?,?);")

//3. 数据迭代输出

iter.foreach(t => {

val date = t._1

val pv = t._2

val uv = t._3

pstmt.setString(1,date)

pstmt.setInt(2,pv)

pstmt.setInt(3,uv)

pstmt.executeUpdate()

})

//4. 关闭连接

conn.close()

pstmt.close()

})






运行代码,查看数据库


分享好友

分享这个小栈给你的朋友们,一起进步吧。

Apache Spark技术专区
创建时间:2020-05-08 17:16:40
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎 。现在形成一个高速发展应用广泛的生态系统。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • 栈栈
    专家
戳我,来吐槽~