
URL: https://www.javacodegeeks.com/2012/05/mapreduce-questions-and-answers-part-2.html

MapReduce Questions and Answers Part 2 - Java Code Geeks


4 Inverted Indexing for Text Retrieval

The chapter contains a lot of details about integer encoding and compression. Since these topics are not directly about MapReduce, I wrote no questions about them.

4.4 Inverted Indexing: Revised Implementation

Explain the inverted indexing algorithm. You may assume that each document fits into memory. Assume also that there is a huge number of documents. Which part should be optimized by integer encoding and compression?

Input: text documents
key: document id
value: text document

Output: key/value pairs where
key: word
value: list[documentId, numberOfOccurences]
list elements must be sorted by numberOfOccurences

The answer is:

Mapper counts number of occurrences in the document for each word. As the whole document fits into the memory, we can hold partial results in a map.

Intermediate key/values:
key: word, numberOfOccurences
value: documentId

A custom partitioner groups intermediate key/values by word. A custom sort orders them primarily by word and secondarily by the number of occurrences.

The reducer uses an initialize method to create an empty posting list. The reduce method handles two cases:

  • current word equals the previous word – add documentId and numberOfOccurences to the posting list.
  • current word differs from the previous word – emit the previous word and its posting list, then start a new posting list with the current documentId and numberOfOccurences.

The posting list in the reducer should be compressed.

class MAPPER
  method MAP(docid, doc d)
    H = new hash map
    for all term w in doc d do
      H(w) = H(w) + 1

    for all term w in H do
      emit(pair(w, H(w)), docid)

class REDUCER
  variable previous_word = 0
  variable PL = new list of postings

  method REDUCE(pair (w, #occurrences), docid)
    if w <> previous_word && previous_word <> 0 do
      emit(previous_word, PL)
      PL = new list of postings

    PL.add(pair(docid, #occurrences))
    previous_word = w

  method CLOSE
    if previous_word <> 0 do
      emit(previous_word, PL)

class PARTITIONING_COMPARATOR
  method compare(key (w1, o1), key (w2, o2))
    if w1 = w2
      return keys are equal

    return keys are different

class SORTING_COMPARATOR
  method compare(key (w1, o1), key (w2, o2))
    if w1 = w2 do
      return compare(o1, o2)

    return compare(w1, w2)
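
The whole job can be simulated in a single process to check the logic. The following is a minimal Python sketch, not a Hadoop implementation: it assumes whitespace tokenization, and an ordinary sort plus groupby stand in for the sorting and partitioning comparators. The function names map_document and build_index are made up for this illustration.

```python
from collections import Counter
from itertools import groupby

def map_document(doc_id, text):
    """Mapper: count terms in one document, emit ((word, count), doc_id)."""
    counts = Counter(text.split())
    return [((word, count), doc_id) for word, count in counts.items()]

def build_index(documents):
    """Simulate shuffle/sort and the reducer on a dict {doc_id: text}."""
    intermediate = []
    for doc_id, text in documents.items():
        intermediate.extend(map_document(doc_id, text))
    # Stand-in for the sorting comparator: by word, then by count
    # (doc_id is used only to break ties deterministically).
    intermediate.sort(key=lambda kv: (kv[0][0], kv[0][1], kv[1]))
    index = {}
    # Stand-in for the partitioning comparator: group by word only.
    for word, group in groupby(intermediate, key=lambda kv: kv[0][0]):
        index[word] = [(doc_id, count) for (_, count), doc_id in group]
    return index
```

Each posting list comes out sorted by the number of occurrences, because the sort already placed the intermediate pairs in that order before grouping.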

5 Graph Algorithms

The chapter contains two algorithms: shortest path in the graph and page ranking algorithm. The questions are straightforward.

5.2 Parallel Breadth-First Search

Find the shortest path from one origin node to all other nodes. Each edge has a weight associated with it. Input key/value pairs have already been preprocessed into a convenient form.

Input: graph
key: node id
value: distance to origin, list[adjacent node, edge length]

Output: key/value pairs where
key: node id
value: distance to origin, list[adjacent node, edge length]

The answer is:

The algorithm requires multiple iterations. It stops when an iteration does not change any ‘distance to origin’. At worst, there will be O(n) iterations where n is the number of nodes in the graph.

The mapper passes the original graph to the next iteration as it is. In addition, it generates a key/value pair for each adjacent node. The value contains the distance from the origin to the adjacent node if the route went through the current node.

class MAPPER
  method MAP(node, pair(dist, adjacencylist))
    emit(node, pair(dist, adjacencylist))
    for all (closenode, nodedist) in adjacencylist do
      emit(closenode, pair(dist + nodedist, empty))

Reducer finds the minimum known distance from each node. It passes the distance along with the original graph to the next iteration. It also increments global counter whenever minimum known distance to any node changes.

class REDUCER
  method REDUCE(node, list(dist, adjacencylist))
    minimum = infinity
    previous_iteration_solution = infinity
    original_graph = empty
    for all (dist, adjacencylist) in list do
      if adjacencylist not empty do
        original_graph = adjacencylist
        previous_iteration_solution = dist
      if minimum > dist
        minimum = dist

    if previous_iteration_solution <> minimum
      increment global counter
    emit(node, pair(minimum, original_graph))

If the global counter is 0, the algorithm stops. Otherwise another iteration is needed.
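
The iteration loop is easier to follow when the driver is written out. Below is a small in-memory Python sketch of the mapper, the reducer and the driver. It is an illustration under assumptions, not framework code: the graph is a dict of node -> (distance, adjacency list), every node appears as a key, and None (rather than an empty list) marks records that carry no graph structure, so nodes without outgoing edges are still recognized. All function names are invented for the example.

```python
import math
from collections import defaultdict

def bfs_map(node, dist, adjacency):
    """Mapper: pass the node through and propose distances to neighbours."""
    yield node, (dist, adjacency)
    for neighbour, edge_length in adjacency:
        yield neighbour, (dist + edge_length, None)

def bfs_reduce(node, values):
    """Reducer: keep the minimum distance; report whether it changed."""
    minimum, previous, adjacency = math.inf, math.inf, []
    for dist, adj in values:
        if adj is not None:          # the record carrying the graph structure
            adjacency, previous = adj, dist
        minimum = min(minimum, dist)
    return (minimum, adjacency), minimum != previous

def shortest_paths(graph):
    """Driver: iterate until no distance changes (the global counter is 0)."""
    while True:
        grouped = defaultdict(list)  # stand-in for shuffle and sort
        for node, (dist, adjacency) in graph.items():
            for key, value in bfs_map(node, dist, adjacency):
                grouped[key].append(value)
        changed = 0                  # stand-in for the global counter
        new_graph = {}
        for node, values in grouped.items():
            new_graph[node], did_change = bfs_reduce(node, values)
            changed += did_change
        graph = new_graph
        if changed == 0:
            return {node: dist for node, (dist, _) in graph.items()}
```

The origin starts with distance 0 and every other node with infinity; unreachable nodes keep the infinite distance.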

Explain the page rank algorithm. Assume alpha = 0.

The answer is:

Page rank P(n) of a page n is calculated from the page ranks of all pages linking to it.

P(n) = sum_m (P(m)/C(m))

The sum goes through all pages m linking to the page n. C(m) is the number of outgoing links of the page m.

The page rank algorithm runs in iterations. The mapper passes the page rank contribution of each page to its adjacent pages. The reducer updates the page rank of each node. The algorithm stops when the page ranks no longer change.

class MAPPER
  method MAP(page, (page_rank, adjacency_list))
    emit(page, (0, adjacency_list))
    contribution = page_rank/adjacency_list.length
    for all node in adjacency_list do
      emit(node, (contribution, empty))

class REDUCER
  method REDUCE(page, contributions[c1, c2, ..., cn])
    rank = 0
    adjacency_list = new list
    for all c in contributions do
      adjacency_list.addAll(c.adjacency_list)
      rank = rank + c.contribution

    emit(page, (rank, adjacency_list))
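
To see the iterations converge, the mapper and reducer can be simulated in memory. This Python sketch assumes alpha = 0, that every page has at least one outgoing link (no dangling pages) and that every page appears as a key. The tolerance and max_iterations parameters and both function names are assumptions made for the example.

```python
from collections import defaultdict

def pagerank_iteration(graph):
    """One MapReduce iteration; graph maps page -> (rank, adjacency_list)."""
    grouped = defaultdict(list)                      # stand-in for the shuffle
    for page, (rank, adjacency) in graph.items():
        grouped[page].append((0.0, adjacency))       # pass the structure along
        for node in adjacency:                       # split the rank evenly
            grouped[node].append((rank / len(adjacency), []))
    result = {}
    for page, contributions in grouped.items():
        rank = sum(c for c, _ in contributions)
        adjacency = [n for _, adj in contributions for n in adj]
        result[page] = (rank, adjacency)
    return result

def pagerank(graph, tolerance=1e-9, max_iterations=100):
    """Driver: repeat until no rank moves by more than `tolerance`."""
    for _ in range(max_iterations):
        updated = pagerank_iteration(graph)
        delta = max(abs(updated[p][0] - graph[p][0]) for p in graph)
        graph = updated
        if delta < tolerance:
            break
    return {page: rank for page, (rank, _) in graph.items()}
```

In practice "no longer change" has to mean "change by less than some small threshold", since floating-point ranks rarely become exactly equal between iterations.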

6 EM Algorithms For Text Processing

I made no questions out of this chapter.

Exercises
This chapter contains hands-on exercises for MapReduce. Some of them require multiple iterations.

Warm Up

Count number of occurrences of every word in a text collection.

Input:
key: document id,
value: text document.

Output:
key: word,
value: number of occurrences.

The answer is:

Intermediate pairs:
key: word
value: integer - how many times was the word seen in the input.

class MAPPER
  method MAP(docid a, doc d)
    for all term w in doc d do
      emit(w, 1)

class COMBINER
  method COMBINE(word w, counts[c1, c2, ..., cn])
    s = 0
    for all c in counts[c1, c2, ..., cn] do
      s = s + c

    emit(word w, s)

class REDUCER
  method REDUCE(word w, counts[c1, c2, ..., cn])
    s = 0
    for all c in counts[c1, c2, ..., cn] do
      s = s + c

    emit(word w, s)

An alternative solution would use in-mapper combining.
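
A possible shape of the in-mapper combining variant, sketched in Python. The class and function names are hypothetical, tokenization is a plain whitespace split, and the shuffle is simulated by a dictionary.

```python
from collections import Counter, defaultdict

class InMapperCombiningMapper:
    """Accumulates counts across map() calls and emits them once in close()."""

    def __init__(self):
        self.counts = Counter()

    def map(self, doc_id, text):
        self.counts.update(text.split())   # nothing is emitted here

    def close(self):
        return list(self.counts.items())   # one (word, partial_count) per word

def reduce_counts(pairs):
    """Reducer: sum partial counts per word (shuffle simulated with a dict)."""
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return dict(totals)
```

The mapper emits at most one pair per distinct word it has seen, at the cost of keeping the whole count table in memory until close is called.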

Web Store

The website user log contains user ids and the length of each session. The website has a modest number of registered users. Compute the average session length for each user.

Input:
key: user id,
value: session length.

Output:
key: user id,
value: average session length.

The answer is:

As the number of registered users is modest, we can use in-mapper combining.

class MAPPER
  variable total_time = new hash map
  variable sessions_number = new hash map

  method MAP(user_id, session_length)
    total_time(user_id) = total_time(user_id) + session_length
    sessions_number(user_id) = sessions_number(user_id) + 1

  method CLOSE
    for all user_id in total_time
      tt = total_time(user_id)
      sn = sessions_number(user_id)
      emit(user_id, pair(tt, sn))

class REDUCER
  method REDUCE(user_id, [pairs(time, sessions_number)])
    total_time = 0
    total_sessions = 0
    for all pairs in [pairs(time, sessions_number)] do
      total_time = total_time + time
      total_sessions = total_sessions + sessions_number

    emit(user_id, total_time/total_sessions)
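
The mapper emits (total time, number of sessions) pairs rather than local averages, because an average of averages is wrong whenever mappers see a different number of sessions for the same user. A short Python sketch of both sides follows; the function names are invented for the example and each input split is modelled as a list of (user_id, session_length) tuples.

```python
from collections import defaultdict

def map_sessions(log):
    """In-mapper combining: one (total_time, sessions) pair per user."""
    totals = defaultdict(lambda: [0, 0])
    for user_id, session_length in log:
        totals[user_id][0] += session_length
        totals[user_id][1] += 1
    return [(user, (t, n)) for user, (t, n) in totals.items()]

def reduce_average(user_id, pairs):
    """Reducer: combine partial sums and counts, then divide once."""
    total_time = sum(t for t, _ in pairs)
    total_sessions = sum(n for _, n in pairs)
    return user_id, total_time / total_sessions
```

For example, if one mapper sees sessions of 10 and 20 and another sees a single session of 60, the pairs (30, 2) and (60, 1) give the correct average of 30, while averaging the local averages 15 and 60 would give 37.5.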

The web store log contains the user id and the bought item for each sale. You need to implement “buyers of item also bought” functionality. Whenever the item is shown, the store will suggest the five items most often bought by the item's buyers.

Input:
key: user id,
value: bought item.

Output:
key: item,
value: list of five most common "buyers of item also bought" items.

The answer is:

Our solution has two iterations. The first iteration generates lists of all items bought by the same user. Grouping is done by the framework; both mapper and reducer perform an identity function.

Input:
key: user id,
value: bought item.

Output:
key: user id,
value: list of all bought items.

class MAPPER
  method MAP(user_id, item)
    emit(user_id, item)

class REDUCER
  method REDUCE(user_id, items[i1, i2, ..., in])
    emit(user_id, items)

The second iteration solves the co-occurrence problem on the item lists. It uses the stripes approach. The only difference from the standard solution is that we emit only the five most common co-occurrences.

Input:
key: user id,
value: list of all bought items.

Output:
key: item,
value: list of five most common co-occurrences.

class MAPPER
  method MAP(user_id, items[i1, i2, ..., in])
    for all item in items do
      H = new hash map
      for all item j in items do
        if j <> item do
          H(j) = H(j) + 1
      emit(item, H)

class REDUCER
  method REDUCE(item, stripes[H1, H2, ..., Hn])
    T = new hash map
    for all H in stripes do
      for all (key/value) in H do
        T(key) = T(key) + value
    emit(item, max_five(T))
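
The stripes step maps naturally onto Python's Counter. The sketch below is an in-memory illustration with made-up function names. It assumes each user's list contains distinct items, it leaves the item itself out of its own stripe, and it keys the result by item.

```python
from collections import Counter, defaultdict

def map_stripes(items):
    """Mapper: for each item, emit a stripe counting the other items."""
    for item in items:
        yield item, Counter(other for other in items if other != item)

def reduce_stripes(stripes, limit=5):
    """Reducer: merge stripes element-wise and keep the most common items."""
    total = Counter()
    for stripe in stripes:
        total.update(stripe)
    return [item for item, _ in total.most_common(limit)]

def also_bought(baskets, limit=5):
    """Run both phases on a dict {user_id: [items]}."""
    grouped = defaultdict(list)              # stand-in for shuffle and sort
    for items in baskets.values():
        for item, stripe in map_stripes(items):
            grouped[item].append(stripe)
    return {item: reduce_stripes(s, limit) for item, s in grouped.items()}
```

Counter.most_common plays the role of max_five here; how ties between equally frequent items are broken is left unspecified.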

The web store log contains the user id, timestamp, item and number of bought pieces for each sale. The store is looking for items whose sales rise or decline at the same time. Find the 20 item pairs with the highest number of such months.

Input:
key: user id,
value: timestamp, bought item, count.

Output:
key: item, item
value: number of months when both items' sales rose or declined.
#: the output contains 20 key/value pairs with maximum value

The answer is:

Our solution requires multiple MapReduce iterations. We have to:

  • calculate whether each item's sales for any given month went up or down,
  • create lists of items with the same sales change during the same month,
  • find number of co-occurrences in those lists,
  • choose items with maximum co-occurrences.

First iteration calculates sales changes for any given month. We have to supply mapper, partitioner, custom sort and reducer. Mapper generates one intermediate key/value pair for each input key/value. Key is composed of sold item and sales month. Value contains number of sold pieces.

Partitioner sends all key/value pairs with the same item to the same reducer. Custom sort sorts them by months. Finally, reducer calculates sales changes.

Input:
key: user id,
value: timestamp, item, count.

Intermediate key/values:
key: item, month
value: count.

Output:
key: month, up/down/equal
value: item.

class MAPPER
  method MAP(user_id, (timestamp, item, count))
    month = get_month(timestamp)
    emit((item, month), count)

class PARTITIONING_COMPARATOR
  method compare(key (item1, month1), key (item2, month2))
    if item1 = item2
      return keys are equal

    return keys are different

class SORTING_COMPARATOR
  method compare(key (item1, month1), key (item2, month2))
    if item1 = item2 do
      return compare(month1, month2)

    return compare(item1, item2)

class REDUCER
  variable last_item, last_month, last_count

  method REDUCE((item, month), counts[c1, c2, ..., cn])
    count = sum([c1, c2, ..., cn])
    if last_item = item
      if last_month + 1 = month
        // emit correct up/down/equal flags
        if last_count < count
          emit((month, up), item)
        if last_count > count
          emit((month, down), item)
        if last_count = count
          emit((month, equal), item)
      else
        // no sales during some months
        emit((last_month + 1, down), item)
        emit((month, up), item)
    else
      // new item
      if last_item is set
        emit((last_month + 1, down), last_item)
      emit((month, up), item)

    last_item = item
    last_count = count
    last_month = month
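
The reducer logic can be tried out on a small sorted list. The following Python sketch is only an illustration: it assumes the counts have already been summed per (item, month), that the records arrive sorted by item and then by month, and that months are consecutive integers (for example year * 12 + month). The function name sales_changes is invented. Output pairs use the key (month, change) and the value item, as in the output description above.

```python
def sales_changes(records):
    """records: (item, month, count) tuples sorted by item, then month."""
    result = []
    last_item = last_month = last_count = None
    for item, month, count in records:
        if item == last_item and month == last_month + 1:
            # Same item, consecutive month: compare the two counts.
            if count > last_count:
                change = "up"
            elif count < last_count:
                change = "down"
            else:
                change = "equal"
            result.append(((month, change), item))
        else:
            # New item, or a gap with no sales: the month after the last
            # recorded one had zero sales, so it counts as a decline.
            if last_item is not None:
                result.append(((last_month + 1, "down"), last_item))
            result.append(((month, "up"), item))
        last_item, last_month, last_count = item, month, count
    return result
```

Like the pseudocode, the sketch never reports the decline after the very last record, because no further reduce call follows it; a CLOSE method could handle that case.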

Second iteration groups first iteration results by keys. It generates lists of items with same sales changes during the same month. Framework does all the work. Both mapper and reducer perform an identity function.

Input:
key: month, up/down/equal
value: item.

Output:
key: month, up/down/equal
value: [items].

Third iteration performs standard ‘co-occurrences by pairs’ algorithm.

Input:
key: month, up/down/equal
value: [items].

Intermediate key/values:
key: item, item
value: partial number of co-occurrences.

Output:
key: item, item
value: number of months when both items' sales rose or declined.
#: the output contains all item pairs

class MAPPER
  method MAP((month, change), items[i1, i2, ..., in])
    for each i in items do
      for each j in items do
        if i != j
          emit((i, j), 1)

class COMBINER
  method COMBINE((item1, item2), co-occurrences[c1, c2, ..., cn])
    s = 0
    for all c in co-occurrences[c1, c2, ..., cn] do
      s = s + c

    emit((item1, item2), s)

class REDUCER
  method REDUCE((item1, item2), co-occurrences[c1, c2, ..., cn])
    s = 0
    for all c in co-occurrences[c1, c2, ..., cn] do
      s = s + c

    emit((item1, item2), s)

Finally, we have to choose 20 key/value pairs with maximum value. Each mapper selects 20 key/value pairs with maximum value and emits them with the same key. There will be only one reducer which selects final 20 key/value pairs.

Input:
key: item, item
value: number of months when both items' sales rose or declined.
#: the input contains all item pairs

Intermediate key/values:
key: 1
value: item, item, number of months when both items' sales rose or declined.
#: the output contains 20 key/value pairs with maximum value for each mapper

Output:
key: item, item
value: number of months when both items' sales rose or declined.
#: the output contains 20 key/value pairs with maximum value

The code is very simple but long.
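
In a language with a heap library the selection itself becomes short. Here is a hedged Python sketch of the idea using heapq.nlargest; the function names, the limit parameter and the constant key 1 are illustrative choices, and each mapper's input is modelled as a list of ((item, item), months) tuples.

```python
import heapq

def map_top(pairs, limit=20):
    """Mapper: keep the local top `limit` pairs, emit them under one key."""
    best = heapq.nlargest(limit, pairs, key=lambda kv: kv[1])
    return [(1, (items, months)) for items, months in best]

def reduce_top(values, limit=20):
    """Single reducer: select the global top `limit` from all candidates."""
    return heapq.nlargest(limit, values, key=lambda kv: kv[1])
```

Because every mapper forwards at most 20 candidates, the single reducer receives at most 20 values per mapper, which keeps the final step cheap even though it is not parallel.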

Criminal Agency

Inputs to all exercises in this chapter use the same data structure.

A criminal agency stole Facebook’s friendships database and wants to analyze the new data. Friendships are stored in the form of key/value pairs; each friendship corresponds to two key/value pairs:

Friends:
key: first friend name
value: second friend name

key: second friend name
value: first friend name

The agency also owns criminal records of all citizens:

Criminal record:
key: citizen name
value: when, where, accomplices, description

Find at-risk youths. A person is considered an at-risk youth if more than half of his friends have a criminal record.

The answer is:

Our solution has two iterations. First iteration joins two sets and flags each ‘value friend’ with has_record/law_abiding flags.

Output:
key: first friend
value: second friend, has_record/law_abiding

The mapper flags each key with data set name. Partitioner groups data according to names in keys and sorter puts criminal records before friendships. We could use local aggregation to remove multiple criminal records for the same person.

class MAPPER
  method MAP(name, value)
    if value is name do
      emit(pair(name, friendship), value)
    else
      emit(pair(name, criminal), value)

class PARTITIONING_COMPARATOR
  method compare(key (name1, dataset1), key (name2, dataset2))
    if name1 = name2
      return keys are equal

    return keys are different

class SORTING_COMPARATOR
  method compare(key (name1, dataset1), key (name2, dataset2))
    if name1 = name2 AND dataset1 is criminal
      return key1 is lower

    if name1 = name2 AND dataset2 is criminal
      return key2 is lower

    return compare(name1, name2)

class REDUCER
  variable previous_name

  method REDUCE(pair(name, flag), items[i1, i2, ..., in])
    if flag is criminal do
      previous_name = name
      has_record = criminal
      return

    if previous_name <> name do
      has_record = law_abiding
    else
      has_record = criminal

    previous_name = name
    for all i in items do
      emit(i.name, pair(name, has_record))

The second iteration counts both the total number of friends and the number of friends with a criminal record. The reducer emits key/value pairs only for at-risk youths. This iteration could also use some kind of local aggregation.

Intermediate key/value:
key: name
value: total friends, total friend criminals
# totals are partial: they cover only the subset of the data seen by one mapper

Output:
key: name
value: empty
# only at risk youths

class MAPPER
  method MAP(name, pair(friend, has_record))
    if has_record is law_abiding do
      emit(name, pair(1, 0))
    else
      emit(name, pair(1, 1))

class REDUCER
  method REDUCE(name, items[pair(total, criminals)])
    total = 0
    criminals = 0
    for all i in items do
      total = total + i.total
      criminals = criminals + i.criminals

    if criminals / total > 0.5 do
      emit(name, empty)
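
The counting step can be checked with a few lines of Python. This sketch covers only the second iteration and assumes its input is a list of (name, friend_has_record) tuples, one per friendship, as produced by the join. The names map_friend and at_risk are invented for the example.

```python
from collections import defaultdict

def map_friend(name, friend_has_record):
    """Mapper: emit (total, criminals) for one flagged friendship."""
    return name, (1, 1 if friend_has_record else 0)

def at_risk(flagged_friendships):
    """Reducer logic, with the shuffle simulated by a dict."""
    totals = defaultdict(lambda: [0, 0])
    for name, has_record in flagged_friendships:
        _, (total, criminals) = map_friend(name, has_record)
        totals[name][0] += total
        totals[name][1] += criminals
    # Compare 2 * criminals with total to avoid integer-division surprises.
    return sorted(n for n, (t, c) in totals.items() if 2 * c > t)
```

Writing the condition as 2 * criminals > total sidesteps languages in which dividing two integers truncates the result, which would make criminals / total > 0.5 false in every case.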

Find gangs. Gang is a group of people that:

  • has exactly 5 members,
  • each member is friend with all other members,
  • each two members committed at least 3 crimes together.

The answer is:

Again, we need three iterations. The idea is to first clean up the graph of all useless edges, so that only criminal contacts remain. Then, we split graph into smaller manageable sub-graphs. We attach all criminal contacts and edges between them to each person:

Last iteration reducers input:
key: person
values: all his criminal contacts and relationships between them.

Final reducer takes smaller graphs represented by value in each key/value pair and finds complete sub-graphs with 4 vertices in it. Add person from the key in it, and you have found a complete sub-graph with 5 vertices. The reducer may use any polynomial algorithm.

The first iteration uses the pairs approach to clean up the graph. We omit both local aggregation and removal of duplicates. Both would make the algorithm more efficient.

Intermediate key/values:
key: first friend, second friend, friendship/accomplice
value: 1

Output:
key: first friend, second friend
value: empty
# only friends with common criminal record

class MAPPER
  method MAP(name, value)
    if value is name do
      emit(triple(name, value, friendship), empty)
    else
      for all crime_accomplice in value.accomplices do
        emit(triple(name, crime_accomplice, accomplice), 1)

class PARTITIONING_COMPARATOR
  method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
    if name1 = name2 AND accomplice1 = accomplice2
      return keys are equal

    return keys are different

class SORTING_COMPARATOR
  method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
    if name1 = name2 AND accomplice1 = accomplice2 AND flag1 is friendship
      return key1 is lower

    if name1 = name2 AND accomplice1 = accomplice2 AND flag2 is friendship
      return key2 is lower

    return compare(pair(name1, accomplice1), pair(name2, accomplice2))

class REDUCER
  variable previous_name
  variable previous_accomplice

  method sameAsPrevious(name, accomplice)
    if previous_name <> name
      return false

    if previous_accomplice <> accomplice
      return false

    return true

  method REDUCE(triple(name, accomplice, flag), items[i1, i2, ..., in])
    if sameAsPrevious(name, accomplice) do
      if items.length > 2 do
        emit(name, accomplice)
      return

    if flag is friendship do
      previous_name = name
      previous_accomplice = accomplice

Second iteration attaches lists of all ‘second degree’ friends to edges:

Input:
key: first friend, second friend
value: empty

Intermediate key/values:
key: first friend
value: first friend, second friend

key: second friend
value: first friend, second friend

Output:
key: first friend, second friend
value: all friends of second friend

key: second friend, first friend
value: all friends of first friend

class MAPPER
  method MAP((first friend, second friend), empty)
    emit(first friend, (first friend, second friend))
    emit(second friend, (first friend, second friend))

class REDUCER
  method REDUCE(name, edges[e1, e2, ..., en])
    friends = new Set
    friends.add(name)

    for all edge in edges do
      friends.add(edge.v1, edge.v2)

    for all edge in edges do
      emit(edge, friends)

Finally, mapper and shuffle and sort phase together generate lists of all friends of any given person and relationships between them.

Input:
key: friend 1, friend 2
value: all friends of friend 2

Intermediate key/values:
key: friend 1
value: friend 2, all friends of friend 2

Reducers input (after shuffle and sort):
key: person
values: all his friends and relationships between them.

Output:
key: first friend, second friend, third friend, fourth friend, fifth friend
value: gang

class MAPPER
  method MAP((friend 1, friend 2), all friends of friend 2)
    emit(friend 1, (friend 2, all friends of friend 2))

class REDUCER
  method REDUCE(name, graph attached to it)
    any polynomial algorithm will work
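
One simple polynomial choice for the final reducer is brute force: try every group of four contacts and check that all six pairs among them are connected. The Python sketch below shows this under assumptions of its own: the sub-graph attached to the person is given as a collection of contacts plus a set of frozenset edges, and the function name find_gangs is made up.

```python
from itertools import combinations

def find_gangs(person, contacts, edges, size=5):
    """Return every clique of `size` members that contains `person`.

    contacts: the person's criminal contacts (all connected to `person`).
    edges: set of frozenset({a, b}) relationships between those contacts.
    """
    gangs = []
    for group in combinations(sorted(contacts), size - 1):
        # The group is a clique if every pair inside it is an edge.
        if all(frozenset(pair) in edges for pair in combinations(group, 2)):
            gangs.append(tuple(sorted((person,) + group)))
    return gangs
```

For a person with n contacts this checks on the order of n^4 groups, which is polynomial because the gang size is fixed at 5. The same gang is found once from each of its five members, so a final de-duplication step would still be needed.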

Reference: MapReduce Questions and Answers from our JCG partner Maria Jurcovicova at the This is Stuff blog.

Maria Jurcovicova
May 17th, 2012. Last Updated: October 22nd, 2012