目录

一、理论依据

1、说明

2、saveAsHadoopFile算子

(1)形式

(2)解析说明

(3)MultipleOutputFormat

二、代码实例

1、SparkSaveAsHadoopFiles

2、自定义RDDMultipleTextOutputFormat

三、打包运行

1、运行

2、结果展示

四、在sparkStreaming中如何使用saveAsHadoopFile

1、代码

2、说明


一、理论依据

1、说明

在spark实际项目应用中,总会牵涉到数据的存储问题。如果选择将spark分析好的数据存储到hdfs上,则必定会用到saveAsHadoopFile方法和自定义MultipleOutputFormat类;

2、saveAsHadoopFile算子

(1)形式

def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit

(2)解析说明

这个算子里需要传入的参数依次是:文件路径、key类型、value类型、outputFormat方式。

saveAsHadoopFile算子属于org.apache.spark.rdd.PairRDDFunctions类,需要接收的参数是PairRDD,所以我们在使用前需要将原来的rdd做一下map操作,变成(key, value) 形式。

我们暂且定(K,V)类型为classOf[String]、classOf[String],再之后传入hdfs保存目录、类型,剩下的就是关键的需要传入OutputFormat。

path(hdfs保存路径可以已存在也可以不存在,事先不存在则会自己随着程序运行时创建)

(3)MultipleOutputFormat

自定义MultipleOutputFormat并且重写  override def generateFileNameForKeyValue(key: Any, value: Any, name: String):String方法可以按照自己设计的目录级别和文件名进行数据存储;

参数里的name就是原始的 part-00000,part-00001……

二、代码实例

1、SparkSaveAsHadoopFiles

package main.scala.com.cn.spark

import com.cn.spark.RDDMultipleTextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object SparkSaveAsHadoopFiles {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("saveAsHadoopFiles").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List("0,00000,Aa,2019-02-11 03:20:06",
      "1,11111,Bb,2019-03-12 04:25:22",
      "2,22222,Cc,2019-04-14 05:26:33",
      "3,33333,Dd,2019-05-15 06:29:44"
    ))
    //saveAsHadoopFile需要的是pairRDD,因此,我们使用map将数据转换一下,数据内容作为key,空串“”作为value
    val rdd1 = rdd.map(s=>(s,""))
    rdd1.repartition(2)
      .saveAsHadoopFile("/hyj/myhadoop/", classOf[String],
        classOf[String],classOf[RDDMultipleTextOutputFormat])
  }
}

2、自定义RDDMultipleTextOutputFormat

package com.cn.spark

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any,Any]{

  private val HOURFORMAT = new SimpleDateFormat("HH-mm-ss")
  private val start_time = System.currentTimeMillis()
  private val curDay=new Date(start_time)
  private val fileName=HOURFORMAT.format(curDay)
  //name:part-00000,part-00001
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String):String ={
      //"1,11111,Bb,2019-03-12 04:25:22"
    val line=key.toString
    //提取2019-03-12 04:25:22
    val time=line.split(",")(3)
    val date=time.substring(0,time.indexOf(" "))//2019-03-02
    val hour=time.substring(time.indexOf(" ")+1,time.indexOf(":"))//04
    val resultDir=date+"/"+hour+"/"+fileName+"_"+name.substring(name.length-2)
    resultDir
  }
}

三、打包运行

1、运行

[root@master bin]# ./spark-submit --master local[*] --class main.scala.com.cn.spark.SparkSaveAsHadoopFiles /home/test/sparkSysLearn_jar/sparkSysLearn.jar 

注意:--class 后面的参数,根据SparkSaveAsHadoopFiles类上面的 package main.scala.com.cn.spark

2、结果展示

四、在sparkStreaming中如何使用saveAsHadoopFile

1、代码

//...部分内容
 saveDstream.foreachRDD(rdd => {

   val start_time = System.currentTimeMillis()
   if (rdd.isEmpty) {
     logInfo(" No Data in this batchInterval --------")
   } else {
     //这里,因为saveAsHadoopFile需要接受pairRDD,所以用map转换一下
     val a: RDD[(String, String)] =rdd.map(x=>(x,""))
     a.saveAsHadoopFile(hdfsPath+"/", classOf[String], classOf[String],classOf[RDDMultipleTextOutputFormat])
    
   }
 })//foreachRDD
//...

2、说明

在实时流中使用,最终也是将DStream先转化为一个个RDD,再调用saveAsHadoopFile函数存储,思想和上面一样;

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐