数据集:
下载地址:
ratings.csv
(base) [root@master data]# cat ratings.csv | head -5
1,31,2.5,1260759144
1,1029,3.0,1260759179
1,1061,3.0,1260759182
1,1129,2.0,1260759185
1,1172,4.0,1260759205
格式: userid , movieid , rating(评分) , timestamp
movies.csv
(base) [root@master data]# cat movies.csv | head -5
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
格式: movieid , title , genres(流派)
创建表:
movie 表
-- Managed (internal) table for movies.csv, stored at the explicit HDFS path /movie_table.
-- Fix: the original one-liner had no space between "textfile" and "location"
-- ("stored as textfilelocation ..."), which is a syntax error in Hive.
create table movie_table(
    movieid string,
    title   string,
    genres  string
)
row format delimited fields terminated by ','
stored as textfile
location '/movie_table';
rating 表
-- EXTERNAL table for ratings.csv: dropping the table keeps the files under /rating_table.
-- Fixes: the original one-liner had no space between "textfile" and "location"
-- and was missing the terminating semicolon.
-- NOTE(review): column "time" holds the epoch timestamp; kept as STRING to match
-- the rest of these notes.
create external table rating_table(
    userid  string,
    movieid string,
    rating  string,
    time    string
)
row format delimited fields terminated by ','
stored as textfile
location '/rating_table';
查看表信息:
- 简易:
desc 表名;
- 详细:
desc formatted 表名;
检查:
-- Sanity check: fetch a handful of rows to confirm the table reads correctly.
SELECT *
FROM movie_table
LIMIT 10;
删除:
-- Idempotent drop: IF EXISTS avoids an error when the table is already gone
-- (safe to re-run while following these notes).
drop table if exists movie_table;
数据导入:
- 从本地批量往table中导入数据
-- Bulk-load from the LOCAL filesystem: the file is COPIED into the table's
-- warehouse/location directory; OVERWRITE first deletes any existing data.
load data local inpath '/hive_data/ratings.csv' overwrite into table rating_table;
发现数据存在于HDFS的 /user/hive/warehouse/rating_table 目录
- 从HDFS批量往table中导入数据
-- Bulk-load from HDFS: unlike the LOCAL variant, the source file is MOVED
-- (not copied) into the table's directory, so it disappears from /hive_data.
load data inpath '/hive_data/ratings.csv' overwrite into table rating_table;
join 操作
-- Materialize a small joined sample (CTAS). The MAPJOIN hint asks Hive to
-- broadcast the smaller table to every mapper, skipping the reduce phase.
-- Fix: the original hint was written "/* +MAPJOIN(b)*/" — the '+' must follow
-- "/*" immediately and the alias must match ("B"), otherwise the hint is
-- silently treated as a plain comment and ignored.
create table join_table as
select /*+ MAPJOIN(B) */ B.userid, A.title, B.rating
from movie_table A
join rating_table B
  on A.movieid = B.movieid
limit 10;
导出数据
- 导出到本地:
-- Export to the LOCAL filesystem. WARNING: OVERWRITE wipes the target
-- directory's existing contents before writing the result files.
insert overwrite local directory '/home/master/study/hive/tmp' select * from join_table;
- 导出到HDFS:
-- Export to HDFS. Same caveat as the local variant: the target directory
-- is overwritten.
insert overwrite directory '/hive/tmp' select * from join_table;
partition
-- Partitioned external table: each dt value becomes its own HDFS subdirectory
-- (e.g. .../rating_table_p/dt=2008-08), so partition-pruned queries only scan
-- the relevant files.
-- Fixes: the original one-liner had no space between "'\t'" and "lines"
-- and was missing the terminating semicolon.
create external table rating_table_p(
    userid  string,
    movieid string,
    rating  string,
    time    string
)
partitioned by (dt string)
row format delimited fields terminated by '\t'
lines terminated by '\n';
导入数据
-- Load one month of data into its dedicated partition; the file lands in
-- the dt=2008-08 subdirectory.
LOAD DATA INPATH '/hive_data/2008-08.data'
OVERWRITE INTO TABLE rating_table_p
PARTITION (dt = '2008-08');
验证
(base) [root@master tmp]# hadoop fs -ls /home/hive/warehouse/rating_table_pFound 2 itemsdrwx-wx-wx - root supergroup 0 2019-09-22 23:50 /home/hive/warehouse/rating_table_p/dt=2003-10drwx-wx-wx - root supergroup 0 2019-09-22 23:53 /home/hive/warehouse/rating_table_p/dt=2008-08(base) [root@master tmp]# hadoop fs -ls /home/hive/warehouse/rating_table_p/dt=2008-08Found 1 items-rwx-wx-wx 2 root supergroup 16096 2019-09-22 22:14 /home/hive/warehouse/rating_table_p/dt=2008-08/2008-08.data(base) [root@master tmp]# hadoop fs -text /home/hive/warehouse/rating_table_p/dt=2008-08/2008-08.data | head -268 260 3.0 2008-0868 318 4.0 2008-08(base) [root@master tmp]#
删除分区
-- Remove a single partition; IF EXISTS makes the statement safe to re-run.
ALTER TABLE rating_table_p
DROP IF EXISTS PARTITION (dt = '2008-08');
添加分区
-- Register the partition in the metastore without loading any data yet.
ALTER TABLE rating_table_p
ADD IF NOT EXISTS PARTITION (dt = '2008-08');
Bucket
-- Enable enforced bucketing so INSERTs honor the CLUSTERED BY clause below
-- (one reducer/output file per bucket).
-- Fixes: "//" is not a valid Hive comment delimiter (use "--"), and the
-- original one-liner fused the SET statement and DDL together without spaces.
set hive.enforce.bucketing=true;

create table rating_table_b(
    userid  string,
    movieid string,
    rating  string
)
clustered by (userid) into 16 buckets;
导入数据:
-- Repopulate the bucketed table from rating_table (multi-insert FROM-first form).
-- Fix: the original one-liner dropped the whitespace between clauses
-- ("rating_tableinsert", "rating_table_bselect"), making it unparseable.
from rating_table
insert overwrite table rating_table_b
select userid, movieid, rating;
验证
(base) [root@master tmp]# hadoop fs -ls /home/hive/warehouse/rating_table_bFound 16 items-rwx-wx-wx 2 root supergroup 74544 2019-09-23 00:07 /home/hive/warehouse/rating_table_b/000000_0-rwx-wx-wx 2 root supergroup 119542 2019-09-23 00:07 /home/hive/warehouse/rating_table_b/000001_0-rwx-wx-wx 2 root supergroup 83047 2019-09-23 00:06 /home/hive/warehouse/rating_table_b/000002_0-rwx-wx-wx 2 root supergroup 125551 2019-09-23 00:07 /home/hive/warehouse/rating_table_b/000003_0-rwx-wx-wx 2 root supergroup 113070 2019-09-23 00:07 /home/hive/warehouse/rating_table_b/000004_0-rwx-wx-wx 2 root supergroup 85340 2019-09-23 00:06 /home/hive/warehouse/rating_table_b/000005_0-rwx-wx-wx 2 root supergroup 82565 2019-09-23 00:07 /home/hive/warehouse/rating_table_b/000006_0.....
采样
-- Sample only bucket 3 of 16; hashing on userid matches the table's
-- bucketing column, so Hive reads a single bucket file instead of scanning
-- the whole table.
SELECT *
FROM rating_table_b TABLESAMPLE (BUCKET 3 OUT OF 16 ON userid)
LIMIT 10;
编程部分
1. UDF
JAVA部分:
pom.xml (网上找的模板)
4.0.0 11 11 1.0-SNAPSHOT org.apache.hive hive-exec 1.2.2
代码
import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * Hive UDF that upper-cases a string column.
 * Registered below as "upper_func".
 */
public class Upper extends UDF {
    /**
     * @param s one column value; Hive passes null for NULL cells
     * @return the upper-cased string, or null when the input is null.
     *         Fixes: the original dereferenced s unconditionally and would
     *         throw NullPointerException on NULL values; the redundant
     *         "new String(...)" copy around toUpperCase() is also removed.
     */
    public String evaluate(final String s) {
        return s == null ? null : s.toUpperCase();
    }
}
hive操作
添加 jar
hive> add jar /home/master/study/hive/1.jar;
Added [/home/master/study/hive/1.jar] to class path
Added resources: [/home/master/study/hive/1.jar]
创建函数
语法: create (可选:temporary) function 函数名 as '类名' (可选:using jar ' HDFS&Local 绝对路径') ; temporary 为临时函数,不带则永久。
hive> create temporary function upper_func as 'Upper';OKTime taken: 0.837 seconds
验证
hive> select movieid,upper_func(title) from movie_table limit 3;OKmovieId TITLE1 TOY STORY (1995)2 JUMANJI (1995)Time taken: 0.894 seconds, Fetched: 3 row(s)hive> select movieid,title from movie_table limit 3;OKmovieId title1 Toy Story (1995)2 Jumanji (1995)Time taken: 0.084 seconds, Fetched: 3 row(s)
2. UDAF
JAVA部分:
1、import org.apache.hadoop.hive.ql.exec.UDAF和 org.apache.hadoop.hive.ql.exec.UDAFEvaluator
2、函数类需要继承UDAF类,内部类Evaluator实UDAFEvaluator接口。
3、Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数。
a)init函数实现接口UDAFEvaluator的init函数。
b)iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean。
c)terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据,terminatePartial类似于hadoop的Combiner。
d)merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。
e)terminate返回最终的聚集函数结果。
代码
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * Hive UDAF computing the arithmetic mean of a DOUBLE column.
 * Registered below as "avg_test".
 */
public class Avg extends UDAF {

    /** Partial-aggregation state: row count plus running sum. */
    public static class AvgState {
        private long mCount;
        private double mSum;
    }

    public static class AvgEvaluator implements UDAFEvaluator {
        AvgState state;

        public AvgEvaluator() {
            super();
            state = new AvgState();
            init();
        }

        /** Reset the running state before (re)use. */
        public void init() {
            state.mSum = 0;
            state.mCount = 0;
        }

        /** Fold one input value into the state; NULL rows are skipped. */
        public boolean iterate(Double o) {
            if (o != null) {
                state.mSum += o;
                state.mCount++;
            }
            return true;
        }

        /** Emit the partial state (combiner-like step); null when no rows were seen. */
        public AvgState terminatePartial() {
            return state.mCount == 0 ? null : state;
        }

        /** Merge a partial state produced by terminatePartial() on another task. */
        public boolean merge(AvgState o) {
            if (o != null) {
                state.mCount += o.mCount;
                state.mSum += o.mSum;
            }
            return true;
        }

        /** Final result: the mean, or null for an empty/all-NULL group. */
        public Double terminate() {
            return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount);
        }
    }
}
hive操作
添加 jar
hive> add jar /home/master/study/hive/2.jar;
创建函数
hive> create temporary function avg_test as 'Avg';OK
验证
hive> select avg_test(rating) from rating_table;OK......Total MapReduce CPU Time Spent: 8 seconds 150 msecOK3.543608255669773Time taken: 55.696 seconds, Fetched: 1 row(s)
3. UDTF
JAVA部分:
一个 UDTF 必须继承 GenericUDTF 抽象类然后实现抽象类中的 initialize,process,和 close方法。其中,Hive 调用 initialize 方法来确定传入参数的类型并确定 UDTF 生成表的每个字段的数据类型(即输入类型和输出类型)。initialize 方法必须返回一个生成表的字段的相应的 object inspector。一旦调用了 initialize() ,Hive将把 UDTF 参数传给 process() 方法,调用这个方法可以产生行对象并将行对象转发给其他操作器。最后当所有的行对象都传递出 UDTF 调用 close() 方法。
代码
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import java.util.ArrayList;public class Expload extends GenericUDTF { public void process(Object[] objects) throws HiveException { String input = objects[0].toString(); String[] tmp = input.split(";"); for (String i:tmp){ try { String[] result = i.split(":"); forward(result); //转发 }catch (Exception e){ continue; } } } public StructObjectInspector initialize(StructObjectInspector[] argOIs) throws UDFArgumentException { if (argOIs.length != 1){ //判断参数是否一个 throw new UDFArgumentLengthException("ExploadMap只有一个参数"); } if (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { //判断参数是否是PRIMITIVE,LIST,MAP,STRUCT,UNION类型; throw new UDFArgumentLengthException("参数类型不符合"); } ArrayListfileNames = new ArrayList (); ArrayList fileOIs = new ArrayList (); fileNames.add("1"); fileOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fileNames,fileOIs); } public void close() throws HiveException { }}
hive操作
添加 jar
hive> add jar /home/master/study/hive/UDTF.jar;
创建函数
hive> create temporary function expload_func as 'Expload';OK
验证
验证时报错:process() 每行 forward 两个值('':'' 切分后的 key 和 value),而 initialize() 只声明了一列输出("1"),列数不匹配导致 UDTF 运行失败;需在 initialize() 中声明两列输出。
Transform函数
- 类似于UDF等函数
- Hive的 TRANSFORM 关键字提供了在SQL中调用自写脚本的功能
- 适合实现Hive中没有的功能又不想写UDF的情况。
py代码
import sys

# Hive TRANSFORM helper: read tab-separated rows from stdin and
# re-emit each row with its fields joined by underscores.
for raw in sys.stdin:
    fields = raw.strip().split('\t')
    print('_'.join(fields))
测试
-- ADD FILE ships the script to every task node so TRANSFORM can execute it.
-- Fix: the original path was missing its leading '/' ("home/..." instead of
-- "/home/...", cf. the add jar commands above).
add file /home/master/study/hive/transform.py;

select transform(title)
using "python transform.py" as test
from movie_table
limit 10;