Sunday, January 09, 2011

Feed Aggregation using ROME Fetcher and Nutch 2.0 - Part I

With libraries such as ROME, it is possible for an application to parse RSS and Atom feeds without having to worry about the actual feed format itself. The ROME Fetcher subproject provides a FeedFetcher class that can convert a remote URL into a ROME SyndFeed object. David Herron has built a (the only so far?) ROME based Feed Aggregator using Groovy, inspired by the PlanetPlanet feed reader.

My goals are much less ambitious, all I need is a way to aggregate news from various newspaper feeds, and make that aggregate searchable. My aggregator is written in Java using ROME and ROME Fetcher. It reads the list of sites from an OPML file (an idea I borrowed from David Herron's aggregator), downloads and parses each page, and writes the feed entry information (the link URL and its metadata such as title and description) into a flat file. This flat file is then injected into HBase using Nutch 2.0, and we go through a single cycle of generate, fetch and parse. At this point, we are ready to index - unfortunately, Nutch 2.0 only supports indexing to Solr, so I need a running Solr installation, which I don't have as yet.

The list of sites comes from the NewsRack project - they are doing interesting things with a combined concept/rule based approach to classification. They were kind enough to post a list of RSS feeds from various Indian newspapers that they use as part of their work. Here is what my sites.xml file looks like:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
<?xml version="1.0"?>
<opml version="1.1">
  <head>
    <title>Feed Aggregator Test</title>
    <ownerName>Sujit Pal</ownerName>
    <description>India Newspapers Feed (Source:newsrack.in)</description>
  </head>
  </meta>
  <body>
    <outline text="Business Today" 
      siteurl="http://businesstoday.digitaltoday.in" 
      xmlUrl="http://businesstoday.digitaltoday.in/rssforallsec.php?issueid=14"/>
    <outline text="Business World" 
      siteurl="http://www.businessworld.in" 
      xmlUrl="http://www.businessworld.in/component/option,com_rss/feed,RSS2.0/no_html,1/"/>
    <!-- more feed sites ... -->
  </body>
</opml>

The code for the aggregator, as expected, is quite simple. Parse out the configuration file above, and hit each of the URLs in it, then parse the returned page to retrieve the set of (link, title, description, pubDate) tuples from it, and write them out to a flat file in the structured format described below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// Source: src/main/java/com/mycompany/myagg/fetcher/FeedAggregator.java
package com.mycompany.myagg.fetcher;

import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.commons.httpclient.ConnectTimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.input.SAXBuilder;

import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.fetcher.FetcherException;
import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
import com.sun.syndication.fetcher.impl.HttpClientFeedFetcher;
import com.sun.syndication.io.FeedException;

/**
 * Standalone Feed crawler that can be launched as a standalone command
 * (usually from cron). Configured using an OPML file that lists the feed
 * URLs to be crawled. Links on each feed along with their metadata are
 * written to a structured flat file to inject into Nutch.
 */
public class FeedAggregator {

  private final Log log = LogFactory.getLog(getClass());
  
  private static final int DEFAULT_CONNECT_TIMEOUT = 5000;
  private static final int DEFAULT_READ_TIMEOUT = 5000;
  private static final SimpleDateFormat ISO_DATE_FORMATTER =
    new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
  
  private String configFile;
  private String seedFile;
  private int connectTimeout;
  private int readTimeout;
  
  public void setConfigFile(String configFile) {
    this.configFile = configFile;
  }
  
  public void setSeedFile(String seedFile) {
    this.seedFile = seedFile;
  }
  
  public void setConnectTimeout(int connectTimeout) {
    this.connectTimeout = connectTimeout;
  }

  public void setReadTimeout(int readTimeout) {
    this.readTimeout = readTimeout;
  }

  @SuppressWarnings("unchecked")
  public void aggregate() throws Exception {
    AggregatorConfig config = parseConfig();
    HttpClientFeedFetcher feedFetcher = buildFeedFetcher(config);
    PrintWriter seedWriter = new PrintWriter(new FileWriter(seedFile), true);
    for (SiteInfo site : config.sites) {
      try {
        SyndFeed feed = feedFetcher.retrieveFeed(new URL(site.feedUrl));
        if (feed == null) {
          continue;
        }
        List<SyndEntry> entries = feed.getEntries();
        for (SyndEntry entry : entries) {
          String title = entry.getTitle();
          String link = entry.getLink();
          String description = entry.getDescription() != null ?
            entry.getDescription().getValue() : "";
          Date pubDate = entry.getPublishedDate();
          store(seedWriter, title, link, description, pubDate);
        }
      } catch (FetcherException e) {
        log.info("Fetcher Exception for " + site.publisher, e);
        continue;
      } catch (FeedException e) {
        log.info("Feed Exception for " + site.publisher, e);
        continue;
      } catch (ConnectTimeoutException e) {
        log.info("Connect Timeout Exception for " + site.publisher, e);
        continue;
      }
    }
    seedWriter.flush();
    seedWriter.close();
  }
  
  private HttpClientFeedFetcher buildFeedFetcher(AggregatorConfig config) {
    HttpClientFeedFetcher feedFetcher = new HttpClientFeedFetcher();
    feedFetcher.setUserAgent(config.title + "(" + config.owner + ")");
    feedFetcher.setConnectTimeout(connectTimeout == 0 ? 
      DEFAULT_CONNECT_TIMEOUT : connectTimeout);
    feedFetcher.setReadTimeout(readTimeout == 0 ? 
      DEFAULT_READ_TIMEOUT : readTimeout);
    feedFetcher.setFeedInfoCache(HashMapFeedInfoCache.getInstance());
    return feedFetcher;
  }

  private void store(PrintWriter seedWriter, String title, String link, 
      String description, Date pubDate) throws Exception {
    if (pubDate == null) {
      pubDate = new Date();
    }
    seedWriter.println(StringUtils.join(new String[] {
      link,
      StringUtils.join(new String[] {"u_idx", "news"}, "="),
      StringUtils.join(new String[] {
        "u_title",
        clean(title)
      }, "="),
      StringUtils.join(new String[] {
        "u_summary",
        clean(description)
      }, "="),
      StringUtils.join(new String[] {
        "u_pubdate", 
        ISO_DATE_FORMATTER.format(pubDate)
      }, "=")
    }, "\t"));
  }
  
  private String clean(String str) {
    if (StringUtils.isEmpty(str)) {
      return "";
    }
    return str.replaceAll("\\s+", " ");
  }

  @SuppressWarnings("unchecked")
  protected AggregatorConfig parseConfig() throws Exception {
    AggregatorConfig config = new AggregatorConfig();
    SAXBuilder builder = new SAXBuilder();
    Document doc = builder.build(new File(configFile));
    Element opml = doc.getRootElement();
    config.title = opml.getChild("head").getChildTextTrim("title");
    config.owner = opml.getChild("head").getChildTextTrim("ownerName");
    config.description = opml.getChild("head").getChildTextTrim("description");
    List<Element> outlines = opml.getChild("body").getChildren("outline");
    for (Element outline : outlines) {
      SiteInfo siteInfo = new SiteInfo();
      siteInfo.publisher = outline.getAttributeValue("text");
      siteInfo.siteUrl = outline.getAttributeValue("siteurl");
      siteInfo.feedUrl = outline.getAttributeValue("xmlUrl");
      config.sites.add(siteInfo);
    }
    return config;
  }
  
  protected class AggregatorConfig {
    public String title;
    public String owner;
    public String description;
    public List<SiteInfo> sites = new ArrayList<SiteInfo>();
  }
  
  protected class SiteInfo {
    public String publisher;
    public String siteUrl;
    public String feedUrl;
  }
  
  public static void main(String[] args) {
    if (args.length != 1) {
      System.out.println(
        "Usage: FeedAggregator config_file.xml connect_timeout " + 
        "read_timeout seed_file.txt");
      System.exit(-1);
    }
    try {
      FeedAggregator aggregator = new FeedAggregator();
      aggregator.setConfigFile(args[0]);
      aggregator.setConnectTimeout(Integer.valueOf(args[1]));
      aggregator.setReadTimeout(Integer.valueOf(args[2]));
      aggregator.setSeedFile(args[3]);
      aggregator.aggregate();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

Running this with the appropriate parameters produces a flat file, a line of which is shown below. This is a tab-separated list of name-value pairs which has been broken into multiple lines below for readability. The first part is the familiar seed URL. The rest of the fields are seed URL metadata name-value pairs. Here we choose to add the index name (assuming we will be storing multiple content types in our HBase table), the title, summary and publication date, which should be used to populate the corresponding index column instead of trying to parse these values from the HTML.

1
2
3
4
5
6
7
8
9
http://www.indiatogether.org/2011/jan/env-brahm.htm\
 u_idx=news\
 u_title=Mine-ing the Brahmaputra's waters\
 u_summary=India and China make competing plans for the river's \
precious waters, ignoring the functions it already performs - in sustaining \
rich ecosystems, flora and fauna, cultures and a wide range of livelihoods. \
Shripad Dharmadhikary reports.\
 u_pubdate=2011-01-06T16:42:56.623
...

Note that this is new-ish functionality - the Nutch Injector supports adding custom metadata in this way since version 1.1, so this feature is usable with a file based crawling system also. In our HBase backed Nutch, the metadata live inside the "mtdt" column family after injection.

We then go through one cycle of generate, fetch and index, since we want to crawl only the pages referenced in the feed XMLs (ie depth 0). The end product of this exercise is, of course, an index with the metadata in it. Nutch 2.0 has made the decision to go with Solr as its search front end, so the only implementation of an indexer job is the SolrIndexerJob, which needs to be pointed at a running Solr installation. I hope to set up Solr and try this part out (and write about it) next week.

2 comments (moderated to prevent spam):

Anonymous said...

Dumb question... how do you avoid writing the same items each time you run the program?

Sujit Pal said...

Since its going to Nutch, we depend on Nutch's mechanism to update in place records that are already present in its db backend.