该练习的重点是充分全面地了解DataStreamAPI,以便于编写流式应用入门。
什么能被转化成流?
Flink的Java和ScalaDataStreamAPI可以将任何可序列化的对象转化为流。Flink自带的序列化器有
基本类型,即String、Long、Integer、Boolean、Array复合类型:Tuples、POJOs和Scalacaseclasses而且Flink会交给Kryo序列化其他类型。也可以将其他序列化器和Flink一起使用。特别是有良好支持的Avro。
Javatuples和POJOs
Flink的原生序列化器可以高效地操作tuples和POJOs
Tuples
对于Java,Flink自带有Tuple0到Tuple25类型。
POJOs
如果满足以下条件,Flink将数据类型识别为POJO类型(并允许“按名称”字段引用):
该类是公有且独立的(没有非静态内部类)该类有公有的无参构造函数类(及父类)中所有的所有不被static、transient修饰的属性要么是公有的(且不被final修饰),要么是包含公有的getter和setter方法,这些方法遵循Javabean命名规范。示例:
Scalatuples和caseclasses
如果你了解Scala,那一定知道tuple和caseclass。
一个完整的示例
Stream执行环境
每个Flink应用都需要有执行环境,在该示例中为env。流式应用需要用到StreamExecutionEnvironment。DataStreamAPI将你的应用构建为一个jobgraph,并附加到StreamExecutionEnvironment。当调用env.execute()时此graph就被打包并发送到JobManager上,后者对作业并行处理并将其子任务分发给TaskManager来执行。每个作业的并行子任务将在taskslot中执行。
注意,如果没有调用execute(),应用就不会运行。
此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。
基本的streamsource
上述示例用env.fromElements(...)方法构造DataStreamPerson。这样将简单的流放在一起是为了方便用于原型或测试。
StreamExecutionEnvironment上还有一个fromCollection(Collection)方法。因此,你可以这样做:
另一个获取数据到流中的便捷方法是用socket
DataStreamStringlines=env.socketTextStream(localhost,)
或读取文件
DataStreamStringlines=env.readTextFile(file:///path);
在真实的应用中,最常用的数据源是那些支持低延迟,高吞吐并行读取以及重复(高性能和容错能力为先决条件)的数据源,例如ApacheKafka,Kinesis和各种文件系统。RESTAPI和数据库也经常用于增强流处理的能力(streamenrichment)。
Flink内核原理与实现京东月销量好评率97%无理由退换京东配送官方店¥99.4购买基本的streamsink
上述示例用adults.print()打印其结果到taskmanager的日志中(如果运行在IDE中时,将追加到你的IDE控制台)。它会对流中的每个元素都调用toString()方法。
输出看起来类似于
1Fred:ageWilma:age35
1和2指出输出来自哪个sub-task(即thread)
Inproduction,
转载请注明:http://www.0431gb208.com/sjszlfa/3751.html