The Apache Hadoop WordCount example is the "Hello World" of Hadoop, which makes it a convenient starting point for demonstrating how to sink Hadoop job output into a database. The database I used is MySQL, and the DDL for the target table is as follows:
CREATE TABLE word_count(word VARCHAR(254), count INT);
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;
public class WordCount {

    public static class WordCountMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, DBOutput, IntWritable> {

        // Reused across map() calls to avoid per-record allocation (the standard
        // Hadoop object-reuse pattern). Instance fields rather than static so
        // concurrently running mappers (e.g. MultithreadedMapRunner) never share
        // mutable state.
        private final IntWritable one = new IntWritable(1);
        private final DBOutput word = new DBOutput();

        /**
         * Emits (word, 1) for every whitespace-separated token in the input line.
         *
         * @param key       byte offset of the line (unused)
         * @param value     one line of input text
         * @param collector sink for (DBOutput, count) pairs
         */
        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<DBOutput, IntWritable> collector, Reporter reporter)
                throws IOException {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.setText(tokens.nextToken());
                collector.collect(word, one);
            }
        }
    }

    public static class WordCountReducer extends MapReduceBase
            implements Reducer<DBOutput, IntWritable, DBOutput, IntWritable> {

        /**
         * Sums the counts for a single word and emits a fully populated
         * DBOutput (word + total) so DBOutputFormat writes both columns.
         */
        @Override
        public void reduce(DBOutput key, Iterator<IntWritable> values,
                OutputCollector<DBOutput, IntWritable> collector, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            DBOutput result = new DBOutput();
            result.setText(key.getText());
            result.setNo(sum);
            collector.collect(result, new IntWritable(sum));
        }
    }

    /**
     * Configures and submits the job.
     *
     * @param inputPath  HDFS directory containing the text input
     * @param outputPath unused — output goes to the word_count table via
     *                   DBOutputFormat; kept for interface compatibility
     */
    public void run(String inputPath, String outputPath) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        // Ship the JDBC driver jar to the task nodes' classpath.
        DistributedCache.addFileToClassPath(
                new Path("<Absolute Path>/mysql-connector-java-5.1.7-bin.jar"), conf);
        // The keys are DBOutput rows; the values are counts (ints).
        conf.setOutputKeyClass(DBOutput.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(WordCountMapper.class);
        conf.setReducerClass(WordCountReducer.class);
        conf.setOutputFormat(DBOutputFormat.class);
        FileInputFormat.addInputPath(conf, new Path(inputPath));
        // Each emitted key becomes one row of word_count(word, count).
        DBOutputFormat.setOutput(conf, "word_count", "word", "count");
        // NOTE(review): credentials are hard-coded for the demo; externalize
        // them (e.g. job configuration or a credential store) for real use.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/sample", "root", "root");
        JobClient.runJob(conf);
    }

    public static void main(String[] args) throws Exception {
        // Fail with a usage message rather than ArrayIndexOutOfBoundsException
        // when arguments are missing.
        if (args.length < 2) {
            System.err.println("Usage: WordCount <input path> <dummy output path>");
            System.exit(2);
        }
        new WordCount().run(args[0], args[1]);
    }

    /**
     * Key type written to the database: one (word, count) row.
     *
     * Must be a public static class with a public no-arg constructor because
     * Hadoop instantiates key classes reflectively; a private nested class
     * fails at runtime (the "DBOutput cannot be cast to DBWritable" symptom).
     */
    public static class DBOutput implements DBWritable, WritableComparable<DBOutput> {

        private String text; // the word (word_count.word)
        private int no;      // its count (word_count.count)

        /** Required by Hadoop's reflective instantiation. */
        public DBOutput() {
        }

        @Override
        public void readFields(ResultSet rs) throws SQLException {
            text = rs.getString("word");
            no = rs.getInt("count");
        }

        @Override
        public void write(PreparedStatement ps) throws SQLException {
            ps.setString(1, text);
            ps.setInt(2, no);
        }

        public void setText(String text) {
            this.text = text;
        }

        public String getText() {
            return text;
        }

        public void setNo(int no) {
            this.no = no;
        }

        public int getNo() {
            return no;
        }

        @Override
        public void readFields(DataInput input) throws IOException {
            text = input.readUTF();
            no = input.readInt();
        }

        @Override
        public void write(DataOutput output) throws IOException {
            output.writeUTF(text);
            output.writeInt(no);
        }

        /** Orders keys by word only, matching equals/hashCode below. */
        @Override
        public int compareTo(DBOutput o) {
            return text.compareTo(o.getText());
        }

        // equals/hashCode must agree with compareTo: HashPartitioner routes
        // keys by hashCode(), so without these overrides equal words could be
        // sent to different reducers and produce duplicate rows.
        @Override
        public boolean equals(Object obj) {
            return obj instanceof DBOutput && compareTo((DBOutput) obj) == 0;
        }

        @Override
        public int hashCode() {
            return text == null ? 0 : text.hashCode();
        }
    }
}
Furthermore, I have written a custom Hadoop key type that implements both DBWritable and WritableComparable, and used it as the output key class. The command to run the job is as follows:
./bin/hadoop jar <Path to Jar>/HadoopTest.jar WordCount <Input Folder> <Dummy Output Folder>
7 comments:
Hi,
I just followed your blog and was able to put the data into the database as was supposed to do by the job but i now want to read the data and currently I am facing a problem with it. It would be of great help if you could post a job to retrieve the same data that you put in the DB.
Hi Lotus,
You can either use sqoop to retrieve data from your database as flat files and use them as input in your map reduce or you can write your custom DBInput.
Thanks man. Saved the day !!!
Hey Can u write a similar tutorial to take data from databse ... would be very helpful!!
I compiled this example and after map phase is 50% completed, I get an error which says that "DBOutput cannot be cast to DBWritable".
Please help
Great job and thanks for sharing the informative data and if wanna solve any issues refer at
lawyers in hyderabad
Great job and thanks for sharing the informative data and if wanna solve any issues refer at
family lawyers in Hyderabad
Post a Comment