Cameron Hotchkies

Categories

  • Coding

Tags

  • actors
  • async
  • concurrency
  • futures
  • play
  • scala
  • tutorial

In the previous post, we added actors to handle concurrency in a more sensible manner inside of our Play application. In this post we add another API endpoint, relying further on our existing actor model and creating a new method for operating on Futures.

As before, the source code for this tutorial series is available on github with the code specifically for this post here. Since the code builds on the previous post, you can grab the current state of the project (where part four finished) here.

UPDATE: The original version of this post defined an incorrect projection for the augmented TicketBlock. The code has been fixed, but the commits on the GitHub version do not line up perfectly.

Limitations of the current API

Our existing API for listing ticket blocks works, but it is not very practical. The only way to get information about a ticket block is to either know the id field ahead of time, or list all ticket blocks ever created. If we take a minute to think about how our API is likely to be used, we can safely assume that the user will desire a behavior flow similar to this:

  1. Select the event they want to attend
  2. Inspect the block of tickets available to purchase for that event
  3. Place an order of x tickets from the block

Our current API is not very good for #2, and does not even support displaying if the block still has tickets available for purchase. We need an API endpoint that can supply the ticket block information from a given event, including the amount of tickets still available for purchase.

If we were to create a new API endpoint, it should probably follow this pattern:

GET  /events/:eventID/tickets/blocks/

Now that we have our end goal in mind, we can get started. Do not bother adding the endpoint to the routes file yet, as we need to create the controller definition first.

Creating the controller endpoint

Since our resource path root will be /events/, we will add the new method to our Events controller. We will create a method named ticketBlocksForEvent. The new method will contain boilerplate code to verify the event exists. Once it has the correct Event, it will make a new method call on the Event instance requesting a Future of all TicketBlocks associated with that event.

import com.semisafe.ticketoverlords.TicketBlock

// ... <snip> ...

def ticketBlocksForEvent(eventID: Long) = Action.async { request =>
  val eventFuture = Event.getByID(eventID)

  eventFuture.flatMap { event =>
    event.fold {
      Future.successful(
        NotFound(Json.toJson(ErrorResponse(NOT_FOUND, "No event found"))))
    } { e =>
      val ticketBlocks: Future[Seq[TicketBlock]] =
        e.ticketBlocksWithAvailability
      ticketBlocks.map { tb =>
        Ok(Json.toJson(SuccessResponse(tb)))
      }
    }
  }
}

A basic implementation

Now that we have defined the requirements for the ticketBlocksWithAvailability method, we can fill out the basic implementation in our Event case class.

def ticketBlocksWithAvailability(): Future[Seq[TicketBlock]] = {
  this.id.fold {
    Future.successful(Nil: Seq[TicketBlock])
  } { eid =>
    val basicBlocks = TicketBlock.listForEvent(eid)

    basicBlocks
  }
}

And add the TicketBlock.listForEvent() method…

def listForEvent(eventID: Long): Future[Seq[TicketBlock]] = {
  val blockList = table.filter { tb =>
    tb.eventID === eventID
  }.result
  db.run(blockList)
}

Tell me something new

We now have the remarkably unremarkable code required to retrieve a TicketBlock for a given Event. There is just one small detail we have ignored. None of this supplies the actual tickets available in a returned ticket block.

Open TicketIsser.scala and add the following case class.

case class AvailabilityCheck(ticketBlockID: Long)

This will serve as the message used to communicate with the issuer actors to request the current availability of a ticket block. We can add support for this directly in the TicketIssuer actor. Inside the TicketIssuer we simply forward the request to the TicketIssuerWorker if it exists.

def receive = {
  case order: Order         => placeOrder(order)
  case a: AvailabilityCheck => checkAvailability(a)
  case NewTicketBlock(t)    => t.id.foreach(createWorker)
}

def checkAvailability(message: AvailabilityCheck) = {
  val workerRef = workers.get(message.ticketBlockID)

  workerRef.fold {
    sender ! ActorFailure(TicketBlockUnavailable(message.ticketBlockID))
  } { worker =>
    worker forward message
  }
}

Adding support to TicketIssuerWorker is slightly more involved, due to the multiple receive states.

def initializing: Actor.Receive = {
  // ... <snip> ...  
  case AvailabilityCheck(ticketBlockID) => {
     val failureResponse = TicketBlockUnavailable(ticketBlockID)
     sender ! ActorFailure(failureResponse)
   }
}

def normalOperation(availability: Int): Actor.Receive = {
  // ... <snip> ...
  case _: AvailabilityCheck => sender ! availability
}

def soldOut: Actor.Receive = {
  // ... <snip> ...
  case _: AvailabilityCheck => sender ! 0
}

Finding a place for the new data

With the code for AvailabilityCheck in place, we can revisit the ticketBlocksWithAvailability method in the Event class. Currently, this method returns a Future[Seq[TicketBlock]]. Our current design for a TicketBlock does not include a count for the available ticket count. We can update the TicketBlock model to include an Option for the quantity of available tickets. Using an Option here allows us to include the value when we have it, and ignore it any other time.

case class TicketBlock(
  id: Option[Long],
  eventID: Long,
  name: String,
  productCode: String,
  price: BigDecimal,
  initialSize: Int,
  saleStart: DateTime,
  saleEnd: DateTime,
  availability: Option[Int] = None)

We will also need to update our * projection in the TicketBlocksTable class to handle the fact this field is unrelated to our database table.

UPDATE: There was a mistake in the original version of the code, so the update for this appears later in the GitHub version. Yes, there is a post coming on unit testing.

/*
UPDATE: The original version of the post used the following projection.
While it works for reading from the database, it totally breaks writes.
----
def * = (id.?, eventID, name, productCode, price, initialSize,
      saleStart, saleEnd, None) <>
      ((TicketBlock.apply _).tupled, TicketBlock.unapply)
----
The correct code is the following:
*/
def * = (id.?, eventID, name, productCode, price, initialSize,
  saleStart, saleEnd) <>
  (
    (TicketBlock.apply(_: Option[Long], _: Long, _: String, _: String,
      _: BigDecimal, _: Int, _: DateTime, _: DateTime,
      None)).tupled, { tb: TicketBlock =>
        TicketBlock.unapply(tb).map {
          case (a, b, c, d, e, f, g, h, _) => (a, b, c, d, e, f, g, h)
        }
      })

Setting the stage

Now that our TicketBlock has been augmented to support this new optional field, we can update the ticketBlocksWithAvailability method to actually include the available quantity. Before adding the core logic, we will start with some supporting code. We know we will need to perform an ask request to actors, which in turn requires a Timeout. Our existing timeout (for placing an order) is for an action that people will wait for. When it comes to the availability of tickets, users are unlikely to be as patient. Create a new parameter in application.conf named ticket_availability_ms.

ticketoverlords.timeouts.ticket_availability_ms=400

Since the original timeout is in seconds, we are appending the unit to this parameter. Now that the parameter is defined, we can add the Timeout to the Events controller.

import akka.util.Timeout
import play.api.Play.current
import scala.concurrent.duration._

// ... <snip> ...

def ticketBlocksForEvent(eventID: Long) = Action.async { request =>
  val eventFuture = Event.getByID(eventID)

  eventFuture.flatMap { event =>

    event.fold {
      Future.successful(
        NotFound(Json.toJson(ErrorResponse(NOT_FOUND, "No event found"))))
    } { e =>

      val timeoutKey = "ticketoverlords.timeouts.ticket_availability_ms"
      val configuredTimeout = current.configuration.getInt(timeoutKey)
      val resolvedTimeout = configuredTimeout.getOrElse(400)
      implicit val timeout = Timeout(resolvedTimeout.milliseconds)

      val ticketBlocks: Future[Seq[TicketBlock]] =
        e.ticketBlocksWithAvailability
      ticketBlocks.map { tb =>
        Ok(Json.toJson(SuccessResponse(tb)))
      }
    }
  }
}

Gathering all the data

We leave the timeout as an implicit val, which in turn is implicitly passed into the ask request in ticketBlocksWithAvailability.

import akka.util.Timeout
import akka.pattern.ask

// ... <snip> ...

def ticketBlocksWithAvailability(
   implicit timeout: Timeout): Future[Seq[TicketBlock]] = {

   id.fold {
     Future.successful(Nil: Seq[TicketBlock])
   } { eid =>

     val basicBlocks = TicketBlock.listForEvent(eid)
     val issuer = TicketIssuer.getSelection
     val blocksWithAvailability: Future[Seq[TicketBlock]] =
       basicBlocks.flatMap { blocks =>
         val updatedBlocks: Seq[Future[TicketBlock]] = for {
           block <- blocks
           blockID <- block.id
           availabilityRaw = issuer ? AvailabilityCheck(blockID)

           availability = availabilityRaw.mapTo[Int]

           updatedBlock = availability.map { a =>
             block.copy(availability = Option(a))
           }
         } yield updatedBlock

         // Transform Seq[Future[...]] to Future[Seq[...]]
         Future.sequence(updatedBlocks)
       }

     blocksWithAvailability
   }
 }

The code retrieves the list of ticket blocks with our new database method. We then iterate through the blocks retrieved from the database and make a request to the TicketIssuer for the availability of tickets for each block. This results in a sequence of Futures, one for each AvailabilityCheck performed.

Close, but not good enough

Inverting a sequence of futures is so common that the standard library includes the Future.sequence method, which our code used because the type signatures seemed to be what we wanted. Unfortunately, the Future.sequence() function gives up at the first sign of trouble.

Any Future that results in a failure will cause the whole sequence to become a failure. That is not what we want to happen at all, in fact we would prefer the opposite. We do not really care if some of the blocks do not respond, some information coming back to the user is better than no information coming back. We actually expect some of our Futures to fail, due to the way we raise exceptions via the actors Failure type. To allow for this, we will create a new Future-like object called OptimisticFuture.

package com.semisafe.ticketoverlords

import scala.concurrent.Future
import play.api.libs.concurrent.Execution.Implicits._

object OptimisticFuture {
  def sequence[A](source: Seq[Future[A]]): Future[Seq[A]] = {

    val optioned = source.map { f =>
      f.map(Option.apply).recover {
        case _ => None: Option[A]
      }
    }

    Future.sequence(optioned).map(_.flatten)
  }
}

Our new code performs a map on the sequence, wrapping Successful results in an Option. We use the recover method to discard the failures and replace them with None. We could easily use a Try or an Either instead of Option here, but we are not really concerned with why the failure occurred here. Once the Option sequence exists, we can flatten it to discard the None entries. With this new utility object, we can create the Future[Seq] we want that only contains successful futures.

def ticketBlocksWithAvailability(
  implicit timeout: Timeout): Future[Seq[TicketBlock]] = {

  id.fold {
    Future.successful(Nil: Seq[TicketBlock])
  } { eid =>

    val basicBlocks = TicketBlock.listForEvent(eid)
    val issuer = TicketIssuer.getSelection
    val blocksWithAvailability = basicBlocks.flatMap { blocks =>
      val updatedBlocks: Seq[Future[TicketBlock]] = for {
        block <- blocks
        blockID <- block.id
        availabilityRaw = issuer ? AvailabilityCheck(blockID)

        availability = availabilityRaw.mapTo[Int]

        updatedBlock = availability.map { a =>
          block.copy(availability = Option(a))
        }
      } yield updatedBlock


      // Use our utility class to get only successful results
      OptimisticFuture.sequence(updatedBlocks)
    }

    blocksWithAvailability
  }
}

We now have enough implemented to create our route endpoint.

GET  /events/:eventID/tickets/blocks/	controllers.Events.ticketBlocksForEvent(eventID: Long)

Good enough to test

By making a curl request, we can verify the first event returns the expected ticket blocks.

$ curl -w '\n' localhost:9000/events/1/tickets/blocks/

{"result":"ok","response":[]}

That was unexpected. There should be a valid response coming back. Just to be sure it is not our new code, we will run the exact same curl command again without restarting our application.

$ curl -w '\n' localhost:9000/events/1/tickets/blocks/

{"result":"ok","response":[{"id":1,"eventID":1,"name":"General
Admission","productCode":"GA0001","price":375,"initialSize":1000,"saleStart":142
0660800000,"saleEnd":1397746800000,"availability":995}]}

What happened here is that our actor selection that loads the TicketIssuer on demand has a problem that it is not readable for a short period of time while it initializes. This is as designed, but if no other service has attempted to use this actor, this call will fail. We will be fixing this in a later post.

In the meantime, it actually serves to illustrate that our failing futures (generating a TicketBlockUnavailable failure) will actually be suppressed by our OptimisticFuture

Until next time…

In this post we covered handling a sequence of future results that are not all required to build a response and converted them to a single Future containing all of the results from the tasks that completed successfully. This is a useful pattern for aggregating results from disparate 3rd party services that may or may not ever return results.

In part 6, we will take a break from the back end and give our user interface a sorely needed makeover.