目录

Flink 开发

开发环境搭建

Flink连接器

Source

Sink

Flink API

窗口


Flink 开发

Flink 应用程序结构主要包含三部分,Source/Transformation/Sink,如下图所示:

图片

代码程序结构

Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:

  • 基于本地集合的 source

  • 基于文件的 source

  • 基于网络套接字的 source

  • 自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。

Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。

Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:

  • 写入文件

  • 打印输出

  • 写入 socket

  • 自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 Sink。

开发环境搭建

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lagou</groupId>
    <artifactId>flinkdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--flink核心包-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!--flink流处理包-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.7.2</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.12</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>1.7.2</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <!-- 打jar插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>

图片

Flink连接器

在实际生产环境中,数据通常分布在各种不同的系统中,包括文件系统、数据库、消息队列等。Flink作为一个大数据处理框架,需要与这些外部系统进行数据交互,以实现数据的输入、处理和输出。在Flink中,Source和Sink是两个关键模块,它们扮演着与外部系统进行数据连接和交互的重要角色,被统称为外部连接器(Connector)。

  1. Source(数据源): Source是Flink作业的输入模块,用于从外部系统中读取数据并将其转化为Flink的数据流。Source负责实现与不同数据源的交互逻辑,将外部数据源的数据逐条或批量读取到Flink的数据流中,以便后续的数据处理。常见的Source包括从文件中读取数据、从消息队列(如Kafka、RabbitMQ)中消费数据、从数据库中读取数据等。

  2. Sink(数据接收器): Sink是Flink作业的输出模块,用于将Flink计算的结果输出到外部系统中。Sink负责实现将Flink数据流中的数据写入到外部数据源,以便后续的持久化存储、展示或其他处理。Sink的实现需要考虑数据的可靠性、一致性以及可能的事务性要求。常见的Sink包括将数据写入文件、将数据写入数据库、将数据写入消息队列等。

外部连接器在Flink中的作用非常关键,它们使得Flink作业可以与各种不同类型的数据源和数据目的地进行交互,实现了数据的流入和流出。这种灵活的连接机制使得Flink在处理大数据时能够更好地集成已有的系统和数据,实现复杂的数据流处理和分析任务。

Source

Flink在批处理中常见的source主要有两大类。

  • 基于本地集合的source(Collection-based-source)

  • 基于文件的source(File-based-source)

基于本地集合的Source

在Flink中最常见的创建本地集合的DataSet方式有三种。

  1. 使用**env.fromElements()**,这种方式也支持Tuple,自定义对象等复合形式。

  2. 使用env.fromCollection(),这种方式支持多种Collection的具体类型。

  3. 使用env.generateSequence(),这种方法创建基于Sequence的DataSet。

使用方式如下:

package com.demo.broad;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;

import java.util.ArrayList;
import java.util.List;
import java.util.ArrayDeque;
import java.util.Stack;
import java.util.stream.Stream;

public class BatchFromCollection {
    public static void main(String[] args) throws Exception {
        // 获取flink执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 0.用element创建DataSet(fromElements)
        DataSet<String> ds0 = env.fromElements("spark", "flink");
        ds0.print();

        // 1.用Tuple创建DataSet(fromElements)
        DataSet<Tuple2<Integer, String>> ds1 = env.fromElements(
            new Tuple2<>(1, "spark"),
            new Tuple2<>(2, "flink")
        );
        ds1.print();

        // 2.用Array创建DataSet
        DataSet<String> ds2 = env.fromCollection(new ArrayList<String>() {{
            add("spark");
            add("flink");
        }});
        ds2.print();

        // 3.用ArrayDeque创建DataSet
        DataSet<String> ds3 = env.fromCollection(new ArrayDeque<String>() {{
            add("spark");
            add("flink");
        }});
        ds3.print();

        // 4.用List创建DataSet
        DataSet<String> ds4 = env.fromCollection(new ArrayList<String>() {{
            add("spark");
            add("flink");
        }});
        ds4.print();

        // 5.用ArrayList创建DataSet
        DataSet<String> ds5 = env.fromCollection(new ArrayList<String>() {{
            add("spark");
            add("flink");
        }});
        ds5.print();

        // 6.用List创建DataSet
        DataSet<String> ds6 = env.fromCollection(new ArrayList<String>() {{
            add("spark");
            add("flink");
        }});
        ds6.print();

        // 7.用List创建DataSet
        DataSet<String> ds7 = env.fromCollection(new ArrayList<String>() {{
            add("spark");
            add("flink");
        }});
        ds7.print();

        // 8.用Stack创建DataSet
        DataSet<String> ds8 = env.fromCollection(new Stack<String>() {{
            add("spark");
            add("flink");
        }});
        ds8.print();

        // 9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合)
        DataSet<String> ds9 = env.fromCollection(Stream.of("spark", "flink"));
        ds9.print();

        // 10.用List创建DataSet
        DataSet<String> ds10 = env.fromCollection(new ArrayList<String>() {{
            add("spark");
            add("flink");
        }});
        ds10.print();

        // 11.用HashSet创建DataSet
        DataSet<String> ds11 = env.fromCollection(new HashSet<String>() {{
            add("spark");
            add("flink");
        }});
        ds11.print();

        // 12.用Iterable创建DataSet
        DataSet<String> ds12 = env.fromCollection(new ArrayList<String>() {{
            add("spark");
            add("flink");
        }});
        ds12.print();

        // 13.用ArrayList创建DataSet
        DataSet<String> ds13 = env.fromCollection(new ArrayList<String>() {{
            add("spark");
            add("flink");
        }});
        ds13.print();

        // 14.用Stack创建DataSet
        DataSet<String> ds14 = env.fromCollection(new Stack<String>() {{
            add("spark");
            add("flink");
        }});
        ds14.print();

        // 15.用HashMap创建DataSet
        DataSet<Tuple2<Integer, String>> ds15 = env.fromCollection(new HashMap<Integer, String>() {{
            put(1, "spark");
            put(2, "flink");
        }}.entrySet());
        ds15.print();

        // 16.用Range创建DataSet
        DataSet<Integer> ds16 = env.fromCollection(IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList()));
        ds16.print();

        // 17.用generateSequence创建DataSet
        DataSet<Long> ds17 = env.generateSequence(1, 9);
        ds17.print();
    }
}
基于文件的Source

Flink支持直接从外部文件存储系统中读取文件的方式来创建Source数据源,Flink支持的方式有以下几种:

  1. 读取本地文件数据

  2. 读取HDFS文件数据

  3. 读取CSV文件数据

  4. 读取压缩文件

  5. 遍历目录

下面分别介绍每个数据源的加载方式:

读取本地文件
package com.demo.batch;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;

public class BatchFromFile {
    public static void main(String[] args) throws Exception {
        // 使用readTextFile读取本地文件
        // 初始化环境
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

        // 加载数据
        DataSet<String> datas = environment.readTextFile("data.txt");

        // 触发程序执行
        datas.print();
    }
}
读取HDFS文件数据
package com.demo.batch;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;

public class BatchFromFile {
    public static void main(String[] args) throws Exception {
        // 使用readTextFile读取本地文件
        // 初始化环境
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

        // 加载数据
        DataSet<String> datas = environment.readTextFile("hdfs://node01:8020/README.txt");

        // 触发程序执行
        datas.print();
    }
}
读取CSV文件数据
package com.demo.batch;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.MapFunction;

public class BatchFromCsvFile {
    public static void main(String[] args) throws Exception {
        // 初始化环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 用于映射CSV文件的POJO class
        public static class Student {
            public int id;
            public String name;

            public Student() {}

            public Student(int id, String name) {
                this.id = id;
                this.name = name;
            }

            @Override
            public String toString() {
                return "Student(" + id + ", " + name + ")";
            }
        }

        // 读取CSV文件
        DataSet<Student> csvDataSet = env.readCsvFile("./data/input/student.csv")
            .ignoreFirstLine()
            .pojoType(Student.class, "id", "name");

        csvDataSet.print();
    }
}
读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

压缩格式 扩展名 并行化
DEFLATE .deflate no
GZIP .gz .gzip no
Bzip2 .bz2 no
XZ .xz no
package com.demo.batch;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;

public class BatchFromCompressFile {
    public static void main(String[] args) throws Exception {
        // 初始化环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 加载数据
        DataSet<String> result = env.readTextFile("D:\\BaiduNetdiskDownload\\hbase-1.3.1-bin.tar.gz");

        // 触发程序执行
        result.print();
    }
}
遍历目录

flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。

对于从文件中读取数据,当读取数个文件夹的时候,嵌套的文件默认是不会被读取的,只会读取第一个文件,其他的都会被忽略。所以我们需要使用recursive.file.enumeration进行递归读取

package com.demo.batch;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;

public class BatchFromCompressFile {
    public static void main(String[] args) throws Exception {
        // 初始化环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 加载数据
        DataSet<String> result = env.readTextFile("D:\\BaiduNetdiskDownload\\hbase-1.3.1-bin.tar.gz");

        // 触发程序执行
        result.print();
    }
}
读取kafka
public class StreamFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","teacher2:9092");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("mytopic2", new SimpleStringSchema(), properties);
        DataStreamSource<String> data = env.addSource(consumer);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
        result.print();
        env.execute();
    }
}
自定义source
private static class SimpleSource  
implements SourceFunction<Tuple2<String, Integer>> { 
 
    private int offset = 0; 
    private boolean isRunning = true; 
 
    @Override 
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { 
        while (isRunning) { 
            Thread.sleep(500); 
            ctx.collect(new Tuple2<>("" + offset, offset)); 
            offset++; 
            if (offset == 1000) { 
                  isRunning = false; 
            } 
        } 
    } 
 
    @Override 
    public void cancel() { 
          isRunning = false; 
    } 
} 

自定义Source,从0开始计数,将数字发送到下游在主逻辑中调用这个Source。

DataStream<Tuple2<String, Integer>> countStream = env.addSource(new SimpleSource()); 

Sink

flink在批处理中常见的sink

  • 基于本地集合的sink(Collection-based-sink)

  • 基于文件的sink(File-based-sink)

基于本地集合的sink

目标:

基于下列数据,分别进行打印输出,error输出,collect()

(19, "zhangsan", 178.8),
(17, "lisi", 168.8),
(18, "wangwu", 184.8),
(21, "zhaoliu", 164.8)

代码:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class BatchSinkCollection {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Tuple3<Integer, String, Double>> stuData = new ArrayList<>();
        stuData.add(new Tuple3<>(19, "zhangsan", 178.8));
        stuData.add(new Tuple3<>(17, "lisi", 168.8));
        stuData.add(new Tuple3<>(18, "wangwu", 184.8));
        stuData.add(new Tuple3<>(21, "zhaoliu", 164.8));

        DataSet<Tuple3<Integer, String, Double>> stu = env.fromCollection(stuData);

        stu.print();
        stu.printToErr();
        
        stu.collect().forEach(System.out::println);

        env.execute();
    }
}
基于文件的sink
  • flink支持多种存储设备上的文件,包括本地文件,hdfs文件等。

  • flink支持多种文件的存储格式,包括text文件,CSV文件等。

  • writeAsText():TextOuputFormat - 将元素作为字符串写入行。字符串是通过调用每个元素的toString()方法获得的。

将数据写入本地文件

目标:

基于下列数据,写入到文件中

Map(1 -> "spark", 2 -> "flink")

代码:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.util.HashMap;
import java.util.Map;

public class BatchSinkFile {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Map<Integer, String> data1 = new HashMap<>();
        data1.put(1, "spark");
        data1.put(2, "flink");

        DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);

        ds1.setParallelism(1)
            .writeAsText("test/data1/aa", FileSystem.WriteMode.OVERWRITE)
            .setParallelism(1);

        env.execute();
    }
}
将数据写入HDFS
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.util.HashMap;
import java.util.Map;

public class BatchSinkFile {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Map<Integer, String> data1 = new HashMap<>();
        data1.put(1, "spark");
        data1.put(2, "flink");

        DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);

        ds1.setParallelism(1)
            .writeAsText("hdfs://bigdata1:9000/a", FileSystem.WriteMode.OVERWRITE)
            .setParallelism(1);

        env.execute();
    }
}

Flink API

Flink的API层提供了DataStream API和DataSet API,分别用于流式处理和批处理。这两个API允许开发者使用各种操作符和转换来处理数据,包括转换、连接、聚合、窗口等计算任务。

在Flink中,根据不同的场景(流处理或批处理),需要设置不同的执行环境。在批处理场景下,需要使用DataSet API,并设置批处理执行环境。在流处理场景下,需要使用DataStream API,并设置流处理执行环境。

以下是在不同场景下设置执行环境的示例代码,分别展示了批处理和流处理的情况,包括Scala和Java语言。

图片

批处理场景 - 设置DataSet API的批处理执行环境(Java):

import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchJobExample {
    public static void main(String[] args) throws Exception {
        // 创建批处理执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 在这里添加批处理作业的代码逻辑
        // ...

        // 执行作业
        env.execute("Batch Job Example");
    }
}

流处理场景 - 设置DataStream API的流处理执行环境(Java):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamJobExample {
    public static void main(String[] args) throws Exception {
        // 创建流处理执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 在这里添加流处理作业的代码逻辑
        // ...

        // 执行作业
        env.execute("Stream Job Example");
    }
}

批处理场景 - 设置DataSet API的批处理执行环境(Scala):

import org.apache.flink.api.scala._

object BatchJobExample {
  def main(args: Array[String]): Unit = {
    // 创建批处理执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 在这里添加批处理作业的代码逻辑
    // ...

    // 执行作业
    env.execute("Batch Job Example")
  }
}

流处理场景 - 设置DataStream API的流处理执行环境(Scala):

import org.apache.flink.streaming.api.scala._

object StreamJobExample {
  def main(args: Array[String]): Unit = {
    // 创建流处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 在这里添加流处理作业的代码逻辑
    // ...

    // 执行作业
    env.execute("Stream Job Example")
  }
}

根据以上示例代码,可以在不同的场景下选择合适的执行环境和API来构建和执行Flink作业。注意在导入包时,确保使用正确的包名和类名,以适应批处理或流处理的环境。

以下是一些常用的API函数和操作,以表格形式提供:

API 类型 常用函数和操作 描述
DataStream API mapflatMap 对数据流中的每个元素进行映射或扁平化操作。
filter 过滤出满足条件的元素。
keyBy 按指定的字段或键对数据流进行分区。
window 将数据流按照时间窗口或计数窗口划分。
reducefold 在窗口内对元素进行聚合操作。
union 合并多个数据流。
connectcoMapcoFlatMap 连接两个不同类型的数据流并应用相应的函数。
timeWindowcountWindow 定义时间窗口或计数窗口。
process 自定义处理函数,实现更复杂的流处理逻辑。
DataSet API mapflatMap 对数据集中的每个元素进行映射或扁平化操作。
filter 过滤出满足条件的元素。
groupBy 按指定的字段或键对数据集进行分组。
reducefold 对分组后的数据集进行聚合操作。
joincoGroup 对两个数据集进行内连接或外连接操作。
crosscartesian 对两个数据集进行笛卡尔积操作。
distinct 去除数据集中的重复元素。
groupByaggregate 分组并对分组后的数据集进行聚合操作。
firstminmax 获取数据集中的第一个、最小或最大元素。
sumavg 计算数据集中元素的和或平均值。
collect 将数据集中的元素收集到本地的集合中。

这些API函数和操作涵盖了Flink中流处理和批处理的常见操作,可以帮助用户实现各种复杂的数据处理和分析任务。根据实际需求,可以选择适合的API函数和操作来构建Flink作业。

下面是一些参见的API的说明:

map

将DataSet中的每一个元素转换为另外一个元素

示例

使用map操作,将以下数据

"1,张三", "2,李四", "3,王五", "4,赵六"

转换为一个scala的样例类。

步骤

  1. 获取ExecutionEnvironment运行环境

  2. 使用fromCollection构建数据源

  3. 创建一个User样例类

  4. 使用map操作执行转换

  5. 打印测试

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.MapFunction;

public class User {
    public String id;
    public String name;

    public User() {}

    public User(String id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "User(" + id + ", " + name + ")";
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> textDataSet = env.fromCollection(
            Arrays.asList("1,张三", "2,李四", "3,王五", "4,赵六")
        );

        DataSet<User> userDataSet = textDataSet.map(new MapFunction<String, User>() {
            @Override
            public User map(String text) throws Exception {
                String[] fieldArr = text.split(",");
                return new User(fieldArr[0], fieldArr[1]);
            }
        });

        userDataSet.print();
    }
}
flatMap

将DataSet中的每一个元素转换为0…n个元素

示例

分别将以下数据,转换成国家省份城市三个维度的数据。

将以下数据

张三,中国,江西省,南昌市
李四,中国,河北省,石家庄市
Tom,America,NewYork,Manhattan

转换为

(张三,中国)
(张三,中国,江西省)
(张三,中国,江西省,江西省)
(李四,中国)
(李四,中国,河北省)
(李四,中国,河北省,河北省)
(Tom,America)
(Tom,America,NewYork)
(Tom,America,NewYork,NewYork)

思路

  • 以上数据为一条转换为三条,显然,应当使用flatMap来实现

  • 分别在flatMap函数中构建三个数据,并放入到一个列表中

    姓名, 国家 姓名, 国家省份 姓名, 国家省份城市

步骤

  1. 构建批处理运行环境

  2. 构建本地集合数据源

  3. 使用flatMap将一条数据转换为三条数据

    • 使用逗号分隔字段

    • 分别构建国家、国家省份、国家省份城市三个元组

  4. 打印输出

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class UserProcessing {
    public static class User {
        public String name;
        public String country;
        public String province;
        public String city;

        public User() {}

        public User(String name, String country, String province, String city) {
            this.name = name;
            this.country = country;
            this.province = province;
            this.city = city;
        }

        @Override
        public String toString() {
            return "User(" + name + ", " + country + ", " + province + ", " + city + ")";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> userDataSet = env.fromCollection(new ArrayList<String>() {{
            add("张三,中国,江西省,南昌市");
            add("李四,中国,河北省,石家庄市");
            add("Tom,America,NewYork,Manhattan");
        }});

        DataSet<User> resultDataSet = userDataSet.flatMap(new FlatMapFunction<String, User>() {
            @Override
            public void flatMap(String text, Collector<User> collector) throws Exception {
                String[] fieldArr = text.split(",");
                String name = fieldArr[0];
                String country = fieldArr[1];
                String province = fieldArr[2];
                String city = fieldArr[3];

                collector.collect(new User(name, country, province, city));
                collector.collect(new User(name, country, province + city, ""));
                collector.collect(new User(name, country, province + city, city));
            }
        });

        resultDataSet.print();
    }
}

mapPartition

将一个分区中的元素转换为另一个元素

示例

使用mapPartition操作,将以下数据

"1,张三", "2,李四", "3,王五", "4,赵六"

转换为一个scala的样例类。

步骤

  1. 获取ExecutionEnvironment运行环境

  2. 使用fromCollection构建数据源

  3. 创建一个User样例类

  4. 使用mapPartition操作执行转换

  5. 打印测试

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class MapPartitionExample {
    public static class User {
        public String id;
        public String name;

        public User() {}

        public User(String id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "User(" + id + ", " + name + ")";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> userDataSet = env.fromCollection(new ArrayList<String>() {{
            add("1,张三");
            add("2,李四");
            add("3,王五");
            add("4,赵六");
        }});

        DataSet<User> resultDataSet = userDataSet.mapPartition(new MapPartitionFunction<String, User>() {
            @Override
            public void mapPartition(Iterable<String> iterable, Collector<User> collector) throws Exception {
                // TODO: 打开连接

                Iterator<String> iterator = iterable.iterator();
                while (iterator.hasNext()) {
                    String ele = iterator.next();
                    String[] fieldArr = ele.split(",");
                    collector.collect(new User(fieldArr[0], fieldArr[1]));
                }

                // TODO: 关闭连接
            }
        });

        resultDataSet.print();
    }
}

mapmapPartition的效果是一样的,但如果在map的函数中,需要访问一些外部存储。例如:

访问mysql数据库,需要打开连接, 此时效率较低。而使用mapPartition可以有效减少连接数,提高效率

filter

过滤出来一些符合条件的元素

示例:

过滤出来以下以h开头的单词。

"hadoop", "hive", "spark", "flink"

步骤

  1. 获取ExecutionEnvironment运行环境

  2. 使用fromCollection构建数据源

  3. 使用filter操作执行过滤

  4. 打印测试

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;

import java.util.ArrayList;
import java.util.List;

public class FilterExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> wordDataSet = env.fromCollection(new ArrayList<String>() {{
            add("hadoop");
            add("hive");
            add("spark");
            add("flink");
        }});

        DataSet<String> resultDataSet = wordDataSet.filter(word -> word.startsWith("h"));

        resultDataSet.print();
    }
}

reduce

可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素

示例1

请将以下元组数据,使用reduce操作聚合成一个最终结果

("java" , 1) , ("java", 1) ,("java" , 1) 

将上传元素数据转换为("java",3)

步骤

  1. 获取ExecutionEnvironment运行环境

  2. 使用fromCollection构建数据源

  3. 使用redice执行聚合操作

  4. 打印测试

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;
import java.util.List;

public class ReduceExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{
            add(new Tuple2<>("java", 1));
            add(new Tuple2<>("java", 1));
            add(new Tuple2<>("java", 1));
        }});

        DataSet<Tuple2<String, Integer>> resultDataSet = wordCountDataSet.reduce((wc1, wc2) ->
            new Tuple2<>(wc2.f0, wc1.f1 + wc2.f1)
        );

        resultDataSet.print();
    }
}

示例2

请将以下元组数据,下按照单词使用groupBy进行分组,再使用reduce操作聚合成一个最终结果

("java" , 1) , ("java", 1) ,("scala" , 1)  

转换为

("java", 2), ("scala", 1)

步骤

  1. 获取ExecutionEnvironment运行环境

  2. 使用fromCollection构建数据源

  3. 使用groupBy按照单词进行分组

  4. 使用reduce对每个分组进行统计

  5. 打印测试

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple;

import java.util.ArrayList;
import java.util.List;

public class GroupByReduceExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{
            add(new Tuple2<>("java", 1));
            add(new Tuple2<>("java", 1));
            add(new Tuple2<>("scala", 1));
        }});

        DataSet<Tuple2<String, Integer>> groupedDataSet = wordCountDataSet.groupBy(0).reduce((wc1, wc2) ->
            new Tuple2<>(wc1.f0, wc1.f1 + wc2.f1)
        );

        groupedDataSet.print();
    }
}

reduceGroup

可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素

reduce和reduceGroup的区别

reduce和reduceGroup区别

  • reduce是将数据一个个拉取到另外一个节点,然后再执行计算

  • reduceGroup是先在每个group所在的节点上执行计算,然后再拉取

示例

请将以下元组数据,下按照单词使用groupBy进行分组,再使用reduceGroup操作进行单词计数

("java" , 1) , ("java", 1) ,("scala" , 1)  

步骤

  1. 获取ExecutionEnvironment运行环境

  2. 使用fromCollection构建数据源

  3. 使用groupBy按照单词进行分组

  4. 使用reduceGroup对每个分组进行统计

  5. 打印测试

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class GroupByReduceGroupExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{
            add(new Tuple2<>("java", 1));
            add(new Tuple2<>("java", 1));
            add(new Tuple2<>("scala", 1));
        }});

        DataSet<Tuple2<String, Integer>> groupedDataSet = wordCountDataSet.groupBy(0).reduceGroup((Iterable<Tuple2<String, Integer>> iter, Collector<Tuple2<String, Integer>> collector) -> {
            Tuple2<String, Integer> result = new Tuple2<>();
            for (Tuple2<String, Integer> wc : iter) {
                result.f0 = wc.f0;
                result.f1 += wc.f1;
            }
            collector.collect(result);
        });

        groupedDataSet.print();
    }
}

aggregate

按照内置的方式来进行聚合, Aggregate只能作用于元组上。例如:SUM/MIN/MAX…

示例

请将以下元组数据,使用aggregate操作进行单词统计

("java" , 1) , ("java", 1) ,("scala" , 1)

步骤

  1. 获取ExecutionEnvironment运行环境

  2. 使用fromCollection构建数据源

  3. 使用groupBy按照单词进行分组

  4. 使用aggregate对每个分组进行SUM统计

  5. 打印测试

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;
import java.util.List;

public class GroupByAggregateExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{
            add(new Tuple2<>("java", 1));
            add(new Tuple2<>("java", 1));
            add(new Tuple2<>("scala", 1));
        }});

        DataSet<Tuple2<String, Integer>> groupedDataSet = wordCountDataSet.groupBy(0);

        DataSet<Tuple2<String, Integer>> resultDataSet = groupedDataSet.aggregate(Aggregations.SUM, 1);

        resultDataSet.print();
    }
}

注意

要使用aggregate,只能使用字段索引名或索引名称来进行分组groupBy(0),否则会报一下错误:

Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.

distinct

去除重复的数据

示例

请将以下元组数据,使用distinct操作去除重复的单词

("java" , 1) , ("java", 2) ,("scala" , 1)

去重得到

("java", 1), ("scala", 1)

步骤

  1. 获取ExecutionEnvironment运行环境

  2. 使用fromCollection构建数据源

  3. 使用distinct指定按照哪个字段来进行去重

  4. 打印测试

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;
import java.util.List;

public class DistinctExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{
            add(new Tuple2<>("java", 1));
            add(new Tuple2<>("java", 1));
            add(new Tuple2<>("scala", 1));
        }});

        DataSet<Tuple2<String, Integer>> resultDataSet = wordCountDataSet.distinct(0);

        resultDataSet.print();
    }
}

join

使用join可以将两个DataSet连接起来

示例:

有两个csv文件,有一个为score.csv,一个为subject.csv,分别保存了成绩数据以及学科数据。

图片

csv样例

需要将这两个数据连接到一起,然后打印出来。

join结果

步骤

  1. 分别将两个文件复制到项目中的data/join/input

  2. 构建批处理环境

  3. 创建两个样例类

* 学科Subject(学科ID、学科名字)
* 成绩Score(唯一ID、学生姓名、学科ID、分数——Double类型)
  1. 分别使用readCsvFile加载csv数据源,并制定泛型

  2. 使用join连接两个DataSet,并使用whereequalTo方法设置关联条件

  3. 打印关联后的数据源

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple4;

public class JoinExample {
    public static class Score {
        public int id;
        public String name;
        public int subjectId;
        public double score;

        public Score() {}

        public Score(int id, String name, int subjectId, double score) {
            this.id = id;
            this.name = name;
            this.subjectId = subjectId;
            this.score = score;
        }

        @Override
        public String toString() {
            return "Score(" + id + ", " + name + ", " + subjectId + ", " + score + ")";
        }
    }

    public static class Subject {
        public int id;
        public String name;

        public Subject() {}

        public Subject(int id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "Subject(" + id + ", " + name + ")";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Score> scoreDataSet = env.readCsvFile("./data/join/input/score.csv")
            .ignoreFirstLine()
            .pojoType(Score.class);

        DataSet<Subject> subjectDataSet = env.readCsvFile("./data/join/input/subject.csv")
            .ignoreFirstLine()
            .pojoType(Subject.class);

        DataSet<Tuple4<Integer, String, Integer, Double>> joinedDataSet = scoreDataSet.join(subjectDataSet)
            .where("subjectId")
            .equalTo("id")
            .projectFirst(0, 1, 2, 3)
            .projectSecond(1);

        joinedDataSet.print();
    }
}

union

图片

将两个DataSet取并集,不会去重。

示例

将以下数据进行取并集操作

数据集1

"hadoop", "hive", "flume"

数据集2

"hadoop", "hive", "spark"

步骤

  1. 构建批处理运行环境

  2. 使用fromCollection创建两个数据源

  3. 使用union将两个数据源关联在一起

  4. 打印测试

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;

import java.util.ArrayList;
import java.util.List;

public class UnionExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> wordDataSet1 = env.fromCollection(new ArrayList<String>() {{
            add("hadoop");
            add("hive");
            add("flume");
        }});

        DataSet<String> wordDataSet2 = env.fromCollection(new ArrayList<String>() {{
            add("hadoop");
            add("hive");
            add("spark");
        }});

        DataSet<String> resultDataSet = wordDataSet1.union(wordDataSet2);

        resultDataSet.print();
    }
}

connect

connect()提供了和union()类似的功能,即连接两个数据流,它与union()的区别如下。

  • connect()只能连接两个数据流,union()可以连接多个数据流。

  • connect()所连接的两个数据流的数据类型可以不一致,union()所连接的两个或多个数据流的数据类型必须一致。

  • 两个DataStream经过connect()之后被转化为ConnectedStreams, ConnectedStreams会对两个流的数据应用不同的处理方法,且两个流之间可以共享状态。

图片

DataStream<Integer> intStream  = senv.fromElements(2, 1, 5, 3, 4, 7); 
DataStream<String> stringStream  = senv.fromElements("A", "B", "C", "D"); 
 
ConnectedStreams<Integer, String> connectedStream =  
intStream.connect(stringStream); 
DataStream<String> mapResult = connectedStream.map(new MyCoMapFunction()); 

// CoMapFunction的3个泛型分别对应第一个流的输入类型、第二个流的输入类型,输出类型 
public static class MyCoMapFunction implements CoMapFunction<Integer, String, String> 
{ 
    @Override 
    public String map1(Integer input1) { 
          return input1.toString(); 
    } 
 
    @Override 
    public String map2(String input2) { 
          return input2; 
    } 
} 

rebalance

Flink也会产生数据倾斜的时候,例如:当前的数据量有10亿条,在处理过程就有可能发生如下状况:

数据倾斜

rebalance会使用轮询的方式将数据均匀打散,这是处理数据倾斜最好的选择。

图片

rebalance

步骤

  1. 构建批处理运行环境

  2. 使用env.generateSequence创建0-100的并行数据

  3. 使用fiter过滤出来大于8的数字

  4. 使用map操作传入RichMapFunction,将当前子任务的ID和数字构建成一个元组

    在RichMapFunction中可以使用getRuntimeContext.getIndexOfThisSubtask获取子任务序号

  5. 打印测试

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MapWithSubtaskIndexExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> numDataSet = env.generateSequence(0, 100);

        DataSet<Long> filterDataSet = numDataSet.filter(num -> num > 8);

        DataSet<Tuple2<Long, Long>> resultDataSet = filterDataSet.map(new RichMapFunction<Long, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(Long in) throws Exception {
                return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), in);
            }
        });

        resultDataSet.print();
    }
}

上述代码没有加rebalance,通过观察,有可能会出现数据倾斜。

在filter计算完后,调用rebalance,这样,就会均匀地将数据分布到每一个分区中。

hashPartition

按照指定的key进行hash分区

示例

基于以下列表数据来创建数据源,并按照hashPartition进行分区,然后输出到文件。

List(1,1,1,1,1,1,1,2,2,2,2,2)

步骤

  1. 构建批处理运行环境

  2. 设置并行度为2

  3. 使用fromCollection构建测试数据集

  4. 使用partitionByHash按照字符串的hash进行分区

  5. 调用writeAsText写入文件到data/partition_output目录中

  6. 打印测试

参考代码

 import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.core.fs.FileSystem;

import java.util.ArrayList;
import java.util.List;

public class PartitionByHashExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Set parallelism to 2
        env.setParallelism(2);

        DataSet<Integer> numDataSet = env.fromCollection(new ArrayList<Integer>() {{
            add(1);
            add(1);
            add(1);
            add(1);
            add(1);
            add(1);
            add(1);
            add(2);
            add(2);
            add(2);
            add(2);
            add(2);
        }});

        DataSet<Integer> partitionDataSet = numDataSet.partitionByHash(num -> num.toString());

        partitionDataSet.writeAsText("./data/partition_output", FileSystem.WriteMode.OVERWRITE);

        partitionDataSet.print();
        env.execute();
    }
}

sortPartition

指定字段对分区中的数据进行排序

示例

按照以下列表来创建数据集

List("hadoop", "hadoop", "hadoop", "hive", "hive", "spark", "spark", "flink")

对分区进行排序后,输出到文件。

步骤

  1. 构建批处理运行环境

  2. 使用fromCollection构建测试数据集

  3. 设置数据集的并行度为2

  4. 使用sortPartition按照字符串进行降序排序

  5. 调用writeAsText写入文件到data/sort_output目录中

  6. 启动执行

参考代码

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.core.fs.FileSystem;

import java.util.ArrayList;
import java.util.List;

public class SortPartitionExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> wordDataSet = env.fromCollection(new ArrayList<String>() {{
            add("hadoop");
            add("hadoop");
            add("hadoop");
            add("hive");
            add("hive");
            add("spark");
            add("spark");
            add("flink");
        }});

        wordDataSet.setParallelism(2);

        DataSet<String> sortedDataSet = wordDataSet.sortPartition(str -> str, Order.DESCENDING);

        sortedDataSet.writeAsText("./data/sort_output/", FileSystem.WriteMode.OVERWRITE);

        env.execute("App");
    }
}

窗口

在许多情况下,我们需要解决这样的问题:针对一个特定的时间段,例如一个小时,我们需要对数据进行统计和分析。但是,要实现这种数据窗口操作,首先需要确定哪些数据应该进入这个窗口。在深入了解窗口操作的定义之前,我们必须先确定作业将使用哪种时间语义。

换句话说,时间窗口是数据处理中的一个关键概念,用于将数据划分为特定的时间段进行计算。然而,在确定如何定义这些窗口之前,我们必须选择适合的时间语义,即事件时间、处理时间或摄入时间。不同的时间语义在数据处理中具有不同的含义和用途,因此在选择时间窗口之前,我们需要明确作业所需的时间语义,以便正确地界定和处理数据窗口。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐