总结出UDF/UDAF/UDTF三种函数的写法,并配以代码和注释说明。

一、UDF
介绍:自定义标量函数(User Defined Scalar Function),一行输入,一行输出。

import org.apache.flink.table.functions.ScalarFunction;

/**
 *   功能:字符串转小写
 *   注册函数:create function DemoUDF as 'packagename.DemoUDF';
 *   使用案例:select DemoUDF('Abc')
 */
public class DemoUDF extends ScalarFunction {

    public DemoUDF() {}

    public String eval(String str) {
        return str.toLowerCase();
    }
}

二、UDAF
介绍:自定义聚合函数,多行输入,一行输出。

import org.apache.flink.table.functions.AggregateFunction;

/**
 -   功能:求和
 -   注册函数:create function DemoUDAF as 'packagename.DemoUDAF';
 -   使用案例:
         select student_id,DemoUDAF(score)
         from tablename
         group by student_id
 */
// AggregateFunction<聚合的最终结果类型,聚合期间的中间结果类型>
public class DemoUDAF extends AggregateFunction<Long, DemoUDAF.SumAccumulator> {

    //定义一个累加器,存放聚合的中间结果
    public static class SumAccumulator{
        public long sumPrice;
    }

    //初始化累加器
    @Override
    public SumAccumulator createAccumulator() {
        SumAccumulator sumAccumulator = new SumAccumulator();
        sumAccumulator.sumPrice=0;
        return sumAccumulator;
    }

    //根据输入,更新累加器
    @Override
    public void accumulate(SumAccumulator accumulator,Long input){
        accumulator.sumPrice += input;
    }

    //返回聚合的最终结果
    @Override
    public Long getValue(SumAccumulator accumulator) {
        return accumulator.sumPrice;
    }
}

三、UDTF
介绍:自定义<表函数>,这类函数是作用于表,而不是字段。
UDTF可以细分为3类:

  • 一行输入,多行输出
  • 一列输入,多列输出
  • 一行输入,多行多列输出

UDTF之一行输入,多行输出

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;


/**
 *   功能:根据指定的分隔符,将一行变成多行
 *   注册函数:create function DemoRowMore as 'packagename.DemoRowMore';
 *   使用案例:
         select *,word
         from tablename,LATERAL TABLE(DemoRowMore(fieldname,',')) as T(word)
 */

public class DemoRowMore extends TableFunction<Row> {

    public DemoRowMore() {
    }

    // 使用注解指定输出数据的名称和类型
    // 没有返回值,使用collect收集数据,可以收集多次
    @FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
    public void eval(String data,String split){
        String[] arr = data.split(split);
        for (String s : arr) {
            collect(Row.of(s));
        }
    }
}
数据样例:
a,b,c
d,e

Flink SQL>
select word
from tablename,LATERAL TABLE(DemoRowMore(fieldname,',')) as T(word);

Flink SQL>
a
b
c
d
e

UDTF之一列输入,多列输出

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 *   功能:根据给定的分隔符,将一列变成两列
 *   注册函数:create function DemoColumnMore as 'packagename.DemoColumnMore';
 *   使用案例:
        select *,name,city
        from tablename,LATERAL TABLE(DemoColumnMore(fieldname,',')) as T(name,city)
 */
public class DemoColumnMore extends TableFunction<Row> {

    public DemoColumnMore() {
    }

    // 使用注解指定输出数据的名称和类型
    @FunctionHint(output = @DataTypeHint("ROW<name STRING,city STRING>"))
    public void eval(String data, String split){
        String[] arr = data.split(split);
        Row row = new Row(2);
        row.setField(0,arr[0]);
        row.setField(1,arr[1]);
        collect(row);
    }
}
数据样例:
'张三,上海'

Flink SQL>
select name,city
from tablename,LATERAL TABLE(DemoColumnMore(fieldname,',')) as T(name,city)

Flink SQL>
'张三','上海'

创作不易,希望对你有所帮助!

一健三联就是对我最大的鼓励,笔芯~

交流加微wex997520707~

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐