Tuesday, November 13, 2012

Storing Apache Hadoop WordCount Example Output to Database

The Apache Hadoop WordCount example is the "Hello World" of Hadoop, so using it to demonstrate sinking Hadoop output to a database keeps things easy to understand. The database I used is MySQL, and the DDL for the table is as follows:

-- Target table for the job below: one row per word with its total count.
CREATE TABLE word_count(word VARCHAR(254), count INT);
After creating the table, I wrote the following Apache Hadoop job, along with its Mapper and Reducer, to sink the output to the database. For this I use DBOutputFormat as the OutputFormat and DBConfiguration to specify the DB configuration parameters.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class WordCount {
    public static class WordCountMapper extends MapReduceBase implements Mapper<LongWritable, Text, DBOutput, IntWritable> {
        private static IntWritable one = new IntWritable(1);
        private static DBOutput text = new DBOutput();
        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<DBOutput, IntWritable> collect, Reporter arg3)
                throws IOException {
            StringTokenizer token = new StringTokenizer(value.toString());
            while(token.hasMoreTokens()) {
                text.setText(token.nextToken());
                collect.collect(text, one);
            }            
                        
        }
        
    }
    
    public static class WordCountReducer extends MapReduceBase implements Reducer<DBOutput, IntWritable, DBOutput, IntWritable> {

        
        @Override
        public void reduce(DBOutput key, Iterator<IntWritable> values,
                OutputCollector<DBOutput, IntWritable> collect, Reporter arg3)
                throws IOException {
            int sum = 0;
            IntWritable no = null;
            DBOutput dbKey = new DBOutput();
            
            while(values.hasNext()) {
                no = values.next();
                sum += no.get();
            }
            dbKey.setText(key.getText());
            dbKey.setNo(sum);
            collect.collect(dbKey, new IntWritable(sum));
            
        }
        
    }
    
    public void run(String inputPath, String outputPath) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        DistributedCache.addFileToClassPath(new Path("<Absolute Path>/mysql-connector-java-5.1.7-bin.jar"), conf);

        // the keys are DBOutput
        conf.setOutputKeyClass(DBOutput.class);
        // the values are counts (ints)
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(WordCountMapper.class);
        conf.setReducerClass(WordCountReducer.class);        
        
        conf.setOutputFormat(DBOutputFormat.class);

        FileInputFormat.addInputPath(conf, new Path(inputPath));
        DBOutputFormat.setOutput(conf, "word_count", "word", "count");
        
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/sample", "root", "root");
        
        //FileOutputFormat.setOutputPath(conf, new Path(outputPath));

        JobClient.runJob(conf);
      }
    
    public static void main(String[] args) throws Exception {
        WordCount wordCount = new WordCount();
        wordCount.run(args[0], args[1]);
    }
    
    private static class DBOutput implements DBWritable, WritableComparable<DBOutput> {
        
        private String text;
        
        private int no;

        @Override
        public void readFields(ResultSet rs) throws SQLException {
            text = rs.getString("word");
            no = rs.getInt("count");
        }

        @Override
        public void write(PreparedStatement ps) throws SQLException {
            ps.setString(1, text);
            ps.setInt(2, no);
        }
        
        public void setText(String text) {
            this.text = text;
        }
        
        public String getText() {
            return text;
        }
        
        public void setNo(int no) {
            this.no = no;
        }
        
        public int getNo() {
            return no;
        }

        @Override
        public void readFields(DataInput input) throws IOException {
            text = input.readUTF();    
            no = input.readInt();
        }

        @Override
        public void write(DataOutput output) throws IOException {
            output.writeUTF(text);
            output.writeInt(no);
        }

        @Override
        public int compareTo(DBOutput o) {
            return text.compareTo(o.getText());
        }
        
    }
}
Furthermore, I have written a custom Hadoop type for the key which implements DBWritable and WritableComparable, and I have used it as the output key class. The command to run the job is as follows:
./bin/hadoop jar <Path to Jar>/HadoopTest.jar WordCount <Input Folder> <Dummy Output Folder>

Monday, November 12, 2012

Hadoop Vs Cassandra and HBase

Vanilla Hadoop consists of a Distributed File System (DFS) at the core and libraries that support the Map Reduce model for writing analysis programs. The DFS is what enables Hadoop to be scalable. It takes care of splitting data into chunks across the nodes of a multi-node cluster so that Map Reduce can work on the individual chunks of data available on each node, thus enabling parallelism.
The paper for Google File System which was the basis for Hadoop Distributed File System (HDFS) can be found here
The paper for Map Reduce model can be found here

For a detailed explanation on Map Reduce read this post
Cassandra is a highly scalable, eventually consistent, distributed, structured key-value store. It is not a conventional database but is more like Hashtable or HashMap which stores a key/value pair. Both Cassandra and HBase are implementations of Google's BigTable. Paper for Google BigTable can be found here.
BigTable makes use of a Sorted String Table (SSTable) to store key/value pairs. An SSTable is just a file in HDFS which stores each key followed by its value. Furthermore, BigTable maintains an index which holds each key and the offset in the file for that key, which enables reading the value for that key using only a seek to the offset location. An SSTable is effectively immutable, which means that after the file is created no modifications can be made to existing key/value pairs. New key/value pairs are appended to the file. Updates and deletes of records are also appended to the file: an update as a newer key/value pair, and a deletion as the key with a tombstone value. Duplicate keys are therefore allowed in the SSTable file. The index is also updated whenever an update or delete takes place, so that the offset for that key points to the latest value or tombstone value.

Thus you can see that Cassandra's/HBase's internals allow fast reads and writes, which is crucial for real-time data handling, whereas vanilla Hadoop with Map Reduce is suited to processing batch-oriented, passive data.

Saturday, November 3, 2012

Distance Scanner - Low Cost Walking Aid for the Blind

I got some time to work on Arduino and put together a small project to help blind people navigate around obstacles. There have been many attempts by the Arduino community to accomplish this, but most of them use motors or servo motors to give feedback. Blind people then have to get used to the different inputs they get from these devices, and driving a motor requires a lot of battery power, so users might end up changing batteries frequently.

Furthermore many Blind use sound which comes from tapping the White cane to navigate through obstacles so sound is something that they are already familiar with.

My approach uses an Ultrasonic Ping Sensor and a Buzzer to notify the blind person wearing it with a buzzing sound. The items required are:

  1. Arduino (Any Model)
  2. Ultrasonic Ping Sensor
  3. Buzzer or Piezo Speaker
  4. Jumper Cables

Arduino Sketch Source code is below 

/* 
  Distance Scanner 
  By Shazin Sadakath
*/

// Digital pin that drives the buzzer (configured as OUTPUT in setup()).
#define BUZZER 10

// Sensor pin whose LOW pulse width is measured with pulseIn() in loop().
int URPWM = 3; 
// Sensor trigger pin; loop() toggles it LOW then HIGH before each reading.
int URTRIG=5; 

// Threshold used by buzzBasedOnDistance(): the buzzer is on when Distance is at or below this value.
int minDistanceCm = 50;

// Latest reading, computed in loop() as the measured pulse width divided by 50.
unsigned int Distance=0;
// Not referenced anywhere in this sketch.
// NOTE(review): presumably the sensor's "enable PWM mode" command bytes - confirm against the sensor documentation.
uint8_t EnPwmCmd[4]={0x44,0x02,0xbb,0x01};    
 
// One-time pin configuration for the ping sensor and the buzzer.
void setup() {
  // Buzzer output, switched by buzzBasedOnDistance().
  pinMode(BUZZER, OUTPUT);

  // Sensor pulse output, read with pulseIn() in loop().
  pinMode(URPWM, INPUT);

  // Trigger line: made an output and left HIGH until loop() pulses it.
  pinMode(URTRIG, OUTPUT);
  digitalWrite(URTRIG, HIGH);
}
 
// Takes one reading from the ping sensor, stores it in Distance when it is
// usable, and then updates the buzzer.
void loop()
{
  // Pulse the trigger line LOW and back HIGH to request a measurement.
  digitalWrite(URTRIG, LOW);
  digitalWrite(URTRIG, HIGH);

  // Width of the LOW pulse on the sensor pin, in microseconds.
  unsigned long DistanceMeasured = pulseIn(URPWM, LOW);

  // Skip unusable readings and keep the previous Distance:
  //  - 0 is what pulseIn() returns when no pulse arrived before its timeout;
  //    treating it as "0 cm" would sound the buzzer with nothing in range.
  //  - 50000 is the sensor's invalid-reading marker; longer pulses are
  //    rejected as well.
  if (DistanceMeasured != 0 && DistanceMeasured < 50000) {
    Distance = DistanceMeasured / 50;   // pulse width divided by 50 gives the value compared with minDistanceCm
  }
  delay(20);
  buzzBasedOnDistance();
}

void buzzBasedOnDistance() {
  int far = Distance - minDistanceCm;
  if(far <= 0) {
    digitalWrite(BUZZER, 200); 
  } else {
    digitalWrite(BUZZER, 0);
  }
}

There is a configurable minimum distance threshold; when an object gets closer than that, the buzzer sounds so that the obstacle can be avoided. Check the video below for a small demonstration.




The USB cable is only used for Power. It can be replaced with a battery pack powering the arduino.

Constructive criticism is always welcome!