DataSet

DataSet 是具有强类型的数据集合,需要提供对应的类型信息。

Dataset是在Spark1.6中添加的新的接口,是DataFrame API的一个扩展,是Spark最新的数据抽象,结合了RDD和DataFrame的优点。

  1. 与RDD相比:保存了更多的描述信息,概念上等同于关系型数据库中的二维表

  2. 与DataFrame相比:保存了类型信息,是强类型的,提供了编译时类型检查,调用Dataset的方法先会生成逻辑计划,然后被Spark的优化器进行优化,最终生成物理计划,然后提交到集群中运行;
    Dataset是一个强类型的特定领域的对象,这种对象可以函数式或者关系操作并行地转换。

从Spark 2.0开始,DataFrame与Dataset合并,每个Dataset也有一个被称为一个DataFrame的类型化视图,这种DataFrame是Row类型的Dataset,即Dataset[Row]。

创建 DataSet

  1. 使用样例类序列创建 DataSet

    scala> case class Person(name: String, age: Long)
    defined class Person
    scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()
    caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]
    scala> caseClassDS.show
    +---------+---+
    | name|age|
    +---------+---+
    | zhangsan| 2|
    +---------+---+
    
    
  2. 使用基本类型的序列创建 DataSet

    scala> val ds = Seq(1,2,3,4,5).toDS
    ds: org.apache.spark.sql.Dataset[Int] = [value: int]
    scala> ds.show
    +-----+
    |value|
    +-----+
    | 1|
    | 2|
    | 3|
    | 4|
    | 5|
    +-----+
    

RDD 转换为 DataSet

SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结构,case 类属性通过放射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。

scala> case class User(name:String, age:Int)
defined class User
scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

DataSet 转换为 RDD

DataSet 其实也是对 RDD 的封装,所以可以直接获取内部的 RDD

scala> case class User(name:String, age:Int)
defined class User
scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, 
t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
scala> val rdd = res11.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at 
<console>:25
scala> rdd.collect
res12: Array[User] = Array(User(zhangsan,30), User(lisi,49))

DataFrame 和 DataSet 转换

DataFrame 其实是 DataSet 的特例,所以它们之间是可以互相转换的。

  1. DataFrame 转换为 DataSet

    scala> case class User(name:String, age:Int)
    defined class User
    scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    scala> val ds = df.as[User]
    ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
    
  2. DataSet 转换为 DataFrame

    scala> val ds = df.as[User]
    ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
    scala> val df = ds.toDF
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐