Hadoop Experiment - MapReduce on Cloudera
In this example I will extract NES review data from http://videogamecritic.com, create a dataframe, add some extra fields and save the data to a CSV file. This file will then be used for a simple MapReduce script.
Note: I have a Docker container running with Selenium instead of installing all dependencies on my system. See this page.
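For reference, a minimal docker-compose snippet for such a Selenium setup could look like the sketch below; the standalone Chrome image and port 4444 are assumptions chosen to match the webdriver.Remote URL used in the script, not necessarily the exact setup from that page.
version: '2'
services:
  selenium:
    image: selenium/standalone-chrome
    ports:
      - "4444:4444"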
Extract script
The script below is used to retrieve the data. It is ugly code, but it does the job.
import lxml.html
import requests
from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
import pandas as pd
URL = 'http://videogamecritic.com/nes.htm'
def map_rating(rating):
    """ Function to convert the rating to a number """
    if (rating == "A+"): return 1;
    if (rating == "A"): return 2;
    if (rating == "A-"): return 3;
    if (rating == "B+"): return 4;
    if (rating == "B"): return 5;
    if (rating == "B-"): return 6;
    if (rating == "C+"): return 7;
    if (rating == "C"): return 8;
    if (rating == "C-"): return 9;
    if (rating == "D+"): return 10;
    if (rating == "D"): return 11;
    if (rating == "D-"): return 12;
    if (rating == "F"): return 13;
    if (rating == "F-"): return 14;
    return 15;
resp = requests.get(URL)
if resp.status_code != 200:
    raise Exception('GET ' + URL + ' {}'.format(resp.status_code))
tree = lxml.html.fromstring(resp.content)
gamepages = ['http://videogamecritic.com/' + game.get('href') for game in tree.cssselect('h3 a')]
driver = webdriver.Remote("http://localhost:4444/wd/hub", DesiredCapabilities.CHROME)
gamedata = []
for page in gamepages:
    # Retrieve the data
    driver.get(page)
    data = lxml.html.fromstring(driver.page_source)
    # Extract the fields
    grades = [elem.text_content() for elem in data.cssselect('div[class*=\'hdr\']')]
    names = [elem.text_content() for elem in data.cssselect('div[class*=\'hdl\']')]
    metadata = [elem.text_content() for elem in data.cssselect('div[class*=\'mdl\']')]
    votes = [elem.text_content() for elem in data.cssselect('div[class*=\'vote\']')]
    # Append to dataset
    gamedata += list(zip(names, votes, grades, metadata))
driver.quit()
##
# DataFrame magic
##
df = pd.DataFrame(gamedata)
df = df.rename(columns={0: "name", 1: "vote", 2: "grade", 3: "publisher"})
# Extract and convert data
df['reader_rating'] = df['vote'].str.extract('Readers:\s(.*?)\s\(', expand=True)
df['reader_rating'] = df['reader_rating'].apply(lambda x: map_rating(x)).astype('int')
df['number_of_votes'] = df['vote'].str.extract('\((\d*)\svotes\)', expand=True).astype('int')
df['grade'] = df['grade'].str.replace("Grade:", "").str.strip().apply(lambda x: map_rating(x)).astype('int')
df['publish_year'] = df['publisher'].str.extract('\((\d*)\)Reviewed', expand=True)
df['publisher'] = df['publisher'].str.extract("Publisher:\s(.*?)\s\(", expand=True)
df.drop('vote', axis=1, inplace=True)
# Calculate the total grade
df['total_grade'] = (df['grade'] + df['reader_rating']*df['number_of_votes']) / (df['number_of_votes']+1)
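# e.g. 10-Yard Fight: grade 12, reader rating 10, 44 votes -> (12 + 10*44) / 45 = 10.044...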
# Corrections
df['publisher'] = df['publisher'].str.replace('Electrobrain', 'Electro Brain')
# Save to file
df.to_csv('nesgamedata.csv', sep='\t', header=False)
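As an aside, the long if-chain in map_rating could be replaced by a dictionary lookup. The sketch below (with made-up names RATING_ORDER and map_rating_compact) produces the same numbers:
# Compact alternative to map_rating: look the grade up in a dict, 15 for unknown grades
RATING_ORDER = ["A+", "A", "A-", "B+", "B", "B-", "C+", "C",
                "C-", "D+", "D", "D-", "F", "F-"]
RATING_MAP = {grade: position + 1 for position, grade in enumerate(RATING_ORDER)}

def map_rating_compact(rating):
    """ Convert a letter grade to a number, 15 for anything unknown """
    return RATING_MAP.get(rating, 15)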
Running the extract script results in the following tab-separated file; the first column is the DataFrame index, which to_csv writes by default.
0 10-Yard Fight 12 Nintendo 10 44 1985 10.044444444444444
1 1942 11 Capcom 8 65 1985 8.045454545454545
2 1943 5 Capcom 4 58 1988 4.016949152542373
3 720 Degrees 13 Tengen 11 24 1989 11.08
4 8 Eyes 13 Taxan 10 30 1989 10.096774193548388
5 Abadox 11 Abadox 6 34 1989 6.142857142857143
6 Adventure Island 7 Hudson Soft 6 63 1987 6.015625
7 Adventure Island 2 4 Hudson Soft 5 40 1990 4.975609756097561
8 Adventure Island 3 4 Hudson Soft 5 27 1992 4.964285714285714
9 Adventures in the Magic Kingdom 5 Capcom 8 23 1990 7.875
...
MapReduce script
Calculate how often each rating is used:
- Map the reader ratings
- Reduce to counts per rating
Let's create the script that will perform my first MapReduce action. The content of the following cell will be saved to a file, which can in turn be used to perform the mapping and reducing.
from mrjob.job import MRJob
from mrjob.step import MRStep
class GamesBreakdown(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_ratings,
                   reducer=self.reducer_count_ratings)
        ]

    def mapper_get_ratings(self, _, line):
        (index, name, grade, publisher, reader_rating, number_of_votes, publish_year, total_grade) = line.split('\t')
        yield reader_rating, 1

    def reducer_count_ratings(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    GamesBreakdown.run()
Execution
To use Hadoop, the command should look like the following and should be run on the Hadoop machine; the --hadoop-streaming-jar argument is only needed in case the .jar is not found automatically:
python gamesbreakdown.py -r hadoop --hadoop-streaming-jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar nesgamedata.csv
To do this, upload the nesgamedata.csv file and the script gamesbreakdown.py to HDFS, using the Hue file browser and its upload functionality. Next, copy them to a local folder on the Hadoop machine to be used in the command. Of course, other methods to get the files onto the Hadoop machine can be used.
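One alternative, given the ./data volume mount in the docker-compose.yml shown below, is to drop the files into the ./data folder on the Docker host so they appear inside the container. A sketch, assuming the files sit in the current directory on the host:
# On the Docker host: copy the script and data into the mounted folder
cp gamesbreakdown.py nesgamedata.csv ./data/
# Inside the container the files are then available under /opt/data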
I am using the Cloudera Quickstart image to experiment with Docker and Cloudera. The docker-compose.yml contains the following:
version: '2'
services:
  cloudera:
    hostname: quickstart.cloudera
    command: /usr/bin/docker-quickstart
    tty: true
    privileged: true
    image: cloudera/quickstart:latest
    volumes:
      - ./data:/opt/data
    ports:
      - "8020:8020"
      - "8022:22" # ssh
      - "7180:7180" # Cloudera Manager
      - "8888:8888" # HUE
      - "11000:11000" # Oozie
      - "50070:50070" # HDFS REST Namenode
      - "2181:2181"
      - "11443:11443"
      - "9090:9090"
      - "8088:8088"
      - "19888:19888"
      - "9092:9092"
      - "8983:8983"
      - "16000:16000"
      - "16001:16001"
      - "42222:22"
      - "8042:8042"
      - "60010:60010"
      - "8080:8080"
      - "7077:7077"
Connect to the machine and verify the file is present:
jitsejan@ssdnodes-jj-kvm:~/cloudera_docker$ docker exec -ti clouderadocker_cloudera_1 bash
[root@quickstart /]# cd home/
[root@quickstart home]# mkdir gamedata && cd $_
[root@quickstart gamedata]# hadoop fs -get gamedata/gamesbreakdown.py gamesbreakdown.py
[root@quickstart gamedata]# hadoop fs -get gamedata/nesgamedata.csv nesgamedata.csv
[root@quickstart gamedata]# ll
total 20
-rw-r--r-- 1 root root 590 Sep 5 13:42 gamesbreakdown.py
-rw-r--r-- 1 root root 15095 Sep 5 13:56 nesgamedata.csv
Install the mrjob library on the Cloudera container.
[root@quickstart gamedata]# yum install python-pip -y
[root@quickstart gamedata]# pip install mrjob
Run the script without using Hadoop to verify the installation.
[root@quickstart gamedata]# python gamesbreakdown.py nesgamedata.csv
No configs found; falling back on auto-configuration
Creating temp directory /tmp/gamesbreakdown.root.20170905.135809.859796
Running step 1 of 1...
Streaming final output from /tmp/gamesbreakdown.root.20170905.135809.859796/output...
"10" 15
"11" 14
"12" 12
"13" 4
"2" 9
"3" 28
"4" 44
"5" 34
"6" 48
"7" 31
"8" 28
"9" 15
Removing temp directory /tmp/gamesbreakdown.root.20170905.135809.859796...
Now use Hadoop to start the cluster magic.
[root@quickstart gamedata]# python gamesbreakdown.py -r hadoop nesgamedata.csv
No configs found; falling back on auto-configuration
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/bin/hadoop
Using Hadoop version 2.6.0
Looking for Hadoop streaming jar in /home/hadoop/contrib...
Looking for Hadoop streaming jar in /usr/lib/hadoop-mapreduce...
Found Hadoop streaming jar: /usr/lib/hadoop-mapreduce/hadoop-streaming.jar
Creating temp directory /tmp/gamesbreakdown.root.20170905.135908.241472
Copying local files to hdfs:///user/root/tmp/mrjob/gamesbreakdown.root.20170905.135908.241472/files/...
Running step 1 of 1...
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob3556720292725669631.jar tmpDir=null
Connecting to ResourceManager at /0.0.0.0:8032
Connecting to ResourceManager at /0.0.0.0:8032
Total input paths to process : 1
number of splits:2
Submitting tokens for job: job_1504618114531_0001
Submitted application application_1504618114531_0001
The url to track the job: http://quickstart.cloudera:8088/proxy/application_1504618114531_0001/
Running job: job_1504618114531_0001
Job job_1504618114531_0001 running in uber mode : false
map 0% reduce 0%
map 50% reduce 0%
map 100% reduce 0%
map 100% reduce 100%
Job job_1504618114531_0001 completed successfully
Output directory: hdfs:///user/root/tmp/mrjob/gamesbreakdown.root.20170905.135908.241472/output
Counters: 49
File Input Format Counters
Bytes Read=19191
File Output Format Counters
Bytes Written=86
File System Counters
FILE: Number of bytes read=2307
FILE: Number of bytes written=358424
FILE: Number of large read operations=0
FILE: Number of read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=19527
HDFS: Number of bytes written=86
HDFS: Number of large read operations=0
HDFS: Number of read operations=9
HDFS: Number of write operations=2
Job Counters
Data-local map tasks=2
Launched map tasks=2
Launched reduce tasks=1
Total megabyte-seconds taken by all map tasks=8678400
Total megabyte-seconds taken by all reduce tasks=3588096
Total time spent by all map tasks (ms)=8475
Total time spent by all maps in occupied slots (ms)=8475
Total time spent by all reduce tasks (ms)=3504
Total time spent by all reduces in occupied slots (ms)=3504
Total vcore-seconds taken by all map tasks=8475
Total vcore-seconds taken by all reduce tasks=3504
Map-Reduce Framework
CPU time spent (ms)=2450
Combine input records=0
Combine output records=0
Failed Shuffles=0
GC time elapsed (ms)=367
Input split bytes=336
Map input records=282
Map output bytes=1737
Map output materialized bytes=2313
Map output records=282
Merged Map outputs=2
Physical memory (bytes) snapshot=651063296
Reduce input groups=12
Reduce input records=282
Reduce output records=12
Reduce shuffle bytes=2313
Shuffled Maps =2
Spilled Records=564
Total committed heap usage (bytes)=679477248
Virtual memory (bytes) snapshot=4099473408
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
Streaming final output from hdfs:///user/root/tmp/mrjob/gamesbreakdown.root.20170905.135908.241472/output...
"10" 15
"11" 14
"12" 12
"13" 4
"2" 9
"3" 28
"4" 44
"5" 34
"6" 48
"7" 31
"8" 28
"9" 15
Removing HDFS temp directory hdfs:///user/root/tmp/mrjob/gamesbreakdown.root.20170905.135908.241472...
Removing temp directory /tmp/gamesbreakdown.root.20170905.135908.241472...
Now let's add another reducer step to sort the ratings by their counts. The count is zero-padded with zfill so that the string keys sort in the same order as the numbers.
from mrjob.job import MRJob
from mrjob.step import MRStep
class GamesBreakdownUpdate(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_ratings,
                   reducer=self.reducer_count_ratings),
            MRStep(reducer=self.reducer_sorted_output)
        ]

    def mapper_get_ratings(self, _, line):
        (index, name, grade, publisher, reader_rating, number_of_votes, publish_year, total_grade) = line.split('\t')
        yield reader_rating, 1

    def reducer_count_ratings(self, key, values):
        yield str(sum(values)).zfill(2), key

    def reducer_sorted_output(self, count, ratings):
        for rating in ratings:
            yield rating, count

if __name__ == '__main__':
    GamesBreakdownUpdate.run()
[root@quickstart gamedata]# python gamesbreakdownupdate.py nesgamedata.csv
No configs found; falling back on auto-configuration
Creating temp directory /tmp/gamesbreakdownupdate.root.20170905.142738.885314
Running step 1 of 2...
Running step 2 of 2...
Streaming final output from /tmp/gamesbreakdownupdate.root.20170905.142738.885314/output...
"13" "04"
"2" "09"
"12" "12"
"11" "14"
"10" "15"
"9" "15"
"3" "28"
"8" "28"
"7" "31"
"5" "34"
"4" "44"
"6" "48"
Removing temp directory /tmp/gamesbreakdownupdate.root.20170905.142738.885314...
That concludes my first experiments with Hadoop and MapReduce. Next step: using Pig or Spark to calculate similar statistics without all the overhead.