一:UDF
1.自定义UDF
二:UDAF
2.UDAF
3.介绍AbstractGenericUDAFResolver
4.介绍GenericUDAFEvaluator
5.程序
1 package org.apache.hadoop.hive_udf; 2 3 import org.apache.hadoop.hive.ql.exec.UDFArgumentException; 4 import org.apache.hadoop.hive.ql.metadata.HiveException; 5 import org.apache.hadoop.hive.ql.parse.SemanticException; 6 import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; 7 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; 8 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo; 9 import org.apache.hadoop.hive.serde2.io.DoubleWritable; 10 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 11 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; 12 import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector; 13 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; 14 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; 15 import org.apache.hadoop.io.LongWritable; 16 17 /** 18 * 19 * 需求:实现sum函数,支持int和double类型 20 * 21 */ 22 23 public class UdafProject extends AbstractGenericUDAFResolver{ 24 public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) 25 throws SemanticException { 26 //判断参数是否是全部列 27 if(info.isAllColumns()){ 28 throw new SemanticException("不支持*的参数"); 29 } 30 31 //判断是否只有一个参数 32 ObjectInspector[] inspector = info.getParameterObjectInspectors(); 33 if(inspector.length != 1){ 34 throw new SemanticException("参数只能有一个"); 35 } 36 //判断输入列的数据类型是否为基本类型 37 if(inspector[0].getCategory() != ObjectInspector.Category.PRIMITIVE){ 38 throw new SemanticException("参数必须为基本数据类型"); 39 } 40 41 AbstractPrimitiveWritableObjectInspector woi = (AbstractPrimitiveWritableObjectInspector) inspector[0]; 42 43 //判断是那种基本数据类型 44 45 switch(woi.getPrimitiveCategory()){ 46 case INT: 47 case LONG: 48 case BYTE: 49 case SHORT: 50 return new udafLong(); 51 case FLOAT: 52 case DOUBLE: 53 return new udafDouble(); 54 default: 55 throw new 
SemanticException("参数必须是基本类型,且不能为string等类型"); 56 57 58 } 59 60 } 61 62 /** 63 * 对整形数据进行求和 64 */ 65 public static class udafLong extends GenericUDAFEvaluator{ 66 67 //定义输入数据类型 68 public PrimitiveObjectInspector inputor; 69 70 //实现自定义buffer 71 static class sumlongagg implements AggregationBuffer{ 72 long sum; 73 boolean empty; 74 } 75 76 //初始化方法 77 @Override 78 public ObjectInspector init(Mode m, ObjectInspector[] parameters) 79 throws HiveException { 80 // TODO Auto-generated method stub 81 82 super.init(m, parameters); 83 if(parameters.length !=1 ){ 84 throw new UDFArgumentException("参数异常"); 85 } 86 if(inputor == null){ 87 this.inputor = (PrimitiveObjectInspector) parameters[0]; 88 } 89 //注意返回的类型要与最终sum的类型一致 90 return PrimitiveObjectInspectorFactory.writableLongObjectInspector; 91 } 92 93 @Override 94 public AggregationBuffer getNewAggregationBuffer() throws HiveException { 95 // TODO Auto-generated method stub 96 sumlongagg slg = new sumlongagg(); 97 this.reset(slg); 98 return slg; 99 }100 101 @Override102 public void reset(AggregationBuffer agg) throws HiveException {103 // TODO Auto-generated method stub104 sumlongagg slg = (sumlongagg) agg;105 slg.sum=0;106 slg.empty=true;107 }108 109 @Override110 public void iterate(AggregationBuffer agg, Object[] parameters)111 throws HiveException {112 // TODO Auto-generated method stub113 if(parameters.length != 1){114 throw new UDFArgumentException("参数错误");115 }116 this.merge(agg, parameters[0]);117 118 }119 120 @Override121 public Object terminatePartial(AggregationBuffer agg)122 throws HiveException {123 // TODO Auto-generated method stub124 return this.terminate(agg);125 }126 127 @Override128 public void merge(AggregationBuffer agg, Object partial)129 throws HiveException {130 // TODO Auto-generated method stub131 sumlongagg slg = (sumlongagg) agg;132 if(partial != null){133 slg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputor);134 slg.empty=false;135 }136 }137 138 @Override139 public Object 
terminate(AggregationBuffer agg) throws HiveException {140 // TODO Auto-generated method stub141 sumlongagg slg = (sumlongagg) agg;142 if(slg.empty){143 return null;144 }145 return new LongWritable(slg.sum);146 }147 148 }149 150 /**151 * 实现浮点型的求和152 */153 public static class udafDouble extends GenericUDAFEvaluator{154 155 //定义输入数据类型156 public PrimitiveObjectInspector input;157 158 //实现自定义buffer159 static class sumdoubleagg implements AggregationBuffer{160 double sum;161 boolean empty;162 }163 164 //初始化方法165 @Override166 public ObjectInspector init(Mode m, ObjectInspector[] parameters)167 throws HiveException {168 // TODO Auto-generated method stub169 170 super.init(m, parameters);171 if(parameters.length !=1 ){172 throw new UDFArgumentException("参数异常");173 }174 if(input == null){175 this.input = (PrimitiveObjectInspector) parameters[0];176 }177 //注意返回的类型要与最终sum的类型一致178 return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;179 }180 181 182 183 @Override184 public AggregationBuffer getNewAggregationBuffer() throws HiveException {185 // TODO Auto-generated method stub186 sumdoubleagg sdg = new sumdoubleagg();187 this.reset(sdg);188 return sdg;189 }190 191 @Override192 public void reset(AggregationBuffer agg) throws HiveException {193 // TODO Auto-generated method stub194 sumdoubleagg sdg = (sumdoubleagg) agg;195 sdg.sum=0;196 sdg.empty=true;197 }198 199 @Override200 public void iterate(AggregationBuffer agg, Object[] parameters)201 throws HiveException {202 // TODO Auto-generated method stub203 if(parameters.length != 1){204 throw new UDFArgumentException("参数错误");205 }206 this.merge(agg, parameters[0]);207 }208 209 @Override210 public Object terminatePartial(AggregationBuffer agg)211 throws HiveException {212 // TODO Auto-generated method stub213 return this.terminate(agg);214 }215 216 @Override217 public void merge(AggregationBuffer agg, Object partial)218 throws HiveException {219 // TODO Auto-generated method stub220 sumdoubleagg sdg =(sumdoubleagg) 
agg;221 if(partial != null){222 sdg.sum += PrimitiveObjectInspectorUtils.getDouble(sdg, input);223 sdg.empty=false;224 }225 }226 227 @Override228 public Object terminate(AggregationBuffer agg) throws HiveException {229 // TODO Auto-generated method stub230 sumdoubleagg sdg = (sumdoubleagg) agg;231 if (sdg.empty){232 return null;233 }234 return new DoubleWritable(sdg.sum);235 }236 237 }238 239 }
6.打成jar包
并放入路径:/etc/opt/datas/
7.将jar添加到Hive的classpath
格式:
add jar linux_path;
即:
add jar /etc/opt/datas/af.jar;
8.创建临时函数
create temporary function af as 'org.apache.hadoop.hive_udf.UdafProject';
9.在hive中运行
select sum(id),af(id) from stu_info;
三:UDTF
1.UDTF
2.程序
1 package org.apache.hadoop.hive.udf; 2 3 import java.util.ArrayList; 4 5 import org.apache.hadoop.hive.ql.exec.UDFArgumentException; 6 import org.apache.hadoop.hive.ql.metadata.HiveException; 7 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; 8 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 9 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;10 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;11 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;12 13 public class UDTFtest extends GenericUDTF {14 15 @Override16 public StructObjectInspector initialize(StructObjectInspector argOIs)17 throws UDFArgumentException {18 // TODO Auto-generated method stub19 if(argOIs.getAllStructFieldRefs().size() != 1){20 throw new UDFArgumentException("参数只能有一个");21 }22 ArrayListfieldname = new ArrayList ();23 fieldname.add("name");24 fieldname.add("email");25 ArrayList fieldio = new ArrayList ();26 fieldio.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);27 fieldio.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);28 29 return ObjectInspectorFactory.getStandardStructObjectInspector(fieldname, fieldio);30 }31 32 @Override33 public void process(Object[] args) throws HiveException {34 // TODO Auto-generated method stub35 if(args.length == 1){36 String name = args[0].toString();37 String email = name + "@ibeifneg.com";38 super.forward(new String[] {name,email});39 }40 }41 42 @Override43 public void close() throws HiveException {44 // TODO Auto-generated method stub45 super.forward(new String[] {"complete","finish"});46 }47 48 }
3.按同样的步骤打成jar包、add jar,并创建临时函数,例如:create temporary function tf as 'org.apache.hadoop.hive.udf.UDTFtest';
4.在hive中运行
select tf(ename) as (name,email) from emp;