hadoop四----数据收集flume
Flume是一个分布式的、可靠的、可用的服务,用于从许多不同的源上有效地搜集、汇总、移动大量数据日志到一个集中式的数据存储中。并且它是一个简单的和灵活的基于流的数据流架构。它具有鲁棒性和容错机制以及故障转移和恢复的机制。对于分析的应用中它使用一个简单的可扩展的数据模型。Flume传输的数据可以是网络,媒体等产生。
创新互联网站建设由有经验的网站设计师、开发人员和项目经理组成的专业建站团队,负责网站视觉设计、用户体验优化、交互设计和前端开发等方面的工作,以确保网站外观精美、成都网站制作、成都网站建设、外贸营销网站建设易于使用并且具有良好的响应性。
Apache Flume是Apache软件基金会的一个顶级项目。
源-Source,接收器-Sink,通道-Channel
flume是cloudera公司的一款高性能、高可能的分布式日志收集系统。
flume的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。
flume传输的数据的基本单位是event,如果是文本文件,通常是一行记录,这也是事务的基本单位。
flume运行的核心是agent。它是一个完整的数据收集工具,含有三个核心组件,分别是source、channel、sink。通过这些组件,event可以从一个地方流向另一个地方
Apache Flume系统需求:
Java Runtime Environment - Java 1.6 or later (Java 1.7 Recommended)
Memory - Sufficient memory for configurations used by sources, channels or sinks
Disk Space - Sufficient disk space for configurations used by channels or sinks
Directory Permissions - Read/Write permissions for directories used by agent
数据流
Flume事件被定义为一个单位的数据流量有一个字节的有效载荷和一个可选字符串属性。Flume代理(JVM)的过程中,承载组件,通过这些事件流从外部源的下一个目的地(跳)。
多agent模式:
一对多路输出模型:
source:
Client端操作消费数据的来源,Flume支持Avro,log4j,syslog和http post(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也可以写一个Source,以IPC或RPC的方式接入自己的应用,Avro和Thrift都可以(分别有NettyAvroRpcClient和ThriftRpcClient实现了RpcClient接口),其中Avro是默认的RPC协议。具体代码级别的Client端数据接入,可以参考官方手册。
对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。
对于直接读取文件Source,有两种方式:
ExecSource:以运行Linux命令的方式,持续的输出最新的数据,如tail -F 文件名指令,在这种方式下,取的文件名必须是指定的。 ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法保证日志数据的完整性。
SpoolSource:监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:拷贝到spool目录下的文件不可以再打开编辑;spool目录下不可包含相应的子目录。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。 在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。log4j有一个TimeRolling的插件,可以把log4j分割的文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)
channel:
Channel有多种方式:有MemoryChannel,JDBC Channel,MemoryRecoverChannel,FileChannel。
MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。
MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。
FileChannel保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
sink:
Sink在设置存储数据时,可以向文件系统、数据库、hadoop存数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
一个web服务器的产生的事件由 Flume源消耗。外部源发送事件发送到Flume中,会带着一个识别的格式。例如: 例如:一个Avro Flume源可以用来接收从Avro clients 或其他flume代理从Avro link发送事件。当一个Flume 源接收一个事件,他会存储到一个活多个channels中,这些channel会一直保存着event,直到被Flume sink消费处理掉,例如JDBC Channel作为一个例子-它使用一个文件系统支持嵌入式数据库,sink从channel中移除事件,同时放入到一个外部的仓库,比如HDFS,或者流转到下一个Flume source 源,source和sink在agent中是以异步运行方式运行事件。
复杂数据流:
Flume到达最终目的地之前,允许用户建立多跳流活动,通过多个代理。对于失败的每一跳它还允许fan-in和fan-out flows,内容路由和备份路由失败(故障转移)。
可靠性:
Flume的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。
Flume使用事务性的方式保证传送Event整个过程的可靠性。Sink必须在Event被存入Channel后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把Event从Channel中remove掉。这样数据流里的event无论是在一个agent里还是多个agent之间流转,都能保证可靠,因为以上的事务保证了event会被成功存储起来。而Channel的多种实现在可恢复性上有不同的保证。也保证了event不同程度的可靠性。比如Flume支持在本地保存一份文件channel作为备份,而memory channel将event存在内存queue里,速度快,但丢失的话无法恢复。
具体看一下Transaction。Source和Sink封装了Channel提供的对Event的事务存、取接口,下图为一个transaction过程:
一
一个Channel的实现里会包括一个transaction的实现,每个与channel打交道的source和sink都得带有一个transaction对象。下面的例子中可以看到一个Event的状态和变化会在一次transation中完成。transaction的状态也对应了时序图中的各个状态。
故障恢复:
在每一个agent的channel中的event都可以在失败中恢复。Flume支持持久文件channel(本地文件系统必须支持)内存channel仅仅将event存储在内存队列中,这样内存中的event一旦丢失就不能恢复。
配置一个代理(agent)
Flume代理配置存储在本地配置文件。这是一个文本文件格式如下Java属性文件格式。在相同的配置文件,可以指定一个或多个代理的配置。配置文件包括每个源,接收器和代理渠道的性质和它们连接在一起,形成数据流。
配置单个组件
流中每个组件(源,接收器或通道)的名称,类型,和一组特定的类型和实例的属性。例如 Avro源需要一个主机名(或IP地址)和接收数据的端口号。一个内存通道可以有最大队列大小(“能力”),HDFS的散热器需要知道文件系统的URI,路径创建文件,文件的旋转频率(“hdfs.rollInterval”)等,所有这些组件的属性需要设置在托管 Flume 代理的属性文件。
组合组件(Wiring the pieces together)
代理需要知道什么加载各个组件以及它们是如何连接,以构成的流动。这是通过列出的源,汇和代理渠道的名称,然后指定每个接收器和源的连接通道。例如,代理到HDFS flume HDFS cluster1中通过JDBC JDBC通道通道流动称为avroWeb Avro 源的事件。该配置文件将包含这些组件和JDBC通道为avroWeb源和HDFS cluster1中汇作为共享信道的名称。
启动代理(starting an agent)
代理人是开始使用shell脚本称为flume-NG是位于flume分布在bin目录。你需要在命令行上指定的代理的名称,config目录,配置文件:
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
现在,代理将开始运行的源和汇的配置在给定的属性文件。
A simple example
在这里,我们举一个例子,配置文件,描述一个单节点的Flume部署。这种配置可以让用户生成的事件和随后输出到控制台。
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
这个配置定义了一个单一的代理,称为agent1。 agent1监听44444端口,通道缓存在内存中事件数据,事件数据记录到控制台和一个接收器上的数据源。配置文件名的各个组成部分,然后介绍了他们的类型和配置参数。一个给定的配置文件可能会定义多个命名的代理人;一个给定的Flume进程启动时传递一个标志,告诉它的具名代理体现。
结合此配置文件,我们启动Flume按如下参数:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
请注意,在完整部署,我们通常会包括一个选项: - CONF=
我们可以从一个单独的终端,然后telnet端口44444和发送flume事件:
$ telnet localhost 44444Trying 127.0.0.1...Connected to localhost.localdomain (127.0.0.1).Escape character is '^]'.Hello world!OK
他原来的flume终端输出日志信息的事件。
12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
至此,你已经成功地配置和部署了一个flume代理!随后的章节涵盖更详细的代理配置。
数据获取
flume支持从从外部数据源获取数据的机制。
RPC
在flume中 ,Avro客户端使用AVRO RPC机制可以发送一个给定的文件 Avro 源:
$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
上面的命令将发送的/ usr/logs/log.10的内容到 flume源监听端
Executing commands
还有一个exec执行一个给定的命令获得输出的源。一个单一的输出,即“line”。回车('\ R')或换行符('\ N'),或两者一起的文本。
注:Flume不支持tail做为一个源,不过可以通过exec tail
Network streams
Flume支持以下的机制,从流行的日志流类型读取数据
Avro
Syslog
Netcat
定义流
在一个单一的代理定义的流,你需要通过一个通道的来源和接收器链接。你需要列出源,接收器和通道,为给定的代理,然后指向源和接收器及通道。一个源的实例可以指定多个通道,但只能指定一个接收器实例通道。格式如下:
# list the sources, sinks and channels for the agent.sources =
例如一个代理名为weblog-agent,外部通过avro客户端,并且发送数据通过内存通道给hdfs。在配置文件weblog.config的可能看起来像这样:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
这将使事件流从avro-AppSrv-source到hdfs-Cluster1-sink通过内存通道mem-channel-1。当代理开始weblog.config作为其配置文件,它会实例化流。
配置单个组件
定义流之后,你需要设置每个源,接收器和通道的属性。可以分别设定组件的属性值。
# properties for sources.sources. . =
# properties for channels.channel. . =
# properties for sinks.sources. . =
“type”属性必须为每个组件设置,以了解它需要什么样的对象。每个源,接收器和通道类型有其自己的一套,它所需的性能,以实现预期的功能。所有这些,必须根据需要设置。在前面的例子中,我们拿到从hdfs-Cluster1-sink中的流到HDFS,通过内存通道mem-channel-1的avro-AppSrv-source源。下面是一个例子,显示了这些组件的配置。
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# set channel for sources, sinks
# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
#...
在一个代理中添加多个流
单个Flume代理可以包含几个独立的流。你可以在一个配置文件中列出多个源,接收器和通道。这些组件可以连接形成多个流。
# list the sources, sinks and channels for the agent.sources = .sinks = .channels =
那么你就可以连接源和接收器到其相应的通道,设置两个不同的流。例如,如果您需要设置一个weblog代理两个流,一个从外部Avro客户端到HDFS,另外一个是tail的输出到Avro接收器,然后在这里是做一个配置:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
配置多代理流程
设置一个多层的流,你需要有一个指向下一跳avro源的第一跳的avro 接收器。这将导致第一Flume代理转发事件到下一个Flume代理。例如,如果您定期发送的文件,每个事件(1文件)AVRO客户端使用本地Flume代理,那么这个当地的代理可以转发到另一个有存储的代理。
Weblog agent config:
list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel
# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel
# avro sink properties
agent_foo.sources.avro-forward-sink.type = avro
agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sources.avro-forward-sink.port = 10000
# configure other pieces
#...
HDFS agent config:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel
# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel
# avro sink properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000
# configure other pieces
#...
这里我们连接从weblog-agent的avro-forward-sink 到hdfs-agent的avro-collection-source收集源。最终结果从外部源的appserver最终存储在HDFS的事件。
Fan out flow
Flume支持Fan out流从一个源到多个通道。有两种模式的Fan out,分别是复制和复用。在复制的情况下,流的事件被发送到所有的配置通道。在复用的情况下,事件被发送到可用的渠道中的一个子集。Fan out流需要指定源和Fan out通道的规则。这是通过添加一个通道“选择”,可以复制或复。再进一步指定选择的规则,如果它是一个多路。如果你不指定一个选择,则默认情况下它复制
# List the sources, sinks and channels for the agent.sources = .sinks = .channels =
# set list of channels for source (separated by space).sources. .channels =
# set channel for sinks.sinks. .channel = .sinks. .channel =
.sources. .selector.type = replicating
复用的选择集的属性进一步分叉。这需要指定一个事件属性映射到一组通道。选择配置属性中的每个事件头检查。如果指定的值相匹配,那么该事件被发送到所有的通道映射到该值。如果没有匹配,那么该事件被发送到设置为默认配置的通道。
# Mapping for multiplexing selector.sources. .selector.type = multiplexing .sources. .selector.header = .sources. .selector.mapping. = .sources. .selector.mapping. = .sources. .selector.mapping. =
#...
.sources. .selector.default =
映射允许每个值通道可以重叠。默认值可以包含任意数量的通道。下面的示例中有一个单一的流复用两条路径。代理有一个单一的avro源和连接道两个接收器的两个通道。
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2
# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
“State”作为Header的选择检查。如果值是“CA”,然后将其发送到mem-channel-1,如果它的“AZ”的,那么jdbc-channel-2,如果它的“NY”那么发到这两个。如果“State”头未设置或不匹配的任何三个,然后去默认的mem-channel-1通道。
Flume Sources
Avro Source
Avro端口监听并接收来自外部的Avro客户流的事件。当内置AvroSink另一个(前跳)Flume代理,它可以创建分层集合配对拓扑。
Example for agent named agent_foo:
a1.sources = r1a1.channels = c1a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySourcea1.sources.r1.host = 0.0.0.0a1.sources.r1.bind = 6666a1.sources.r1.channels = c1
Exec Source
此源启动运行一个给定的Unix命令,预计这一过程中不断产生标准输出(stderr被简单地丢弃,除非logStdErr= TRUE)上的数据。如果因任何原因的进程退出时,源也退出,并不会产生任何进一步的数据。
备注: 在ExecSource不能保证,如果有一个失败的放入到通道的事件,客户也知道。在这种情况下,数据将丢失。
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
shell:
agent_foo.sources.tailsource-1.type = exec
agent_foo.sources.tailsource-1.shell = /bin/bash -c
agent_foo.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
JMS source:
Required properties are in bold.
Converter:
Example for agent named a1:
a1.sources = r1a1.channels = c1a1.sources.r1.type = jmsa1.sources.r1.channels = c1a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactorya1.sources.r1.connectionFactory = GenericConnectionFactorya1.sources.r1.providerURL = tcp://mqserver:61616a1.sources.r1.destinationName = BUSINESS_DATAa1.sources.r1.destinationType = QUEUE
Spooling Directory Source
Example for an agent named agent-1:
agent-1.channels = ch-1
agent-1.sources = src-1
agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
agent-1.sources.src-1.fileHeader = true
书写agent配置
使用flume的核心是如何配置agent文件。agent的配置是一个普通文本文件,使用键值对形式存储配置信息,可以设置多个agent信息。配置的内容包括source、channel、sink等。组件source、channel、sink都有名称、类型和很多个性化的属性配置。
配置文件应该这么写
# list the sources, sinks and channels for the agent.sources = .sinks = .channels =
# set channel for source.sources. .channels = ...
# set channel for sink.sinks. .channel =
# properties for sources.sources. . =
# properties for channels.channel. . =
# properties for sinks.sources. . =
# 下面是示例
#下面的agent1是代理名称,对应有source,名称是src1,;有一个sink,名称是sink1;有一个channel,名称是ch2.
gent1.sources = src1
agent1.sinks = sink1
agent1.channels = ch3
# 配置目录 source,监控目录(必须存在)的变化,要求文件名必须唯一,否则flume报错
agent1.sources.src1.type = spooldir
agent1.sources.src1.channels = ch3
agent1.sources.src1.spoolDir = /root/hmbbs
agent1.sources.src1.fileHeader = false
agent1.sources.src1.interceptors = i1
agent1.sources.src1.interceptors.i1.type = timestamp
# 配置内存 channel
agent1.channels.ch2.type = memory
agent1.channels.ch2.capacity = 1000
agent1.channels.ch2.transactionCapacity = 1000
agent1.channels.ch2.byteCapacityBufferPercentage = 20
agent1.channels.ch2.byteCapacity = 800000
# 配置文件 channel
agent1.channels.ch3.type = file
agent1.channels.ch3.checkpointDir = /root/flumechannel/checkpoint
agent1.channels.ch3.dataDirs = /root/flumechannel/data
# 配置hdfs sink
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.channel = ch3
agent1.sinks.sink1.hdfs.path = hdfs://hadoop0:9000/flume/%Y-%m-%d/
agent1.sinks.sink1.hdfs.rollInterval=1
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
# 配置hbase sink
#配置hbase sink2
agent1.sinks.sink2.type = hbase
agent1.sinks.sink2.channel = channel1
agent1.sinks.sink2.table = hmbbs
agent1.sinks.sink2.columnFamily = cf
agent1.sinks.sink2.serializer = flume.HmbbsHbaseEventSerializer
agent1.sinks.sink2.serializer.suffix = timestamp
agent1.sinks.sink2.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
5.启动代理的脚本是flume-ng agent,需要指定agent name、配置目录、配置文件
-n 指定agent名称
-c 指定配置文件目录
-f 指定配置文件
-Dflume.root.logger=DEBUG,console
因此完整的启动命令应该这么写
bin/flume-ng agent –n agent1 –c conf –f conf/example –Dflume.root.logger=DEBUG,console
启动成功后,可以向目录/root/hmbbs中放入文件,flume会感知到新文件,然后上传到hdfs的/flume目录下。
网站题目:hadoop四----数据收集flume
URL网址:http://ybzwz.com/article/iedeec.html