博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
064 UDF
阅读量:6132 次
发布时间:2019-06-21

本文共 10862 字,大约阅读时间需要 36 分钟。

一:UDF

1.自定义UDF

  

二:UDAF 

2.UDAF

  

 

3.介绍AbstractGenericUDAFResolver

  

 

4.介绍GenericUDAFEvaluator

  

5.程序

1 package org.apache.hadoop.hive_udf;  2   3 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;  4 import org.apache.hadoop.hive.ql.metadata.HiveException;  5 import org.apache.hadoop.hive.ql.parse.SemanticException;  6 import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;  7 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;  8 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;  9 import org.apache.hadoop.hive.serde2.io.DoubleWritable; 10 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 11 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; 12 import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector; 13 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; 14 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; 15 import org.apache.hadoop.io.LongWritable; 16  17 /** 18  *  19  * 需求:实现sum函数,支持int和double类型 20  * 21  */ 22  23 public class UdafProject extends AbstractGenericUDAFResolver{ 24     public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) 25             throws SemanticException { 26         //判断参数是否是全部列 27         if(info.isAllColumns()){ 28             throw new SemanticException("不支持*的参数"); 29         } 30          31         //判断是否只有一个参数 32         ObjectInspector[] inspector = info.getParameterObjectInspectors(); 33         if(inspector.length != 1){ 34             throw new SemanticException("参数只能有一个"); 35         } 36         //判断输入列的数据类型是否为基本类型 37         if(inspector[0].getCategory() != ObjectInspector.Category.PRIMITIVE){ 38             throw new SemanticException("参数必须为基本数据类型"); 39         } 40          41         AbstractPrimitiveWritableObjectInspector woi = (AbstractPrimitiveWritableObjectInspector) inspector[0]; 42          43         //判断是那种基本数据类型 44          45         
switch(woi.getPrimitiveCategory()){ 46         case INT: 47         case LONG: 48         case BYTE: 49         case SHORT: 50             return new udafLong(); 51         case FLOAT: 52         case DOUBLE: 53             return new udafDouble(); 54             default: 55                 throw new SemanticException("参数必须是基本类型,且不能为string等类型"); 56          57          58         } 59            60     } 61      62     /** 63      * 对整形数据进行求和 64      */ 65     public static class udafLong extends  GenericUDAFEvaluator{ 66          67         //定义输入数据类型 68         public  PrimitiveObjectInspector inputor; 69          70         //实现自定义buffer 71         static class sumlongagg implements AggregationBuffer{ 72             long sum; 73             boolean empty; 74         } 75          76         //初始化方法 77         @Override 78         public ObjectInspector init(Mode m, ObjectInspector[] parameters) 79                 throws HiveException { 80             // TODO Auto-generated method stub 81              82             super.init(m, parameters); 83             if(parameters.length !=1 ){ 84                 throw new UDFArgumentException("参数异常"); 85             } 86             if(inputor == null){ 87                 this.inputor = (PrimitiveObjectInspector) parameters[0]; 88             } 89             //注意返回的类型要与最终sum的类型一致 90             return PrimitiveObjectInspectorFactory.writableLongObjectInspector; 91         } 92  93         @Override 94         public AggregationBuffer getNewAggregationBuffer() throws HiveException { 95             // TODO Auto-generated method stub 96             sumlongagg slg = new sumlongagg(); 97             this.reset(slg); 98             return slg; 99         }100 101         @Override102         public void reset(AggregationBuffer agg) throws HiveException {103             // TODO Auto-generated method stub104             sumlongagg slg = (sumlongagg) agg;105             slg.sum=0;106             slg.empty=true;107         }108 
109         @Override110         public void iterate(AggregationBuffer agg, Object[] parameters)111                 throws HiveException {112             // TODO Auto-generated method stub113             if(parameters.length != 1){114                 throw new UDFArgumentException("参数错误");115             }116             this.merge(agg, parameters[0]);117             118         }119 120         @Override121         public Object terminatePartial(AggregationBuffer agg)122                 throws HiveException {123             // TODO Auto-generated method stub124             return this.terminate(agg);125         }126 127         @Override128         public void merge(AggregationBuffer agg, Object partial)129                 throws HiveException {130             // TODO Auto-generated method stub131             sumlongagg slg = (sumlongagg) agg;132             if(partial != null){133                 slg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputor);134                 slg.empty=false;135             }136         }137 138         @Override139         public Object terminate(AggregationBuffer agg) throws HiveException {140             // TODO Auto-generated method stub141             sumlongagg slg = (sumlongagg) agg;142             if(slg.empty){143                 return null;144             }145             return new LongWritable(slg.sum);146         }147         148     }149     150     /**151      * 实现浮点型的求和152      */153     public static class udafDouble extends GenericUDAFEvaluator{154         155         //定义输入数据类型156         public  PrimitiveObjectInspector input;157         158         //实现自定义buffer159         static class sumdoubleagg implements AggregationBuffer{160             double sum;161             boolean empty;162         }163         164         //初始化方法165         @Override166         public ObjectInspector init(Mode m, ObjectInspector[] parameters)167                 throws HiveException {168             // TODO Auto-generated 
method stub169             170             super.init(m, parameters);171             if(parameters.length !=1 ){172                 throw new UDFArgumentException("参数异常");173             }174             if(input == null){175                 this.input = (PrimitiveObjectInspector) parameters[0];176             }177             //注意返回的类型要与最终sum的类型一致178             return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;179         }180         181         182 183         @Override184         public AggregationBuffer getNewAggregationBuffer() throws HiveException {185             // TODO Auto-generated method stub186             sumdoubleagg sdg = new sumdoubleagg();187             this.reset(sdg);188             return sdg;189         }190 191         @Override192         public void reset(AggregationBuffer agg) throws HiveException {193             // TODO Auto-generated method stub194             sumdoubleagg sdg = (sumdoubleagg) agg;195             sdg.sum=0;196             sdg.empty=true;197         }198 199         @Override200         public void iterate(AggregationBuffer agg, Object[] parameters)201                 throws HiveException {202             // TODO Auto-generated method stub203             if(parameters.length != 1){204                 throw new UDFArgumentException("参数错误");205             }206             this.merge(agg, parameters[0]);207         }208 209         @Override210         public Object terminatePartial(AggregationBuffer agg)211                 throws HiveException {212             // TODO Auto-generated method stub213             return this.terminate(agg);214         }215 216         @Override217         public void merge(AggregationBuffer agg, Object partial)218                 throws HiveException {219             // TODO Auto-generated method stub220             sumdoubleagg sdg =(sumdoubleagg) agg;221             if(partial != null){222                 sdg.sum += PrimitiveObjectInspectorUtils.getDouble(sdg, 
input);223                 sdg.empty=false;224             }225         }226 227         @Override228         public Object terminate(AggregationBuffer agg) throws HiveException {229             // TODO Auto-generated method stub230             sumdoubleagg sdg = (sumdoubleagg) agg;231             if (sdg.empty){232                 return null;233             }234             return new DoubleWritable(sdg.sum);235         }236         237     }238 239 }

 

6.打成jar包

  并放入路径:/etc/opt/datas/

 

7.添加jar到path

  格式:

    add jar linux_path;

  即:

    add jar /etc/opt/datas/af.jar;

 

8.创建方法

  create temporary function af as 'org.apache.hadoop.hive_udf.UdafProject';

 

9.在hive中运行

  select sum(id),af(id) from stu_info;

 

三:UDTF

1.UDTF

  

 

2.程序 

package org.apache.hadoop.hive.udf;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * Example table-generating function: for each input value it emits one row
 * with two string columns, {@code name} and {@code email}, where the email is
 * the name followed by a fixed domain suffix.
 */
public class UDTFtest extends GenericUDTF {

    /**
     * Validates the argument count and declares the output row schema.
     *
     * @param argOIs struct inspector describing the call arguments
     * @return a struct inspector with two Java-string fields, {@code name} and {@code email}
     * @throws UDFArgumentException if the function is not called with exactly one argument
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs)
            throws UDFArgumentException {
        if (argOIs.getAllStructFieldRefs().size() != 1) {
            throw new UDFArgumentException("参数只能有一个");
        }

        // Output column names.
        ArrayList<String> fieldname = new ArrayList<String>();
        fieldname.add("name");
        fieldname.add("email");

        // Output column types, positionally matching the names above.
        ArrayList<ObjectInspector> fieldio = new ArrayList<ObjectInspector>();
        fieldio.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldio.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldname, fieldio);
    }

    /**
     * Emits one {@code (name, email)} row for the given input value.
     *
     * <p>Rows whose single argument is {@code null} produce no output instead
     * of failing with a {@link NullPointerException}.
     *
     * @param args the argument values for the current input row
     * @throws HiveException if forwarding the row fails
     */
    @Override
    public void process(Object[] args) throws HiveException {
        if (args.length == 1 && args[0] != null) {
            String name = args[0].toString();
            String email = name + "@ibeifneg.com";
            forward(new String[] {name, email});
        }
    }

    /**
     * Emits a final marker row {@code ("complete", "finish")} when the
     * function is closed.
     *
     * @throws HiveException if forwarding the row fails
     */
    @Override
    public void close() throws HiveException {
        forward(new String[] {"complete", "finish"});
    }
}

 

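3.同样的步骤:与UDAF一样,先打成jar包,用add jar添加到hive,再创建临时函数,例如 create temporary function tf as 'org.apache.hadoop.hive.udf.UDTFtest';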

 

4.在hive中运行

  select tf(ename) as (name,email) from emp;

 

转载地址:http://suaua.baihongyu.com/

你可能感兴趣的文章
经典计算机算法设计方法(2) -- 回溯法
查看>>
radio选择
查看>>
JS中关于clientWidth offsetWidth scrollWidth 等的含义
查看>>
1、NIO--NIO和IO的区别
查看>>
ArcGIS for qml -添加自由文本
查看>>
Apache Shiro 使用手册(二) Shiro 认证
查看>>
Hibernate的七种映射关系之七种关联映射(一)
查看>>
Centos7 yum安装chrome浏览器
查看>>
go http
查看>>
设计模式-单例模式
查看>>
第5次作业+105032014108+曾宏宇
查看>>
IOS 获取 文件(UIImage) 创建时间
查看>>
26 Arcpy跳坑系列——ExportToPNG
查看>>
Java关闭线程的安全方法
查看>>
Python全栈开发-Day1-Python基础1
查看>>
RequireJS -Javascript模块化(一、简介)
查看>>
Vue界面中关于APP端回调方法问题
查看>>
jQuery同步Ajax带来的UI线程阻塞问题及解决办法
查看>>
程序的模块化的一些见解2
查看>>
曲演杂坛--蛋疼的ROW_NUMBER函数
查看>>