Saturday, December 07, 2013

Bare Metal Recommender Development with Apache Mahout


I am almost done with the Introduction to Recommender Systems course at Coursera (I still have to take the final exam). The course uses Lenskit, a Java framework for developing recommender systems, as a teaching tool. While I am reasonably familiar with Lenskit at this point (having worked through six programming assignments with it), I figured it would be interesting to see if I could build the recommenders described in the assignments using Apache Mahout, the "other" recommender framework I have been trying to learn for a while. This post describes Scala implementations of four of these five recommenders using Apache Mahout.

Both Lenskit and Mahout offer abstractions that can potentially reduce recommender development to plug-and-play. Common recommenders can be built by simply wiring up the appropriate classes; more specialized recommenders can be built by subclassing some of the provided interfaces or classes and then wiring those up. However, this requires you to know how the different components interact with each other, which can vary by type of recommender. A common thread I noticed among all the recommenders I've built in the course so far is that each is basically just going Fruit Ninja on (ie, slicing and dicing) a ratings matrix (and optionally some associated data such as tags). Modeling recommenders as a series of operations against the ratings matrix seemed to me easier to understand and implement, as well as more extensible, so that is the approach I use here.

For each recommender, I describe the intent and the algorithm as best as I can, and follow that up with my implementation. My test cases are based on the example correct answers provided in the instructions and are modeled as JUnit tests. I also use the same data as provided for the assignments (I needed to match the answers to verify that my implementations are correct). The data is movie ratings similar to the MovieLens dataset. I have used Apache Mahout's DataModel functionality to load the data and get back different views of it, as well as Mahout Math's linear algebra classes for matrix operations.
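For reference, Mahout's FileDataModel reads comma-separated userID,itemID,rating[,timestamp] lines. The pure-Scala sketch below parses a few made-up lines in that format into per-user rating maps, just to show the shape of the data the recommenders operate on:

```scala
// Three made-up rating lines in FileDataModel's CSV format.
val lines = List(
  "1,11,4.5",
  "1,121,3.0",
  "2,11,2.0")

// Group into userID -> (itemID -> rating).
val ratings: Map[Long, Map[Long, Double]] = lines
  .map(_.split(","))
  .map(cols => (cols(0).toLong, cols(1).toLong, cols(2).toDouble))
  .groupBy(_._1)
  .map { case (user, rows) =>
    (user, rows.map(r => (r._2, r._3)).toMap) }

println(ratings(1L)(11L))  // 4.5
```

This is only an illustration of the file layout; the actual code below lets FileDataModel do this work.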

Non-Personalized Recommender


This recommender uses association rules to recommend items (movies in our case) similar to a specified one. The user does not figure anywhere in this recommender, hence it is non-personalized. A possible use case could be to recommend other items a user may find interesting while he is interacting with an item. It works by finding items that have been co-rated with the specified item. Although the data provides ratings on a 5-point scale, for the purposes of this recommender we treat each rating as 0 (no rating provided) or 1 (rating provided). We score the similarity between two items as follows:


               |X ∧ Y|
    s(x,y) = -----------                  ... simple formula
                 |X|

                |X ∧ Y|
               ---------
                  |X|
    s(x,y) = ---------------              ... advanced formula
                |¬X ∧ Y|
               ----------
                  |¬X|
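The two formulas are easy to sanity-check on a tiny hand-made dataset before involving Mahout. The flags below are made up; each Boolean is one user's rated/not-rated flag for the item:

```scala
// Hypothetical rated/not-rated flags for items X and Y across 5 users.
val ratedX = List(true, true, false, true, false)
val ratedY = List(true, false, true, true, false)

// Count of users who rated both items.
def coCount(xs: List[Boolean], ys: List[Boolean]): Int =
  xs.zip(ys).count { case (a, b) => a && b }

// Simple formula: |X and Y| / |X|.
def basic(xs: List[Boolean], ys: List[Boolean]): Double =
  coCount(xs, ys).toDouble / xs.count(identity)

// Advanced formula: (|X and Y| / |X|) / (|not X and Y| / |not X|).
def advanced(xs: List[Boolean], ys: List[Boolean]): Double =
  basic(xs, ys) / basic(xs.map(!_), ys)

println(basic(ratedX, ratedY))    // 2 co-ratings / 3 X-ratings = 2/3
println(advanced(ratedX, ratedY)) // (2/3) / (1/2) = 4/3
```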

The recommender code is shown below. The client calls similarMovies() with an itemID and the number of similar items desired, along with the similarity function to use (one of basicSimilarity or advancedSimilarity, or any function that takes two Longs and returns a Double). The similarMovies() method returns a List of (itemID, similarity) tuples to the client.

// Source: src/main/scala/com/mycompany/mia/recsys/NonPersonalizedRecommender.scala
package com.mycompany.mia.recsys

import java.io.File

import scala.collection.JavaConversions.asScalaIterator

import org.apache.mahout.cf.taste.impl.model.file.FileDataModel

/**
 * Given a movie ID, find top 5 movies like it using the 
 * following association metrics:
 *   basic = p(x AND y) / p(x)
 *   advanced = (p(x AND y) / p(x)) / (p(NOT x AND y) / p(NOT x))
 */
class NonPersonalizedRecommender(ratingsFile: File) {

  val model = new FileDataModel(ratingsFile)

  /**
   * This is the method that will be called from the client.
   * @param movieID a movieID for which similar movies need
   *                to be found.
   * @param topN    the number of most similar movies to be
   *                returned (minus the movieID itself).
   * @param simfunc a function that returns the similarity 
   *                between two movieIDs.
   * @return a List of (movieID, similarity) tuples for the
   *                topN similar movies to movieID.
   */
  def similarMovies(movieId: Long, topN: Int, 
      simfunc: (Long, Long) => Double): List[(Long,Double)] = {
    val items = model.getItemIDs()
    val similarItems = items.filter(item => item != movieId)
      .map(item => (item.asInstanceOf[Long], 
         simfunc.apply(movieId, item)))
      .toList
      .sortWith((a, b) => a._2 > b._2)
      .slice(0, topN)
    similarItems
  }
  
  /**
   * Models basic association similarity. This function is
   * passed in from the similarMovies() method by client.
   * @param x a movieID.
   * @param y another movieID.
   * @return the similarity score between x and y.
   */
  def basicSimilarity(x: Long, y: Long): Double = {
    val xprefs = binaryRatings(x)
    val yprefs = binaryRatings(y)
    val pXY = numAssociations(xprefs, yprefs).toDouble
    val pX = xprefs.filter(x => x).size.toDouble
    pXY / pX
  }
  
  /**
   * Models advanced association similarity. This function is
   * passed in from the similarMovies() method by client.
   * @param x a movieID.
   * @param y another movieID.
   * @return the similarity score between x and y.
   */
  def advancedSimilarity(x: Long, y: Long): Double = {
    val xprefs = binaryRatings(x)
    val yprefs = binaryRatings(y)
    val notXprefs = xprefs.map(pref => !pref)
    val pXY = numAssociations(xprefs, yprefs).toDouble
    val pX = xprefs.filter(x => x).size.toDouble
    val pNotXY = numAssociations(notXprefs, yprefs).toDouble
    val pNotX = notXprefs.filter(x => x).size.toDouble
    (pXY / pX) / (pNotXY / pNotX)
  }

  /**
   * Given an item (movie ID), converts the preference array of
   * (userID:rating) elements to a Boolean List: true if the user
   * has rated the item and false if not.
   * @param item the movieID.
   * @return List[Boolean] of "binary" true/false ratings.
   */
  def binaryRatings(item: Long): List[Boolean] = {
    model.getUserIDs()
      .map(user => model.getPreferenceValue(user, item) != null)
      .toList
  }
  
  /**
   * Calculates the number of associations between Boolean lists
   * xs and ys. This differs from intersection in that we only 
   * count if corresponding elements of xs and ys are both true.
   * @param xs a list of binary ratings for all users.
   * @param ys another list of binary ratings for all users.
   * @return a count of where both elements of xs and ys are true.
   */
  def numAssociations(xs: List[Boolean], ys: List[Boolean]): Int = {
    xs.zip(ys).filter(xy => xy._1 && xy._2).size
  }
}

The calling code is structured as a JUnit test and is shown below. It sets up a repeatable test using the answer hints supplied in the assignment, to verify that the recommender works correctly before submitting.

// Source: src/test/scala/com/mycompany/mia/recsys/NonPersonalizedRecommenderTest.scala
package com.mycompany.mia.recsys

import java.io.File

import org.junit.Assert
import org.junit.Test

class NonPersonalizedRecommenderTest {

  val testMovieIds = List(11, 121, 8587)
  val simpleResults = Map[Long,List[(Long,Double)]](
    11L   -> List((603,0.96), (1892,0.94), (1891,0.94), 
               (120,0.93), (1894,0.93)),
    121L  -> List((120,0.95), (122,0.95), (603,0.94), 
               (597,0.89), (604,0.88)),
    8587L -> List((603,0.92), (597,0.90), (607,0.87),
               (120,0.86), (13,0.86)))
  val advancedResults = Map[Long,List[(Long,Double)]](
    11L   -> List((1891,5.69), (1892,5.65), (243,5.00),
               (1894,4.72), (2164,4.11)),
    121L  -> List((122,4.74), (120,3.82), (2164,3.40),
               (243,3.26), (1894,3.22)),
    8587L -> List((10020,4.18), (812,4.03), (7443,2.63),
               (9331,2.46), (786,2.39)))
  
  @Test def testBasicSimilarity(): Unit = {
    val npr = new NonPersonalizedRecommender(new File(
      "data/recsys/ratings.csv"))
    testMovieIds.foreach(movieId => {
      val similarMoviesWithBasicSimilarity = npr.similarMovies(
        movieId, 5, npr.basicSimilarity)
      Console.println(movieId + " => " + similarMoviesWithBasicSimilarity)
      Console.println("expected: " + simpleResults(movieId))
      assertEquals(similarMoviesWithBasicSimilarity, simpleResults(movieId))
    })
  }
  
  @Test def testAdvancedSimilarity(): Unit = {
    val npr = new NonPersonalizedRecommender(new File(
      "data/recsys/ratings.csv"))
    testMovieIds.foreach(movieId => {
      val similarMoviesWithAdvancedSimilarity = npr.similarMovies(
        movieId, 5, npr.advancedSimilarity)
      Console.println(movieId + " => " + similarMoviesWithAdvancedSimilarity)
      Console.println("expected: " + advancedResults(movieId))
      assertEquals(similarMoviesWithAdvancedSimilarity, advancedResults(movieId))
    })
  }
  
  def assertEquals(actual: List[(Long,Double)], 
      expected: List[(Long,Double)]): Unit = {
    Assert.assertEquals(actual.size, expected.size)
    actual.zip(expected).foreach(aze => {
      Assert.assertEquals(aze._1._1, aze._2._1)
      Assert.assertEquals(aze._1._2, aze._2._2, 0.01)
    })
  }
}

Content Based Filtering Recommender


The Content Based Filtering Recommender can be used to recommend items similar to ones the user has already expressed a preference for. Even though the scoring is based on item-item similarity, it is limited to the items already rated by the user, hence it is personalized to the user. It works by converting the user to a vector of item features, then matching this aggregated vector against the previously built item vectors.

In our implementation, our item features are its tags, so we first convert our items into a TF-IDF matrix of tags. We implement two approaches for reducing a user to an item tag vector. The first (makeUnweightedUserProfile) simply adds up the item vectors for items that the user has rated 3.5 or higher. The second (makeWeightedUserProfile) sums up weighted item vectors, where each weight is the deviance of the rating for that item from the user's mean rating across all items. Recommended items are scored using Cosine Similarity between the user's aggregated item vector and each item vector.
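The weighted profile arithmetic can be sketched without Mahout; the TF-IDF values and ratings below are made up, and plain Arrays stand in for Mahout's Vector:

```scala
// Toy TF-IDF tag vectors for three items the user has rated.
val itemVectors = List(
  Array(0.8, 0.2, 0.0),  // item A
  Array(0.1, 0.9, 0.3),  // item B
  Array(0.0, 0.4, 0.7))  // item C
val ratings = List(5.0, 3.0, 1.0)

val mu = ratings.sum / ratings.size  // user's mean rating = 3.0
val weights = ratings.map(_ - mu)    // deviances: 2.0, 0.0, -2.0

// User profile = sum of item vectors, each scaled by its weight.
val profile = weights.zip(itemVectors)
  .map { case (w, v) => v.map(_ * w) }
  .reduce((a, b) => a.zip(b).map { case (x, y) => x + y })

// Cosine similarity between the profile and a candidate item vector.
def cosine(u: Array[Double], v: Array[Double]): Double = {
  val dot = u.zip(v).map { case (x, y) => x * y }.sum
  def norm(x: Array[Double]) = math.sqrt(x.map(e => e * e).sum)
  dot / (norm(u) * norm(v))
}

println(profile.mkString(", "))  // profile is (1.6, -0.4, -1.4)
println(cosine(profile, itemVectors.head))
```

Note how item B, rated exactly at the user's mean, contributes nothing to the weighted profile.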

Calling this recommender is similar to the previous one. The similarMovies() method takes a userID, the number of similar items desired and a function to compute and return the user's tag vector. It returns a list of itemID and similarity tuples in descending order of similarity. The code is shown below:

// Source: src/main/scala/com/mycompany/mia/recsys/ContentBasedFilteringRecommender.scala
package com.mycompany.mia.recsys

import java.io.File

import scala.actors.threadpool.AtomicInteger
import scala.collection.JavaConversions.asScalaIterator
import scala.collection.JavaConversions.iterableAsScalaIterable
import scala.io.Source

import org.apache.mahout.cf.taste.impl.model.file.FileDataModel
import org.apache.mahout.cf.taste.model.DataModel
import org.apache.mahout.math.ConstantVector
import org.apache.mahout.math.SparseMatrix
import org.apache.mahout.math.Vector
import org.apache.mahout.math.function.DoubleDoubleFunction
import org.apache.mahout.math.function.VectorFunction

/**
 * Recommend items that are similar to the ones user has 
 * expressed a preference for.
 */
class ContentBasedFilteringRecommender(
    modelfile: File, tagfile: File) {

  val model = new FileDataModel(modelfile)
  val itemIndex = model.getItemIDs().zipWithIndex.toMap  
  
  val tfidfBuilder = new TfIdfBuilder(model, itemIndex, tagfile)
  val tfidf = tfidfBuilder.build()
  val tagIndex = tfidfBuilder.tagIndex
  
  /**
   * Find movies similar to the ones that user has already
   * expressed a preference for.
   * @param user the userID.
   * @param topN the top N similar movies.
   * @return a List of (item,score) tuples.
   */
  def similarMovies(user: Long, topN: Int,
      profileFunc: (Long) => Vector): List[(Long,Double)] = {
    val userProfile = profileFunc.apply(user)
    val alreadyRated = model.getItemIDsFromUser(user).toSet
    val similarMovies = model.getItemIDs()
      .filter(item => (! alreadyRated.contains(item)))
      .map(item => (item, tfidf.viewRow(itemIndex(item))))
      .map(itemvector => (
        itemvector._1.toLong, 
        cosineSimilarity(userProfile, itemvector._2)))
      .toList
      .sortWith((a, b) => a._2 > b._2)
      .slice(0, topN)
    similarMovies
  }
  
  /**
   * Compute the user's tag profile. This is done by adding up
   * all the TFIDF item vectors for which the user rated >= 3.5.
   * @param user the userID for the user.
   * @return vector representing the user's tag profile.
   */
  def makeUnweightedUserProfile(user: Long): Vector = {
    val highlyRatedItemVectors = model.getItemIDsFromUser(user)
      .map(item => (item, model.getPreferenceValue(user, item)))
      .filter(itemrating => itemrating._2 >= 3.5D)
      .map(itemrating => itemrating._1)
      .map(item => tfidf.viewRow(itemIndex(item)))
      .toList
    val numTags = tagIndex.size
    val zeros = new ConstantVector(0.0D, numTags).asInstanceOf[Vector]
    highlyRatedItemVectors.foldLeft(zeros)((a, b) => a.plus(b))
  }
  
  /**
   * Compute the average rating for the user, then calculate
   * weights for each item rating based on the sum of the
   * deviance of the rating from the user mean. Compute the
   * sum of item tags as before but this time weight each
   * item vector with this computed weight.
   * @param user the userID.
   * @return the weighted user profile.
   */
  def makeWeightedUserProfile(user: Long): Vector = {
    val ratings = model.getItemIDsFromUser(user)
      .map(item => model.getPreferenceValue(user, item))
      .toList
    val mu = ratings.foldLeft(0.0D)(_ + _) / ratings.size.toDouble
    val weights = model.getItemIDsFromUser(user)
      .map(item => model.getPreferenceValue(user, item) - mu)
      .toList
    val ratingVectors = model.getItemIDsFromUser(user)
      .map(item => tfidf.viewRow(itemIndex(item)))
      .toList
    val weightedRatingVector = weights.zip(ratingVectors)
      .map(wv => wv._2.times(wv._1))
    val numTags = tagIndex.size
    val zeros = new ConstantVector(0.0D, numTags).asInstanceOf[Vector]
    weightedRatingVector.foldLeft(zeros)((a, b) => a.plus(b))
  }
  
  /**
   * Compute cosine similarity between the user vector and
   * each item vector.
   * @param u the user vector.
   * @param v the item vector.
   * @return the cosine similarity between u and v.
   */
  def cosineSimilarity(u: Vector, v: Vector): Double = {
    u.dot(v) / (u.norm(2) * v.norm(2))
  }
}

/**
 * Model to convert item tags into a movie-tag term frequency
 * matrix, which is then converted to TFIDF and unit normalized
 * across items (movies).
 * @param model the DataModel.
 * @param itemIndex the itemID to row index mapping.
 * @param tagfile the File containing the (itemID, tag) tuples.
 */
class TfIdfBuilder(val model: DataModel, 
    val itemIndex: Map[java.lang.Long,Int], val tagfile: File) {
  
  val tagIndex = collection.mutable.Map[String,Int]()

  /**
   * Returns a Sparse Matrix of movies vs tags, with the tag
   * frequency TFIDF normalized and each item vector further
   * unit normalized. Also builds the tagIndex as a side effect.
   * @return SparseMatrix of movies vs tags.
   */
  def build(): SparseMatrix = { 
    // assign each tag an id
    val tagIds = new AtomicInteger(0)
    Source.fromFile(tagfile)
      .getLines()
      .foreach(line => {
        val tag = line.split(",")(1)
        if (! tagIndex.contains(tag)) 
          tagIndex(tag) = tagIds.getAndIncrement()
      })
    // populate the SparseMatrix in the second scan through tagfile
    val movieTagMatrix = new SparseMatrix(itemIndex.size, tagIndex.size)
    Source.fromFile(tagfile)
      .getLines()
      .foreach(line => {
        val columns = line.split(",")
        val row = itemIndex(columns(0).toLong)
        val col = tagIndex(columns(1))
        movieTagMatrix.setQuick(row, col, 
          movieTagMatrix.getQuick(row, col) + 1.0D)
      })
    // we got our TF (raw term freqs), now find IDFs
    val numdocs = movieTagMatrix.numRows()
    val docfreq = movieTagMatrix.aggregateColumns(
      new AddOneIfNonZeroFunc)
    val idf = docfreq.assign(new IdfFunc, numdocs)    
    // now calculate TF-IDF
    for (r <- 0 until movieTagMatrix.numRows()) {
      val row = movieTagMatrix.viewRow(r).times(idf)
      movieTagMatrix.assignRow(r, row)
    }
    // then unit-normalize each item vector (using the Euclidean norm)
    val enorm = movieTagMatrix.aggregateRows(new SumOfSquaresFunc())
    for (r <- 0 until movieTagMatrix.numRows()) {
      val row = movieTagMatrix.viewRow(r).times(1.0D / enorm.get(r))
      movieTagMatrix.assignRow(r, row)
    }
    movieTagMatrix
  }

  /**
   * Computes the L2 (Euclidean) norm of a vector.
   */
  class SumOfSquaresFunc extends VectorFunction {
    override def apply(vec: Vector): Double = vec.norm(2)  
  }
  
  /**
   * Calculates the number of documents (movies) in which a tag
   * appears. Each document is counted once, regardless of how
   * many times the tag occurs in it.
   */
  class AddOneIfNonZeroFunc extends VectorFunction {
    override def apply(vec: Vector): Double = 
      vec.all().filter(e => e.get() > 0.0D).size.toDouble
  }
  
  /**
   * Given the number of documents per tag and the number of
   * documents (movies), calculates the IDF weight as
   * log(numDocsPerTag / numDocs).
   */
  class IdfFunc extends DoubleDoubleFunction {
    override def apply(elem: Double, ext: Double): Double = {
        scala.math.log(elem / ext)
    }
  }
}

The JUnit code shown below runs the recommender with the provided test case.

// Source: src/test/scala/com/mycompany/mia/recsys/ContentBasedFilteringRecommenderTest.scala
package com.mycompany.mia.recsys

import java.io.File

import org.junit.Assert
import org.junit.Test

class ContentBasedFilteringRecommenderTest {

  val modelfile = new File("data/recsys/ratings.csv")
  val tagfile = new File("data/recsys/movie-tags.csv")
  
  val testUsers = List(4045, 144, 3855, 1637, 2919)
  val expectedUnweightedResults = Map(
    4045L -> List((11L, 0.3596), (63L, 0.2612), (807L, 0.2363),
                  (187L, 0.2059), (2164L, 0.1899)),
    144L  -> List((11L, 0.3715), (585L, 0.2512), (38L, 0.1908),
                  (141L, 0.1861), (807L, 0.1748)),
    3855L -> List((1892L, 0.4303), (1894L, 0.2958), (63L, 0.2226),
                  (2164L, 0.2119), (604L, 0.1941)),
    1637L -> List((2164L, 0.2272), (141L, 0.2225), (745L, 0.2067),
                  (601L, 0.1995), (807L, 0.1846)),
    2919L -> List((11L, 0.3659), (1891L, 0.3278), (640L, 0.1958),
                  (424L, 0.1840), (180L, 0.1527)))
  val expectedWeightedResults = Map(
    4045L -> List((807L, 0.1932), (63L, 0.1438), (187L, 0.0947),
                  (11L, 0.0900), (641L, 0.0471)),
    144L  -> List((11L, 0.1394), (585L, 0.1229), (671L, 0.1130),
                  (672L, 0.0878), (141L, 0.0436)),
    3855L -> List((1892L, 0.2243), (1894L, 0.1465), (604L, 0.1258),
                  (462L, 0.1050), (10020L, 0.0898)),
    1637L -> List((393L, 0.1976), (24L, 0.1900), (2164L, 0.1522),
                  (601L, 0.1334), (5503L, 0.0992)),
    2919L  -> List((180L, 0.1454), (11L, 0.1238), (1891L, 0.1172),
                  (424L, 0.1074), (2501L, 0.0973)))      
  
  @Test def testUnweightedRecommendations(): Unit = {
    val cbf = new ContentBasedFilteringRecommender(
      new File("data/recsys/ratings.csv"),
      new File("data/recsys/movie-tags.csv"))
    testUsers.foreach(user => {
      val actualResults = cbf.similarMovies(user, 5, 
        cbf.makeUnweightedUserProfile)
      Console.println("results for user: " + user)
      Console.println("actuals=" + actualResults)
      Console.println("expected=" + expectedUnweightedResults(user))
      assertEquals(actualResults, expectedUnweightedResults(user))
    })
  }

  @Test def testWeightedRecommendations(): Unit = {
    val cbf = new ContentBasedFilteringRecommender(
      new File("data/recsys/ratings.csv"),
      new File("data/recsys/movie-tags.csv"))
    testUsers.foreach(user => {
      val actualResults = cbf.similarMovies(user, 5, 
        cbf.makeWeightedUserProfile)
      Console.println("results for user: " + user)
      Console.println("actuals=" + actualResults)
      Console.println("expected=" + expectedWeightedResults(user))
      assertEquals(actualResults, expectedWeightedResults(user))
    })
  }

  def assertEquals(xs: List[(Long,Double)], 
      ys: List[(Long,Double)]): Unit = {
    Assert.assertEquals(xs.size, ys.size)
    xs.zip(ys).foreach(xy => {
      Assert.assertEquals(xy._1._1, xy._2._1)
      Assert.assertEquals(xy._1._2, xy._2._2, 0.01D)
    })
  }
}

User-User Collaborative Filtering Recommender


The third recommender on our list is the User-User Collaborative Filtering recommender. It recommends items to a user by looking at predictions made for items by users who are similar to this user. This particular implementation takes a user and item ID and returns a prediction of the rating the user would give the item (movie), but the idea is the same.

This kind of recommender typically uses a pre-built user-user similarity matrix. My initial implementation did this, but startup was very slow, which is probably not surprising given that the user-user similarity matrix is 5564x5564, and unlike in NumPy or MATLAB, matrix operations don't seem to be heavily optimized in Mahout Math.

To predict a rating, we first find the user neighborhood for the given (user, item) pair. These are users who have rated the item; their similarity to the original user is calculated as the cosine similarity between their mean-centered rating vectors, and the top N such users (in our case 30) form the neighborhood. The predicted rating is then the mean rating of the original user plus the weighted average of the deviances of each neighbor's rating for the item from that neighbor's mean rating, where the weights are the cosine similarities just computed. As a formula:

                     ∑v∈N s(u,v) * (r(v,i) - μ(v))
    p(u,i) = μ(u) + -------------------------------
                            ∑v∈N |s(u,v)|
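Plugging hypothetical numbers into this formula, with a neighborhood of just two users and made-up similarities, ratings and means:

```scala
val muU = 3.5  // our user's mean rating (made up)
// Neighborhood as (s(u,v), r(v,i), mu(v)) triples, also made up.
val neighborhood = List(
  (0.9, 4.0, 3.0),
  (0.4, 2.0, 3.5))

// Weighted average of deviances, weighted by |similarity|.
val numer = neighborhood.map { case (s, r, mu) => s * (r - mu) }.sum
val denom = neighborhood.map { case (s, _, _) => math.abs(s) }.sum
val prediction = muU + numer / denom

println(prediction)  // 3.5 + 0.3/1.3, about 3.73
```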

A client of this recommender calls the predictRating() method with the user and item IDs and gets back a predicted rating. The method computes the mean rating for the specified user and creates the user's mean-centered rating vector. It then builds the user neighborhood by finding the users who have rated the item and ranking them by cosine similarity of their mean-centered rating vectors to the original user's, and finally computes the prediction using the formula above. Here's the code:

// Source: src/main/scala/com/mycompany/mia/recsys/UserUserCollaborativeFilteringRecommender.scala
package com.mycompany.mia.recsys

import java.io.File

import scala.collection.JavaConversions.asScalaIterator
import scala.collection.JavaConversions.iterableAsScalaIterable

import org.apache.mahout.cf.taste.impl.model.file.FileDataModel
import org.apache.mahout.math.RandomAccessSparseVector
import org.apache.mahout.math.Vector

/**
 * Predict a rating that a user would give a movie by looking
 * at predictions made by users most similar to the user.
 */
class UserUserCollaborativeFilteringRecommender(modelfile: File) {

  val model = new FileDataModel(modelfile)
  val itemIndex = model.getItemIDs().zipWithIndex.toMap
  val userIndex = model.getUserIDs().zipWithIndex.toMap
  
  /**
   * Compute a predicted rating using the following
   * formula:
   *                    sum(sim(u,v) * (r(v,i) - mu(v))
   *   p(u,i) = mu(u) + --------------------------------
   *                           sum(|sim(u,v)|)
   * sum is over all users v in neighborhood.
   * @param user the user ID
   * @param item the item ID
   * @return a predicted rating for (userID,itemID).
   */
  def predictRating(user: Long, item: Long): Double = {
    val muU = meanRating(user)
    val vectorU = centerUserVector(user, muU) 
    val neighbors = getUserNeighborhood(user, item, vectorU, 30)
    val ndpairs = neighbors.map(usersim => {
      val otheruser = usersim._1
      val simUV = usersim._2
      val muV = meanRating(otheruser)
      val rVI = model.getPreferenceValue(otheruser, item)
      (simUV * (rVI - muV), scala.math.abs(simUV))
    })
    val numer = ndpairs.map(x => x._1).foldLeft(0.0D)(_ + _)
    val denom = ndpairs.map(x => x._2).foldLeft(0.0D)(_ + _)
    muU + (numer / denom)
  }
  
  /**
   * Returns a neighborhood of similar users to a user
   * for a given item. Similarity metric used is Cosine
   * Similarity.
   * @param user the userID.
   * @param item the itemID.
   * @param vectorU the mean centered user vector for
   *        specified user.
   * @param nnbrs the number of neighbors to return.
   * @return a List of (userID,similarity) tuples for
   *        users in the neighborhood.
   */
  def getUserNeighborhood(user: Long, item: Long,
      vectorU: Vector,
      nnbrs: Int): List[(Long,Double)] = {
    model.getPreferencesForItem(item)
      // for the item, find all users that have rated the item
      // except the user itself.
      .map(pref => pref.getUserID().toLong)
      .filter(_ != user)
      // then mean center that rating and compute 
      // the cosine similarity between this user 
      // and our user
      .map(otheruser => {
        val muV = meanRating(otheruser)
        val vectorV = centerUserVector(otheruser, muV)
        (otheruser, cosineSimilarity(vectorU, vectorV))
      })
      .toList
      // sort by similarity and return the topN
      .sortWith((a,b) => a._2 > b._2)
      .slice(0, nnbrs)
  }
  
  /**
   * Calculate the mean rating for a user.
   * @param user the userID.
   * @return the mean user rating for that user.
   */
  def meanRating(user: Long): Double = {
    val ratings = model.getPreferencesFromUser(user)
      .map(pref => pref.getValue())
      .toList
    ratings.foldLeft(0.0D)(_ + _) / ratings.size
  }
  
  /**
   * Build a vector of item ratings for the user and
   * center them around the mean specified.
   * @param user the userID.
   * @param meanRating the mean item rating for user.
   * @return a vector containing mean centered ratings.
   */
  def centerUserVector(user: Long, meanRating: Double): Vector = {
    val uservec = new RandomAccessSparseVector(itemIndex.size)
    model.getPreferencesFromUser(user)
      .foreach(pref => uservec.setQuick(
        itemIndex(pref.getItemID()), 
        pref.getValue() - meanRating))
    uservec
  }

  /**
   * Compute cosine similarity between user vectors.
   * The isNAN() check is for cases where the user
   * has rated everything the same, so the mean
   * centered vector is all zeros, and the norm(2)
   * is also zero. The correct behavior (based on
   * np.linalg.norm()) is to return 0.0 in that case.
   * @param u vector for user u
   * @param v vector for user v
   * @return the cosine similarity between vectors.
   */
  def cosineSimilarity(u: Vector, v: Vector): Double = {
    val cosim = u.dot(v) / (u.norm(2) * v.norm(2))
    if (cosim.isNaN()) 0.0D else cosim
  }
}

The JUnit code below exercises this recommender for various user-item pairs, and also illustrates the calling pattern.

// Source: src/test/scala/com/mycompany/mia/recsys/UserUserCollaborativeFilteringRecommenderTest.scala
package com.mycompany.mia.recsys

import java.io.File
import org.junit.Test
import org.junit.Assert

class UserUserCollaborativeFilteringRecommenderTest {

  val modelfile = new File("data/recsys/ratings.csv")
  
  val ratingTriples = List(
    (1024, 77,    4.3848),
    (1024, 268,   2.8646),
    (1024, 393,   3.8722),
    (1024, 462,   3.1082),
    (1024, 36955, 2.3524),
    (2048, 77,    4.8493),
    (2048, 788,   3.8509),
    (2048, 36955, 3.9698))
  
  @Test def testPredictions(): Unit = {
    val uucf = new UserUserCollaborativeFilteringRecommender(
      modelfile)
    ratingTriples.foreach(rt => { 
      val score = uucf.predictRating(rt._1, rt._2)
      Console.println("Pred(%d:%d) = actual %f, expected %f"
        .format(rt._1, rt._2, score, rt._3))
      Assert.assertEquals(score, rt._3, 0.01D)
    })
  }
}

Item-Item Collaborative Filtering Recommender


Our last recommender is the Item-Item Collaborative Filtering Recommender. Like the User-User CF Recommender described above, this one also predicts a rating, although we could easily adapt it to return ranked recommendations like the first two. It predicts the rating a user would give an item (a movie in our case) by looking at the ratings the user has given to other items like this one.

Analogous to User-User CF, Item-Item CF needs to compute an item neighborhood. Item similarity is calculated as the cosine similarity of vectors containing mean-centered (by each user) ratings for each item, and the item neighborhood is composed of the top N (20 in our case) most similar items. The predicted rating is the mean user rating plus the weighted average of the user's mean-centered ratings for the neighborhood items, where the weights are the cosine similarities between the specified item and each of the other items in its neighborhood. As a formula:

                     ∑j∈N s(i,j) * r(j)
    p(u,i) = μ(u) + --------------------
                        ∑j∈N |s(i,j)|
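As before, the arithmetic can be checked with hypothetical numbers; here the neighborhood is two items, each given as a (s(i,j), r(j)) pair where r(j) is the user's mean-centered rating of neighbor item j:

```scala
val muU = 3.2  // the user's mean rating (made up)
// Item neighborhood as (s(i,j), mean-centered r(j)) pairs, made up.
val itemNeighborhood = List(
  (0.8,  1.3),
  (0.5, -0.7))

// Weighted average of the user's mean-centered ratings.
val numer = itemNeighborhood.map { case (s, r) => s * r }.sum
val denom = itemNeighborhood.map { case (s, _) => math.abs(s) }.sum
val prediction = muU + numer / denom

println(prediction)  // 3.2 + 0.69/1.3, about 3.73
```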

As in the User-User CF Recommender, the client calls the predictRating() method with user and item IDs and gets back a rating value. The method first calculates the item neighborhood using the item-item similarity matrix built at startup (just 100x100 in our case, so it takes much less time to build), then calculates the predicted rating. The code is shown below:

The item neighborhood can also be used to build a global recommender that recommends similar items to the one specified. To access this functionality, the client needs to call the findSimilar() method, passing in the item ID and the number of similar items desired.

// Source: src/main/scala/com/mycompany/mia/recsys/ItemItemCollaborativeFilteringRecommender.scala
package com.mycompany.mia.recsys

import java.io.File
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel
import scala.collection.JavaConversions._
import org.apache.mahout.math.SparseMatrix
import org.apache.mahout.math.RandomAccessSparseVector
import org.apache.mahout.math.function.DoubleDoubleFunction
import org.apache.mahout.math.Vector
import org.apache.mahout.math.Matrix

/**
 * Predict a rating that a user would give a movie by looking
 * at ratings given by this user to other movies like this
 * one.
 */
class ItemItemCollaborativeFilteringRecommender(modelfile: File) {

  val model = new FileDataModel(modelfile)
  val itemIndex = model.getItemIDs().zipWithIndex.toMap
  val userIndex = model.getUserIDs().zipWithIndex.toMap
  val iiModel = buildIIModel()
  val userMeans = iiModel._1
  val mcRatings = iiModel._2
  val itemSims = iiModel._3

  /**
   * Compute a predicted rating using the following formula:
   *                     sum(sim(i,j) * r(j))
   *   p(u,i) = mu(u) + ----------------------
   *                        sum(|sim(i,j)|)
   *                        
   * summing over all items j in item neighborhood.
   * @param user the userID.
   * @param item the itemID.
   * @return the predicted rating for (userID,itemID).
   */
  def predictRating(user: Long, item: Long): Double = {
    val neighborhood = getItemNeighborhood(user, item, 20)
    val nd = neighborhood.map(itemsim => itemsim._1)
      .map(nitem => (
        mcRatings.get(userIndex(user), itemIndex(nitem)) * 
        itemSims.get(itemIndex(item), itemIndex(nitem)),
        math.abs(itemSims.get(itemIndex(item), itemIndex(nitem)))))
    val num = nd.map(x => x._1).foldLeft(0.0D)(_ + _)
    val den = nd.map(x => x._2).foldLeft(0.0D)(_ + _)
    userMeans.get(userIndex(user)) + (num / den)
  }
  
  /**
   * Find the topN similar items to the specified item.
   * @param item the itemID.
   * @param topN the number of most similar items.
   * @return List of (itemID, score) tuples.
   */
  def findSimilar(item: Long, topN: Int): List[(Long,Double)] = {
    model.getItemIDs()
      .filter(itemID => itemID != item)
      .map(itemID => (itemID.toLong, 
        itemSims.get(itemIndex(item), itemIndex(itemID))))
      .toList
      .sortWith((a, b) => a._2 > b._2)
      .slice(0, topN)
  }
  
  /**
   * Find other items rated by this user (except this
   * item), sort items by item similarity and return 
   * the top numNeighbors items.
   * @param user the user ID.
   * @param item the item ID.
   * @param numNeighbors number of neighbors to find.
   * @return List of (itemID, similarity) tuples.
   */
  def getItemNeighborhood(user: Long, item: Long, 
      numNeighbors: Int): List[(Long,Double)] = {
    model.getItemIDsFromUser(user)
      .filter(itemID => itemID != item)
      .map(itemID => (itemID.toLong, 
        itemSims.get(itemIndex(item), itemIndex(itemID))))
      .toList
      .sortWith((a, b) => a._2 > b._2)
      .slice(0, numNeighbors)
  }
  
  /**
   * Builds ItemItem model by mean centering user ratings,
   * then considering only items with ratings > 0, compute
   * item item similarities using cosine similarity.
   * @return a triple containing the user means, a matrix
   *         containing the mean centered ratings, and
   *         another matrix containing Item-Item cosine
   *         similarities.
   */
  def buildIIModel(): (Vector, Matrix, Matrix) = {
    // build rating matrix
    val ratingMatrix = new SparseMatrix(
      model.getNumUsers(), model.getNumItems())
    for (user <- model.getUserIDs()) {
      for (item <- model.getItemIDsFromUser(user)) {
        ratingMatrix.setQuick(userIndex(user), itemIndex(item), 
          model.getPreferenceValue(user, item).toDouble)
      }
    }
    // find user mean for each user and subtract it
    // then normalize so we can calculate cosine
    // similarity by doing matrix multiplication
    val userMeans = new RandomAccessSparseVector(model.getNumUsers())
    for (user <- model.getUserIDs()) {
      val userRow = ratingMatrix.viewRow(userIndex(user))
      val len = userRow.all()
        .filter(e => e.get() > 0.0D).size.toDouble
      val sum = userRow.zSum()
      val userMean = sum / len
      userMeans.setQuick(userIndex(user), userMean)
      ratingMatrix.assignRow(userIndex(user), 
        userRow.assign(new AddIfPositive, -userMean))
    }
    // Item similarity is computed using cosine similarity.
    // Rather than do this across all item-item pairs, we
    // do the equivalent, ie, normalize the matrix using 
    // norm(2) and then multiplying the normalized matrix 
    // with its transpose.
    val normRatingMatrix = ratingMatrix.clone()
    for (item <- model.getItemIDs()) {
      val itemCol = normRatingMatrix.viewColumn(itemIndex(item))
      val norm = itemCol.norm(2.0D)
      normRatingMatrix.assignColumn(itemIndex(item), 
        itemCol.times(1.0D / norm))
    }
    val itemsims = normRatingMatrix.transpose()
      .times(normRatingMatrix)
    (userMeans, ratingMatrix, itemsims)    
  }

  /**
   * For mean centering, we only subtract the user mean for
   * ratings that the user has made. Since unknown ratings
   * are represented by 0, we use this function to skip the
   * non-existent ratings.
   */
  class AddIfPositive extends DoubleDoubleFunction {
    override def apply(elem: Double, other: Double): Double = {
      if (elem > 0.0D) elem + other
      else 0.0D
    }
  }
}
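
A quick way to convince yourself that the normalize-then-multiply trick in buildIIModel() really computes cosine similarities: in plain Scala (no Mahout, purely illustrative), normalizing two item columns to unit length and taking their dot product gives the same value as computing the cosine similarity directly. This is why the code above can get the whole item-item similarity matrix from a single transpose-times-self multiplication.

```scala
// Cosine similarity computed directly from two raw vectors.
def cosine(a: Array[Double], b: Array[Double]): Double = {
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val na = math.sqrt(a.map(x => x * x).sum)
  val nb = math.sqrt(b.map(x => x * x).sum)
  dot / (na * nb)
}

// Scale a vector to unit (L2) length.
def normalize(v: Array[Double]): Array[Double] = {
  val n = math.sqrt(v.map(x => x * x).sum)
  v.map(_ / n)
}

// Two hypothetical item columns (mean centered ratings from three users).
val col1 = Array(1.0, -0.5, 0.5)
val col2 = Array(0.5, 0.5, -1.0)

// Dot product of the normalized columns -- the quantity the matrix
// product normRatingMatrix.transpose().times(normRatingMatrix) computes
// for every pair of item columns at once.
val viaNorm = normalize(col1).zip(normalize(col2))
  .map { case (x, y) => x * y }.sum
// viaNorm equals cosine(col1, col2)
```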

The JUnit code below shows how to call both recommenders in this class (rating prediction and similar-item finding), as well as how to verify their correctness against the test data.

// Source: src/test/scala/com/mycompany/mia/recsys/ItemItemCollaborativeFilteringRecommenderTest.scala
package com.mycompany.mia.recsys

import java.io.File
import org.junit.Test
import org.junit.Assert

class ItemItemCollaborativeFilteringRecommenderTest {

  val modelfile = new File("data/recsys/ratings.csv")
  
  val predictRatingData = List(
    (1024, 77,    4.1968),
    (1024, 268,   2.3366),
    (1024, 393,   3.7702),
    (1024, 462,   2.9900),
    (1024, 36955, 2.5612),
    (2048, 77,    4.5102),
    (2048, 788,   4.1253),
    (2048, 36955, 3.8545))
  val findSimilarData = List(
    (550, 0.3192),
    (629, 0.3078),
    (38,  0.2574),
    (278, 0.2399),
    (680, 0.2394))
  
  val iicf = new ItemItemCollaborativeFilteringRecommender(
    modelfile)
  
  @Test def testPredictRating(): Unit = {
    predictRatingData.foreach(rating => {
      val predicted = iicf.predictRating(rating._1, rating._2)
      Console.println("Pred(%d:%d) = actual %f, expected %f".format(
        rating._1, rating._2, predicted, rating._3))
      Assert.assertEquals(predicted, rating._3, 0.01)
    })
  }
  
  @Test def testFindSimilar(): Unit = {
    val similarItems = iicf.findSimilar(77L, 5)
    var i = 0
    similarItems.foreach(similarItem => {
      val actualItem = similarItem._1
      val actualSimilarity = similarItem._2
      val expectedItem = findSimilarData(i)._1
      val expectedSimilarity = findSimilarData(i)._2
      Console.println("Found (%d,%f), expected (%d,%f)".format(
        actualItem, actualSimilarity, expectedItem, expectedSimilarity))
      Assert.assertEquals(actualItem, expectedItem)
      Assert.assertEquals(actualSimilarity, expectedSimilarity, 0.01D)
      i = i + 1
    })
  }
}

And that's all I have for today. I think that working directly with the rating matrix makes for cleaner, more readable code that is easier to extend, compared with having to learn the nuances of either framework. Hopefully, after reading this, you think so too. Apache Mahout's math library is quite useful for operating directly on the ratings matrix.

All code in this post is available as part of my mia-scala-examples project on GitHub here and here.
