GZ033 大数据应用开发赛题 答案 解析
另外对于数据结果展示时,不要采用例如:1.9786518E7的科学计数法)。任务D:数据采集与实时计算(20分)
·
任务D:数据采集与实时计算(20分)
子任务二:使用Flink处理Kafka中的数据
编写Scala代码,使用Flink消费Kafka中Topic为order的数据并进行相应的数据统计计算(订单信息对应表结构order_info,订单详细信息对应表结构order_detail(来源类型和来源编号这两个字段不考虑,所以在实时数据中不会出现),同时计算中使用order_info或order_detail表中create_time或operate_time取两者中值较大者作为EventTime,若operate_time为空值或无此列,则使用create_time填充,允许数据延迟5s,订单状态分别为1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。另外对于数据结果展示时,不要采用例如:1.9786518E7的科学计数法)。
- 使用Flink消费Kafka中的数据,统计商城实时订单实收金额(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加),将key设置成totalprice存入Redis中。使用redis cli以get key方式获取totalprice值,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;
object Exam2_1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val jedisPoolConfig: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("master").build()
val data: DataStream[String] = env.socketTextStream("127.0.0.1", 888
val filterStatus: Array[String] = Array("1003", "1005,1006")
data.assignTimestampsAndWatermarks(
watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner[String] {
override def extractTimestamp(element: String, recordTimestamp: Long): Long = {
// 略 详细答案请私聊
详细答案请私聊
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)