package com.example.chapter11;
import com.example.chapter05.ClickSource;
import com.example.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Quick start: SQL and the Table API
 */
public class SimpleTableExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Read data from the source to get a DataStream
SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
// Create the table execution environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Convert the DataStream into a Table
Table eventTable = tableEnv.fromDataStream(eventStream);
// Transform the table by writing SQL directly
Table resultTable = tableEnv.sqlQuery("select user, url, `timestamp` from " + eventTable);
// The same kind of query written with the Table API
Table resultTable2 = eventTable.select($("user"), $("url"))
.where($("user").isEqual("Alice"));
// A Table cannot be printed directly; the simplest way to inspect it is to convert it to a stream
tableEnv.toDataStream(resultTable).print("result");
tableEnv.toDataStream(resultTable2).print("result2");
tableEnv.createTemporaryView("clickTable", eventTable);
Table aggResult = tableEnv.sqlQuery("select user,COUNT(url) as cnt from clickTable group by user");
// The aggregate result is an updating table, so convert it with toChangelogStream
tableEnv.toChangelogStream(aggResult).print("agg");
env.execute();
}
}
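The examples in this chapter import Event and ClickSource from com.example.chapter05, which are not included in this excerpt. Below is a minimal sketch reconstructed from how they are used above (element.timestamp, $("user"), $("url")); the field types and the source's exact behavior are assumptions, not the original chapter-5 code. Each class would live in its own file:

package com.example.chapter05;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;

// Assumed POJO: public fields named user, url, timestamp, matching the
// column names referenced by the Table API examples in this chapter
public class Event {
public String user;
public String url;
public Long timestamp;

public Event() {} // Flink's POJO rules require a public no-arg constructor

public Event(String user, String url, Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
}

// Assumed source: emits a random click event roughly once per second
public class ClickSource implements SourceFunction<Event> {
private volatile boolean running = true;

@Override
public void run(SourceContext<Event> ctx) throws Exception {
Random random = new Random();
String[] users = {"Alice", "Bob", "Cary"};
String[] urls = {"./home", "./cart", "./prod?id=100"};
while (running) {
ctx.collect(new Event(
users[random.nextInt(users.length)],
urls[random.nextInt(urls.length)],
Calendar.getInstance().getTimeInMillis()));
Thread.sleep(1000L);
}
}

@Override
public void cancel() {
running = false;
}
}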
package com.example.chapter11;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * 1. Create the table execution environment
 * 2. Register a table
 * 3. Turn the registered table into a Table object
 * 4. Add query conditions to the Table object
 * 5. Produce a new Table object
 * 6. Optionally register the result back into the environment for later SQL
 *
 * ----- Both the Table API and SQL yield Table objects, so you can:
 *
 * -------- use the Table API only
 * -------- use SQL only
 * -------- or mix the two
 */
public class CommonApiTest {
public static void main(String[] args) {
/**
 * Option 1: derive the table environment from a stream execution environment
 */
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
// StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // with nothing passed in, streaming mode is the default
/**
 * Option 2: create the table environment directly from settings
 * (Flink streaming environment)
 */
// 1. Define environment settings and create the table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode() // streaming mode
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
/**
 * Option 3: streaming with the legacy planner
 */
// EnvironmentSettings settings1 = EnvironmentSettings.newInstance()
// .inStreamingMode() // streaming mode
// .useOldPlanner()
// .build();
//
// TableEnvironment tableEnv1 = TableEnvironment.create(settings1);
/**
 * Option 3.1: batch processing with the legacy planner
 */
// ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(batchEnv);
/**
 * Option 3.2: batch processing with the Blink planner
 * (the original snippet used inStreamingMode()/useAnyPlanner(), which
 * contradicts the description; batch mode with the Blink planner is below)
 */
// EnvironmentSettings setting3 = EnvironmentSettings.newInstance()
// .inBatchMode()
// .useBlinkPlanner()
// .build();
//
// TableEnvironment tableEnv3 = TableEnvironment.create(setting3);
// 2. Create a table with DDL
String createDDL = "CREATE TABLE clickTable (" +
"user_name STRING, " +
"url STRING, " +
"ts BIGINT " +
") WITH (" +
" 'connector'= 'filesystem'," +
" 'path' = 'input/clicks.txt'," +
" 'format' = 'csv'" +
")";
// executeSql registers the table in the environment
tableEnv.executeSql(createDDL);
// Get a Table object:
// the Table here is just a Java object;
// clickTable is what is actually registered in the table environment
Table clickTable = tableEnv.from("clickTable");
Table resultTable = clickTable.where($("user_name").isEqual("Bob"))
.select($("user_name"), $("url"));
// To reuse this Table directly in later SQL, register it in the environment as a view
tableEnv.createTemporaryView("Result2", resultTable);
// Query the registered view with SQL
Table resultTable2 = tableEnv.sqlQuery("select user_name,url from Result2");
// Create a table for file output (executeInsert maps columns by position,
// so the column order must match resultTable: user_name, url)
String createOutDDL = "CREATE TABLE outTable (" +
"user_name STRING, " +
"url STRING " +
") WITH (" +
"'connector'= 'filesystem'," +
"'path' = 'output'," +
"'format' = 'csv'" +
")";
tableEnv.executeSql(createOutDDL);
// Create a table that prints to the console (column order matches resultTable2)
String creatPrintOutDDL = "CREATE TABLE printOutTable (" +
"user_name STRING, " +
"url STRING " +
") WITH (" +
"'connector'= 'print'" +
")";
tableEnv.executeSql(creatPrintOutDDL);
// Write the result to the file output table
resultTable.executeInsert("outTable");
// Write the result to the console
resultTable2.executeInsert("printOutTable");
}
}
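executeInsert submits a separate Flink job and returns a TableResult immediately. For a bounded input like the CSV file above, a program can block until the insert job finishes by waiting on that result; a minimal sketch using the standard TableResult API (e.g. replacing the bare executeInsert call above):

// Capture the TableResult and block until the insert job completes
org.apache.flink.table.api.TableResult insertResult = resultTable.executeInsert("outTable");
insertResult.await(); // blocks until the job finishes; throws if it fails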
package com.example.chapter11;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
/**
 * TODO: count each user's PV (page views)
 * 1. Create the table execution environment
 * 2. Register a table
 * 3. Turn the registered table into a Table object
 * 4. Add query conditions to the Table object
 * 5. Produce a new Table object
 * 6. Output the result
 */
public class CommonApiTestCount {
public static void main(String[] args) {
/**
 * Option 2: create the table environment directly from settings
 * (Flink streaming environment)
 */
// 1. Define environment settings and create the table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode() // streaming mode
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 2. Create a table with DDL
String createDDL = "CREATE TABLE clickTable (" +
"user_name STRING, " +
"url STRING, " +
"ts BIGINT " +
") WITH (" +
" 'connector'= 'filesystem'," +
" 'path' = 'input/clicks.txt'," +
" 'format' = 'csv'" +
")";
// executeSql registers the table
tableEnv.executeSql(createDDL);
// For each user, count how many clicks they have made so far: the per-user PV
Table aggResult = tableEnv.sqlQuery("select user_name,count(url) as cnt from clickTable group by user_name");
// Create a print table for outputting each user's PV to the console
String creatPrintOutDDLUserPV = "CREATE TABLE printOutTable (" +
" user_name STRING, " +
" cnt BIGINT " +
") WITH (" +
" 'connector'= 'print'" +
")";
tableEnv.executeSql(creatPrintOutDDLUserPV);
// Print each user's PV to the console
aggResult.executeInsert("printOutTable");
}
}
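The same per-user PV count can also be written with the Table API instead of SQL. A minimal sketch against the clickTable registered above (groupBy/count are the standard Table API; the static import of $ is assumed):

import static org.apache.flink.table.api.Expressions.$;

// Table API equivalent of the GROUP BY query above
Table aggResultApi = tableEnv.from("clickTable")
.groupBy($("user_name"))
.select($("user_name"), $("url").count().as("cnt"));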
package com.example.chapter11;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TopNExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1. Define the time attribute directly in the CREATE TABLE DDL
// FROM_UNIXTIME converts the long epoch value into a 'yyyy-MM-dd HH:mm:ss' string (standard UTC time)
String createDDL = "CREATE TABLE clickTable (" +
"user_name STRING, " +
"url STRING, " +
"ts BIGINT, " +
"et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
"WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector'= 'filesystem'," +
" 'path' = 'input/clicks.txt'," +
" 'format' = 'csv'" +
")";
// Register the table
tableEnv.executeSql(createDDL);
/**
 * Plain Top N: pick the 2 users with the highest page view counts.
 * cnt: the visit count
 * row_num: the rank
 *
 * The ROW_NUMBER() function with an OVER window adds a new field:
 * the row number of each record after sorting by cnt.
 */
Table topNResultTable = tableEnv.sqlQuery("select user_name, cnt, row_num " +
"FROM (" +
" SELECT *,ROW_NUMBER() OVER (" +
"order by cnt DESC" +
" ) AS row_num " +
" FROM (SELECT user_name,COUNT(url) AS cnt FROM clickTable GROUP BY user_name)" +
") WHERE row_num <=2");
// tableEnv.toChangelogStream(topNResultTable).print("top 2 :");
// Window Top N: the (top 2) most active users within each time window
String subQuery = "SELECT user_name,COUNT(url) AS cnt, window_start,window_end " +
"FROM Table (" +
"TUMBLE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '10' SECOND)" +
")" +
"GROUP BY user_name,window_start,window_end";
Table windowTopN = tableEnv.sqlQuery("select user_name, cnt, row_num,window_end " +
"FROM (" +
" SELECT *,ROW_NUMBER() OVER (" +
" PARTITION BY window_start,window_end " +
" order by cnt DESC" +
" ) AS row_num " +
" FROM ( " + subQuery + " )) WHERE row_num <=2");
tableEnv.toChangelogStream(windowTopN).print("window TOP N: ");
env.execute();
}
}
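Note the difference in output modes: the plain Top N above continuously updates its rows, so it must be consumed as a changelog stream, while a window Top N only emits final results once each window fires, producing an insert-only table. Assuming that, the window result could also be printed as a plain data stream; a minimal sketch:

// Window Top-N results are append-only, so toDataStream also works here
tableEnv.toDataStream(windowTopN).print("window TOP N: ");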
package com.example.chapter11;
import com.example.chapter05.ClickSource;
import com.example.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
public class TimeAndWindowsTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1. Define the time attribute directly in the CREATE TABLE DDL
// FROM_UNIXTIME converts the long epoch value into a 'yyyy-MM-dd HH:mm:ss' string (standard UTC time)
String createDDL = "CREATE TABLE clickTable (" +
"user_name STRING, " +
"url STRING, " +
"ts BIGINT, " +
"et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
"WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector'= 'filesystem'," +
" 'path' = 'input/clicks.txt'," +
" 'format' = 'csv'" +
")";
// Create the table
tableEnv.executeSql(createDDL);
// 2. Define the time attribute when converting a stream into a Table
SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp"),
$("et").rowtime());
// Aggregation queries
// 1. Group aggregation
Table aggTable = tableEnv.sqlQuery("select user_name,count(1) from clickTable group by user_name");
// 2. Group window aggregation (legacy syntax)
Table groupWindowResultTable = tableEnv.sqlQuery("select " +
"user_name,count(1) as cnt, " +
"TUMBLE_END(et,INTERVAL '10' SECOND) AS entT " +
"FROM clickTable " +
"group by " +
"user_name, " +
"TUMBLE(et,INTERVAL '10' SECOND )"
);
// clickTable.printSchema();
/**
 * 3. Window aggregation with windowing TVFs
 * 3.1 Tumbling window: TUMBLE
 * arg 1: the table
 * arg 2: the time attribute column (DESCRIPTOR)
 * arg 3: the key parameter, the window size
 *
 * The window columns must appear in the GROUP BY clause.
 */
Table tumbleWindowResultTable = tableEnv.sqlQuery(
"select user_name,COUNT(1) as cnt, " +
" window_end as endT " +
"from TABLE( " +
"TUMBLE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '10' SECOND )" +
")" +
"GROUP BY user_name,window_end,window_start");
/**
 * 3.2 Sliding window: HOP
 * arg 1: the table
 * arg 2: the time attribute column (DESCRIPTOR)
 * arg 3: the slide
 * arg 4: the window size
 *
 * The window columns must appear in the GROUP BY clause.
 */
Table hopWindowResultTable = tableEnv.sqlQuery(
"select user_name,COUNT(1) as cnt, " +
" window_end as endT " +
"from TABLE( " +
"HOP(TABLE clickTable,DESCRIPTOR(et),INTERVAL '5' SECOND ,INTERVAL '10' SECOND )" +
")" +
"GROUP BY user_name,window_end,window_start");
/**
 * 3.3 Cumulative window: CUMULATE
 * arg 1: the table
 * arg 2: the time attribute column (DESCRIPTOR)
 * arg 3: the step: how often an intermediate result is emitted
 * arg 4: the maximum window size, i.e. the full statistics period
 *
 * The window columns must appear in the GROUP BY clause.
 */
Table cumulateWindowResultTable = tableEnv.sqlQuery(
"select user_name,COUNT(1) as cnt, " +
" window_end as endT " +
"from TABLE( " +
"CUMULATE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '5' SECOND ,INTERVAL '10' SECOND )" +
")" +
"GROUP BY user_name,window_end,window_start");
/**
 * 4. OVER window aggregation:
 * for each user, the average timestamp over the current visit
 * and the previous three visits
 */
Table overWindowResultTable = tableEnv.sqlQuery("select user_name," +
" avg(ts) OVER(" +
" PARTITION BY user_name " +
"ORDER BY et " +
"ROWS BETWEEN 3 PRECEDING AND CURRENT ROW" +
") AS avg_ts " +
"from clickTable"
);
// tableEnv.toChangelogStream(aggTable).print("agg");
// tableEnv.toDataStream(groupWindowResultTable).print("group windows");
// tableEnv.toDataStream(tumbleWindowResultTable).print("tumble window");
// tableEnv.toDataStream(hopWindowResultTable).print("hop window");
// tableEnv.toDataStream(cumulateWindowResultTable).print("cumulate window");
tableEnv.toDataStream(overWindowResultTable).print("over window: ");
env.execute();
}
}
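The SQL windowing TVFs above also have Table API counterparts. A minimal sketch of the 10-second tumbling-window aggregation, written against the stream-derived Table clickTable from above (fields user, url, timestamp, and rowtime attribute et); Tumble and lit are the standard org.apache.flink.table.api classes, and the static imports are assumed:

import org.apache.flink.table.api.Tumble;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

// Table API version of the TUMBLE TVF query
Table tumbleApiResult = clickTable
.window(Tumble.over(lit(10).seconds()).on($("et")).as("w"))
.groupBy($("user"), $("w"))
.select($("user"), $("url").count().as("cnt"), $("w").end().as("endT"));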
package com.example.chapter11;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
public class UdfTest_AggregateFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1. Define the time attribute directly in the CREATE TABLE DDL
// FROM_UNIXTIME converts the long epoch value into a 'yyyy-MM-dd HH:mm:ss' string (standard UTC time)
String createDDL = "CREATE TABLE clickTable (" +
"user_name STRING, " +
"url STRING, " +
"ts BIGINT, " +
"et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
"WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector'= 'filesystem'," +
" 'path' = 'input/clicks.txt'," +
" 'format' = 'csv'" +
")";
// Register the table
tableEnv.executeSql(createDDL);
// 2. Register the custom aggregate function
tableEnv.createTemporarySystemFunction("WeighedAverage", WeighedAverage.class);
// 3. Call the UDF in a query
Table resultTable = tableEnv.sqlQuery("select user_name, WeighedAverage(ts, 1) as w_avg " +
" from clickTable group by user_name");
// 4. The grouped aggregate is an updating table, so convert it to a changelog stream and print
// (toDataStream only supports insert-only tables and would fail here)
tableEnv.toChangelogStream(resultTable).print();
env.execute();
}
// Define a separate accumulator type
public static class WeightedAvgAccumulator {
public long sum = 0;
public int count = 0;
}
// Implement the custom aggregate function: computes a weighted average
public static class WeighedAverage extends AggregateFunction<Long, WeightedAvgAccumulator> {
@Override
public Long getValue(WeightedAvgAccumulator accumulator) {
if (accumulator.count == 0)
return null;
else
return accumulator.sum / accumulator.count;
}
@Override
public WeightedAvgAccumulator createAccumulator() {
return new WeightedAvgAccumulator();
}
// Accumulate method, called once per input row
public void accumulate(WeightedAvgAccumulator accumulator, Long iValue, Integer iWeight) {
accumulator.sum += iValue * iWeight;
accumulator.count += iWeight;
}
}
}
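If this aggregate is ever used where inputs can be withdrawn (a retraction-producing upstream query, or a bounded OVER window), Flink requires an additional retract method on the AggregateFunction. A minimal sketch of what it would look like for this accumulator, following the documented AggregateFunction contract:

// Optional retract method: undoes one previously accumulated input row
public void retract(WeightedAvgAccumulator accumulator, Long iValue, Integer iWeight) {
accumulator.sum -= iValue * iWeight;
accumulator.count -= iWeight;
}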
package com.example.chapter11;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
public class UdfTest_ScalarFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1. Define the time attribute directly in the CREATE TABLE DDL
// FROM_UNIXTIME converts the long epoch value into a 'yyyy-MM-dd HH:mm:ss' string (standard UTC time)
String createDDL = "CREATE TABLE clickTable (" +
"user_name STRING, " +
"url STRING, " +
"ts BIGINT, " +
"et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
"WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector'= 'filesystem'," +
" 'path' = 'input/clicks.txt'," +
" 'format' = 'csv'" +
")";
// Register the table
tableEnv.executeSql(createDDL);
// 2. Register the custom scalar function
tableEnv.createTemporaryFunction("MyHash", MyHashFunction.class);
// 3. Call the UDF in a query
Table resultTable = tableEnv.sqlQuery("select user_name, MyHash(user_name) from clickTable");
// 4. Convert to a stream and print
tableEnv.toDataStream(resultTable).print();
env.execute();
}
public static class MyHashFunction extends ScalarFunction {
public int eval(String str) {
return str.hashCode();
}
}
}
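A scalar function can also be used inline from the Table API without registering it first, by passing the function class to call(). A minimal sketch (call is the standard org.apache.flink.table.api.Expressions.call; the static imports are assumed):

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

// Inline call: no createTemporaryFunction needed
Table inlineResult = tableEnv.from("clickTable")
.select($("user_name"), call(MyHashFunction.class, $("user_name")));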
package com.example.chapter11;
/**
 * Table aggregate function
 * <p>
 * Project: FlinkTutorial
 * <p>
 * Created by wushengran
 */
import com.example.chapter05.ClickSource;
import com.example.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
public class UdfTest_TableAggregateFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1. Custom data source, converted from a stream
SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
// 2. Convert the stream into a table
Table eventTable = tableEnv.fromDataStream(eventStream,
$("user"),
$("url"),
$("timestamp").as("ts"),
$("rt").rowtime());
tableEnv.createTemporaryView("EventTable", eventTable);
// 3. Tumbling window aggregation: each user's page view count per window
Table windowAggTable = tableEnv.sqlQuery("select user, count(url) as cnt, " +
"window_end " +
"from TABLE(" +
" TUMBLE( TABLE EventTable, DESCRIPTOR(rt), INTERVAL '10' SECOND )" +
")" +
"group by user," +
" window_start," +
" window_end");
tableEnv.createTemporaryView("AggTable", windowAggTable);
// 4. Register the table aggregate function
tableEnv.createTemporarySystemFunction("Top2", Top2.class);
// 5. Call the function in the Table API
Table resultTable = tableEnv.from("AggTable")
.groupBy($("window_end"))
.flatAggregate(call("Top2", $("cnt")).as("value", "rank"))
.select($("window_end"), $("value"), $("rank"));
// 6. Print to the console
tableEnv.toChangelogStream(resultTable).print();
env.execute();
}
// Accumulator type definition, holding the largest and second-largest values
public static class Top2Accumulator {
public Long first;
public Long second;
}
// Custom table aggregate function: finds the two largest values in a group, returning (value, rank) tuples
public static class Top2 extends TableAggregateFunction<Tuple2<Long, Integer>, Top2Accumulator> {
@Override
public Top2Accumulator createAccumulator() {
Top2Accumulator acc = new Top2Accumulator();
acc.first = Long.MIN_VALUE; // initialize to the minimum value to simplify comparisons
acc.second = Long.MIN_VALUE;
return acc;
}
// Called once per input row: decide whether to update the accumulator
public void accumulate(Top2Accumulator acc, Long value) {
if (value > acc.first) {
acc.second = acc.first;
acc.first = value;
} else if (value > acc.second) {
acc.second = value;
}
}
// Emit (value, rank) tuples: up to two rows
public void emitValue(Top2Accumulator acc, Collector<Tuple2<Long, Integer>> out) {
if (acc.first != Long.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Long.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
}
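When the result is consumed as a changelog stream, emitValue retracts and re-emits both rows on every update. TableAggregateFunction also supports the documented emitUpdateWithRetract hook, which only retracts rows that actually changed. A sketch of an incremental Top2 variant, assuming an accumulator extended with oldFirst/oldSecond bookkeeping fields (hypothetical names, not part of the class above):

// Hypothetical accumulator that also remembers the values last emitted
public static class Top2RetractAccumulator {
public Long first = Long.MIN_VALUE;
public Long second = Long.MIN_VALUE;
public Long oldFirst = Long.MIN_VALUE;
public Long oldSecond = Long.MIN_VALUE;
}

public static class Top2WithRetract
extends TableAggregateFunction<Tuple2<Long, Integer>, Top2RetractAccumulator> {
@Override
public Top2RetractAccumulator createAccumulator() {
return new Top2RetractAccumulator();
}
public void accumulate(Top2RetractAccumulator acc, Long value) {
if (value > acc.first) {
acc.second = acc.first;
acc.first = value;
} else if (value > acc.second) {
acc.second = value;
}
}
// Only retract and re-emit the rows whose value changed since the last emit
public void emitUpdateWithRetract(Top2RetractAccumulator acc,
RetractableCollector<Tuple2<Long, Integer>> out) {
if (!acc.first.equals(acc.oldFirst)) {
if (acc.oldFirst != Long.MIN_VALUE) {
out.retract(Tuple2.of(acc.oldFirst, 1)); // withdraw the stale rank-1 row
}
out.collect(Tuple2.of(acc.first, 1));
acc.oldFirst = acc.first;
}
if (!acc.second.equals(acc.oldSecond)) {
if (acc.oldSecond != Long.MIN_VALUE) {
out.retract(Tuple2.of(acc.oldSecond, 2)); // withdraw the stale rank-2 row
}
out.collect(Tuple2.of(acc.second, 2));
acc.oldSecond = acc.second;
}
}
}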
package com.example.chapter11;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
public class UdfTest_TableFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1. Define the time attribute directly in the CREATE TABLE DDL
// FROM_UNIXTIME converts the long epoch value into a 'yyyy-MM-dd HH:mm:ss' string (standard UTC time)
String createDDL = "CREATE TABLE clickTable (" +
"user_name STRING, " +
"url STRING, " +
"ts BIGINT, " +
"et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
"WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector'= 'filesystem'," +
" 'path' = 'input/clicks.txt'," +
" 'format' = 'csv'" +
")";
// Register the table
tableEnv.executeSql(createDDL);
// 2. Register the custom table function
tableEnv.createTemporaryFunction("MySplint", MySplint.class);
// 3. Call the UDF in a query, via a lateral join
Table resultTable = tableEnv.sqlQuery("select user_name, url, word, length" +
" from clickTable, " +
"LATERAL TABLE( MySplint(url) ) AS T(word,length)");
// 4. Convert to a stream and print
tableEnv.toDataStream(resultTable).print();
env.execute();
}
// Implement the custom table function: split the URL on '?' and emit (word, length) rows
public static class MySplint extends TableFunction<Tuple2<String, Integer>> {
public void eval(String str) {
String[] fields = str.split("\\?");
for (String field : fields) {
collect(Tuple2.of(field, field.length()));
}
}
}
}
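The comma syntax above is an implicit CROSS JOIN, which drops input rows for which the table function emits nothing. Flink SQL also supports a LEFT JOIN against the lateral table, which keeps such rows with NULL fields; a minimal sketch:

// LEFT JOIN keeps rows even when MySplint emits no output; ON TRUE is required by the syntax
Table leftJoinResult = tableEnv.sqlQuery("select user_name, url, word, length" +
" from clickTable" +
" LEFT JOIN LATERAL TABLE( MySplint(url) ) AS T(word, length) ON TRUE");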