Friday, November 05, 2010

Cassandra DAO with Pelops

Last week, I shared some code that provides CRUD access to a Cassandra datastore. As you can see from that code, the Cassandra Java API is a bit verbose. That is probably not a big deal if you are dealing with a small set of Column Families, since the verbose part is hidden away in DAOs such as the one shown. But more code means more time to write it, more scope for errors, and more (and harder) debugging, so I decided to check out the wrapper APIs that people developing with Cassandra use. Two of them seem to be quite popular: Hector and Pelops.

Hector was the first, and appears to be more widely used, going by the coverage I found for it. From my (rather cursory) look at Hector, it looks to be quite a thin layer on top of the Cassandra Java API. Pelops is relatively new and aims to provide a "simple and beautiful" API. It provides just three abstractions: Selector (for selecting records), Mutator (for updating and inserting records) and RowDeletor (for deleting records). The Pelops link above has a 5-minute Pelops Tutorial that explains how to use them. Each of these objects has a large number of methods to do specific things, so if you are using an IDE (who isn't these days?) there is almost no learning curve. You still need to drop down into Cassandra's API to iterate through columns, for example, but I think Pelops strikes a nice balance between abstraction and conciseness.

So as you may have guessed, I decided on Pelops as my Cassandra super-API of choice, and as a first step, decided to convert my DAO code to use the Pelops API. However, in order to use the Pelops version with the latest bugfixes, I had to upgrade to Cassandra 0.7.0-beta2, since there have been backward-incompatible API changes between Cassandra 0.6 and 0.7. It's not as bad as it sounds, though, because the 0.7-beta2 version is stable (based on both general Internet consensus and my limited testing).

There is a Pelops version that works against Cassandra 0.6, but it is no longer actively maintained. In fact, even the Pelops version that I chose (0.907-0.7.0-SNAPSHOT) is not actively maintained. The Pelops team only promises bugfixes to Pelops version 0.909-0.7.0-SNAPSHOT, which works against Cassandra version 0.7.0-beta3.

Installing Cassandra 0.7-beta2 is similar to installing Cassandra 0.6.6. Just download and explode it and follow the instructions in the README.txt file. With 0.7, it is now possible to create Keyspaces and Column Families from the command-line client, so no more having to mess with conf/storage-conf.xml (in fact, conf/storage-conf.xml is now gone; there is a conf/cassandra.yaml instead). After starting Cassandra with "bin/cassandra -f", we can open up cassandra-cli (bin/cassandra-cli) in another terminal, and issue the following commands to create our Keyspace and our "Urls" Super Column Family:

[default@unknown] connect localhost/9160
[default@unknown] create keyspace MyKeyspace with replication_factor=1
[default@MyKeyspace] use MyKeyspace
[default@MyKeyspace] create column family Urls with column_type="Super" \
    and comparator="UTF8Type"
[default@MyKeyspace] describe keyspace MyKeyspace
Keyspace: MyKeyspace
  Replication Factor: 1
  Column Families:
    Column Family Name: Urls {
      Column Family Type: Super
      Column Sorted By: org.apache.cassandra.db.marshal.UTF8Type
    }
[default@MyKeyspace] quit
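
To make the layout concrete, the "Urls" Super Column Family can be pictured as three nested maps: row key, then super column name, then sub-column name. The sketch below is only an in-memory stand-in written with plain java.util collections, not Cassandra or Pelops code. The class name UrlsModelSketch and the sample values are made up for illustration, and TreeMap only approximates Cassandra's sorted ordering for plain ASCII names. The "info" and "scores" super column names match the ones the DAO later in this post writes.

```java
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in for the "Urls" super column family; not Cassandra code. */
class UrlsModelSketch {

  // row key (the url) -> super column name -> sub-column name -> value
  private final Map<String, Map<String, Map<String, String>>> rows =
    new TreeMap<String, Map<String, Map<String, String>>>();

  /** Writes one sub-column, creating the row and super column if needed. */
  void put(String rowKey, String superColumn, String column, String value) {
    Map<String, Map<String, String>> row = rows.get(rowKey);
    if (row == null) {
      row = new TreeMap<String, Map<String, String>>();
      rows.put(rowKey, row);
    }
    Map<String, String> subColumns = row.get(superColumn);
    if (subColumns == null) {
      subColumns = new TreeMap<String, String>();
      row.put(superColumn, subColumns);
    }
    subColumns.put(column, value);
  }

  /** Returns the sub-columns of one super column, or null if absent. */
  Map<String, String> getSuperColumn(String rowKey, String superColumn) {
    Map<String, Map<String, String>> row = rows.get(rowKey);
    return row == null ? null : row.get(superColumn);
  }

  /** Removes the whole row, the in-memory analogue of a row delete. */
  void deleteRow(String rowKey) {
    rows.remove(rowKey);
  }

  public static void main(String[] args) {
    UrlsModelSketch urls = new UrlsModelSketch();
    String key = "http://www.example.com/";   // hypothetical row key
    urls.put(key, "info", "url", key);
    urls.put(key, "info", "status", "crawled");
    urls.put(key, "scores", "imuid-1", "0.75");
    System.out.println(urls.getSuperColumn(key, "info"));
    System.out.println(urls.getSuperColumn(key, "scores"));
  }
}
```

Each URL is one row, and reading a record means fetching its two super columns, which is the access pattern the DAO follows.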

The Pelops Download page lists the Maven incantations to put in the pom.xml file. Once you add them, running mvn eclipse:eclipse adds Pelops to your project classpath.

The code for UrlBeanCassandraDao.java, now using Pelops, is shown below. If you compare this with the code in my previous post, you will appreciate how much shorter and more readable it is.

// Source: src/main/java/com/mycompany/cassandra/db/UrlBeanCassandraDao.java
package com.mycompany.cassandra.db;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.SuperColumn;
import org.scale7.cassandra.pelops.Cluster;
import org.scale7.cassandra.pelops.Mutator;
import org.scale7.cassandra.pelops.Pelops;
import org.scale7.cassandra.pelops.RowDeletor;
import org.scale7.cassandra.pelops.Selector;

/**
 * Provides CRUD operations on a Cassandra Database.
 */
public class UrlBeanCassandraDao {

  private static final String CASSANDRA_HOST = "localhost";
  private static final int CASSANDRA_PORT = 9160;
  private static final String CASSANDRA_KEYSPACE = "MyKeyspace";
  private static final String CASSANDRA_POOL = "Main";
  private static final ConsistencyLevel CL_ONE = ConsistencyLevel.ONE;
  // NOTE: SimpleDateFormat is not thread-safe; guard it if this DAO is shared across threads
  private static final SimpleDateFormat DATE_FORMATTER = 
    new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
  
  public void init() throws Exception {
    Pelops.addPool(CASSANDRA_POOL, 
      new Cluster(new String[] {CASSANDRA_HOST}, CASSANDRA_PORT), 
      CASSANDRA_KEYSPACE);
  }
  
  public void destroy() {
    Pelops.shutdown();
  }

  public void create(UrlBean urlBean) throws Exception {
    Mutator mutator = Pelops.createMutator(CASSANDRA_POOL);
    // persist url master data under a key called "info"
    mutator.writeSubColumns("Urls", urlBean.getUrl(), "info", 
      mutator.newColumnList(
      mutator.newColumn("url", urlBean.getUrl()),
      mutator.newColumn("created", DATE_FORMATTER.format(urlBean.getCreated())),
      mutator.newColumn("modified", DATE_FORMATTER.format(urlBean.getModified())),
      mutator.newColumn("status", urlBean.getCrawlStatus())));
    // persist score (url detail) data under a key called "scores"
    List<Column> scoreCols = new ArrayList<Column>();
    for (ScoreBean scoreBean : urlBean.getScores()) {
      scoreCols.add(mutator.newColumn(
        scoreBean.getImuid(), String.valueOf(scoreBean.getScore())));
    }
    mutator.writeSubColumns("Urls", urlBean.getUrl(), "scores", scoreCols); 
    mutator.execute(CL_ONE);
  }

  public UrlBean retrieve(String url) throws Exception {
    Selector selector = Pelops.createSelector(CASSANDRA_POOL);
    UrlBean urlBean = new UrlBean();
    // get the info super column and populate
    SuperColumn info = selector.getSuperColumnFromRow(
      "Urls", url, "info", CL_ONE);
    for (Column col : info.columns) {
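      // decode explicitly as UTF-8 rather than relying on the platform default charset
      String colname = new String(col.getName(), "UTF-8");
      String colvalue = new String(col.getValue(), "UTF-8");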
      if ("url".equals(colname)) {
        urlBean.setUrl(colvalue);
      } else if ("created".equals(colname)) {
        urlBean.setCreated(DATE_FORMATTER.parse(colvalue));
      } else if ("modified".equals(colname)) {
        urlBean.setModified(DATE_FORMATTER.parse(colvalue));
      } else if ("status".equals(colname)) {
        urlBean.setCrawlStatus(colvalue);
      }
    }
    // get the scores super column and populate
    SuperColumn scores = selector.getSuperColumnFromRow(
      "Urls", url, "scores", CL_ONE);
    for (Column col : scores.columns) {
      ScoreBean scoreBean = new ScoreBean();
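      // decode explicitly as UTF-8 rather than relying on the platform default charset
      scoreBean.setImuid(new String(col.getName(), "UTF-8"));
      scoreBean.setScore(Float.valueOf(new String(col.getValue(), "UTF-8")));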
      urlBean.addScore(scoreBean);
    }
    return urlBean;
  }

  public void update(UrlBean urlBean) throws Exception {
    delete(urlBean);
    create(urlBean);
  }
  
  public void delete(UrlBean urlBean) throws Exception {
    RowDeletor deletor = Pelops.createRowDeletor(CASSANDRA_POOL);
    deletor.deleteRow("Urls", urlBean.getUrl(), CL_ONE);
  }
}
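
The DAO stores every value as a string: dates go through the yyyy-MM-dd'T'HH:mm:ss.SSS pattern, scores through String.valueOf, and column names and values come back from Cassandra as byte arrays. The sketch below isolates those conversions so the round trips can be checked without a running cluster. ColumnValueCodec is a hypothetical helper, not part of Pelops or of the DAO above. It assumes values are stored as UTF-8, and it uses a per-thread SimpleDateFormat because that class is not thread-safe.

```java
import java.nio.charset.Charset;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/** Hypothetical helper showing the string conversions the DAO relies on. */
class ColumnValueCodec {

  // Assumption: column names and values are stored as UTF-8 bytes.
  private static final Charset UTF8 = Charset.forName("UTF-8");
  private static final String DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS";

  // One formatter per thread, since SimpleDateFormat is not thread-safe.
  private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
    new ThreadLocal<SimpleDateFormat>() {
      @Override protected SimpleDateFormat initialValue() {
        return new SimpleDateFormat(DATE_PATTERN);
      }
    };

  static byte[] toBytes(String value) {
    return value.getBytes(UTF8);
  }

  static String fromBytes(byte[] raw) {
    return new String(raw, UTF8);
  }

  static String encodeDate(Date date) {
    return DATE_FORMAT.get().format(date);
  }

  static Date decodeDate(String value) {
    try {
      return DATE_FORMAT.get().parse(value);
    } catch (ParseException e) {
      // surface malformed stored values as an unchecked error
      throw new IllegalArgumentException("Bad date column value: " + value, e);
    }
  }

  static String encodeScore(float score) {
    return String.valueOf(score);
  }

  static float decodeScore(String value) {
    return Float.parseFloat(value);
  }

  public static void main(String[] args) {
    Date now = new Date();
    String stored = encodeDate(now);
    System.out.println(stored + " round-trips: " + now.equals(decodeDate(stored)));
    System.out.println(fromBytes(toBytes("status")));
    System.out.println(decodeScore(encodeScore(0.75f)));
  }
}
```

Because the pattern keeps milliseconds, a java.util.Date normally survives the trip through the string form unchanged. The formatter uses the JVM's default time zone, so the stored strings are only comparable between machines configured with the same zone.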

The new Pelops-based class is signature-compatible with the old one, so no changes were required to reload the data or to run the unit test that verifies the select.
