Make sure you get all your messages in your Scala code

01Jul09

I had a funny little Scala actor-related bug today. Imagine you want to process a few things in parallel, so you write:

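import scala.actors.Actor._

// The calling thread's own actor, captured so the job actors can reply to it
val processor = self
jobs.foreach { job =>
  // One short-lived actor per job, sending (job id, result) back
  actor {
    processor ! (job.id, job.run)
  }
}
// Merge results: expect one reply per job, waiting up to 1 second for each
for (i <- 1 to jobs.size) {
  self.receiveWithin(1000) {
    case (jobId: Int, result: JobResult) => mergeResult(result)
  }
}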

All good. Then you realise that one or more of the jobs may fail with an exception, which you have to handle somehow. You decide to bail out on the first exception and report it back, so you change the code to:

val processor = self
jobs.foreach { job =>
  actor {
    try {
      processor ! (job.id, job.run)
    } catch {
      // Report the failure to the caller instead of letting the actor die silently
      case ex: Throwable => processor ! (job.id, ex, job)
    }
  }
}
// Merge results, giving up on the first failure
for (i <- 1 to jobs.size) {
  self.receiveWithin(1000) {
    case (jobId: Int, result: JobResult) => mergeResult(result)
    case (jobId: Int, ex: Throwable, job: Job) =>
      throw new RuntimeException("Job " + jobId + " failed", ex)
  }
}

Cool. You fail on the first one: you just blow up and report back to your caller that something went wrong. Great! Well, not quite. The other jobs you started are still going to send you their results. You've bailed out of the receive loop by throwing an exception, so nothing is left to receive those messages. They don't vanish, though. They pile up in the mailbox of that thread's "self". When the web server (Tomcat, in my case) reuses the thread for another request, those messages are still waiting. They won't actually be received until the next receiveWithin call, so when you expect replies from your freshly started actors, you will actually get messages from the actors started during the previous request. Kind of undesirable, really.
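The effect can be reproduced without scala.actors, which has long been deprecated, in a small simulation. This is a model, not the real library. A ThreadLocal queue stands in for the mailbox of the thread's "self", and a single-thread executor stands in for the web server's pooled thread. handleRequest is a hypothetical handler that reads only the first reply and returns, just as the code above bails out on the first exception.

```scala
import java.util.concurrent.{Callable, Executors, LinkedBlockingQueue}

object StaleMailboxDemo {
  // Stand-in for the mailbox of a non-actor thread's "self": one queue per
  // thread, living as long as the thread does, not as long as the request.
  private val mailbox: ThreadLocal[LinkedBlockingQueue[String]] =
    new ThreadLocal[LinkedBlockingQueue[String]] {
      override def initialValue(): LinkedBlockingQueue[String] = new LinkedBlockingQueue[String]()
    }

  // Hypothetical request handler: fan out `jobs` workers that reply to this
  // thread's mailbox, then take only the first reply and return, mimicking
  // the early throw on the first failure.
  def handleRequest(name: String, jobs: Int): String = {
    val inbox = mailbox.get()
    val workers = (1 to jobs).map { i =>
      val t = new Thread(() => inbox.put(name + "-job" + i))
      t.start()
      t
    }
    workers.foreach(_.join()) // wait until every reply is queued (keeps the demo deterministic)
    inbox.take()              // read one reply; the others stay behind in the mailbox
  }

  // Two requests served one after the other by the same pooled thread.
  def run(): (String, String) = {
    val pool = Executors.newSingleThreadExecutor()
    def serve(name: String): String =
      pool.submit(new Callable[String] { def call(): String = handleRequest(name, 3) }).get()
    try {
      (serve("req1"), serve("req2"))
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = run()
    println("first request got:  " + first)
    println("second request got: " + second) // a leftover "req1-..." reply
  }
}
```

The second request receives a reply that belongs to the first. It was never read, and the queue is tied to the thread, not to the request.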

One thing you can do is wait for all the messages, taking every response regardless of what it is. That's what I did. Here's the code:

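val exceptions = new scala.collection.mutable.ListBuffer[Throwable]
// One reply per main row; the row number inside each message says where it goes
for (i <- 1 to mainRowsCount) {
  self.receiveWithin(2000) {
    case (rowNumber: Int, subentities: Seq[Entity]) =>
      mainRowsWithJoins(rowNumber).addEntities(subentities)
    case (rowNumber: Int, exception: Throwable, subrequest: Request) =>
      // Don't throw yet: keep draining until every row has replied
      exceptions += new RuntimeException("Subrequest " + subrequest.toString + " failed", exception)
  }
}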
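In terms of the job example from earlier, that becomes:

val exceptions = new scala.collection.mutable.ListBuffer[Throwable]
for (i <- 1 to jobs.size) {
  self.receiveWithin(2000) {
    case (jobId: Int, result: JobResult) => mergeResult(result)
    case (jobId: Int, ex: Throwable, job: Job) =>
      exceptions += new RuntimeException("Job " + jobId + " failed", ex)
  }
}
// Only once every job has answered, report the first failure (if any)
if (!exceptions.isEmpty) throw exceptions.head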

It seems to cut the mustard for now, but I still think something is wrong with it. I don't think you're guaranteed to get all the messages: a job that takes longer than the 2-second receiveWithin timeout will still reply later, into the same mailbox. And I haven't even started thinking about what will happen when I hit it with loads of requests running off the same Tomcat thread pool.
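Since scala.actors has been deprecated, the same wait-for-every-reply approach can be compared against a minimal sketch that uses java.util.concurrent instead. It illustrates the idea under simplifying assumptions and is not the code above. A "job" here is just a hypothetical () => Int function, each job runs on a plain Thread rather than an actor, and DrainAll and runAll are made-up names. One deliberate difference targets the thread-pool worry. The reply queue is created per call instead of living on the thread. A reply that shows up after a timeout therefore lands in a queue nobody reads again, not in the next request's mailbox.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

object DrainAll {
  // Hypothetical simplification: a job is a function returning an Int.
  def runAll(jobs: Seq[() => Int], timeoutMs: Long): Either[Throwable, Vector[Int]] = {
    // Per-call reply queue: (job index, success value or the exception thrown).
    val replies = new LinkedBlockingQueue[(Int, Either[Throwable, Int])]()
    jobs.zipWithIndex.foreach { case (job, id) =>
      new Thread(() => {
        val outcome: Either[Throwable, Int] =
          try Right(job()) catch { case NonFatal(ex) => Left(ex) }
        replies.put((id, outcome))
      }).start()
    }

    val results = new Array[Int](jobs.size)
    val failures = ListBuffer.empty[Throwable]
    // Take exactly one reply per job, failures included, before reporting anything.
    for (_ <- jobs.indices) {
      val reply = replies.poll(timeoutMs, TimeUnit.MILLISECONDS)
      if (reply == null) throw new RuntimeException("Timed out waiting for a job reply")
      reply match {
        case (id, Right(value)) => results(id) = value
        case (id, Left(ex))     => failures += new RuntimeException("Job " + id + " failed", ex)
      }
    }
    // Every job has answered: report the first failure that arrived, if any.
    failures.headOption match {
      case Some(ex) => Left(ex)
      case None     => Right(results.toVector)
    }
  }

  def main(args: Array[String]): Unit = {
    println(runAll(Seq(() => 1, () => 2, () => 3), 2000))
    println(runAll(Seq(() => 1, () => sys.error("boom"), () => 3), 2000))
  }
}
```

The queue is local to runAll, so giving up early (on a timeout, say) cannot poison the next request served by the same thread.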

PS: OK, now I'm reading this and thinking: that thread is dead, right, because I blew it up with the runtime exception. Except it isn't. Some "self" is still receiving those messages when it calls receiveWithin. Actors are not necessarily threads, I get that, and I'm sure the behaviour is correct, but it's still quite confusing...
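Sometimes the replies have to go through a mailbox that outlives the request, as the thread's "self" does here. A common workaround (not something this post does) is a correlation id. Stamp every reply with the id of the request that asked for it, and drop anything addressed to an earlier request while collecting. Below is a minimal sketch, again with a LinkedBlockingQueue standing in for the mailbox. Reply, collect and demo are hypothetical names.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

object CorrelatedReplies {
  // Every reply carries the id of the request that asked for it.
  final case class Reply(requestId: Long, jobId: Int, payload: String)

  private val nextRequestId = new AtomicLong(0)

  // Take replies until `expected` of them belong to `requestId`; anything
  // addressed to another (earlier) request is counted and thrown away.
  def collect(mailbox: LinkedBlockingQueue[Reply], requestId: Long, expected: Int): (Seq[Reply], Int) = {
    var kept = Vector.empty[Reply]
    var discarded = 0
    while (kept.size < expected) {
      val msg = mailbox.take() // blocks forever if a reply never comes; real code should poll with a timeout
      if (msg.requestId == requestId) kept :+= msg else discarded += 1
    }
    (kept, discarded)
  }

  // A shared, long-lived mailbox that already holds two late replies from an
  // earlier request, followed by a new request with two jobs of its own.
  def demo(): (Seq[Reply], Int) = {
    val mailbox = new LinkedBlockingQueue[Reply]()
    val earlier = nextRequestId.incrementAndGet()
    mailbox.put(Reply(earlier, 2, "late"))
    mailbox.put(Reply(earlier, 3, "late"))

    val current = nextRequestId.incrementAndGet()
    val workers = (1 to 2).map { i =>
      val t = new Thread(() => mailbox.put(Reply(current, i, "fresh")))
      t.start()
      t
    }
    val result = collect(mailbox, current, expected = 2)
    workers.foreach(_.join())
    result
  }

  def main(args: Array[String]): Unit = {
    val (kept, discarded) = demo()
    println("kept jobs " + kept.map(_.jobId).sorted + ", discarded " + discarded + " stale replies")
  }
}
```

The catch is that stale replies are only cleared out when a later request reads past them. Until then, they sit in the mailbox.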
