Saturday, October 30, 2010

Modeling relational data with Cassandra

We've been using Cassandra in our crawling/indexing system for a while now, but so far almost all I knew about it was that it's there. I've been meaning to take a look at it for a while, but never had a need to, since the schema was already built and wrapper classes provided for accessing the data. Even now, as a consumer and producer of crawl/index data which ultimately ends up in Cassandra, I still don't have a need to check it out in depth. But I am the curious type, and figured that understanding it may help me design other systems where Cassandra's asynchronous writes, high availability and incremental scalability would come in useful.

Since I learn best by doing, I decided to take a simple example of a subsystem where we store crawl data in a pair of RDBMS tables, and convert it to a Cassandra data model. I also wanted to write some code to migrate the data off the RDBMS tables into Cassandra, and provide some rudimentary functionality to retrieve a record by primary key - basically explore a bit of Cassandra's Java client API.

The RDBMS Model

The RDBMS model consists of two tables set up as master-detail. The master table contains URL-level information, and the detail table contains multiple rows per URL, each row corresponding to a (nodeId, score) pair. The nodeIds represent concepts from our taxonomy that match the content, and the score corresponds roughly to the number of times the concept was found in the URL's content. The schemas for the two tables are shown below.

Urls: {
  key: url_id,
  url,
  status,
  created,
  modified
};
Url_Map: {
  key: {
    url_id, 
    node_id
  },
  score,
  created
};

Cassandra Model

Cassandra's building blocks are analogous to RDBMS building blocks, but slightly different - the obvious difference is that RDBMS is row-based and Cassandra is column-based. There are many very good references available on the Internet, including this one which I found very helpful, so I will just briefly describe my understanding here. If you know nothing about Cassandra or column oriented databases, you should read up.

Just as an RDBMS server can host multiple databases, a Cassandra server can host multiple Keyspaces. Just as each RDBMS database can store multiple tables (and nested tables in Oracle), a Cassandra Keyspace can store Column Families and Super Column Families. Each Column Family contains a set of key-value pairs, where the key is like a primary key and the value is a set of Columns. A Column is itself a key-value pair (a triple, actually, if you count the timestamp): the key corresponds to the column name in an RDBMS and the value to the column value. A Super Column Family contains SuperColumns, each of which is also a key-value pair, but here the value is a set of Columns. Because each Column carries its own name, the number of Columns can differ from one Column set (or row, in RDBMS terms) to the next.
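
One way to make this nesting concrete is to picture a Super Column Family as plain nested Java maps. The sketch below is only a mental model under that assumption, not Cassandra's API or its storage format; the names DataModelSketch, Col, put and get are made up for illustration. It also shows the usual role of the timestamp, where a write with an older timestamp does not replace a newer value (ties are simply overwritten here).

```java
import java.util.Map;
import java.util.TreeMap;

// Mental model only: a Super Column Family pictured as nested sorted maps.
// None of these names come from the Cassandra API.
final class DataModelSketch {

  // A Column is a (name, value, timestamp) triple; the name is the map key.
  static final class Col {
    final String value;
    final long timestamp;

    Col(String value, long timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  // row key -> super column name -> column name -> Column
  private final Map<String, Map<String, Map<String, Col>>> rows = new TreeMap<>();

  // Writes a column; a write older than the stored column is ignored.
  void put(String rowKey, String superColumn, String name, String value, long timestamp) {
    Map<String, Col> cols = rows
        .computeIfAbsent(rowKey, k -> new TreeMap<>())
        .computeIfAbsent(superColumn, k -> new TreeMap<>());
    Col existing = cols.get(name);
    if (existing == null || timestamp >= existing.timestamp) {
      cols.put(name, new Col(value, timestamp));
    }
  }

  // Returns the column value, or null if the row, super column or column is absent.
  String get(String rowKey, String superColumn, String name) {
    Map<String, Map<String, Col>> row = rows.get(rowKey);
    if (row == null) {
      return null;
    }
    Map<String, Col> cols = row.get(superColumn);
    if (cols == null) {
      return null;
    }
    Col col = cols.get(name);
    return col == null ? null : col.value;
  }

  public static void main(String[] args) {
    DataModelSketch urls = new DataModelSketch();
    urls.put("http://example.com/a", "info", "status", "crawled", 2L);
    urls.put("http://example.com/a", "info", "status", "pending", 1L); // older write, ignored
    urls.put("http://example.com/a", "scores", "123456", "123.4", 2L);
    System.out.println(urls.get("http://example.com/a", "info", "status"));
    System.out.println(urls.get("http://example.com/a", "scores", "123456"));
  }
}
```

Note that different rows, and different super columns within a row, are free to hold different sets of column names, which is the flexibility described above.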

One thing to note is that a Column Family can only contain Columns, and a Super Column Family can only contain Super Columns. Also, Cassandra schema design is driven by the queries it will support, so denormalization is encouraged if it makes sense from a query point of view. With this in mind, I decided to model this data as a single Cassandra Super Column Family as shown below:

Urls: { // Super Column Family
  key: url {
    info: {
      created : '2010-01-01',
      modified : '2010-01-01',
      status : 'crawled',
    },
    scores : {
      123456 : 123.4,  // nodeId : score
      ...
    }
  }
};
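
To see how the two tables fold into a single row, here is a small sketch that takes one Urls record plus its Url_Map records and builds the info and scores super columns as nested maps. The class name DenormalizeSketch, the toRow method and its parameters are hypothetical, and all values are kept as strings purely for simplicity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the denormalization step: one master record and its detail
// records become one row with two super columns. Names are illustrative only.
final class DenormalizeSketch {

  // Returns super column name -> (column name -> value) for one URL.
  // The url itself is not stored here because it serves as the row key,
  // which is why the surrogate url_id is no longer needed.
  static Map<String, Map<String, String>> toRow(
      String status, String created, String modified, Map<Integer, Double> nodeScores) {
    Map<String, String> info = new TreeMap<>();
    info.put("created", created);
    info.put("modified", modified);
    info.put("status", status);

    // Each (node_id, score) detail row becomes one column: name = nodeId, value = score.
    Map<String, String> scores = new TreeMap<>();
    for (Map.Entry<Integer, Double> e : nodeScores.entrySet()) {
      scores.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
    }

    Map<String, Map<String, String>> row = new TreeMap<>();
    row.put("info", info);
    row.put("scores", scores);
    return row;
  }

  public static void main(String[] args) {
    Map<Integer, Double> nodeScores = new HashMap<>();
    nodeScores.put(123456, 123.4);
    System.out.println(toRow("crawled", "2010-01-01", "2010-01-01", nodeScores));
  }
}
```

The point of the design is that a single lookup by URL returns both the URL-level data and all of its scores, with no join.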

Cassandra Installation

I installed Cassandra 0.6.6 (the GA version at the time of writing). To define the Super Column Family, add the following Keyspace declaration to the conf/storage-conf.xml file (below the default Keyspace declaration).

    <Keyspace Name="MyKeyspace">
      <ColumnFamily Name="Urls" CompareWith="UTF8Type"
        ColumnType="Super" CompareSubColumnsWith="UTF8Type"/>
      <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
      <ReplicationFactor>1</ReplicationFactor>
      <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
    </Keyspace>

To start the Cassandra server, you need to issue the following command on the Unix prompt:

sujit@cyclone:~$ cd $CASSANDRA_HOME
sujit@cyclone:cassandra-0.6.6$ bin/cassandra -f

This will start Cassandra in foreground, listening on port 9160.
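
A quick way to confirm the server is up before running any client code is to try opening a TCP connection to that port. The sketch below uses only the JDK; the PortCheck class and isListening method are hypothetical helpers, and it assumes the server runs on localhost with the port mentioned above. It only shows that something accepts connections on the port, not that it is Cassandra.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether anything accepts TCP connections
// on the given host and port within the timeout.
final class PortCheck {

  static boolean isListening(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      // connection refused, timed out, or host unreachable
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("Port 9160 reachable: " + isListening("localhost", 9160, 1000));
  }
}
```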

Cassandra DAO

The DAO code is shown below. It provides methods to do CRUD operations on the Urls Super Column Family. I was just going for a broad understanding of the Cassandra Java API here; in a real application there would be additional methods to do application-specific things.

// Source: src/main/java/com/mycompany/cassandra/db/UrlBeanCassandraDao.java
package com.mycompany.cassandra.db;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Deletion;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.SuperColumn;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

/**
 * Provides CRUD operations on a Cassandra Database.
 */
public class UrlBeanCassandraDao {

  private static final String CASSANDRA_HOST = "localhost";
  private static final int CASSANDRA_PORT = 9160;
  private static final String CASSANDRA_KEYSPACE = "MyKeyspace";
  private static final ConsistencyLevel CL_ONE = ConsistencyLevel.ONE;
  private static final SimpleDateFormat DATE_FORMATTER = 
    new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
  
  private TTransport transport;
  private Cassandra.Client client;
  
  public UrlBeanCassandraDao() {
    transport = new TSocket(CASSANDRA_HOST, CASSANDRA_PORT);
    TProtocol protocol = new TBinaryProtocol(transport);
    client = new Cassandra.Client(protocol);
  }

  public void init() throws Exception {
    transport.open();
  }
  
  public void destroy() {
    transport.close();
  }

  public void create(UrlBean urlBean) throws Exception {
    long now = System.nanoTime();
    // persist the URL master data under a key called "info"
    List<Mutation> mutations = new ArrayList<Mutation>();
    mutations.add(getInfoMutation("info", urlBean, now));
    if (urlBean.getScores().size() > 0) {
      mutations.add(getScoreMutation("scores", urlBean.getScores(), now));
    }
    Map<String,List<Mutation>> row = new HashMap<String,List<Mutation>>();
    row.put("Urls", mutations);
    Map<String,Map<String,List<Mutation>>> data = 
      new HashMap<String,Map<String,List<Mutation>>>();
    data.put(urlBean.getUrl(), row);
    client.batch_mutate(CASSANDRA_KEYSPACE, data, CL_ONE);
  }

  public List<UrlBean> retrieve(String url) throws Exception {
    SlicePredicate predicate = new SlicePredicate();
    SliceRange range = new SliceRange();
    range.setStart(new byte[0]);
    range.setFinish(new byte[0]);
    predicate.setSlice_range(range);
    ColumnParent columnFamily = new ColumnParent("Urls");
    KeyRange keyrange = new KeyRange();
    keyrange.start_key = url;
    keyrange.end_key = url;
    List<UrlBean> urlBeans = new ArrayList<UrlBean>();
    List<KeySlice> slices = client.get_range_slices(
      CASSANDRA_KEYSPACE, columnFamily, predicate, keyrange, CL_ONE);
    for (KeySlice slice : slices) {
      List<ColumnOrSuperColumn> coscs = slice.columns;
      UrlBean urlBean = new UrlBean();
      urlBean.setUrl(slice.key);
      for (ColumnOrSuperColumn cosc : coscs) {
        SuperColumn scol = cosc.super_column;
        String scolName = new String(scol.name, "UTF-8");
        if ("info".equals(scolName)) {
          List<Column> cols = scol.columns;
          for (Column col : cols) {
            String colName = new String(col.name, "UTF-8");
            if ("created".equals(colName)) {
              urlBean.setCreated(DATE_FORMATTER.parse(new String(col.value, "UTF-8")));
            } else if ("modified".equals(colName)) {
              urlBean.setModified(DATE_FORMATTER.parse(new String(col.value, "UTF-8")));
            } else if ("status".equals(colName)) {  // column is written as "status" in getInfoMutation()
              urlBean.setCrawlStatus(new String(col.value, "UTF-8"));
            }
          }
        } else if ("scores".equals(scolName)) {
          List<Column> cols = scol.columns;
          for (Column col : cols) {
            ScoreBean scoreBean = new ScoreBean();
            scoreBean.setImuid(new String(col.name, "UTF-8"));
            scoreBean.setScore(Float.valueOf(new String(col.value, "UTF-8")));
            urlBean.addScore(scoreBean);
          }
        }
      }
      urlBeans.add(urlBean);
    }
    return urlBeans;
  }

  public void update(UrlBean urlBean) throws Exception {
    delete(urlBean);
    create(urlBean);
  }
  
  public void delete(UrlBean urlBean) throws Exception {
    SlicePredicate predicate = new SlicePredicate();
    List<byte[]> cols = new ArrayList<byte[]>();
    cols.add("info".getBytes());
    cols.add("scores".getBytes());
    predicate.column_names = cols;
    Deletion deletion = new Deletion();
    deletion.predicate = predicate;
    deletion.timestamp = System.nanoTime();
    Mutation mutation = new Mutation();
    mutation.deletion = deletion;
    List<Mutation> mutations = new ArrayList<Mutation>();
    mutations.add(mutation);
    Map<String,List<Mutation>> row = new HashMap<String,List<Mutation>>();
    row.put("Urls", mutations);
    Map<String,Map<String,List<Mutation>>> data = 
      new HashMap<String,Map<String,List<Mutation>>>();
    // the outer map is keyed by row key (the URL), not by super column name
    data.put(urlBean.getUrl(), row);
    client.batch_mutate(CASSANDRA_KEYSPACE, data, CL_ONE);
  }
  
  private Mutation getInfoMutation(String name, UrlBean urlBean, long timestamp) {
    List<Column> cols = new ArrayList<Column>();
    cols.add(getColumn("url", urlBean.getUrl(), timestamp));
    cols.add(getColumn("created", DATE_FORMATTER.format(urlBean.getCreated()), 
      timestamp));
    cols.add(getColumn("modified", DATE_FORMATTER.format(urlBean.getModified()), 
      timestamp));
    cols.add(getColumn("status", urlBean.getCrawlStatus(), timestamp));
    SuperColumn scol = new SuperColumn("info".getBytes(), cols);
    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
    cosc.super_column = scol;
    Mutation mutation = new Mutation();
    mutation.column_or_supercolumn = cosc;
    return mutation;
  }

  private Mutation getScoreMutation(String name, List<ScoreBean> scores, 
      long timestamp) {
    List<Column> cols = new ArrayList<Column>();
    for (ScoreBean score : scores) {
      cols.add(getColumn(score.getImuid(), String.valueOf(score.getScore()), 
        timestamp));
    }
    SuperColumn scol = new SuperColumn("scores".getBytes(), cols);
    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
    cosc.super_column = scol;
    Mutation mutation = new Mutation();
    mutation.column_or_supercolumn = cosc;
    return mutation;
  }

  private Column getColumn(String name, String value, long timestamp) {
    return new Column(name.getBytes(), value.getBytes(), timestamp);
  }
}

As you can see, it's pretty verbose, so there are already a number of initiatives to provide wrappers that expose simpler method signatures. It's also quite simple to write your own.
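
As an illustration of how little a home-grown wrapper needs, the sketch below hides the nested map (row key, then column family, then list of mutations) that the DAO builds by hand before each batch_mutate call. It is generic over the mutation type so that it compiles without the Thrift jars; BatchBuilder, add and build are hypothetical names, not part of any Cassandra client library.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: collects mutations as
// row key -> column family -> list of mutations.
// M stands in for org.apache.cassandra.thrift.Mutation.
final class BatchBuilder<M> {

  private final Map<String, Map<String, List<M>>> data = new HashMap<>();

  // Queues one mutation for the given row and column family.
  BatchBuilder<M> add(String rowKey, String columnFamily, M mutation) {
    data.computeIfAbsent(rowKey, k -> new HashMap<>())
        .computeIfAbsent(columnFamily, k -> new ArrayList<>())
        .add(mutation);
    return this;
  }

  // Returns the nested map in the shape the DAO passes to batch_mutate.
  Map<String, Map<String, List<M>>> build() {
    return data;
  }

  public static void main(String[] args) {
    // Strings stand in for real Mutation objects in this demo.
    Map<String, Map<String, List<String>>> data = new BatchBuilder<String>()
        .add("http://example.com/a", "Urls", "info-mutation")
        .add("http://example.com/a", "Urls", "scores-mutation")
        .build();
    System.out.println(data);
  }
}
```

With something like this, create() in the DAO would shrink to two add() calls followed by a single batch_mutate call on the built map.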

You will notice references to UrlBean and ScoreBean in the above code. These are just plain POJOs to hold the data. Here they are, with getters and setters removed for brevity.

// Source: src/main/java/com/mycompany/cassandra/db/UrlBean.java
package com.mycompany.cassandra.db;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class UrlBean {

  private String url;
  private Date created;
  private Date modified;
  private String crawlStatus;
  private List<ScoreBean> scores = new ArrayList<ScoreBean>();
  ... 
  public void addScore(ScoreBean score) {
    scores.add(score);
  }
}
// Source: src/main/java/com/mycompany/cassandra/db/ScoreBean.java
package com.mycompany.cassandra.db;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class ScoreBean {

  private String nodeId;  // exposed through getImuid()/setImuid(), as called by the DAO and loader
  private Float score;
  ...  
}

Migration Code

To migrate the data off the two RDBMS tables, I wrote a little loader which loops through the tables, builds a UrlBean for each URL, and then calls the create() method on the DAO. Here is the code for the loader. There is nothing to it, really, just some simple JDBC code.

// Source: src/main/java/com/mycompany/cassandra/db/CassandraDbLoader.java
package com.mycompany.cassandra.db;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class CassandraDbLoader {

  private static final String DB_URL = "...";
  private static final String DB_USER = "...";
  private static final String DB_PASS = "...";

  private UrlBeanCassandraDao urlBeanCassandraDao;
  
  public void setCassandraCrudService(UrlBeanCassandraDao urlBeanCassandraDao) {
    this.urlBeanCassandraDao = urlBeanCassandraDao;
  }
  
  public void load() throws Exception {
    // database
    Class.forName("oracle.jdbc.OracleDriver");
    Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASS);
    PreparedStatement psMaster = conn.prepareStatement(
      "select url_id, url, status, created, modified from urls");
    PreparedStatement psDetail = conn.prepareStatement(
      "select node_id, score from url_map where url_id = ?");
    ResultSet rsMaster = null;
    // start cassandra client
    urlBeanCassandraDao.init();
    try {
      rsMaster = psMaster.executeQuery();
      while (rsMaster.next()) {
        String url = rsMaster.getString("url");
        System.out.println("Migrating URL: " + url);
        UrlBean urlBean = new UrlBean();
        urlBean.setUrl(url);
        // getTimestamp() keeps the time of day, which getDate() would drop
        urlBean.setCreated(rsMaster.getTimestamp("created"));
        urlBean.setModified(rsMaster.getTimestamp("modified"));
        urlBean.setCrawlStatus(rsMaster.getString("status"));
        int id = rsMaster.getBigDecimal("url_id").intValue();
        loadScores(psDetail, urlBean, id);
        urlBeanCassandraDao.create(urlBean);
      }
    } finally {
      // close each resource independently, even if the master query never ran
      if (rsMaster != null) {
        try { rsMaster.close(); } catch (Exception e) {}
      }
      try { psMaster.close(); } catch (Exception e) {}
      try { psDetail.close(); } catch (Exception e) {}
      try { conn.close(); } catch (Exception e) {}
      try { urlBeanCassandraDao.destroy(); } catch (Exception e) {}
    }
  }

  private void loadScores(PreparedStatement psDetail, UrlBean urlBean, int id) 
      throws Exception {
    ResultSet rsDetail = null;
    try {
      psDetail.setBigDecimal(1, new BigDecimal(id));
      rsDetail = psDetail.executeQuery();
      while (rsDetail.next()) {
        ScoreBean scoreBean = new ScoreBean();
        scoreBean.setImuid(rsDetail.getString("node_id"));
        float score = rsDetail.getFloat("score");
        scoreBean.setScore(score);
        urlBean.addScore(scoreBean);
      }
    } finally {
      if (rsDetail != null) {
        try { rsDetail.close(); } catch (Exception e) {}
      }
    }
  }

  public static void main(String[] argv) throws Exception {
    UrlBeanCassandraDao dao = new UrlBeanCassandraDao();
    CassandraDbLoader loader = new CassandraDbLoader();
    loader.setCassandraCrudService(dao);
    loader.load();
  }
}

And to test that these made it in okay, I wrote a little JUnit test that calls the retrieve() method with a given URL, and it comes back fine.
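
That test is not shown here, and it needs a running server. One piece that can be checked without a server is the string encoding the DAO relies on: dates are stored as formatted strings and scores as decimal strings, both converted to bytes. The sketch below round-trips those values using the same date pattern as the DAO. The ValueCodecSketch class and its method names are hypothetical, and pinning the charset to UTF-8 and the time zone to UTC is my assumption (the DAO writes with the platform defaults).

```java
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper mirroring how the DAO turns values into column bytes.
final class ValueCodecSketch {

  // A fresh formatter per call, since SimpleDateFormat is not thread-safe.
  private static SimpleDateFormat formatter() {
    SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
    f.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: fixed zone for repeatability
    return f;
  }

  static byte[] encodeDate(Date date) {
    return formatter().format(date).getBytes(StandardCharsets.UTF_8);
  }

  static Date decodeDate(byte[] bytes) throws ParseException {
    return formatter().parse(new String(bytes, StandardCharsets.UTF_8));
  }

  static byte[] encodeScore(float score) {
    return String.valueOf(score).getBytes(StandardCharsets.UTF_8);
  }

  static float decodeScore(byte[] bytes) {
    return Float.parseFloat(new String(bytes, StandardCharsets.UTF_8));
  }

  public static void main(String[] args) throws ParseException {
    Date now = new Date();
    System.out.println("date round-trips: " + now.equals(decodeDate(encodeDate(now))));
    System.out.println("score round-trips: " + (decodeScore(encodeScore(123.4f)) == 123.4f));
  }
}
```

Because the pattern carries milliseconds, a java.util.Date survives the round trip unchanged as long as the same time zone is used for writing and reading.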

If you already know about Cassandra or other column oriented databases, this is probably pretty basic stuff. However, this exercise has given me some understanding of Cassandra and its Java API, and I hope to use it in more detail at some point in the future.

2 comments (moderated to prevent spam):

Anonymous said...

Very nice post. I am new to Cassandra. Your post helped me a lot.

Sujit Pal said...

Thanks, I had forgotten about this :-). This was some time after I read the O'Reilly Cassandra book and figured that this would be a good way to learn it. The post uses the Thrift API directly, which was a bit of a turn-off for me since it's kind of verbose. I have since experimented with Pelops and Hector, and I like the Hector template API a lot. Or if you want an ORM, you may want to check out GORA.