一 Flink Sinks
Flink has no foreach-style method like Spark's for users to write arbitrary output logic; all output to external systems goes through a sink, which is generally attached like this:
```scala
stream.addSink(new MySink("xxx"))
```
Flink ships a number of sinks out of the box; for anything else you implement your own sink (a minimal sketch follows this list).
Connectors provided by Flink itself include Kafka, Elasticsearch and the filesystem (StreamingFileSink), all used below.
Connectors provided by the third-party Apache Bahir project:
Flume, Redis, Akka, Netty (source only)
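The MySink above is only a placeholder; a custom sink is simply an implementation of SinkFunction (or RichSinkFunction when you need open/close lifecycle hooks, as in the JDBC example later). A minimal sketch, where the class name and the prefix argument are purely illustrative:

```scala
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical custom sink: "prefix" is only an illustrative constructor argument.
class MySink(prefix: String) extends SinkFunction[String] {
  // invoke() is called once for every record that reaches the sink
  override def invoke(value: String): Unit = {
    println(s"$prefix$value") // replace with the actual write to the external system
  }
}
```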
二 Implementations
1 Writing to files and printing
```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val ds: DataStream[String] = env.fromElements("123", "456")

// print to stdout
ds.print()
// simple file sinks (note: writeAsCsv only works for tuple streams)
ds.writeAsCsv("a.csv")
ds.writeAsText("b.txt")
// StreamingFileSink writes row-encoded records into bucketed files under the directory "a"
ds.addSink(StreamingFileSink
  .forRowFormat(new Path("a"), new SimpleStringEncoder[String]())
  .build())
// the same sink that print() uses internally, added explicitly
ds.addSink(new PrintSinkFunction[String]())
env.execute()
```
Note that writeAsText/writeAsCsv do not participate in Flink's checkpointing, so when fault-tolerant file output matters, StreamingFileSink is the one to use.
2 Sink to Kafka
```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val ds: DataStream[String] = env.fromElements("123", "456")
// broker list, topic, serialization schema
ds.addSink(new FlinkKafkaProducer09[String]("localhost:9999", "sinkTest", new SimpleStringSchema()))
env.execute("sinkKafka")
```
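The Kafka example additionally needs the Kafka 0.9 connector on the classpath. A sketch of the Maven coordinates (the Scala suffix and version are assumptions and must match your Flink/Scala setup; the 0.9 connector was dropped from newer Flink releases):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <!-- assumption: use the version of your Flink distribution, e.g. 1.7.2 -->
    <version>1.7.2</version>
</dependency>
```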
3 Sink to Redis
Add the Bahir dependency:
```xml
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
```
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object SinkRedis {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val ds: DataStream[String] = env.fromElements("123", "456")

    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    ds.addSink(new RedisSink[String](conf, new MyRedisMapper))
    env.execute("redis")
  }
}

// Maps each record to the Redis command, key and value to write
class MyRedisMapper extends RedisMapper[String] {
  // HSET into the hash named "num"
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "num")

  override def getKeyFromData(t: String): String = t.concat("_k")

  override def getValueFromData(t: String): String = t
}
```
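With this mapper every record is written as HSET num <value>_k <value>, i.e. each element becomes one field in the Redis hash named num; the result can be inspected with HGETALL num in redis-cli.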
4 Sink to Elasticsearch
Add the dependency:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
```
```scala
import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object SinkEs {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val ds: DataStream[String] = env.fromElements("123", "456")

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    val myEsSinkFunction = new ElasticsearchSinkFunction[String] {
      override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", t.concat("_id"))
        dataSource.put("num", t)
        val indexRe = Requests.indexRequest()
          .index("num")
          .`type`("reading")
          .source(dataSource)
        // hand the index request to the indexer, which sends it to Elasticsearch
        requestIndexer.add(indexRe)
      }
    }

    ds.addSink(new ElasticsearchSink.Builder[String](httpHosts, myEsSinkFunction).build())
    env.execute("es")
  }
}
```
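The Elasticsearch sink buffers documents and writes them in bulk, so a tiny bounded stream like this one may appear to write nothing until the job finishes. For local testing it is common to force a flush after every record on the builder before calling build(); a sketch:

```scala
val esBuilder = new ElasticsearchSink.Builder[String](httpHosts, myEsSinkFunction)
// flush after every single action - fine for testing, bad for production throughput
esBuilder.setBulkFlushMaxActions(1)
ds.addSink(esBuilder.build())
```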
5 Sink to MySQL: a custom JDBC sink
Add the MySQL dependency:
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
```
Add MyJdbcSink. It opens a JDBC connection in open(), and in invoke() first tries an update and falls back to an insert when no row was affected (a simple upsert); close() releases the statements and the connection:
```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class MyJdbcSink extends RichSinkFunction[String] {
  var con: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // open() runs once per parallel instance: create the connection and prepared statements
  override def open(parameters: Configuration): Unit = {
    con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "username", "password")
    insertStmt = con.prepareStatement("insert into flinktest (id, temp) values (?, ?)")
    updateStmt = con.prepareStatement("update flinktest set temp = ? where id = ?")
  }

  // invoke() runs once per record: try an update, insert if the id does not exist yet
  override def invoke(value: String): Unit = {
    updateStmt.setString(1, value.concat("_temp"))
    updateStmt.setString(2, value.concat("_id"))
    updateStmt.execute()
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.concat("_id"))
      insertStmt.setString(2, value.concat("_temp"))
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    con.close()
  }
}
```
```scala
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val ds: DataStream[String] = env.fromElements("123", "456")
ds.addSink(new MyJdbcSink())
env.execute("sinkmysql")
```