Flume

I Overview

Flume is a highly available, highly reliable, distributed piece of software for collecting, aggregating, and moving massive amounts of log data. It is all about the flow, or transport, of data: taking data from one storage medium and delivering it through Flume into another.

At its core, Flume collects data from a data source (source) and delivers it to a specified destination (sink). To guarantee delivery, the data is first buffered (channel) before being sent on to the destination; once the data has arrived, Flume removes it from the buffer.

Flume lets you define all kinds of data senders for collecting data of various types, and it likewise supports customizing all kinds of data receivers for the final storage. Ordinary collection requirements can be met with a simple Flume configuration, and special scenarios can be handled through its solid custom extension points, so Flume is suitable for most day-to-day data collection scenarios.

II Operating Mechanism

(figure: a Flume agent with its source, channel, and sink)

1 Core components

  • source: connects to each of the different data sources

  • sink: connects to each of the different destinations where the data ends up (where the data "sinks" to)

  • channel: temporarily buffers the data in between

    2 Mechanism

  • Flume itself is a Java program; it is started on every machine where data needs to be collected -> the agent process

  • The agent process contains the source, sink, and channel

  • In Flume, data is wrapped into events, and the real payload is carried in the event body.
    The event is the smallest unit of data in Flume (a small sketch follows below).
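
To make the event structure concrete, here is a minimal sketch using Flume's Event API; it assumes flume-ng-core is on the classpath, and the header key and body text are made-up values:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class EventDemo {
    public static void main(String[] args) {
        // optional key/value metadata carried in the event headers
        Map<String, String> headers = new HashMap<>();
        headers.put("type", "access");
        // the real payload always lives in the event body (a byte array)
        Event event = EventBuilder.withBody("one log line".getBytes(StandardCharsets.UTF_8), headers);
        System.out.println(event.getHeaders());                                  // {type=access}
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8)); // one log line
    }
}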

    3 Architecture

  • Simple architecture

    Only a single agent process needs to be deployed.


  • Complex architecture

    Multiple agents chained one after another, like people joining hands to complete the collection and transport of the data together.

    In a chained architecture there is no master/slave distinction; every agent has the same standing.


III Installation and Deployment

Upload the installation package to the node where the data source lives and unpack it: tar -zxvf apache-flume-1.8.0-bin.tar.gz. Then go into the Flume directory, edit flume-env.sh under conf, and set JAVA_HOME in it.

Flume development steps

In conf, write a collection-plan configuration file that matches the business requirement.

  • Give the file a self-describing name, usually of the form source-sink.conf
  • Describe the source, sink, and channel component configuration precisely, following the properties documented on the official site

Startup command:

bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console   # full form of the command
bin/flume-ng agent -c ./conf -f ./conf/spool-hdfs.conf -n a1 -Dflume.root.logger=INFO,console                   # short form of the command
--conf       the configuration directory
--conf-file  the path to the collection-plan file
--name       the agent name; it must match the name used in the collection-plan file

IV Test Environment:

Create the configuration under the conf directory: vi netcat-logger.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source component: r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe/configure the sink component: k1
a1.sinks.k1.type = logger

# Describe/configure the channel component; an in-memory buffer is used here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent to begin collecting data:

bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console

Testing:

First send some data into the port the agent is listening on, so the agent has something to collect.

From any machine that can reach the agent's node over the network:

telnet agent-hostname port   (e.g. telnet localhost 44444)
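
If telnet is not available, the same test can be driven from code. A minimal sketch, assuming the agent from the config above is listening on localhost:44444:

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class NetcatClient {
    public static void main(String[] args) throws Exception {
        // the netcat source turns every newline-terminated line it receives into one event
        try (Socket socket = new Socket("localhost", 44444);
             OutputStream out = socket.getOutputStream()) {
            out.write("hello flume\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }
}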

V Collecting a Directory to HDFS:

Requirement: a particular directory on a server keeps producing new files; every time a new file appears, it must be collected into HDFS.

• Collection source, i.e. the source, watches a directory: spooldir

• Sink target, i.e. the sink, is the HDFS file system: hdfs sink

• The channel between source and sink: either a file channel or a memory channel can be used

Write the configuration file: vim spooldir-hdfs.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## Note: files with the same name must never be dropped into the watched directory twice
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/logs
a1.sources.r1.fileHeader = true

# Describe the sink (the destination)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type to generate; the default is SequenceFile, DataStream gives plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Parameter explanation (sink):

The roll* properties control how the files written to HDFS are rolled:
a1.sinks.k1.hdfs.rollInterval = 3    roll by time interval (seconds)
a1.sinks.k1.hdfs.rollSize = 20       roll by file size (bytes)
a1.sinks.k1.hdfs.rollCount = 5       roll by number of events
If all three are configured, whichever condition is met first triggers the roll; to stop rolling on a given property, set it to 0.

round controls whether the timestamp is rounded down, which decides how often a new directory is created.
With the settings below, a new directory is generated every 10 minutes:
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

Channel parameter explanation:

capacity: the maximum number of events the channel can hold

transactionCapacity: the maximum number of events taken from the source or delivered to the sink in a single transaction

Note:

  • No two files with the same name may ever appear in the watched directory
  • If that happens, the source reports an error and stops; it no longer watches or collects data afterwards
  • In practice a timestamp is usually appended to the file name to guarantee uniqueness (a small sketch follows below)
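
A minimal sketch of that naming convention; the spool directory /root/logs comes from the config above, while the source path and the name pattern are only assumptions for illustration:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.text.SimpleDateFormat;
import java.util.Date;

public class MoveToSpool {
    public static void main(String[] args) throws Exception {
        // the timestamp suffix keeps the file name unique inside the spool directory
        String stamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        Path finished = Paths.get("/tmp/app.log");                        // a finished log file (assumed path)
        Path spooled = Paths.get("/root/logs", "app." + stamp + ".log");  // unique name in the watched directory
        Files.move(finished, spooled, StandardCopyOption.ATOMIC_MOVE);
    }
}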

Start it the same way as before.

Test: drop files into the source directory.

VI Collecting a File to HDFS

Requirement: a business system writes its logs with log4j, for example; the log keeps growing, and the data appended to the log file must be collected into HDFS in real time.

• Collection source, i.e. the source, watches for file content updates: exec 'tail -F file'

• Sink target, i.e. the sink, is the HDFS file system: hdfs sink

• The channel between source and sink: either a file channel or a memory channel can be used

Write the configuration file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/test.log
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type to generate; the default is SequenceFile, DataStream gives plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Parameter details:

rollInterval

Default: 30

How long (in seconds) the HDFS sink waits before rolling the temporary file into the final target file;

if set to 0, files are not rolled based on time.

Note: "rolling" means the HDFS sink renames the temporary file to the final target file and opens a new temporary file to write into.

rollSize

Default: 1024

When the temporary file reaches this size (in bytes), it is rolled into a target file;

if set to 0, files are not rolled based on the size of the temporary file.

rollCount

Default: 10

When this many events have been written, the temporary file is rolled into a target file;

if set to 0, files are not rolled based on the number of events.

round

Default: false

Whether to enable rounding of the timestamp; the time is rounded down (truncated), not rounded to the nearest value.

roundValue

Default: 1

The value the time is rounded down to a multiple of;

roundUnit

Default: second

The unit used when rounding the time down: second, minute, or hour.

Example:

a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

When the time is 2015-10-16 17:38:59, hdfs.path resolves to:

/flume/events/15-10-16/1730/00

Because the time is rounded down to the nearest 10 minutes, a new directory is created every 10 minutes.
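
A minimal sketch of that rounding-down arithmetic (illustrative only, not Flume's internal code):

import java.time.LocalDateTime;

public class RoundDownDemo {
    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2015, 10, 16, 17, 38, 59);
        int roundValue = 10; // as in hdfs.roundValue, with hdfs.roundUnit = minute
        int bucketMinute = (ts.getMinute() / roundValue) * roundValue; // 38 -> 30
        LocalDateTime bucket = ts.withMinute(bucketMinute).withSecond(0);
        System.out.println(bucket); // 2015-10-16T17:30
    }
}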

Start it the same way as before.

Test: the exec source runs the specified Linux command and collects the command's output as data.

while true; do date >> /root/logs/test.log;done
# use this loop to simulate data changing in real time

VII Flume load-balance and failover

1 Load balancing

  • Load balancing addresses the situation where one process or program cannot handle all the requests, so several processes handle them together
  • Any single request must be handed to exactly one of them, to avoid duplicating data
  • How requests are distributed is decided by the load-balancing algorithm: round-robin (round_robin), random (random), or weighted; a small round-robin sketch follows below
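
A minimal sketch of the round_robin idea (illustrative only, not Flume's sink-processor code):

import java.util.Arrays;
import java.util.List;

public class RoundRobinDemo {
    private static int next = 0;

    // hand each request to exactly one sink, cycling through the group in order
    static String pick(List<String> sinks) {
        String chosen = sinks.get(next % sinks.size());
        next++;
        return chosen;
    }

    public static void main(String[] args) {
        List<String> sinks = Arrays.asList("k1", "k2");
        for (int i = 0; i < 4; i++) {
            System.out.println(pick(sinks)); // k1, k2, k1, k2
        }
    }
}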


Example configuration across three nodes:

Primary node: vim exec-avro.conf

#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set group
agent1.sinkgroups = g1

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

# set sources
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /root/logs/123.log

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups.g1.sinks = k1 k2

#set load balance
agent1.sinkgroups.g1.processor.type = load_balance
# if enabled, a failed sink is temporarily blacklisted
agent1.sinkgroups.g1.processor.backoff = true
# round_robin; random is also supported
agent1.sinkgroups.g1.processor.selector = round_robin
# how long a sink stays blacklisted; if it still cannot be reached when that timeout expires, the timeout grows exponentially
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000



# start the agent
bin/flume-ng agent -c conf -f conf/exec-avro.conf -n agent1 -Dflume.root.logger=INFO,console

Second node: vim avro-logger.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node02
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Start: bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

Third node: vim avro-logger.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node03
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1



# start the agent
bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

Chaining Flume agents to move data across the network

  • avro sink

  • avro source

    By pointing these two components at the same bound IP and port, data can be passed across the network; this is the usual building block of chained Flume architectures.

  • Starting a Flume chain

    Normally start from the tier farthest away from the data source.

2 Flume failover

  • Fault tolerance is also called failover: tolerating the occurrence of failures.
  • It is typically used to remove single points of failure by adding backups at the places most likely to fail
  • The more backups, the stronger the fault tolerance, but the more resources are wasted

The overall flow resembles load balancing, but the internal handling is completely different.

The Failover Sink Processor maintains a prioritized list of sinks; as long as one sink is available, events get delivered. The failover mechanism demotes failed sinks into a pool where they are assigned a cool-down period that grows with each consecutive failure before a retry. As soon as a sink successfully sends an event, it is restored to the live pool. Each sink has a priority associated with it: the larger the number, the higher the priority.

For example, a sink with priority 100 is activated before one with priority 80. If a sink fails while sending an event, the sink with the next highest priority is tried next. If no priorities are specified, they are determined by the order in which the sinks are listed in the configuration.

Primary node: vim exec-avro.conf

#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set group
agent1.sinkgroups = g1

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

# set sources
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /root/logs/456.log

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups.g1.sinks = k1 k2

#set failover
agent1.sinkgroups.g1.processor.type = failover
# priority value; the larger the value, the higher the priority
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
# maximum back-off period (millis) for a failed sink
agent1.sinkgroups.g1.processor.maxpenalty = 10000

# start the agent
bin/flume-ng agent -c conf -f conf/exec-avro.conf -n agent1 -Dflume.root.logger=INFO,console

The second and third nodes are configured the same as in the load-balance example.

VIII Flume Interceptors

1 Static interceptor: collecting and aggregating logs

Requirement:

Two log servers, A and B, produce logs in real time, mainly access.log, nginx.log, and web.log.

Collect access.log, nginx.log, and web.log from machines A and B, aggregate them on machine C, and then store everything in HDFS.

Data flow analysis:


On the first node, configure: vim exec_source_avro_sink.conf

# Name the components on this agent 
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /root/logs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /root/logs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node02
a1.sinks.k1.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

On the third node, configure: vim avro_source_hdfs_sink.conf

# Define the agent name and the names of the source, channel, and sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1


# Define the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port =41414

# Add a timestamp interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder


# Define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# Define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://node01:9000/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# timestamp handling
#a1.sinks.k1.hdfs.useLocalTimeStamp = true
# do not roll files based on the number of events
a1.sinks.k1.hdfs.rollCount = 0
# roll files every 20 seconds
a1.sinks.k1.hdfs.rollInterval = 20
# roll files based on size
a1.sinks.k1.hdfs.rollSize = 10485760
# number of events written to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 20
# number of threads Flume uses for HDFS operations (open, write, etc.)
a1.sinks.k1.hdfs.threadsPoolSize=10
# timeout for HDFS operations
a1.sinks.k1.hdfs.callTimeout=30000

# Bind the source, channel, and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Comparison of events before and after the static interceptor:

Without the static interceptor:
Event: { headers:{} body: 36 Sun Jun 2 18:26 }

With the static interceptor, the key/value marker we added ourselves shows up:
Event: { headers:{type=access} body: 36 Sun Jun 2 18:26 }
Event: { headers:{type=nginx} body: 36 Sun Jun 2 18:26 }
Event: { headers:{type=web} body: 36 Sun Jun 2 18:26 }

Later, when the data is stored, Flume's escape syntax can pull out the key/value content added by the interceptor:

%{type}

Simulate data being produced in real time:

while true; do echo "access access....." >> /root/logs/access.log;sleep 0.5;done
while true; do echo "web web....." >> /root/logs/web.log;sleep 0.5;done
while true; do echo "nginx nginx....." >> /root/logs/nginx.log;sleep 0.5;done

After configuration:

Once everything is configured, server A has the data files access.log, nginx.log, and web.log under /root/logs.

First start Flume on server C; run the startup command from the Flume installation directory:

bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf --name a1 -Dflume.root.logger=DEBUG,console

Then start Flume on server A, again running the command from the Flume installation directory:

bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=DEBUG,console

2 Custom interceptors

Interceptor overview:

Flume ships with a variety of built-in interceptors, such as TimestampInterceptor, HostInterceptor, and RegexExtractorInterceptor, and different interceptors provide different capabilities. These interceptors, however, cannot change the content of the original log data or apply extra processing logic to it: when a log record has dozens or even hundreds of fields, the logs collected by stock Flume still carry all of those fields, and the specific fields you care about cannot be processed individually.

Custom interceptor:

Driven by the actual business requirements, and to better support application-level processing of the data, a custom Flume interceptor pre-processes the source data: unneeded fields are filtered out and designated fields are encrypted. This reduces the amount of data transferred and lowers the storage cost.

This is done in two parts:

1 Write the Java code for the custom interceptor

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>cn.baidu.cloud</groupId>
<artifactId>example-flume-intercepter</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Java code:

  1. Define a class CustomParameterInterceptor that implements the Interceptor interface.
  2. In CustomParameterInterceptor, define the variables that will be set from the
    Flume configuration file: the separator between the fields of each line (fields_separator), the indexes of the columns wanted after splitting (indexs), and the separator used between multiple indexes (indexs_separator).
  3. Add a parameterized constructor to CustomParameterInterceptor and process the corresponding variables.

package cn.baidu.interceptor;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static cn.baidu.interceptor.CustomParameterInterceptor.Constants.*;

/**
* Created by itcast
*/
public class CustomParameterInterceptor implements Interceptor{
/** The fields_separator: the separator between the fields of each line. */
private final String fields_separator;

/** The indexs: the indexes of the columns wanted after splitting on the separator. */
private final String indexs;

/** The indexs_separator: the separator between multiple indexes. */
private final String indexs_separator;

/** The encrypted_field_index: the index of the field that must be encrypted. */
private final String encrypted_field_index;
/**
*
*/
public CustomParameterInterceptor( String fields_separator,
String indexs, String indexs_separator,String encrypted_field_index) {
String f = fields_separator.trim();
String i = indexs_separator.trim();
this.indexs = indexs;
this.encrypted_field_index=encrypted_field_index.trim();
if (!f.equals("")) {
f = UnicodeToString(f);
}
this.fields_separator =f;
if (!i.equals("")) {
i = UnicodeToString(i);
}
this.indexs_separator = i;
}

The Unicode escape for the tab character \t is ('\u0009').

UnicodeToString converts the Unicode escapes passed in from the configuration file into the actual characters.

Next come the intercept() methods that hold the actual processing logic: one overload handles a single event, the other handles a batch.

public static String UnicodeToString(String str) {
Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
Matcher matcher = pattern.matcher(str);
char ch;
while (matcher.find()) {
ch = (char) Integer.parseInt(matcher.group(2), 16);
str = str.replace(matcher.group(1), ch + "");
}
return str;
}

/*
* @see org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event)
*/
public Event intercept(Event event) {
if (event == null) {
return null;
}
try {
String line = new String(event.getBody(), Charsets.UTF_8);
String[] fields_spilts = line.split(fields_separator);
String[] indexs_split = indexs.split(indexs_separator);
String newLine="";
for (int i = 0; i < indexs_split.length; i++) {
int parseInt = Integer.parseInt(indexs_split[i]);
//encrypt the field that was marked for encryption
if(!"".equals(encrypted_field_index)&&encrypted_field_index.equals(indexs_split[i])){
newLine+=StringUtils.GetMD5Code(fields_spilts[parseInt]);
}else{
newLine+=fields_spilts[parseInt];
}

if(i!=indexs_split.length-1){
newLine+=fields_separator;
}
}
event.setBody(newLine.getBytes(Charsets.UTF_8));
return event;
} catch (Exception e) {
return event;
}
}

/*
* @see org.apache.flume.interceptor.Interceptor#intercept(java.util.List)
*/
public List<Event> intercept(List<Event> events) {
List<Event> out = new ArrayList<Event>();
for (Event event : events) {
Event outEvent = intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}

/*
* @see org.apache.flume.interceptor.Interceptor#initialize()
*/
public void initialize() {
// TODO Auto-generated method stub

}

/*
* @see org.apache.flume.interceptor.Interceptor#close()
*/
public void close() {
// TODO Auto-generated method stub

}

The Interceptor interface defines an inner Builder interface. In its configure method the parameters are read, and defaults are supplied for any parameter not set in the Flume conf. Its build method returns a CustomParameterInterceptor object.

public static class Builder implements Interceptor.Builder {

/** The fields_separator: the separator between the fields of each line. */
private String fields_separator;

/** The indexs: the indexes of the columns wanted after splitting on the separator. */
private String indexs;

/** The indexs_separator: the separator between multiple indexes. */
private String indexs_separator;

/** The encrypted_field_index: the index of the field that must be encrypted. */
private String encrypted_field_index;

/*
* @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context)
*/
public void configure(Context context) {
fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR);
indexs = context.getString(INDEXS, DEFAULT_INDEXS);
indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR);
encrypted_field_index= context.getString(ENCRYPTED_FIELD_INDEX, DEFAULT_ENCRYPTED_FIELD_INDEX);

}

/*
* @see org.apache.flume.interceptor.Interceptor.Builder#build()
*/
public Interceptor build() {

return new CustomParameterInterceptor(fields_separator, indexs, indexs_separator,encrypted_field_index);
}
}
/**
* The Class Constants.
*
*/
public static class Constants {
/** The Constant FIELD_SEPARATOR. */
public static final String FIELD_SEPARATOR = "fields_separator";

/** The Constant DEFAULT_FIELD_SEPARATOR. */
public static final String DEFAULT_FIELD_SEPARATOR =" ";

/** The Constant INDEXS. */
public static final String INDEXS = "indexs";

/** The Constant DEFAULT_INDEXS. */
public static final String DEFAULT_INDEXS = "0";

/** The Constant INDEXS_SEPARATOR. */
public static final String INDEXS_SEPARATOR = "indexs_separator";

/** The Constant DEFAULT_INDEXS_SEPARATOR. */
public static final String DEFAULT_INDEXS_SEPARATOR = ",";

/** The Constant ENCRYPTED_FIELD_INDEX. */
public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index";

/** The Constant DEFAUL_TENCRYPTED_FIELD_INDEX. */
public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = "";

/** The Constant PROCESSTIME. */
public static final String PROCESSTIME = "processTime";
/** The Constant PROCESSTIME. */
public static final String DEFAULT_PROCESSTIME = "a";

}

Define a static class that encapsulates the MD5 hashing method.

/**
* MD5-hash a string
*/
public static class StringUtils {
// lookup table of hex digit characters
private final static String[] strDigits = { "0", "1", "2", "3", "4", "5",
"6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };

// return the byte as a two-character hex string
private static String byteToArrayString(byte bByte) {
int iRet = bByte;
// System.out.println("iRet="+iRet);
if (iRet < 0) {
iRet += 256;
}
int iD1 = iRet / 16;
int iD2 = iRet % 16;
return strDigits[iD1] + strDigits[iD2];
}

// return the byte only as a number
private static String byteToNum(byte bByte) {
int iRet = bByte;
System.out.println("iRet1=" + iRet);
if (iRet < 0) {
iRet += 256;
}
return String.valueOf(iRet);
}

// convert a byte array to a hex string
private static String byteToString(byte[] bByte) {
StringBuffer sBuffer = new StringBuffer();
for (int i = 0; i < bByte.length; i++) {
sBuffer.append(byteToArrayString(bByte[i]));
}
return sBuffer.toString();
}

public static String GetMD5Code(String strObj) {
String resultString = null;
try {
resultString = new String(strObj);
MessageDigest md = MessageDigest.getInstance("MD5");
// md.digest() returns the hash result as a byte array
resultString = byteToString(md.digest(strObj.getBytes()));
} catch (NoSuchAlgorithmException ex) {
ex.printStackTrace();
}
return resultString;
}
}
}

With the steps above, the code for the custom interceptor is complete. Package it into a jar and put it under lib in the Flume installation directory.

Add a new configuration file: vim spool-interceptor-hdfs.conf

a1.channels = c1
a1.sources = r1
a1.sinks = s1

#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=200

#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/logs4/
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8

a1.sources.r1.interceptors =i1 i2
a1.sources.r1.interceptors.i1.type =cn.baidu.interceptor.CustomParameterInterceptor$Builder
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
a1.sources.r1.interceptors.i1.encrypted_field_index =0

a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder


#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://node01:9000/intercept/%Y%m%d
a1.sinks.s1.hdfs.filePrefix = itcasr
a1.sinks.s1.hdfs.fileSuffix = .dat
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 2
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60


# start the agent
bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

IX Custom Components

1 Custom Flume source

Description:

The source is the component responsible for receiving data into a Flume agent. Source components can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy. The source types provided out of the box are already numerous, but sometimes they still do not cover what real development needs, and then we have to implement a custom source for the actual requirement.

For example: monitoring MySQL in real time and moving data from MySQL into HDFS or another storage framework requires us to implement a MySQLSource ourselves.

An interface for custom sources is also provided:

Official documentation: https://flume.apache.org/FlumeDeveloperGuide.html#source

Implementation approach:

According to the documentation, a custom MySQL source must extend the AbstractSource class and implement the Configurable and PollableSource interfaces.

Implement the corresponding methods:

getBackOffSleepIncrement() // not used here

getMaxBackOffSleepInterval() // not used here

configure(Context context) // initialize the context

process() // fetch the data (pulling from MySQL involves a fair amount of business logic, so a dedicated class, QueryMysql, handles the MySQL interaction), wrap it into events, and write them to the channel; this method is called in a loop

stop() // close the related resources

Implementation:

Create the database and tables

CREATE DATABASE `mysqlsource`;

USE `mysqlsource`;

/*Table structure for table `flume_meta` */
DROP TABLE
IF EXISTS `flume_meta`;

CREATE TABLE `flume_meta` (
`source_tab` VARCHAR (255) NOT NULL,
`currentIndex` VARCHAR (255) NOT NULL,
PRIMARY KEY (`source_tab`)
) ENGINE = INNODB DEFAULT CHARSET = utf8;


/*Table structure for table `student` */
DROP TABLE
IF EXISTS `student`;

CREATE TABLE `student` (
`id` INT (11) NOT NULL AUTO_INCREMENT,
`name` VARCHAR (255) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE = INNODB AUTO_INCREMENT = 5 DEFAULT CHARSET = utf8;

/*Data for the table `student` */
INSERT INTO `student` (`id`, `name`)
VALUES
(1, 'zhangsan'),
(2, 'lisi'),
(3, 'wangwu'),
(4, 'zhaoliu');


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>cn.baidu.cloud</groupId>
<artifactId>example-flume</artifactId>
<version>1.1</version>


<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

Custom QueryMySql helper class

package cn.baidu.flumesource;

import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.apache.http.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;

@Data
@Getter
@Setter
public class QueryMySql {
private static final Logger LOG = LoggerFactory.getLogger(QueryMySql.class);

private int runQueryDelay, // interval between two queries
startFrom, // starting id
currentIndex, // current id
recordSixe = 0, // number of rows returned by each query
maxRow; // maximum number of rows per query


private String table, // the table to operate on
columnsToSelect, // the columns to select, passed in by the user
customQuery, // the custom query passed in by the user
query, // the query that gets built
defaultCharsetResultSet;// result-set character set

// the context, used to read the configuration file
private Context context;

// default values for the variables defined above; they can be overridden in the Flume job configuration file
private static final int DEFAULT_QUERY_DELAY = 10000;
private static final int DEFAULT_START_VALUE = 0;
private static final int DEFAULT_MAX_ROWS = 2000;
private static final String DEFAULT_COLUMNS_SELECT = "*";
private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";

private static Connection conn = null;
private static PreparedStatement ps = null;
private static String connectionURL, connectionUserName, connectionPassword;

// load static resources
static {
Properties p = new Properties();
try {
p.load(QueryMySql.class.getClassLoader().getResourceAsStream("jdbc.properties"));
connectionURL = p.getProperty("dbUrl");
connectionUserName = p.getProperty("dbUser");
connectionPassword = p.getProperty("dbPassword");
Class.forName(p.getProperty("dbDriver"));
} catch (Exception e) {
LOG.error(e.toString());
}
}

// obtain a JDBC connection
private static Connection InitConnection(String url, String user, String pw) {
try {
Connection conn = DriverManager.getConnection(url, user, pw);
if (conn == null)
throw new SQLException();
return conn;
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}

// constructor
QueryMySql(Context context) throws ParseException {
// initialize the context
this.context = context;

// parameters with defaults: read them from the Flume job configuration, falling back to the default when absent
this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);

// parameters without defaults: read them from the Flume job configuration
this.table = context.getString("table");
this.customQuery = context.getString("custom.query");
connectionURL = context.getString("connection.url");
connectionUserName = context.getString("connection.user");
connectionPassword = context.getString("connection.password");
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);

// validate the configuration; if a parameter without a default was not set either, throw an exception
checkMandatoryProperties();
// get the current id
currentIndex = getStatusDBIndex(startFrom);
// build the query
query = buildQuery();
}

// validate the configuration (table, query, and database connection parameters)
private void checkMandatoryProperties() {
if (table == null) {
throw new ConfigurationException("property table not set");
}
if (connectionURL == null) {
throw new ConfigurationException("connection.url property not set");
}
if (connectionUserName == null) {
throw new ConfigurationException("connection.user property not set");
}
if (connectionPassword == null) {
throw new ConfigurationException("connection.password property not set");
}
}

// build the SQL statement
private String buildQuery() {
String sql = "";
// get the current id
currentIndex = getStatusDBIndex(startFrom);
LOG.info(currentIndex + "");
if (customQuery == null) {
sql = "SELECT " + columnsToSelect + " FROM " + table;
} else {
sql = customQuery;
}
StringBuilder execSql = new StringBuilder(sql);
// use the id as the offset
if (!sql.contains("where")) {
execSql.append(" where ");
execSql.append("id").append(">").append(currentIndex);
return execSql.toString();
} else {
int length = execSql.toString().length();
return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
}
}

// run the query
List<List<Object>> executeQuery() {
try {
// the SQL must be regenerated for every query, because the id changes
customQuery = buildQuery();
// collection holding the results
List<List<Object>> results = new ArrayList<>();
if (ps == null) {
//
ps = conn.prepareStatement(customQuery);
}
ResultSet result = ps.executeQuery(customQuery);
while (result.next()) {
// collection holding one row of data (multiple columns)
List<Object> row = new ArrayList<>();
// put the returned values into the collection
for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
row.add(result.getObject(i));
}
results.add(row);
}
LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());
return results;
} catch (SQLException e) {
LOG.error(e.toString());
// reconnect
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
}
return null;
}

// convert the result set to strings: each row is a list, and each list is turned into a single string
List<String> getAllRows(List<List<Object>> queryResult) {
List<String> allRows = new ArrayList<>();
if (queryResult == null || queryResult.isEmpty())
return allRows;
StringBuilder row = new StringBuilder();
for (List<Object> rawRow : queryResult) {
Object value = null;
for (Object aRawRow : rawRow) {
value = aRawRow;
if (value == null) {
row.append(",");
} else {
row.append(aRawRow.toString()).append(",");
}
}
allRows.add(row.toString());
row = new StringBuilder();
}
return allRows;
}

// update the offset metadata; called after every result set is returned. The offset of each query must be recorded so the program can resume where it left off after an interruption; the id is used as the offset
void updateOffset2DB(int size) {
// use source_tab as the key: insert if absent, otherwise update (one record per source table)
String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"
+ this.table
+ "','" + (recordSixe += size)
+ "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";
LOG.info("updateStatus Sql:" + sql);
execSql(sql);
}

// execute a SQL statement
private void execSql(String sql) {
try {
ps = conn.prepareStatement(sql);
LOG.info("exec::" + sql);
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
}
}

// get the offset for the current id
private Integer getStatusDBIndex(int startFrom) {
// query the flume_meta table for the current id
String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");
if (dbIndex != null) {
return Integer.parseInt(dbIndex);
}
// if there is no row, this is the first query or nothing has been stored yet, so return the value passed in initially
return startFrom;
}

// statement that queries a single value (the current id)
private String queryOne(String sql) {
ResultSet result = null;
try {
ps = conn.prepareStatement(sql);
result = ps.executeQuery();
while (result.next()) {
return result.getString(1);
}
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}

// close the related resources
void close() {
try {
ps.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}

Custom MySqlSource main class

package cn.baidu.flumesource;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import static org.slf4j.LoggerFactory.*;

public class MySqlSource extends AbstractSource implements Configurable, PollableSource {

// logger
private static final Logger LOG = getLogger(MySqlSource.class);
// the SQL helper
private QueryMySql sqlSourceHelper;


@Override
public long getBackOffSleepIncrement() {
return 0;
}

@Override
public long getMaxBackOffSleepInterval() {
return 0;
}

@Override
public void configure(Context context) {
// initialize the helper
sqlSourceHelper = new QueryMySql(context);
}

@Override
public PollableSource.Status process() throws EventDeliveryException {
try {
// query the data table
List<List<Object>> result = sqlSourceHelper.executeQuery();
// list that will hold the events
List<Event> events = new ArrayList<>();
// map that will hold the event headers
HashMap<String, String> header = new HashMap<>();
// if rows were returned, wrap them into events
if (!result.isEmpty()) {
List<String> allRows = sqlSourceHelper.getAllRows(result);
Event event = null;
for (String row : allRows) {
event = new SimpleEvent();
event.setBody(row.getBytes());
event.setHeaders(header);
events.add(event);
}
// write the events to the channel
this.getChannelProcessor().processEventBatch(events);
// update the offset recorded in the table
sqlSourceHelper.updateOffset2DB(result.size());
}
// wait before the next poll
Thread.sleep(sqlSourceHelper.getRunQueryDelay());
return Status.READY;
} catch (InterruptedException e) {
LOG.error("Error processing row", e);
return Status.BACKOFF;
}
}

@Override
public synchronized void stop() {
LOG.info("Stopping sql source {} ...", getName());
try {
// close resources
sqlSourceHelper.close();
} finally {
super.stop();
}
}
}

Package the project with Maven; the MySQL driver dependency must be bundled into the jar. Then put the finished jar into Flume's lib directory.

Configuration file: vim mysqlsource.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = cn.baidu.flumesource.MySqlSource
a1.sources.r1.connection.url = jdbc:mysql://node01:3306/mysqlsource
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = hadoop
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
a1.sources.r1.incremental.column.name = id
a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=3000

# Describe the sink
a1.sinks.k1.type = logger

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


Start:

bin/flume-ng agent -c conf -f conf/mysqlsource.conf -n a1 -Dflume.root.logger=INFO,console

2 Custom Flume sink

Description:

Just as with a custom source, if none of the existing sinks does what we want, we can write a custom sink to save the data wherever we need it, for example to Kafka, to MySQL, or to a file.

Requirement: data is sent through a network port; a custom sink receives the data from that port and saves it to a local file.

pom.xml: same as for the custom source

Custom MySink

package cn.baidu.flumesink;

import org.apache.commons.io.FileUtils;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.io.*;

public class MySink extends AbstractSink implements Configurable {
private Context context ;
private String filePath = "";
private String fileName = "";
private File fileDir;

//called when the sink is initialized; mainly used to initialize the Context and read our configuration parameters
@Override
public void configure(Context context) {
try {
this.context = context;
filePath = context.getString("filePath");
fileName = context.getString("fileName");
fileDir = new File(filePath);
if(!fileDir.exists()){
fileDir.mkdirs();
}
} catch (Exception e) {
e.printStackTrace();
}
}
//this method is called over and over
@Override
public Status process() throws EventDeliveryException {
Event event = null;
Channel channel = this.getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
while(true){
event = channel.take();
if(null != event){
break;
}
}
byte[] body = event.getBody();
String line = new String(body);
try {
FileUtils.write(new File(filePath+File.separator+fileName),line,true);
transaction.commit();
} catch (IOException e) {
transaction.rollback();
e.printStackTrace();
return Status.BACKOFF;
}finally {
transaction.close();
}
return Status.READY;
}
}

Package the code into a jar with the packaging plugin; note that the Apache Commons dependency that FileUtils comes from (commons-io) must be packaged in as well. Then put the jar into Flume's lib directory.

Configure the conf file: vim filesink.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node01
a1.sources.r1.port = 5678
a1.sources.r1.channels = c1
# # Describe the sink
a1.sinks.k1.type = cn.baidu.flumesink.MySink
a1.sinks.k1.filePath=/export/servers
a1.sinks.k1.fileName=filesink.txt
# # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start:

bin/flume-ng agent -c conf -f conf/filesink.conf -n a1 -Dflume.root.logger=INFO,console

telnet node01 5678 connects to that port on the machine; type data into the session to test.