版权申明:转载请注明出处。
文章来源:
flume中提供了多种source来应对不同场景的数据传输,常用的有exec source和spooling directory source。下面对flume的source做详细的说明。
1.Spooling Directory Source
这种方式是将要传输的文件放在磁盘的某个目录下,这个目录可以理解为一个池子,当池子中有文件的时候就会被放入channel,当确认文件已经放入channel,原始文件会被重命名或者删除。
这种方式是可靠的、并且不会丢失数据,当agent挂掉重启后也能接着传输。需要注意的是放入池子的文件不能再被写入或者不能出现重名文件,否则传输会报错。
需要注意的是尽管这种方式提供了可靠性保证,但是下游的某种失败也会导致event重复。
以下是该source的配置属性,加粗的为必选属性。
属性名称 | 默认值 | 说明
--| -- | --
channels | - | channel,可以为多个
type | - | 组件的类型名称,这里为spooldir
spoolDir | - | 读取文件的目录
fileSuffix | .COMPLETED | 为已经读取的文件添加的后缀
deletePolicy | never | 删除spool中文件的策略,有never和immediate
fileHeader | false | 是否在全路径的文件名上添加header
fileHeaderKey | file | 将一个路径的文件名添加到event header时的key
basenameHeader | false | 是否在基本文件名上添加一个header
basenameHeaderKey | basename | 将一个基本文件添加到event header时的key
includePattern | ^.*$ | 用来标识需要被传输的文件名的正则表达式,可以与ignorePattern同时使用
如果同时命中了两条规则,则这个文件会被忽略。
ignorePattern | ^$ | 用来标识将要被忽略的文件的正则表达式。
trackerDir | .flumespool | 用来存储正在处理的文件的元数据的目录,如果这个目录不是一个路径,
可以将其理解为一个和poolDir相关的目录
consumeOrder | oldest | 消耗spool中文件的顺序,有oldest,youngest,random,当为oldest或者youngest时,
会通过文件后一次修改时间去排序,若修改时间一致,则按照文件名称的字典顺序排序。
这两种方式每次都会扫描整个目录,当有大量文件的时候速度会比较慢。
当为random时会随机的消耗文件,这种方式速度快但是有可能造成早生成的文件后消耗,对于时序有要求的不建议使用。
pollDelay | 500 | 轮巡新文件的延时,单位为毫秒
recursiveDirectorySearch | false | 是否监控并读取子目录的文件。
maxBackoff | 4000 | 两次写入channel的时间间隔,单位为毫秒,
source会以一个较大的间隔时间开始,然后每次以指数减少的方式缩短间隔时间,直到channel写满抛出异常,是一个动态的参数。
batchSize | 100 | 每个批次的大小
inputCharset | UTF-8 | deserializers用来处理文件的字符集
decodeErrorPolicy | FAIL | 字符无法解码时的策略。
FAIL:抛出一个解码失败的异常。
REPLACE:替换字符,一般使用Unicode字符U+FFFD。
IGNORE:丢弃无法解析的队列
deserializer | LINE | deserializer用于将file解析成event,默认将文件的每一行解析为一个event。
deserializer.* | | 可以使用多个 deserializer对一个event处理。
bufferMaxLineLength | 5000 | 每个需要提交的buffer中的大文件行数,使用deserializer.maxLineLength替代。
selector.type | replicating | 取值有replicating、multiplexing
selector.* | | 依赖于selector.type
interceptors | - | 拦截器,可配置多个,使用空格分割。
配置示例
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
2.Scribe Source
scribe是facebook 提供的另外一种数据传输系统,使用scribe source相当于在scribe之后又套接了flume传输数据。以下是各个数据传输系统的对比图。
scribe source 的属性配置如下:
属性 | 默认值 | 说明
-- | -- | --
type | - | org.apache.flume.source.scribe.ScribeSource
port | 1499 | 端口
maxReadBufferBytes | 16384000 | thrift默认的配置
workerThreads | 5 | 工作线程数
selector.type | replicating | 取值有replicating、multiplexing
selector.* | | 依赖于selector.type
配置示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1
3.Exec Source
exec source是一种比较简单的用来实时传输的source,它会启动一个用户给定的类Unix命令,这个命令会不断的产生数据流,exec source接收它产生的数据流。一般常见的命令有tail -f ,也可以使用一些比较复杂的命令。
但是这种方式无法保证可靠性,source无法感知event放入channel时产生的错误。比如当channel写满的时候,source还会不断的发送数据到channel,这会导致数据丢失。
exec source的属性配置如下:
属性 | 默认值 | 说明
-- | -- | --
channels | - | channel,可以为多个
type | - | 组件的类型名称,这里为exec
command | - | 需要执行的命令。
shell | - | 一个用户执行命令的shell调用。
restartThrottle | 10000 | 重启之前等待的时间。
restart | false | 当执行的命令挂掉后是否需要重启。
logStdErr | false | 是否要将命令的错误日志记录。
batchSize | 20 | 读取并发送到channel的大行数。
batchTimeout | 3000 | 批次间隔时间。
selector.type | replicating | 取值有replicating、multiplexing
selector.* | | 依赖于selector.type
interceptors | - | 拦截器,可配置多个,使用空格分割。
配置示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
4.Taildir Source
这种方式会监听指定的文件,并以接近实时的频率去读取文件新产生的行。这种方式是可靠的并且不会丢失数据,因为在读取文件的过程中,会周期的记录文件读取的位置到一个json格式的文件中。当agent出现问题重启时会去读取位置文件中的信息,然后重新传输,也可以从指定的位置进行传输。taildir source不支持传输二进制的文件。
Taildir source的属性配置如下:
属性 | 默认值 | 说明
-- | -- | --
channels | - | channel,可以为多个
type | - | 组件的类型名称,这里为TAILDIR
filegroups | - | 空格分割的多个文件组,每个组中有一组需要被tail的文件。
filegroupName | - | 文件组的路径。
positionFile | ~/.flume/taildir_position.json | 用于tail文件时记录新的位置。
headers | - | 多个header可以用来标识一个文件组
byteOffsetHeader | false | 是否为已经tail的行添加一个叫byteoffset的字节偏移
skipToEnd | false | 在位置文件没有写入的情况下是否跳过位置信息直接到末尾
idleTimeout | 120000 | 当一个文件不活动的时间达到idleTimeout指定的时间,
则关闭对该文件的传输,
当有新的行写入时,会自动重新传输。
writePosInterval | 3000 | 将每个文件后一次读取位置写入位置文件的间隔时间。
batchSize | 100 | 一次读取并发送到channel的文件行数的大小。
backoffSleepIncrement | 1000 | 上一次没有读取到新数据,再一次读取的时延的增量,单位毫秒。
maxBackoffSleep | 5000 |上一次没有读取到新数据,再一次读取数据时间隔的大时间,单位毫秒。
cachePatternMatching | true | 当一个目录有成千上万个文件,使用正则表达式匹配比较耗时,
缓存文件列表能改善这个问题。文件消耗的顺序也会被缓存
要求文件系统保持至少一秒的间隔去跟踪文件的修改时间。
fileHeader | false | 是否添加一个header去存储文件的路径。
fileHeaderKey | file | 上述header的key
配置示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true