作者信息

作者: 余辉       微信公众号:辉哥大数据
购买地址: 京东淘宝当当网
读者须知:本书配套示例源码、PPT课件、教学视频与作者答疑服务,购买之后可加粉丝群

本书封面

在这里插入图片描述


第14章 Spark实战案例

       本章将带你领略Spark在数据分析领域的实战风采。从使用Spark Core进行电影数据与日志数据分析,到使用Spark SQL进行电商数据和金融数据分析,帮助你全方位掌握Spark数据分析方法与技巧。本章是你提升Spark实战能力的绝佳选择。
       本章主要知识点:

  • Spark Core电影数据分析
  • Spark Core日志数据分析
  • Spark SQL电商数据分析
  • Spark SQL金融数据分析

14.1 Spark Core电影数据分析

       在深入探索Spark Core的统计功能时,我们将通过两个紧密相关的案例——用户连续登录超过3天和电影统计,来展示其强大的数据处理能力。为了增强学习的全面性,我们将使用同一组数据集,但分别采用Spark SQL的DSL编程风格和传统的SQL编程风格来实现相同的需求。
       在第一个案例中,我们将关注用户的登录行为,利用Spark SQL的DSL风格编写代码,以识别出连续登录超过3天的用户。
而在第二个案例中,我们将转换风格,使用SQL语句对电影数据进行统计分析,如计算各类电影的评分分布和受欢迎程度。
       通过这两种不同的实现方式,读者不仅能够复习Spark SQL的两种编程风格,还能更深入地理解如何在不同场景下灵活运用Spark Core进行数据处理和统计分析。

14.1.1 表格及数据样例

       为了让读者直观了解本次实战案例的数据和字段类型,这里将数据和字段类型以图片和表格形式进行展示。其中,图14-1所示的“用户登录表和电影评分表”是本次实验的两组数据的表格和字段。图14-1所示的“用户登录表数据”是以CSV文件进行存储的,文件名为loginUser.csv。图14-2所示的电影评分表数据是以JSON文件进行存储的,文件名为rating.json。
在这里插入图片描述

14.1.2 连续登录超过3天的用户DSL风格

本案例需求:
(1)使用DSL风格进行编程。
(2)使用用户登录表中的数据。
(3)根据提供的数据找出连续登录超过3天的用户。
连续登录超过3天的用户DSL风格示例代码如代码14-1所示。
代码14-1 LoginDSL.scala

package chapter14

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import utils.SparkUtils
/**
 * 需求:
 * 找出连续登录超过3天的用户
 *
 * 步骤:
 * 1. 开窗,按照uid分区,按照dt排序,标记rn
 * 2. 然后使用date_sub函数,用dt减去rn,标记为dis
 * 3. 使用uid和dis分组,标记count为cn
 * 4. where cs > 2
 **/
object LoginDSL {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getSparkSeesion()
    import org.apache.spark.sql.functions._
    import spark.implicits._
    val df = spark.read
      .options(Map("header" -> "true", "inferSchema" -> "true"))
      .csv("doc/exercise/用户登录/loginUser.csv")
    df.printSchema()
    df.show()
    /** *
     * Error:(29, 40) type mismatch;
     * found   : Symbol
     * required: Int
     * .select('uid, 'dt, date_sub('dt, 'rn))
     */
    val win = Window.partitionBy('uid).orderBy('dt)
    df.select('uid, date_format('dt, "yyyy-MM-dd") as "dt", row_number() over (win) as "rn")
      .select('uid, 'dt, expr("date_sub(dt,rn)") as "dis")
      .groupBy('uid,'dis).agg(min('dt),max('dt),count('uid) as "cs")
      .where("cs > 2").drop('dis)
      .show()
  }
}

执行以上代码,输出结果如图14-3所示。
在这里插入图片描述

14.1.3 连续登录超过3天的用户SQL风格

本案例需求:
(1)使用SQL风格进行编程。
(2)使用用户登录表中的数据。
(3)根据提供的数据找出连续登录超过3天的用户。
连续登录超过3天的用户SQL风格示例代码如代码14-2所示。
代码14-2 LoginSQL.scala

package chapter14

import org.apache.spark.sql.{DataFrame, SparkSession}
import utils.SparkUtils
/**
 * 需求:
 * 找出连续登录超过3天的用户
 *
 * 步骤:
 * 1. 开窗,按照uid分区,按照dt排序,标记rn
 * 2. 然后使用date_sub函数,用dt减去rn,标记为dis
 * 3. 使用uid和dis分组,标记count为cn
 * 4. where cs > 2
 **/
object LoginSQL {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkUtils.getSparkSeesion()
    val frame: DataFrame = spark.read
      .option("header",true)
      .csv("doc/exercise/用户登录/loginUser.csv")

    frame.printSchema()
    frame.show()

    frame.createTempView("login")

    spark.sql(
      """
        |select
        |uid , min(dt) , max(dt) ,count(1) as cts
        |from
        |(
        | select
        |   uid , dt ,date_sub(dt,rn) as dis
        | from
        | (
        |   select
        |       uid , dt , row_number() over(partition by uid order by dt) rn
        |   from login
        | ) t2
        |) t3
        |group by uid , dis
        |having cts > 2
        |""".stripMargin).show()
  }
}

执行以上代码,输出结果如图14-4所示。
在这里插入图片描述
14.1.4 电影统计DSL风格
本案例需求:
(1)使用DSL风格。
(2)求每个人评价最高的10部电影。
(3)求最热门的前50部电影(被评分的次数说明热门程度)。
(4)求每个人的评分总和。
(5)求每部电影的总得分和平均得分。
电影统计DSL风格示例代码如代码14-3所示。
代码14-3 MovieDSL.scala

package chapter14

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
import utils.SparkUtils
/**
 * 需求:
 * 1. 求每个人评价最高的10部电影
 * 2. 求最热门的前50部电影 (被评分的次数说明热门程度)
 * 3. 求每个人的评分总和
 * 4. 求每部电影的总得分和平均得分
 **/
object MovieDSL {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkUtils.getSparkSeesion()
    import org.apache.spark.sql.functions._
    import spark.implicits._
    val df: DataFrame = spark.read.json("doc/exercise/电影评分/rating.json").drop("raete")
    // df.printSchema()
    // df.show(10)

    /***
     * 每个人评价最高的10部电影
     * 1. 首先按照用户进行分区 ,按照评分进行排序(降序),且打上标签(窗口函数)
     * 2. 只需要获取 rn <11
     *
     * uid    rate  rn
     * uid1   5      1
     * uid1   4      2
     * uid1   4      3
     * uid1   3      4
     * uid1   3      5
     */
    val win = Window.partitionBy('uid).orderBy('rate.desc)
    df.select('uid,'movie,'rate,row_number() over(win) as "rn")
      .where("rn < 11")
      .show()

    /***
     * 最热门的前50部电影(被评分的次数说明热门程度)
     * 1. 按照电影进行分组,求评分次数
     * 2. 将评分次数进行排序(降序)
     */
    df.groupBy('movie).agg(count('movie) as "cm").orderBy('cm.desc).show()

    /***
     * 每个人的评分总和
     */
    df.groupBy('uid).agg(sum('rate)).show()

    /***
     * 每部电影的总得分和平均得分
     *  1. 按照电影进行分组
     *  2. 再在组内求sum和avg
     */
    df.groupBy('movie).agg(sum('rate), avg('rate)).show()
  }
}

注意:本节执行结果可查看14.1.6节。
14.1.5 电影统计SQL风格
本案例需求:
(1)使用SQL风格。
(2)求每个人评价最高的10部电影。
(3)求最热门的前50部电影(被评分的次数说明热门程度)。
(4)求每个人的评分总和。
(5)求每部电影的总得分和平均得分。
电影统计SQL风格示例代码如代码14-4所示。
代码14-4 MovieSQL.scala

package chapter14

import org.apache.spark.sql.{DataFrame, SparkSession}
import utils.SparkUtils
/**
 * 需求:
 * 1. 求每个人评价最高的10部电影
 * 2. 求最热门的前50部电影 (被评分的次数说明热门程度)
 * 3. 求每个人的评分总和
 * 4. 求每部电影的总得分和平均得分
 **/
object MovieSQL {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkUtils.getSparkSeesion()
    val frame: DataFrame = spark
      .read
      .json("doc/exercise/电影评分/rating.json")
      .drop("raete")
    frame.createTempView("t")
    /***
     * 每个人评价最高的10部电影
     * 1. 首先按照用户进行分区,按照评分进行排序(降序),且打上标签(窗口函数)
     * 2. 只需要获取 rn <11
     *
     * uid    rate  rn
     * uid1   5      1
     * uid1   4      2
     * uid1   4      3
     * uid1   3      4
     * uid1   3      5
     */

    spark.sql(
      """
        |
        |select
        |  uid , movie ,rate,rn
        |from
        |(
        |   select
        |     uid , movie ,rate ,row_number() over(partition by uid order by rate desc) rn
        |   from t
        |) t2
        |where rn < 11
        |
        |""".stripMargin).show()

    /***
     * 最热门的前50部电影 (被评分的次数说明热门程度)
     * 1. 按照电影进行分组,求评分次数
     * 2. 将评分次数进行排序(降序)
     */

    spark.sql(
      """
        |
        |select
        |   movie,cs
        |from (
        |       select
        |       movie,count(1) as cs
        |       from t
        |       group by movie
        |) t2
        |order by cs desc
        |
        |""".stripMargin).show()

    /***
     * 每个人的评分总和
     */

    spark.sql(
      """
        |select
        |  uid,sum(rate)
        |from t group by uid
        |
        |""".stripMargin).show()

    /***
     * 每部电影的总得分和平均得分
     *  1. 按照电影进行分组
     *  2. 再在组内求sum和avg
     */
    spark.sql(
      """
        |select
        |   movie,sum(rate),avg(rate)
        |from t
        |group by movie
        |
        |""".stripMargin).show()
  }
}

本示例执行结果可查看14.1.6节。

14.1.6 电影统计运行结果

本案例最终代码实现的结果:
(1)每个人评价最高的10部电影,执行结果如图14-5所示。
(2)最热门的前50部电影(被评分的次数说明热门程度),执行结果如图14-6所示。
在这里插入图片描述
(3)每个人的评分总和,执行结果如图14-7所示。
(4)每部电影的总得分和平均得分,执行结果如图14-8所示。
在这里插入图片描述

14.2 Spark Core日志数据分析

       在互联网企业中,用户浏览网页的日志分析项目极为常见。本节将演示基于互联网企业设计的一个浏览网页的日志分析案例。
       首先需要从服务器或数据库中获取原始的日志数据。这些数据通常是非结构化或者结构化的,包含大量的用户访问信息。
       接着,利用Spark进行数据清洗,包括筛选有效记录、去重、格式转换等步骤,以确保数据质量。完成数据清洗后,使用Spark进行计算和存储。例如,可以计算每个网页的访问量,并将结果存储到HDFS、关系数据库或NoSQL数据库中,以便后续的分析和查询。
       最后,使用可视化工具如Echarts、FineBI或Tableau等,将分析结果以图表的形式展示出来。这些图表可以直观地展示各个网页的访问情况,帮助分析师更好地理解用户行为,优化网站设计和内容策略。
       整个过程充分利用了Spark的高速计算能力和丰富的数据处理功能,确保分析结果的准确性和时效性。
       通过本案例的学习,读者可以复习在Spark中使用Case Class、Schema+Row、JavaBean创建DataFrame方式,Spark SQL中的窗口函数的使用,以及foreach中连接MySQL的方法。

14.2.1 前期准备

       1. 表格及数据样例
       为了让读者直观了解本次实战案例的数据和字段类型,我们将数据和字段类型以图片形式进行展示。其中,图14-9所示的“页面浏览表及字段”是本次实验的表格和字段。图14-9所示的“前10条案例数据”是以TXT文件进行存储的,文件名为output_buffered.txt。
在这里插入图片描述
       2. 需求实现及计算要求
       统计每天PV并用FineBI可视化。要求使用Schema和Row创建DataFreme,使用Spark SQL进行计算,将每天对应的PV结果存储到MySQL中,存储字段有dt、pv,通过FineBI工具进行可视化展示。
       统计每天UV并用FineBI可视化。要求使用Case Class创建DataFreme,使用Spark SQL进行计算,将每天对应的UV结果存储到MySQL中,存储字段有dt、uv,通过FineBI工具进行可视化 展示。
       统计每天TopN并用FineBI可视化。要求使用JavaBean创建DataFreme,使用Spark SQL进行计算,将每天Top5的RUL结果存储到MySQL中,存储字段有dt、url、url_count,通过FineBI工具进行可视化展示。

       3. 可视化工具
       FineBI是帆软软件有限公司推出的一款商业智能(Business Intelligence,BI)产品。它旨在帮助企业快速搭建面向全员的自助分析BI平台,通过简单拖曳即可制作出丰富多样的数据可视化信息,让业务人员能够自主分析数据并辅助决策。FineBI支持多种数据源,提供数据预览、血缘分析等功能,并注重数据安全,支持权限设置。此外,它还具备高性能计算引擎,能够处理大规模数据集。

  • FineBI的官网地址为https:// home.fanruan.com/。
  • FineBI的帮助页面为https:// help.fanruan.com/finebi/。

        4. MySQL的Maven

将以下MySQL的依赖复制到pom.xml中:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.30</version>
</dependency> 

14.2.2 统计PV和可视化

由于结果数据需要存储到MySQL中,因此提前创建表格,MySQL的执行语句如下:

drop table view_pv;
CREATE TABLE view_pv(  
  dt char(20) not null primary key,
  pv int
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

统计PV的示例代码如代码14-5所示。
代码14-5 DataViewPV.scala

package chapter14

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
 * drop table view_pv;
 * CREATE TABLE view_pv(
 * dt char(20) not null primary key,
 * pv int
 * )ENGINE=InnoDB DEFAULT CHARSET=utf8;
 *
 * SELECT * from view_pv;
 *
 */
object DataViewPV {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("")
      .master("local[*]")
      .getOrCreate()
    val schema = new StructType()
      .add("dt", StringType)
      .add("name", StringType)
      .add("url", StringType)
    val lines = spark.sparkContext.textFile("doc/output_buffered.txt")
    // 将RDD关联了Schema,但依然是RDD
    val row = lines.map(line => {
      val fields = line.split(",")
      Row(fields(0), fields(1), fields(2))
    })

    val df = spark.createDataFrame(row, schema)
    df.show()
    df.createTempView("dataTable")

    val view_pv = spark.sql(
      """
        |
        | select
        |    dt ,
        |    cast(count(name) as int) as pv
        | from dataTable group by dt
        |
        |""".stripMargin)
    //  把数据保存到MySQL表中
    view_pv.rdd.foreach(line => {
      // 每条数据与MySQL建立连接
      // 把数据插入MySQL表操作
      // 1. 获取连接
      val connection: Connection = DriverManager.getConnection("jdbc:mysql:// localhost:3306/spark", "root", "yuhui888")
      // 2. 定义插入数据的SQL语句
      val sql = "insert into view_pv(dt,pv) values(?,?)"
      // 3. 获取PreParedStatement

      try {
        val ps: PreparedStatement = connection.prepareStatement(sql)
        // 4. 获取数据,给?号赋值
        ps.setString(1, line.getAs[String](0))
        ps.setInt(2, line.getAs[Int](1))
        // 执行
        ps.execute()
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    })
  }
}

执行上述代码,将数据存储到MySQL的view_pv表中,结果如图14-10所示。
在这里插入图片描述
在FineBI的“数据中心”中,加载MySQL中的view_pv表,形成FineBI中的spark_view_pv数据集,如图14-11所示。
在这里插入图片描述
在FineBI的“我的分析”中,加载FineBI中的spark_view_pv数据集,通过拖曳方式形成最终需要的图表。统计PV数据可视化展示如图14-12所示。横轴为日期,纵轴为PV总数。
在这里插入图片描述

14.2.3 统计UV和可视化

MySQL表格创建
由于结果数据需要存储到MySQL中,因此提前创建表格,MySQL的执行语句如下:

drop table view_uv;  
CREATE TABLE view_uv(
  dt char(20) not null primary key,
  uv int
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

统计UV的示例代码如代码14-6所示。
代码14-6 DataViewUV.scala

package chapter14
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
 * author: yuhui
 * descriptions:
 * date: 2024 - 10 - 26 3:50 下午
 *
 * drop table view_uv;
 * CREATE TABLE view_uv(
 * dt char(20) not null primary key,
 * uv int
 * )ENGINE=InnoDB DEFAULT CHARSET=utf8;
 *
 * 需求:
 *
 */
case class DataViewUV(dt: String, name: String, url: String)
object DataViewUV {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val lines = spark.sparkContext.textFile("doc/output_buffered.txt")
    // 将RDD关联了Schema,但依然是RDD
    val dataDF: DataFrame = lines.map(line => {
      val fields = line.split(",")
      DataViewUV(fields(0), fields(1), fields(2))
    }).toDF()

    dataDF.createTempView("dataTable")

    val view_uv = spark.sql(
      """
        |
        | select
        | dt ,
        | cast(count(DISTINCT name) as int) as uv
        | from dataTable
        | group by dt
        |
        |""".stripMargin)

    //  把数据保存到MySQL表中
    view_uv.rdd.foreach(line => {
      // 每条数据与MySQL建立连接
      // 把数据插入MySQL表操作
      // 1. 获取连接
      val connection: Connection = DriverManager.getConnection("jdbc:mysql:// localhost:3306/spark", "root", "yuhui888")
      // 2. 定义插入数据的SQL语句
      val sql = "insert into view_uv(dt,uv) values(?,?)"
      // 3. 获取PreParedStatement

      try {
        val ps: PreparedStatement = connection.prepareStatement(sql)

        // 4. 获取数据,给?号赋值
        ps.setString(1, line.getAs[String](0))
        ps.setInt(2, line.getAs[Int](1))
        // 执行
        ps.execute()
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    })
  }
}

执行上述代码,将数据存储到MySQL的view_uv表中,结果如图14-13所示。
在这里插入图片描述
在FineBI的“数据中心”中,加载MySQL中的view_uv表,形成FineBI中的spark_view_uv数据集,如图14-14所示。
在这里插入图片描述
在FineBI的“我的分析”中,加载FineBI中的spark_view_uv数据集,通过拖曳方式形成最终需要的图表。统计UV数据可视化展示如图14-15所示。横轴为日期,纵轴为UV总数。
在这里插入图片描述
14.2.4 统计TopN和可视化
由于结果数据需要存储到MySQL中,因此提前创建表格,MySQL的执行语句如下:

drop table view_top;
CREATE TABLE view_top(   
  dt char(20) ,
  url char(50),
  url_count int
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
select * from view_top;

求TopN涉及窗口函数的使用,下面对窗口函数的解题步骤进行说明。

  1. WITH RankedUrls AS (…) 定义了一个公用表表达式(CTE),它首先按dt和url分组,并计算每个dt中每个url出现的次数(url_count)。
  2. ROW_NUMBER() OVER (PARTITION BY dt ORDER BY COUNT(*) DESC) 为每个dt分组内的url分配一个排名,排名依据是url出现的次数(降序)。
  3. 在外部查询中,我们从RankedUrls CTE中选择dt、url和url_count,但只选择排名在前五的记录(WHERE rank <= 5)。
  4. 最后,我们按dt和rank对结果进行排序,以确保输出是有序的。

统计TopN的示例代码如代码14-7所示。
代码14-7 DataViewTopN.scala

package chapter14
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.SparkSession
/**
 * CREATE TABLE view_top(
 * dt char(20) ,
 * url char(50),
 * url_count int
 * )ENGINE=InnoDB DEFAULT CHARSET=utf8;
 *
 */
object DataViewTopN {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("")
      .master("local[*]")
      .getOrCreate()

    val lines = spark.sparkContext.textFile("doc/output_buffered.txt")
    // 将RDD关联了Schema,但依然是RDD
    val rddBean = lines.map(line => {
      val fields = line.split(",")
      new DataView(fields(0), fields(1), fields(2))
    })
    val dataDF = spark.createDataFrame(rddBean, classOf[DataView])
    dataDF.show()
    dataDF.createTempView("dataTable")
    /** *
     * 1)WITH RankedUrls AS (...) 定义了一个公用表表达式(CTE),它首先按dt和url分组,并计算每个dt中每个url出现的次数(url_count)
     * 2)ROW_NUMBER() OVER (PARTITION BY dt ORDER BY COUNT(*) DESC) 为每个dt分组内的url分配一个排名,排名依据是url出现的次数(降序)
     * 3)在外部查询中,我们从RankedUrls CTE中选择dt、url和url_count,但只选择排名在前五的记录(WHERE rank <= 5)
     * 4)最后,我们按dt和rank对结果进行排序,以确保输出是有序的
     */
    val frame = spark.sql(
      """
        |
        | WITH RankedUrls AS (
        |  SELECT
        |    dt,
        |    url,
        |    count(*) as url_count,
        |    ROW_NUMBER() OVER (PARTITION BY dt ORDER BY COUNT(*) DESC) AS rank
        |  FROM
        |    dataTable
        |  GROUP BY
        |    dt,
        |    url
        |)
        |
        |SELECT
        |  dt,
        |  url,
        |  cast(url_count as int) as url_count
        |FROM
        |  RankedUrls
        |WHERE
        |  rank <= 5
        |ORDER BY
        |  dt,
        |  rank;
        |
        |""".stripMargin)

    // 把数据保存到MySQL表中
    frame.rdd.foreach(line => {
      // 每条数据与MySQL建立连接
      // 把数据插入MySQL表操作
      // 1. 获取连接
      val connection: Connection = DriverManager.getConnection("jdbc:mysql:// localhost:3306/spark", "root", "yuhui888")
      // 2. 定义插入数据的SQL语句
      val sql = "insert into view_top(dt,url,url_count) values(?,?,?)"
      // 3. 获取PreParedStatement

      try {
        val ps: PreparedStatement = connection.prepareStatement(sql)
        // 4. 获取数据,给?号赋值
        ps.setString(1, line.getAs[String](0))
        ps.setString(2, line.getAs[String](1))
        ps.setInt(3, line.getAs[Int](2))
        // 执行
        ps.execute()
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    })
  }
}

执行上述代码,将数据存储到MySQL的view_top表中,结果如图14-16所示。
在这里插入图片描述
在FineBI的“数据中心”中,加载MySQL中的view_top表,形成FineBI中的spark_view_top数据集,如图14-17所示。
在这里插入图片描述
在FineBI的“我的分析”中,加载FineBI中的spark_view_top数据集,通过拖曳的方式形成最终需要的图表。统计UV数据可视化展示如图14-18所示。第一列为日期,第二列为URL,第三列为URL的个数。
在这里插入图片描述

14.3 Spark SQL电商数据分析

基于Spark SQL的电商实战案例,需要涵盖以下3个需求。
(1)在电商数据分析中,我们利用Spark SQL对销售数据进行深度挖掘。首先,我们计算每年的销售单数和销售总额,这通过GROUP BY语句按年份分组,并使用COUNT和SUM函数分别统计销售单数和金额。
(2)其次,我们查询每年金额最大的订单及其具体金额。这涉及两步操作:先使用窗口函数ROW_NUMBER按年份和订单金额降序为每个年份的订单编号,然后筛选出每年排名第一的订单。
(3)最后,我们计算每年最畅销的货品。这同样需要分组和排序,但这次是按年份和货品分组,使用SUM函数统计每年每个货品的销售数量,再通过排序和限制结果集来找出每年销量最高的货品。
通过Spark SQL,我们能够高效地处理大规模电商销售数据,快速响应各种业务分析需求,如销售趋势分析、热销商品推荐等,为电商企业的决策提供有力支持。

14.3.1 数据和表格说明

本案例有三张表,每张表的数据字段如下。

  • tbDate:时间维度表,用于记录交易的时间信息。
  • tbStockDetail:订单明细表,用于记录交易的详细信息。
  • tbStock:关联表,用于将订单、时间、地点的信息连接在一起。

       三张表的数据字典如图14-19所示。每个订单可能包含多个货品,每个订单可以产生多次交易,不同的货品有不同的单价。也就是说,tbStock与tbStockDetail是一对多的关系,ordernumber(订单号)与itemid(货品)也是一对多的关系。其中,tbDate时间跨度从2013−2024年,tbStock时间跨度从2014−2020年。
在这里插入图片描述

14.3.2 加载数据

(1)YARN启动spark-shell,如图14-20所示。
在这里插入图片描述
(2)加载tbStock表。创建tbStock表的DataFrame,在spark-shell命令行中执行以下语句:

scala> case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable
defined class tbStock

scala> val tbStockRdd = spark.sparkContext.textFile("hdfs:// ns/spark_book_data/tbStock.txt")
tbStockRdd: org.apache.spark.rdd.RDD[String] = tbStock.txt MapPartitionsRDD[1] at textFile at <console>:23
​
scala> val tbStockDS = tbStockRdd.map(_.split(",")).map(attr=>tbStock(attr(0),attr(1),attr(2))).toDS
tbStockDS: org.apache.spark.sql.Dataset[tbStock] = [ordernumber: string, locationid: string ... 1 more field]
​
scala> tbStockDS.show()

执行以上语句,输出结果为:

+------------+----------+---------+                                             
| ordernumber|locationid|   dateid|
+------------+----------+---------+
|BYSL00000893|      ZHAO|2017-8-23|
|BYSL00000897|      ZHAO|2017-8-24|
|BYSL00000898|      ZHAO|2017-8-25|
|BYSL00000899|      ZHAO|2017-8-26|
|BYSL00000900|      ZHAO|2017-8-26|
|BYSL00000901|      ZHAO|2017-8-27|
|BYSL00000902|      ZHAO|2017-8-27|
|BYSL00000904|      ZHAO|2017-8-28|
|BYSL00000905|      ZHAO|2017-8-28|
|BYSL00000906|      ZHAO|2017-8-28|
|BYSL00000907|      ZHAO|2017-8-29|
|BYSL00000908|      ZHAO|2017-8-30|
|BYSL00000909|      ZHAO| 2017-9-1|
|BYSL00000910|      ZHAO| 2017-9-1|
|BYSL00000911|      ZHAO|2017-8-31|
|BYSL00000912|      ZHAO| 2017-9-2|
|BYSL00000913|      ZHAO| 2017-9-3|
|BYSL00000914|      ZHAO| 2017-9-3|
|BYSL00000915|      ZHAO| 2017-9-4|
|BYSL00000916|      ZHAO| 2017-9-4|
+------------+----------+---------+
only showing top 20 rows

(3)加载tbStockDetail表。创建tbStockDetail表的DataFrame,在spark-shell命令行中执行以下语句:

scala> case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double) extends Serializable
defined class tbStockDetail
​
scala> val tbStockDetailRdd = spark.sparkContext.textFile("hdfs:// ns/spark_book_data/tbStockDetail.txt")
tbStockDetailRdd: org.apache.spark.rdd.RDD[String] = tbStockDetail.txt MapPartitionsRDD[13] at textFile at <console>:23
​
scala> val tbStockDetailDS = tbStockDetailRdd.map(_.split(",")).map(attr=> tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble, attr(5).trim().toDouble)).toDS
tbStockDetailDS: org.apache.spark.sql.Dataset[tbStockDetail] = [ordernumber: string, rownum: int ... 4 more fields]
​
scala> tbStockDetailDS.show()

执行以上语句,输出结果为:

+------------+------+--------------+------+-----+------+
| ordernumber|rownum|        itemid|number|price|amount|
+------------+------+--------------+------+-----+------+
|BYSL00000893|     0|FS527258160501|    -1|268.0|-268.0|
|BYSL00000893|     1|FS527258169701|     1|268.0| 268.0|
|BYSL00000893|     2|FS527230163001|     1|198.0| 198.0|
|BYSL00000893|     3|24627209125406|     1|298.0| 298.0|
|BYSL00000893|     4|K9527220210202|     1|120.0| 120.0|
|BYSL00000893|     5|01527291670102|     1|268.0| 268.0|
|BYSL00000893|     6|QY527271800242|     1|158.0| 158.0|
|BYSL00000893|     7|ST040000010000|     8|  0.0|   0.0|
|BYSL00000897|     0|04527200711305|     1|198.0| 198.0|
|BYSL00000897|     1|MY627234650201|     1|120.0| 120.0|
|BYSL00000897|     2|01227111791001|     1|249.0| 249.0|
|BYSL00000897|     3|MY627234610402|     1|120.0| 120.0|
|BYSL00000897|     4|01527282681202|     1|268.0| 268.0|
|BYSL00000897|     5|84126182820102|     1|158.0| 158.0|
|BYSL00000897|     6|K9127105010402|     1|239.0| 239.0|
|BYSL00000897|     7|QY127175210405|     1|199.0| 199.0|
|BYSL00000897|     8|24127151630206|     1|299.0| 299.0|
|BYSL00000897|     9|G1126101350002|     1|158.0| 158.0|
|BYSL00000897|    10|FS527258160501|     1|198.0| 198.0|
|BYSL00000897|    11|ST040000010000|    13|  0.0|   0.0|
+------------+------+--------------+------+-----+------+
only showing top 20 rows

(4)加载tbDate表。创建tbDate表的DataFrame,在spark-shell命令行中执行以下语句:

scala> case class tbDate(dateid:String, years:Int, theyear:Int, month:Int, day:Int, weekday:Int, week:Int, quarter:Int, period:Int, halfmonth:Int) extends Serializable
defined class tbDate
​
scala> val tbDateRdd = spark.sparkContext.textFile("hdfs:// ns/spark_book_data/tbDate.txt")
tbDateRdd: org.apache.spark.rdd.RDD[String] = tbDate.txt MapPartitionsRDD[20] at textFile at <console>:23
​
scala> val tbDateDS = tbDateRdd.map(_.split(",")).map(attr=> tbDate(attr(0),attr(1).trim().toInt, attr(2).trim().toInt,attr(3).trim().toInt, attr(4).trim().toInt, attr(5).trim().toInt, attr(6).trim().toInt, attr(7).trim().toInt, attr(8).trim().toInt, attr(9).trim().toInt)).toDS
tbDateDS: org.apache.spark.sql.Dataset[tbDate] = [dateid: string, years: int ... 8 more fields]
​
scala> tbDateDS.show()

执行以上语句,输出结果为:

+---------+------+-------+-----+---+-------+----+-------+------+---------+
|   dateid| years|theyear|month|day|weekday|week|quarter|period|halfmonth|
+---------+------+-------+-----+---+-------+----+-------+------+---------+
| 2013-1-1|201301|   2013|    1|  1|      3|   1|      1|     1|        1|
| 2013-1-2|201301|   2013|    1|  2|      4|   1|      1|     1|        1|
| 2013-1-3|201301|   2013|    1|  3|      5|   1|      1|     1|        1|
| 2013-1-4|201301|   2013|    1|  4|      6|   1|      1|     1|        1|
| 2013-1-5|201301|   2013|    1|  5|      7|   1|      1|     1|        1|
| 2013-1-6|201301|   2013|    1|  6|      1|   2|      1|     1|        1|
| 2013-1-7|201301|   2013|    1|  7|      2|   2|      1|     1|        1|
| 2013-1-8|201301|   2013|    1|  8|      3|   2|      1|     1|        1|
| 2013-1-9|201301|   2013|    1|  9|      4|   2|      1|     1|        1|
|2013-1-10|201301|   2013|    1| 10|      5|   2|      1|     1|        1|
|2013-1-11|201301|   2013|    1| 11|      6|   2|      1|     2|        1|
|2013-1-12|201301|   2013|    1| 12|      7|   2|      1|     2|        1|
|2013-1-13|201301|   2013|    1| 13|      1|   3|      1|     2|        1|
|2013-1-14|201301|   2013|    1| 14|      2|   3|      1|     2|        1|
|2013-1-15|201301|   2013|    1| 15|      3|   3|      1|     2|        1|
|2013-1-16|201301|   2013|    1| 16|      4|   3|      1|     2|        2|
|2013-1-17|201301|   2013|    1| 17|      5|   3|      1|     2|        2|
|2013-1-18|201301|   2013|    1| 18|      6|   3|      1|     2|        2|
|2013-1-19|201301|   2013|    1| 19|      7|   3|      1|     2|        2|
|2013-1-20|201301|   2013|    1| 20|      1|   4|      1|     2|        2|
+---------+------+-------+-----+---+-------+----+-------+------+---------+
only showing top 20 rows

(5)注册3张临时表,在spark-shell命令行中执行以下语句:

scala> tbStockDS.createOrReplaceTempView("tbStock")
scala> tbDateDS.createOrReplaceTempView("tbDate")
scala> tbStockDetailDS.createOrReplaceTempView("tbStockDetail")

14.3.3 计算每年的销售单数和销售总额

本小节统计所有订单中每年的销售单数和销售总额。三张表连接后,以count(distinct a.ordernumber)统计销售单数、sum(b.amount)统计销售总额。
在spark-shell命令行中执行以下语句:

spark.sql("
SELECT c.theyear,
         COUNT(DISTINCT a.ordernumber),
         SUM(b.amount)
FROM tbStock a
JOIN tbStockDetail b
    ON a.ordernumber = b.ordernumber
JOIN tbDate c
    ON a.dateid = c.dateid
GROUP BY  c.theyear 
ORDER BY  c.theyear desc
").show

执行以上语句,输出结果为:

+-------+---------------------------+--------------------+
|theyear|count(DISTINCT ordernumber)|         sum(amount)|
+-------+---------------------------+--------------------+
|   2020|                         94|  210949.65999999995|
|   2019|                       2619|   6323697.189999991|
|   2018|                       4861|1.4674295299999997E7|
|   2017|                       4885|1.6719354559999991E7|
|   2016|                       3772|        1.36809829E7|
|   2015|                       3828| 1.325756415000001E7|
|   2014|                       1094|   3268115.499200004|
+-------+---------------------------+--------------------+

14.3.4 查询每年最大金额的订单及其金额

本小节完成统计每年最大金额订单的销售额,统计分为两个步骤进行。
(1)统计每年每个订单一共有多少销售额。在spark-shell命令行中执行以下语句:

spark.sql("
SELECT a.dateid,
         a.ordernumber,
         SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b
    ON a.ordernumber = b.ordernumber
GROUP BY  a.dateid, a.ordernumber
ORDER BY  a.dateid desc
").show

执行以上语句,输出结果为:

+--------+------------+------------------+                                      
|  dateid| ordernumber|       SumOfAmount|
+--------+------------+------------------+
|2020-1-9|LZSL00016335|            1096.0|
|2020-1-9|TSSL00016329|            4542.0|
|2020-1-9|RMSL00016334|            2174.0|
|2020-1-9|SSSL00016327|            6883.4|
|2020-1-9|GHSL00016326|            1427.0|
|2020-1-9|DYSL00016336|             498.0|
|2020-1-9|DGSL00016328|            2894.0|
|2020-1-8|DGSL00016324|            2420.0|
|2020-1-8|RMSL00016325|             319.0|
|2020-1-8|LZSL00016321|             349.0|
|2020-1-8|TSSL00016322|            3177.8|
|2020-1-8|SSSL00016320|3781.0999999999995|
|2020-1-8|GHSL00016323|            1148.0|
|2020-1-7|RMSL00016317|            2007.0|
|2020-1-7|LZSL00016315|             697.0|
|2020-1-7|SSSL00016313|1793.0799999999997|
|2020-1-7|TSSL00016319|            1715.0|
|2020-1-7|DGSL00016311|             808.0|
|2020-1-7|DYSL00016316|             658.0|
|2020-1-7|GHSL00016318|             937.0|
+--------+------------+------------------+
only showing top 20 rows

(2)以上一步的查询结果为基础表,与表tbDate使用dateid字段进行Join操作,求出每年最大金额订单的销售额。在spark-shell命令行中执行以下语句:

spark.sql("
SELECT theyear,
         MAX(c.SumOfAmount) AS SumOfAmount
FROM 
    (SELECT a.dateid,
         a.ordernumber,
         SUM(b.amount) AS SumOfAmount
    FROM tbStock a
    JOIN tbStockDetail b
        ON a.ordernumber = b.ordernumber
    GROUP BY  a.dateid, a.ordernumber ) c
JOIN tbDate d
    ON c.dateid = d.dateid
GROUP BY  theyear
ORDER BY  theyear DESC
").show 

执行以上语句,输出结果为:

+-------+------------------+
|theyear|        SumOfAmount|
+-------+------------------+
|   2020|13065.280000000002|
|   2019|25813.200000000008|
|   2018|             55828.0|
|   2017|            159126.0|
|   2016|             36124.0|
|   2015|38186.399999999994|
|   2014| 23656.79999999997|
+-------+------------------+

14.3.5 计算每年最畅销的货品

本小节统计每年最畅销的货品(哪个货品销售额amount在当年最高,哪个就是最畅销的 货品)。
(1)先求出每年每个货品的销售额。在spark-shell命令行中执行以下语句:

spark.sql("
SELECT c.theyear,
         b.itemid,
         SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b
    ON a.ordernumber = b.ordernumber
JOIN tbDate c
    ON a.dateid = c.dateid
GROUP BY  c.theyear, b.itemid
ORDER BY  c.theyear DESC
").show

执行以上语句,输出结果为:

+-------+--------------+-----------+
|theyear|         itemid|SumOfAmount|
+-------+--------------+-----------+
|   2020|01127157980401|        58.0|
|   2020|84127374060306|        58.0|
|   2020|YL428437620101|       198.0|
|   2020|JX329467480104|      1276.2|
|   2020|79126117060102|        58.0|
|   2020|04128137019202|       120.0|
|   2020|SN828409520181|       158.0|
|   2020|DP219372102201|       319.0|
|   2020|BM217392020101|       150.0|
|   2020|77926452010181|       238.0|
|   2020|00160000000000|        26.0|
|   2020|QY128121070406|        98.0|
|   2020|QY128111059202|        58.0|
|   2020|ZX219359720101|       358.0|
|   2020|JX329459410106|       399.0|
|   2020|30627235871007|        10.0|
|   2020|XR127168310306|       232.0|
|   2020|QY127170120704|        58.0|
|   2020|JX329467870201|       957.2|
|   2020|QY128136080402|        98.0|
+-------+--------------+-----------+
only showing top 20 rows

(2)在上一步的基础上,统计每年单个货品中的最大金额。在spark-shell命令行窗口中执行以下语句:

spark.sql("
SELECT d.theyear,
         MAX(d.SumOfAmount) AS MaxOfAmount
FROM 
    (SELECT c.theyear,
         b.itemid,
         SUM(b.amount) AS SumOfAmount
    FROM tbStock a
    JOIN tbStockDetail b
        ON a.ordernumber = b.ordernumber
    JOIN tbDate c
        ON a.dateid = c.dateid
    GROUP BY  c.theyear, b.itemid ) d
GROUP BY  d.theyear
ORDER BY  d.theyear desc 
").show

执行以上语句,输出结果为:

+-------+------------------+
|theyear|        MaxOfAmount|
+-------+------------------+
|   2020|              4494.0|
|   2019|             30029.2|
|   2018| 98003.59999999995|
|   2017|             70225.1|
|   2016|113720.60000000005|
|   2015| 56627.33000000001|
|   2014|            53401.76|
+-------+------------------+

14.4 Spark SQL金融数据分析

       在金融领域,对时间序列数据的统计分析至关重要。某金融机构拥有大量股票交易数据,包括每日的开盘价、收盘价、最高价、最低价和交易量等。为了评估股票的表现,预测市场趋势,该机构决定利用Apache Spark进行深度统计分析。
       面对海量的交易数据,Spark的分布式计算能力显得尤为重要。机构的技术团队利用Spark读取存储于Hadoop分布式文件系统(HDFS)中的交易数据,并转换为DataFrame。随后,他们利用Spark SQL和DataFrame API计算了股票价格的最大值、最小值、平均值和标准差,以评估价格的波动情况。
       此外,为了更深入地了解数据的分布情况,团队还计算了中位数和四分位数。这些统计值不仅有助于识别数据的异常值,还能为制定投资策略提供重要参考。通过Spark,该金融机构成功实现了对大规模金融数据的快速统计分析,为市场预测和风险管理提供了有力支持。

14.4.1 数据准备

       为了让读者直观了解本次实战案例的数据和字段类型,我们将数据和字段类型以图片形式进行展示。其中,图14-21所示是本次实验的表格和字段,前10条案例数据是以CSV文件进行存储的,文件名为DataAnalysis.csv。
在这里插入图片描述
通过Spark读取DataAnalysis.csv数据,并得到一个DataFrame,示例代码如代码14-8所示。
代码14-8 DataAnalysis.scala

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}

case class DataAnalysis(feature1: Double, feature2: Double, feature3: Double, feature4: Double, label: String)

object DataAnalysis {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    val lines = spark.sparkContext.textFile("doc/DataAnalysis.csv")
    // 将RDD关联了Schema,但依然是RDD
    val dataDF: DataFrame = lines.map(line => {
      val fields = line.split(",")
      DataAnalysis(fields(0).toDouble, fields(1).toDouble, fields(2).toDouble, fields(3).toDouble, fields(4))
    }).toDF()
    // 显示全部值
    dataDF.show()  
  }
}

执行上述代码,执行结果如图14-22所示。
在这里插入图片描述

14.4.2 最大值和最小值

       使用Spark SQL统计最大值或最小值,首先使用agg函数对数据进行聚合,这个函数一般配合group by使用。如果不使用group by,就相当于对所有的数据进行聚合。
随后,直接使用max和min函数聚合就可以。想要输出多个结果,中间用逗号分开,并使用as给聚合后的结果赋予一个列名,相当于SQL中的as:

import spark.implicits._
dataDF.agg(max($"feature1") as "max_feature1",
        min($"feature2") as "min_feature2")
        .show()

执行结果如下:

+------------+------------+
|max_feature1|max_feature2|
+------------+------------+
|           9.5|           1.5|
+------------+------------+

上面代码中的$代表一列,相当于col函数:

import spark.implicits._
dataDF.agg(max(col("feature1")) as "max_feature1",
        min(col("feature2")) as "min_feature2")
        .show()
14.4.3  平均值
平均值的计算使用mean函数:
dataDF.agg(mean($"feature1") as "mean_feature1",
        mean($"feature2") as "mean_feature2")
        .show()

执行结果如图14-23所示。
在这里插入图片描述

14.4.4 样本标准差和总体标准差

       样本标准差的计算可以使用stddev函数和stddev_samp函数,而总体标准差可以使用stddev_pop方法。需要注意的是,这里的函数和Hive SQL还是存在区别的。在Hive SQL中,stddev函数代表的是总体标准差;而在Spark SQL中,stddev函数代表的是样本标准差。我们可以查看一下源代码,如图14-24所示。
在这里插入图片描述
通过代码验证一下:

dataDF.agg(stddev($"feature1") as "stddev_feature1",
        stddev_pop($"feature1") as "stddev_pop_feature1",
        stddev_samp($"feature1") as "stddev_samp_feature1")
        .show()

执行结果如图14-25所示。
在这里插入图片描述

14.4.5 中位数

       Spark SQL中没有直接计算中位数的方法,所以我们借鉴14.4.4节的思路,来回顾一下。计算中位数也好,计算四分位数也好,无非就是要取得两个位置。假设我们的数据从小到大排,按照1, 2, 3, …, n进行编号。当数量n为奇数时,取编号(n + 1)/2位置的数即可;当n为偶数时,取(int)(n + 1)/2位置和(int)(n + 1)/2 + 1位置的数的平均值即可。但二者其实可以统一到一个公式中:
(1)假设n = 149(奇数),(n+1)/2=75,小数部分为0,那么中位数=75位置的数×(1 − 0)+76位置的数×(0 − 0)。
(2)假设n = 150(偶数),(n+1)/2=75.5,小数部分为0.5,那么中位数=75位置的数×(1 − 0.5)+76位置的数×(0.5 − 0)。
因此,可以把这个过程分解为三个步骤:第一步是给数字进行编号,Spark中同样使用row_number()函数(该函数的具体用法后续再展开,这里只提供一个简单的例子);第二步是计算(n+1)/2的整数部分和小数部分;第三步是根据公式计算中位数。
首先使用row_number()函数给数据进行编号:

val windowFun = Window.orderBy(col("feature3").asc)
dataDF.withColumn("rank",row_number().over(windowFun)).show(false)

执行结果如图14-26所示。
在这里插入图片描述
接下来确定中位数的位置,这里我们分别拿到(n + 1)/2的整数部分和小数部分:

val median_index = dataDF.agg(
  ((count($"feature3") + 1) / 2).cast("int") as "rank",
  ((count($"feature3") + 1) / 2 % 1) as "float_part"
)
median_index.show()

执行结果如图14-27所示。
在这里插入图片描述

       这里小数部分不为0,意味着我们不仅要拿到rank=75的数,还要拿到rank=76的数。我们最好把其放到一行上,这里同样使用lead函数,lead函数的作用就是拿到分组排序后,下一个位置或下n个位置的数。我们在后面还会细讲,这里只是抛砖引玉:

val windowFun = Window.orderBy(col("feature3").asc)
dataDF.withColumn("next_feature3", lead(col("feature3"), 1).over(windowFun)).show(false)

执行结果如图14-28所示。
在这里插入图片描述
接下来Join两个表,按公式计算中位数就可以了。完整的代码如下:

val median_index = dataDF.agg(
  ((count($"feature3") + 1) / 2).cast("int") as "rank",
  ((count($"feature3") + 1) / 2 % 1) as "float_part"
)

dataDF.withColumn("next_feature3", lead(col("feature3"), 1).over(windowFun)).show(false)

dataDF.withColumn("rank", row_number().over(windowFun))
          .withColumn("next_feature3", lead(col("feature3"), 1).over(windowFun))
          .join(median_index, Seq("rank"), "inner")
          .withColumn("median", ($"float_part" - lit(0)) * $"next_feature3" + (lit(1) - $"float_part") * $"feature3")
          .show()

执行结果如图14-29所示。
在这里插入图片描述

14.4.6 四分位数

先来复习一下四分位数的两种解法:n+1方法和n−1方法。
对于n+1方法,如果数据量为n,则四分位数的位置为:

  • Q1的位置= (n+1)×0.25。
  • Q2的位置= (n+1)×0.5。
  • Q3的位置= (n+1)×0.75。
    对于n−1方法,如果数据量为n,则四分位数的位置为:
  • Q1的位置=1+(n−1)× 0.25。
  • Q2的位置=1+(n−1)× 0.5。
  • Q3的位置=1+(n−1)×0.75。

这里的思路和求解中位数的思路是一样的,我们分别实现这两种方法。首先是n+1方法:

val windowFun = Window.orderBy(col("feature3").asc)
val q1_index = dataDF.agg(
  ((count($"feature3") + 1) * 0.25).cast("int") as "rank",
  ((count($"feature3") + 1) * 0.25 % 1) as "float_part"
)
dataDF.withColumn("rank", row_number().over(windowFun))
  .withColumn("next_feature3", lead(col("feature3"), 1).over(windowFun))
  .join(q1_index, Seq("rank"), "inner")
  .withColumn("q1", ($"float_part" - lit(0)) * $"next_feature3" + (lit(1) - $"float_part") * $"feature3")
  .show()

执行结果如图14-30所示。
在这里插入图片描述
接下来是n−1方法:

val windowFun = Window.orderBy(col("feature3").asc)
val q1_index_sub = dataDF.agg(
  ((count($"feature3") - 1) * 0.25).cast("int") as "rank",
  ((count($"feature3") - 1) * 0.25 % 1) as "float_part"
)
dataDF.withColumn("rank", row_number().over(windowFun))
  .withColumn("next_feature3", lead(col("feature3"), 1).over(windowFun))
  .join(q1_index_sub, Seq("rank"), "inner")
  .withColumn("q1", ($"float_part" - lit(0)) * $"next_feature3" + (lit(1) - $"float_part") * $"feature3")
  .show()

执行结果如图14-31所示。
在这里插入图片描述

14.5 本章小结

       本章通过4个Spark实战案例,全面展示了Spark在不同领域的应用价值。在Spark Core日志统计案例中,我们掌握了处理和分析大规模日志数据的方法。通过日志分析可视化案例,我们学会了如何将分析结果以直观的方式呈现,增强了数据的可读性和洞察力。电商实战案例让我们领略了Spark SQL在处理电商大数据时的强大功能,为业务决策提供了有力支持。而数据分析案例则展示了Spark SQL在复杂数据分析任务中的广泛应用和卓越性能。这些案例不仅加深了我们对Spark的理解,也提升了我们的实战技能。

本书其他章节

  1. Spark大数据开发与应用案例(视频教学版)(一)–文前
  2. Spark大数据开发与应用案例(视频教学版)(二)–第一章上
  3. Spark大数据开发与应用案例(视频教学版)(三)–第一章下
  4. Spark大数据开发与应用案例(视频教学版)(四)–第二章上
  5. Spark大数据开发与应用案例(视频教学版)(五)–第二章下
  6. Spark大数据开发与应用案例(视频教学版)(六)–第三章上
  7. Spark大数据开发与应用案例(视频教学版)(七)–第三章下
  8. Spark大数据开发与应用案例(视频教学版)(八)–第四章上
  9. Spark大数据开发与应用案例(视频教学版)(九)–第四章下
  10. Spark大数据开发与应用案例(视频教学版)(十)–第五章
  11. Spark大数据开发与应用案例(视频教学版)(十一)–第六章
  12. Spark大数据开发与应用案例(视频教学版)(十二)–第七章
  13. Spark大数据开发与应用案例(视频教学版)(十三)–第八章
  14. Spark大数据开发与应用案例(视频教学版)(十四)–第九章
  15. Spark大数据开发与应用案例(视频教学版)(十五)–第十章上
  16. Spark大数据开发与应用案例(视频教学版)(十六)–第十章下
  17. Spark大数据开发与应用案例(视频教学版)(十七)–第十一章
  18. Spark大数据开发与应用案例(视频教学版)(十八)–第十二章
  19. Spark大数据开发与应用案例(视频教学版)(十九)–第十三章
  20. Spark大数据开发与应用案例(视频教学版)(二十)–第十四章
  21. Spark大数据开发与应用案例(视频教学版)(二十一)–第十五章

点赞+收藏+关注

在这里插入图片描述

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐