任务D:数据采集与实时计算(20分)

子任务二:使用Flink处理Kafka中的数据

编写Scala代码,使用Flink消费Kafka中Topic为order的数据并进行相应的数据统计计算(订单信息对应表结构order_info,订单详细信息对应表结构order_detail(来源类型和来源编号这两个字段不考虑,所以在实时数据中不会出现),同时计算中使用order_info或order_detail表中create_time或operate_time取两者中值较大者作为EventTime,若operate_time为空值或无此列,则使用create_time填充,允许数据延迟5s,订单状态分别为1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。另外对于数据结果展示时,不要采用例如:1.9786518E7的科学计数法)。

  1. 使用Flink消费Kafka中的数据,统计商城实时订单实收金额(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加),将key设置成totalprice存入Redis中。使用redis cli以get key方式获取totalprice值,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;

object Exam2_1 {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    val jedisPoolConfig: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("master").build()

    val data: DataStream[String] = env.socketTextStream("127.0.0.1", 888
    val filterStatus: Array[String] = Array("1003", "1005,1006")
    data.assignTimestampsAndWatermarks(
      watermarkStrategy = WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner[String] {
          override def extractTimestamp(element: String, recordTimestamp: Long): Long = {

            // 略 详细答案请私聊
        
            

详细答案请私聊

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐