DMOZ中文网站分类目录-免费收录各类优秀网站的中文网站目录.
  • DmozDir.org
DMOZ目录快速登录入口-免费收录各类优秀网站的中文网站目录.由人工编辑,并提供网站分类目录检索及地区分类目录检索,是站长免费推广网站的有力平台!

Flink

  • Flink

  • 已被浏览: 73 次2020年09月08日    来源:  https://www.dmozdir.org/
  • 基础的转换算子是无法访问事件的时间戳信息和水位线信息的。基于此,DataStream API 提供了一系列的Low-Level的转换算子。可以访问事件戳,watermarker以及注册定时事件。F

    基础的转换算子是无法访问事件的时间戳信息和水位线信息的。

    基于此,DataStream API 提供了一系列的Low-Level的转换算子。可以访问事件戳,watermarker以及注册定时事件。

    Flink 提供了8个Process Function。

    • ProcessFunction:最底层的;
    • KeyedProcessFunction:处理keyBy之后的流。
    • CoProcessFunction:处理collect操作之后的流。
    • ProcessJoinFunction:处理join操作之后的流。
    • BroadcastProcessFunction:广播的
    • KeyedBroadcastProcessFunction:处理keyBy之后广播的流。
    • ProcessWindowFunction:窗口函数
    • ProcessAllWindowFunction:全窗口函数

    KeyedProcessFunction

    keyedProcessFunction用来操作KeyedStream; 所有的Process Function都继承自RichFunction接口,都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction[KEY,IN,OUT]还额外提供了两个方法:
    在这里插入图片描述

    TimeService和定时器(Timers)

    Context 和 OnTimerContext 所持有的 TimerService对象拥有以下方法:

    在这里插入图片描述
    当定时器 timer 触发时,会执行回调函数 onTimer()。注意定时器 timer 只能在 Keyed Streams 上面使用。

    Emitting to Side Outputs(侧输出)

    大部分的 DataStream API的算子的输出是单一输出,也就是某种类型的数据流。除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。

    process function 的 side outputs 功能可以产生多条流,并且这些流的数据类型不一样。一个side output 可以定义为 OutputTag[X]对象, X是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。

    例子

    package com.dongk.flink.processfunction
    
    import com.dongk.flink.stream.SensorReading
    import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
    import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
    import org.apache.flink.util.Collector
    import org.apache.flink.streaming.api.scala._
    
    /**
     *
     *  侧输出流例子
     *
     *  判断传感器温度、低于某个值,输出到一个流。其余正常的输出到一个流
     **/
    object ProcessFunctionTest1 {
    
      def main(args: Array[String]): Unit = {
        //环境变量
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        /**
         *
         * 1、实现10s分钟的单词计数;
         * 2、利用socket数据
         *  nc -lk 7777
         * */
        val socketStream : DataStream[String] = env.socketTextStream("localhost",7777)
    
        val stream1 : DataStream[SensorReading] = socketStream.map(text => {
          val datas = text.split(",")
          SensorReading(datas(0).trim,datas(1).toLong,datas(2).trim.toDouble)
        })
    
        val stream2 = stream1.process(new FreezingAlter)  //主流
    
        stream2.print()
    
        stream2.getSideOutput(new OutputTag[String]("freezing")).print()
    
        env.execute("process function output ")
    
      }
    
    }
    
    /**
     * 低温预警
     *
     * 第一个泛型:输入的数据类型
     * 第二个泛型:主输出流的输出类型(不是侧输出流的输出类型)
     * */
    class FreezingAlter extends ProcessFunction[SensorReading,SensorReading]{
    
      lazy val alterOutput : OutputTag[String] = new OutputTag[String]("freezing")
    
      override def processElement(value : SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
        //输出到侧输出流
        if(value.temperature < 32.0){
          ctx.output(alterOutput,"freezing alter for" + value.id)
        }else{
          //输出到主流
          out.collect(value)
        }
      }
    }
    
    
    

    CoProcessFunction

    对于两条输入流,DataStream API 提供了CoProcessFunction这样的low-level操作。CoProcessFunction 提供了操作每一个输入流的方法:processElement1() 和 processElement()。

    类似于ProcessFunction,这两种方法都通过Context对象来调用。这个Context对象可以访问事件数据,定时器事件戳,TimerService,以及side outputs。

    CoProcessFunction 也提供了 onTimer() 回调函数。


    以上信息来源于网络,如有侵权,请联系站长删除。

    TAG:Flink

  • 上一篇:APP(UniAPP) 支付宝支付操作指导 + 遇到的坑(PHP实现)
  • 与“Flink”相关的资讯
  • 滴滴Flink
  • Flink SQL 结合 HiveCatalog 使用
  • Flink可靠性的基石-checkpoint机制详细解析
  • Flink学习笔记
  • Flink源码剖析:Jar包任务提交流程