1.1文章介绍介绍

本文主要介绍大数据相关的技术和项目

1.2

目录

1.1文章介绍介绍

1.2

项目介绍

1.3 项目指标

1.3.1离线指标

1.3.2实时指标

1.3.3最难的两个指标

1.4项目遇到问题

1.4.1 Sqoop

1.4.2Flume

1.4.3Kafka

1.4.4Hadoop

1.5 项目相关流程问题

1. 如何保证你写的 sql 正确性?

2. 测试数据哪来的?

3. 测试环境什么样?

4. 测试之后如何上线?

5. 你做的项目工作流程是什么?

6. 项目实际工作流程?

7.公司项目版本迭代多久一次多久一次 ,迭代到哪个版本?

8.项目开发中每天做什么事?

9.DWD层做了哪些事?

DWD层做了哪些事?

项目的收获?

第2章 涉及技术

2.1 Linux&Shell

2.1.1 Linux常用高级命令

2.1.2 Linux常用工具及写过脚本

2.1.3 Shell中单引号和双引号区别

2.2 Hadoop

2.2.1 Hadoop基本概念

2.2.2 Hadoop常用端口号

2.2.3 Hadoop 配置文件以及简单的Hadoop 集群搭建

2.2.4 HDFS 读流程和写流程

2.2.5 NameNode工作机制SecongdaryNameNode工作流程恢复流程

2.2.6 HDFS组成架构及主要作用

2.2.7 Hadoop集群启动时要启动哪些进程?作用?

2.2.8 MapReduce工作原理

2.2.9 MapReduce中shuffer机制

2.2.10 MapReduce中切片机制

2.2.11 Yarn工作机制

2.2.12 hadoop的调Yarn调度器,你们使⽤的是哪种策略,为什么?

2.2.13Hadoop、MapReduce数据倾斜怎么处理?

2.2.14HDFS小文件处理

2.2.15Hadoop宕机

2.2.16Hadoop项目经验之基准测试

2.2.16 HDFS在上传文件时,其中一个 DataNode 突然挂掉怎么办

2.2.18 项目经验之压缩

2.2.19 Hadoop优化

2.3 ZooKeeper

2.3.1 ZooKeeper概述

2.3.2 ZooKeeper选举机制

2.3.3 ZooKeeper常用命令

2.3.4 ZooKeeper监听器原理

2.3.5 ZooKeeper 部署方式有哪几种?集群中角色有哪些?最少要几台机器?

2.4 Flume

2.4.1 Flume概述

2.4.2 Flume组成,Put事务,Take事务

2.4.3 Flume拦截器

2.4.4 Flume 监控器

2.4.5 Flume 采集数据会丢失吗?

2.4.6 Flume 参数调优

2.4.7 Flume 优化

2.5 Kafka

2.5.1 Kafka概述

2.5.2 为什么使用kafka

2.5.3 kafka消息队列与传统消息队列区别

2.5.4 kafka主备模式

2.5.5 kafka高吞吐量

2.5.6 kafka架构

2.5.7 kafka机器数量、副本设定、日志保存时间、硬盘大小

2.5.8 kafka中数据量计算

2.5.9 kafka的ISR副本同步队列

2.5.10 kafka挂掉

2.5.11 kafka数据重复

2.5.12 kafka消息数据积压, Kafka消费能力不足怎么处理?

2.5.13 Kafka 消费过的消息如何再消费?

2. 5.14 kafka 的数据是放在磁盘上还是内存上,为什么速度会快?

2.5.15 为什么 Kafka 不支持读写分离?

2.65 MySQL

2.65.1 DDL,DML,DQL,DCL

2.65.1 索引

2.65.2 存储结构

2.65.3 b-tree 和b+tree 的区别

2.65.3 MySQL 的事务要素ACID以及并发问题,脏读幻读和隔离级别等

2.6 Hive

2.6.1 Hive概述

2.6.2 Hive和数据库的比较

2.6.3 Hive内部表和外部表区别

2.6.4 4个by区别

2.6.5 系统函数

2.6.6 窗口函数

2.6.7 自定义UDF、UDTF函数

2.6.8 Union与Union all区别

2.6.9 Hive有哪些计算引擎,区别?

2.6.10 Hive索引吗?

2.6.11 运维如何对hive进行调度

2.6.12 使用hive解析过JSON串吗?

2.6.13 sort by 和 order by,group by, distribute by区别?

2.6.14 hive分区和分桶

2.6.15 hive数据倾斜处理?

2.6.16 hive中表有几类?

2.6.17 hive有哪些数据类型及类型转换?

2.6.18 hive 中drop、truncate和delete区别

2.6.19 hive表的连接方式

2.6.20 hive中count(*)count(1)和count(字段区别)

2.6.21 hive中like_和like%区别

2.6.22 hive性能优化

2.6.23 sql性能优化以及in和exist区别?

子查询语句可以通过in关键字实现,一个查询语句的条件落在另一个select语句的查询结果中。程序先运行在嵌套在最内层的语句,再运行外层的语句。但缺点是mysql执行子查询时,需要创建临时表,查询完毕后,需要再删除这些临时表,有一些额外的性能消耗。

2.7 Hbase

2.7.1 Hbase概述

2.7.2 Hbase与hadoop的关系

2.7.3 Hbase读流程

2.7.4 Hbase写流程

2.7.5 Hbase数据flush过程

2.7.6 Hbase中rowkey原则

2.7.7. 热点现象(数据倾斜)怎么产生的,以及解决方法有哪些

2.7.8. HBase 中 compact 用途是什么,什么时候触发,分为哪两种?

2.8 Spark

2.8.1 Spark概述与运行流程

2.8.2 Spark有哪几种部署方式?请分别简要论述

2.8.3 Spark提交任务参数

2.8.3 如何理解Spark 中的血统概念(RDD)

2.8.4 简述 Spark的宽窄依赖,以及 的宽窄依赖,以及 Spark如何划分 stage,每个 ,每个 stage又根据什么决定 又根据什么决定 task个数 ?

2.8.5 请列举 Spark的 transformation算子

2.8.6 请列举 Spark的active算子

2.8.7 请列举 Spark的引起shuffer的算子

2.8.8 spark中shuffer机制

2.8.9 spark和mapReduce中shuffer区别

2.8.10 简述 Spark中共享变量(广播和累加器)的基本原理与用途

2.8.11 如何使用Spark实现TopN的获取(描述思路或使用伪代码)

2.8.11 spark如何保证宕机迅速恢复?

2.8.12 hadoop 和 spark 的相同点和不同点?

2.8.12 RDD 持久化原理?

2.8.13 checkpoint 检查点机制?

2.8.14 RDD 机制理解吗?

2.8.15 SparkStreaming以及基本工作原理

2.8.16 DStream以及基本工作原理

2.8.17 spark 有哪些组件?

2.8.18 Spark 主备切换机制原理知道吗?

2.8.19 spark 解决了 hadoop 的哪些问题?

2.8.20 数据倾斜的产生和解决办法?

2.8.21 RDD 中 reduceBykey 与 groupByKey 哪个性能好,为什么

2.8.22 Spark Streaming 优雅关闭

2.8.23 SparkStreaming有哪几种方式消费 Kafka中的数据,区别?

2.8.23 介绍一下 cogroup rdd 实现原理,你在什么场景下用过这个 rdd?

2.8.24 Spark 中的 OOM 问题?

2.8.25 Spark SQL 是如何将数据写到 Hive 表的?

2.8.26 通常来说,Spark 与 MapReduce 相比,Spark 运行效率更高。请 说明效率更高来源于 Spark 内置的哪些机制?

2.8.27 Spark Master HA 主从切换过程不会影响到集群已有作业的运行, 为什么?

2.9.1 Flink概述

2.9.2 Flink 集群有哪些角色?各自有什么作用

2.9.3 介绍一下Flink的容错机制

2.9.4 Flink 相比 Spark Streaming 有什么区别

2.9.5 Flink 常用的算子有哪些

2.9.6 如何处理生产环境中的数据倾斜问题

2.9.7 Flink 中的 Time 有哪几种

2.9.8 Flink 中 window 出现数据倾斜怎么解决

2.9.9 Flink有没重启策略?说说有哪几种?

2.10 Sqoop

2.10.1 Sqoop概述

2.10.2 Sqoop常用命令

2.10.3 Sqoop导入导出Null存储一致性问题

2.10.4 Sqoop底层运行的任务是什么 底层运行的任务是什么

2.10.5 Sqoop一天导 一天导 入多少数据

2.10.6 Sqoop数据导出的时候一次执行多长间?

2.10.7 Sqoop在导入数据的时候倾斜在导入数据的时候倾斜

2.10.8 Sqoop数据导出Parquet(项目中遇到的问题)

第3章 数据仓库

3.1 概念

3.1.1 数据集市与数据仓库的概念

3.1.2 数仓分层和如何设计数据仓库?

3.1.3 关系型数据库范式理论

3.1.4 数据模型

3.1.5 SKU&SPU

3.2 数仓建模

3.2.1 关系建模与维度建模

3.2.2 维度表和事实表

3.2.3 拉链表、全量表、增量表、流水表

3.2.4 数仓建模流程

3.2.5 数仓环境之hive引擎

3.2.6 即席查询数据仓库

3.2.7 数据仓库每天跑多少张表,大概什么时候运行,运行多久?

3.2.8 活动的话,数据量会增加多少? 怎么解决?

3.2.9 数仓中使用的哪种文件存储格式 数仓中使用的哪种文件存储格式?

3.2.10 你感觉数仓建设中最重要的是什么

第3章 代码

3.1 电商指标

3.1.1 指标体系

3.1.2 数据分析

3.2 手写代码

3.2.1 手写 Spark-WordCount

3.2.2 冒泡排序

3.2.3 sql语句连续7天登录的用户信息

3.2.4 用一条 SQL 语句显示所有可能的比赛组合

3.2.5 用Scala编写Spark程序实现TopN

3.2.6 找出所有科目成绩都大于某一学科平均成绩的学生

3.2.7 用一条 SQL 语句查询出每门课都大于 80 分的学生姓名


项目介绍

这是一个基于分布式集群而生的大数据项目。

1.3 项目指标

1.3.1离线指标

日活、月活、周活、留存、留存率、新增(日、周、年)、转化率、流失、回流、七天内连续3天登录(点赞、收藏、评价、购买、加购、下单、活动)、连续3周(月)登录、GMV、复购率、复购率排行、点赞、评论、收藏、领优惠价人数、使用优惠价、沉默、值不值得买、退款人数、退款率topn 热门商品

日活/周活/月活统计

(每日的根据 key 聚合,求 key 的总数)

我们先根据用户ID和访问日期去重,再统计每天的访问用户数。

SELECT COUNT(DISTINCT user_id) AS user_cnt,DATE(create_ts) AS view_day FROM user_trace GROUP BY DATE(create_ts);

2. 用户新增

每日新增(每日活跃设备 left join 每日新增表,如果 join 后,每日新

增表的设备 id 为空,就是新增) 方法1:

select r.date1,ifull(r2.count_num,0) from
record r left join(
select r1.date1,count(*) count_num
from record r1 where r1.date1=(select min(date1) from record group by id)) r2 on r.date1=r2.date1
group by r1.date1
order by r1.date1

方法2:

select date1,sum(case when date_num=1 then 1 else 0 end) from (select date1,row_number()over(partition by user_id order by date) date_num 
from login)
group by date1
order by date1

3. 用户留存率

(一周留存)10 日新增设备明细 join 11日活跃设备明细表,就是 10日留存的。注意每日留存,一周留存 。所谓留存,就是指某日创建的账号在后续自然日登录的比例。

比如3月1日新增账号创建数为100,在3月2日这部分用户登录数为51,那么3月1日新增用户的次日留存率为51/100=51%。

1.考虑到用户每天登录的次数不一定只有一次,为了方面后续的数据处理,可以先对登录数据按照日期和用户id进行去重DISTINCT处理;

2.为了计算某条登录日志是该用户创建账号后的第几天登录,我们可以用用户登录日志和账号创建日志进行inner join(这里考虑到不在统计周期内的创建账号的用户数据也会记录在用户登录日志里,所以去掉);

3.然后用登录日期字段和创建账户字段进行差值DATEDIFF获取第几天登录;

4.对于第0天登录的数据则可以理解为新增用户数,第N(≥1)天登录的数据则为这批新增用户后续有登录的用户数;

5.用第N天登录的数据 / 新增用户数 就是对应第N天留存率。

SELECT
  create_date
, 新增用户数
, concat(CAST(ROUND((100 * 次日留存) / 新增用户数,2) AS char), '%') 次日留存率
, concat(CAST(ROUND((100 * 3日留存) / 新增用户数,2) AS char), '%') 3日留存率
, concat(CAST(ROUND((100 * 7日留存) / 新增用户数,2) AS char), '%') 7日留存率
FROM
  (
   SELECT
     create_date
   , count((CASE WHEN (day_diff = 0) THEN role_id END)) 新增用户数
   , count((CASE WHEN (day_diff = 1) THEN role_id END)) 次日留存
   , count((CASE WHEN (day_diff = 2) THEN role_id END)) 3日留存
   , count((CASE WHEN (day_diff = 7) THEN role_id END)) 7日留存
   FROM
     (
      SELECT
        login_log.role_id
      , create_date
      , DATEDIFF(login_date, create_date) day_diff
      FROM
        ((
         SELECT DISTINCT
           STR_TO_DATE($part_date, '%Y-%m-%d') login_date
         , role_id
         FROM
           role_login
      )  login_log
      INNER JOIN (
         SELECT DISTINCT
           STR_TO_DATE($part_date, '%Y-%m-%d') create_date
         , role_id
         FROM
           role_create
      )  create_log ON (login_log.role_id = create_log.role_id))
   )  temp_1
   GROUP BY create_date
)  temp_2
ORDER BY create_date ASC

4. 流量指标分析

观测每日、每小时的访问量(PV)、访问人数(UV)、平均访问量(PV/UV),针对性的开展不同的营销活动,吸引更多的优质流量。

行为转化分析:统计用户不同行为间的转化情况,关注转化率低的环节,优化交易流程,提高转化率

产品贡献定量分析:根据产品贡献程度,调整产品结构,策划营销主题,助力爆款产品

用户价值分析:对用户进行价值分层,针对不同层级的用户施行不同的营销策略

通过对用户价值的细分,进行差异化的惊喜运营,从而提升运营效率和用户体验。RFM模型是衡量客户价值和客户创利能力的重要工具。

一般挽留客户占比34.36%,占比最高。这类用户交易时间间隔长,交易频率低,交易金额小,存在流失风险。可以及时与用户取得联系,明确流失原因或了解用户需要,想办法挽回用户。

一般发展用户占比31.29%,占比排名第二。这类用户交易时间间隔短,但交易频率和交易金额低。可以利用推荐系统推荐其平时浏览的同类商品,或者是与此类用户有相同购买属性人群购买的商品,发送优惠券等,避免用户流失。

重要价值客户占比23.86%。这类用户交易时间间隔短,交易频率高,交易金额大,是高质量用户。应加强交流互动,深入了解用户需求,提供个性化服务,增加用户粘性

R(Recency):最后一次消费时间间隔。R值越小,用户价值越高。

F(Frequency):消费频率。F值越大,用户价值越高。

M(Monetary):消费金额。M值越大,用户价值越高。

浏览量(pv):用户每打开一个页面算作一个浏览,用户对同一个页面多次点击pv累计多次

访客数(uv):同一个用户多次访问只计算一个uv

人均浏览量(pv/uv):每个用户平均浏览次数

1.流量指标分析

每日PV、UV、人均浏览量、成交量、销售额

select日期,sum(behavior_type="pv") as 浏览量,    
count(distinct user_id) as 访客数,
    sum(behavior_type="pv")/count(distinct user_id) as 人均浏览量,
    sum(behavior_type="buy") as 成交量,
    sum(if(behavior_type="buy",amount,0)) as 销售额from UserBehavior_newgroup by 日期;

注意:不能用count。count函数是对非空记录进行计数,不区分返回结果。count(behavior_type="pv")与count(behavior_type)是一样的效果,得出的是每日所有行为的统计数。

按日期分组,按用户行为类别计数(即按条件计数)。如果把条件放在where语句中,是先执行where语句后执行select求和,没法动态筛选。考虑把条件放在select语句中作为逻辑判断条件,返回的是1和0的结果,再对返回的结果分组进行聚合运算。

2.行为转化分析

统计每个行为类别的人数

select 
  behavior_type,
  count(distinct user_id) as 用户数from UserBehavior_new group by behavior_type;

转化率=当前行为人数÷上一行为人数

行与行之间没法相除,用开窗函数当前行的上一行数据,即获取上一行为人数。lag(需要返回的字段,1)over(order by ...) 字符串是没法直接排序的,可以对字符进行重编码,用if函数赋值,对数值进行排序。

select 
    behavior_type,
    count(distinct user_id) as 用户人数,
    lag(count(distinct user_id),1) over (order by if(behavior_type="pv",0,if(behavior_type="fav",1,if(behavior_type="cart",2,3)))) as 上一行为人数,
    count(distinct user_id)/lag(count(distinct user_id),1) over (order by if(behavior_type="pv",0,if(behavior_type="fav",1,if(behavior_type="cart",2,3))))  as 转化率from UserBehavior_newgroup by behavior_type;

浏览—加购—购买的转化率

select 7 
    behavior_type,
    count(distinct user_id) as 用户人数,
    lag(count(distinct user_id),1) over (order by if(behavior_type="pv",0,if(behavior_type="cart",1,2))) as 上一行为人数,
    ifnull(count(distinct user_id)/lag(count(distinct user_id),1) over (order by if(behavior_type="pv",0,if(behavior_type="cart",1,2))),1) as 转化率from UserBehavior_newwhere behavior_type in ("pv","cart","buy")group by behavior_type;

3.产品贡献定量分析(帕累托分析)

产生购买行为的商品类目

select
    item_category,
    sum(amount) as 销售额from UserBehavior_newwhere behavior_type="buy"group by item_category;

累计销售额百分比=当前类目商品的累计销售额/所有类目商品的总销售额

select  item_category,
    sum(amount) as 销售额,
    sum(sum(amount)) over (order by sum(amount) desc) as 累计销售额,
    sum(sum(amount)) over (order by sum(amount) desc)/(select sum(amount) from UserBehavior_new where behavior_type="buy")  as 累计销售额百分比from UserBehavior_newwhere behavior_type="buy"group by item_category;

筛选贡献80%的类目

select *from(select    
item_category,
    sum(amount) as 销售额,
    sum(sum(amount)) over (order by sum(amount) desc) as 累计销售额,
    sum(sum(amount)) over (order by sum(amount) desc)/(select sum(amount) from UserBehavior_new where behavior_type="buy")  as 累计销售额百分比
		from UserBehavior_new
		where behavior_type="buy"
		group by item_category) as twhere 累计销售额百分比<=0.8;

4.用户价值分析

每个用户消费时间间隔、消费频次、消费金额/* 消费时间间隔R:距离分析时间节点的最后消费时间。 timestampdiff() 消费频次F:对这段时间内的消费次数计数 消费金额M:求和消费金额

select	user_id,
    max(日期) as 最近一次消费日期,
    timestampdiff(day,max(日期),"2014-12-19") as 间隔天数,
    count(*) as 消费频次,
    sum(amount) as 消费金额from UserBehavior_newwhere behavior_type="buy"group by user_id;

RFM评分/* 实际业务中会根据用户数量分布,制定评分标准。

select
	user_id,
    timestampdiff(day,max(日期),"2014-12-19") as 间隔天数,
    count(*) as 消费频次,
    sum(amount) as 消费金额,
    case when  timestampdiff(day,max(日期),"2014-12-19")<=6 then 5
		when  timestampdiff(day,max(日期),"2014-12-19")<=12 then 4
        when  timestampdiff(day,max(日期),"2014-12-19")<=18 then 3
        when  timestampdiff(day,max(日期),"2014-12-19")<=24 then 2
        else 1
	end as R评分,
    if(count(*)=1,1,if(count(*)=2,2,if(count(*)=3,3,if(count(*)=4,4,5)))) as F评分,
    if(sum(amount)<100,1,if(sum(amount)<200,2,if(sum(amount)<300,3,if(sum(amount)<400,4,5)))) as M评分from UserBehavior_newwhere behavior_type="buy"group by user_id;

RFM均值(根据评分计算均值)

select 
  avg(R评分) as R均值, 
   avg(F评分) as F均值,
   avg(M评分) as M均值from(
	select user_id,
	case when  timestampdiff(day,max(日期),"2014-12-19")<=6 then 5
		when  timestampdiff(day,max(日期),"2014-12-19")<=12 then 4
		when  timestampdiff(day,max(日期),"2014-12-19")<=18 then 3
		when  timestampdiff(day,max(日期),"2014-12-19")<=24 then 2
			else 1
		end as R评分,
		if(count(*)=1,1,if(count(*)=2,2,if(count(*)=3,3,if(count(*)=4,4,5)))) as F评分,
		if(sum(amount)<100,1,if(sum(amount)<200,2,if(sum(amount)<300,3,if(sum(amount)<400,4,5)))) as M评分
	from UserBehavior_new
	where behavior_type="buy"
	group by user_id ) as t;

结果:3.5984, 2.1039, 2.2051-- RFM重要程度

select *,
    if(R评分>3.5984,"高","低") as R程度,
    if(F评分>2.1039,"高","低") as F程度,
    if(M评分>2.2051,"高","低") as M程度from(select
	user_id,
    timestampdiff(day,max(日期),"2014-12-19") as 间隔天数,
    count(*) as 消费频次,
    sum(amount) as 消费金额,
    case when  timestampdiff(day,max(日期),"2014-12-19")<=6 then 5
		when  timestampdiff(day,max(日期),"2014-12-19")<=12 then 4
        when  timestampdiff(day,max(日期),"2014-12-19")<=18 then 3
        when  timestampdiff(day,max(日期),"2014-12-19")<=24 then 2
        else 1
	end as R评分,
    if(count(*)=1,1,if(count(*)=2,2,if(count(*)=3,3,if(count(*)=4,4,5)))) as F评分,
    if(sum(amount)<100,1,if(sum(amount)<200,2,if(sum(amount)<300,3,if(sum(amount)<400,4,5)))) as M评分from UserBehavior_newwhere behavior_type="buy"group by user_id) as t;

RFM用户价值

select * ,
    case 
      when R程度='高' and F程度='高' and M程度='高' then '重要价值用户'
      when R程度='高' and F程度='低' and M程度='高' then '重要发展用户'
      when R程度='低' and F程度='高' and M程度='高' then '重要保持用户'
      when R程度='低' and F程度='低' and M程度='高' then '重要挽留用户'
      when R程度='高' and F程度='高' and M程度='低' then '一般价值用户'
      when R程度='高' and F程度='低' and M程度='低' then '一般发展用户'
      when R程度='低' and F程度='高' and M程度='低' then '一般保持用户'
      else '一般挽留用户'
      end as 用户价值分类 from(select *,
    if(R评分>3.5984,"高","低") as R程度,
    if(F评分>2.1039,"高","低") as F程度,
    if(M评分>2.2051,"高","低") as M程度from(select
	user_id,
    timestampdiff(day,max(日期),"2014-12-19") as 间隔天数,
    count(*) as 消费频次,
    sum(amount) as 消费金额,
    case when  timestampdiff(day,max(日期),"2014-12-19")<=6 then 5
	when  timestampdiff(day,max(日期),"2014-12-19")<=12 then 4
        when  timestampdiff(day,max(日期),"2014-12-19")<=18 then 3
        when  timestampdiff(day,max(日期),"2014-12-19")<=24 then 2
        else 1
	end as R评分,
    if(count(*)=1,1,if(count(*)=2,2,if(count(*)=3,3,if(count(*)=4,4,5)))) as F评分,
    if(sum(amount)<100,1,if(sum(amount)<200,2,if(sum(amount)<300,3,if(sum(amount)<400,4,5)))) as M评分from UserBehavior_newwhere behavior_type="buy"group by user_id) as t1) as t2;

1.3.2实时指标

1. 每日日活实时统计

2. 每日订单量实时统计

3. 一小时内日活实时统计

4. 一小时内订单数实时统计

5. 一小时内交易额实时统计

6. 一小时内广告点击实时统计

7. 一小时内区域订单数统计

8. 一小时内区域订单额统计

9. 一小时内各品类销售 top3 商品统计

1.3.3最难的两个指标

1. 活跃用户指标

我们经常会算活跃用户,活跃用户是指至少连续 5 天登录账户的用户,返回的

结果表按照 id 排序。

思路:

1. 去重:由于每个人可能一天可能不止登陆一次,需要去重

2. 排序:对每个 ID 的登录日期排序

3. 差值:计算登录日期与排序之间的差值,找到连续登陆的记录

4. 连续登录天数计算:select id, count(*) group by id, 差值(伪代码)

5. 取出登录 5 天以上的记录

6. 通过表合并,取出 id 对应用户名

SELECT DISTINCT b.id, name 
FROM
(SELECT id, login_date, 
DATE_SUB(login_date,ROW_NUMBER() OVER(PARTITION BY id ORDER BY login_date)) AS 
diff
FROM(SELECT DISTINCT id, login_date FROM Logins) a) b 
INNER JOIN Accounts ac 
ON b.id = ac.id 
GROUP BY b.id, diff 
HAVING COUNT(b.id) >= 5

注意点:

1. DATE_SUB 的应用:DATE_SUB (DATE,X),注意,X 为正数表示当前日期的前 X 天;

2. 如何找连续日期:通过排序与登录日期之间的差值,因为排序连续,因此若登录日期连续,则差值一致;

3. GROUP BY 和 HAVING 的应用:通过 id 和差值的 GROUP BY,用 COUNT 找到连续天数大于 5 天的 id,注意 COUNT 不是一定要出现在 SELECT 后,可以直接用在 HAVING 中。

1.4项目遇到问题

1.4.1 Sqoop

1.Sqoop 中导入导出 Null 存储一致性问题

原因:Hive 中的 Null 在底层是以“\N”来存储,而 MySQL 中的 Null 在底层就是Null,为了保证数据两端的一致性。

解决:在导出数据时采用--input-null-string 和 input-null-non-string 两个参数。导入数据时采用--null-string 和 --null-non-string。

2.Sqoop 数据导出一致性问题

当 Sqoop 导出数据到 MySql 时,使用 4 个 map 怎么保证数据的一致性

原因:因为在导出数据的过程中 map 任务可能会失败,可以使用—staging-table

– clear-staging

解决:任务执行成功首先在 tmp 临时表中,然后将 tmp 表中的数据复制到目标表中(这个时候可以使用事务,保证事务的一致性)

3.Sqoop 数据导出 Parquet

Ads 层数据用 Sqoop 往 MySql 中导入数据的时候,如果用了 orc(Parquet)不能导入,需转化成 text 格式

1.4.2Flume

1.Flume上传到HDFS出现大量小文件

原因:

1.元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命。

2.计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

解决:

(1)采用har 归档方式,将小文件归档

(2)采用CombineTextInputFormat

(3)有小文件场景开启JVM 重用;如果没有小文件,不要开启JVM 重用,因为会一直占用使用到的task 卡槽,直到任务完成才释放。

JVM 重用可以使得JVM 实例在同一个job 中重新使用N 次,N 的值可以在Hadoop 的mapred-site.xml 文件中进行配置。通常在10-20 之间

2.Flume挂掉

flume ng 1.7版本后提供Taildir Source 可以读取多个文件最新追加写入的内容!

Taildir Source是可靠的,即使flume出现了故障或挂掉。Taildir Source在工作时,会将读取文件的最后的位置记录在一个json文件中,一旦agent重启,会从之前已经记录的位置,继续执行tail操作!Json文件中,位置是可以修改,修改后,Taildir Source会从修改的位置进行tail操作!如果JSON文件丢失了,此时会重新从每个文件的第一行,重新读取,这会造成数据的重复!

3.Flume优化

1、调整Flume进程的内存大小,建议设置1G~2G,太小的话会导致频繁GC因为Flume进程也是基于Java的,所以就涉及到进程的内存设置,一般建议启动的单个Flume进程(或者说单个Agent)内存设置为1G~2G,内存太小的话会频繁GC,影响Agent的执`行效率。

2、在一台服务器启动多个agent的时候,建议修改配置区分日志文件

1.4.3Kafka

1.kafka挂掉

1) Flume 记录

2) 日志有记录

3) 短期没事

2.Kafka 消息数据积压,Kafka 消费能力不足怎么处理?

1) 如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)

2) 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

3.Kafka 数据重复

幂等性+ack-1+事务

Kafka数据重复,可以再下一级:SparkStreaming、redis或者hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;

4.Kafka丢不丢数据

Ack=0,相当于异步发送,消息发送完毕即offset增加,继续生产。

Ack=1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。

Ack=-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。

5.Kafka单条日志传输大小

Kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中,常常会出现一条消息大于1M,如果不对Kafka进行配置。则会出现生产者无法将消息推送到Kafka或消费者无法去消费Kafka里面的数据,这时我们就要对Kafka进行以下配置:server.properties

replica.fetch.max.bytes: 1048576 broker可复制的消息的最大字节数, 默认为1M

message.max.bytes: 1000012 kafka 会接收单个消息size的最大限制, 默认为1M左右

1.4.4Hadoop

1.项目经验之基准测试

搭建完Hadoop集群后需要对HDFS读写性能和MR计算能力测试。测试jar包在hadoop的share文件夹下。

2.Hadoop宕机

1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)

2)如果写入文件过快造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。例如,可以调整Flume每批次拉取数据量的大小参数batchsize。。

3.Hadoop解决数据倾斜方法

1)提前在map进行combine,减少传输的数据量

在Mapper加上combiner相当于提前进行reduce,即把一个Mapper中的相同key进行了聚合,减少shuffle过程中传输的数据量,以及Reducer端的计算量。

如果导致数据倾斜的key大量分布在不同的mapper的时候,这种方法就不是很有效了。

  1. 导致数据倾斜的key 大量分布在不同的mapper

(1)局部聚合加全局聚合。

第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个Reducer中进行局部聚合,数量就会大大降低。

第二次mapreduce,去掉key的随机前缀,进行全局聚合。

思想:二次mr,第一次将key随机散列到不同reducer进行处理达到负载均衡目的。第二次再根据去掉key的随机前缀,按原key进行reduce处理。

这个方法进行两次mapreduce,性能稍差。

(2)增加Reducer,提升并行度JobConf.setNumReduceTasks(int)

(3)实现自定义分区

根据数据分布情况,自定义散列函数,将key均匀分配到不同Reducer

4.集群资源分配参数不均

集群有30台机器,跑mr任务的时候发现5个map任务全都分配到了同一台机器上,这个可能是由于什么原因导致的吗?

解决方案:yarn.scheduler.fair.assignmultiple 这个参数默认是开的,需要关掉。

1.5 项目相关流程问题

1. 如何保证你写的 sql 正确性?

我一般是造一些特定的测试数据进行测试。

另外离线数据和实时数据分析的结果比较。

2. 测试数据哪来的?

一部分自己写 Java 程序自己造,一部分从生产环境上取一部分。

3. 测试环境什么样?

测试环境的配置是生产的一半

4. 测试之后如何上线?

上线的时候,将脚本打包,提交 git。先发邮件抄送经理和总监,运维。通过之

后跟运维一 起上线。

5. 你做的项目工作流程是什么?

1. 先与产品讨论,看报表的各个数据从哪些埋点中取

2. 将业务逻辑过程设计好,与产品确定后开始开发

3. 开发出报表 SQL 脚本,并且跑几天的历史数据,观察结果

4. 将报表放入调度任务中,第二天给产品看结果。

5. 周期性将表结果导出或是导入后台数据库,生成可视化报表

6. 项目实际工作流程?

1步:确定指标的业务口径

由产品经理主导,找到提出该指标的运营负责人沟通。首先要问清楚指标是怎么定义的,比如活跃用户是指启动过APP的用户。设备id 还是用户id。

2步:需求评审

由产品经理主导设计原型,对于活跃主题,我们最终要展示的是最近n天的活跃用户数变化趋势,效果如下图所示。此处大数据开发工程师、后端开发工程师、前端开发工程师一同参与,一起说明整个功能的价值和详细的操作流程,确保大家理解的一致。

3步:大数据开发

大数据开发工程师,通过数据同步的工具如Flume、Sqoop等将数据同步到ODS层,然后就是一层一层的通过SQL计算到DWD、DWS层,最后形成可为应用直接服务的数据填充到ADS

4步:后端开发

后端工程师负责,为大数据工程师提供业务数据接口;

同时还负责读取ADS层分析后,写入MySQL中的数据。

5步:前端开发

前端工程师负责,前端埋点。

对分析后的结果数据进行可视化展示。

6步:联调

此时数据开发工程师、前端开发工程师、后端开发工程师都要参与进来。此时会要求大数据开发工程师基于历史的数据执行计算任务,大数据开发工程师承担数据准确性的校验。前后端解决用户操作的相关BUG保证不出现低级的问题完成自测。

7步:测试

测试工程师对整个大数据系统进行测试。测试的手段包括,边界值、等价类等。

提交测试异常的软件有:禅道、bugzila(测试人员记录测试问题1.0,输入是什么,结果是什么,跟预期不一样->需要开发人员解释,是一个bug,下一个版本解决1.1->测试人员再测试。测试1.1ok->测试经理关闭bug)

8步:上线

运维工程师会配合我们的前后端开发工程师更新最新的版本到服务器。此时产品经理要找到该指标的负责人长期跟进指标的准确性。重要的指标还要每过一个周期内部再次验证,从而保证数据的准确性。

7.公司项目版本迭代多久一次多久一次 ,迭代到哪个版本?

瀑布式开发、敏捷开发

差不多一个月会迭代一次。每月都有节日(元旦、春节、情人节、3.8妇女节、端午节、618、国庆、中秋、1111/6.1/5.1、生日、周末)新产品、新区域就产品或我们提出优化需求,然后评估时间。每周我们都会开会做下周计划和本周总结。

8.项目开发中每天做什么事?

1)新需求(活动、优化、新产品、新市场)。

2)故障分析:数仓的任何步骤出现问题,需要查看问题,比如日活,月活下降或快速上升等。

3)新技术的预言(比如flink、数仓建模、数据质量、元数据管理)

4)晨会-》10做操-》讨论中午吃什么-》12点出去吃1点-》睡到2点-》3点茶歇水果-》晚上吃啥-》吃加班餐-》开会-》晚上6点吃饭-》7点开始干活-10点-》11点

9.DWD层做了哪些事?

1 数据清洗

(1)空值去除

(2)过滤核心字段无意义的数据,比如订单表中订单id为null,支付表中支付id为空

(3)将用户行为宽表和业务表进行数据一致性处理

2 清洗的手段

Sql、mr、rdd、kettle、Python(项目中采用sql进行清除)

3 清洗掉多少数据算合理

1万条数据清洗掉1条。

4 脱敏

对手机号、身份证号等敏感数据脱敏

5 压缩LZO

6 列式存储parquet

DWD层做了哪些事?

1 DWS层有3-5张宽表(处理100-200个指标70%以上的需求)

具体宽表名称:用户行为宽表,用户购买商品明细行为宽表,商品宽表,购物车宽表,物流宽表、登录注册、售后等。

2 哪个宽表最宽?大概有多少个字段?

最宽的是用户行为宽表。大概有60-100个

项目的收获?

1) 值得保持的优点

  • 团队氛围融洽、交流通畅。
  • 团队构成比较合理。年轻人技术强力,老人能够把控项目方向。
  • 遇到问题及时沟通,群策群力解决问题。
  • 有吃苦耐劳的精神,每个人都抱有很高的责任心。能顶住持续高强的压力。
  • 公司大环境给予的支持力度大,从技术、工程、到后勤保障都值得称赞。
  1. 仍需要改进的地方

需求管理:

客户提出来的需求比较零散,需要整理入册,安排优先级。应对其状态进行追踪。保持与客户的互动。

单点作战:

每个人担当的任务没有其他人可以分担,一旦出现问题,项目将受到较大影响。

质量管控:

由于持续高压,导致程序质量多少存在一些问题。

  1. 项目优化策略

资源隔离:

将耗费性能的处理隔离出来,单独占用资源,以防止资源被其他处理抢占。

热点数据:

将经常访问的数据形成热点数据区,以加快查询速度。(HBase Bucket Cache)

数据分流:

增加后置资源利用率,将一部分企业放置到后置集群进行处理。

采用SSD:

实践证明SSD盘可以非常明显的提高读写速度。

批量写入:

数据写入数据库的处理,尽量使用批量写入的方式。

展示分离:

展示页面所使用的数据与永久持久化的数据可以分离开,以提高展示的性能

1.6 项目组织架构

1框架版本选型

1) Apache:运维麻烦,组件间兼容性需要自己调研。(一般大厂使用,技术实力雄厚,有专业

的运维人员)

2) CDH:国内使用最多的版本,但 CM 不开源,但其实对中、小公司使用来说没有影响(建议使

用)

3) HDP:开源,可以进行二次开发,但是没有 CDH 稳定,国内使用较少

2服务器选型

服务器使用物理机还是云主机?

1) 机器成本考虑:

a. 物理机:以 128G 内存,20 核物理 CPU,40 线程,8THDD 和 2TSSD 硬盘,单台报价 4W 出

头,需考虑托管服务器费用。一般物理机寿命 5 年左右

b. 云主机,以阿里云为例,差不多相同配置,每年 5W

2) 运维成本考虑:

a. 物理机:需要有专业的运维人员

b. 云主机:很多运维工作都由阿里云已经完成,运维相对较轻松

3.集群规模

3.1用户行为数据

  • 每天日活跃用户100万,每人一天平均100条:10万*100条=1000万条;
  • 每条日志1k左右,每天1亿条:100000000/1024/1024=100G;
  • 数仓ODS层采用LZO存储:100G压缩为10G左右;
  • 数仓DWD采用LZO加parquest存储,大约10G左右;
  • 数仓DWS层轻度聚合存储(目的是为了快速运算,不压缩),50G左右;
  • 数仓ADS层数据量很小,忽略不计;
  • 保存副本:70G*3=210G
  • 半年内不扩容服务器来算:210G*180天=约37T
  • 预留30%Buf=37T/0.7=857G,也就大约1T

3.2 Kafka中的数据

  • 每天约100G*2(副本)= 200G
  • 保存三天200G*3=600G
  • 预留20%-30%Buf=54T/0.7=77T

3.3 flume数据

flume缓存的数据比较少,可以忽略不计;

3.4 业务数据

  • 每天活跃用户100万,每天下单的用户10万,每人每天产生10条业务数据,每天1K,算下来就是10万*10条*1K=1G
  • 数仓四层存储,1G*3=3G
  • 保存3个副本:3*3G=9G
  • 半年内不扩容服务器来算:9G*180天=1.6T
  • 预留20%-30%Buf=1.6T/0.7=2T

3.5 集群总的数据

总数据:53T+1T+2T=56T,因此约8T*10台服务器

(1)每天日活跃用户100万,每人一天平均100条:10万*100条=1000万条
(2)每条日志1k左右,每天1亿条:100000000/1024/1024=100G
(3)半年内不扩容服务器来算:100G*180天=约18T
(4)保存副本:18T*3=54T
(5)预留20%-30%Buf=54T/0.7=77T
(6)因此约8T*10台服务器

4.人员配置参考

属于研发部,我们属于大数据组,其他还有后端项目组,前端组、测试组、UI组等。其他的还有产品部、运营部、人事部、财务部、行政部等。

大数据开发工程师=>大数据组组长=》项目经理=>部门经理=》技术总监CTO 中型公司(5~10人左右):组长1人,离线4人左右(离线处理、数仓),实时2人左右,组长和技术大牛1人兼顾和javaEE、前端。

第2章 涉及技术

2.1 Linux&Shell

2.1.1 Linux常用高级命令

序号

命令

命令解释

1

top

查看内存

2

df -h

查看磁盘存储情况

3

iotop

查看磁盘IO读写(yum install iotop安装)

4

iotop -o

直接查看比较高的磁盘读写程序

5

netstat -tunlp | grep 端口号

查看端口占用情况

6

uptime

查看报告系统运行时长及平均负载

7

ps-aux

查看进程

8

pwd

显示工作路径

2.1.2 Linux常用工具及写过脚本

1)awk、sed、cut、sort

2)用Shell写过哪些脚本

(1)集群启动,分发脚本

(2)数仓与mysql的导入导出

(3)数仓层级内部的导入

2.1.3 Shell中单引号和双引号区别

(1)单引号不取变量值

(2)双引号取变量值

(3)反引号`,执行引号中命令

(4)双引号内部嵌套单引号,取出变量值

(5)单引号内部嵌套双引号,不取出变量值

2.2 Hadoop

分布式系统基础架构计算平台

2.2.1 Hadoop基本概念

1.Hadoop 是一个处理、存储和计算分析海量的分布式、非结构化数据的开源框架。

2.HDFS是分布式文件存储管理系统。

3.MapReduce是一个分布式运算程序的编程框架

4.yarn是hadoop中的资源管理器,负责任务调度。

2.2.2 Hadoop常用端口号

组件

节点

默认

端口

配置

命令解释

HDFS

DataNode

50010

dfs.datanode.address

datanode服务端口,用于数据传输

HDFS

DataNode

50075

dfs.datanode.http.address

http服务的端口

HDFS

DataNode

50475

dfs.datanode.https.address

https服务的端口

HDFS

DataNode

50020

dfs.datanode.ipc.address

ipc服务的端口

HDFS

NameNode

50070

dfs.namenode.http-address

http服务的端口

HDFS

NameNode

50470

dfs.namenode.https-address

https服务的端口

HDFS

NameNode

8020

fs.defaultFS

接收Client连接的RPC端口,用于获取文件系统metadata信息。

HDFS

NameNode

9000

fs.default.name

客户端访问集群端口

Yarn

ResourceManager

8088

yarn.resourcemanager.webapp.address

http服务端口,访问MR执行情况

Yarn

JobHistory Server

19888

mapreduce.jobhistory.webapp.address

历史服务器http服务端口

ZooKeeper

Server

2181

/etc/zookeeper/conf/zoo.cfg中clientPort=<port>

对客户端提供服务的端口

2.2.3 Hadoop 配置文件以及简单的Hadoop 集群搭建

(1)配置文件:

Hadoop2.x core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml slaves

Hadoop3.x core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml workers

(2)简单的集群搭建过程:

JDK 安装

配置SSH 免密登录

配置hadoop 核心文件:

格式化namenode

2.2.4 HDFS 读流程和写流程

1.hdfs上传文件的流程(写流程)

1)客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。

2)NameNode 返回是否可以上传。

3)客户端根据文件大小进行切片,请求第一个 Block 上传到哪几个 DataNode 服务器上。

4)NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3。

5)客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。

6)dn1、dn2、dn3 逐级应答客户端。

7)客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以

Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet 会放入一个应答队列等待应答。

8)当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务器。

(重复执行 3-7 步)。

2.hdfs读取文件的流程(读流程)

1)客户端通过 Distributed FileSystem 向 NameNode 请求下载文件,NameNode 通过查询元 数据,找到文件块所在的 DataNode 地址。

2)挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。

3)DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位来做校验)。

4)客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。

2.2.5 NameNode工作机制SecongdaryNameNode工作流程恢复流程

1.NameNode工作机制

第一阶段:NameNode 启动

(1)第一次启动 NameNode 格式化后,创建 Fsimage 和 Edits 文件。如果不是第一次启

动,直接加载编辑日志和镜像文件到内存。

(2)客户端对元数据进行增删改的请求。

(3)NameNode 记录操作日志,更新滚动日志。

(4)NameNode 在内存中对数据进行增删改。

第二阶段:Secondary NameNode 工作

(1)Secondary NameNode 询问 NameNode 是否需要 CheckPoint。直接带回 NameNode

是否检查结果。

  1. Secondary NameNode 请求执行 CheckPoint。

(3)NameNode 滚动正在写的 Edits 日志。

(4)将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode。

(5)Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。

(6)生成新的镜像文件 fsimage.chkpoint。

(7)拷贝 fsimage.chkpoint 到 NameNode。

(8)NameNode 将 fsimage.chkpoint 重新命名成 fsimage

2.Secondary NameNode辅助恢复NameNode流程,恢复NameNode环写之前的数据

第一步:杀死namenode进程

第二步:删除namenode的fsimage与edits文件

第三步:拷贝secondaryNamenode的fsimage与edits文件到namenode的fsimage与edits文件夹下面去

第四步:启动namenode

3.Secondary NameNode 不能恢复 NameNode 的全部数据,那如何 保证 NameNode 数据存储安全?

这个问题就要说 NameNode 的高可用了,即 NameNode HA

一个 NameNode 有单点故障的问题,那就配置双 NameNode,配置有两个关键点,

一是必须要保证这两个 NN 的元数据信息必须要同步的,二是一个 NN 挂掉之后

另一个要立马补上。

2.2.6 HDFS组成架构及主要作用

1)NameNode(nn):就是Master,它

是一个主管、管理者。

(1)管理HDFS的名称空间;

(2)配置副本策略;

(3)管理数据块(Block)映射信息;

(4)处理客户端读写请求。

2)DataNode:就是Slave。NameNode

下达命令,DataNode执行实际的操作。

(1)存储实际的数据块;

(2)执行数据块的读/写操作。

3)Client:就是客户端。

(1)文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行上传;

(2)与NameNode交互,获取文件的位置信息;

(3)与DataNode交互,读取或者写入数据;

(4)Client提供一些命令来管理HDFS,比如NameNode格式化;

(5)Client可以通过一些命令来访问HDFS,比如对HDFS增删查改操作;

4)Secondary NameNode:并非NameNode的热备。当NameNode挂掉的时候,它并不

能马上替换NameNode并提供服务。

(1)辅助NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给NameNode ;

(2)在紧急情况下,可辅助恢复NameNode。

2.2.7 Hadoop集群启动时要启动哪些进程?作用?

1.NameNode它是hadoop中的主服务器,管理文件系统名称空间和对集群中存储的文件的访问,保存有metadate。

2.SecondaryNameNode它不是namenode的冗余守护进程,而是提供周期检查点和清理任务。帮助NN合并editslog,减少NN启动时间。

3.DataNode它负责管理连接到节点的存储(一个集群中可以有多个节点)。每个存储数据的节点运行一个datanode守护进程。

4.ResourceManager(JobTracker)JobTracker负责调度DataNode上的工作。每个DataNode有一个TaskTracker,它们执行实际工作。

5.NodeManager(TaskTracker)执行任务

6.DFSZKFailoverController高可用时它负责监控NN的状态,并及时的把状态信息写入ZK。它通过一个独立线程周期性的调用NN上的一个特定接口来获取NN的健康状态。FC也有选择谁作为Active NN的权利,因为最多只有两个节点,目前选择策略还比较简单(先到先得,轮换)。

7.JournalNode 高可用情况下存放namenode的editlog文件.

2.2.8 MapReduce工作原理

Map task:

程序会根据InputFormat将输入文件分割成splits,每个split会作为一个map task的输入,每个map task会有一个内存缓冲区,输入数据经过map阶段处理后的中间结果会写入内存缓冲区,并且决定数据写入到哪个partitioner,当写入的数据到达内存缓冲区的的阀值(默认是0.8),会启动一个线程将内存中的数据溢写入磁盘,同时不影响map中间结果继续写入缓冲区。在溢写过程中,MapReduce框架会对key进行排序,如果中间结果比较大,会形成多个溢写文件,最后的缓冲区数据也会全部溢写入磁盘形成一个溢写文件(最少有一个溢写文件),如果是多个溢写文件,则最后合并所有的溢写文件为一个文件。

Reduce task:

当所有的map task完成后,每个map task会形成一个最终文件,并且该文件按区划分。reduce任务启动之前,一个map task完成后,

就会启动线程来拉取map结果数据到相应的reduce task,不断地合并数据,为reduce的数据输入做准备,当所有的map tesk完成后,

数据也拉取合并完毕后,reduce task 启动,最终将输出输出结果存入HDFS上。

2.2.9 MapReduce中shuffer机制

shuffle 阶段分为四个步骤:依次为:分区,排序,规约,分组,其中前三个步骤

在 map 阶段完成,最后一个步骤在 reduce 阶段完成

shuffle 是 Mapreduce 的核心,它分布在 Mapreduce 的 map 阶段和 reduce

阶段。一般把从 Map 产生输出开始到 Reduce 取得数据作为输入之前的过程称

作 shuffle。

1. Collect 阶段:将 MapTask 的结果输出到默认大小为 100M 的环形缓冲区,保存的

是 key/value,Partition 分区信息等。

2. Spill 阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,

在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了 combiner,还会将有

相同分区号和 key 的数据进行排序。

3. Merge 阶段:把所有溢出的临时文件进行一次合并操作,以确保一个 MapTask 最终

只产生一个中间数据文件

4.** Copy 阶段**:ReduceTask 启动 Fetcher 线程到已经完成 MapTask 的节点上复制一份

属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值

的时候,就会将数据写到磁盘之上

4. Merge 阶段:在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到

本地的数据文件进行合并操作

5. Sort 阶段:在对数据进行合并的同时,会进行排序操作,由于 MapTask 阶段已经对

数据进行了局部的排序,ReduceTask 只需保证 Copy 的数据的最终整体有效性即可。

Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,

磁盘 io 的次数越少,执行速度就越快

缓冲区的大小可以通过参数调整, 参数:mapreduce.task.io.sort.mb 默认 100M

2.2.10 MapReduce中切片机制

1.MapReduce中FileInputFormat 切片机制

  1. 简单地按照文件的内容长度进行切片
  2. 切片大小,默认等于Block大小
  3. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

2.MapReduce中CombineTextInputFormat 切片机制

框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会

是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的

MapTask,处理效率极其低下。

2.2.11 Yarn工作机制

yarn基本组成:

1) ResourceManager:RM 是一个全局的资源管理器,负责整个系统的资源管理和分配

2) ApplicationMaster:与RM协商获得资源,监控内部任务运行状态

3) nodeManager:他接收并处理来自 AM 的 Container 启动和停止请求。

4)Container 是 YARN 中的资源抽象,封装了各种资源。

当 jobclient 向 YARN 提交一个应用程序后,YARN 将分两个阶段运行这个应用程

序:一是启动 ApplicationMaster;第二个阶段是由 ApplicationMaster 创建应用程

序,为它申请资源,监控运行直到结束。

具体步骤如下:

1) 用户向 YARN 提交一个应用程序,并指定 ApplicationMaster 程序、启动

ApplicationMaster 的命令、用户程序。

2) RM 为这个应用程序分配第一个 Container,并与之对应的 NM 通讯,要求它在

这个 Container 中启动应用程序 ApplicationMaster。

3) ApplicationMaster 向 RM 注册,然后拆分为内部各个子任务,为各个内部任务

申请资源,并监控这些任务的运行,直到结束。

4) AM 采用轮询的方式向 RM 申请和领取资源。

5) RM 为 AM 分配资源,以 Container 形式返回

6) AM 申请到资源后,便与之对应的 NM 通讯,要求 NM 启动任务。

7) NodeManager 为任务设置好运行环境,将任务启动命令写到一个脚本中,并

通过运行这个脚本启动任务

8) 各个任务向 AM 汇报自己的状态和进度,以便当任务失败时可以重启任务。

9) 应用程序完成后,ApplicationMaster 向 ResourceManager 注销并关闭自己

2.2.12 hadoop的调Yarn调度器,你们使⽤的是哪种策略,为什么?

(1)默认情况下hadoop使⽤的FIFO, 先进先出的调度策略。按照作业的优先级来处理。

(2)计算能⼒调度器( Capacity Scheduler ) ⽀持多个队列,每个队列可配置⼀定的资源量,每个队列采⽤FIFO, 为了防⽌同 ⼀个⽤户的作业独占资源,那么调度器会对同⼀个⽤户提交的作业所占资源进⾏限定,⾸先按以下策略选择⼀个合适队 列:计算每个队列中正在运⾏的任务数与其应该分得的计算资源之间的⽐值,选择⼀个该⽐值最⼩的队列;然后按以下策 略选择该队列中⼀个作业:按照作业优先级和提交时间顺序选择,同时考虑⽤户资源量限制和内存限制。

(3)公平调度器( Fair Scheduler ) ⽀持多队列多⽤户,每个队列中的资源量可以配置,同⼀队 列中的作业公平共享队列中所有资源。

2)区别:

FIFO 调度器:支持单队列、先进先出生产环境不会用。

容量调度器:支持多队列,保证先进入的任务优先执行。

公平调度器:支持多队列,保证每个任务公平享有队列资源。

3)在生产环境下怎么选择?

大厂:如果对并发度要求比较高,选择公平,要求服务器性能必须OK;

中小公司,集群服务器资源不太充裕选择容量。

2.2.13Hadoop、MapReduce数据倾斜怎么处理?

MapReduce优化方法主要从六个方面考虑:数据输入,Map阶段,Reduce阶段,IO传输,数据倾斜问题和常用的调优参数

数据倾斜是大量相同key被partion分配到一个分区里,违背了并行计算的初衷,造成资源浪费,效率低下。

根本原因是:reduce数据处理不均匀

方法一:抽样和范围分区,可以通过对原始数据的抽样的结果,预设分区

方法二:自定义分区

方法三:Combine,Combine的目的就是聚合并精简数据

方法四:采用MapJoin,尽量避免ReduceJoin

2.2.14HDFS小文件处理

1) HDFS 小文件影响

a. 影响 NameNode 的寿命,因为文件元数据存储在 NameNode 的内存中

b. 影响计算引擎的任务数量,比如每个小的文件都会生成一个 Map 任务

2) 数据输入小文件处理:

a. 合并小文件:对小文件进行归档(Har)、自定义 Inputformat 将小文件存储成

SequenceFile 文件。

b. 采用 ConbinFileInputFormat 来作为输入,解决输入端大量小文件场景。

c. 对于大量小文件 Job,可以开启 JVM 重用。

2.2.15Hadoop宕机

1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)

2)如果写入文件过快造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。例如,可以调整Flume每批次拉取数据量的大小参数batchsize。。

2.2.16Hadoop项目经验之基准测试

Hadoop 带有一些基准测试程序,可以最少的准备成本轻松运行。基准测试被打包在测试程

序 JAR 文件中,通过无参调用 JAR 文件可以得到其列表

搭建完Hadoop集群后需要对HDFS读写性能和MR计算能力测试。测试jar包在hadoop的share文件夹下。

2.2.15 HDFS 在读取文件的时候,如果其中一个块突然损坏了怎么办

客户端读取完 DataNode 上的块之后会进行 checksum 验证,也就是把客户端读

取到本地的块与 HDFS 上的原始块进行校验,如果发现校验结果不一致,客户端

会通知 NameNode,然后再从下一个拥有该 block 副本的 DataNode 继续读

2.2.16 HDFS在上传文件时,其中一个 DataNode 突然挂掉怎么办

客户端上传文件时与DataNode建立pipeline管道,管道正向是客户端向DataNode

发送的数据包,管道反向是 DataNode 向客户端发送 ack 确认,也就是正确接收

到数据包之后发送一个已确认接收到的应答,当 DataNode 突然挂掉了,客户端

接收不到这个 DataNode 发送的 ack 确认 ,客户端会通知 NameNode,NameNode 检查该块的副本与规定的不符, NameNode 会通知 DataNode 去复制副本,并将挂掉的 DataNode 作下线处理,不再让它参与文件上传与下载。

2.2.17.NameNode在启动的时候会做哪些操作?

NameNode启动的时候,会加载fsimage

Fsimage加载过程完成的操作主要是为了:

(1)从fsimage中读取该HDFS中保存的每⼀个⽬录和每⼀个⽂件

(2)初始化每个⽬录和⽂件的元数据信息

(3)根据⽬录和⽂件的路径,构造出整个namespace在内存中的镜像

(4)如果是⽂件,则读取出该⽂件包含的所有blockid,并插⼊到BlocksMap中。

2.2.18 项目经验之压缩

提示:如果面试过程问起,我们一般回答压缩方式为 Snappy,特点速度快,缺点无法切分(可

以回答在链式 MR 中,Reduce 端输出使用 bzip2 压缩,以便后续的 map 任务对数据进行 split)

启用 lzo 的压缩方式对于小规模集群是很有用处,压缩比率大概能降到原始日志大小的 1/3。

同时解压缩的速度也比较快。

Hadoop 默认不支持 LZO 压缩,如果需要支持 LZO 压缩,需要添加 jar 包,并在 hadoop 的

cores-site.xml 文件中添加相关压缩配置。

2.2.19 Hadoop优化

1) HDFS 小文件影响

a. 影响 NameNode 的寿命,因为文件元数据存储在 NameNode 的内存中

b. 影响计算引擎的任务数量,比如每个小的文件都会生成一个 Map 任务

2) 数据输入小文件处理

a. 合并小文件:对小文件进行归档(Har)、自定义 Inputformat 将小文件存储成

SequenceFile 文件。

b. 采用 ConbinFileInputFormat 来作为输入,解决输入端大量小文件场景。

c. 对于大量小文件 Job,可以开启 JVM 重用。

3) Map 阶段

a. 增大环形缓冲区大小。由 100m 扩大到 200m

b. 增大环形缓冲区溢写的比例。由 80%扩大到 90%

c. 减少对溢写文件的 merge 次数。

d. 不影响实际业务的前提下,采用 Combiner 提前合并,减少 I/O。

4) Reduce 阶段

a. 合理设置 Map 和 Reduce 数:两个都不能设置太少,也不能设置太多。太少,会导致 Task

等待,延长处理时间;太多,会导致 Map、Reduce 任务间竞争资源,造成处理超时等错误。

b. 设置 Map、Reduce 共存:调整 slowstart.completedmaps 参数,使 Map 运行到一定程度

后,Reduce 也开始运行,减少 Reduce 的等待时间。

c. 规避使用 Reduce,因为 Reduce 在用于连接数据集的时候将会产生大量的网络消耗。

d. 增加每个 Reduce 去 Map 中拿数据的并行数

e. 集群性能可以的前提下,增大 Reduce 端存储数据内存的大小。

5)

  1. IO 传输

a. 采用数据压缩的方式,减少网络 IO 的的时间。安装 Snappy 和 LZOP 压缩编码器。

b. 使用 SequenceFile 二进制文件

6) 整体

a. MapTask 默认内存大小为 1G,可以增加 MapTask 内存大小为 4-5g

b. ReduceTask 默认内存大小为 1G,可以增加 ReduceTask 内存大小为 4-5g

c. 可以增加 MapTask 的 cpu 核数,增加 ReduceTask 的 CPU 核数

d. 增加每个 Container 的 CPU 核数和内存大小

e. 调整每个 Map Task 和 Reduce Task 最大重试次数

2.3 ZooKeeper

2.3.1 ZooKeeper概述

ZooKeeper:是一个针对大型分布 式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、

分布式同步、组服务等。 Zookeeper=文件系统+通知机制

ZooKeeper 的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能

稳定的系统提供给用户。

2.3.2 ZooKeeper选举机制

半数机制

三个核心选举原则:

1) Zookeeper 集群中只有超过半数以上的服务器启动,集群才能正常工作;

2) 在集群正常工作之前,myid 小的服务器给 myid 大的服务器投票,直到集群正常工作,选出

Leader;

3) 选出 Leader 之后,之前的服务器状态由 Looking 改变为 Following,以后的服务器都是

Follower

比如:

假设有五台服务器组成的 zookeeper 集群,它们的 id 从 1-5,同时它们都是最新启动的

1) 1 启动,选自己

2) 2 启动,选自己(比 1 大,12 选 2)

3) 3 启动,选自己(123 都选 3,超过半数)当选 leader

4) 4 启动,已有 leader3

5) 5 启动,已有 leader3

2.3.3 ZooKeeper常用命令

ls path [watch] 使用 ls 命令来查看当前 znode 中所包含的内容

create 普通创建 -s 含有序列 -e 临时(重启或者超时消失)

get path [watch] 获得节点的值

set 设置节点的具体值

delete 删除节点

rmr 递归删除节点

2.3.4 ZooKeeper监听器原理

1)首先要有一个main()线程

2)在main线程中创建Zookeeper客户端,这时就会创建两个线

程,一个负责网络连接通信(connet),一个负责监听(listener)。

3)通过connect线程将注册的监听事件发送给Zookeeper。

4)在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。

5)Zookeeper监听到有数据或路径变化,就会将这个消息发送

给listener线程。

6)listener线程内部调用了process()方法。

2.3.5 ZooKeeper 部署方式有哪几种?集群中角色有哪些?最少要几台机器?

(1)部署方式单机模式、集群模式

(2)角色:Leader 和 Follower

(3)集群最少需要机器数:3

2.4 Flume

2.4.1 Flume概述

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传 输的系统。Flume 基于流式架构,灵活简单。 主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。主要五大组成:Agent(传输基本单元),Source,Channel,Sink,Event.

Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的,是 Flume 数据传输的基本单元。

Agent 主要有 3 个部分组成,Source、Channel、Sink。

Source 是负责接收数据到 Flume Agent 的组件。

Channel 是位于 Source 和 Sink 之间的缓冲区。

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。Sink 是完全事务性的。

Event.传输单元,Flume 数据传输的基本单元,以事件的形式将数据从源头送至目的地。

Source类型:

Spooling Directory Source:监听文件夹

Exec Source:监听命令或文件

NetCat Source:一个NetCat Source用来监听一个指定端口,并接收监听到的数据。

Kafka Source:支持从Kafka指定的topic中读取数据。

2.4.2 Flume组成,Put事务,Take事务

1) flume 组成,Put 事务,Take 事务

Taildir Source:断点续传、多目录。Flume1.6 以前需要自己自定义 Source 记录每次读取

文件位置,实现断点续传。

File Channel:数据存储在磁盘,宕机数据可以保存。但是传输速率慢。适合对数据传输可

靠性要求高的场景,比如,金融行业。如果是金融、对钱要求准确的公司,选择

Memory Channel:数据存储在内存中,宕机数据丢失。传输速率快。适合对数据传输可靠性

要求不高的场景,比如,普通的日志数据。

Kafka Channel:减少了 Flume 的 Sink 阶段,提高了传输效率。 下一级是kafka优先使用

Source 到 Channel 是 Put 事务 ,Channel 到 Sink 是 Take 事务

Flume事务机制:Flume事务有put和take机制,两者保证数据传输的准确性.

Put事务:Source到Channel

doPut:将数据从souce写入临时缓冲区putList

doCommit:检查Channel内存队列是否足够合并

doRollback:channel 内存队列空间不足,则回滚数据

take事务:channel到sink

doTake:将数据取到临时缓冲区takeList

doCommit:如果数据全部发送成功,则清除临时缓冲区takeList

doRollback:数据发送过程中如果出现异常,rollback 将临时缓冲区 takeList 中的数据归还给 channel 内存队列

2.4.3 Flume拦截器

1)拦截器注意事项

项目中自定义了:ETL拦截器。

采用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些

2)自定义拦截器步骤

(1)实现Interceptor

(2)重写四个方法

(3)静态内部类,实现Interceptor.Builder

3)拦截器可以不用吗?

可以不用;需要在下一级hive的dwd层和sparksteaming里面处理

优势:只处理一次,轻度处理;劣势:影响性能,不适合做实时推荐这种对实时要求比较高的场景。

2.4.4 Flume 监控器

1)采用Ganglia 监控器,监控到flume 尝试提交的次数远远大于最终成功的次数,说明flume 运行比较差。

2.4.5 Flume 采集数据会丢失吗?

Flume架构原理来讲不会出现数据丢失 source到channel channel到sink都是事务性的,基本不可能出现数据丢失

我能想到的就是使用了 memory channel 然后agent宕机导致数据丢失

2.4.6 Flume 参数调优

增加 Source 个(使用 Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source 的读

取数据的能力。

使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。

增加 Sink 的个数可以增加 Sink 消费 event 的能力。Sink 也不是越多越好够用就行,过

多的 Sink 会占用系统资源,造成系统资源不必要的浪费。

2.4.7 Flume 优化

通过配置 dataDirs 指向多个路径,每个路径对应不同的硬盘,增大 Flume 吞吐量。

2.5 Kafka

2.5.1 Kafka概述

Kafka 是一种高吞吐量的分布式消息队列发布订阅消息系统,有如下特性:

(1)通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB 的消息

存储也能够保持长时间的稳定性能。

(2)高吞吐量:即使是非常普通的硬件 Kafka 也可以支持每秒数百万的消息。

(3)支持通过 Kafka 服务器和消费机集群来分区消息。

(4)支持 Hadoop 并行数据加载。

模式:(1)点对点模式(2)发布/订阅模式

2.5.2 为什么使用kafka

缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够

多的机器来保证冗余,kafka 在中间可以起到一个缓冲的作用,把消息暂存

在 kafka 中,下游服务就可以按照自己的节奏进行慢慢处理。

解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为

一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获

取扩展能力。

冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅 topic

的服务消费到,供多个毫无关联的业务使用。

健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会

影响主要业务的正常进行。

异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异

步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列

中放入多少消息就放多少,然后在需要的时候再去处理它们。

2.5.3 kafka消息队列与传统消息队列区别

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。

支持Java和Scala编程语言编写

消费模式:pull

高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒

可扩展性:kafka集群支持热扩展

持久性、可靠性:消息被持久化到本地磁盘(zero-copy机制),并且支持数据备份防止数据丢失

高并发:支持数千个客户端同时读写

顺序保证:数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。每一个Partition内的消息的有序性

2.5.4 kafka主备模式

Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topci配置副本的数量。

Kafka会自动在每个个副本上备份数据,所以当一个节点down掉时数据依然是可用的。

Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。

2.5.5 kafka高吞吐量

Kafka的优点是高吞吐量和高效率

原因:1)顺序读写

  1. Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存进行页缓存
  2. 分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。
  3. 批量读写和批量压缩
  4. 零复制技术

优化提升吞吐量:1)设置发送消息的缓冲区文件buffer.memory

  1. 设置压缩文件compression.type
  2. 设置batch.size稍微提升缓存大小

2.5.6 kafka架构

1)Producer :消息生产者,就是向 kafka broker 发消息的客户端;

2)Consumer :消息消费者,向 kafka broker 取消息的客户端;

3)Topic :可以理解为一个队列;

4)Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)

和单播(发给任意一个 consumer)的手段。

5)Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker

可以容纳多个 topic;

6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,

一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。

7)Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查

找。

2.5.7 kafka机器数量、副本设定、日志保存时间、硬盘大小

Kafka 机器数量=2*(峰值生产速度*副本数/100)+ 1

一般我们设置成2 个或3 个,很多企业设置为2 个。

副本的优势:提高可靠性;副本劣势:增加了网络IO 传输

默认保存7天;生产环境建议3天

每天的数据量100g*2个副本*3天/70%

2.5.8 kafka中数据量计算

每天总数据量100g,每天产生1亿条日志,10000万/24/60/60=1150条/每秒钟

平均每秒钟:1150条

低谷每秒钟:50条

高峰每秒钟:1150条*(2-20倍)=2300条-23000条

每条日志大小:0.5k-2k(取1k)

每秒多少数据量:2.0M-20MB

2.5.9 kafka的ISR副本同步队列

ISR(In-Sync Replicas),副本同步队列。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。

2.5.10 kafka挂掉

1)Flume记录

2)日志有记录

3)短期没事

想要恢复集群的节点,主要的步骤就是通过日志分析来查看节点宕机的原因,从

而解决,重新恢复节点。

2.5.11 kafka数据重复

幂等性+ack-1+事务

Kafka数据重复,可以再下一级:SparkStreaming、redis或者hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;

2.5.12 kafka消息数据积压, Kafka消费能力不足怎么处理?

1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)

2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

2.5.13 Kafka 消费过的消息如何再消费?

kafka 消费消息的 offset 是定义在 zookeeper 中的, 如果想重复消费 kafka 的消

息,可以在 redis 中自己记录 offset 的 checkpoint 点(

n 个),当想重复消费消息时,通过读取 redis 中的 checkpoint 点进行 zookeeper 的 offset 重设,这样就可以达到重复消费消息的目的了

2. 5.14 kafka 的数据是放在磁盘上还是内存上,为什么速度会快?

kafka 使用的是磁盘存储。

速度快是因为:

1. 顺序写入:因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动

作”,它是耗时的。所以硬盘 “讨厌”随机 I/O, 喜欢顺序 I/O。为了提高读写硬盘的速

度,Kafka 就是使用顺序 I/O。

2. Memory Mapped Files(内存映射文件):64 位操作系统中一般可以表示 20G 的数据文

件,它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射。完

成映射之后你对物理内存的操作会被同步到硬盘上。

3. Kafka 高效文件存储设计: Kafka 把 topic 中一个 parition 大文件分成多个小文件段,通

过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。信

2.5.15 为什么 Kafka 不支持读写分离?

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行

交互的,从 而实现的是一种主写主读的生产消费模型。

Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:

数据一致性问题和延时问题

2.65 MySQL

2.65.1 DDL,DML,DQL,DCL

1.DDL数据定义语言

用来定义数据库对象:库、表、列等;

适用范围:对数据库中的某些对象(例如,database,table)进行管理,如Create,Alter和Drop.

注意: 在生产环境中,DDL类操作需要慎用,因为不能做roolback操作,一旦执行无法回退。

2.DML数据操作语言

对数据库中的数据进行一些简单操作,,如insert,delete,update,select(插入、删除、修改、检索)等都是DML.

对select来说,它有两种划分方法,可以放在DML,也可以单独放在DQL

3.DQL数据查询语言

用来查询记录(数据)。

基本结构是由SELECT子句,FROM子句,WHERE子句组成的查询块:

SELECT <字段名表>FROM <表或视图名>WHERE <查询条件>

4.DCL数据控制语言

用来定义访问权限和安全级别。

数据控制语言DCL用来授予或回收访问数据库的某种特权,并控制数据库操纵事务发生的时间及效果,对数据库实行监视等。如:

(1)GRANT:授权。

(2)ROLLBACK [WORK] TO[SAVEPOINT]:回滚命令,回退到某一点。回滚命令使数据库状态回到上次最后提交的状态。其格式为:SQL>ROLLBACK;

(3) COMMIT [WORK]:提交。

在数据库的插入、删除和修改操作时,只有当事务在提交到数据库时才算完成。在事务提交前,只有操作数据库的这个人才能有权看到所做的事情,别人只有在最后提交完成后才可以看到。提交数据有三种类型:显式提交、隐式提交及自动提交。

下面分别说明这三种类型。

(1) 显式提交

用COMMIT命令直接完成的提交为显式提交。其格式为:SQL>COMMIT;

(2) 隐式提交

用SQL命令间接完成的提交为隐式提交。这些命令是:ALTER,AUDIT,COMMENT,CONNECT,CREATE,DISCONNECT,DROP,EXIT,GRANT,NOAUDIT,QUIT,REVOKE,RENAME。

(3) 自动提交

若把AUTOCOMMIT设置为ON,则在插入、修改、删除语句执行后,系统将自动进行提交,这就是自动提交。其格式为:SQL>SET

AUTOCOMMIT ON;

2.65.1 索引

Mysql官方索引定义:索引(index)是帮助mysql高效获取的数据结构。

索引的本质是:数据结构

标准答案:可以理解为“索引就是排好序的快速查找数据结构”

索引的两大功能:排序和查找(索引会影响到where后面的查找和order by后面的排序)

索引的目的在于提高查找效率,类比新华字典。

在数据库外,数据库系统还维护着满足特定查找算法的数据结构,这些数据结构以某种方式引用(指向)数据,可以在这些数据结构上实现高级查找算法。这种数据结构就是索引。不需要全表扫描,精确查找

  • 索引本身也很大,一般以索引文件的形式存储在磁盘上的文件中
  • 索引中包括:聚集索引,覆盖索引,复合索引,前缀索引,唯一索引等,默认都是使用B+树结构组织索引,此外还有哈希索引
  • 优势:检索:可以提高数据检索的效率,降低数据库的IO成本(大学图书馆书目索引)

排序:通过索引列对数据进行排序,降低了CPU的消耗

  • 劣势: 索引列占磁盘空间

降低更新表(insert,update,delete写操作)的效率,因为更新表的时候,mysql不仅要保存数据,还要保存索引更新的索引字段以及因为更新带来的键值变化

注:索引只是提高效率的一个因素,如果MySQL有大量表,就需要花时间研究建立优化查询

单值索引:即一个索引只包含单个列,一个表可以有多个单列索引(一般用复合索引)单表5个索引

唯一索引:索引列的值必须唯一,但允许有空值

复合索引:即一个索引包含多个列

哪些情况下需要创建索引?

  1. 主键自动建立唯一索引
  2. 频繁作为查询条件的字段应该创建索引(比如:银行系统的银行卡号,电信系统的手机号,微信号等)
  3. 查询中与其它表关联的字段,外键关系建立索引(比如员工和部门表中的departmentId)
  4. 查询中排序的字段,排序字段若通过索引去访问将大大提高查询排序速度
  5. 查询中统计或者分组字段

哪些情况下不需要创建索引?

  1. 表记录太少(比如100条之内)mysql300万左右性能是开始下降的,虽然官方文档写的800万
  2. 经常增删改的表(因为更新表的同时,mysql不仅要保存数据,还要保存索引文件)
  3. 数据重复且平均的表字段(比如14万国籍都是中国,或者性别不是男就是女)

2.65.2 存储结构

Mysq存储结构:1. InnoDB数据页结构

2. 数据页数据存储

3. 一条记录的结构

4. Page Directory:页目录

Mysql索引存储结构:

  1. 二叉树:优点:查找减半
    缺点:索引数据只能是无序的,有序数据用二叉树是个单链,完全无意义。
  2. 红黑树:优点:相比二叉树好,有序数据也可以使用,当节点为3时,会自动分解平衡。

缺点:如果数据量很大,每次插入数据,它都会自动平衡,所以特别消耗性能,而且节点的高度是无法预测的,所以磁盘I/O操作也不可控。

3.HASH:原理:存储结构是key-value形式存在数组中,然后通过hash函数(key)得到一个值,这个值就是它们的索引。当取数据的时候,key通过hash得到索引值,直接找就行了,复杂度为o(1)。

优点:查找速度快。

缺点:

1.会碰到key冲突情况。

2.HASH结构无序,多以当查找范围数据的话,就慢一点,对于不等值的查找,就更慢了(不能避免全表扫描)

3.无法通过索引值排序,因为索引存放的值是经过hash的,可能跟原来的值不相等

4.B-Tree:优点:

1.一次可以设置多个节点,降低了树的高度,多以查找很快。

2.节点中的数据key从左到右依次递增。

缺点:

1.根节点不仅存了索引key也存了对应的记录,所以比较占用空间。

2.子节点之间没有双向链表,每次查找数据都是从根节点出发,如果是查找范围数据的话,就没有优势了

5.B+Tree:优点:(与B-Tree区别)

1.索引携带的数据移到了叶子节点上,在空间相同的情况下,那肯定是B+Tree存储的索引更多一些,而且树的高度更低。

2.子节点之间是有双向指针指向的,查找的时候,顺着指针找就行了,不用每次都从根节点出发寻找,所以速度更快。

数据结构:B+Tree

一般来说能够达到range 就可以算是优化了

口诀(两个法则加6 种索引失效的情况)

全值匹配我最爱,最左前缀要遵守;

带头大哥不能死,中间兄弟不能断;

索引列上少计算,范围之后全失效;

LIKE 百分写最右,覆盖索引不写*;

不等空值还有OR,索引影响要注意;

VAR 引号不可丢,SQL 优化有诀窍。

2.65.3 b-tree 和b+tree 的区别

5.B+Tree:优点:(与B-Tree区别)

1.索引携带的数据移到了叶子节点上,在空间相同的情况下,那肯定是B+Tree存储的索引更多一些,而且树的高度更低。

2.子节点之间是有双向指针指向的,查找的时候,顺着指针找就行了,不用每次都从根节点出发寻找,所以速度更快。

1) B-树的关键字和记录是放在一起的,叶子节点可以看作外部节点,不包含任何信息;B+树的

非叶子节点中只有关键字和指向下一个节点的索引,记录只放在叶子节点中。

2) 在B-树中,越靠近根节点的记录查找时间越快,只要找到关键字即可确定记录的存在;而

B+树中每个记录的查找时间基本是一样的,都需要从根节点走到叶子节点,而且在叶子节点中还要再

比较关键字。

2.65.3 MySQL 的事务要素ACID以及并发问题,脏读幻读和隔离级别等

一、事务的基本要素(ACID)

1) 原子性(Atomicity):事务开始后所有操作,要么全部做完,要么全部不做,不可能停滞

在中间环节。事务执行过程中出错,会回滚到事务开始前的状态,所有的操作就像没有发生一样。也

就是说事务是一个不可分割的整体,就像化学中学过的原子,是物质构成的基本单位

2) 一致性(Consistency):事务开始前和结束后,数据库的完整性约束没有被破坏。比如A

向B 转账,不可能A 扣了钱,B 却没收到。

3) 隔离性(Isolation):同一时间,只允许一个事务请求同一数据,不同的事务之间彼此没

有任何干扰。比如A 正在从一张银行卡中取钱,在A 取钱的过程结束前,B 不能向这张卡转账。

4) 持久性(Durability):事务完成后,事务对数据库的所有更新将被保存到数据库,不能回

滚。

二、事务的并发问题

1) 脏读:事务A 读取了事务B 更新的数据,然后B 回滚操作,那么A 读取到的数据是脏数据

2) 不可重复读:事务A 多次读取同一数据,事务B 在事务A 多次读取的过程中,对数据作了

更新并提交,导致事务A 多次读取同一数据时,结果不一致

3) 幻读:系统管理员A 将数据库中所有学生的成绩从具体分数改为ABCDE 等级,但是系统管理

员B 就在这个时候插入了一条具体分数的记录,当系统管理员A 改结束后发现还有一条记录没有改过

来,就好像发生了幻觉一样,这就叫幻读。

小结:不可重复读的和幻读很容易混淆,不可重复读侧重于修改,幻读侧重于新增或删除。

解决不可重复读的问题只需锁住满足条件的行,解决幻读需要锁表

三、MySQL 事务隔离级别

事务隔离级别 脏读 不可重复读 幻读

读未提交(read-uncommitted) 是 是 是

不可重复读(read-committed) 否 是 是

可重复读(repeatable-read) 否 否 是

串行化(serializable) 否 否 否

2.6 Hive

2.6.1 Hive概述

Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并提供类 SQL 查询功能。本质是:将 HQL 转化成 MapReduce 程序

Hive特点:1)Hive 处理的数据存储在 HDFS 2)Hive 分析数据底层的实现是 MapReduce 3)执行程序运行在 Yarn 上

Hive组成架构:1.用户接口:Client2.元数据:Metastore3.Hadoop4.驱动器:Driver(解析器,编译器,优化器,执行器)

(1)解析器(SQL Parser):将 SQL 字符串转换成抽象语法树 AST,这一步一般都用

第三方工具库完成,比如 antlr;对 AST 进行语法分析,比如表是否存在、字段是否存

在、SQL 语义是否有误。

(2)编译器(Physical Plan):将 AST 编译生成逻辑执行计划。

(3)优化器(Query Optimizer):对逻辑执行计划进行优化。

(4)执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。对于 Hive 来

说,就是 MR/Spark。

2.6.2 Hive和数据库的比较

Hive 和数据库除了拥有类似的查询语言,再无类似之处。

1)数据存储位置

Hive 存储在HDFS 。数据库将数据保存在块设备或者本地文件系统中。

2)数据更新

Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,

3)执行延迟

Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。

4)数据规模

Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。

2.6.3 Hive内部表和外部表区别

元数据、原始数据

1)删除数据时:

内部表:元数据、原始数据全删除

外部表:只删除元数据

2)内部表数据由 Hive 自身管理,外部表数据由 HDFS 管理;

3)内部表数据存储的位置是 hive.metastore.warehouse.dir(默认:

/user/hive/warehouse),外部表数据的存储位置由自己制定(如果没有 LOCATION,

Hive将在HDFS上的/user/hive/warehouse文件夹下以外部表的表名创建一个文件夹,并将属于这个表的数据存放在这里);外部表也可链接存储在hbase中。

2.6.4 4个by区别

1)Order By:全局排序,只有一个Reducer;

2)Sort By:分区内有序;

3)Distrbute By:类似MR中Partition,进行分区,结合sort by使用。

4)Cluster By:当Distribute by和Sorts by字段相同时,可以使用Cluster by方式。Cluster by除了具有Distribute by的功能外还兼具Sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。

在生产环境中Order By用的比较少,容易导致OOM。

在生产环境中Sort By+ Distrbute By用的多。

2.6.5 系统函数

1)date_add、date_sub函数(加减日期)

2)next_day函数(周指标相关)

3)date_format函数(根据格式整理日期)

4)last_day函数(求当月最后一天日期)

5)collect_set函数

6)get_json_object解析json函数

7)NVL(表达式1,表达式2)

2.6.6 窗口函数

窗口函数指定了函数工作的数据窗口大小(当前行的上下多少行),这个数据窗口大小可能会随着行的变化而变化。

窗口函数和聚合函数区别?

窗口函数对于每个组返回多行,组内每一行对应返回一行值。

聚合函数对于每个组只返回一行。

RANK() 排序相同时会重复,总数不会变

DENSE_RANK() 排序相同时会重复,总数会减少

ROW_NUMBER() 会根据顺序计算

1) OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化

2) CURRENT ROW:当前行

3) n PRECEDING:往前 n 行数据

4) n FOLLOWING:往后 n 行数据

5) UNBOUNDED:起点,UNBOUNDED PRECEDING

表示从前面的起点, UNBOUNDED FOLLOWING 表

示到后面的终点

6) LAG(col,n):往前第 n 行数据

7) LEAD(col,n):往后第 n 行数据

8) NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从 1 开始,对于

每一行,NTILE 返回此行所属的组的编号。注意:n 必须为 int 类型。

语法:

分析函数 over(partition by 列名 order by 列名 rows between 开始位置 and 结束位置)

常用分析函数

聚合类:avg()、sum()、max()、min()

排名类:

row_number() 按照值排序时产生一个自增编号,不会重复

rank() 按照值排序时产生一个自增编号,值相等时会重复,会产生空位

dense_rank() 按照值排序时产生一个自增编号,值相等时会重复,不会产生空位

其他类:

lag(列名,往前的行数,[行数为null时的默认值,不指定为null])

lead(列名,往后的行数,[行数为null时的默认值,不指定为null])

ntile(n) 把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,ntile返回此行所属的组的编号

注意点:

over()函数中的分区、排序、指定窗口范围可组合使用也可以不指定,根据不同的业务需求结合使用

over()函数中如果不指定分区,窗口大小是针对查询产生的所有数据,如果指定了分区,窗口大小是针对每个分区的数据

over()函数中的窗口范围说明:

current row:当前行

unbounded:起点,unbounded preceding 表示从前面的起点, unbounded following表示到后面的终点

n preceding :往前n行数据

n following:往后n行数据

2.6.7 自定义UDF、UDTF函数

1)在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤?

(1)用UDF函数解析公共字段;用UDTF函数解析事件字段。

(2)自定义UDF:继承UDF类,重写evaluate方法

(3)自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close

2)为什么要自定义UDF/UDTF?

因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试。脱敏使用,比如身份证号隐藏

  1. 如何实现udf自定义函数?

继承 UDF 类。

重写 evaluate 方法。

将该 java 文件打包成 jar。

在 beeline(hive 的一种终端)中输入如下命令:

0: jdbc:hive2://localhost:10000> add jar /data/tommyyang/HiveUDF.jar;

0: jdbc:hive2://localhost:10000> create temporary function ip2loc as 'cn.tommyyang.IPToLocation';

0: jdbc:hive2://localhost:10000> select ip2loc("118.28.1.1");

0: jdbc:hive2://localhost:10000> drop temporary function ip2loc;

  1. UDF,UDAF,UDTF区别?

UDF:单行进入,单行输出

UDF 操作作用于单个数据行,并且产生一个数据行作为输出。大多数函数都属于这一类(比如数学函数和字符串函数)。

UDAF:多行进入,单行输出

UDAF 接受多个输入数据行,并产生一个输出数据行。像COUNT和MAX这样的函数就是聚集函数。

UDTF:单行输入,多行输出

UDTF 操作作用于单个数据行,并且产生多个数据行-------一个表作为输出。lateral view explore() 

2.6.8 Union与Union all区别

1)union会将联合的结果集去重,效率较union all差

2)union all不会对结果集去重,所以效率高

2.6.9 Hive有哪些计算引擎,区别?

Mr引擎:多job串联,基于磁盘,落盘的地方比较多。虽然慢,但一定能跑出结果。一般处理,周、月、年指标。

Spark引擎:虽然在Shuffle过程中也落盘,但是并不是所有算子都需要Shuffle,尤其是多算子过程,中间过程不落盘DAG有向无环图。兼顾了可靠性和效率。一般处理天指标。

Tez引擎:完全基于内存。注意:如果数据量特别大,慎重使用。容易OOM。一般用于快速出结果,数据量比较小的场景。

2.6.10 Hive索引吗?

Hive 支持索引,但是 Hive 的索引与关系型数据库中的索引并不相同,比如,Hive

不支持主键或者外键。

Hive 索引可以建立在表中的某些列上,以提升一些操作的效率,例如减少

MapReduce 任务中需要读取的数据块的数量。

Hive 索引的机制如下:

hive 在指定列上建立索引,会产生一张索引表(Hive 的一张物理表),里面的字

段包括,索引列的值、该值对应的 HDFS 文件路径、该值在文件中的偏移量;

v0.8 后引入 bitmap 索引处理器,这个处理器适用于排重后,值较少的列(例如,

某字段的取值只可能是几个枚举值)

因为索引是用空间换时间,索引列的取值过多会导致建立 bitmap 索引表过大。

但是,很少遇到 hive 用索引的。说明还是有缺陷 or 不合适的地方的。

2.6.11 运维如何对hive进行调度

1. 将 hive 的 sql 定义在脚本当中

2. 使用 azkaban 或者 oozie 进行任务的调度

3. 监控任务调度页面

2.6.12 使用hive解析过JSON串吗?

l hive 处理 json 数据总体来说有两个方向的路走

1. 将 json 以字符串的方式整个入 Hive 表,然后通过使用 UDF 函数解析已经导入

到 hive 中的数据,比如使用 LATERAL VIEW json_tuple 的方法,获取所需要的列名。

2. 在导入之前将 json 拆成各个字段,导入 Hive 表的数据是已经解析过得。这将

需要使用第三方的SerDe。

2.6.13 sort by 和 order by,group by, distribute by区别?

order by 会对输入做全局排序,因此只有一个 reducer(多个 reducer 无法保证全

局有序)只有一个 reducer,会导致当输入规模较大时,需要较长的计算时间。

sort by 不是全局排序,其在数据进入 reducer 前完成排序.

因此,如果用 sort by 进行排序,并且设置 mapred.reduce.tasks>1, 则 sort by 只

保证每个 reducer 的输出有序,不保证全局有序。

distribute by在排序中会将指定key相同的值,放在一个reduce中,通常结合sort by使用,distribute by必须在sort by之前,

cluster by:只能使用默认升序排序,不能通过设置ASC和DESC参数来改变排序方式;cluster by等价于distribute by + sort by

group by:把相同的key放到同一个reduce,但是后面必须是聚合操作

distribute by和group by区别:都是将key值相同的数据放到同一个reduce里面,但是group by后面必须跟聚合操作

order by和sort by区别:order by是全局排序,而sort by是局部排序,当sort by中reduce为1时,等价于order by

2.6.14 hive分区和分桶

区别:分区提供一个隔离数据和优化查询的便利方式。不过,并非所有的数据集都可形成合理

的分区。对于一张表或者分区,Hive 可以进一步组织成桶,也就是更为细粒度的数据范围

划分。

分桶是将数据集分解成更容易管理的若干部分的另一个技术。

分区针对的是数据的存储路径;分桶针对的是数据文件。

分区是hive存放数据的一种方式。将列值作为目录来存放数据,就是一个分区。这样查询时使用分区列进行过滤,只需根据列值直接扫描对应目录下的数据,不扫描其他不关心的分区,快速定位,提高查询效率。分动态和静态分区两种:

静态分区:若分区的值是确定的,那么称为静态分区。新增分区或者是加载分区数据时,已经指定分区名

(1)、尽量不要用动态分区,因为动态分区的时候,将会为每一个分区分配reducer数量,当分区数量多的时候,reducer数量将会增加,对服务器是一种灾难。

(2)、动态分区和静态分区的区别,静态分区不管有没有数据都将会创建该分区,动态分区是有结果集将创建,否则不创建。

(3)、hive动态分区的严格模式和hive提供的hive.mapred.mode的严格模式。

分桶表就是对指定列进行哈希(hash)计算,然后会根据 hash 值进行切分数据,将

具有不同 hash 值的数据写到每个桶对应的文件中。

2.6.15 hive数据倾斜处理?

从本质上来说,发生数据倾斜的原因有两种:一是任务中需要处理大量相同

的 key 的数据。二是任务读取不可分割的大文件

1. 空值引发的数据倾斜

实际业务中有些大量的 null 值或者一些无意义的数据参与到计算作业中,表中

有大量的 null 值,如果表之间进行 join 操作,就会有 shuffle 产生,这样所有

的 null 值都会被分配到一个 reduce 中,必然产生数据倾斜。

解决方案

第一种:可以直接不让 null 值参与 join 操作,即不让 null 值有 shuffle 阶段

第二种:因为 null 值参与 shuffle 时的 hash 结果是一样的,那么我们可以给

null 值随机赋值,这样它们的 hash 结果就不一样,就会进到不同的 reduce 中

2. 不同数据类型引发的数据倾斜

对于两个表join,表a中需要 join的字段key为 int,表b中key 字段既有string

类型也有 int 类型。当按照 key 进行两个表的 join 操作时,默认的 Hash 操作会

按 int 型的 id 来进行分配,这样所有的 string 类型都被分配成同一个 id,结

果就是所有的 string 类型的字段进入到一个 reduce 中,引发数据倾斜。

解决方案

如果 key 字段既有 string 类型也有 int 类型,默认的 hash 就都会按 int 类型来

分配,那我们直接把 int 类型都转为 string 就好了,这样 key 字段都为 string,

hash 时就按照 string 类型分配了

3. 不可拆分大文件引发的数据倾斜

当对文件使用 GZIP 压缩等不支持文件分割操作的压缩方式,在日后有作业涉及读取压缩后的文件时,该压缩文件只会被一个任务所读取。如果该压 缩文件很大,则处理该文件的 Map 需要花费的时间会远多于读取普通文件的 Map 时间,该 Map 任务会成为作业运行的瓶颈。这种情况也就是 Map 读取文件的数据倾斜。

解决方案:

这种数据倾斜问题没有什么好的解决方案,只能将使用 GZIP 压缩等不支持文件

分割的文件转为 bzip 和 zip 等支持文件分割的压缩方式。

所以,我们在对文件进行压缩时,为避免因不可拆分大文件而引发数据读取的倾斜,

在数据压缩的时候可以采用 bzip2 和 Zip 等支持文件分割的压缩算法

4. 表连接时引发的数据倾斜

两表进行普通的 repartition join 时,如果表连接的键存在倾斜,那么在

Shuffle 阶段必然会引起数据倾斜。

解决方案

通常做法是将倾斜的数据存到分布式缓存中,分发到各个 Map 任务所在节点。

在 Map 阶段完成 join 操作,即 MapJoin,这避免了 Shuffle,从而避免了数据

倾斜。

2.6.16 hive中表有几类?

Hive中有四类表。

内部表:创建表的时候默认把该表的数据,使用默认的组织方式,来存储在默认的仓库路径

外部表:创建表的时候,数据已经在hdfs了,不是默认目录,需要在创建表的时候指定目录。适合多个team共同访问的数据

分区表:表中数据一定属于某个分区

分桶表:按照文件的形式来组织区分和默认的mapreduce的hashpartioner一模一样

2.6.17 hive有哪些数据类型及类型转换?

1.Hive中的数据类型分为两类:基本类型和复杂类型

2.基本类型包含:tinyint,smallint,int,bigint,float,double,boolean,string,timestamp,binary

3.复杂类型:array,map和struct

a. array:数组类型,对应了Java中的集合或者数组。

类型转换:

Hive 的原子数据类型是可以进行隐式转换的,类似于 Java 的类型转换,例如某表达式

使用 INT 类型,TINYINT 会自动转换为 INT 类型,但是 Hive 不会进行反向转化,例如,某表达式使用 TINYINT 类型,INT 不会自动转换为 TINYINT 类型,它会返回错误,除非使用 CAST

操作。

1)隐式类型转换规则如下

(1)任何整数类型都可以隐式地转换为一个范围更广的类型,如 TINYINT 可以转换成

INT,INT 可以转换成 BIGINT。 (2)所有整数类型、FLOAT 和 STRING 类型都可以隐式地转换成 DOUBLE。 (3)TINYINT、SMALLINT、INT 都可以转换为 FLOAT。 (4)BOOLEAN 类型不可以转换为任何其它的类型。

2)可以使用 CAST 操作显示进行数据强制类型转换

例如 CAST(‘1’ AS INT)将把字符串’1’ 转换成整数 1;如果强制类型转换失败,如执行

CAST(‘X’ AS INT),表达式返回空值 NULL。

2.6.18 hive 中drop、truncate和delete区别

drop:drop table 表名

删除内容和定义,释放空间。(表结构和数据一同删除)

truncate (清空表中的数据):truncate table 表名

删除内容,释放空间,但不删除定义。(表结构还在,数据删除)

delete:delete from 表名 (where 列名 = 值)

删除内容,不删除定义,也不释放空间。

三者的执行速度,一般来说:drop > truncate > delete

区别:

1、truncate与drop是DDL语句,执行后无法回滚;delete是DML语句,可回滚

2、truncate只能作用于表;delete,drop可作用于表、视图等

3、truncate会清空表中的所有行,但表结构及其约束、索引等保持不变;drop会删除表的结构及其所依赖的约束、索引等

4、truncate会重置表的自增值;delete不会

5、truncate不会激活与表有关的删除触发器;delete可以

6、truncate后会使表和索引所占用的空间会恢复到初始大小;delete操作不会减少表或索引所占用的空间,drop语句将表所占用的空间全释放掉

2.6.19 hive表的连接方式

内连接:只有进行连接的两个表中都存在与连接条件相匹配的数据才会被保留下来。

左外连接:JOIN操作符左边表中符合WHERE子句的所有记录将会被返回。

右外连接:JOIN操作符右边表中符合WHERE子句的所有记录将会被返回。

左外连接左的独有:

右外连接右的独有:

全外连接:将会返回所有表中符合WHERE语句条件的所有记录。如果任一表的指定字段没有符合条件的值的话,那么就使用NULL 值替代。

左外独有+右外独有:

2.6.20 hive中count(*)count(1)和count(字段区别)

count(*)和count(1):对表中行数进行统计计算,包含null值。

count(某字段):对表中该字段的行数进行统计,不包含null值。如果出现空字符串,同样会进行统计。

1、count(*)、count(1):

  count(*)对行的数目进行计算,包含NULL,count(1)这个用法和count(*)的结果是一样的。

  如果表没有主键,那么count(1)比count(*)快。表有主键,count(*)会自动优化到主键列上。

  如果表只有一个字段,count(*)最快。

  count(1)跟count(主键)一样,只扫描主键。count(*)跟count(非主键)一样,扫描整个表。明显前者更快一些。

  count(1)和count(*)基本没有差别,但在优化的时候尽量使用count(1)。

2、count(1)、count(列名):

(1) count(1) 会统计表中的所有的记录数,包含字段为null 的记录。

(2) count(字段) 会统计该字段在表中出现的次数,忽略字段为null 的情况。即不统计字段为null 的记录。

2.6.21 hive中like_和like%区别

Like_和like%都表示模糊查询,like%表示任意个字符,Like_只表示一个字符

2.6.22 hive性能优化

1MapJoin 把小表全部加载到内存在map端进行join,避免reducer处理。

2)行列过滤

列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。

行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。

3)列式存储

4)采用分区技术和分桶技术

5)合理设置Map

6)合理设置Reduce

7)在不影响最终业务逻辑得情况下,可以开启map端combiner

8)小文件解决方案

(1)在Map执行前合并小文件,减少Map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。

2merge 输出合并小文件

3)开启JVM重用

1. SQL 语句优化

2. 数据格式优化

Hive 提供了多种数据存储组织格式,不同格式对程序的运行效率也会有极大的

影响.多使用默认的列式存储Parquet格式和ORC格式(已经成为主流选择之一)

3. 小文件过多优化

4. 并行执行优化

通过设置参数 hive.exec.parallel 值为 true,就可以开启并发执行

  1. JVM 优化
  2. 计算分析引擎可以选择tez引擎或者spark引擎

2.6.23 sql性能优化以及in和exist区别?

1. 避免使用select *

2. 用union all代替union

3 小表驱动大表

SQL中in与exists都可以用来查找表中,某个数据是否满足存在的条件。它们的功能都是一样的,但是合理的使用in和exists,会让SQL的查找速度快上很多。这里先上结论:当主表数据较大,副表数据较少时,使用in效率较高;当主表数据较少,副表数据较大时,使用exists效率高。

  • in 适用于左边大表,右边小表。
  • exists 适用于左边小表,右边大表。

不管是用in,还是exists关键字,其核心思想都是用小表驱动大表。

4 批量操作

提供一个批量插入数据的方法。

insert into order(id,code,user_id)

values(123,'001',100),(124,'002',100),(125,'003',101);

这样只需要远程请求一次数据库,sql性能会得到提升,数据量越多,提升越大。

但需要注意的是,不建议一次批量操作太多的数据,如果数据太多数据库响应也会很慢。批量操作需要把握一个度,建议每批数据尽量控制在500以内。如果数据多于500,则分多批次处理。

5 多用limit

有时候,我们需要查询某些数据中的第一条,比如:查询某个用户下的第一个订单,想看看他第一次的首单时间。常规做法效率非常不高,需要先查询出所有的数据,有点浪费资源。

使用limit 1,只返回该用户下单时间最小的那一条数据即可。

7 增量查询

有时候,我们需要通过远程接口查询数据,然后同步到另外一个数据库。如果直接获取所有的数据,然后同步过去。如果数据很多的话,查询性能会非常差。

正解:按id和时间升序,每次只同步一批数据,这一批数据只有100条记录。每次同步完成之后,保存这100条数据中最大的id和时间,给同步下一批数据的时候用。

select * from user where id>#{lastId} and create_time >= #{lastCreateTime}

limit 100;

8 高效的分页

有时候,列表页在查询数据时,为了避免一次性返回过多的数据影响接口性能,我们一般会对查询接口做分页处理。

9 用连接查询代替子查询

子查询语句可以通过in关键字实现,一个查询语句的条件落在另一个select语句的查询结果中。程序先运行在嵌套在最内层的语句,再运行外层的语句。但缺点是mysql执行子查询时,需要创建临时表,查询完毕后,需要再删除这些临时表,有一些额外的性能消耗。

select o.* from order o

inner join user u on o.user_id = u.id

where u.status=1

10 join的表不宜过多

根据阿里巴巴开发者手册规定,join表的数量不应该超过3个。

12 控制索引的数量

索引能够显著的提升查询sql的性能,但索引数量并非越多越好。

因为表中新增数据时,需要同时为它创建索引,而索引是需要额外的存储空间的,而且还会有一定的性能消耗。

阿里巴巴的开发者手册中规定,单表的索引数量应该尽量控制在5个以内,并且单个索引中的字段数不超过5个。

2.7 Hbase

2.7.1 Hbase概述

HBase 是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用 HBASE 技

术可在廉价 PC Server 上搭建起大规模结构化存储集群。

Hbase特点:1)海量存储2)列族存储3)极易扩展4)高并发

Hbase 的扩展性主要体现在两个方面,一个是基于上层处理能力(RegionServer)的扩

展,一个是基于存储的扩展(HDFS)。

HBase 架构

1)Client

Client 包含了访问 Hbase 的接口,另外 Client 还维护了对应的 cache 来加速 Hbase 的访

问,比如 cache 的.META.元数据的信息。

2)Zookeeper

HBase 通过 Zookeeper 来做 master 的高可用、RegionServer 的监控、元数据的入口以及

集群配置的维护等工作。

3)Hmaster

master 节点的主要职责如下: 为 RegionServer 分配 Region 维护整个集群的负载均衡 维护集群的元数据信息

4)HregionServer

HregionServer 直接对接用户的读写请求,是真正的“干活”的节点。它的功能概括如下:

管理 master 为其分配的 Region

处理来自客户端的读写请求

负责和底层 HDFS 的交互,存储数据到 HDFS

负责 Region 变大以后的拆分

负责 Storefile 的合并工作

5)HDFS

HDFS 为 Hbase 提供最终的底层数据存储服务,同时为 HBase 提供高可用支持

2.7.2 Hbase与hadoop的关系

HDFS

· 为分布式存储提供文件系统

· 针对存储大尺寸的文件进行优化,不需要对 HDFS 上的文件进行随机读写

· 直接使用文件

· 数据模型不灵活

· 使用文件系统和处理框架

· 优化一次写入,多次读取的方式

1. 一次性写入,多次读取。

2. 保证数据的一致性。

3. 主要是可以部署在许多廉价机器中,通过多副本提高可靠性,提供了容错

和恢复机制。

HBase

· 提供表状的面向列的数据存储

· 针对表状数据的随机读写进行优化

· 使用 key-value 操作数据

· 提供灵活的数据模型

· 使用表状存储,支持 MapReduce,依赖 HDFS

· 优化了多次读,以及多次写

1. 瞬间写入量很大,数据库不好支撑或需要很高成本支撑的场景。

2. 数据需要长久保存,且量会持久增长到比较大的场景。

3. HBase 不适用与有 join,多级索引,表关系复杂的数据模型。

4. 大数据量(100s TB 级数据)且有快速随机访问的需求。如:淘宝的交易

历史记录。数据量巨大无容置疑,面向普通用户的请求必然要即时响应。

5. 业务场景简单,不需要关系数据库中很多特性(例如交叉列、交叉表,事

务,连接等等)。

2.7.3 Hbase读流程

1) Client 先访问 zookeeper,从 meta 表读取 region 的位置,然后读取 meta 表中的数据。meta

中又存储了用户表的 region 信息;

2) 根据 namespace、表名和 rowkey 在 meta 表中找到对应的 region 信息;

3) 找到这个 region 对应的 regionserver;

4) 查找对应的 region;

5) 先从 MemStore 找数据,如果没有,再到 BlockCache 里面读;

6) BlockCache 还没有,再到 StoreFile 上读(为了读取的效率);

7) 如果是从 StoreFile 里面读取的数据,不是直接返回给客户端,而是先写入 BlockCache,再

返回给客户端。

2.7.4 Hbase写流程

1) Client 向 HregionServer 发送写请求;

2) HRegionServer 将数据写到 HLog(

write ahead log)。为了数据的持久化和恢复;

3) HRegionServer 将数据写到内存(

MemStore);

4) 反馈 Client 写成功。

2.7.5 Hbase数据flush过程

1) 当 MemStore 数据达到阈值(默认是 128M,老版本是 64M),将数据刷到硬盘,将内存中的

数据删除,同时删除 HLog 中的历史数据;

2) 并将数据存储到 HDFS 中;

3) 在 HLog 中做标记点。

2.7.6 Hbase中rowkey原则

1. rowkey 长度原则

rowkey 是一个二进制码流,可以是任意字符串,最大长度 64kb,实际应用中一

般为 10-100bytes,以 byte[]形式保存,一般设计成定长。

2. rowkey 散列原则

如果 rowkey 按照时间戳的方式递增,不要将时间放在二进制码的前面,建议将

rowkey 的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高

数据均衡分布在每个 RegionServer,以实现负载均衡的几率。

3. rowkey 唯一原则

必须在设计上保证其唯一性,rowkey 是按照字典顺序排序存储的,因此,设计

rowkey 的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块,

将最近可能会被访问的数据放到一块

RowKey 如何设计

1) 生成随机数、hash、散列值

2) 字符串反转

2.7.7. 热点现象(数据倾斜)怎么产生的,以及解决方法有哪些

热点现象

某个小的时段内,对HBase的读写请求集中到极少数的Region上,导致这些region

所在的 RegionServer 处理请求量骤增,负载量明显偏大,而其他的 RgionServer

明显空闲

热点现象解决办法

加盐:在 rowkey 的前面增加随机数,使得它和之前的 rowkey 的开头不同

哈希:哈希可以使负载分散到整个集群,但是读却是可以预测的

反转:第三种防止热点的方法时反转固定长度或者数字格式的 rowkey

2.7.8. HBase 中 compact 用途是什么,什么时候触发,分为哪两种?

在 hbase 中每当有 memstore 数据 flush 到磁盘之后,就形成一个 storefile,

当 storeFile 的数量达到一定程度后,就需要将 storefile 文件来进行 compaction

操作。

Compact 的作用

1. 合并文件

2. 清除过期,多余版本的数据

3. 提高读写数据的效率

HBase 中实现了两种 compaction 的方式:minor and major。 这两种

compaction 方式的区别是:

1. Minor 操作只用来做部分文件的合并操作以及包括 minVersion=0 并且

设置 ttl 的过期版本清理,不做任何删除数据、多版本数据的清理工作。

2. Major 操作是对 Region 下的 HStore 下的所有 StoreFile 执行合并操

作,最终的结果是整理合并出一个文件

2.8 Spark

2.8.1 Spark概述与运行流程

Spark是一种基于内存的快速、通用、可扩展的大数据分析引擎。

Spark Core:实现了 Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。包含了对弹性分布式数据集(Resilient DistributedDataSet,简称 RDD).方法Context

Spark SQL:是 Spark 用来操作结构化数据的程序包.包含了对弹性分布式数据集DataFreame和DataSet.方法SparkSession

Spark Streaming:是 Spark 提供的对实时数据进行流式计算的组件。包含了对弹性分布式数据集Dstream.方法StreamingContext

Spark MLlib:提供常见的机器学习(ML)功能的程序库。

SparkGragh:提供图计算。

1)快:与MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以

上。

2)易用:Spark支持Java、Python和Scala的API

3)通用:Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理 (Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。

4)兼容性:Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和 Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase等。

Spark 运行流程

具体运行流程如下:

1. SparkContext 向资源管理器注册并向资源管理器申请运行 Executor

2. 资源管理器分配 Executor,然后资源管理器启动 Executor

3. Executor 发送心跳至资源管理器

4. SparkContext 构建 DAG 有向无环图

5. 将 DAG 分解成 Stage(

TaskSet)

6. 把 Stage 发送给 TaskScheduler

7. Executor 向 SparkContext 申请 Task

8. TaskScheduler 将 Task 发送给 Executor 运行

9. 同时 SparkContext 将应用程序代码发放给 Executor

10.Task 在 Executor 上运行,运行完毕释放所有资源

2.8.2 Spark有哪几种部署方式?请分别简要论述

1)Local:运行在一台机器上,通常是练手或者测试环境。

2)Standalone:构建一个基于Mster+Slaves的资源调度集群,Spark任务提交给Master运行。是Spark自身的一个调度系统。

3)Yarn: Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。

2.8.3 Spark提交任务参数

Spark一般使用shell脚本提交任务

1)在提交任务时的几个重要参数

executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个,我们企业是4个

num-executors —— 启动executors的数量,默认为2

executor-memory —— executor内存大小,默认1G

driver-cores —— driver使用内核数,默认为1

driver-memory —— driver内存大小,默认512M

2.8.3 如何理解Spark 中的血统概念(RDD)

RDD 在Lineage 依赖方面分为两种Narrow Dependencies 与Wide Dependencies 用来解决数据容错时的高效性以及划分任务时候起到重要作用。

2.8.4 简述 Spark的宽窄依赖,以及 的宽窄依赖,以及 Spark如何划分 stage,每个 ,每个 stage又根据什么决定 又根据什么决定 task个数 ?

Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。

Task:Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。

窄依赖指的是每一个父 RDD 的 Partition 最多被子 RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。

宽依赖指的是多个子 RDD 的 Partition 会依赖同一个父 RDD 的 Partition,会引起shuffle;

总结:宽依赖我们形象的比喻为超生

窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖;

宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)。

1. 对于窄依赖:

窄依赖的多个分区可以并行计算;窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。

2. 对于宽依赖:

划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。

2.8.5 请列举 Spark的 transformation算子

2.3.1.1 map(func)案例

1. 作用:返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成

2.3.1.2 mapPartitions(func) 案例

1. 作用:类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD

上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U]。假设有 N 个元素,有 M 个分

区,那么 map 的函数的将被调用 N 次,而 mapPartitions 被调用 M 次,一个函数一次处理所有

分区

2.3.1.3 mapPartitionsWithIndex(func) 案例

1. 作用:类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型

为 T 的 RDD 上运行时,func 的函数类型必须是(Int, Interator[T]) => Iterator[U];

2.3.1.4 flatMap(func) 案例

1. 作用:类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应

该返回一个序列,而不是单一元素)

2.3.1.6 glom 案例

1. 作用:将每一个分区形成一个数组,形成新的 RDD 类型时 RDD[Array[T]]

2.3.1.7 groupBy(func)案例

1. 作用:分组,按照传入函数的返回值进行分组。将相同的 key 对应的值放入一个迭代器

2.3.1.8 filter(func) 案例

1. 作用:过滤。返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输

入元素组成。

2.3.1.9 sample(withReplacement, fraction, seed) 案例

1. 作用:以指定的随机种子随机抽样出数量为 fraction 的数据,withReplacement 表示是抽

出的数据是否放回,true 为有放回的抽样,false 为无放回的抽样,seed 用于指定随机数生

成器种子。

2.3.1.10 distinct([numTasks])) 案例

1. 作用:对源 RDD 进行去重后返回一个新的 RDD。默认情况下,只有 8 个并行任务来操

作,但是可以传入一个可选的 numTasks 参数改变它。

2.3.1.11 coalesce(numPartitions) 案例

1. 作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

2.3.1.12 repartition(numPartitions) 案例

1. 作用:根据分区数,重新通过网络随机洗牌所有数据。

2.3.1.14 sortBy(func,[ascending], [numTasks]) 案例

1. 作用;使用 func 先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。

2.3.1.15 pipe(command, [envVars]) 案例

1. 作用:管道,针对每个分区,都执行一个 shell 脚本,返回输出的 RDD。

2.3.2.1 union(otherDataset) 案例

1. 作用:对源 RDD 和参数 RDD

2.8.6 请列举 Spark的active算子

2.4.1 reduce(func)案例

1. 作用:通过 func 函数聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数 据。

2.4.2 collect()案例

1. 作用:在驱动程序中,以数组的形式返回数据集的所有元素。

2.4.3 count()案例

1. 作用:返回 RDD 中元素的个数

2.4.4 first()案例

1. 作用:返回 RDD 中的第一个元素

2.4.5 take(n)案例

1. 作用:返回一个由 RDD 的前 n 个元素组成的数组

2.4.6 takeOrdered(n)案例

1. 作用:返回该 RDD 排序后的前 n 个元素组成的数组

2.4.7 aggregate 案例

1. 参数:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

2. 作用:aggregate 函数将每个分区里面的元素通过 seqOp 和初始值进行聚合,然后用

combine 函数将每个分区的结果和初始值(zeroValue)进行 combine 操作。这个函数最终返回

的类型不需要和 RDD 中元素类型一致

2.4.8 fold(num)(func)案例

1. 作用:折叠操作,aggregate 的简化操作,seqop 和 combop 一样。

2.4.9 saveAsTextFile(path)

作用:将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,

对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本

2.4.12 countByKey()案例

1. 作用:针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数

2.4.13 foreach(func)案例

1. 作用:在数据集的每一个元素上,运行函数 func 进行更新。

2.8.7 请列举 Spark的引起shuffer的算子

spark中会导致shuffle操作的有以下⼏种算⼦、

1、repartition类的操作:⽐如repartition、repartitionAndSortWithinPartitions、coalesce等

2、byKey类的操作:⽐如reduceByKey、groupByKey、sortByKey等

3、join类的操作:⽐如join、cogroup等

重分区: ⼀般会shuffle,因为需要在整个集群中,对之前所有的分区的数据进⾏随机,均匀的打乱,然后把数据放⼊下游新的指定数量的分

区内

byKey类的操作:因为你要对⼀个key,进⾏聚合操作,那么肯定要保证集群中,所有节点上的,相同的key,⼀定是到同⼀个节点上进⾏

处理

join类的操作:两个rdd进⾏join,就必须将相同join

key的数据,shuffle到同⼀个节点上,然后进⾏相同key的两个rdd数据的笛卡尔乘积

2.8.8 spark中shuffer机制

Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort

的 Shuffle。

基于 Hash 的 Shuffle 机制的优缺点

优点

· 可以省略不必要的排序开销。

· 避免了排序所需的内存开销。

缺点

· 生产的文件过多,会对文件系统造成压力。

· 大量小文件的随机读写带来一定的磁盘开销。

· 数据块写入时所需的缓存空间也会随之增加,对内存造成压力。

SortShuffleManager 的运行机制主要分成三种:

1. 普通运行机制

2. bypass 运行机制,当 shuffle read task 的数量小于等于

spark.shuffle.sort.bypassMergeThreshold 参数的值时(默认为 200),

就会启用 bypass 机制;

3. Tungsten Sort 运行机制,开启此运行机制需设置配置项

spark.shuffle.manager=tungsten-sort。开启此项配置也不能保证就一定

采用此运行机制(后面会解释)。

基于 Sort 的 Shuffle 机制的优缺点

优点

· 小文件的数量大量减少,Mapper 端的内存占用变少;

· Spark 不仅可以处理小规模的数据,即使处理大规模的数据,也不会很容

易达到性能瓶颈。

缺点

· 如果 Mapper 中 Task 的数量过大,依旧会产生很多小文件,此时在 Shuffle 传数据的过程中到 Reducer 端,Reducer 会需要同时大量地记 录进行反序列化,导致大量内存消耗和 GC 负担巨大,造成系统缓慢,甚至崩溃;

· 强制了在 Mapper 端必须要排序,即使数据本身并不需要排序;

· 它要基于记录本身进行排序,这就是 Sort-Based Shuffle 最致命的性能消耗。

2.8.9 spark和mapReduce中shuffer区别

1. 相同点:都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的ShuffleMapTask,也可能是 ResultTask)

2. 不同点:

· MapReduce 默认是排序的,spark 默认不排序,除非使用 sortByKey 算 子。

· MapReduce 可以划分成 split,map()、spill、merge、shuffle、sort、 reduce()等阶段,spark 没有明显的阶段划分,只有不同的 stage 和算子操作。

· MR 落盘,Spark 不落盘,spark 可以解决 mr 落盘导致效率低下的问题。

2.8.10 简述 Spark中共享变量(广播和累加器)的基本原理与用途

累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。而广播变量用来高效分发较大的对象。

共享变量出现的原因:

通常在向Spark 传递函数时,比如使用map() 函数或者用filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。

Spark的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。

2.8.11 如何使用Spark实现TopN的获取(描述思路或使用伪代码)

方法1:

(1)按照key对数据进行聚合(groupByKey)

(2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)数据量太大,会OOM。

方法2:

(1)取出所有的key

(2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序

方法3:

(1)自定义分区器,按照key进行分区,使不同的key进到不同的分区

(2)对每个分区运用spark的排序算子进行排序

2.8.11 spark如何保证宕机迅速恢复?

1. 适当增加 spark standby master

2. 编写 shell 脚本,定期检测 master 状态,出现宕机后对 master 进行重启操作

2.8.12 hadoop 和 spark 的相同点和不同点?

Hadoop 底层使用 MapReduce 计算架构,只有 map 和 reduce 两种操作,表达能

力比较欠缺,而且在 MR 过程中会重复的读写 hdfs,造成大量的磁盘 io 读写操

,所以适合高时延环境下批处理计算的应用;

Spark 是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分

成转化操作和行动操作,包括 map、reduce、filter、flatmap、groupbykey、

reducebykey、union 和 join 等,数据分析更加快速,所以适合低时延环境下计算

的应用;

spark与hadoop最大的区别在于迭代式计算模型

2.8.12 RDD 持久化原理?

spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中。

调用 cache()和 persist()方法即可。

如果需要从内存中清除缓存,可以使用 unpersist()方法。RDD 持久化是可以手动

选择不同的策略的。

2.8.13 checkpoint 检查点机制?

Checkpoint 首先会调用 SparkContext 的 setCheckPointDIR()方法,设置一个容错的

文件系统的目录,比如说 HDFS;然后对 RDD 调用 checkpoint()方法。之后在 RDD

所处的 job 运行结束之后,会启动一个单独的 job,来将 checkpoint 过的 RDD 数

据写入之前设置的文件系统,进行高可用、容错的类持久化操作。

检查点机制是我们在 spark streaming 中用来保障容错性的主要机制,它可以使

spark streaming 阶段性的把应用数据存储到诸如 HDFS 等可靠存储系统中,以供

恢复时使用。

2.8.14 RDD 机制理解吗?

RDD 是 spark 提供的核心抽象,全称为弹性分布式数据集。

RDD 在逻辑上是一个 hdfs 文件,在抽象上是一种元素集合,包含了数据。它是

被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让 RDD

中的数据可以被并行操作(分布式数据集)

所有算子都是基于 rdd 来执行的,不同的场景会有不同的 rdd 实现类,但是

都可以进行互相转换

2.8.15 SparkStreaming以及基本工作原理

Spark streaming 是 spark core API 的一种扩展,可以用于进行大规模、高吞吐量、

容错的实时数据流的处理。

它支持从多种数据源读取数据,比如 Kafka、Flume、Twitter 和 TCP Socket,并且

能够使用算子比如 map、reduce、join 和 window 等来处理数据,处理后的数据

可以保存到文件系统、数据库等存储中。

Spark streaming 内部的基本工作原理是:接受实时输入数据流,然后将数据拆分

成 batch,比如每收集一秒的数据封装成一个 batch,然后将每个 batch 交给 spark

的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个

的 batch 组成的。

2.8.16 DStream以及基本工作原理

DStream 是 spark streaming 提供的一种高级抽象,代表了一个持续不断的数据流。

DStream 可以通过输入数据源来创建,比如 Kafka、flume 等,也可以通过其他

DStream 的高阶函数来创建,比如 map、reduce、join 和 window 等。

DStream 内部其实不断产生 RDD,每个 RDD 包含了一个时间段的数据。

Spark streaming 一定是有一个输入的 DStream 接收数据,按照时间划分成一个一

个的 batch,并转化为一个 RDD,RDD 的数据是分散在各个子节点的 partition 中。

2.8.17 spark 有哪些组件?

1. master:管理集群和节点,不参与计算。

2. worker:计算节点,进程本身不参与计算,和 master 汇报。

3. Driver:运行程序的 main 方法,创建 spark context 对象。

4. spark context:控制整个 application 的生命周期,包括 dagsheduler 和 task

scheduler 等组件。

5. client:用户提交程序的入口。

2.8.18 Spark 主备切换机制原理知道吗?

Master 实际上可以配置两个,

Spark 原生的 standalone 模式是支持 Master 主备切

换的。当 Active Master 节点挂掉以后,我们可以将 Standby Master 切换为 Active

Master。

Spark Master 主备切换可以基于两种机制,一种是基于文件系统的,一种是基于

ZooKeeper 的。

基于文件系统的主备切换机制,需要在 Active Master 挂掉之后手动切换到

Standby Master 上;

而基于 Zookeeper 的主备切换机制,可以实现自动切换 Master。

2.8.19 spark 解决了 hadoop 的哪些问题?

1.

MR:抽象层次低,需要使用手工代码来完成程序编写,使用上难以上手;

Spark:Spark 采用 RDD 计算模型,简单容易上手。

2.

MR:只提供 map 和 reduce 两个操作,表达能力欠缺;

Spark:Spark 采用更加丰富的算子模型,包括 map、flatmap、groupbykey、

reducebykey 等;

3.

MR:一个 job 只能包含 map 和 reduce 两个阶段,复杂的任务需要包含很多个 job,

这些 job 之间的管理以来需要开发者自己进行管理;

Spark:Spark 中一个 job 可以包含多个转换操作,在调度时可以生成多个 stage,

而且如果多个 map 操作的分区不变,是可以放在同一个 task 里面去执行;

4.

MR:中间结果存放在 hdfs 中;

Spark:Spark 的中间结果一般存在内存中,只有当内存不够了,才会存入本地磁

盘,而不是 hdfs;

5.

MR:只有等到所有的 map task 执行完毕后才能执行 reduce task;

Spark:Spark 中分区相同的转换构成流水线在一个 task 中执行,分区不同的需要

进行 shuffle 操作,被划分成不同的 stage 需要等待前面的 stage 执行完才能执行。

6.

MR:只适合 batch 批处理,时延高,对于交互式处理和实时处理支持不够;

Spark:Spark streaming 可以将流拆成时间间隔的 batch 进行处理,实时计算。

2.8.20 数据倾斜的产生和解决办法?

数据倾斜以为着某一个或者某几个 partition 的数据特别大,导致这几个 partition

上的计算需要耗费相当长的时间。

在 spark 中同一个应用程序划分成多个 stage,这些 stage 之间是串行执行的,而

一个 stage 里面的多个 task 是可以并行执行,task 数目由 partition 数目决定,如

果一个 partition 的数目特别大,那么导致这个 task 执行时间很长,导致接下来

的 stage 无法执行,从而导致整个 job 执行变慢。

避免数据倾斜,一般是要选用合适的 key,或者自己定义相关的 partitioner,通

过加盐或者哈希值来拆分这些 key,从而将这些数据分散到不同的 partition 去执

行。

如下算子会导致 shuffle 操作,是导致数据倾斜可能发生的关键点所在:

groupByKey;reduceByKey;aggregaByKey;join;cogroup;

2.8.21 RDD 中 reduceBykey 与 groupByKey 哪个性能好,为什么

reduceByKey:reduceByKey 会在结果发送至 reducer 之前会对每个 mapper 在本

地进行 merge,有点类似于在 MapReduce 中的 combiner。这样做的好处在于,

在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce

端能够更快的进行结果计算。

groupByKey:groupByKey 会对每一个 RDD 中的 value 值进行聚合形成一个序列

(Iterator),此操作发生在 reduce 端,所以势必会将所有的数据通过网络进行传输,

造成不必要的浪费。同时如果数据量十分大,可能还会造成 OutOfMemoryError。

所以在进行大量数据的 reduce 操作时候建议使用 reduceByKey。不仅可以提高速

度,还可以防止使用 groupByKey 造成的内存溢出问题。

2.8.22 Spark Streaming 优雅关闭

把spark.streaming.stopGracefullyOnShutdown参数设置成ture,Spark会在JVM关闭时正常关闭StreamingContext,而不是立马关闭 Kill 命令:yarn application -kill 后面跟applicationid

2.8.23 SparkStreaming有哪几种方式消费 Kafka中的数据,区别?

1. receiver 方式:将数据拉取到 executor 中做操作,若数据量大,内存

存储不下,可以通过 WAL,设置了本地存储,保证数据不丢失,然后使用

Kafka 高级 API 通过 zk 来维护偏移量,保证消费数据。receiver 消费

的数据偏移量是在 zk 获取的,此方式效率低,容易出现数据丢失

在实际生产环境中大都用Direct方式

2. 基于 Direct 方式使用 Kafka 底层 Api,其消费者直接连接 kafka 的分

区上,因为 createDirectStream 创建的 DirectKafkaInputDStream 每

个 batch 所对应的 RDD 的分区与 kafka 分区一一对应,但是需要自己

维护偏移量,即用即取,不会给内存造成太大的压力,效率高。

3. receiver 与和 direct 的比较:

· 基于 receiver 的方式,是使用 Kafka 的高阶 API 来在 ZooKeeper 中 保存消费过的offset 的。这是消费 Kafka 数据的传统方式。这种方式 配合着 WAL 机制可以保证数据零丢失的高可靠性,但是却无法保证数据 被处理一次且仅一次,可能会处理两次。因为 Spark 和 ZooKeeper 之间可能是不同步的。

· 基于 direct 的方式,使用 Kafka 的低阶 API,Spark Streaming 自己 就负责追踪消费的 offset,并保存在 checkpoint 中。Spark 自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

·Receiver 方式是通过 zookeeper 来连接 kafka 队列,Direct 方式是直 接连接到 kafka 的节点上获取数据。

2.8.23 介绍一下 cogroup rdd 实现原理,你在什么场景下用过这个 rdd?

cogroup:对多个(2~4)RDD 中的 KV 元素,每个 RDD 中相同 key 中的元素分

别聚合成一个集合。

与 reduceByKey 不同的是:reduceByKey 针对一个 RDD 中相同的 key 进行合并。 而 cogroup 针对多个 RDD 中相同的 key 的元素进行合并。

场景:表关联查询或者处理重复的 key。

2.8.24 Spark 中的 OOM 问题?

1. map 类型的算子执行中内存溢出如 flatMap,mapPatitions

· 原因:map 端过程产生大量对象导致内存溢出:这种溢出的原因是在单个map 中产生了大量的对象导致的针对这种问题。

解决方案:

· 增加堆内内存。

2. shuffle 后内存溢出如 join,reduceByKey,repartition。

3. driver 内存溢出

· 用户在 Dirver 端口生成大对象,比如创建了一个大的集合数据结构。解 决方案:将大对象转换成 Executor 端加载,比如调用 sc.textfile 或 者评估大对象占用的内存,增加 dirver 端的内存

2.8.25 Spark SQL 是如何将数据写到 Hive 表的?

方式一:是利用 Spark RDD 的 API 将数据写入 hdfs 形成 hdfs 文件,

之后再将 hdfs 文件和 hive 表做加载映射。

· 方式二:利用 Spark SQL 将获取的数据 RDD 转换成 DataFrame,再将

DataFrame 写成缓存表,最后利用 Spark SQL 直接插入 hive 表中。而

对于利用 Spark SQL 写 hive 表官方有两种常见的 API,第一种是利用

JavaBean 做映射,第二种是利用 StructType 创建 Schema 做映射。

2.8.26 通常来说,Spark 与 MapReduce 相比,Spark 运行效率更高。请 说明效率更高来源于 Spark 内置的哪些机制?

1. 基于内存计算,减少低效的磁盘交互;

2. 高效的调度算法,基于 DAG;

3. 容错机制 Linage。

2.8.27 Spark Master HA 主从切换过程不会影响到集群已有作业的运行, 为什么?

不会的。

因为程序在运行之前,已经申请过资源了,driver 和 Executors 通讯,不需要

和 master 进行通讯的。

备用节点会不断复制同步活跃节点的信息,其他节点也会上传相应的信息。主节点挂了,备用节点可以接管原来主节点的工作,因为该有的数据和信息备用节点都有。

2.8.28 Spark Master HA 主从切换过程不会影响到集群已有作业的运行, 为什么?

可以这样说:

· 前期经过技术调研,查看官网相关资料,发现 sparkStreaming 整合 flume 有 2 种模式,一种是拉模式,一种是推模式,然后在简单的聊聊这 2 种模式的特点,以及如何部署实现,需要做哪些事情,最后对比两种模式的特点,选择那种模式更好。

· 推模式:Flume 将数据 Push 推给 Spark Streaming

· 拉模式:Spark Streaming 从 flume 中 Poll 拉取数据

2.8.29 RDD 有哪些缺陷?

1. 不支持细粒度的写和更新操作,Spark 写数据是粗粒度的,所谓粗粒度,

就是批量写入数据,目的是为了提高效率。但是 Spark 读数据是细粒度的,

也就是说可以一条条的读。

2. 不支持增量迭代计算,如果对 Flink 熟悉,可以说下 Flink 支持增量迭代

计算。

2.8.30 spark优化

1.常规性能调优一:最优资源配置

在资源允许的情况下,增加 Executor 的个数可以提高执行 task 的并行度

在资源允许的情况下 , 增加每个Executor 的 Cpu core 个数,可以提高执行task 的并行度。

在资源允许的情况下,增加每个 Executor 的内存量以后,对性能的提升有三点:

可以缓存更多的数据,可以为 shuffle 操作提供更多内存,可以为 task 的执行提供更多内存

2.RDD优化

在对 RDD 进行算子时,要避免相同的算子和计算逻辑之下对 RDD 进行重复的计算.

RDD 持久化 在 Spark 中,当多次对同一个 RDD 执行算子操作时,每一次都会对这个 RDD

以之前的父 RDD 重新计算一次,这种情况是必须要避免的,对同一个 RDD 的重复

计算是对资源的极大浪费,因此,必须对多次使用的 RDD 进行持久化.

3.广播大变量

广播变量在每个 Executor 保存一个副本,此 Executor 的所有 task 共用此广播变量,这让变量产生的副本数量大大减少。

4.算子调优mapPartition

如果是 mapPartition 算子,由于一个 task 处理一个 RDD 的 partition,那么一个task 只会执行一次 function,function 一次接收所有的 partition 数据,效率比较高。

但是如果使用 mapPartitions 算子,但数据量非常大时,function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会 OOM,即内存溢出。

5.算子调优reduceBykey本地聚合

reduceByKey 相较于普通的 shuffle 操作一个显著的特点就是会进行 map 端的本地聚合,map 端会先对本地的数据进行 combine 操作,然后将数据写入给下个 stage 的每个 task 创建的文件中

6.算子调优filter 与 coalesce 的配合使用

我们可以在 filter 操作之后,使用 coalesce 算子针对每个 partition 的数据量各不相同的情况,压缩 partition 的数量,而且让每个 partition 的数据量尽量均匀紧凑,以便于后面的 task 进行计算操作,在某种程度上能够在一定程度上提升性能。

  1. Shuffer调优

1.调节 map 端缓冲区大小可以避免频繁的磁盘IO 操作,进而提升 Spark 任务的整体性能

2.调节 reduce 端拉取数据缓冲区大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能

  1. JVM调优
  2. 降低 cache 操作的内存占比,让 task 执行算子函数式,有更多的内存可以使用

8.数据倾斜

2.9.1 Flink概述

Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有

状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意

规模来执行计算。

Flink 流处理特性

1. 支持高吞吐、低延迟、高性能的流处理

2. 支持带有事件时间的窗口(

Window)操作

3. 支持有状态计算的 Exactly-once 语义

4. 支持高度灵活的窗口(

Window)操作,支持基于 time、count、session,

以及 data-driven 的窗口操作

5. 支持具有 Backpressure 功能的持续流模型

6. 支持基于轻量级分布式快照(

Snapshot)实现的容错

7. 一个运行时同时支持 Batch on Streaming 处理和 Streaming 处理

8. Flink 在 JVM 内部实现了自己的内存管理

9. 支持迭代计算

10.支持程序自动优化:避免特定情况下 Shuffle、排序等昂贵操作,中间结

果有必要进行缓存

Flink 之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、

Time、Window

Flink 分别提供了面向 流式处理的接口(DataStream API)和面向批处理的接口(DataSet API)

DataStream API 可以流畅地分析无限数据流,并且可以用 Java 或者 Scala 等来实现

2.9.2 Flink 集群有哪些角色?各自有什么作用

JobManager 处理器:

也称之为 Master,用于协调分布式执行,它们用来调度 task,协调检查点,协调

失败时恢复等。Flink 运行时至少存在一个 master 处理器,如果配置高可用模式

则会存在多个 master 处理器,它们其中有一个是 leader,而其他的都是 standby。

TaskManager 处理器:

也称之为 Worker,用于执行一个 dataflow 的 task(或者特殊的 subtask)、数据缓

冲和 data stream 的交换,Flink 运行时至少会存在一个 worker 处理器。

Clint 客户端:

Client 是 Flink 程序提交的客户端,当用户提交一个 Flink 程序时,会首先创建一

个 Client,该 Client 首先会对用户提交的 Flink 程序进行预处理,并提交到 Flink

集群中处理,所以 Client 需要从用户提交的 Flink 程序配置中获取 JobManager 的

地址,并建立到 JobManager 的连接,将 Flink Job 提交给 JobManager

2.9.3 介绍一下Flink的容错机制

Checkpoint 机制是 Flink 可靠性的基石,可以保证 Flink 集群在某个算子因为某些原因(如,异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink 的 Checkpoint 机制原理来自“Chandy-Lamport algorithm”算法。

每个需要 Checkpoint 的应用在启动时,Flink 的 JobManager 为其创建一个CheckpointCoordinator(检查点协调器),CheckpointCoordinator 全权负责本应用的快照制作

1. CheckpointCoordinator(检查点协调器) 周期性的向该流应用的所有source 算子发送barrier(屏障)。

2. 当某个 source 算子收到一个 barrier 时,便暂停数据处理过程,然后将自己的当前状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator 报告自己快照制作情况,同时向自身所有下游算子广播该 barrier,恢复数据处理。

3. 下游算子收到 barrier 之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向

CheckpointCoordinator 报告自身快照情况,同时向自身所有下游算子广

播该 barrier,恢复数据处理。

4. 每个算子按照步骤 3 不断制作快照并向下游广播,直到最后 barrier 传递

到 sink 算子,快照制作完成。

5. 当 CheckpointCoordinator 收到所有算子的报告之后,认为该周期的快照

制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为

本周期快照制作失败。

2.9.4 Flink 相比 Spark Streaming 有什么区别

1. 架构模型

Spark Streaming 在运行时的主要角色包括:

Master、Worker、Driver、Executor,

Flink 在运行时主要包含:Jobmanager、Taskmanager 和 Slot。

2. 任务调度

Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图 DAG,Spark

Streaming 会依次创建 DStreamGraph、JobGenerator、JobScheduler。

Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后

提交给 JobManager 进行处理,JobManager 会根据 JobGraph 生成

ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,

JobManager 根据 ExecutionGraph 对 Job 进行调度。

3. 时间机制

Spark Streaming 支持的时间机制有限,只支持处理时间。Flink 支持了流处理

程序在时间上的三个定义:处理时间、事件时间、注入时间。同时也支持

watermark 机制来处理滞后数据。

4. 容错机制

对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并

重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢

失,可能会重复处理,不能做到恰一次处理语义。

Flink 则使用两阶段提交协议来解决这个问题。

2.9.5 Flink 常用的算子有哪些

分两部分:

1. 数据读取,这是 Flink 流计算应用的起点,常用算子有:

· 从内存读:fromElements

· 从文件读:readTextFile

· Socket 接入 :socketTextStream

· 自定义读取:createInput

2. 处理数据的算子,常用的算子包括:Map(单输入单输出)、FlatMap(单

输入、多输出)、Filter(过滤)、KeyBy(分组)、Reduce(聚合)、

Window(窗口)、Connect(连接)、Split(分割)等。

2.9.6 如何处理生产环境中的数据倾斜问题

1. flink 数据倾斜的表现

任务节点频繁出现反压,增加并行度也不能解决问题;

部分节点出现 OOM 异常,是因为大量的数据集中在某个节点上,导致该节点内存

被爆,任务失败重启。

2. 数据倾斜产生的原因

业务上有严重的数据热点,比如滴滴打车的订单数据中北京、上海等几个城市的

订单量远远超过其他地区;

技术上大量使用了 KeyBy、GroupBy 等操作,错误的使用了分组 Key,人为产生

数据热点。

3. 解决问题的思路

业务上要尽量避免热点 key 的设计,例如我们可以把北京、上海等热点城市分

成不同的区域,并进行单独处理;

技术上出现热点时,要调整方案打散原来的 key,避免直接聚合;此外 Flink 还

提供了大量的功能可以避免数据倾斜。

2.9.7 Flink 中的 Time 有哪几种

· Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采 集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间 戳分配器访问事件时间戳。

· Ingestion Time:是数据进入 Flink 的时间。

· Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time。

2.9.8 Flink 中 window 出现数据倾斜怎么解决

window 产生数据倾斜指的是数据在不同的窗口内堆积的数据量相差过多。本质

上产生这种情况的原因是数据源头发送的数据量速度不同导致的。出现这种情况

一般通过两种方式来解决:

· 在数据进入窗口前做预聚合

· 重新设计窗口聚合的 key

2.9.9 Flink有没重启策略?说说有哪几种?

Flink 实现了多种重启策略。

固定延迟重启策略(Fixed Delay Restart Strategy)

故障率重启策略(Failure Rate Restart Strategy)

没有重启策略(No Restart Strategy)

Fallback重启策略(Fallback Restart Strategy)

2.10 Sqoop

2.10.1 Sqoop概述

Sqoop 是一款开源的工具,主要用于在 Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres 等)中的数据导进到 Hadoop的 HDFS 中,也可以将 HDFS 的数据导进到关系型数据库中。

原理:将导入或导出命令翻译成 mapreduce 程序来实现。在翻译出的 mapreduce 中主要是对 inputformat 和 outputformat 进行定制。

2.10.2 Sqoop常用命令

1 import ImportTool 将数据导入到集群

2 export ExportTool 将集群数据导出

3 codegen CodeGenTool 获取数据库中某张表数据生成Java 并打包Jar

4 create-hive-table CreateHiveTableTool 创建 Hive 表

5 eval EvalSqlTool 查看 SQL 执行结果

6 import-all-tables ImportAllTablesTool 导入某个数据库下所有表到 HDFS 中

7 job JobTool 用来生成一个 sqoop的任务,生成后,该任务并不执行,除非使用命令执行该任务。

8 list-databases ListDatabasesTool 列出所有数据库名

9 list-tables ListTablesTool 列出某个数据库下所有表

10 merge MergeTool 将 HDFS 中不同目录下面的数据合在一起,并存放在指定的目录中

2.10.3 Sqoop导入导出Null存储一致性问题

Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用--input-null-string和--input-null-non-string两个参数。导入数据时采用--null-string和--null-non-string。

2.10.4 Sqoop底层运行的任务是什么 底层运行的任务是什么

只有Map阶段,没有Reduce阶段的任务。默认是4个MapTask

2.10.5 Sqoop一天导 一天导 入多少数据

100万日活=》10万订单,1人10条,每天1g左右业务数据

Sqoop每天将1G的数据量导入到数仓。

2.10.6 Sqoop数据导出的时候一次执行多长间?

每天晚上 每天晚上 00:30开始执行, 开始执行, 开始执行, 开始执行, Sqoop任务 一般情况 一般情况 一般情况 40 -50分钟 的都有 。取决于数据量 取决于数据量 取决于数据量 取决于数据量 (11:11,6:18等活动在 等活动在 等活动在 1个小时左右) 个小时左右) 个小时左右) 。

2.10.7 Sqoop在导入数据的时候倾斜在导入数据的时候倾斜

sqoop 抽数的并行化主要涉及到两个参数:num-mappers:启动N个map来并行导入数据,默认4个;split-by:按照某一列来切分表的工作单元。

通过ROWNUM() 生成一个严格均匀分布的字段,然后指定为分割字段

2.10.8 Sqoop数据导出Parquet(项目中遇到的问题)

Ads层数据用Sqoop往MySql中导入数据的时候,如果用了orc(Parquet)不能导入,需转化成text格式

(1)创建临时表,把Parquet中表数据导入到临时表,把临时表导出到目标表用于可视化

(2)Sqoop里面有参数,可以直接把Parquet转换为text

(3)ads层建表的时候就不要建Parquet表

第3章 数据仓库

3.1 概念

3.1.1 数据集市与数据仓库的概念

数据仓库是企业级的数据存储仓库,能为整个企业各个部门的运行提供决策支持手段

数据集市则是一种微型的数据仓库,它通常用更少的数据,更少的主题区域,以及更少的历史数据,因此是部门级的,一般只能为某个局部范围内的人员服务

3.1.2 数仓分层和如何设计数据仓库?

数仓初期

数仓搭建初期,由于公司数据量少,经验不足,数仓没有层级概念,过来的数据

直接进行解析,每次计算一个指标的时候,都需要进行ETL 操作,每次都需要

进行join,造成了大量的重复操作,效率十分低下,浪费大量人力。

数仓后期

数仓在搭建一段时间后,重复的计算操作困恼了我很久,后来我参考了阿里的离

线数仓架构,我们对数仓进行了重新的架构搭建,对数仓进行了分层规划。

主要分为:

1. ods 层: 数据缓冲层;

2. dwd 层: 基础数据层;

3. dws 层: 数据汇总层;

4. app 层: 应用层;

分四层的原因主要是为了隔离数据然后还能复用上⼀层计算出来的数据,另一方

面也是为了数据的备份;

3.1.3 关系型数据库范式理论

目的:采用范式,可以降低数据的冗余性。

为什么要降低数据冗余性?

(1)十几年前,磁盘很贵,为了减少磁盘存储。

(2)以前没有分布式系统,都是单机,只能增加磁盘,磁盘个数也是有限的

(3)一次修改,需要修改多个表,很难保证数据一致性

3)缺点

范式的缺点是获取数据时,需要通过Join拼接出最后的数据。

1NF:属性不可再分割(例如不能存在5台电脑的属性,坏处:表都没法用)

2NF:不能存在部分函数依赖(例如主键(学号+课名)-->成绩,姓名,但学号--》姓名,所以姓名部分依赖于主键(学号+课名),所以要去除,坏处:数据冗余)

3NF:不能存在传递函数依赖(学号--》宿舍种类--》价钱,坏处:数据冗余和增删异常)

函数依赖分为:完全函数依赖,部分函数依赖,传递函数依赖

3.1.4 数据模型

雪花模型、星型模型和星座模型,我们用的是星型模型

(在维度建模的基础上又分为三种模型:星型模型、雪花模型、星座模型。)

星型模型(一级维度表),雪花(多级维度),星座模型(星型模型+多个事实表)

3.1.5 SKU&SPU

SKU = Stock Keeping Unit(库存量基本单位)。现在已经被引申为产品统一编号的简称,每种产品均对应有唯一的SKU号。

SKU:一台银色、128G内存的、支持联通网络的iPhoneX

SPU(Standard Product Unit):是商品信息聚合的最小单位,是一组可复用、易检索的标准化信息集合。

SPU:iPhoneX

3.2 数仓建模

3.2.1 关系建模与维度建模

关系建模将复杂的数据抽象为两个概念——实体和关系,并使用规范化的方式表示出来。

关系模型严格遵循第三范式(3NF),数据冗余程度低,数据的一致性容易得到保证。由于数据分布于众多的表中,查询会相对复杂,在大数据的场景下,查询效率相对较低。

维度模型相对清晰、简洁。

维度模型以数据分析作为出发点,不遵循三范式,故数据存在一定的冗余。维度模型面向业务,将业务用事实表和维度表呈现出来。表结构简单,故查询简单,查询效率较高。

3.2.2 维度表和事实表

维度表:一般是对事实的描述信息。每一张维表对应现实世界中的一个对象或者概念。 例如:用户、商品、日期、地区等。

维表的特征:

  • 维表的范围很宽(具有多个属性、列比较多)
  • 跟事实表相比,行数相对较小:通常< 10万条
  • 内容相对固定:编码表

日期ID

day of week

day of year

季度

节假日

2020-01-01

2

1

1

元旦

2020-01-02

3

2

1

2020-01-03

4

3

1

2020-01-04

5

4

1

2020-01-05

6

5

1

事实表中的每行数据代表一个业务事件(下单、支付、退款、评价等)

“事实”这个术语表示的是业务事件的度量值(可统计次数、个数、金额等),例如,2020年5月21日,我在京东花了250块钱买了一瓶海狗人参丸。

维度表:时间、用户、商品、商家。

事实表:250块钱、一瓶。

每一个事实表的行包括:具有可加性的数值型的度量值、与维表相连接的外键,通常具有两个和两个以上的外键。

事实表的特征:

  • 非常的大
  • 内容相对的窄:列数较少(主要是外键id和度量值)
  • 经常发生变化,每天会新增加很多。

3.2.3 拉链表、全量表、增量表、流水表

全量表:每天的所有的最新状态的数据

增量表:每天的新增数据

拉链表:维护历史状态,以及最新状态数据拉链表处理的业务场景:主要处理缓慢变化维的业务场景。(用户表、订单表)通常是对帐户信息的历史变动进行处理保留的结果;用于统计业务相关情况

流水表:对于表中的每一个修改都会记录,可以用于反映实际记录的变更流水表:每天的交易形成的历史;用于统计账户及客户的情况

3.2.4 数仓建模流程

1 ODS层

1)HDFS用户行为数据

2)HDFS业务数据

3)针对HDFS上的用户行为数据和业务数据,我们如何规划处理?

(1)保持数据原貌不做任何修改,起到备份数据的作用。

(2)数据采用压缩,减少磁盘存储空间(例如:原始数据100G,可以压缩到10G左右)

(3)创建分区表,防止后续的全表扫描

2 DIM层和DWD层

DIM层DWD层需构建维度模型,一般采用星型模型,呈现的状态一般为星座模型。

维度建模一般按照以下四个步骤:

选择业务过程→声明粒度→确认维度→确认事实

(1)选择业务过程

在业务系统中,挑选我们感兴趣的业务线,比如下单业务,支付业务,退款业务,物流业务,一条业务线对应一张事实表。

(2)声明粒度

数据粒度指数据仓库的数据中保存数据的细化程度或综合程度的级别。

声明粒度意味着精确定义事实表中的一行数据表示什么,应该尽可能选择最小粒度,以此来应各种各样的需求。

典型的粒度声明如下:

订单事实表中一行数据表示的是一个订单中的一个商品项。

支付事实表中一行数据表示的是一个支付记录。

(3)确定维度

维度的主要作用是描述业务是事实,主要表示的是“谁,何处,何时”等信息。

确定维度的原则是:后续需求中是否要分析相关维度的指标。例如,需要统计,什么时间下的订单多,哪个地区下的订单多,哪个用户下的订单多。需要确定的维度就包括:时间维度、地区维度、用户维度。

(4)确定事实

此处的“事实”一词,指的是业务中的度量值(次数、个数、件数、金额,可以进行累加),例如订单金额、下单次数等。

在DWD层,以业务过程为建模驱动,基于每个具体业务过程的特点,构建最细粒度的明细层事实表。事实表可做适当的宽表化处理。

事实表和维度表的关联比较灵活,但是为了应对更复杂的业务需求,可以将能关联上的表尽量关联上。

至此,数据仓库的维度建模已经完毕,DWD层是以业务过程为驱动。

DWS层、DWT层和ADS层都是以需求为驱动,和维度建模已经没有关系了。

DWS和DWT都是建宽表,按照主题去建表。主题相当于观察问题的角度。对应着维度表。

3 DWS层与DWT层

DWS层和DWT层统称宽表层,这两层的设计思想大致相同,通过以下案例进行阐述。

1)问题引出:两个需求,统计每个省份订单的个数、统计每个省份订单的总金额

2)处理办法:都是将省份表和订单表进行join,group by省份,然后计算。同样数据被计算了两次,实际上类似的场景还会更多。

那怎么设计能避免重复计算呢?

针对上述场景,可以设计一张地区宽表,其主键为地区ID,字段包含为:下单次数、下单金额、支付次数、支付金额等。上述所有指标都统一进行计算,并将结果保存在该宽表中,这样就能有效避免数据的重复计算。

3)总结:

(1)需要建哪些宽表:以维度为基准。

(2)宽表里面的字段:是站在不同维度的角度去看事实表,重点关注事实表聚合后的度量值。

(3)DWS和DWT层的区别:DWS层存放的所有主题对象当天的汇总行为,例如每个地区当天的下单次数,下单金额等,DWT层存放的是所有主题对象的累积行为,例如每个地区最近7天(15天、30天、60天)的下单次数、下单金额等。

4 ADS层

对电商系统各大主题指标分别进行分析。

3.2.5 数仓环境之hive引擎

Hive引擎包括:默认MR、tez、spark

Hive on Spark:Hive既作为存储元数据又负责SQL的解析优化,语法是HQL语法,执行引擎变成了Spark,Spark负责采用RDD执行。

Spark on Hive : Hive只作为存储元数据,Spark负责SQL解析优化,语法是Spark SQL语法,Spark负责采用RDD执行。

3.2.6 即席查询数据仓库

即席查询数据仓库

3.2.7 数据仓库每天跑多张表,大概什么时候运行,运行多久?

基本一个项目建一个库,表格个数为初始的原始数据表格加上统计结果表格的总数。(一般70-100张表格)。

用户行为11张;业务数据27张表=》ods 38 =》dwd=>32张=》dws 6张宽表=>ads=》30张=》106张

每天0:30开始运行。=》sqoop 40-50分钟:1点20:=》5-6个小时运行完指标

所有离线数据报表控制在8小时之内。大数据实时处理部分控制在5分钟之内。(分钟级别、秒级别)如果是实时推荐系统,需要秒级响应

3.2.8 活动的话,数据量会增加多少? 怎么解决?

日活增加50%,GMV增加多少。(留转G复活)情人节,促销手纸。集群资源都留有预量。11.11,6.18,数据量过大,提前动态增加服务器。

3.2.9 数仓中使用的哪种文件存储格式 数仓中使用的哪种文件存储格式?

常用的包括:textFile,rcFile,ORC,Parquet,一般企业里使用ORC或者Parquet,因为是列式存储,且压缩比非常高,所以相比于textFile,查询速度快,占用硬盘空间少。

3.2.10 你感觉数仓建设中最重要的是什么

数仓建设中,最重要的是数据准确性,数据的真正价值在于数据驱动决策,通过 数据指导运营,在一个不准确的数据驱动下,得到的一定是错误的数据分析,影 响的是公司的业务发展决策,最终导致公司的策略调控失败。

第3章 代码

3.1 电商指标

基于mysql的电商行业数据分析

3.1.1 指标体系

粘性指标

传播性指标

销量指标

3.1.2 数据分析

1.数据处理

#转换字符串类型的日期

update trade1
set day=str_to_date(day,'%Y%m%d');

#提取月份

alter table trade1 add  month int;
update trade1
set month=month(day);

#提取年份

alter table trade1 add  year int;
update trade1
set year=year(day);

#基本了解数据

select count(*) from trade1;
select count(distinct auction_id) from trade1;
select count(distinct user_id) from trade1;

2.具体分析

#1.活跃用户:有购买行为的用户

#1.1 不同年份月活跃用户

select year,month,count(distinct  user_id) as num
from trade1
group by year,month;

#1.2 年活跃用户及差值

select *,last_num-num as 差值
from
(select year,count(distinct user_id) as num,
       lead(count(distinct user_id),1)over(order by year asc) as last_num
from trade1
group by year) as a  #窗口函数在group by后执行,因此窗口函数可以配合聚合函数

#2.购买频率

2.1 仅计算有两次及以上购买行为的人的购买频率

select *,avg(cha)over() as mean
from
(select *,datediff(date1,day) as cha
from
   (select user_id,day,
           lead(day,1)over(partition by user_id order by day asc) as date1
    from trade1) as a
) as b
where cha is not null

#3.复购率

#3.1四年的的复购率

select concat(round(count(user_id)*100/(select count(distinct user_id) from trade1),2),'%') as rate
from
(select user_id,count(distinct auction_id) as num
from trade1
group by user_id
having count(distinct auction_id)>=2) as a

#3.2每一年的复购率

select year,
       count(user_id)*100/(select count(distinct user_id) from trade1 where year=a.year) as rate
from
(select year,user_id,count(distinct auction_id) as num
from trade1
group by year,user_id
having count(distinct auction_id)>=2) as a
group by year

#4.留存率

#4.1每年的月留存率

#方法一

select
year,month,
count(distinct(case when cha=1 then user_id else null end))/count(distinct(case when cha=0 then user_id else null end))as rate
from
    (select t.year,a.user_id,month,(month-mm) as cha
     from trade1 t
     inner join
              (select year,user_id,min(month) as mm
               from trade1
               group by year,user_id) as a
     on a.user_id=t.user_id
     and a.year=t.year) as b
group by year,month

#方法二

select
year,month,
count(distinct user_id)/
(select count(distinct user_id) from trade1 where year=b.year and month=b.month) as rate
from
    (select t.year,a.user_id,month,(month-mm) as cha
     from trade1 t
     inner join
              (select year,user_id,min(month) as mm
               from trade1
               group by year,user_id) as a
     on a.user_id=t.user_id
     and a.year=t.year) as b
where b.cha=1
group by year,month

3.传播性分析

#1.用户画像

#1.1不同性别用户数量占比

select *,num/(select count(distinct user_id) from baby where gender in (1,0)) as rate
       from (
                select gender, count(distinct b.user_id) as num
                from trade1 t
                inner join baby b
                on t.user_id = b.user_id
                where gender in (1,0)
                group by gender
            ) as a

#1.2不同年龄用户占比

update baby
set birthday=str_to_date(birthday,'%Y%m%d')
alter table baby add age int;
select age,count(distinct user_id) as num,
       count(distinct user_id)/sum(count(distinct user_id))over() as rate
from
     (
select t.user_id,t.year-year(b.birthday) as age
from baby b inner join trade1 t on b.user_id = t.user_id
     ) as a
group by age

#2.不同性别的用户趋势及增长率

#2.1男性

select * ,concat(round((late_num-num)*100/num,2),'%') as rate
from (
      select year,
      gender,
      count(distinct t.user_id) as num,
      lead(count(distinct b.user_id)) over (order by year asc) as late_num
      from baby b
      join trade1 t on b.user_id = t.user_id
      where gender = 0
      group by t.year, gender
    ) as a

#2.2女性

select * ,concat(round((late_num-num)*100/num,2),'%') as rate
from (
      select year,
      gender,
      count(distinct t.user_id) as num,
      lead(count(distinct b.user_id)) over (order by year asc) as late_num
      from baby b
      join trade1 t on b.user_id = t.user_id
      where gender = 1
      group by t.year, gender
    ) as a

#3.每年的高价值用户数量

#高价值用户数量:购买数量在本年占前10%

 select year,count(distinct user_id) as num1,
        lead(count(distinct user_id)) over (order by year asc) as late_num1,
        lead(count(distinct user_id)) over (order by year asc)/count(distinct user_id)-1 as cha
from
(select year,user_id,
       sum(buy_mount) as num,
       cume_dist() over (partition by year order by sum(buy_mount) desc ) as rank1
from trade1 t
group by year, user_id) as a
where rank1<=0.1
group by year

#4.高价值用户的特征

#4.1选出高价值用户,即消费数量占比前10%

create view  high_value
as
select a.user_id,b.gender,b.birthday
from
baby b inner join
     (select user_id,
             cume_dist() over (order by sum(buy_mount) desc) as rate
         from trade1
         group by user_id
     )
as a
on b.user_id=a.user_id
where a.rate<=0.1

#4.2高价值用户的性别分布

select gender,count(distinct user_id) as num,
       count(distinct user_id)/sum(count(distinct user_id))over() as rate
from high_value
where gender in (1,0)
group by gender
  #4.3高价值用户的年龄分布
select age,count(distinct user_id) as num,
       count(distinct user_id)/sum(count(distinct user_id))over() as rate
from(
     select h.*,year-year(birthday) as age
     from high_value h inner join trade1 t on h.user_id = t.user_id
    ) as a
group by age
order by rate desc

#5.不同类型用户的购买数量占比

#5.1不同性别的购买数量占比

select gender,sum(buy_mount) as num,
       sum(buy_mount)/sum(sum(buy_mount))over() as rate
from trade1 inner join baby b on trade1.user_id = b.user_id
where gender in (1,0)
group by gender
order by rate desc

#5.2不同年龄的购买数量占比

select age,sum(buy_mount) as num,
       sum(buy_mount)/sum(sum(buy_mount))over() as rate
from
(select trade1.*,year-year(b.birthday) as age
from trade1 inner join baby b on trade1.user_id = b.user_id) as a
group by age
order by rate desc

4.销量

#1.销量趋势

select year,sum(buy_mount) as num,
       lead(year,1)over(order by year asc) as 下一年,
       lead(sum(buy_mount))over(order by year asc) as 下一年的销量,
       lead(sum(buy_mount))over(order by year asc)/sum(buy_mount)-1 as 增长率,
       sum(sum(buy_mount))over(order by year asc) as 累计销量
from trade1
group by year

#1.1不同性别用户销量趋势、占比以及增长率

#方法一:窗口函数

#男性

select year,gender,sum(buy_mount) as num,
       sum(buy_mount)/sum(sum(buy_mount))over() as rate,
       lead(sum(buy_mount))over(order by year asc) as late_num,
       lead(sum(buy_mount))over(order by year asc)/sum(buy_mount)-1 as growth
from trade1 t inner join baby b on t.user_id = b.user_id
where gender=0
group by year,gender

#女性

select year,gender,sum(buy_mount) as num,
       sum(buy_mount)/sum(sum(buy_mount))over() as rate,
       lead(sum(buy_mount))over(order by year asc) as late_num,
       lead(sum(buy_mount))over(order by year asc)/sum(buy_mount)-1 as growth
from trade1 t inner join baby b on t.user_id = b.user_id
where gender=1
group by year,gender

#方法二:用子查询的方法

 select a.*,
        (select sum(buy_mount) from trade1 t inner join baby b
        on t.user_id = b.user_id
        where gender=a.gender and year-a.year=1
        ) as late_num,
        (select sum(buy_mount)
        from trade1 t inner join baby b on t.user_id = b.user_id
        where gender=a.gender and year-a.year=1
        )/a.num-1 as growth
from
(select year,gender,sum(buy_mount) as num
from
trade1 t inner join baby b on t.user_id = b.user_id
where gender in (1,0)
group by year,gender) as a
order by year asc,gender asc

#1.2不同年龄的销量趋势以及增长率

select year,age,sum(buy_mount) as num,
        (select sum(buy_mount)from
                 (select trade1.*,year-year(birthday) as age
                  from trade1 inner join baby b on trade1.user_id = b.user_id
                 )as b
        where age=a.age and year-a.year=1) as late_num, #用子查询的方法计算同一年龄在次年的总销售量
       (select sum(buy_mount)from
                 (select trade1.*,year-year(birthday) as age
                  from trade1 inner join baby b on trade1.user_id = b.user_id
                 )as b
        where age=a.age and year-a.year=1)/sum(buy_mount)-1 as growth #同上,然后计算增长率
from
(select trade1.*,year-year(birthday) as age
from trade1 inner join baby b on trade1.user_id = b.user_id) as a
group by year,age

#2.订单数量

select count(distinct auction_id) as num from trade1

#2.1每一年的订单量以及增长率

select year,count(distinct auction_id) as num,
       count(distinct auction_id)/ sum(count(distinct auction_id))over() as 占比,
       lead(count(distinct auction_id))over(order by year asc) as late_num,
       lead(count(distinct auction_id))over(order by year asc)/count(distinct auction_id) as growth,
       round(cume_dist() over (order by count(distinct auction_id) desc),2) as 名次百分比,
       sum(count(distinct auction_id))over(order by year asc) as 累计和
from trade1
group by year

#3.每年每个季度的销量,增长率,排名

#计算日期属于哪个季度

#判断日期是一年中的第几周

select week(day) from trade1

#判断日期是一年中的第几个季度

select quarter(day) from trade1

#添加季度列

alter table trade1 add season int;
update trade1
set season=quarter(day);
 
select year,season,sum(buy_mount) as num,
       sum(sum(buy_mount))over(partition by year) as year_sum,
       sum(buy_mount)/ sum(sum(buy_mount))over(partition by year) as rate,
       rank()over(partition by year order by sum(buy_mount) desc) as rank1,
       sum(sum(buy_mount))over(partition by year order by season asc) as 累计和,
       lead(sum(buy_mount),1)over(partition by year order by season asc)/sum(buy_mount)-1 as growth
from trade1
group by year,season

#4.不同品类的销量

#4.1四年中不同品类的销量

select cat1,sum(buy_mount) as num
from trade1
group by cat1
order by num desc

#4.2每年不同品类的销量、占比、下一年该品类的销量、增长率

select year,cat1,sum(buy_mount) as num,
       sum(buy_mount)/sum(sum(buy_mount))over() as rate,
       (select sum(buy_mount) from trade1 t where t.cat1=trade1.cat1 and year-trade1.year=1) as late_num,
       (select sum(buy_mount) from trade1 t where t.cat1=trade1.cat1 and year-trade1.year=1)/sum(buy_mount)-1 as growth
from trade1
group by year,cat1

#4.3每年销量前三名的品类

select *
from
     (   select year,
                cat1,
                sum(buy_mount) as num,
                rank() over (partition by year order by sum(buy_mount) desc) as rank1
         from trade1
         group by year, cat1
     ) as a
where rank1<=3

总结

1.用户粘性较差,考虑两个原因

(1)产品并不能在较大程度上满足客户需求,不具有吸引力

(2)在营销上具有劣势,其他商家的营销手段更容易吸引客户

2.产品受众性别分布较为均衡

3.产品受众当中儿童年龄普遍较低,集中在0-3岁,产品可以该群体为重点进行迭代

3.2 手写代码

3.2.1 手写 Spark-WordCount

val conf: SparkConf = 
new SparkConf().setMaster("local[*]").setAppName("WordCount") 
val sc = new SparkContext(conf) 
sc.textFile("/input") 
.flatMap(_.split(" ")) 
.map((_, 1)) 
.reduceByKey(_ + _) 
.saveAsTextFile("/output") 
sc.stop() 

3.2.2 冒泡排序

/**
* 冒泡排序 时间复杂度 O(n^2) 空间复杂度 O(1)
*/
public class BubbleSort {
 public static void bubbleSort(int[] data) {
System.out.println("开始排序");
 int arrayLength = data.length;
 for (int i = 0; i < arrayLength - 1; i++) {
 boolean flag = false;
 for (int j = 0; j < arrayLength - 1 - i; j++) {
 if(data[j] > data[j + 1]){
 int temp = data[j + 1];
 data[j + 1] = data[j];
 data[j] = temp;
 flag = true;
 }
 }
 System.out.println(java.util.Arrays.toString(data));
 if (!flag)
 break;
 }
 }

 public static void main(String[] args) {
 int[] data = { 9, -16, 21, 23, -30, -49, 21, 30, 30 };
 
System.out.println(" 排 序 之 前 : \n" + 
java.util.Arrays.toString(data));
 bubbleSort(data);
 
System.out.println(" 排 序 之 后 : \n" + 
java.util.Arrays.toString(data));
 } }

3.2.3 sql语句连续7天登录的用户信息

-- 方法一

-- 第一步:用户登录日期去重

select distinct date(date) as 日期,id from tb_user;

-- 第二步:用row_number()计数

select *,row_number() over(PARTITION by id order by 日期) as cum from (select DISTINCT date(date) as 日期,id from tb_user) a;

-- 第三步:日期减去计数值得到结果

select *,date(日期)-cum as 结果 from (select *,row_number() over(PARTITION by id order by 日期) as cum from (select DISTINCT date(date) as 日期,id from tb_user)a)b;

-- 第四步:根据id和结果分组并计算总和,大于等于7的即为连续登录7天的用户

select id,count(*) from (select *,date(日期)-cum as 结果 from (select *,row_number() over(PARTITION by id order by 日期) as cum from (select DISTINCT date(date) as 日期,id from tb_user)a)b)c GROUP BY id,结果 having count(*)>=7;
select * from tb_user;

3.2.4 用一条 SQL 语句显示所有可能的比赛组合

a,b,c,d,四只球队,team表中只有一个 name 字段,用一条 SQL 语句显示所有可能的比赛组合

SELECT
	*
FROM
	team a, team b
WHERE
	a.name > b.name;

3.2.5 用Scala编写Spark程序实现TopN

package spark
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.rdd.RDD.rddToPairRDDFunctionsobject TopN {
  def main(args: Array[String]) {
    var N = 10 //这里指定N的值
    val conf = new SparkConf().setAppName(" TopN ")
      .setMaster("local")
    var sc = new SparkContext(conf)
    sc.setLogLevel("Warn")
    val file = sc.textFile("e:\TopN.txt")
    val rdd = file.flatMap(_.split(" ")).map(x => (x.toInt, null))
      .sortByKey(false).map(_._1).take(N)
      .foreach { println }
  }
}

3.2.6 找出所有科目成绩都大于某一学科平均成绩的学生

1)建表语句

create table score( 
    uid string, 
    subject_id string, 
    score int) 
row format delimited fields terminated by '\t'; 

2)求出每个学科平均成绩

select uid, score, 
avg(score) over(partition by subject_id) avg_score 
from score;t1 

3)根据是否大于平均成绩记录 flag,大于则记为 0 否则记为 1

select uid, if(score>avg_score,0,1) flag from t1;t2 

4)根据学生 id 进行分组统计 flag 的和,和为 0 则是所有学科都大于平均成绩

select uid from t2 group by uid having sum(flag)=0; 

5)最终 SQL

select uid from (select uid, if(score>avg_score,0,1) flag from (select uid, score, avg(score) over(partition by subject_id) avg_score from score)t1)t2 group by uid having sum(flag)=0; 

3.2.7 用一条 SQL 语句查询出每门课都大于 80 分的学生姓名

方法1

select distinct name from table where name not in (select distinct name from 
table where fenshu<=80) 

方法2

select name from table group by name having min(fenshu)>80

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐