Advertisement

Sri Lanka's First and Only Platform for Luxury Houses and Apartment for Sale, Rent

Wednesday, March 4, 2015

Observable in Reactive Programming, Hybrid of Publisher/Subscriber Pattern and Iterator Pattern

As Programmers we are advised to use Design Patterns as much as possible. But making use of Design Patterns doesn't guarantee that our code becomes Flexible, Maintainable and Bug Free. But it is certainly a step towards achieving those goals.

When I was in University I was told by lecturers to read the book Design Patterns: Elements of Reusable Object-Oriented Software by authors Erich Gamma, Richard Helm, Ralph Johnson and John Vlissides famously known as the Gang of Four. The authors went a long way to find the Design Patterns that are used by Industry professionals and managed to document all those in detail in a nice and understandable book.

In that book under Behavioral Patterns there were two patterns which could be taught as most widely used patterns which are Publisher/Subscriber pattern and Iterator pattern.

In brief Publisher/Subscriber pattern enables to decouple data production and consumption clearly by having Producers notify Subscribed consumers as and when data is available. And Iterator pattern enables to loop through a collection transparently without knowing underlying implementation. 

At first the two patterns looks totally different but as Jafar Husain points out in the presentation video in the references section, any problem that can be solved by Publisher/Subscriber pattern can also be solved by Iterator pattern and vice-versa. 

Yet both were distinguishable from each other because with Publisher/Subscriber pattern there is no way of finding whether the Producer is finished producing data and with Iterator pattern one could do it. 

Reactive Programming is a topic which is catching up in the industry and fueled by Netflix is actually an hybrid of both Publisher/Subscriber pattern and Iterator pattern. This model enables separation of Publisher/Subscriber logic along with Iterator pattern's feature of being notified of completion and exceptions. RxJava provides this model programming to be performed in Java language and it enables Declarative style of programming while producing concise code. Below is an example which shows both Declarative/Functional (RxJava based implementation) and Imperative (Core Java based implementation) doing the same task. 

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

/**
 * RxJava Observable vs Imperative Java Code
 * 
 * @author shazin
 *
 */
public class Main {

    public static void main(String[] args) {
        List<String> urls = Arrays.asList("http://www.google.com", "http://www.lkbotics.com", "http://www.yahoo.com");
        
        System.out.println("============================= Declarative/Functional ==========================================");
        
        Observable.from(urls)
                //.subscribeOn(Schedulers.io()) // Uncommenting this line will enable concurrent execution
                .flatMap(url -> Observable.just(getHtml(url)))
                .filter(html -> html != null && html.toString().length() > 0)
                .doOnNext(html -> saveHtml(html))
                .map(html -> getTitle(html))
                .doOnError(t -> System.out.println("Error : "+t))
                .doOnCompleted(() -> {System.out.println("Completed!");})
                .subscribe(title -> System.out.println(title));
        
        System.out.println("=================================== Imperative ================================================");
        
        for(String url:urls) {
            try {
                String html = getHtml(url);
                if(html != null && html.length() > 0) { 
                    saveHtml(html);
                    System.out.println(getTitle(html));
                }
            } catch(Exception e) {
                System.out.println("Error : "+e);
                break;
            }
        }
        System.out.println("Completed!");
    }

    public static String getHtml(String url) {
        System.out.printf("Retrieving Html for url : %s\n", url);
        StringBuilder out = new StringBuilder();
        HttpURLConnection httpURLConnection = null;
        try {
            httpURLConnection = (HttpURLConnection) new URL(
                    url).openConnection();
            try (BufferedReader br = new BufferedReader(new InputStreamReader(
                    httpURLConnection.getInputStream()))) {

                String line = null;
                while ((line = br.readLine()) != null) {
                    out.append(line).append("\n");
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if(httpURLConnection != null) {
                httpURLConnection.disconnect();
            }
        }

        return out.toString();
    }
    
    public static String getTitle(String html) {
        String content = html.toLowerCase();
        return html.substring(content.indexOf("<title>") + 7, content.indexOf("</title>"));
    }
    
    public static void saveHtml(String html) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            byte[] data = digest.digest(html.getBytes("UTF-8"));
            
            System.out.printf("Saving html for %s : %s\n", getTitle(html), hex(data));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
    public static String hex(byte[] data) {
        StringBuilder hex = new StringBuilder();
        for(byte d:data) {
            hex.append(String.format("%x", d & 0xFF));
        }
        return hex.toString();
    }

}

The real power and flexibility of Reactive Programming against Imperative Programming is its ability scale and produce low response time. Just for example to run the tasks concurrently, un-commenting the //.subscribeOn(Schedulers.io()) line would suffice whereas running the imperative tasks concurrently would require a lot more  careful coding.

There are plenty of more benefits of Reactive Programming and this post doesn't cover all of them, Please go through the References section to learn more.

References