Sunday, April 03, 2016

Hyperparameter Optimization on Spark MLLib using Monte Carlo methods


Some time back I wrote a post titled Hyperparameter Optimization using Monte Carlo Methods, which described an experiment to find optimal hyperparameters for a Scikit-Learn Random Forest classifier. This week, I describe an experiment doing much the same thing for a Spark ML based Logistic Regression classifier, and discuss how one could build this functionality into Spark if the community thought that this might be useful.

Using Monte Carlo approaches for hyperparameter optimization is not a new concept. There exist several Python libraries such as HyperOpt and Spearmint to do this. There seems to be interest this approach also in the R community, going by this R-bloggers post by Automatic Hyperparameter Tuning Methods by John Myles White. On the industry front, I learned at this talk at Elasticon 2016 that ElasticSearch uses it for setting parameters for their Moving Average Aggregator. I also learned that H2O has similar functionality, based on this PyData Amsterdam 2016 talk by Jo-Fai Chow.

At a high level, the process involves specifying the boundaries of the hyperparameter space to be searched, then picking random points in this space at each step. At each step i, we compute an acceptance rate given by:


If the current accuracy exceeds the previous one, the acceptance rate is 1 and the new point is accepted for exploration without question. However, in case the current accuracy is less than the previous one, we throw a virtual dice that generates a random number between 0 and 1 and accept the new point only if the acceptance rate is less than the number generated. This is to give the process a chance to jump out of local minima in the space.

One improvement in this version over the previous one is the introduction of Simulated Annealing, inspired by the Elasticon talk referenced above. The idea is to damp the number generated by the virtual dice by a "temperature" function. The intuition is that as the process progresses, it "cools down" and settles into a stable state, so it should become harder for it to jump out of these states. In terms of math, the virtual dice generates a number given by this formula:


Data Preprocessing


For our experiment, we use the Census Dataset from the UCI Machine Learning library. This dataset contains 14 categorical and continuous features for about 48K individuals, labeled indicating whether they make more or less than $50K per year.

Since we are using Logistic Regression, we need to convert our categorical variables into One-Hot encoding. Spark MLLib offers a OneHotEncoder but its somewhat clumsy to apply to a large number of attributes, so we build a simpler home grown one below. It creates enumerated maps of all categorical variables and broadcasts them to the workers, so they can be applied to the input line in a single operation.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
val workClasses = sc.broadcast(
  List("Private", "Self-emp-not-inc", "Self-emp-inc", "Federal-gov", 
       "Local-gov", "State-gov", "Without-pay", "Never-worked")
  .zipWithIndex
  .toMap)
val educations = sc.broadcast(
  List("Bachelors", "Some-college", "11th", "HS-grad", "Prof-school", 
       "Assoc-acdm", "Assoc-voc", "9th", "7th-8th", "12th", "Masters", 
       "1st-4th", "10th", "Doctorate", "5th-6th", "Preschool")
  .zipWithIndex
  .toMap)
val maritalStatuses = sc.broadcast(
  List("Married-civ-spouse", "Divorced", "Never-married", "Separated", 
       "Widowed", "Married-spouse-absent", "Married-AF-spouse")
  .zipWithIndex
  .toMap)
val occupations = sc.broadcast(
  List("Tech-support", "Craft-repair", "Other-service", "Sales", 
       "Exec-managerial", "Prof-specialty", "Handlers-cleaners", 
       "Machine-op-inspct", "Adm-clerical", "Farming-fishing", 
       "Transport-moving", "Priv-house-serv", "Protective-serv", 
       "Armed-Forces")
  .zipWithIndex
  .toMap)
val relationships = sc.broadcast(
  List("Wife", "Own-child", "Husband", "Not-in-family", "Other-relative", 
       "Unmarried")
  .zipWithIndex
  .toMap)
val races = sc.broadcast(List("White", "Asian-Pac-Islander", 
                              "Amer-Indian-Eskimo", "Other", "Black")
  .zipWithIndex
  .toMap)
val sexes = sc.broadcast(List("Female", "Male")
  .zipWithIndex
  .toMap)
val nativeCountries = sc.broadcast(
  List("United-States", "Cambodia", "England", "Puerto-Rico", "Canada", 
       "Germany", "Outlying-US(Guam-USVI-etc)", "India", "Japan", "Greece", 
       "South", "China", "Cuba", "Iran", "Honduras", "Philippines", "Italy", 
       "Poland", "Jamaica", "Vietnam", "Mexico", "Portugal", "Ireland", 
       "France", "Dominican-Republic", "Laos", "Ecuador", "Taiwan", "Haiti", 
       "Columbia", "Hungary", "Guatemala", "Nicaragua", "Scotland", 
       "Thailand", "Yugoslavia", "El-Salvador", "Trinadad&Tobago", "Peru", 
       "Hong", "Holand-Netherlands")
  .zipWithIndex
  .toMap)

We then apply the above maps to preprocess the data and convert the dataset to the form required by the LogisticRegression classifier. We then split up our dataset 70/30 for training and validation. This gives us 22,830 training records and 9,731 validation records.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}

def replaceWithOneHotEncoding(colValue: String, 
    lookup: Map[String, Int]): String = {
  val hotIdx = lookup.getOrElse(colValue, -1)
  val buf = ArrayBuffer.fill(lookup.size)(0)
  (0 until buf.size).foreach(i => if (hotIdx > -1) {
    buf(i) = if (i == hotIdx) 1 else 0
  })
  buf.toArray.map(_.toString).mkString(",")
}

def parseLine(line: String): LabeledPoint = {
  var cols = line.split(',').map(_.trim)
  cols(1) = replaceWithOneHotEncoding(cols(1), workClasses.value)
  cols(3) = replaceWithOneHotEncoding(cols(3), educations.value)
  cols(5) = replaceWithOneHotEncoding(cols(5), maritalStatuses.value)
  cols(6) = replaceWithOneHotEncoding(cols(6), occupations.value)
  cols(7) = replaceWithOneHotEncoding(cols(7), relationships.value)
  cols(8) = replaceWithOneHotEncoding(cols(8), races.value)
  cols(9) = replaceWithOneHotEncoding(cols(9), sexes.value)
  cols(13) = replaceWithOneHotEncoding(cols(13), nativeCountries.value)
  val features = Vectors.dense(cols.slice(0, 13)
                                   .mkString(",")
                                   .split(',')
                                   .map(_.toDouble))
  val label = if (cols(14).equals("<=50K")) 0 else 1
  LabeledPoint(label, features)
}

val census = sc.textFile("/path/to/census.data")
  .filter(line => line.trim.length > 0)
  .map(line => parseLine(line))
  .cache

val Array(censusTrain, censusValid) = census.randomSplit(Array(0.7, 0.3))

Grid Search (using Spark MLLib Pipelines)


Using Spark ML, I can create a pipeline with a Logistic Regression Estimator and a Parameter grid which executes a 3-fold Cross Validation at each Grid point. The grid itself contains 3 values for the elasticNetParam, 2 for maxIter and 2 for regParam, i.e. a total of 3*2*2=12 points in the Hyperparameter space. This pipeline will return the best model using the cv.fit() call.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

val lr = new LogisticRegression()
val pipeline = new Pipeline()
  .setStages(Array(lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.elasticNetParam, Array(0.1, 0.5, 0.9))
  .addGrid(lr.maxIter, Array(10, 20))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

val cvModel = cv.fit(censusTrain.toDF)

val results = cvModel.transform(censusValid.toDF)
  .select("label", "prediction")
  .collect
val numCorrectPredictions = results.map(row => 
    if (row.getDouble(0) == row.getDouble(1)) 1 else 0)
  .foldLeft(0)(_ + _)
val accuracy = 1.0D * numCorrectPredictions / results.size
println("Test set accuracy: %.3f".format(accuracy))

Grid Search (not using Spark MLLib Pipelines)


While the above approach is convenient, it is a black box. Specifically, there is no way to know the accuracy reported at each combination of hyperparameters during the grid search. In order to get this information, we have to do this manually, as shown below:

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
val gridParams = Map(
  "elasticNetParam" -> List(0.1, 0.5, 0.9),
  "regParam" -> List(0.1, 0.01),
  "maxIter" -> List(10, 20)
)
val gridEvals = for (elasticNetParam <- gridParams("elasticNetParam");
     regParam <- gridParams("regParam");
     maxIter <- gridParams("maxIter")) yield {
  val lr = new LogisticRegression()
    .setElasticNetParam(elasticNetParam.asInstanceOf[Double])
    .setRegParam(regParam.asInstanceOf[Double])
    .setMaxIter(maxIter.asInstanceOf[Int])
  // split data for 3-fold CV
  val random = scala.util.Random
  val cvAccuracies = (0 until 3).map(i => {
    val Array(censusCVTrain, censusCVTest) = 
      censusTrain.randomSplit(Array(0.67, 0.33), random.nextLong)
    val model = lr.fit(censusCVTrain.toDF)
    val results = model.transform(censusCVTest.toDF)
      .select("label", "prediction")
      .collect
    val numCorrect = results.map(row => 
        if (row.getDouble(0) == row.getDouble(1)) 1 else 0)
      .foldLeft(0)(_ + _)
    1.0D * numCorrect / results.size
  })
  val accuracy = 1.0D * cvAccuracies.sum / cvAccuracies.size
  (elasticNetParam, regParam, maxIter, accuracy)
}

// Save the evaluations for visualization via Python
val gridEvalsRDD = sc.parallelize(gridEvals)
gridEvalsRDD.coalesce(1)
  .map(e => "%.3f\t%.3f\t%d\t%.3f".format(e._1, e._2, e._3, e._4))
  .saveAsTextFile("/path/to/accs-grid")

Monte Carlo Search (not using Spark MLLib Pipelines)


We now modify the code above to search the hyperparameter space using the Monte Carlo approach outlined at the beginning of the post.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import org.apache.spark.ml.classification.LogisticRegressionModel

// optimizer hyperparameters
val numIters = 50    // number of iterations, larger is better
val tol = 1e-9       // terminate when accuracy gain less than this

// parameter bounds
val mcParams = Map(  // parameters with lower and upper bounds
  "elasticNetParam" -> List(0.1, 0.9),
  "regParam" -> List(0.1, 0.01),
  "maxIter" -> List(10, 20)
)

val randomizer = scala.util.Random
randomizer.setSeed(42L)

def randomDouble(bounds: List[Any]): Double = {
  val min = bounds.head.asInstanceOf[Double]
  val max = bounds.tail.head.asInstanceOf[Double]
  (randomizer.nextDouble * (max - min)) + min
}

def randomInt(bounds: List[Any]): Int = {
  val min = bounds.head.asInstanceOf[Int]
  val max = bounds.tail.head.asInstanceOf[Int]
  randomizer.nextInt(max - min) + min
}

var iters = 0
var prevAccuracy = 0.0D
var diff = 0.0

case class ModelAndResult(model: Map[String,_], accuracy: Double)
val modelsAndResults = scala.collection.mutable.ArrayBuffer[ModelAndResult]()

while (iters < numIters) {
  // generate random parameters and create a model
  val elasticNetParam = randomDouble(mcParams("elasticNetParam"))
  val regParam = randomDouble(mcParams("regParam"))
  val maxIter = randomInt(mcParams("maxIter"))
  val currentParams = Map("elasticNetParam" -> elasticNetParam,
                          "regParam" -> regParam,
                          "maxIter" -> maxIter)
  val lr = new LogisticRegression()
    .setElasticNetParam(elasticNetParam)
    .setRegParam(regParam)
    .setMaxIter(maxIter)
  // do 3-fold cross validation and get average accuracy
  val random = scala.util.Random
  val cvAccuracies = (0 until 3).map(i => {
    val Array(censusCVTrain, censusCVTest) = 
      censusTrain.randomSplit(Array(0.67, 0.33), random.nextLong)
    val model = lr.fit(censusCVTrain.toDF)
    val results = model.transform(censusCVTest.toDF)
      .select("label", "prediction")
      .collect
    val numCorrect = results.map(row => 
        if (row.getDouble(0) == row.getDouble(1)) 1 else 0)
      .foldLeft(0)(_ + _)
    1.0D * numCorrect / results.size
  })
  val accuracy = 1.0D * cvAccuracies.sum / cvAccuracies.size
  // acceptance criteria
  val acceptanceRate = if (prevAccuracy == 0.0D) 1.0D 
    else math.min(1.0D, accuracy / prevAccuracy)
  if (acceptanceRate >= 1.0D) {
    // keep the model, its obviously better than the previous one
    modelsAndResults += ModelAndResult(currentParams, accuracy)
    diff = accuracy - prevAccuracy
  } else {
    // throw the dice, keep the model if the dice is better than
    // acceptance rate.
    val dice = random.nextDouble * math.exp(-iters / numIters)
    if (dice > acceptanceRate) {
      // accept the model even though its worse than previous. This
      // is to jump out of local minima. The acceptance rate is annealed
      // by (numIter - iter) so it decreases as iter grows. We still have
      // the best model so far
      modelsAndResults += ModelAndResult(currentParams, accuracy)
    }
  }
  // update parameters
  prevAccuracy = accuracy
  iters += 1
  if (diff < tol) iters = numIters
}

// Save the evaluations for visualization via Python
val mcEvals = modelsAndResults.toList.map(mr => 
  (mr.model("elasticNetParam").asInstanceOf[Double], 
   mr.model("regParam").asInstanceOf[Double], 
   mr.model("maxIter").asInstanceOf[Int], 
   mr.accuracy))
val mcEvalsRDD = sc.parallelize(mcEvals)
  .coalesce(1)
  .map(e => "%.3f\t%.3f\t%d\t%.3f".format(e._1, e._2, e._3, e._4))
  .saveAsTextFile("/path/to/accs-mc")

I use Databricks Notebooks for this analysis, which provides limited visualization functionality natively, but allows me to switch to Python and matplotlib when I need to. Since I have 3 parameters, I attempted to visualize the selected points as a 3D scatterplot, the color of the point representing its accuracy. Here is the code to do the visualization.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
%python
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

def plot_3dscatter(xyz_scores, labels):
    xx = []
    yy = []
    zz = []
    colors = []
    for xyz_score in xyz_scores:
        xx.append(xyz_score[0])
        yy.append(xyz_score[1])
        zz.append(xyz_score[2])
        colors.append(xyz_score[3])
    fig = plt.figure()
    ax = fig.add_subplot(111, projection="3d")
    p = ax.scatter(xx, yy, zz, c=colors, marker='o', cmap="cool")
    ax.set_xlabel(labels[0])
    ax.set_ylabel(labels[1])
    ax.set_zlabel(labels[2])
    fig.colorbar(p)
    return fig

def parse_eval(line):
    cols = line.split('\t')
    return (float(cols[0]), float(cols[1]), int(cols[2]), float(cols[3]))
    
// read from data saved from grid search above
evals_grid = (sc.textFile("/path/to/accs-grid/")
              .map(lambda line: parse_eval(line))
              .collect())
fig = plot_3dscatter(evals_grid, ["elasticNetParam", "regParam", "maxIter"])
display(fig)

This gives me the following visualizations for the Grid search (left) and Monte Carlo search (right) respectively. As can be seen, both approaches gravitates to approximately the same area in the hyperparameter space and produce very similar accuracy (0.841 vs 0.843 respectively). What is interesting to note, however, is that the Monte Carlo search seems to seek out and converge to a good area of the hyperspace, avoiding areas that are less promising.


I also plot the change in accuracy over time to illustrate the damping effect of Simulated Annealing. Here is the code to do so and the resulting chart. Note that the oscillations go down as time progresses, and I retain the best model so far (shown by the red dot), which may not necessarily be the model created by the last iteration.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import operator

points = enumerate([x[3] for x in evals_mc])
xs = []
ys = []
for x, y in points:
  xs.append(x)
  ys.append(y)
best_xy = sorted(zip(xs, ys), key=operator.itemgetter(1), reverse=True)[0]
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(xs, ys, color='b')
ax.plot(best_xy[0], best_xy[1], color='r', marker='o')
ax.set_xlabel("Iteration")
ax.set_ylabel("Accuracy")
ax.grid()
display(fig)


Future: Monte Carlo Search (using Spark MLLib Pipelines)


The following is a thought experiment. I would like to use Monte Carlo Hyperparameter optimization for my Spark MLLib models. Of course, it is possible to do so using the manual method I illustrated above, but I would also like to use the new Spark MLLib pipeline approach to do so. To do this, we would need some way to reach inside these currently opaque classes. The (non-working) code snippet below illustrates what that might look like.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

val lr = new LogisticRegression()
val pipeline = new Pipeline()
  .setStages(Array(lr))

// add grid ranges instead of explicit list of values
val paramGrid = new MonteCarloGridBuilder()
  .addGridRange(lr.elasticNetParam, Array(0.1, 0.9))
  .addGridRange(lr.maxIter, Array(10, 20))
  .addGridRange(lr.regParam, Array(0.1, 0.01))
  .setMaxIters(50)
  .setTol(1e-9)
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

val cvModel = cv.fit(censusTrain.toDF)

val results = cvModel.transform(censusValid.toDF)
  .select("label", "prediction")
  .collect
val numCorrectPredictions = results.map(row => 
    if (row.getDouble(0) == row.getDouble(1)) 1 else 0)
  .foldLeft(0)(_ + _)
val accuracy = 1.0D * numCorrectPredictions / results.size
println("Test set accuracy: %.3f".format(accuracy))

As you will notice, the surface changes are quite minimal. The ParamGridBuilder has been replaced with the (hypothetical) MonteCarloGridBuilder and the addGrid calls have been replaced with addGridRange calls. In addition, there are 2 method calls on MonteCarloGridBuilder to set the maximum number of iterations it will run for and the tolerance, ie if the difference between the previous and current accuracy is less than tolerance, it should terminate. Internally, some more changes may be needed - currently the ParamGridBuilder creates the grid and hands it off. With a MonteCarloGridBuilder, the CrossValidator will need to check with the GridBuilder at each stage to determine the next point.

Be the first to comment. Comments are moderated to prevent spam.