Flink的Data Source 一 概述 Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。
Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction)
Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source,
StreamExecutionEnvironment 中可以使用以下几个已实现的 stream sources:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 def fromElements [T : TypeInformation ](data: T *): DataStream [T ] = {....}def fromCollection [T : TypeInformation ](data: Seq [T ]): DataStream [T ] = {....}def fromCollection [T : TypeInformation ] (data: Iterator [T ]): DataStream [T ] = {... }def fromParallelCollection [T : TypeInformation ] (data: SplittableIterator [T ]): DataStream [T ] = {.....}def readTextFile (filePath: String ): DataStream [String ] = asScalaStream(javaEnv.readTextFile(filePath))def readTextFile (filePath: String , charsetName: String ): DataStream [String ] = asScalaStream(javaEnv.readTextFile(filePath, charsetName))def readFile [T : TypeInformation ](inputFormat: FileInputFormat [T ], filePath: String ): DataStream [T ] = asScalaStream(javaEnv.readFile(inputFormat, filePath))def readFileStream (StreamPath : String , intervalMillis: Long = 100 , watchType: FileMonitoringFunction .WatchType = FileMonitoringFunction .WatchType .ONLY_NEW_FILES ): DataStream [String ] = asScalaStream(javaEnv.readFileStream(StreamPath , intervalMillis, watchType)) @PublicEvolving @Deprecated def readFile [T : TypeInformation ]( inputFormat: FileInputFormat [T ], filePath: String , watchType: FileProcessingMode , interval: Long , filter: FilePathFilter ): DataStream [T ] = { asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter)) } def readFile [T : TypeInformation ]( inputFormat: FileInputFormat [T ], filePath: String , watchType: FileProcessingMode , interval: Long ): DataStream [T ] = { val typeInfo = implicitly[TypeInformation [T ]] asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo)) } def socketTextStream (hostname: String , port: Int , delimiter: Char = '\n', maxRetry: Long = 0 ): DataStream [String ] = asScalaStream(javaEnv.socketTextStream(hostname, port)) def createInput [T : TypeInformation ](inputFormat: InputFormat [T , _]): DataStream [T ] = if (inputFormat.isInstanceOf[ResultTypeQueryable [_]]) { asScalaStream(javaEnv.createInput(inputFormat)) } else { asScalaStream(javaEnv.createInput(inputFormat, implicitly[TypeInformation [T ]])) } def addSource [T : TypeInformation ](function: SourceFunction [T ]): DataStream [T ] = {...}def addSource [T : TypeInformation ](function: SourceContext [T ] => Unit ): DataStream [T ] = {...}
1 数据源为集合 1、fromCollection(Collection) - 从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。
2、fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。
3、fromElements(T …) - 从给定的对象序列中创建数据流。所有对象类型必须相同。
4、fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。
5、generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。
1 2 3 4 5 6 7 8 StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment();DataStream <Event > input = env.fromElements( new Event (1 , "barfoo" , 1.0 ), new Event (2 , "start" , 2.0 ), new Event (3 , "foobar" , 3.0 ), ... );
2 基于文件 1、readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。
2、readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。
3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。
1 2 3 4 5 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<MyEvent> stream = env.readFile( myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100 , FilePathFilter.createDefaultFilter(), typeInfo);
在具体实现上,Flink 把文件读取过程分为两个子任务,即目录监控和数据读取。每个子任务都由单独的实体实现。目录监控由单个非并行(并行度为1)的任务执行,而数据读取由并行运行的多个任务执行。后者的并行性等于作业的并行性。单个目录监控任务的作用是扫描目录(根据 watchType 定期扫描或仅扫描一次),查找要处理的文件并把文件分割成切分片(splits),然后将这些切分片分配给下游 reader。reader 负责读取数据。每个切分片只能由一个 reader 读取,但一个 reader 可以逐个读取多个切分片。
如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,则当文件被修改时,其内容将被重新处理。这会打破“exactly-once”语义,因为在文件末尾附加数据将导致其所有内容被重新处理。
如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,则 source 仅扫描路径一次然后退出,而不等待 reader 完成文件内容的读取。当然 reader 会继续阅读,直到读取所有的文件内容。关闭 source 后就不会再有检查点。这可能导致节点故障后的恢复速度较慢,因为该作业将从最后一个检查点恢复读取。
3 基于Socket socketTextStream(String hostname, int port) - 从 socket 读取。元素可以用分隔符切分。
4 自定义Source addSource - 添加一个新的 source function。例如,你可以 addSource(new FlinkKafkaConsumer09<>(…)) 以从 Apache Kafka 读取数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def addSource [T : TypeInformation ](function: SourceFunction [T ]): DataStream [T ] = { require(function != null , "Function must not be null." ) val cleanFun = scalaClean(function) val typeInfo = implicitly[TypeInformation [T ]] asScalaStream(javaEnv.addSource(cleanFun, typeInfo)) } def addSource [T : TypeInformation ](function: SourceContext [T ] => Unit ): DataStream [T ] = { require(function != null , "Function must not be null." ) val sourceFunction = new SourceFunction [T ] { val cleanFun = scalaClean(function) override def run (ctx: SourceContext [T ]) { cleanFun(ctx) } override def cancel () = {} } addSource(sourceFunction) }
比如去消费 Kafka 某个 topic 上的数据,这时候就需要用到这个 addSource,可能因为用的比较多的原因吧,Flink 直接提供了 FlinkKafkaConsumer09 等类可供你直接使用。你可以去看看 FlinkKafkaConsumerBase 这个基础类,它是 Flink Kafka 消费的最根本的类
4.1 Flink的kafka的source 首先启动zk,kafka,Flink
1 2 3 4 5 6 import scala.collection.mutable.Map case class Metric (var name: String ,var timestamp: Long ,var fields:Map [String ,String ],var tags: Map [String ,String ] ) { def this (){ this ("aa" ,2 ,Map (),Map ()) } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package mz.kafkasouirceimport java.util.Properties import scala.collection.mutable.Map import com.alibaba.fastjson.JSON import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord object KafkaUtils { val broker_list = "localhost:9092" val topic = "metric" @throws [InterruptedException ] def writeToKafka (): Unit = { val props = new Properties props.put("bootstrap.servers" , broker_list) props.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer" ) props.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer" ) val producer = new KafkaProducer [String , String ](props) val metric = new Metric () metric.timestamp=System .currentTimeMillis metric.name=("mz" ) var tags:Map [String ,String ] = Map () var fields:Map [String ,String ] = Map () tags += ("cluster" -> "mz" ) tags += ("host_ip" -> "" ) fields += ("used_percent" -> "a" ) fields += ("max" -> "b" ) fields += ("used" -> "c" ) fields += ("init" -> "d" ) metric.tags=tags metric.fields=fields val record: ProducerRecord [String , String ] = new ProducerRecord [String , String ](topic, null , null , metric.toString) producer.send(record) System .out.println("发送数据: " + metric.toString) producer.flush() } @throws [InterruptedException ] def main (args: Array [String ]): Unit = { while ( { true }) { Thread .sleep(300 ) writeToKafka() } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 import java.util.Properties import org.apache.calcite.avatica.Handler .ResultSink import org.apache.flink.api.common.typeinfo.{TypeHint , TypeInformation }import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.{CheckpointingMode , TimeCharacteristic }import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09 , KafkaDeserializationSchema }import org.apache.flink.streaming.util.serialization.{SimpleStringSchema , TypeInformationKeyValueSerializationSchema }import org.apache.flink.streaming.api.scala._import org.apache.kafka.clients.consumer.ConsumerRecord object FlinkKafkaSource { class RecordKafkaSchema extends KafkaDeserializationSchema [ConsumerRecord [String , String ]] { override def isEndOfStream (nextElement: ConsumerRecord [String , String ]): Boolean = false override def deserialize (record: ConsumerRecord [Array [Byte ], Array [Byte ]]): ConsumerRecord [String , String ] = { var key: String = null var value: String = null if (record.key != null ) { key = new String (record.key(),"UTF-8" ) } if (record.value != null ) { value = new String (record.value(),"UTF-8" ) } new ConsumerRecord [String , String ]( record.topic(), record.partition(), record.offset(), key, value ) } override def getProducedType : TypeInformation [ConsumerRecord [String , String ]] = TypeInformation .of(new TypeHint [ConsumerRecord [String , String ]] {}) } def main (args: Array [String ]): Unit = { val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment .getExecutionEnvironment val kafkaProps = new Properties () kafkaProps.put("bootstrap.servers" , "localhost:9092" ) kafkaProps.put("zookeeper.connect" , "localhost:2181" ) kafkaProps.put("group.id" , "metric-group" ) kafkaProps.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ) kafkaProps.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ) kafkaProps.put("auto.offset.reset" , "latest" ) var schema3 = new RecordKafkaSchema val kafkaSource: FlinkKafkaConsumer09 [ConsumerRecord [String , String ]] = new FlinkKafkaConsumer09 [ConsumerRecord [String , String ]]("metric" ,schema3, kafkaProps) val specificStartOffsets = new java.util.HashMap [KafkaTopicPartition , java.lang.Long ]() specificStartOffsets.put(new KafkaTopicPartition ("metric" , 0 ), 0 L) kafkaSource.setStartFromSpecificOffsets(specificStartOffsets) val ds: DataStream [ConsumerRecord [String , String ]] = streamEnv.addSource(kafkaSource) val keyValue=ds.map(new MapFunction [ConsumerRecord [String , String ],String ] { override def map (message: ConsumerRecord [String , String ]): String = { "key" + message.key + " value:" + message.value } }) keyValue.print() streamEnv.execute() } }
若是想自定义Source,就要去看SourceFunction 接口了,它是所有 stream source 的根接口,它继承自一个标记接口(空接口)Function。
SourceFunction 定义了两个接口方法:
1 2 3 1、run : 启动一个 source,即对接一个外部数据源然后 emit 元素形成 stream(大部分情况下会通过在该方法里运行一 个 while 循环的形式来产生 stream)。 2、cancel : 取消一个 source,也即将 run 中的循环 emit 元素的行为终止。 正常情况下,一个 SourceFunction 实现这两个接口方法就可以了。其实这两个接口方法也固定了一种实现模板。
4.2 Mysql的自定义Source 添加mysql的依赖
1 2 3 4 5 <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > 5.1.34</version > </dependency >
1 2 3 4 5 6 7 DROP TABLE IF EXISTS `user` ;CREATE TABLE `user` ( `id` int (11 ) unsigned NOT NULL AUTO_INCREMENT, `name` varchar (25 ) COLLATE utf8_bin DEFAULT NULL , `age` int (10 ) DEFAULT NULL , PRIMARY KEY (`id` ) ) ENGINE =InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET =utf8 COLLATE =utf8_bin;
1 INSERT INTO `user` VALUES ('1' , 'a' , '18' ), ('2' , 'b' , '17' ), ('3' , 'c' , '18' ), ('4' , 'd' , '16' );
1 case class User (var id:Int ,var name:String ,var age:Int )
新建 Source SourceFromMySQL.java,该类继承 RichSourceFunction ,实现里面的 open、close、run、cancel 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 package mz.mysqlsourceimport java.lang.Exception import java.sqlimport org.apache.flink.streaming.api.functions.source.{RichSourceFunction , SourceFunction }import java.sql.{DriverManager , PreparedStatement , ResultSet }import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.source.SourceFunction .SourceContext import org.apache.flink.streaming.api.scala._case class FlinkMysqlSource ( ) extends RichSourceFunction [User ]( ) { var connection:sql.Connection =_ var ps:PreparedStatement =_ @throws [Exception ] override def open (parameters: Configuration ): Unit = { super .open(parameters) connection = getConnection val sql = "select * from user;" ps = connection.prepareStatement(sql) } @throws [Exception ] override def close (): Unit = { super .close if (connection != null ) { connection.close } if (ps != null ) ps.close } @throws [Exception ] override def run (ctx:SourceContext [User ]): Unit = { val resultSet = ps.executeQuery while ( {resultSet.next}) { val user: User = new User (resultSet.getInt("id" ), resultSet.getString("name" ).trim, resultSet.getInt("age" )) ctx.collect(user) } } override def cancel (): Unit = { } private def getConnection :sql.Connection = { try { Class .forName("com.mysql.jdbc.Driver" ) connection = DriverManager .getConnection("jdbc:mysql://localhost:3306/lcflink?useUnicode=true&characterEncoding=UTF-8" , "root" , "123456" ) }catch { case e:Exception => { print(e.getMessage) } } connection } }
1 2 3 4 5 6 7 8 9 10 11 package mz.mysqlsourceimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._object FlinkMysqlTest { def main (args: Array [String ]): Unit = { val environment = StreamExecutionEnvironment .getExecutionEnvironment environment.addSource(new FlinkMysqlSource ()).print() environment.execute("mysql_query" ) } }
1 2 3 4 1 public abstract class RichSourceFunction <OUT > extends AbstractRichFunction implements SourceFunction <OUT > {} 2.1 public abstract class AbstractRichFunction implements RichFunction , Serializable {} 2.1 .1 public interface RichFunction extends Function {} 2.2 public interface SourceFunction <T > extends Function , Serializable {}
一个抽象类,继承自 AbstractRichFunction。为实现一个 Rich SourceFunction 提供基础能力。该类的子类有三个,两个是抽象类,在此基础上提供了更具体的实现,另一个是 ContinuousFileMonitoringFunction。
1 2 3 4 5 6 7 8 1 public class ContinuousFileMonitoringFunction <OUT > extends RichSourceFunction <TimestampedFileInputSplit > implements CheckpointedFunction {} 2 public abstract class MessageAcknowledgingSourceBase <Type , UId > extends RichSourceFunction <Type > implements CheckpointedFunction , CheckpointListener {} 2.1 public abstract class MultipleIdsMessageAcknowledgingSourceBase <Type , UId , SessionId > extends MessageAcknowledgingSourceBase <Type , UId > {}
MessageAcknowledgingSourceBase :它针对的是数据源是消息队列的场景并且提供了基于 ID 的应答机制。
MultipleIdsMessageAcknowledgingSourceBase : 在 MessageAcknowledgingSourceBase 的基础上针对 ID 应答机制进行了更为细分的处理,支持两种 ID 应答模型:session id 和 unique message id。
ContinuousFileMonitoringFunction:这是单个(非并行)监视任务,它接受 FileInputFormat,并且根据 FileProcessingMode 和 FilePathFilter,它负责监视用户提供的路径;决定应该进一步读取和处理哪些文件;创建与这些文件对应的 FileInputSplit 拆分,将它们分配给下游任务以进行进一步处理。
5 说下上面几种的特点 1、基于集合:有界数据集,更偏向于本地测试用
3、基于 Socket:监听主机的 host port,从 Socket 中获取数据
4、自定义 addSource:大多数的场景数据都是无界的,会源源不断的过来。或者其他复杂源数据