原创

Flink编程模型及核心概念

简要记录学习flink,详细参考官网http://flink.apache.org/

核心概念概述

Flink程序是实现分布式集合转换的常规程序(例如,filtering, mapping, updating state, joining, grouping, defining windows, aggregating)。最初从源创建集合(例如,by reading from files, kafka topics, or from local, in-memory collections)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如,命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。根据数据源的类型,即有界或无界源,您可以编写批处理程序或流程序,其中DataSet API用于批处理,DataStream API用于流式处理。(https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html)

DataSet&DataStream

Flink具有特殊类DataSet和DataStream来表示程序中的数据。您可以将它们视为可以包含重复项的不可变数据集合。在DataSet的情况下,数据是有限的,而对于DataStream,元素的数量可以是无限的。这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除元素。最初通过在Flink程序中添加源来创建集合,并通过使用诸如map,filter等API方法对它们进行转换来从这些集合中派生新集合。

Flink编程模型

Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:

  1. 获得执行环境

  2. 加载/创建初始数据

  3. 指定此数据的转换

  4. 指定放置计算结果的位置

  5. 触发程序执行

延迟执行

所有Flink程序都是延迟地执行:当执行程序的main方法时,数据加载和转换不会直接发生。而是创建每个操作并将其添加到程序的执行计划中。当执行环境上的execute()调用显式触发执行时,实际执行操作。程序是在本地执行还是在集群上执行取决于执行环境的类型,延迟执行可以为构建Flink一个整体计划单元执行的复杂程序。

编程如何指定Key

某些转换(join,coGroup,keyBy,groupBy)需要一个key在元素集合上。其他转换(Reduce,GroupReduce,Aggregate,Windows)允许数据按照key进行分组。

  • 定义tuple的keys

    def main(args: Array[String]) {
        //获取参数
        var port = 0
        try {
          val parameterTool = ParameterTool.fromArgs(args)
          port = parameterTool.getInt("port")
        } catch {
          case e: Exception =>
            System.err.println(e+"端口未设置,使用默认端口9999")
            port = 9999
        }
        // 1.获取运行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    ​
        //2.获取数据
        val text = env.socketTextStream("localhost", port)
    ​
        //3.transform
        text.flatMap(_.split(","))
          .map((_, 1))      //定义keys通过tuple
          .keyBy(0)
          .timeWindow(Time.seconds(5))
          .sum(1)
          .print()
          .setParallelism(1)
    ​
        // execute program
        env.execute("Flink Streaming Scala API Skeleton")
      }

     

  • 使用Field Expressions定义key

    def main(args: Array[String]) {
        //获取参数
        var port = 0
        try {
          val parameterTool = ParameterTool.fromArgs(args)
          port = parameterTool.getInt("port")
        } catch {
          case e: Exception =>
            System.err.println(e + "端口未设置,使用默认端口9999")
            port = 9999
        }
        // 1.获取运行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    ​
        //2.获取数据
        val text = env.socketTextStream("localhost", port)
    ​
        //3.transform
        text.flatMap(_.split(","))
          .map(x => WC(x, 1)) //定义keys通过定义一个类
          .keyBy("word")
          .timeWindow(Time.seconds(5))
          .sum("count")
          .print()
          .setParallelism(1)
    ​
        // execute program
        env.execute("Flink Streaming Scala API Skeleton")
      }
    ​
      case class WC(word: String, count: Int)
    ​
    字段表达式语法:
    1.按字段名称选择POJO字段。例如,“user”指的是POJO类型的“user”字段。
    2.通过1偏移字段名称或0偏移字段索引选择元组字段。例如,“_ 1”和“5”分别表示Scala Tuple类型的第一个和第六个字段。
    3.可以在POJO和Tuples中选择嵌套字段。例如,“user.zip”指的是POJO的“zip”字段,其存储在POJO类型的“user”字段中。
    4.支持任意嵌套和混合POJO和元组,例如“_2.user.zip”或“user._4.1.zip”。您可以使用“_”通配符表达式选择完整类型。这也适用于非Tuple或POJO类型的类型。

     

  • 使用Selector定义key

    def main(args: Array[String]) {
        //获取参数
        var port = 0
        try {
          val parameterTool = ParameterTool.fromArgs(args)
          port = parameterTool.getInt("port")
        } catch {
          case e: Exception =>
            System.err.println(e + "端口未设置,使用默认端口9999")
            port = 9999
        }
        // 1.获取运行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    ​
        //2.获取数据
        val text = env.socketTextStream("localhost", port)
    ​
        //3.transform
        text.flatMap(_.split(","))
          .map(x => WC(x, 1))
          .keyBy(_.word)   //定义keys通过选择器
          .timeWindow(Time.seconds(5))
          .sum("count")
          .print()
          .setParallelism(1)
    ​
        // execute program
        env.execute("Flink Streaming Scala API Skeleton")
      }
    ​
      case class WC(word: String, count: Int)

     

支持的数据类型

  1. Java Tuples and Scala Case Classes

  2. JAVA POJO

  3. Primitive Types

  4. Regular Classes

  5. Values

  6. Hadoop Writables

  7. Special Types

正文到此结束
本文目录