Sunday, July 29, 2012

More Fun with Hadoop In Action Exercises (Pig and Hive)

In my last post, I described a few Java-based Hadoop Map-Reduce solutions from the Hadoop in Action (HIA) book. According to the Hadoop Fundamentals I course from Big Data University, part of being a Hadoop practitioner also includes knowing about the many tools that are part of the Hadoop ecosystem. The course briefly touches on the following four tools - Pig, Hive, Jaql and Flume.

Of these, I decided to focus (at least for the time being) on Pig and Hive (for the somewhat stupid reason that the HIA book covers these too). Both are high-level DSLs that compile down to sequences of Map-Reduce jobs. Pig provides a data flow language called PigLatin, and Hive provides a SQL-like language called HiveQL. Both tools provide a REPL shell, and both can be extended with UDFs (User Defined Functions). The reason they coexist in spite of so much overlap is that they are aimed at different users - Pig appears to be aimed at programmer types and Hive at analyst types.

The appeal of both Pig and Hive lies in the productivity gains - writing Map-Reduce jobs by hand gives you fine-grained control, but takes time to do. Once you master Pig and/or Hive, it is much faster to generate sequences of Map-Reduce jobs. In this post, I describe three use cases (the first of which comes from the HIA book, and the other two I dreamed up).

Patent (Jaccard) Similarity - Pig

Given the Patent Citation Data (cite75_99.zip), the objective is to find "similar" patents. The HIA book provides a worked example of finding patents with similar citations using the same dataset. However, I found the online Pig documentation much more useful for learning PigLatin.

For this problem, I decided to use the Jaccard similarity between the sets of cited patents to indicate the similarity between two (citing) patents - the Jaccard similarity of two sets A and B is |A ∩ B| / |A ∪ B|, i.e. the size of their intersection divided by the size of their union. The PigLatin script below first groups the cited patents by citing patent, then passes the pairs of citation sets into a Pig UDF, JaccardSimilarity (Java Code | JUnit Test). You can find the script on GitHub here.

/* 
 * patent_similarity.pig
 * Given an input file of (citing_patent_number,cited_patent_number), this
 * script computes the Jaccard similarity between individual patents by
 * considering the overlap of the cited patents.
 */

-- pigudfs-1.0-SNAPSHOT.jar contains custom UDF JaccardSimilarity
REGISTER ./pigudfs-1.0-SNAPSHOT.jar; 

-- Load the text into a relation. Example:
-- (1,2)
-- (1,3)
-- (2,3)
-- (2,4)
-- (3,5)
-- citings = LOAD 'test.txt'
citings = LOAD 'input/cite75_99.txt' 
  USING PigStorage(',') AS (citing:int, cited:int);

-- Group by citing patent number. Example:
-- (1,{(1,2),(1,3)})
-- (2,{(2,3),(2,4)})
-- (3,{(3,5)})
citings_grpd = GROUP citings BY citing; 

-- Join previous 2 relations to include the cited patent in the relation
-- Tuple. Example:
-- (1,2,1,{(1,2),(1,3)})
-- (1,3,1,{(1,2),(1,3)})
-- (2,3,2,{(2,3),(2,4)})
-- (2,4,2,{(2,3),(2,4)})
-- (3,5,3,{(3,5)})
citings_joined = JOIN citings BY citing, citings_grpd BY group;

-- Eliminate the extra citings_grpd.group column and rename for sanity.
-- (1,2,{(1,2),(1,3)})
-- (1,3,{(1,2),(1,3)})
-- (2,3,{(2,3),(2,4)})
-- (2,4,{(2,3),(2,4)})
-- (3,5,{(3,5)})
citings_joined2 = FOREACH citings_joined 
  GENERATE $0 AS citing, $1 AS cited, $3 AS cite_pairs;

-- JOIN previous relation with citings_grpd to add the patents list
-- for the cited patent. We already have the patent list for the citing
-- patent. For reference, these relations are as follows:
-- citings_joined2: {citing: int,cited: int,cite_pairs: {(citing: int,cited: int)}}
-- citings_grpd: {group: int,citings: {(citing: int,cited: int)}}
-- Resulting data looks like this:
-- (1,2,{(1,2),(1,3)},2,{(2,3),(2,4)})
-- (1,3,{(1,2),(1,3)},3,{(3,5)})
-- (2,3,{(2,3),(2,4)},3,{(3,5)})
citings_joined3 = JOIN citings_joined2 BY cited, citings_grpd BY group;

-- Eliminate the extra citings_grpd.group value and rename cols for sanity.
-- Also eliminate the citing part of the tuples in both left_citeds and
-- right_citeds, so we can calculate similarity.
-- (1,2,{(2),(3)},{(3),(4)})
-- (1,3,{(2),(3)},{(5)})
-- (2,3,{(3),(4)},{(5)})
citings_joined4 = FOREACH citings_joined3 
  GENERATE $0 AS citing, $1 AS cited, 
           $2.cited AS left_citeds, $4.cited AS right_citeds;

-- Remove sim(A,A) pairs because we know the self-similarity is always 1.0
citings_joined5 = FILTER citings_joined4 BY citing != cited;

-- Project the relation through a UDF
-- (1,2,0.0)
-- (1,3,0.0)
-- (2,3,0.0)
citings_similarity = FOREACH citings_joined5 
  GENERATE citing, cited, 
  com.mycompany.pigudfs.JaccardSimilarity(left_citeds, right_citeds) AS jsim;

-- Remove entries with 0 similarity
citings_similarity2 = FILTER citings_similarity BY jsim > 0.0;

-- Order the output by descending order of similarity
citings_similarity_ordrd = ORDER citings_similarity2 BY jsim DESC;

-- Store the output in a comma-separated format into output
STORE citings_similarity_ordrd INTO 'patent_sim_output' USING PigStorage(',');

The above script translates into a linear sequence of 5 Map-Reduce jobs. As you can imagine, building the script above probably took less time than building a sequence of 5 Map-Reduce jobs by hand.

This Data Chef Blog post has a solution to a similar problem that demonstrates how to calculate Jaccard similarity without a UDF. However, I find my solution simpler, based on my (very limited) experience that UDFs make for easier-to-read PigLatin code.
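For illustration, here is a minimal sketch of what such a Pig EvalFunc might look like. This is not the actual JaccardSimilarity implementation linked above (which may differ in details such as null handling and schema declaration); it just shows the shape of the computation, assuming each input bag holds single-field tuples of cited patent ids:

package com.mycompany.pigudfs;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class JaccardSimilarity extends EvalFunc<Double> {

  @Override
  public Double exec(Tuple input) throws IOException {
    if (input == null || input.size() < 2) {
      return null;
    }
    Set<Integer> left = toSet((DataBag) input.get(0));
    Set<Integer> right = toSet((DataBag) input.get(1));
    // Jaccard similarity = |A intersect B| / |A union B|
    Set<Integer> union = new HashSet<Integer>(left);
    union.addAll(right);
    if (union.isEmpty()) {
      return 0.0D;
    }
    Set<Integer> intersection = new HashSet<Integer>(left);
    intersection.retainAll(right);
    return (double) intersection.size() / (double) union.size();
  }

  // Collect the single-field tuples of a bag into a set of patent ids
  private Set<Integer> toSet(DataBag bag) throws IOException {
    Set<Integer> set = new HashSet<Integer>();
    for (Tuple t : bag) {
      set.add((Integer) t.get(0));
    }
    return set;
  }
}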

Another trick I found useful and wanted to share: while building Pig (or, for that matter, Hive) programs, it is often helpful to work with a very small (often cooked-up) subset of data. The data I used can be found in the comments above each block of code (it is the data produced by DUMPing the last relation in each block). As you can see, the data I used is unrelated in content to the real data, but it exhibits the characteristics that my code exploits to produce the results. And since the dataset is small, it's easy to do multiple quick runs during development.
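In the same spirit, the UDF itself can be exercised outside of Pig with a tiny unit test. Here is a minimal sketch, assuming JUnit 4 and the JaccardSimilarity sketch above (the actual JUnit test linked earlier may look different):

package com.mycompany.pigudfs;

import static org.junit.Assert.assertEquals;

import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.junit.Test;

public class JaccardSimilarityTest {

  @Test
  public void testPartialOverlap() throws Exception {
    // sets {2,3} and {3,4}: intersection {3}, union {2,3,4} => 1/3
    Tuple input = TupleFactory.getInstance().newTuple(2);
    input.set(0, bagOf(2, 3));
    input.set(1, bagOf(3, 4));
    Double sim = new JaccardSimilarity().exec(input);
    assertEquals(1.0D / 3.0D, sim.doubleValue(), 0.001D);
  }

  // Build a bag of single-field tuples from the given ids
  private DataBag bagOf(int... ids) throws Exception {
    DataBag bag = BagFactory.getInstance().newDefaultBag();
    for (int id : ids) {
      Tuple t = TupleFactory.getInstance().newTuple(1);
      t.set(0, id);
      bag.add(t);
    }
    return bag;
  }
}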

Movie (Collaborative Filtering) Similarity - Pig

This example is something I dreamed up after reading a bit about collaborative filtering. The objective here is to find movies to recommend to a user, given his choice of movies so far, using collaborative filtering against the ratings file in the MovieLens 1M dataset. For each movie, we find the users who have rated the same movie, then we find the movies rated by each of those users (call these related movies) - the number of times a related movie is mentioned for a given movie constitutes its "similarity" to the original movie. The script below uses a UDF called OrderByCountDesc (Java Code | JUnit test) to deduplicate, count and rank occurrences of related movies; a sketch of what it might look like appears after the script. The script itself can be downloaded from GitHub here.

/*
 * movie_collab_filter.pig
 * Finds movies to recommend based on collaborative filtering.
 * Each movie is mapped to the users who rated it highly, and each such
 * user is mapped to the other movies he rated highly. These other movies
 * are candidates for recommendation, ordered by co-occurrence count
 * descending.
 */
-- Register custom UDF jar
REGISTER ./pigudfs-1.0-SNAPSHOT.jar;

-- load data
-- The field delimiter in the raw file is "::", which PigStorage can't
-- handle (it expects a single-character delimiter), so we first sed the
-- input to replace "::" with "\t". Sample data after conversion:
-- (1,100,4,20120724)
-- (2,100,5,20120724)
-- (3,100,4,20120724)
-- (1,200,4,20120724)
-- (3,200,5,20120724)
-- (1,300,1,20120724)
-- (2,300,4,20120724)
ratings = LOAD 'input/ml-ratings.dat' USING PigStorage('\t') 
  AS (uid:int, mid:int, rating:int, timestamp:chararray);

-- since this is a recommender system we want to only consider entries whose
-- ratings >= 4 (on a 5-point rating scale). Also remove extraneous cols.
-- (1,100)
-- (2,100)
-- (3,100)
-- (1,200)
-- (3,200)
-- (2,300)
ratings2 = FILTER ratings BY rating > 3;
ratings3 = FOREACH ratings2 GENERATE uid, mid;

-- Build copy of ratings3 for self JOINs below
ratings3_copy = FOREACH ratings3 GENERATE *;

-- For each movie, first find all the users who have rated the movie
-- highly, then for each such user, find all the other movies rated
-- highly by that user. These other movies are the ones related to the
-- original movie through the magic of collaborative filtering.
ratings_join_mid = JOIN ratings3 BY mid, ratings3_copy BY mid;
ratings_join_mid2 = FOREACH ratings_join_mid 
  GENERATE $0 AS uid, $1 AS mid, $3 AS tmid;
ratings_join_uid = JOIN ratings_join_mid2 BY uid, ratings3 BY uid;

-- Remove rows where the original movie and the "other" movie are the
-- same (because we don't want to recommend the same movie to the user).
-- Finally remove extraneous columns. Final output after this block:
-- (100,200)
-- (200,100)
-- (100,200)
-- (200,100)
-- (100,200)
-- (300,100)
-- (100,300)
-- (100,300)
-- (100,300)
-- (100,200)
-- (100,200)
-- (200,100)
-- (200,100)
-- (100,200)
ratings_join_uid2 = FILTER ratings_join_uid BY $1 != $4;
ratings_join_uid3 = FOREACH ratings_join_uid2 
  GENERATE $1 AS mid, $4 AS rmid;

-- Group the related movies so we can generate a count.
-- (100,{(100,200),(100,200),(100,200),(100,300),(100,300),
--        (100,300),(100,200),(100,200),(100,200)})
-- (200,{(200,100),(200,100),(200,100),(200,100)})
-- (300,{(300,100)})
ratings_cnt = GROUP ratings_join_uid3 BY mid;

-- Use custom UDF to dedup and rerank related movie tuples by count.
-- (100,{(100,200),(100,300)})
-- (200,{(200,100)})
-- (300,{(300,100)})
ratings_cnt_ordrd = FOREACH ratings_cnt 
  GENERATE group AS mid, 
  com.mycompany.pigudfs.OrderByCountDesc($1) AS ordered_mids;

-- Flatten the result so data can be fed into a relational table.
-- (100,200)
-- (100,300)
-- (200,100)
-- (300,100)
ratings_cnt_flat = FOREACH ratings_cnt_ordrd
  GENERATE mid,
  FLATTEN(ordered_mids.mid_r) AS rmid;

-- Store output into HDFS
STORE ratings_cnt_flat INTO 'movie_collab_filter_output' 
  USING PigStorage('\t');

The script results in a non-linear sequence of 4 Map-Reduce jobs.
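For reference, here is a minimal sketch of what the OrderByCountDesc UDF might look like. Again, this is illustrative rather than the actual implementation linked above - it assumes the input bag holds (mid, rmid) pairs, and it declares an output schema naming the fields so that the FLATTEN(ordered_mids.mid_r) in the script can work:

package com.mycompany.pigudfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class OrderByCountDesc extends EvalFunc<DataBag> {

  @Override
  public DataBag exec(Tuple input) throws IOException {
    DataBag inputBag = (DataBag) input.get(0);
    // Count occurrences of each related movie id (2nd field of each pair)
    Integer mid = null;
    final Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
    for (Tuple t : inputBag) {
      if (mid == null) {
        mid = (Integer) t.get(0); // same for every tuple in the group
      }
      Integer rmid = (Integer) t.get(1);
      Integer prev = counts.get(rmid);
      counts.put(rmid, prev == null ? 1 : prev + 1);
    }
    // Sort the deduplicated related movie ids by descending count
    List<Integer> rmids = new ArrayList<Integer>(counts.keySet());
    Collections.sort(rmids, new Comparator<Integer>() {
      public int compare(Integer a, Integer b) {
        return counts.get(b) - counts.get(a);
      }
    });
    // Emit one (mid, rmid) pair per deduplicated related movie
    DataBag output = BagFactory.getInstance().newDefaultBag();
    for (Integer rmid : rmids) {
      Tuple t = TupleFactory.getInstance().newTuple(2);
      t.set(0, mid);
      t.set(1, rmid);
      output.add(t);
    }
    return output;
  }

  // Name the inner fields so the script can project ordered_mids.mid_r
  @Override
  public Schema outputSchema(Schema input) {
    try {
      Schema tupleSchema = new Schema();
      tupleSchema.add(new Schema.FieldSchema("mid_l", DataType.INT));
      tupleSchema.add(new Schema.FieldSchema("mid_r", DataType.INT));
      return new Schema(
        new Schema.FieldSchema("ordered_mids", tupleSchema, DataType.BAG));
    } catch (FrontendException e) {
      return null;
    }
  }
}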

Movie (Collaborative Filtering) Similarity - Hive

While I was building the Pig solution above, it occurred to me that this problem would be better solved with Hive, since it is basically a matter of pivoting on two columns and grouping, so I decided to try doing it with Hive. If you are familiar with SQL, then Hive has a learning curve that's almost non-existent. Here is the HiveQL script.

--
-- movie_collab_filter.q
-- Builds up a table of similar movie pairs ordered by count.
--

-- Create table to hold input data and load input data
-- Output:
-- 1 100 4 20120724
-- 2 100 5 20120724
-- 3 100 4 20120724
-- 1 200 4 20120724
-- 3 200 5 20120724
-- 1 300 1 20120724
-- 2 300 4 20120724
CREATE TABLE ratings (uid INT, mid INT, rating INT, tstamp INT)
  ROW FORMAT DELIMITED                             
  FIELDS TERMINATED BY '\t'
  STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH '/tmp/test.txt' OVERWRITE INTO TABLE ratings;

-- Only use ratings which are > 3
CREATE TABLE ratings2 (uid INT, mid INT);
INSERT OVERWRITE TABLE ratings2
  SELECT uid, mid FROM ratings
  WHERE rating > 3;

-- For each (uid,mid) pair, find all users who have the same mid
-- Then for each such record, find all movies with the same uid.
-- Output:
-- 100 200
-- 100 300
-- 300 100
-- 300 200
-- 100 200
-- 100 300
-- 200 100
-- 200 300
-- 200 100
-- 200 300
-- 100 200
-- 100 300
-- 300 100
-- 100 300
-- 100 300
-- 100 300
-- 200 100
-- 100 200
-- 200 100
-- 100 200
-- 100 200
CREATE TABLE mid_pairs (mid INT, rmid INT);
INSERT OVERWRITE TABLE mid_pairs 
  SELECT a.mid, c.mid 
  FROM ratings2 a JOIN ratings2 b ON (a.mid = b.mid) 
                  JOIN ratings2 c ON (b.uid = c.uid);

-- Eliminate pairs where the source and related mid are identical.
CREATE TABLE mid_pairs2 (mid INT, rmid INT);
INSERT OVERWRITE TABLE mid_pairs2
  SELECT mid, rmid
  FROM mid_pairs
  WHERE mid != rmid;

-- Group by (mid, rmid) and count occurrences
-- 100 200 6
-- 100 300 6
-- 200 100 4
-- 200 300 2
-- 300 100 2
-- 300 200 1
CREATE TABLE mid_counts (mid INT, rmid INT, cnt INT);
INSERT OVERWRITE TABLE mid_counts
  SELECT mid, rmid, COUNT(rmid)
  FROM mid_pairs2
  GROUP BY mid, rmid;

DROP TABLE ratings2;
DROP TABLE mid_pairs;
DROP TABLE mid_pairs2;

This translates into 5 underlying Map-Reduce jobs. As you can see, it is not too hard to understand if you know SQL (which I am guessing most of you do).
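If you want to consume the resulting mid_counts table programmatically, one option is Hive's JDBC interface. Here is a minimal sketch, assuming a HiveServer (the original HiveServer1, whose driver class is org.apache.hadoop.hive.jdbc.HiveDriver) is listening on localhost:10000; the class name MidCountsReader is just illustrative:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MidCountsReader {

  public static void main(String[] args) throws Exception {
    // Register the Hive JDBC driver (HiveServer1-era class name)
    Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
    Connection conn = DriverManager.getConnection(
      "jdbc:hive://localhost:10000/default", "", "");
    Statement stmt = conn.createStatement();
    // Pull back related movies per movie, most similar first
    ResultSet rs = stmt.executeQuery(
      "SELECT mid, rmid, cnt FROM mid_counts ORDER BY mid ASC, cnt DESC");
    while (rs.next()) {
      System.out.println(
        rs.getInt(1) + "\t" + rs.getInt(2) + "\t" + rs.getInt(3));
    }
    rs.close();
    stmt.close();
    conn.close();
  }
}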

That's it for today. See you again next week!
