How we built a data pipeline with Lambda Architecture using Spark/Spark Streaming

Walmart Labs is a data-driven company. Many business and product decisions are based on the insights derived from data analysis. I work on Expo, the A/B testing platform for Walmart. As part of the platform we built a data ingestion and reporting pipeline which the experimentation team uses to see how experiments are trending. In this blog I would like to give a little primer on how we built that data ingestion and reporting pipeline with the Lambda Architecture using Spark, which lets us reuse code between the streaming and batch layers, along with key configurations for deployment and a few troubleshooting tips.

Use Case

The use case is to take the clickstream events, aggregate them by session id, and generate metrics such as unique visitors, visits, orders, revenue, units, bounce rates, site error rates, performance metrics, etc. for both assigned and qualified experiments. Following is the architecture that accomplishes that:

Lambda Architecture

We have been running a Lambda architecture with Spark in production for more than 2 years now. The Lambda architecture provides a robust system that is fault-tolerant against hardware failures and human mistakes, and the resulting system is linearly scalable by scaling out instead of scaling up. We needed a Lambda architecture because there is no guarantee that the data will arrive exactly once, there can be noise in the data, and we need protection against data that arrives late due to network or server instability. The Lambda architecture handles these issues by processing the data twice: once in real-time streaming to give a quick view of the data/metrics that get generated, and a second time in a scheduled batch process to give a more reliable view of the data/metrics. One key consideration with our Lambda architecture was to make our writes to KairosDB idempotent, so that we can reprocess the data and still end up with exactly-once semantics for the stored results.
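Because a KairosDB data point is identified by its metric name, tags and timestamp, deriving all three purely from the event itself (rather than from processing time) makes the write naturally idempotent: reprocessing the same events simply overwrites the same data points. The following is a minimal sketch of that idea; the Beacon fields, metric name, tags and the KairosClient interface are illustrative placeholders, not our production schema or client.

  //Hypothetical client interface standing in for the actual KairosDB client
  trait KairosClient {
    def addDataPoint(metric: String, tags: Map[String, String], timestampMs: Long, value: Double): Unit
  }

  //Illustrative event shape
  case class Beacon(experimentId: String, eventTimeMs: Long, revenue: Double)

  //Idempotent write: metric name, tags and timestamp all come from the event itself,
  //so reprocessing the same beacon rewrites the same KairosDB data point
  def writeRevenueMetric(beacon: Beacon, kairosClient: KairosClient): Unit = {
    val metricName = "expo.revenue"                            //illustrative metric name
    val tags       = Map("experiment" -> beacon.experimentId)  //illustrative tag
    kairosClient.addDataPoint(metricName, tags, beacon.eventTimeMs, beacon.revenue)
  }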

Why Spark/Spark Streaming

Spark/Spark streaming improves developer productivity as it provides a unified api for streaming, batch and interactive analytics. Overall, Spark has been a flexible, robust and scalable data processing engine for our use case. Spark Streaming provides efficient fault-tolerant stateful stream processing. It provides high throughput and good integration with data stores like Cassandra, Kafka, Hadoop etc.

Stats

Here are some statistics about our data pipeline:

  • ~70k events per second on average
  • ~250k events per second during peak hours

Details

We use the Spark Kafka Direct approach for our streaming, which guarantees exactly-once processing semantics with checkpointing enabled for recovery. Batch processing is built on top of core Spark. The metrics mentioned above are calculated and stored in KairosDB, a time series DB. The user session information is also stored in HDFS as Hive external tables from the batch processing. The user state is maintained for 12 hours in streaming; in batch, the user state is maintained as long as the user is active.

RDDs in the streaming and the batch processing are processed as follows:

Parsing and Validation: This is done in a series of map(), flatMap(), filter(), groupByKey(), reduceByKey() and leftOuterJoin() RDD transformations which get reused across the streaming and batch jobs.

Data Enrichment and state maintenance: Data enrichment and state maintenance are done under updateStateByKey() in streaming and mapPartitions() in batch processing, with the business logic for merging the sessions and enriching them with additional information reused across the batch and streaming jobs. In streaming, the state is maintained in memory and in the checkpoint as part of the functionality provided by updateStateByKey(). In batch processing, the state is maintained in Hive external tables in HDFS. We also calculate rolling 1-hour and 24-hour unique visitors using reduceByKeyAndWindow() with an inverse function in the streaming job, as sketched below.
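As a rough sketch of the windowed count (the stream name, key and window lengths below are illustrative, not our production code), the inverse function lets Spark subtract the micro batch that slides out of the window instead of recomputing the whole window, which keeps the rolling counts cheap:

  //Sketch: rolling counts with reduceByKeyAndWindow and an inverse function.
  //visitorCounts is assumed to be a DStream[(String, Long)] of (experimentId, 1L) per new visitor;
  //stateful window operations like this also require checkpointing to be enabled.
  val rollingHourVisitors = visitorCounts.reduceByKeyAndWindow(
    (a: Long, b: Long) => a + b,   //add the counts entering the window
    (a: Long, b: Long) => a - b,   //subtract the counts leaving the window (inverse function)
    Minutes(60),                   //window length: rolling hour
    Seconds(60)                    //slide interval: one micro batch
  )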

Metric Generation and Save to KairosDB: Metrics are generated using flatMap() and reduceByKey() after the sessionization and data enrichment, and are saved to KairosDB using a KairosDB client. The write to KairosDB happens in a foreachPartition() action. The save logic is reused in both the streaming and batch processing.

In the stream processing, the incoming beacons for each micro batch are grouped by sessionId and sorted by timestamp. Session state is updated by processing the sessions from the current micro batch and merging them with the session state from the previous micro batch. Metrics generated after sessionization are reduced and then stored in KairosDB. We store the offsets in ZooKeeper so that we can restart the job from the last successfully processed offsets. For accuracy, the offsets are committed in the same foreachPartition that writes the results. The following code sample gives an overview of the same.

  //get or create streaming context
  def main(args: Array[String]) {
    try {
      val context = StreamingContext.getOrCreate(CHECKPOINT_DIRECTORY,
          () => {
            createFunc(args)
          })

      //Register the shutdown hook before blocking on awaitTermination
      sys.ShutdownHookThread {
        LOGGER.info("Gracefully shutting down")
        //stop the SparkContext as well (first true) and drain in-flight batches (second true)
        context.stop(true, true)
        LOGGER.info("Application stopped")
      }

      context.start()
      context.awaitTermination()
    } catch {
      case ex: Exception =>
        LOGGER.error("Streaming application failed", ex)
        System.exit(1)
    }
  }
  
  def createFunc(args: Array[String]): StreamingContext = {

  //Create the streaming context with a 60 second micro batch and enable checkpointing for recovery
  //(sparkConf is the SparkConf built for this job; see the configuration section below)
  val ssc = new StreamingContext(sparkConf, Seconds(60))
  ssc.checkpoint(CHECKPOINT_DIRECTORY)

  //Start from the offsets stored in ZooKeeper
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc,
      kafkaParams, topicAndPartitions, (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

  //Getting the offset ranges of the input DStream
  var offsetRanges = Array[OffsetRange]()

  val kafkaStreamRdd = kafkaStream.transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  
   //Do the filters etc and get the relevant beacons needed for Sessionization
   val beaconMap = kafkaStreamRdd.transform(rdd => populateBeaconMap(rdd))
   
   //Group the sessions by sessionId and sort based on timestamp
   val groupedAndSortedSessions = beaconMap.transform(rdd => populateGroupedAndSortedSessions(rdd))
  
   //Use updateStateByKey to maintain the state of the sessions
   val statefulSessions = groupedAndSortedSessions.updateStateByKey(updateStatefulSessions)
   
   //Usage of checkpoint for stateful streaming
   statefulSessions.checkpoint(Seconds(DSTREAM_CHECKPOINT_INTERVAL))
   
   //Generate metric values from stateful sessions
   val metricValues = statefulSessions.transform(rdd => generateMetricValues(rdd))
   
   //Reduce the metrics generated
   val reducedMetricValues = metricValues.transform(rdd => generateReducedMetrics(rdd))
   
   //Saving the metrics to KairosDB
   reducedMetricValues.foreachRDD { rdd =>
            //Create the broadcast variable of the curator framework to store the data in Zookeeper
            val curatorSink = CuratorSinkBroadCast.initCuratorSink(rdd.sparkContext)
            //Save the data to kairos DB          
            saveToKairos(rdd, offsetRanges, curatorSink)
            
    }
  
  //Return the newly created streaming context so that getOrCreate can start it
  ssc
  }
  
 //Method used by updateStateByKey to produce stateful sessions
  def updateStatefulSessions(
                                 //(Sequential list of timestamp and Json)
                                 currentSessionList: Seq[(List[(Long, String)])],
                                 //(requestSession, mergedSession)
                                 previousSessionRecord: Option[(StatefulSession, StatefulSession)]
                                 ): Option[(StatefulSession, StatefulSession)] = {
    //(requestSession, mergedSession) - default to carrying the previous state forward
    var result: Option[(StatefulSession, StatefulSession)] = previousSessionRecord
    try {
      //Invalidate the session if it has been inactive for more than 35 minutes
      if (currentSessionList.isEmpty) {
        if (previousSessionRecord.isDefined &&
            (System.currentTimeMillis() - previousSessionRecord.get._2.getRequestTime) > 2100000L) {
          result = None
        }
      }
      //Iterate through the grouped and sorted sessions in the current batch and merge with the sessions from the previous batch
      currentSessionList.foreach(currentSessionRecord => {
        //Merge the session record from the current batch with the merged session from the previous batch
        result = processStatefulSessions(currentSessionRecord, previousSessionRecord.map(_._2).orNull)
      })
    } catch {
      case ex: Exception =>
        throw new Exception("Failed to update the stateful session", ex)
    }
    result
  }

The following code sample contains the reusable functions that are called from both the stream processing and the batch processing: filtering and populating the valid beacons, populating grouped and sorted sessions, processing stateful sessions, generating the metric values, reducing the metrics and storing the generated metrics to KairosDB.

  //Do the filters etc and get the relevant beacons needed for Sessionization
  def populateBeaconMap(rdd: RDD[(String, String)]): RDD[(String, (Long, String))] = {
    rdd.flatMap[(String, (Long, String))] { case (key, inputBeacon) =>
        val beaconData = constructBeaconData(inputBeacon)
        if (beaconData != null && beaconData.isValidBeacon()) {
          Seq((beaconData.getSessionID, (beaconData.getTimestamp, inputBeacon)))
        } else {
          Seq(("", (0L, "")))
        }
    }
  }
  
 //Group the sessions by sessionId and sort based on timestamp 
 def populateGroupedAndSortedSessions(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
    val groupedSessions = rdd.groupByKey()
    val sortedSessions = groupedSessions.mapValues[(List[(Long, String)])](iter => iter.toList.sortBy(_._1))
    sortedSessions
  }
  
  //Process stateful sessions by creating new or merged sessions
  def processStatefulSessions(currentSessionBeaconList: List[(Long, String)], previousBatchSessionRecord: StatefulSession): Option[(StatefulSession, StatefulSession)] = {
        val beaconSessionization = new BeaconSessionization()
        val sessionInfoArray = beaconSessionization.buildNewOrMergedSessions(currentSessionBeaconList, previousBatchSessionRecord)
        val currentSession = sessionInfoArray(0).asInstanceOf[StatefulSession]
        val mergedSession = if (sessionInfoArray(1) != null) sessionInfoArray(1).asInstanceOf[StatefulSession] else null
        Some((currentSession, mergedSession))
   }
   
   //Generate metric values from stateful sessions
   def generateMetricValues(rdd: RDD[(String, (StatefulSession, StatefulSession))]): RDD[(String, (Long, Array[String]))] = {
    rdd.flatMap { case (sessionId, statefulSessionPair: (StatefulSession, StatefulSession)) =>
          val metricGeneration = new MetricGeneration()
          metricGeneration.getMetricValues(statefulSessionPair._1, statefulSessionPair._2).toArray.toTraversable
    }
  }
  
  //Reduce the metrics generated  
  def generateReducedMetrics(rdd: RDD[(String, (Long, Array[String]))]):
    RDD[(String, (Long, Array[String]))] = {
      rdd.reduceByKey((metricRecord1, metricRecord2) => {
        (Math.max(metricRecord1._1, metricRecord2._1), (metricRecord1._2 ++ metricRecord2._2))
     })
   }
  
  //Saving the data to KairosDB  
  def saveToKairos(recordsToSave: RDD[(String, (Long, Float))], offsetRanges: Array[OffsetRange], curatorSink: Broadcast[CuratorSink]) = {
    recordsToSave.foreachPartition({ partitionOfMetricRecords =>
        val metricList = new java.util.ArrayList[String]()
        if (partitionOfMetricRecords != null) {
               partitionOfMetricRecords.foreach(metricRecord => {
               metricList.add(constructMetrics(metricRecord._1, metricRecord._2))
            }
          )
        }
        callKairosClientToSaveDataPoints(metricList)
        //Committing the last processed offsets to Zookeeper
        if (offsetRanges != null && offsetRanges.length > 0 && curatorSink != null) {
          val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
          val nodePath = "/" + consumerId + "/offsets/" + osr.topic + "/" + osr.partition
          curatorSink.value.updateZookeeper(nodePath, osr.untilOffset)
        }
      })
    }
The following is a code sample of the batch processing that does the stateful sessionization, metric generation and save to KairosDB. The functions above that are called from the stream processing are called from the batch processing as well, providing code reusability between the batch and stream processing.
//Creating a new Spark Context and generating a new Spark Session

val conf = new SparkConf().setAppName("BatchProcessing")

val sc = new SparkContext(conf)

val appSparkSession = SparkSession
      .builder()
      .enableHiveSupport()
      .config("hive.scratch.dir.permission", "777")
      .config("spark.sql.warehouse.dir", HIVE_WARE_HOUSE_DIR)
      .config(conf)
      .getOrCreate()
      
 
 val job = Job.getInstance()
 val fs: FileSystem = FileSystem.get(job.getConfiguration)
 
 //Loading the beacon data from HDFS
 val beaconData = sc.sequenceFile(inputPaths, classOf[BytesWritable], classOf[Text], 800).
          map { case (x, y) => (x.toString, y.toString) }
          
 //Do the filters etc and get the relevant beacons needed for Sessionization
 val sessions = populateBeaconMap(beaconData)

 //Group the sessions by sessionId and sort based on timestamp
 val groupedAndSortedSessions = populateGroupedAndSortedSessions(sessions) 
 
 //Loading the previous job run records from HDFS stored in session table
 val prevSessionsRDD = loadMatchingSessionsFromThePreviousJobRunToMerge(fs, appSparkSession, sc, startTimeStamp.toLong*1000l)

 //Left outer join the incoming sessions with the matching sessions loaded from the previous job run
 val coGroupSessions = groupedAndSortedSessions.leftOuterJoin(prevSessionsRDD)
 
 //Enriching the session and merging the session from the current batch with the previous batch
 val statefulSessions = coGroupSessions.mapPartitions[(String, (StatefulSession, StatefulSession))]({ partition =>
        val newOrMergedSessionsHashmap = collection.mutable.HashMap[String, (StatefulSession, StatefulSession)]()
        //Iterating through grouped and sorted sessions in the current batch and merging with the sessions from the previous batch
             partition.foreach { case (sessionId, (currentSessionRecord, previousSessionJson)) =>
                //Retrieve the merged session from the previous job run, if one exists
                val previousMergedSession: StatefulSession = previousSessionJson.map(fromJsonSession).orNull
                //Merge the session record from the current batch with the merged session from the previous batch
                processStatefulSessions(currentSessionRecord, previousMergedSession).foreach { newOrMergedSessions =>
                  newOrMergedSessionsHashmap += sessionId -> newOrMergedSessions
                }
             }
             newOrMergedSessionsHashmap.toIterator
      }, true)
      
     //Generate an RDD with sessionId, mergedSession and hourlyPartitioner
     val mergedSessionsRDDToBeSavedToHdfs = statefulSessions.map[(String, String, String)] { case (key, value) =>
          //value is the (requestSession, mergedSession) pair; serialize the merged session (counterpart of fromJsonSession)
          (key, toJson(value._2),
            (TimeBucketUtil.getYear(value._2.getLastHit)
              + "-" + TimeBucketUtil.getMonth(value._2.getLastHit)
              + "-" + TimeBucketUtil.getDay(value._2.getLastHit)
              + "-" + TimeBucketUtil.getHour(value._2.getLastHit)))
        }
     //Create the dataframe from the RDD (toDF needs the Spark session's implicits in scope)
     import appSparkSession.implicits._
     val currentSessionsDFToWriteInHdfs = mergedSessionsRDDToBeSavedToHdfs.toDF("session_id", "session_json",
          "hourly_partitioner")
     
     //Register the dataframe with a temporary view
     currentSessionsDFToWriteInHdfs.createOrReplaceTempView("sessionRecordsTemp")  
     
     //Insert the session data to hive external tables which is used to maintain state
     appSparkSession.sql(s"  CREATE EXTERNAL TABLE IF NOT EXISTS ${SESSION_RECS_TABLE_NAME} (session_id STRING, session_json STRING) " +
          s"PARTITIONED BY (hourly_partitioner STRING)    " +
          s"stored as ORC LOCATION '${DATA_DIR}/${SESSION_RECS_TABLE_NAME}' ")

     appSparkSession.sql(
          s""" from sessionRecordsTemp ps  insert overwrite table ${SESSION_RECS_TABLE_NAME}
            | partition(hourly_partitioner)
            | select  ps.session_id, ps.session_json,
            | ps.hourly_partitioner  """.stripMargin)
            
     //Generate metric values from stateful sessions
     val metricValues = generateMetricValues(statefulSessions)
     
      //Reduce the metrics generated
     val reducedMetricValues = generateReducedMetrics(metricValues)
     
     //Saving the metrics to KairosDB
     saveToKairos(reducedMetricValues, null, null)
     
     //Method definition for loading the previous job run sessions
     private def loadMatchingSessionsFromThePreviousJobRunToMerge(fs:FileSystem, appSparkSession:SparkSession, sc:   SparkContext, batchStartTime:Long):
                                                               RDD[(String, String)] = {
        val prevSessionQryResultsForId = appSparkSession.sql("SELECT ps.session_id,  ps.session_json  " +
          s" from ${SESSION_RECS_TABLE_NAME} ps  " +
          " where  ps.hourly_partitioner = '"+formatYYYY_MM_DD_HH(batchStartTime - 3600l) +"'")

        val prevSessionsRDD = prevSessionQryResultsForId.rdd.map[(String, String)](rowRec =>
          (rowRec.getAs[String]("session_id"), rowRec.getAs[String]("session_json")))

        prevSessionsRDD
  }
     

Streaming Configuration and Deployment

We use a standalone cluster for the Spark Streaming job deployment. For HA/DR we run the streaming job in 2 different regions. The streaming job from only one region writes to the Kairos cluster at any point in time; the job in the second region keeps running and maintaining state with no writes to Kairos. If there is any issue with the job in the first region, if we need to do maintenance on the cluster or upgrade the code, or if there are any issues with the Kafka topic in one region, we switch the writes to Kairos to the job running in the second region. This gives us higher availability of the streaming job, with session state being maintained and the data written to Kairos staying continuous.

Our streaming job runs with a micro batch of 60 seconds. The right micro batch size is chosen based on the processing times: we started with a batch window of 10 seconds, observed the processing times, and increased it incrementally until each batch completed within the micro batch period and there was no scheduling delay, or the delay only increased occasionally and recovered quickly. The ideal number of executors/cores depends on the application and is determined by considering factors like the number of peak events per second, the maximum allowed lag and the buffering capabilities of the streaming source, which can be arrived at by testing in a pre-production environment. The right amount of parallelism is between 2-3 times the number of cores and needs to be arrived at iteratively by testing the job with various configurations. Another key consideration is setting spark.memory.fraction and spark.memory.storageFraction to the right values. Based on various tests, we observed that spark.locality.wait=0s is good for the job's performance. We use Kryo serialization in our Spark jobs for better performance. When upgrading the application code, the application needs to be shut down gracefully, with no further records left to process.
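To make that concrete, the following is a minimal configuration sketch; the values shown are illustrative examples, not our production settings:

  //Illustrative streaming configuration; the values are examples, not our production settings
  val sparkConf = new SparkConf()
    .setAppName("ExpoStreaming")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //Kryo serialization
    .set("spark.memory.fraction", "0.6")            //fraction of the heap for execution and storage
    .set("spark.memory.storageFraction", "0.3")     //portion of the above reserved for storage
    .set("spark.locality.wait", "0s")               //do not wait for data-local executors
    .set("spark.streaming.stopGracefullyOnShutdown", "true") //finish in-flight batches on shutdown
  val ssc = new StreamingContext(sparkConf, Seconds(60))     //60 second micro batch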

Job Monitoring and Automatic Job Restarts for Streaming

We use a Python script that runs every 5 minutes to monitor the streaming job and check that it is up and running. We monitor the lag, the total delay, and the number of failed jobs vs the number of successful jobs, and take appropriate steps if any of these exceeds a certain threshold. We query these metrics from the Spark UI endpoints http://streamingUIURL:4040/metrics/json and http://streamingUIURL:4040/api/v1/applications/<appId>/jobs. If the job is down for some reason, if the number of failed jobs is greater than the number of successful jobs for the past 5 minutes, or if any task is stuck due to infrastructure issues, then the job gets restarted automatically and we receive an alert email.
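Our production monitor is a Python cron script, but the core check is straightforward; the following is a rough sketch of the failed-vs-successful job check against the REST endpoint (the URL handling and JSON parsing here are simplified placeholders):

  //Rough sketch of the job-status check; our actual monitor is a Python script
  import scala.io.Source

  def failedJobsExceedSuccessful(appId: String): Boolean = {
    val json = Source.fromURL(s"http://streamingUIURL:4040/api/v1/applications/$appId/jobs").mkString
    //Crude status counting via regex; a real script would parse the JSON properly
    val failed     = "\"status\"\\s*:\\s*\"FAILED\"".r.findAllIn(json).length
    val successful = "\"status\"\\s*:\\s*\"SUCCEEDED\"".r.findAllIn(json).length
    failed > successful
  }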

Troubleshooting the streaming job

We use the Spark Streaming UI to troubleshoot any issues with the job. Following is a screenshot of the Streaming UI. Ideally the scheduling delay should be 0, or if it increases, it should recover quickly within acceptable time periods. The processing time should be less than or equal to the micro batch period. The Kafka Direct stream consumption parallelism is equal to the number of Kafka partitions, and the number of partitions needs to be adjusted if there is not enough parallelism for the reads.

In case of increased processing times, individual batches can be drilled into to see which stage is taking time, as shown below. Shuffle reads/writes should be as low as possible and there should not be any data skew. If the shuffles are taking a lot of time, one way of fixing that is by increasing the cores and the parallelism.

The DAG visualization, as shown below, is used to determine whether there are pipelined operations that are separated by shuffles. Performance is good if all the operations go through a single pipeline without any shuffles. It also shows whether any of the RDDs are cached; cached RDDs are denoted by a green highlight. Caching an RDD means future computations on it can be served from memory, thereby improving performance.

The storage section of the Streaming UI, as shown below, provides insight into whether the cached RDDs are being cleared regularly. Ideally cached RDDs should be cleared at regular intervals to avoid any performance bottlenecks.

The executors tab of the Web UI, as shown below, also provides information about the number of active tasks, utilization of the cores, task GC time, and shuffle reads/writes. If there is a huge difference between the active tasks and the allocated cores, the cluster is underutilized and the core allocation needs to be adjusted accordingly. Thread dumps are used to drill down into possible performance issues.

Batch Configuration and Deployment

We use Mesos as our cluster manager for batch jobs and use HDFS for state maintenance and intermediate file storage. The batch processing job runs every 30 minutes and processes any new data available in HDFS. The data is stored in Hive external tables. We configured the executor memory depending on the amount of data that gets processed in each batch. Since we also use shuffle operations like reduceByKey, groupByKey, repartition, coalesce, cogroup, join, etc. as part of the batch job, configs like spark.executor.memory, spark.cores.max and spark.default.parallelism were adjusted to the job's requirements iteratively.
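For reference, a configuration along these lines captures the knobs we tune for the batch job; the Mesos master URL and the numbers are placeholders rather than our production values:

  //Illustrative batch-job configuration for the Mesos deployment; values are placeholders
  val batchConf = new SparkConf()
    .setAppName("BatchProcessing")
    .setMaster("mesos://zk://zkHost:2181/mesos")   //Mesos master via ZooKeeper (placeholder URL)
    .set("spark.executor.memory", "8g")            //sized to the data volume of a 30 minute batch
    .set("spark.cores.max", "48")                  //total cores across the cluster for this job
    .set("spark.default.parallelism", "96")        //roughly 2-3x the core count for shuffle stages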

Troubleshooting the batch job

A few slow tasks can cause a huge performance impact when one stage needs to finish before the next one can start. The detailed stage/tasks view in the Spark UI can be used to identify slow-running nodes, nodes with resource problems, and skew in data partitioning, which can be spotted by looking at the input size/records or at a small number of tasks taking significantly longer to execute than the others. Drilling down into slower-running tasks, we can determine whether the slowness is in writing data, reading data, or computation. If the processing is slow, it can be due to insufficient resources, and we would need to look at how much memory and CPU have been allocated for the executors and at the total number of cores allocated for the job.

Conclusion

To recap, here are some points covered in this article:

  • Building the data pipeline for A/B testing with the Lambda architecture using Spark gave us a quick view of the data/metrics generated by the streaming job, and a reliable view from the scheduled batch process.
  • Using Spark/Spark Streaming let us write the business logic functions once and reuse the code in both the batch ETL process and the streaming process, which lowered the risk of errors from duplicate code bases and also helped developer productivity, since Spark provides a unified API for streaming, batch and interactive analytics.
  • We focused on having a stable stream/batch processing application first before focusing on throughput. The performance of the applications was improved by tuning Spark's serialization and memory parameters and by increasing the number of cores and the parallelism iteratively.
  • The Spark Streaming/batch UI provides very good information on performance bottlenecks such as shuffles, data skew, slow-running tasks due to resource issues, task GC time, shuffle reads/writes, slow-running stages, and storage, which helps in troubleshooting the jobs.
