Flink SQL自定义UDF指南

肖钟城
  • 大数据技术栈
  • Flink
大约 3 分钟

Flink SQL自定义UDF指南

自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。

自定义函数可以用 JVM 语言(例如 Java 或 Scala)或 Python 实现,实现者可以在 UDF 中使用任意第三方库,本文聚焦于使用 JVM 语言开发自定义函数。

概述

当前 Flink 有如下几种函数:

  • 标量函数 将标量值转换成一个新标量值;
  • 表值函数 将标量值转换成新的行数据;
  • 聚合函数 将多行数据里的标量值转换成一个新标量值;
  • 表值聚合函数 将多行数据里的标量值转换成新的行数据;
  • 异步表值函数 是异步查询外部数据系统的特殊函数。

标量和表值函数已经使用了新的基于数据类型open in new window的类型系统,聚合函数仍然使用基于 TypeInformation 的旧类型系统。

以下示例展示了如何创建一个基本的标量函数,以及如何在 Table API 和 SQL 里调用这个函数。

函数用于 SQL 查询前要先经过注册;而在用于 Table API 时,函数可以先注册后调用,也可以 内联 后直接使用。

java版本

import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

// 定义函数逻辑
public static class SubstringFunction extends ScalarFunction {
  public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, end);
  }
}

TableEnvironment env = TableEnvironment.create(...);

// 在 Table API 里不经注册直接“内联”调用函数
env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));

// 注册函数
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);

// 在 Table API 里调用注册好的函数
env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));

// 在 SQL 里调用注册好的函数
env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");

对于交互式会话,还可以在使用或注册函数之前对其进行参数化,这样可以把函数 实例 而不是函数 用作临时函数。

为确保函数实例可应用于集群环境,参数必须是可序列化的。

import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

// 定义可参数化的函数逻辑
public static class SubstringFunction extends ScalarFunction {

  private boolean endInclusive;

  public SubstringFunction(boolean endInclusive) {
    this.endInclusive = endInclusive;
  }

  public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, endInclusive ? end + 1 : end);
  }
}

TableEnvironment env = TableEnvironment.create(...);

// 在 Table API 里不经注册直接“内联”调用函数
env.from("MyTable").select(call(new SubstringFunction(true), $("myField"), 5, 12));

// 注册函数
env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true));

开发指南

在聚合函数使用新的类型系统前,本节仅适用于标量和表值函数。

所有的自定义函数都遵循一些基本的实现原则。

函数类

实现类必须继承自合适的基类之一(例如 org.apache.flink.table.functions.ScalarFunction )。

该类必须声明为 public ,而不是 abstract ,并且可以被全局访问。不允许使用非静态内部类或匿名类。

为了将自定义函数存储在持久化的 catalog 中,该类必须具有默认构造器,且在运行时可实例化。

pom文件只需要添加如下依赖即可:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

写法用例:

package com.zh.ch.bigdata.flink.udf;

import org.apache.flink.table.functions.ScalarFunction;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/**
 * 日期格式化UDF
 *
 * @author Xiaozhch5
 * @version 1.0
 */
public class DateFormatter extends ScalarFunction {

    public String eval(String dateStr, String originalDatePattern, String targetDatePattern) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(originalDatePattern);
        return LocalDate.parse(dateStr, formatter).format(DateTimeFormatter.ofPattern(targetDatePattern));
    }

    public String eval(String dateStr, String originalDatePattern) {
        String targetDatePattern = "yyyyMMdd";
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(originalDatePattern);
        return LocalDate.parse(dateStr, formatter).format(DateTimeFormatter.ofPattern(targetDatePattern));
    }
}

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.14.1