(二)PySpark3:SparkSQL编程,2024年最新2024年阿里大数据开发岗面试必问
df.limit(2)与df.take(2)类似,不过limit输出的是DataFrame,take输出的是list。#例如,这句代码会报错:df.replace({“James”: “Jim”,1000: 100}).show()#从df中移除与df_sample相同的行,返回一个新的DataFrame。df.sort(df[“age”].desc()).show() #降序。df.sort(d
先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7
深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年最新大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。




既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
如果你需要这些资料,可以添加V获取:vip204888 (备注大数据)
正文
{“id”:4,“name”:“Jon”,“age”:29,“sal”:1200}
**读取parquet文件:**
df = spark.read.parquet(“data/users.parquet”)
df.show()
**读取csv文件:**
#方式1
df = spark.read.option(“header”,“true”)
.option(“inferSchema”,“true”)
.option(“delimiter”, “,”)
.csv(“test.csv”)
#方式2
df = spark.read.format(“com.databricks.spark.csv”)
.option(“header”, “true”)
.option(“inferSchema”, “true”)
.option(“delimiter”, “,”)
.load(“test.csv”)
#方式3
df = spark.read.csv(path=“test.csv”,
header=True, #指定将第一行作为列名
inferSchema=True, #自动推断出每列的数据类型
sep=‘,’ #分隔符
)
df.show()
输出结果:
±–±----±–±—+
| id| name|age| sal|
±–±----±–±—+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
| 3|Alice| 25| 800|
| 4| Jon| 29|1200|
±–±----±–±—+
**读取Hive数据表:**
spark.sql(“CREATE TABLE IF NOT EXISTS test (id INT, name STRING, age INT, sal FLOAT) USING hive”)
spark.sql(“LOAD DATA LOCAL INPATH ‘data/test.txt’ INTO TABLE test”)
df = spark.sql(“SELECT * FROM test”)
## 三、保存DataFrame
通过df.write()对DataFrame进行保存。
#保存为csv文件
df.write.format(“csv”).option(“header”,“true”).save(“data/test.csv”)
#保存为json文件
df.write.json(“data/test.json”)
#保存成parquet文件
df.write.parquet(“data/test.parquet”)
df.write.partitionBy(“age”).format(“parquet”).save(“data/test.parquet”)
#保存成hive数据表
df.write.bucketBy(2, “name”).sortBy(“age”).saveAsTable(“test”)
## 四、DataFrame API
#### 1、显示数据
**①df.collect()**
用于将DataFrame中的所有行收集到Driver节点上,并以列表的形式返回这些行。
df = spark.createDataFrame([(1,“James”,27,1000),
(2,“Bob”,22,500),
(3,“Alice”,25,800),
(4,“Jon”,29,1200)]
,[“id”,“name”,“age”,“sal”])
df.collect()
输出结果:
[Row(id=1, name=‘James’, age=27, sal=1000),
Row(id=2, name=‘Bob’, age=22, sal=500),
Row(id=3, name=‘Alice’, age=25, sal=800),
Row(id=4, name=‘Jon’, age=29, sal=1200)]
**②df.first()**
获取第一行数据。
df.first()
输出结果:
Row(id=1, name=‘James’, age=27, sal=1000)
**③df.head(n)**
获取前n行数据。
df.head(2)
输出结果:
[Row(id=1, name=‘James’, age=27, sal=1000),
Row(id=2, name=‘Bob’, age=22, sal=500)]
**④df.show(n)**
与df.head(n)类似,但是df.show(n)是打印成表格。
df.show(2)
输出结果:
±–±----±–±—+
| id| name|age| sal|
±–±----±–±—+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
±–±----±–±—+
only showing top 2 rows
**⑤df.printSchema()**
用于打印DataFrame的模式schema,定义了各列的名称和类型。
df.printSchema()
输出结果:
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- sal: long (nullable = true)
#### 2、统计信息
**①df.describe()**
一般与df.show()连用,用于查看DataFrame的数据分布。
df.describe().show()
输出结果:
±------±-----------------±----±-----------------±---------------+
|summary| id| name| age| sal|
±------±-----------------±----±-----------------±---------------+
| count| 4| 4| 4| 4|
| mean| 2.5| null| 25.75| 875.0|
| stddev|1.2909944487358056| null|2.9860788111948193|298.607881119482|
| min| 1|Alice| 22| 500|
| max| 4| Jon| 29| 1200|
±------±-----------------±----±-----------------±---------------+
若只想查看某一列的数据分布,如:df.describe('age').show()。
**②df.count()**
返回数据总行数。
df.count()
输出结果:
4
**③聚合函数**
一些常用的聚合函数如下sum()、mean()、min()、max()、avg()
#求最小工资
df.select(F.min(df[‘sal’])).show()
#输出结果:
±-------+
|min(sal)|
±-------+
| 500|
±-------+
#求最大工资
df.select(F.max(df[‘sal’])).show()
#输出结果:
±-------+
|max(sal)|
±-------+
| 1200|
±-------+
#求总工资
df.select(F.sum(df[‘sal’])).show()
#输出结果:
±-------+
|sum(sal)|
±-------+
| 3500|
±-------+
#求平均工资
df.select(F.avg(df[‘sal’])).show()
#输出结果:
±-------+
|avg(sal)|
±-------+
| 875.0|
±-------+
同时对多列操作:
df.agg({“name”:“count”,“age”:“max”,“sal”:“avg”}).show()
#输出:
±----------±-------±-------+
|count(name)|max(age)|avg(sal)|
±----------±-------±-------+
| 4| 29| 875.0|
±----------±-------±-------+
**④df.stat.freqItems()**
统计值出现的频率。
#统计age、name两列出现频率超过0.25的值
df.stat.freqItems((“age”,“name”),0.25).show()
输出结果:
±---------------±-------------------+
| age_freqItems| name_freqItems|
±---------------±-------------------+
|[29, 22, 25, 27]|[Bob, Jon, Alice,…|
±---------------±-------------------+
#### 3、类RDD操作
可以把DataFrame当做数据类型为Row的RDD来进行操作。
部分操作需要先转换为RDD才能运行,如map、flatMap等等。
部分操作可以直接在DataFrame上进行,如filter、distinct、sample、cache、intersect等等。
**①df.map()**
#所有人age+1
rdd = df.rdd.map(lambda x:Row(x[2]+1))
rdd.toDF([“age”]).show()
输出结果:
±–+
|age|
±–+
| 28|
| 23|
| 26|
| 30|
±–+
**②df.flatMap()**
rdd = df.rdd.flatMap(lambda x:x[1].split(‘o’)).map(lambda x:Row(x))
rdd.toDF([“name”]).show()
输出结果:
±----+
| name|
±----+
|James|
| B|
| b|
|Alice|
| J|
| n|
±----+
**③df.filter()**
#筛选工资大于800的
df.filter(F.col(“sal”)>800).show()
±–±----±–±—+
| id| name|age| sal|
±–±----±–±—+
| 1|James| 27|1000|
| 4| Jon| 29|1200|
±–±----±–±—+
#筛选姓名为Bob,以下三种方式输出结果一致
df.filter(F.col(“name”)“Bob”).show()
df.filter(df[“name”]“Bob”).show()
df.filter(“name=‘Bob’”).show()
±–±—±–±–+
| id|name|age|sal|
±–±—±–±–+
| 2| Bob| 22|500|
±–±—±–±–+
#筛选姓名以J开头的
df.filter(F.col(“name”).startswith(“J”)).show()
±–±----±–±—+
| id| name|age| sal|
±–±----±–±—+
| 1|James| 27|1000|
| 4| Jon| 29|1200|
±–±----±–±—+
#筛选除指定值外的其他数据
broads = sc.broadcast([“James”,“Bob”])
df.filter(~F.col(“name”).isin(broads.value)).show()
±–±----±–±—+
| id| name|age| sal|
±–±----±–±—+
| 3|Alice| 25| 800|
| 4| Jon| 29|1200|
±–±----±–±—+
**④df.distinct()**
#去重
df.distinct().show()
**⑤df.cache()**
#cache缓存
df.cache()
#释放缓存
df.unpersist()
**⑥df.sample()**
随机抽样。
#withReplacement=False表示无放回,即抽取不重复的数据
#fraction=0.5表示抽样的比例为50%
#seed为随机种子,用于复现
df_sample = df.sample(withReplacement=False, fraction=0.5, seed=2)
df_sample.show()
输出结果:
±–±----±–±—+
| id| name|age| sal|
±–±----±–±—+
| 1|James| 27|1000|
| 3|Alice| 25| 800|
±–±----±–±—+
**⑦df.intersect(df)**
取两个DataFrame所有交集的行,返回结果不包含重复行
df.intersect(df_sample).show()
输出结果:
±–±----±–±—+
| id| name|age| sal|
±–±----±–±—+
| 3|Alice| 25| 800|
| 1|James| 27|1000|
±–±----±–±—+
**⑧df.exceptAll()**
求差集。
#从df中移除与df_sample相同的行,返回一个新的DataFrame
df.exceptAll(df_sample).show()
输出结果:
±–±—±–±—+
| id|name|age| sal|
±–±—±–±—+
| 2| Bob| 22| 500|
| 4| Jon| 29|1200|
±–±—±–±—+
#### 4、类Excel操作
**①df.withColumn()**
增加列。
df = df.withColumn(“birthyear”,-df[“age”]+2024)
df.show()
输出结果:
±–±----±–±—±--------+
| id| name|age| sal|birthyear|
±–±----±–±—±--------+
| 1|James| 27|1000| 1997|
| 2| Bob| 22| 500| 2002|
| 3|Alice| 25| 800| 1999|
| 4| Jon| 29|1200| 1995|
±–±----±–±—±--------+
**②df.select()**
筛选列。
df.select(“name”,“age”).show()
输出结果:
±----±–+
| name|age|
±----±–+
|James| 27|
| Bob| 22|
|Alice| 25|
| Jon| 29|
±----±–+
**③df.drop()**
删除列。
#删除一列
df.drop(“age”).show()
#删除多列
df.drop(*[“age”,“birthyear”]).show()
**④df.withColumnRenamed()**
对列进行重命名。
#对一列进行重命名
df.withColumnRenamed(“sal”,“salary”).show()
#对多列进行重命名
df.withColumnRenamed(“sal”,“salary”).withColumnRenamed(“birthyear”,“year”).show()
**⑤df.sort()、df.orderBy()**
按照某一列或某几列进行排序。
#按照某一列进行排序
df.sort(df[“age”].desc()).show() #降序
df.sort(df[“age”].asc()).show() #升序
#按照某几列进行排序
df.orderBy(F.col(“age”).asc(), F.col(“sal”).desc()).show()
**⑥df.na.drop()、df.na.fill()**
处理带空值的行。
注意,在填充空值时,只能对相同数据类型的列的空值进行填充。
df = spark.createDataFrame([(1,“James”,27,1000),(2,“Bob”,22,500),(3,“Alice”,25,800),(4,None,29,None)]
,[“id”,“name”,“age”,“sal”])
df.show()
±–±----±–±—+
| id| name|age| sal|
±–±----±–±—+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
| 3|Alice| 25| 800|
| 4| null| 29|null|
±–±----±–±—+
#删除带有nan值的行
df.na.drop().show()
±–±----±–±—+
| id| name|age| sal|
±–±----±–±—+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
| 3|Alice| 25| 800|
±–±----±–±—+
#填充nan值
df.na.fill(“Jon”).show()
±–±----±–±—+
| id| name|age| sal|
±–±----±–±—+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
| 3|Alice| 25| 800|
| 4| Jon| 29|null|
±–±----±–±—+
df.na.fill(0).show()
±–±----±–±—+
| id| name|age| sal|
±–±----±–±—+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
| 3|Alice| 25| 800|
| 4| null| 29| 0|
±–±----±–±—+
**⑦df.replace()**
替换指定的值。
#注意,不能同时对不同数据类型的值进行替换
#例如,这句代码会报错:df.replace({“James”: “Jim”,1000: 100}).show()
df.replace({“James”: “Jim”, “Bob”:“Bieber” }).show()
df.replace({1000: 100}).show()
**⑧df.dropDuplicates()**
跟distinct方法不同的是,dropDuplicates方法接收传参,可以根据指定字段去重。
df.dropDuplicates([“name”]).show()
#### 5、类SQL表操作
类SQL表操作主要包括表查询(select,selectExpr,where),表连接(join,union,unionAll),表分组(groupby,agg,pivot)等操作。
**①df.select()**
df.select()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算列。
#筛选两列,并限制输出前两行
#df.limit(2)与df.take(2)类似,不过limit输出的是DataFrame,take输出的是list
df.select(“age”,“name”).limit(2).show()
±–±----+
|age| name|
±–±----+
| 27|James|
| 22| Bob|
±–±----+
#可以对列进行操作
df.select(“name”,df[“age”] + 1).show()
±----±--------+
| name|(age + 1)|
±----±--------+
|James| 28|
| Bob| 23|
|Alice| 26|
| Jon| 30|
±----±--------+
#通过toDF()对列进行重命名
df.select(“name”,-df[“age”]+2024).toDF(“name”,“birth_year”).show()
±----±---------+
| name|birth_year|
±----±---------+
|James| 1997|
| Bob| 2002|
|Alice| 1999|
| Jon| 1995|
±----±---------+
**②df.selectExpr()**
df.selectExpr()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算和重命名列。参数是一个字符串列表,其中每个字符串都是一个SQL表达式。
#创建一个UDF函数
spark.udf.register(“getBirthYear”,lambda x:2024-x)
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
- 1)|
±----±--------+
|James| 28|
| Bob| 23|
|Alice| 26|
| Jon| 30|
±----±--------+
#通过toDF()对列进行重命名
df.select(“name”,-df[“age”]+2024).toDF(“name”,“birth_year”).show()
±----±---------+
| name|birth_year|
±----±---------+
|James| 1997|
| Bob| 2002|
|Alice| 1999|
| Jon| 1995|
±----±---------+
**②df.selectExpr()**
df.selectExpr()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算和重命名列。参数是一个字符串列表,其中每个字符串都是一个SQL表达式。
#创建一个UDF函数
spark.udf.register(“getBirthYear”,lambda x:2024-x)
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)
[外链图片转存中…(img-4xmE5ZV8-1713223250219)]
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)