In the final installment of his series on the Scala language, Venkat explores concurrency.

We saw the object-oriented paradigm and the functional style of programming interplay in Scala in the previous articles in this series. In this article we’ll use the functional purity for greater good—programming concurrency.

We all desire to make our applications faster, more responsive. There’s no shortage of resources with multiple cores in our hands. The hard part is writing the code correctly to reap the benefits of the power on hand.

To fully exploit the power of concurrency and perform various complex tasks, concurrency libraries like Akka are quite helpful. (See also Programming Concurrency on the JVM by one of our favorite authors -ed.) However, in this article, we’ll stay within the facilities provided directly in the Scala language and make use of parallel collections.

We want our applications to be responsive and faster, but we don’t want to compromise their correctness. The purity of functional programming is our ally in this area. When we set out to make code concurrent, we must ensure the purity of operations, making sure they don’t have side effects. This means using vals (rather than vars) and immutable objects. In return for avoiding side effects, Scala rewards us with faster response without compromising the correctness of the results.

Let’s first create a utility function that will allow us to measure the time operations will take. This will help us compare the sequential processing and its concurrent counterpart.

 object Time {
   def code(block: () => Unit) = {
     val start = System.nanoTime
     try {
       block()
     } finally {
       val end = System.nanoTime
       println("Time taken: " + (end - start)/1.0e9)
     }
   }
 }

The code function of the Time singleton accepts a function value and measures the time it takes to apply or execute the function value (code block). For example, to report the time for a simple block of code that takes a short nap, we can write the following:

 Time.code { () => Thread.sleep(1000) }
 //Time taken: 1.00088
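
If we also want the value that the timed code produces, one possible variation is to take a by-name parameter instead of an explicit function value. The sketch below is an assumption for illustration, not part of the Time singleton above: the name TimedResult and the type parameter A are hypothetical.

```scala
object TimedResult {
  // `block: => A` is a by-name parameter: the expression passed in is
  // evaluated only when `block` is referenced inside the method.
  def code[A](block: => A): A = {
    val start = System.nanoTime
    try {
      block // runs the caller's code and yields its result
    } finally {
      val end = System.nanoTime
      println("Time taken: " + (end - start) / 1.0e9)
    }
  }
}

// No `() =>` is needed at the call site, and the result is returned.
val napLength = TimedResult.code {
  Thread.sleep(100)
  100
}
println("Slept for " + napLength + " ms")
```

The rest of this article keeps the original Time.code, so the `() =>` prefix in the examples that follow is still required.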

We’ve seen different Scala collections in the previous articles. The collections provide a special method named par that returns to us a parallel collection.

Let’s create a list of names and get a parallel collection from it.

 val names = List("Jane", "Jacob", "Brenda", "Brad")
 println(names)
 //List(Jane, Jacob, Brenda, Brad)
 println(names.par)
 //ParVector(Jane, Jacob, Brenda, Brad)
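
With a parallel collection in hand, we can make the earlier advice about purity concrete. The following is a minimal sketch, assuming a Scala version where par is part of the standard library, as in this article. The names numbers, unsafeTotal, and safeTotal are made up for the illustration.

```scala
// Assumption: Scala 2.12 or earlier. On Scala 2.13 and later, parallel
// collections live in the separate scala-parallel-collections module and
// also need: import scala.collection.parallel.CollectionConverters._

val numbers = (1 to 10000).toList

// Impure: every task updates the same var without coordination,
// so some updates can be lost.
var unsafeTotal = 0
numbers.par.foreach { n => unsafeTotal += n * 2 }
println("With shared var: " + unsafeTotal) // can change from run to run

// Pure: the function value touches no shared state, and the library
// combines the partial sums for us.
val safeTotal = numbers.par.map { n => n * 2 }.sum
println("Without side effects: " + safeTotal) // 100010000
```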

Certain operations on parallel collections run concurrently and we can make use of these to speed up operations that take significant time. Suppose we’re creating a messenger application and want to check the status of multiple people we’re interacting with. The function for this operation might look like the following:

 def checkStatus(name : String) = {
  Thread.sleep(1000) //simulate the delay to communicate
  String.format("%s's status", name)
 }

Sequential calls to this function incur a delay proportional to the number of elements in the list.

 Time.code { () => println(names.map { checkStatus }) }
 //List(Jane's status, Jacob's status, Brenda's status,
 // Brad's status)
 //Time taken: 4.005623

We can enjoy a speedup if we apply the map function on the parallel collection we created.

 Time.code { () => println(names.par.map { checkStatus }) }
 //ParVector(Jane's status, Jacob's status, Brenda's status,
 // Brad's status)
 //Time taken: 1.021915

Unlike the execution of the map function on the List, the map function on the parallel collection ran the given function value concurrently for each element. The number of concurrent executions depends on the number of threads in a pool that Scala allocates, which in turn depends on the number of cores available. This is simple, but is somewhat limiting. If we need finer control over the thread pool, libraries like Akka provide some good solutions.
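
To see how many cores the JVM reports, we can ask the Runtime. In addition, Scala 2.10 and later give each parallel collection a tasksupport field that can be pointed at a pool of a chosen size. The sketch below assumes Scala 2.12, where ForkJoinTaskSupport accepts a java.util.concurrent.ForkJoinPool. The pool size of 2 and the name parallelNames are arbitrary choices for illustration.

```scala
// Assumption: Scala 2.12. On Scala 2.13 and later, add the
// scala-parallel-collections module and
// import scala.collection.parallel.CollectionConverters._
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

println("Available cores: " + Runtime.getRuntime.availableProcessors)

def checkStatus(name: String) = {
  Thread.sleep(1000) // simulate the delay to communicate
  String.format("%s's status", name)
}

val parallelNames = List("Jane", "Jacob", "Brenda", "Brad").par

// Restrict this one collection to a pool of two worker threads.
parallelNames.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(2))

println(parallelNames.map { checkStatus })
```

Wrapping the last line in Time.code is an easy way to observe how the pool size affects the elapsed time on a given machine.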

Suppose we want to find if all our meeting participants are connected. We can do this sequentially.

 def isConnected(name : String) = {
  Thread.sleep(1000) //simulate the delay to communicate
  name.length > 4 //simulated response
 }
 Time.code { () =>
  println("All connected? :" + names.forall { isConnected })
 }
 //All connected? :false
 //Time taken: 4.004065

Unlike the map function, the forall function needs to collect the result of the evaluation of the function value for all the elements. However, the evaluations can be performed concurrently as in the next code.

 Time.code { () =>
  println("All connected? :" + names.par.forall { isConnected })
 }
 //All connected? :false
 //Time taken: 1.018888

We saw how the parallel collection runs some operations concurrently. Not all operations can be executed concurrently, however. For example, the parallel collection provides the foldLeft and foldRight methods so we can conveniently invoke them on these kinds of collections like we do on regular collections. Because these methods thread an accumulator through the elements in a fixed order, we have to keep in mind that such operations will be sequential and not concurrent. (The reduce method is different: on a parallel collection it may combine elements in parallel, provided the function we pass is associative.)

 def addToConnected(connected : Int, name : String) =
  connected + (if (isConnected(name)) 1 else 0)
 Time.code { () =>
  println("Number of folks connected: " +
  names.foldLeft(0) { addToConnected })
 }
 //Number of folks connected: 2
 //Time taken: 4.005546
 Time.code { () =>
  println("Number of folks connected: " +
  names.par.foldLeft(0) { addToConnected })
 }
 //Number of folks connected: 2
 //Time taken: 4.014092
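
When the order of accumulation doesn’t matter, as with counting, we can reach for operations that a parallel collection is able to split across threads. The following sketch shows two options, count and aggregate, under the assumption that par is available as in the rest of this article. The names connected and viaAggregate are hypothetical.

```scala
// Assumption: Scala 2.12 or earlier. On Scala 2.13 and later, add the
// scala-parallel-collections module and
// import scala.collection.parallel.CollectionConverters._

val names = List("Jane", "Jacob", "Brenda", "Brad")

def isConnected(name: String) = {
  Thread.sleep(1000) // simulate the delay to communicate
  name.length > 4    // simulated response
}

// count evaluates the predicate on the elements in parallel.
val connected = names.par.count { isConnected }
println("Number of folks connected: " + connected)

// aggregate takes two functions: the first folds elements within a chunk,
// the second combines the partial results of different chunks.
val viaAggregate = names.par.aggregate(0)(
  (count, name) => count + (if (isConnected(name)) 1 else 0),
  _ + _)
println("Number of folks connected: " + viaAggregate)
```

Both calls report 2 for this list, the same answer as foldLeft. Wrapping either one in Time.code shows whether the work was spread across threads on a given machine.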

There is another caveat with parallel collections: the concurrent version may do more work. Suppose we want to find the first connected person. We could use the find function, like so:

 Time.code { () =>
  println("A connected person: " + names.find { isConnected })
 }
 //A connected person: Some(Jacob)
 //Time taken: 2.003715

The sequential version evaluated the isConnected function for the first two persons in the list before it found a matching element. Let’s take a look at the parallel version.

 Time.code { () =>
  println("A connected person: " + names.par.find { isConnected })
 }
 //A connected person: Some(Jacob)
 //Time taken: 1.020151

The concurrent version gave the same result as the previous version and took less time, but there’s a catch. To understand this, let’s introduce a print statement, an impurity, to make visible the actual tasks executed.

 names.find { name => println("seq: " + name); isConnected(name) }
 //seq: Jane
 //seq: Jacob
 names.par.find { name => println("conc: " + name); isConnected(name) }
 //conc: Jane
 //conc: Jacob
 //conc: Brad
 //conc: Brenda

The concurrent version ends up doing more work than the sequential version, as it’s trying to evaluate all the cases concurrently. We have to weigh this cost and ensure the extra executions are not prohibitively expensive and do not affect either the performance or the outcome.

Let’s put the parallel collection to a practical use. In the December article on “Functional Style of Programming” we looked at concise code to fetch and process stock prices from Yahoo. Let’s revisit that code and make it concurrent.

For this example we’ll use the following tickers:

val tickers = List("AAPL", "AMD", "CSCO", "GOOG", "HPQ", "INTC", "MSFT", "ORCL")

The code to fetch the price and construct it into a StockPrice object is repeated from that article here.

 case class StockPrice(ticker : String, price : Double) {
  def print = println("Top stock is " + ticker + " at price $" + price)
 }
 def getPrice(ticker : String) = {
  val url = "http://ichart.finance.yahoo.com/table.csv?s=" + ticker
  val data = io.Source.fromURL(url).mkString
  val price = data.split("\n")(1).split(",")(4).toDouble
  StockPrice(ticker, price)
 }

The helper functions we need to check whether a stock price is less than $500 and to pick the higher-priced stock are shown next:

 def isNotOver500(stockPrice : StockPrice) = stockPrice.price < 500
 def pickHigherPriced(stockPrice1 : StockPrice, stockPrice2 : StockPrice) =
  if(stockPrice1.price > stockPrice2.price) stockPrice1 else stockPrice2

Finally here’s the sequential code to compose all these to produce the desired result. We’ll measure the time to execute this code.

 Time.code { () =>
  tickers map getPrice filter isNotOver500 reduce pickHigherPriced print
 }
 //Top stock is ORCL at price $30.01
 //Time taken: 17.777705

The code took nearly 18 seconds to get the prices from Yahoo and determine the highest-priced stock not over $500.

Let’s make a small change to the code to turn this into concurrent execution.

 Time.code { () =>
  tickers.par map getPrice filter isNotOver500 reduce pickHigherPriced print
 }
 //Top stock is ORCL at price $30.01
 //Time taken: 3.805312

We inserted the call to par and invoked the map on the resulting parallel collection. All the requests to Yahoo and the subsequent calls to isNotOver500 are done concurrently. The reduce operation then combines the surviving prices through calls to pickHigherPriced; on a parallel collection these calls may also be spread across threads, which is safe here because picking the higher of two prices is associative. The concurrent version took just under 4 seconds to produce the same result.
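
To experiment with this pipeline without a network connection, we can swap getPrice for a stand-in that simulates latency. The sketch below is an assumption for illustration only: simulatedPrice is a hypothetical function, and the prices it derives from the ticker’s characters are made up, not market data.

```scala
// Assumption: Scala 2.12 or earlier. On Scala 2.13 and later, add the
// scala-parallel-collections module and
// import scala.collection.parallel.CollectionConverters._

case class StockPrice(ticker: String, price: Double)

// Hypothetical stand-in for getPrice: sleeps to mimic a network call and
// computes a made-up price from the character codes of the ticker.
def simulatedPrice(ticker: String) = {
  Thread.sleep(500)
  StockPrice(ticker, ticker.map(_.toInt).sum * 2.0)
}

def isNotOver500(stockPrice: StockPrice) = stockPrice.price < 500

def pickHigherPriced(stockPrice1: StockPrice, stockPrice2: StockPrice) =
  if (stockPrice1.price > stockPrice2.price) stockPrice1 else stockPrice2

val tickers =
  List("AAPL", "AMD", "CSCO", "GOOG", "HPQ", "INTC", "MSFT", "ORCL")

// Same shape as the article's pipeline, written with dot syntax.
val prices = tickers.par.map(simulatedPrice)
val top = prices.filter(isNotOver500).reduce(pickHigherPriced)

println("Top simulated stock is " + top.ticker + " at price $" + top.price)
```

Removing the call to par gives the sequential version, so the two can be compared with Time.code just as in the examples above.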

The functional programming style combined with powerful libraries makes concurrent programming not only easier but also fun.

It has been a pleasure writing this series, and we’ve now arrived at the culminating article. I sincerely hope you enjoyed this exploration of the Scala language. Thank you for reading.

Dr. Venkat Subramaniam is an award-winning author, founder of Agile Developer, Inc., and an adjunct faculty member at the University of Houston. He has trained and mentored thousands of software developers in the US, Canada, Europe, and Asia, and is a regularly invited speaker at several international conferences. Venkat helps his clients effectively apply and succeed with agile practices on their software projects. He is also the author of .NET Gotchas, the coauthor of the 2007 Jolt Productivity Award-winning Practices of an Agile Developer, and the author of Programming Groovy: Dynamic Productivity for the Java Developer and Programming Scala: Tackle Multi-Core Complexity on the Java Virtual Machine. His latest book is Programming Concurrency on the JVM: Mastering Synchronization, STM, and Actors.

This series started in the September 2011 issue and concludes with this August 2012 issue. If you’d like to read the whole series, here are the links to the individual articles: 9/11: The Elegance of Scala, 10/11: Sensible Typing and Optional Items, 11/11: Cute Classes and Pure OO, 12/11: Functional Style of Programming, 1/12: Working with Collections, 2/12: Creating Higher Order Functions, 3/12: Pattern Matching, 4/12: XML as First Class Citizen, 5/12: Recursions and Tail Call Optimization, 6/12: Using Traits, 7/12: Chaining Traits, and 8/12: Concurrency

Send the author your feedback or discuss the article in the magazine forum.