Technical Tuesday: How to write a custom Spark Classifier

In last month’s edition of dataxu’s Technical Tuesday series, we took a look at why you should write your own Spark classifier. This month, Maximo Gurmendez, a Data Science Engineering Lead, dives into the practical steps of how to write a custom Spark Classifier using Categorical Naive Bayes.

How to write a custom Spark Classifier: Categorical Naive Bayes

Recently, we took a look at a few reasons why you would want to write your own Spark Classifier/Estimator. And now that you’ve decided to move forward, let’s take a look at how to write your own Spark Classifier by going over the implementation of a flavor of Naive Bayes for binary classification over categorical features. As opposed to the standard Spark implementation, this version does not need additional stages for feature encoding into numerical vectors, making it much faster to train.

dataxu bids on ads in real-time on behalf of its customers and trains on past bids to optimize for future bids. Our system trains thousands of advertiser-specific models and has multi-terabyte datasets. Hence, we need to consider both model performance and cost of training. We always choose the model with the highest performance, but for a given performance level, we need to pick the classifier that minimizes training time (and prediction time, but that’s a topic for another blog post).

For our case, the custom implementation we’ll describe, which we’ll call Categorical Naive Bayes, has similar model performance (using ROC AUC as the metric) to the standard Naive Bayes implementation that comes with Spark. However, to achieve that parity, we had to add some additional stages of feature engineering to Spark’s Naive Bayes, such as numerical encoding of categorical features and attribute selection. This slows down training, since it implies several more passes over the dataset.
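As a minimal sketch of how such a comparison can be made (this is not dataxu’s actual evaluation code; `fittedModel` and `testDf` are assumed to be a trained pipeline model and a held-out test set), Spark’s built-in evaluator computes ROC AUC directly from a model’s raw predictions:

    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    
    // ROC AUC on a held-out set; `fittedModel` and `testDf` are assumed to exist.
    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")
      .setMetricName("areaUnderROC")
    
    val rocAuc = evaluator.evaluate(fittedModel.transform(testDf))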

Full disclosure: we didn’t end up using this new implementation in production, and instead chose mostly tree-based algorithms for model performance reasons. That said, Categorical Naive Bayes is 3X faster than Spark Naive Bayes and 10X faster than Spark Random Forest for our real-time bidding training data, in which most features are categorical.

Recap on Spark Pipeline Concepts

A Spark ML pipeline consists of a series of stages. Each stage can be a Transformer or an Estimator. Transformers apply a well-defined transformation to a dataset, while Estimators have the added capability of producing Models by traversing the dataset. NaiveBayes and StringIndexer are examples of Estimators, while VectorAssembler and OneHotEncoder are examples of Transformers. Models, in turn, are Transformers, because they can provide predictions for all elements in a dataset as a transformation.
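To make the distinction concrete, here is a minimal illustration (not taken from the original post; the column names are made up): StringIndexer is an Estimator, and the StringIndexerModel it produces is a Transformer.

    import org.apache.spark.ml.feature.StringIndexer
    
    val indexer = new StringIndexer()                // Estimator: must see the data first
      .setInputCol("country")
      .setOutputCol("country_index")
    
    val indexerModel = indexer.fit(trainDf)          // one pass over trainDf yields a Model
    val indexedDf = indexerModel.transform(trainDf)  // Models transform like any Transformer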

Comparing Spark Naive Bayes to Categorical Naive Bayes

Spark Naive Bayes assumes a feature layout mostly used in language models (e.g., the multivariate Bernoulli or multinomial models), where the value of a feature indicates the presence or strength of that term (column) within the document (row). Hence, one needs to use techniques such as one-hot encoding to run a Naive Bayes algorithm on top of categorical features, and some sort of feature selection to avoid having too many features. A typical pipeline would be:

    import org.apache.spark.ml.{Pipeline, PipelineStage}
    import org.apache.spark.ml.classification.NaiveBayes
    import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
    
    // Index each categorical column into numerical category indices
    val stringIndexerFeature1 = new StringIndexer()
      .setInputCol("feature1")
      .setOutputCol("num_feature1")
    
    val stringIndexerFeature2 = new StringIndexer()
      .setInputCol("feature2")
      .setOutputCol("num_feature2")
    
    // One-hot encode each index column, then assemble the resulting vectors
    val encoderFeature1 = new OneHotEncoder()
      .setInputCol("num_feature1")
      .setOutputCol("onehot_feature1")
    
    val encoderFeature2 = new OneHotEncoder()
      .setInputCol("num_feature2")
      .setOutputCol("onehot_feature2")
    
    val assembler = new VectorAssembler()
      .setInputCols(Array("onehot_feature1", "onehot_feature2"))
      .setOutputCol("features")
    
    val naiveBayes = new NaiveBayes().setLabelCol("label")
    
    val stages: Array[PipelineStage] = Array(
      stringIndexerFeature1,
      stringIndexerFeature2,
      encoderFeature1,
      encoderFeature2,
      assembler,
      naiveBayes)
    
    val pipeline = new Pipeline().setStages(stages)
    val model = pipeline.fit(trainDf)

Note that we have two string indexers in this sample pipeline. At the time of this writing, Spark is not smart enough to realize that those two stages could be parallelized and, thus, we pay the cost of traversing the data twice before we submit it to the classifier. In order to avoid this, our implementation computes the conditional probabilities for all categories in a single pass. These conditional probabilities are sufficient to make predictions on a dataset.
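As a rough sketch of that idea (not dataxu’s actual code; it assumes a double-typed "label" column and string-typed feature columns, both of which are illustrative), the per-category counts that Categorical Naive Bayes needs can be gathered in one distributed pass:

    import org.apache.spark.sql.DataFrame
    
    // Count every (feature, category value, label) triple in a single pass.
    // Conditional probabilities P(value | label) follow directly from these counts.
    def categoryCounts(df: DataFrame, features: Seq[String]): Map[(String, String, Double), Long] =
      df.select("label", features: _*)
        .rdd
        .flatMap { row =>
          val label = row.getDouble(0)
          features.indices.map(i => ((features(i), row.getString(i + 1), label), 1L))
        }
        .reduceByKey(_ + _)   // one shuffle, no per-feature passes over the data
        .collect()
        .toMap

With smoothed probabilities derived from those counts, prediction reduces to summing the log prior and log conditional probabilities for each class and picking the larger total.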

For each categorical value of a feature, we need to compute the conditional probability by counting the number of positive and negative instances. As you can imagine, features with extremely high cardinality might blow up the memory of the executors. For this reason, we use the stream-lib library, which calculates the top-k elements and their frequencies over a stream of data, following the ideas of Metwally, Agrawal, and El Abbadi (Efficient Computation of Frequent and Top-k Elements in Data Streams).
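As a small, hedged sketch of how stream-lib can be used for this (the capacity and k values below are arbitrary, and this is not dataxu’s exact integration):

    import com.clearspring.analytics.stream.StreamSummary
    import scala.collection.JavaConverters._
    
    // Space-Saving sketch with bounded capacity: memory stays fixed regardless of
    // how many distinct category values a feature has.
    val summary = new StreamSummary[String](10000)
    
    // Offer every observed value of a feature to the sketch.
    Seq("US", "UK", "US", "DE", "US").foreach(v => summary.offer(v))
    
    // Keep only the most frequent categories; anything else can be folded into a
    // single "other" bucket before computing conditional probabilities.
    val topCategories: Set[String] = summary.topK(1000).asScala.map(_.getItem).toSet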

 Continue reading the full post here to discover the typical usage, implementation, and more of Categorical Naive Bayes.