Flume Query拦截器组件

时间:2022-12-15   阅读:78

拦截器效果

Flume官方Static拦截器如需设置多组键值:

    
        agent.sources.r.interceptors = i1 i2 i3 agent.sources.r.interceptors.i1.type
        =
         static 
        agent.sources.r.interceptors.i1.key = country agent.sources.r.interceptors.i1.
         value 
        = china agent.sources.r.interceptors.i2.type =
         static 
        agent.sources.r.interceptors.i2.key = province agent.sources.r.interceptors.i2.
         value 
        = fujian agent.sources.r.interceptors.i3.type =
         static 
        agent.sources.r.interceptors.i3.key = city agent.sources.r.interceptors.i3.
         value 
        = xiamen
    

Query拦截器(Query Interceptor)的等效配置:

Flume Query拦截器组件

    
        agent.sources.r.interceptors = i agent.sources.r.interceptors.i.type =
        QueryInterceptor$Builder agent.sources.r.interceptors.i.query = country=china&province=fujian&city=xiamen
    

拦截器配置

参数 说明
type 固定值:QueryInterceptor$Builder
query 格式:key1=val1&key2=val2&key3=val3

创建Maven项目

pom.xml中加入Flume依赖:

    
         <  dependencies  > 
         <  dependency  > 
         <  groupId  > 
        org.apache.flume
         </  groupId  > 
         <  artifactId  > 
        flume-ng-core
         </  artifactId  > 
         <  version  > 
        1.7.0
         </  version  > 
         </  dependency  > 
         </  dependencies  > 
    

开发拦截器

自定义拦截器实现 org.apache.flume.interceptor.Interceptor 接口:

    
         import 
        java.util.Map;
         import 
        java.util.List;
         import 
        org.apache.flume.Event;
         import 
        org.apache.flume.Context;
         import 
        org.apache.flume.interceptor.Interceptor;
         public 
          class   QueryInterceptor   implements   Interceptor  
        {
         private 
         final 
        String query;
         //定义key的名称为query 
         public 
         static 
          class   Constants  
        {
         public 
         static 
         final 
        String KEY =
         "query" 
        ; }
         //Builder类(用于Flume获取拦截器实例) 
         public 
         static 
          class   Builder   implements   Interceptor  .  Builder  
        {
         private 
        String query;
         //返回拦截器实例 
          public  Interceptor  build   ()  
        {
         return 
         new 
        QueryInterceptor(query); }
         //读取Flume配置 
          public   void   configure   (Context context)  
        { query = context.getString(Constants.KEY);
         //读取query的值 
        } }
         //得到Builder类传来的query值 
          private   QueryInterceptor   (String query)  
        {
         this 
        .query = query; }
         //初始化操作(无) 
          public   void   initialize   ()  
        { }
         //遍历Events并逐个调用intercept方法 
          public  List<Event>  intercept   (List<Event> events)  
        {
         for 
        (Event e : events) { intercept(e); }
         return 
        events; }
         //解析query数据并放入header 
          public  Event  intercept   (Event event)  
        { Map<String, String> headers = event.getHeaders(); String[] params
        = query.split(
         "&" 
        );
         for 
        (String param : params) { String[] kv = param.split(
         "=" 
        );
         if 
        (kv.length ==
         2 
        ) { headers.put(kv[
         0 
        ], kv[
         1 
        ]); } }
         return 
        event; }
         //关闭时的操作(无) 
          public   void   close   ()  
        { } }
    

启用拦截器

生成Jar包

编译项目生成jar包(依赖不用打包),把jar包放进$FLUME_HOME/plugins.d/QueryInterceptor/lib目录(需创建)

Flume配置

自定义拦截器的type项配置如果有package包名也需写上,并用$分割加上自定义的Builder类名:

    
         # source 
        agent.sources = r agent.sources.r.type =
         exec 
        agent.sources.r.command = tail -F /tmp/test.log agent.sources.r.interceptors
        = i agent.sources.r.interceptors.i.type = QueryInterceptor$Builder agent.sources.r.interceptors.i.query
        = country=china&province=fujian&city=xiamen agent.sources.r.channels
        = c
         # channel 
        agent.channels = c agent.channels.c.type = memory
         # sink 
        agent.sinks = k agent.sinks.k.channel = c agent.sinks.k.type = logger
    

启动Flume

启动Flume时设置-DFlume.root.logger参数:

    
        $FLUME_HOME/bin/flume-ng agent -c
         <  Flume配置目录  > 
        -f
         <  配置文件路径  > 
        -n agent -Dflume.root.logger=INFO,console
    

往/tmp/test.log文件写数据,在控制台可以看到如下信息:

    
        ...
         Event 
        : {
         headers 
        :{country=china, province=fujian, city=xiamen}
         body 
        : ...
    

上一篇:Flask 微信卡券小项目从开发到上线

下一篇:分享一些Typecho中常用的调用函数(欢迎使用 Typecho)

相关网站

网友评论