High performance RSS/Atom parsing

Parsing RSS feeds is very easy in Java. Several libraries exist to get the job done: feed4j, rssowl, Apache Abdera and many others. But the most commonly used is ROME. ROME is a set of RSS and Atom utilities for Java. It makes it easy to work in Java with most syndication formats: RSS 0.9x, 1.0, 2.0 and Atom 0.3, 1.0.

Reading RSS from a source is dead simple. You need these dependencies:

   <!-- Rome Atom+RSS -->
   <!-- ROME core artifact; presumably provides the com.sun.syndication.feed / io classes used in the snippets below - confirm against the artifact contents -->
   <dependency>
    <groupId>net.java.dev.rome</groupId>
    <artifactId>rome</artifactId>
    <version>1.0.0</version>
   </dependency>
   <!-- ROME Fetcher artifact; presumably provides the com.sun.syndication.fetcher classes (FeedFetcher, FeedFetcherCache) - confirm against the artifact contents -->
   <dependency>
    <groupId>net.java.dev.rome</groupId>
    <artifactId>rome-fetcher</artifactId>
    <version>1.0.0</version>
   </dependency>

and a piece of code that looks like this:

        // Open a connection to the feed that should be read.
        URL url = new URL("http://feeds.feedburner.com/manishchhabra27");
        HttpURLConnection httpcon = (HttpURLConnection) url.openConnection();

        // Parse the feed. The reader is closed as soon as parsing is done so
        // the underlying stream is not leaked.
        SyndFeedInput input = new SyndFeedInput();
        SyndFeed feed;
        try (XmlReader reader = new XmlReader(httpcon)) {
            feed = input.build(reader);
        }

        // The entries are iterated as plain Objects and cast explicitly:
        // assigning the element of an untyped collection directly to a
        // SyndEntry does not compile.
        for (Object item : feed.getEntries()) {
            SyndEntry entry = (SyndEntry) item;
            System.out.println("Title: " + entry.getTitle());
            System.out.println("Link: " + entry.getLink());
            System.out.println("Author: " + entry.getAuthor());
            System.out.println("Publish Date: " + entry.getPublishedDate());
            // Guard against entries without a description instead of failing
            // with a NullPointerException; "null" is printed in that case.
            System.out.println("Description: "
                    + (entry.getDescription() != null ? entry.getDescription().getValue() : null));
            System.out.println();
        }

(from: http://manishchhabra.com/2011/10/rome-library-example-for-parsing-rss-and-atom-feeds/)

But in the context of an enterprise-grade application, especially if you want to build a “Google Reader”-like service, you must NOT use ROME out of the box. ROME can be tuned in many ways.

Cache

ROME allows you to cache feed details by implementing the interface com.sun.syndication.fetcher.impl.FeedFetcherCache. By default, three classes implement this interface:
-com.sun.syndication.fetcher.impl.HashMapFeedInfoCache
-com.sun.syndication.fetcher.impl.LinkedHashMapFeedInfoCache
-com.sun.syndication.fetcher.impl.DiskFeedInfoCache

If you already have a cache (Infinispan, memcache, EhCache…), you can reuse it and benefit from its features (distribution, replication, time-to-live…). Instead of creating a specific implementation, let’s create a generic class delegating the caching logic to a Spring cache manager. Then, you will be able to change the cache depending on your needs. Spring cache already ships with a cache manager delegating to EhCache. You will find more implementations by Googling “spring cache ‘the-name-of-the-awesome-cache-framework-you-use'”.

So, choose your poison, and let’s code the ROME FeedFetcherCache delegating to a Spring’s cache manager:

import java.net.URL;
import javax.annotation.PostConstruct;
import org.springframework.cache.Cache;
import org.springframework.cache.Cache.ValueWrapper;
import org.springframework.cache.CacheManager;
import com.google.common.base.Preconditions;
import com.sun.syndication.fetcher.impl.FeedFetcherCache;
import com.sun.syndication.fetcher.impl.SyndFeedInfo;

/**
 * {@link FeedFetcherCache} that stores feed information in a named cache
 * obtained from a Spring {@link CacheManager}.
 *
 * <p>Entries are keyed by the external (string) form of the feed URL rather
 * than by the {@link URL} object itself: {@code URL.equals}/{@code hashCode}
 * may resolve host names, which is a blocking operation and a poor fit for a
 * cache key.
 */
public class SpringFeedFetcherCache implements FeedFetcherCache {

    private final String cacheName;

    private final CacheManager cacheManager;

    /** Resolved by {@link #init()}, or lazily on first access. */
    private volatile Cache cache;

    /**
     * @param cacheName    name of the cache to look up in the cache manager
     * @param cacheManager cache manager providing the backing cache
     */
    public SpringFeedFetcherCache(String cacheName, CacheManager cacheManager) {
        this.cacheName = cacheName;
        this.cacheManager = cacheManager;
    }

    /**
     * Resolves the backing cache from the cache manager.
     *
     * @throws NullPointerException  if the cache manager or the cache name is null
     * @throws IllegalStateException if the cache manager has no cache with the
     *                               configured name
     */
    @PostConstruct
    public void init() {
        Preconditions.checkNotNull(cacheManager, "cacheManager must not be null");
        Preconditions.checkNotNull(cacheName, "cacheName must not be null");
        Cache resolved = cacheManager.getCache(cacheName);
        if (resolved == null) {
            // Fail with an explicit message instead of a bare
            // NullPointerException on the first cache access.
            throw new IllegalStateException(
                    "No cache named '" + cacheName + "' is available from the cache manager");
        }
        cache = resolved;
    }

    /**
     * Returns the cached information for the given feed.
     *
     * @param feedUrl URL of the feed
     * @return the cached feed information, or {@code null} if there is none
     */
    @Override
    public SyndFeedInfo getFeedInfo(URL feedUrl) {
        return get(feedUrl);
    }

    /**
     * Stores the information for the given feed.
     *
     * @param feedUrl      URL of the feed
     * @param syndFeedInfo information to cache
     */
    @Override
    public void setFeedInfo(URL feedUrl, SyndFeedInfo syndFeedInfo) {
        cache().put(key(feedUrl), syndFeedInfo);
    }

    /** Removes every entry from the backing cache. */
    @Override
    public void clear() {
        cache().clear();
    }

    /**
     * Evicts the information cached for the given feed.
     *
     * @param feedUrl URL of the feed
     * @return the information that was cached before eviction, or {@code null}
     */
    @Override
    public SyndFeedInfo remove(URL feedUrl) {
        SyndFeedInfo syndFeedInfo = get(feedUrl);
        cache().evict(key(feedUrl));
        return syndFeedInfo;
    }

    private SyndFeedInfo get(URL feedUrl) {
        ValueWrapper valueWrapper = cache().get(key(feedUrl));
        if (valueWrapper != null) {
            return (SyndFeedInfo) valueWrapper.get();
        }
        return null;
    }

    /**
     * Returns the backing cache, resolving it on first use so the class also
     * works when {@code @PostConstruct} callbacks are not processed.
     */
    private Cache cache() {
        Cache resolved = cache;
        if (resolved == null) {
            init();
            resolved = cache;
        }
        return resolved;
    }

    /** Builds the cache key for a feed URL. */
    private static String key(URL feedUrl) {
        Preconditions.checkNotNull(feedUrl, "feedUrl must not be null");
        return feedUrl.toExternalForm();
    }
}

No trap here…

Http connections

Another thing is that ROME uses URLConnection or the old commons-httpclient to fetch the RSS. In your application you almost certainly already have an instance of an HttpClient (httpcomponents-httpclient) from your social layer, MongoDB Java connector, REST framework or other library. In any case, we can reuse it. So, let’s implement a new AbstractFeedFetcher:

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.zip.GZIPInputStream;

import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;

import com.google.common.base.Preconditions;
import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.fetcher.FetcherEvent;
import com.sun.syndication.fetcher.FetcherException;
import com.sun.syndication.fetcher.impl.AbstractFeedFetcher;
import com.sun.syndication.fetcher.impl.FeedFetcherCache;
import com.sun.syndication.fetcher.impl.SyndFeedInfo;
import com.sun.syndication.io.FeedException;
import com.sun.syndication.io.SyndFeedInput;
import com.sun.syndication.io.XmlReader;

/**
 * Feed fetcher that retrieves feeds through a shared Apache HttpComponents
 * {@link CloseableHttpClient}.
 *
 * <p>When a {@link FeedFetcherCache} is configured, conditional request
 * headers ({@code If-None-Match}, {@code If-Modified-Since}) are sent for
 * feeds already in the cache, and the cache is updated after every
 * successful request.
 */
public class HttpComponentsFeedFetcher extends AbstractFeedFetcher {

    /** HTTP status "226 IM Used", returned for delta-encoded responses. */
    private static final int HTTP_IM_USED = 226;

    private final CloseableHttpClient client;

    // volatile: written through the setter and read without a lock in
    // retrieveFeed, like httpClientMethodCallback below
    private volatile FeedFetcherCache feedInfoCache;

    private volatile HttpClientMethodCallbackIntf httpClientMethodCallback;

    /**
     * @param client HTTP client used to execute the feed requests
     */
    public HttpComponentsFeedFetcher(CloseableHttpClient client) {
        this.client = client;
    }

    /**
     * Hook that lets callers customise the request (extra headers, per-request
     * configuration, ...) right after it has been created.
     */
    public interface HttpClientMethodCallbackIntf {
        public void afterHttpClientMethodCreate(HttpGet request);
    }

    /**
     * Retrieves the feed located at the given URL.
     *
     * @param feedUrl URL of the feed; must not be null
     * @return the parsed feed, or the cached feed when the server answers
     *         "304 Not Modified" for a feed that is in the cache
     * @throws NullPointerException if {@code feedUrl} is null
     * @throws IOException          if the request fails or the response has no body
     * @throws FeedException        if the feed cannot be parsed
     * @throws FetcherException     if the inherited status code handling
     *                              rejects the response
     */
    @Override
    public SyndFeed retrieveFeed(URL feedUrl) throws IllegalArgumentException,
            IOException, FeedException, FetcherException {
        Preconditions.checkNotNull(feedUrl, "null is not a valid URL");

        String urlStr = feedUrl.toString();

        HttpGet request = new HttpGet(urlStr);
        request.addHeader("Accept-Encoding", "gzip");
        request.addHeader("User-Agent", getUserAgent());

        // Read the volatile field once so the callback cannot be swapped to
        // null between the check and the call.
        HttpClientMethodCallbackIntf callback = httpClientMethodCallback;
        if (callback != null) {
            callback.afterHttpClientMethodCreate(request);
        }

        FeedFetcherCache cache = feedInfoCache;
        SyndFeedInfo cachedInfo = null;
        if (cache != null) {
            if (isUsingDeltaEncoding()) {
                request.addHeader("A-IM", "feed");
            }

            // cachedInfo stays null if the feed is not in the cache
            cachedInfo = cache.getFeedInfo(feedUrl);
            if (cachedInfo != null) {
                addConditionalHeaders(request, cachedInfo);
            }
        }

        try (CloseableHttpResponse response = client.execute(request)) {
            int statusCode = response.getStatusLine().getStatusCode();
            fireEvent(FetcherEvent.EVENT_TYPE_FEED_POLLED, urlStr);
            handleErrorCodes(statusCode);

            SyndFeed feed = getFeed(cachedInfo, urlStr, response, statusCode);
            if (cache == null) {
                // cache is not in use
                return feed;
            }

            SyndFeedInfo updatedInfo = buildSyndFeedInfo(feedUrl, urlStr,
                    response, feed, statusCode, cache);
            cache.setFeedInfo(feedUrl, updatedInfo);

            // the feed may have been modified to pick up cached values
            // (eg - for delta encoding)
            return updatedInfo.getSyndFeed();
        } finally {
            request.releaseConnection();
        }
    }

    /**
     * Adds the conditional request headers derived from the cached feed
     * information. A header is only added when its value is available, so no
     * empty {@code If-None-Match} header is sent for feeds without an ETag.
     */
    private static void addConditionalHeaders(HttpGet request,
            SyndFeedInfo cachedInfo) {
        String eTag = cachedInfo.getETag();
        if (eTag != null) {
            request.addHeader("If-None-Match", eTag);
        }

        Object lastModified = cachedInfo.getLastModified();
        if (lastModified instanceof String) {
            request.addHeader("If-Modified-Since", (String) lastModified);
        }
    }

    /**
     * Returns the cached feed on "304 Not Modified", otherwise parses the
     * response body.
     */
    private SyndFeed getFeed(SyndFeedInfo syndFeedInfo, String urlStr,
            CloseableHttpResponse response, int statusCode) throws IOException,
            FetcherException, FeedException {

        if (statusCode == HttpURLConnection.HTTP_NOT_MODIFIED
                && syndFeedInfo != null) {
            fireEvent(FetcherEvent.EVENT_TYPE_FEED_UNCHANGED, urlStr);
            return syndFeedInfo.getSyndFeed();
        }

        SyndFeed feed = retrieveFeed(urlStr, response);
        fireEvent(FetcherEvent.EVENT_TYPE_FEED_RETRIEVED, urlStr, feed);
        return feed;
    }

    /**
     * Parses the response body into a feed, decompressing it first when the
     * response declares a gzip content encoding.
     */
    private SyndFeed retrieveFeed(String urlStr, CloseableHttpResponse response)
            throws IOException, FetcherException, FeedException {

        if (response.getEntity() == null) {
            // e.g. an unexpected 304/204: there is nothing to parse
            throw new IOException("No content returned for feed " + urlStr
                    + " (HTTP status "
                    + response.getStatusLine().getStatusCode() + ")");
        }

        Header contentEncoding = response.getFirstHeader("Content-Encoding");
        boolean gzipped = contentEncoding != null
                && "gzip".equalsIgnoreCase(contentEncoding.getValue());

        // Both streams are closed even if wrapping or parsing fails.
        try (InputStream raw = response.getEntity().getContent();
                InputStream stream = gzipped ? new GZIPInputStream(raw) : raw) {
            Header contentType = response.getFirstHeader("Content-Type");
            XmlReader reader;
            if (contentType != null) {
                reader = new XmlReader(stream, contentType.getValue(), true);
            } else {
                reader = new XmlReader(stream, true);
            }

            SyndFeedInput syndFeedInput = new SyndFeedInput();
            syndFeedInput.setPreserveWireFeed(isPreserveWireFeed());

            return syndFeedInput.build(reader);
        }
    }

    /**
     * Builds the cache entry for a retrieved feed from the response headers.
     * For a delta-encoded response (status 226 with an {@code IM: feed}
     * header) the new items are combined with the cached feed.
     */
    private SyndFeedInfo buildSyndFeedInfo(URL feedUrl, String urlStr,
            CloseableHttpResponse response, SyndFeed feed, int statusCode,
            FeedFetcherCache cache) throws MalformedURLException {
        SyndFeedInfo syndFeedInfo = new SyndFeedInfo();

        // this may be different to feedURL because of 3XX redirects
        syndFeedInfo.setUrl(new URL(urlStr));
        syndFeedInfo.setId(feedUrl.toString());

        Header imHeader = response.getFirstHeader("IM");
        if (imHeader != null && imHeader.getValue().indexOf("feed") >= 0
                && isUsingDeltaEncoding()) {
            if (cache != null && statusCode == HTTP_IM_USED) {
                // client is setup to use http delta encoding and the server
                // supports it and has returned a delta encoded response
                // This response only includes new items
                SyndFeedInfo cachedInfo = cache.getFeedInfo(feedUrl);
                if (cachedInfo != null) {
                    SyndFeed cachedFeed = cachedInfo.getSyndFeed();

                    // set the new feed to be the original feed plus the new
                    // items
                    feed = combineFeeds(cachedFeed, feed);
                }
            }
        }

        Header lastModifiedHeader = response.getFirstHeader("Last-Modified");
        if (lastModifiedHeader != null) {
            syndFeedInfo.setLastModified(lastModifiedHeader.getValue());
        }

        Header eTagHeader = response.getFirstHeader("ETag");
        if (eTagHeader != null) {
            syndFeedInfo.setETag(eTagHeader.getValue());
        }

        syndFeedInfo.setSyndFeed(feed);

        return syndFeedInfo;
    }

    /**
     * @param feedInfoCache cache used for conditional requests, or
     *                      {@code null} to disable caching
     */
    public synchronized void setFeedInfoCache(FeedFetcherCache feedInfoCache) {
        this.feedInfoCache = feedInfoCache;
    }

    /**
     * @param httpClientMethodCallback callback invoked for every request
     *                                 created, or {@code null} for none
     */
    public synchronized void setHttpClientMethodCallback(
            HttpClientMethodCallbackIntf httpClientMethodCallback) {
        this.httpClientMethodCallback = httpClientMethodCallback;
    }
}

Putting the things together

The configuration for the httpclient:

/**
 * Spring configuration of the pooled HTTP client shared by the application.
 */
@Configuration
public class HttpConfig {

    private static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 100;
    private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
    private static final int DEFAULT_CONNECT_TIMEOUT_MILLISECONDS = (60 * 1000);
    private static final int DEFAULT_READ_TIMEOUT_MILLISECONDS = (60 * 1000);

    /**
     * Builds the shared HTTP client on top of a pooling connection manager.
     *
     * @return a client with bounded connect and read timeouts
     */
    @Bean
    public CloseableHttpClient httpClient() {
        PoolingHttpClientConnectionManager connectionManager =
                new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL_CONNECTIONS);
        connectionManager
                .setDefaultMaxPerRoute(DEFAULT_MAX_CONNECTIONS_PER_ROUTE);
        // NOTE(review): this route presumably only matches the exact host
        // "blogspot.com", not its sub-domains - confirm this is intended.
        connectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(
                "blogspot.com")), 20);

        // The read timeout must be set as the socket timeout; setting only
        // the connect timeout lets a stalled server hold a pooled connection
        // with no upper bound on the wait for data.
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLISECONDS)
                .setSocketTimeout(DEFAULT_READ_TIMEOUT_MILLISECONDS)
                .build();

        CloseableHttpClient defaultHttpClient = HttpClientBuilder.create()
                .setConnectionManager(connectionManager)
                .setDefaultRequestConfig(config).build();
        return defaultHttpClient;
    }
}

for the RSS feed fetcher:

import javax.inject.Inject;
import org.apache.http.impl.client.CloseableHttpClient;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.hangar2.syndic.HttpComponentsFeedFetcher;
import com.streaming.syndic.SpringFeedFetcherCache;
import com.sun.syndication.fetcher.FeedFetcher;
import com.sun.syndication.fetcher.impl.FeedFetcherCache;

/**
 * Spring configuration wiring the feed fetcher to the shared HTTP client and
 * to the Spring-backed feed cache.
 */
@Configuration
public class SyndicConfig {

    @Inject
    private CacheManager cacheManager;

    @Inject
    private CloseableHttpClient httpClient;

    /**
     * Feed fetcher using the injected HTTP client, the {@code feedInfoCache}
     * bean and the "Bot" user agent.
     */
    @Bean
    public FeedFetcher feedFetcher() {
        HttpComponentsFeedFetcher fetcher = new HttpComponentsFeedFetcher(httpClient);
        fetcher.setUserAgent("Bot");
        fetcher.setFeedInfoCache(feedInfoCache());
        return fetcher;
    }

    /** Feed cache stored in the cache named "rss" of the injected cache manager. */
    @Bean
    public FeedFetcherCache feedInfoCache() {
        FeedFetcherCache rssCache = new SpringFeedFetcherCache("rss", cacheManager);
        return rssCache;
    }
}

and for the cache:

import javax.inject.Inject;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurer;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
import org.springframework.cache.ehcache.EhCacheCacheManager;
import org.springframework.cache.ehcache.EhCacheManagerFactoryBean;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.AdviceMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.ResourceLoader;

/**
 * Caching configuration: an EhCache-backed cache manager for every profile
 * except "test", and an in-memory concurrent-map cache manager for the test
 * profile.
 */
@Configuration
@EnableCaching(mode = AdviceMode.ASPECTJ)
public class CacheConfig {

    /** Configuration active whenever the "test" profile is not. */
    @Configuration
    @Profile("!test")
    static class Default implements CachingConfigurer {

        @Inject
        private ResourceLoader resourceLoader;

        @Bean
        public KeyGenerator keyGenerator() {
            return new ReflectionBasedKeyGenerator();
        }

        /**
         * Spring cache manager delegating to the EhCache manager produced by
         * {@link #ehcacheCacheManager()}.
         */
        @Bean
        public CacheManager cacheManager() {
            EhCacheCacheManager springManager = new EhCacheCacheManager();
            try {
                EhCacheManagerFactoryBean factory = ehcacheCacheManager();
                springManager.setCacheManager(factory.getObject());
            } catch (Exception e) {
                throw new IllegalStateException(
                        "Failed to create an EhCacheManagerFactoryBean", e);
            }
            return springManager;
        }

        /** Shared EhCache manager factory configured from {@code classpath:ehcache.xml}. */
        @Bean
        public EhCacheManagerFactoryBean ehcacheCacheManager() throws Exception {
            EhCacheManagerFactoryBean factory = new EhCacheManagerFactoryBean();
            factory.setConfigLocation(resourceLoader
                    .getResource("classpath:ehcache.xml"));
            factory.setShared(true);
            return factory;
        }
    }

    /** Configuration active for the test profile. */
    @Configuration
    @Profile(Profiles.TEST)
    static class Test implements CachingConfigurer {

        @Bean
        public KeyGenerator keyGenerator() {
            return new ReflectionBasedKeyGenerator();
        }

        @Bean
        public CacheManager cacheManager() {
            return new ConcurrentMapCacheManager();
        }
    }

}

Related Posts

Leave a comment

About privacy:

This site uses Akismet to reduce spam. Learn how your comment data is processed.