Hive Exercise Workbook
Published: 2019-06-07


Dataset:

Download link:

ratings.csv

(base) [root@master data]# cat ratings.csv | head -5
1,31,2.5,1260759144
1,1029,3.0,1260759179
1,1061,3.0,1260759182
1,1129,2.0,1260759185
1,1172,4.0,1260759205

Format: userid, movieid, rating, timestamp

movies.csv

(base) [root@master data]# cat movies.csv | head -5
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance

Format: movieid, title, genres

Create the tables:

movie table

create table movie_table(
    movieid string,
    title string,
    genres string
)
row format delimited fields terminated by ','
stored as textfile
location '/movie_table';

rating table

create external table rating_table(
    userid string,
    movieid string,
    rating string,
    time string
)
row format delimited fields terminated by ','
stored as textfile
location '/rating_table';

View table information:

  1. Brief:
desc table_name;
  2. Detailed:
desc formatted table_name;
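
For example, to inspect the movie_table created above:

desc movie_table;
desc formatted movie_table;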

Check:

select * from movie_table limit 10;

Drop:

drop table movie_table;

Data import:

  1. Bulk-load data into the table from the local filesystem
load data local inpath '/hive_data/ratings.csv' overwrite into table rating_table;

You will find that the data now lives under the HDFS directory /user/hive/warehouse/rating_table.

  2. Bulk-load data into the table from HDFS
load data inpath '/hive_data/ratings.csv' overwrite into table rating_table;
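
After either load, a quick sanity check from the Hive CLI (not part of the original walkthrough) confirms the rows arrived:

select count(*) from rating_table;
select * from rating_table limit 3;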

join operation

hive> create table join_table as
    > select /*+ MAPJOIN(B) */ B.userid, A.title, B.rating
    > from movie_table A
    > join rating_table B
    > on A.movieid = B.movieid
    > limit 10;

Export data

  1. Export to the local filesystem:
insert overwrite local directory '/home/master/study/hive/tmp' select * from join_table;
  2. Export to HDFS:
insert overwrite directory '/hive/tmp' select * from join_table;

partition

create external table rating_table_p(
    userid string,
    movieid string,
    rating string,
    time string
)
partitioned by (dt STRING)
row format delimited fields terminated by '\t'
lines terminated by '\n';

Load data

load data inpath '/hive_data/2008-08.data' overwrite into table rating_table_p partition(dt='2008-08');

Verify

(base) [root@master tmp]# hadoop fs -ls /home/hive/warehouse/rating_table_p
Found 2 items
drwx-wx-wx   - root supergroup          0 2019-09-22 23:50 /home/hive/warehouse/rating_table_p/dt=2003-10
drwx-wx-wx   - root supergroup          0 2019-09-22 23:53 /home/hive/warehouse/rating_table_p/dt=2008-08
(base) [root@master tmp]# hadoop fs -ls /home/hive/warehouse/rating_table_p/dt=2008-08
Found 1 items
-rwx-wx-wx   2 root supergroup      16096 2019-09-22 22:14 /home/hive/warehouse/rating_table_p/dt=2008-08/2008-08.data
(base) [root@master tmp]# hadoop fs -text /home/hive/warehouse/rating_table_p/dt=2008-08/2008-08.data | head -2
68      260     3.0     2008-08
68      318     4.0     2008-08
(base) [root@master tmp]#

Drop a partition

alter table rating_table_p drop if exists partition(dt='2008-08');

Add a partition

alter table rating_table_p add if not exists partition(dt='2008-08');
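
To confirm which partitions exist after adding or dropping one, the standard show partitions statement can be used (a quick check, not part of the original walkthrough):

show partitions rating_table_p;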

Bucket

-- enable bucketing enforcement
set hive.enforce.bucketing=true;

create table rating_table_b(
    userid string,
    movieid string,
    rating string
)
clustered by (userid) into 16 buckets;

Load data:

from rating_table
insert overwrite table rating_table_b
select userid, movieid, rating;

Verify

(base) [root@master tmp]# hadoop fs -ls /home/hive/warehouse/rating_table_b
Found 16 items
-rwx-wx-wx   2 root supergroup      74544 2019-09-23 00:07 /home/hive/warehouse/rating_table_b/000000_0
-rwx-wx-wx   2 root supergroup     119542 2019-09-23 00:07 /home/hive/warehouse/rating_table_b/000001_0
-rwx-wx-wx   2 root supergroup      83047 2019-09-23 00:06 /home/hive/warehouse/rating_table_b/000002_0
-rwx-wx-wx   2 root supergroup     125551 2019-09-23 00:07 /home/hive/warehouse/rating_table_b/000003_0
-rwx-wx-wx   2 root supergroup     113070 2019-09-23 00:07 /home/hive/warehouse/rating_table_b/000004_0
-rwx-wx-wx   2 root supergroup      85340 2019-09-23 00:06 /home/hive/warehouse/rating_table_b/000005_0
-rwx-wx-wx   2 root supergroup      82565 2019-09-23 00:07 /home/hive/warehouse/rating_table_b/000006_0
.....

Sampling

select * from  rating_table_b tablesample(bucket 3 out of 16 on userid) limit 10;

Programming part

1. UDF

Java part:

pom.xml (a template found online)

<modelVersion>4.0.0</modelVersion>
<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
</properties>
<version>1.0-SNAPSHOT</version>
<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.2</version>
    </dependency>
</dependencies>

Code

import org.apache.hadoop.hive.ql.exec.UDF;

public class Upper extends UDF {
    public String evaluate(final String s) {
        return new String(s.toUpperCase());
    }
}

Hive operations

Add the jar

hive> add jar /home/master/study/hive/1.jar;
Added [/home/master/study/hive/1.jar] to class path
Added resources: [/home/master/study/hive/1.jar]

Create the function

Syntax: create [temporary] function <function_name> as '<class_name>' [using jar '<absolute HDFS or local path>'];. With temporary the function only lives for the current session; without it, the function is permanent.

hive> create temporary function upper_func as 'Upper';
OK
Time taken: 0.837 seconds
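
By contrast, dropping temporary and adding the using jar clause registers a permanent function. A minimal sketch, assuming the jar has been uploaded to HDFS first (the function name and jar path below are hypothetical):

-- hypothetical HDFS location for the uploaded jar
create function upper_func_perm as 'Upper' using jar 'hdfs:///user/hive/jars/1.jar';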

Verify

hive> select movieid,upper_func(title) from movie_table limit 3;
OK
movieId TITLE
1       TOY STORY (1995)
2       JUMANJI (1995)
Time taken: 0.894 seconds, Fetched: 3 row(s)
hive> select movieid,title from movie_table limit 3;
OK
movieId title
1       Toy Story (1995)
2       Jumanji (1995)
Time taken: 0.084 seconds, Fetched: 3 row(s)

2. UDAF

Java part:

1. Import org.apache.hadoop.hive.ql.exec.UDAF and org.apache.hadoop.hive.ql.exec.UDAFEvaluator.

2. The function class must extend UDAF, and an inner Evaluator class must implement the UDAFEvaluator interface.

3. The Evaluator must implement the init, iterate, terminatePartial, merge and terminate methods.

a) init implements the init method of the UDAFEvaluator interface.

b) iterate receives the incoming arguments and accumulates them into the internal state; it returns boolean.

c) terminatePartial takes no arguments; once iterate has finished a pass it returns the partial aggregation state. terminatePartial is analogous to Hadoop's Combiner.

d) merge receives the result returned by terminatePartial and merges it into the current state; it returns boolean.

e) terminate returns the final aggregation result.

Code

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class Avg extends UDAF {
    public static class AvgState {
        private long mCount;
        private double mSum;
    }

    public static class AvgEvaluator implements UDAFEvaluator {
        AvgState state;

        public AvgEvaluator() {
            super();
            state = new AvgState();
            init();
        }

        public void init() {
            state.mSum = 0;
            state.mCount = 0;
        }

        public boolean iterate(Double o) {
            if (o != null) {
                state.mSum += o;
                state.mCount++;
            }
            return true;
        }

        public AvgState terminatePartial() {
            return state.mCount == 0 ? null : state;
        }

        public boolean merge(AvgState o) {
            if (o != null) {
                state.mCount += o.mCount;
                state.mSum += o.mSum;
            }
            return true;
        }

        public Double terminate() {
            return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount);
        }
    }
}

Hive operations

Add the jar

hive> add jar /home/master/study/hive/2.jar;

Create the function

hive> create temporary function avg_test as 'Avg';
OK

Verify

hive> select avg_test(rating) from rating_table;
OK
......
Total MapReduce CPU Time Spent: 8 seconds 150 msec
OK
3.543608255669773
Time taken: 55.696 seconds, Fetched: 1 row(s)
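
As an extra cross-check (not part of the original log), the result should match Hive's built-in avg; rating is stored as a string, so Hive casts it to double implicitly:

select avg(rating) from rating_table;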

3. UDTF

Java part:

A UDTF must extend the GenericUDTF abstract class and implement its initialize, process, and close methods. Hive calls initialize to determine the types of the incoming arguments and the data type of each column of the table the UDTF generates (i.e. the input and output types); initialize must return an object inspector describing the fields of the generated rows. Once initialize() has been called, Hive passes the UDTF's arguments to process(), which produces row objects and forwards them to the downstream operators. Finally, when all rows have been passed through, Hive calls close().

Code

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;

public class Expload extends GenericUDTF {
    public void process(Object[] objects) throws HiveException {
        String input = objects[0].toString();
        String[] tmp = input.split(";");
        for (String i : tmp) {
            try {
                String[] result = i.split(":");
                forward(result);    // forward one row
            } catch (Exception e) {
                continue;
            }
        }
    }

    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        if (argOIs.length != 1) {
            // check that there is exactly one argument
            throw new UDFArgumentLengthException("ExploadMap takes exactly one argument");
        }
        if (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            // the argument must be a PRIMITIVE type (not LIST, MAP, STRUCT or UNION)
            throw new UDFArgumentLengthException("argument type does not match");
        }
        ArrayList<String> fileNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fileOIs = new ArrayList<ObjectInspector>();
        fileNames.add("1");
        fileOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fileNames, fileOIs);
    }

    public void close() throws HiveException {
    }
}

Hive operations

Add the jar

hive> add jar /home/master/study/hive/UDTF.jar;

Create the function

hive> create temporary function expload_func as 'Expload';
OK

Verify

The code seems to have a bug somewhere... to be revisited later.

Transform function

  • Similar in purpose to UDF-style functions
  • Hive's TRANSFORM keyword lets you call your own script from SQL
  • Useful when you need something Hive does not provide but do not want to write a UDF.

Python code

import sys

for line in sys.stdin:
    ss = line.strip().split('\t')
    print('_'.join(ss))

Test

hive> add file /home/master/study/hive/transform.py;
hive> select transform(title) using "python transform.py" as test from movie_table limit 10;

Reposted from: https://www.cnblogs.com/pipemm/articles/11570097.html
