Thursday, March 24, 2016

Hive Dynamic , Static Partitions,User defined functions(UDF) with Java

This post is having more advanced concepts in Hive like Dynamic Partition, Static Partition, custom map reduce script, hive UDF using java and python.

Configuring Hive to allow partitions
A query across all partitions can trigger with an enormous Map Reduce Job, if the table data and number of partitions are large. A highly suggested safety measure is putting Hive into strict mode, which prohibits queries of a partitioned table without a WHERE clause that filters the partitions.
We can set the mode to nonstrict, as in the following session.

Dynamic Partitioning –configuration

Hive> set hive.exec.dynamic.partition.mode=nonstrict;
Hive> set hive.exec.dynamic.partition=true;
Hive> set hive.enforce.bucketing=true;

Once we have configured, Then we will see how we will create a dynamic partition
Example:
Source table:
1. Hive> create table transaction_records(txnno INT,txndate STRING,custno INT,amount DOUBLE,category STRING, product STRING,City STRING,State String,Spendby String )row format delimited fields terminated by ‘,’ stored as textfile;
Create Partitioned table:
1.  Hive> create table transaction_recordsByCat(txnno INT,txndate STRING,custno INT,amount DOUBLE, product STRING,City STRING,State String,Spendby String )
Partitioned by (category STRING)
Clustered by(state) INTO 10 buckets 
row format delimited fields terminated by ‘,’ stored as textfile;


In the above partitioned query we are portioning table depending on the category and bucketing by 10 that means it will create 0-9 buckets and assign the hash value the same.

Column category no need to provide in table structure , Since we are creating partition based on the category


Insert existing table data into newly created partition table.
Hive>from transaction_records txn  INSERT OVERWRITE TABLE table transaction_recordsByCat PARTITION(category) select txn.txnno ,txn.txndate,txn.custno,txn.amount,
txn. product,txn.City,txn.State,txn.Spendby ,txn.category DISTRIBUTE BY category;
Static partition
If we get data every month to process the same, we can use the static partition
Hive> create table logmessage(name string,id int) partitioned by (year int,month int) row format delimited fileds terminated by ‘\t’;
How to insert data for static partition table?

Hive>alter table logmessage add partition(year=2014,month=2);

Custom Map Reduce script using Hive

Hive QL allows traditional map/reduce programmers to be able to plug I their custom mappers and reducers to do more sophisticated analysis that may not be supported by the built-in capabilities of the language.


Sample data scenario
We are having movie data, different users will give different ratings for same movie or different movies.

user_movie_data.txt file having data like belowuserid,rating,unixtime
1      1       134564324567
2      3       134564324567
3      1       134564324567
4      2       134564324567
5      2       134564324567
6      1       134564324567

Now with above data, we need to create a table called u_movie_data,then we will load the data to the same.

Hive>CREATE TABLE u_movie_data(userid INT,rating INT,unixtime STRING) ROW FORMATED DELIMITED FIELDS TERMINATED BY ‘\t’ STROED AS TEXTFILE;
Hive> LOAD DATA LOCAL INPATH ‘/usr/local/hive_demo/user_movie_data.txt’ OVERWRITE INTO TABLE u_movie_data;

We can use any logic which will be converted unix time into weekday, any custom integration. Here we used python script.
Import sys
Import datetime
for line in sys.stdin:
          line = line.strip()
         userid,movieid,rating,unixtime=line.split(‘\t’)
        weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
      print ‘\t’.join([userid,movieid,rating,str(weekday)])

How we will execute python script in hive, first add the file into Hive shell?

Hive> add FILE /usr/local/hive_demo/weekday_mapper.py;

Now load the data into table, we need to do TRANSFORM

INSERT OVERWRITE TABLE u_movie_data_new
       SELECT  TRANSFORM(userid,movieid,rating,unixtime)
      USING ‘python weekday_mapper.py’ 
      AS (userid,movieid,rating,weekday) from u_movie_data;


Hive QL- User-defined function
1.Suppose we have 2 columns – 1 is id of type string and another one is unixtimestamp of type String.
Create a data set with 2 columns(udf_input.txt) and place it inside /usr/local/hive_demo/

one,1456432145676
       two, 1456432145676
       three, 1456432145676
       four, 1456432145676
       five, 1456432145676
       six, 1456432145676
Now we can create a table and load the data the same.
create table udf_testing (id string,unixtimestamp string)
              Row format delimited fields terminated by ‘,’;
   Hive>  load data local inpath ‘/usr/local/hive_demo/udf_input.txt’
   Hive>select * from udf_testing;
Now we will write User defined function using java to get more meaningful date and time format.

Open eclipse->create new java project and New class- add the below code inside java class.
Add the jars from hive location.
Import java.util.Date;
Import java.text.DateFormat;
Import org.apache.hadoop.hive.ql.exec.UDF;
Import org.apache.hadoop.io.Text;
public class UnixTimeToDate extends UDF {
    public Text evaluate(Text text){
     if(text==null) return null;
        long timestamp = Long.parseLong(text.toString());
        return new Text(toDate(timestamp));
   }
private String toDate(long timestamp){
   Date date = new Date(timestamp*1000);
   Return DateFormat.getInstance().format(date).toString();
}
}   

Once created, then export jar file as unixtime_to_java_date.jar
Now we need to execute jar file from Hive
1. We need to add the jar file in hive shell
Hive>add JAR /usr/local/hive_demo/ unixtime_to_java_date.jar;
      Hive>create temporary FUNCTION  userdate  AS  ‘UnixTimeToDate’;
      Hive> select id,userdate(unixtimestamp) from udf_testing;

This is how we will work with hive. Hope you like this post.
Thank you for viewing this post.

2 comments:

AddToAny

Contact Form

Name

Email *

Message *