傳智播客旗下品牌:|||||

全國咨詢/投訴熱線:400-618-4000

Apache Flume timestamp和host攔截器使用

更新時間:2019年11月08日14時53分 來源:傳智播客 瀏覽次數:

一、 Flume攔截器介紹

攔截器是簡單的插件式組件,設置在source和channel之間。source接收到的時間,在寫入channel之前,攔截器都可以進行轉換或者刪除這些事件。每個攔截器只處理同一個source接收到的事件。可以自定義攔截器。

flume內置了很多攔截器,并且會定期的添加一些攔截器,下面我們學習flume內置的,兩個經常使用的攔截器。

1. Timestamp Interceptor(時間戳攔截器)

flume中一個最經常使用的攔截器 ,該攔截器的作用是將時間戳插入到flume的事件報頭中。如果不使用任何攔截器,flume接受到的只有message。時間戳攔截器的配置。

參數默認值描述type類型名稱timestamp,也可以使用類名的全路徑。preserveExisting false 如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值。

a1.sources.r1.interceptors = timestamp

a1.sources.r1.interceptors.timestamp.type=timestamp

a1.sources.r1.interceptors.timestamp.preserveExisting=false

2. Host Interceptor(主機攔截器)

主機攔截器插入服務器的ip地址或者主機名,agent將這些內容插入到事件的報頭中。時間報頭中的key使用hostHeader配置,默認是host。主機攔截器的配置參數 默認值 描述 type 類型名稱host hostHeader host 事件投的key useIP true 如果設置為false,host鍵插入主機名 preserveExisting false 如果設置為true,若事件中報頭已經存在,不會替換host報頭的值

a1.sources.r1.interceptors = host

a1.sources.r1.interceptors.host.type=host

a1.sources.r1.interceptors.host.useIP=false

a1.sources.r1.interceptors.timestamp.preserveExisting=true

二、 業務需求

使用flume內置攔截器完成如下需求:

Flume 攔截器1

1. agent配置


# 03 timestamp and host interceptors work before source
a1.sources.r1.interceptors = i1
i2  # 兩個interceptor串聯,依次作用于event
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = false

a1.sources.r1.interceptors.i2.type = host
# flume event的頭部將添加 “hostname”:實際主機名
a1.sources.r1.interceptors.i2.hostHeader = hostname  # 指定key,value將填充為flume agent所在節點的主機名
a1.sources.r1.interceptors.i2.useIP = false  # IP和主機名,二選一即可

# 04 hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs: // master:9000 / flume / % Y - % m - % d /  
# hdfs sink將根據event header中的時間戳進行替換
# hostHeader的值保持一致,hdfs sink將提取eventkeyhostnmae的值,基于該值創建文件名前綴
a1.sinks.k1.hdfs.filePrefix = % {hostname}  # hdfs sink將根據event header中的hostnmae對應的value進行替換
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollCount = 10
a1.sinks.k1.hdfs.rollSize = 1024000

# channel,memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind source,sink to channel
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

三、 驗證攔截器效果

1. 驗證思路

1)先將interceptor作用后的event,通過logger sink打印到console,驗證header是否正常添加

2)修改sink為hdfs, 觀察目錄和文件的名稱是否能夠按照預期創建(時間戳-目錄,hostname-文件前綴)

2. 驗證過程

1)發送header為空的http請求,logger sink打印event到終端,觀察event header中是否被添加了timestamp以及hostname


# 01 define agent name, source/sink/channel name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 02 source,http,jsonhandler
a1.sources.r1.type = http
a1.sources.r1.bind = node - 1
a1.sources.r1.port = 8888
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

# 03 timestamp and host interceptors work before source
a1.sources.r1.interceptors = i1
i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = false

a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false

# 04 hdfs sink
a1.sinks.k1.type = logger

# channel,memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind source,sink to channel 
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

啟動flume agent


bin/flume-ng agent -c ./conf -f ./conf/http_sink_logger_source.conf -n a1 -Dflume.root.logger=INFO,console

發送請求測試:


curl -X POST -d '[{"header":{},"body":"time-host-interceptor001"}]' http://node-1:8888

可以看到終端輸出的event header中已經有了攔截器的信息

Flume 攔截器2

修改sink為hdfs, 觀察HDFS的目錄名(時間戳)和文件前綴(hostnme)

flume攔截器3


目錄名被正常替換(基于event header中的時間戳)

flume攔截器4

文件前綴被正常替換(基于event header中的hostname:實際主機名)

flume 攔截器5

文件內容被寫入為event的body

Flume攔截器6

本文來自:傳智播客

javaee

python

web

ui

cloud

test

c

netmarket

pm

Linux

movies

robot

uids

北京校區

    14天免費試學

    基礎班入門課程限時免費

    申請試學名額

    15天免費試學

    基礎班入門課程限時免費

    申請試學名額

    15天免費試學

    基礎班入門課程限時免費

    申請試學名額

    15天免費試學

    基礎班入門課程限時免費

    申請試學名額

    20天免費試學

    基礎班入門課程限時免費

    申請試學名額

    8天免費試學

    基礎班入門課程限時免費

    申請試學名額

    20天免費試學

    基礎班入門課程限時免費

    申請試學名額

    5天免費試學

    基礎班入門課程限時免費

    申請試學名額

    0天免費試學

    基礎班入門課程限時免費

    申請試學名額

    12天免費試學

    基礎班入門課程限時免費

    申請試學名額

    5天免費試學

    基礎班入門課程限時免費

    申請試學名額

    5天免費試學

    基礎班入門課程限時免費

    申請試學名額

    10天免費試學

    基礎班入門課程限時免費

    申請試學名額
    2019年信誉网赚网站 做网赚新手先做什么 加拿大28 上海快3 2019做什么网赚 什么都不会怎么网赚 2019灰色暴力网赚 大资本登录 幸运快乐8 上网赚美金平台