flink 实战例子 createRemoteEnvironment
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)意思是将你本地的代码打的jar包,远程提交到已经存在的flink集群上.注意此程序再idea运行的时候,idea上不会有任何输出的.在这种模式下idea就是相当于一个传输所需jar文件的客户端,程序一旦执行之后,就和idea无关了.1.代码如下:package co
·
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
意思是将你本地的代码打的jar包,远程提交到已经存在的flink集群上.注意此程序再idea运行的时候,idea上不会有任何输出的.在这种模式下idea就是相当于一个传输所需jar文件的客户端,程序一旦执行之后,就和idea无关了.
1.代码如下:
package com.flink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object StreamingJob {
def main(args: Array[String]) {
//最后一个参数指的是当前代码打的包的路径
val env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.68.137",8081,"D:\\IT\\Project\\FlinkDemo\\target\\FlinkDemo-1.0-SNAPSHOT.jar")
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
//下面的输出不会再idea上输出
counts.print()
env.execute("Window 333 WordCount")
}
}
2.在idea运行程序之前需要先maven打包
3.linux 运行: nc -l 9999
自己输入需要统计的单词.
4.在flink集群, ip:8081 查看运行的结果

注:在上面的解说中:我没有提linux flink环境的安装,以及项目的构建,详情参考:安装以及demo
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)