alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Scala example source code file (package.scala)

This example Scala source code file (package.scala) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Scala tags/keywords

buffersplitter, combiner, compositethrowable, elem, elem, int, newto, newto, otherwise, r, r, throwable, to, to

The Scala package.scala source code

/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2011, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */


package scala.collection


import java.lang.Thread._

import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.mutable.ParArray
import scala.collection.mutable.UnrolledBuffer
import annotation.unchecked.uncheckedVariance


/** Package object for parallel collections. 
 */
package object parallel {
  
  /* constants */
  val MIN_FOR_COPY = 512
  val CHECK_RATE = 512
  val SQRT2 = math.sqrt(2)
  val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors
  
  /* functions */
  
  /** Computes threshold from the size of the collection and the parallelism level.
   */
  def thresholdFromSize(sz: Int, parallelismLevel: Int) = {
    val p = parallelismLevel
    if (p > 1) 1 + sz / (8 * p)
    else sz
  }
  
  private[parallel] def unsupported = throw new UnsupportedOperationException
  
  private[parallel] def unsupportedop(msg: String) = throw new UnsupportedOperationException(msg)
  
  private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString)
  
  private[parallel] def getTaskSupport: TaskSupport = 
    if (util.Properties.isJavaAtLeast("1.6")) {
      val vendor = util.Properties.javaVmVendor
      if ((vendor contains "Sun") || (vendor contains "Apple")) new ForkJoinTaskSupport
      else new ThreadPoolTaskSupport
    } else new ThreadPoolTaskSupport
  
  val tasksupport = getTaskSupport
  
  /* implicit conversions */
  
  trait FactoryOps[From, Elem, To] {
    trait Otherwise[R] {
      def otherwise(notbody: => R): R
    }
    
    def isParallel: Boolean
    def asParallel: CanCombineFrom[From, Elem, To]
    def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R): Otherwise[R]
  }
  
  implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new FactoryOps[From, Elem, To] {
    def isParallel = bf.isInstanceOf[Parallel]
    def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]]
    def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new Otherwise[R] {
      def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody
    }
  }
  
  trait TraversableOps[T] {
    trait Otherwise[R] {
      def otherwise(notbody: => R): R
    }
    
    def isParallel: Boolean
    def isParIterable: Boolean
    def asParIterable: ParIterable[T]
    def isParSeq: Boolean
    def asParSeq: ParSeq[T]
    def ifParSeq[R](isbody: ParSeq[T] => R): Otherwise[R]
    def toParArray: ParArray[T]
  }
  
  implicit def traversable2ops[T](t: collection.GenTraversableOnce[T]) = new TraversableOps[T] {
    def isParallel = t.isInstanceOf[Parallel]
    def isParIterable = t.isInstanceOf[ParIterable[_]]
    def asParIterable = t.asInstanceOf[ParIterable[T]]
    def isParSeq = t.isInstanceOf[ParSeq[_]]
    def asParSeq = t.asInstanceOf[ParSeq[T]]
    def ifParSeq[R](isbody: ParSeq[T] => R) = new Otherwise[R] {
      def otherwise(notbody: => R) = if (isParallel) isbody(asParSeq) else notbody
    }
    def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else {
      val it = t.toIterator
      val cb = mutable.ParArrayCombiner[T]()
      while (it.hasNext) cb += it.next
      cb.result
    }
  }
  
  trait ThrowableOps {
    def alongWith(that: Throwable): Throwable
  }
  
  implicit def throwable2ops(self: Throwable) = new ThrowableOps {
    def alongWith(that: Throwable) = (self, that) match {
      case (self: CompositeThrowable, that: CompositeThrowable) => new CompositeThrowable(self.throwables ++ that.throwables)
      case (self: CompositeThrowable, _) => new CompositeThrowable(self.throwables + that)
      case (_, that: CompositeThrowable) => new CompositeThrowable(that.throwables + self)
      case _ => new CompositeThrowable(Set(self, that))
    }
  }
  
  /* classes */
  
  /** Composite throwable - thrown when multiple exceptions are thrown at the same time. */
  final class CompositeThrowable(val throwables: Set[Throwable])
  extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.map(t => (t, t.getStackTrace.toList)).mkString(", "))
  
  
  /** A helper iterator for iterating very small array buffers.
   *  Automatically forwards the signal delegate when splitting.
   */
  private[parallel] class BufferSplitter[T]
    (private val buffer: collection.mutable.ArrayBuffer[T], private var index: Int, private val until: Int, var signalDelegate: collection.generic.Signalling)
  extends IterableSplitter[T] {
    def hasNext = index < until
    def next = {
      val r = buffer(index)
      index += 1
      r
    }
    def remaining = until - index
    def dup = new BufferSplitter(buffer, index, until, signalDelegate)
    def split: Seq[IterableSplitter[T]] = if (remaining > 1) {
      val divsz = (until - index) / 2
      Seq(
        new BufferSplitter(buffer, index, index + divsz, signalDelegate),
        new BufferSplitter(buffer, index + divsz, until, signalDelegate)
      )
    } else Seq(this)
    private[parallel] override def debugInformation = {
      buildString {
        append =>
        append("---------------")
        append("Buffer iterator")
        append("buffer: " + buffer)
        append("index: " + index)
        append("until: " + until)
        append("---------------")
      }
    }
  }
  
  /** A helper combiner which contains an array of buckets. Buckets themselves
   *  are unrolled linked lists. Some parallel collections are constructed by
   *  sorting their result set according to some criteria.
   *  
   *  A reference `buckets` to buckets is maintained. Total size of all buckets
   *  is kept in `sz` and maintained whenever 2 bucket combiners are combined.
   *
   *  Clients decide how to maintain these by implementing `+=` and `result`.
   *  Populating and using the buckets is up to the client. While populating them,
   *  the client should update `sz` accordingly. Note that a bucket is by default
   *  set to `null` to save space - the client should initialize it.
   *  Note that in general the type of the elements contained in the buckets `Buck`
   *  doesn't have to correspond to combiner element type `Elem`.
   *  
   *  This class simply gives an efficient `combine` for free - it chains
   *  the buckets together. Since the `combine` contract states that the receiver (`this`)
   *  becomes invalidated, `combine` reuses the receiver and returns it.
   *  
   *  Methods `beforeCombine` and `afterCombine` are called before and after
   *  combining the buckets, respectively, given that the argument to `combine`
   *  is not `this` (as required by the `combine` contract).
   *  They can be overriden in subclasses to provide custom behaviour by modifying
   *  the receiver (which will be the return value).
   */
  private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]]
    (private val bucketnumber: Int)
  extends Combiner[Elem, To] {
  //self: EnvironmentPassingCombiner[Elem, To] =>
    protected var buckets: Array[UnrolledBuffer[Buck]] @uncheckedVariance = new Array[UnrolledBuffer[Buck]](bucketnumber)
    protected var sz: Int = 0
    
    def size = sz
    
    def clear = {
      buckets = new Array[UnrolledBuffer[Buck]](bucketnumber)
      sz = 0
    }
    
    def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
    def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
    
    def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
      if (other.isInstanceOf[BucketCombiner[_, _, _, _]]) {
        beforeCombine(other)
        
        val that = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]]
        var i = 0
        while (i < bucketnumber) {
          if (buckets(i) eq null) {
            buckets(i) = that.buckets(i)
          } else {
            if (that.buckets(i) ne null) buckets(i) concat that.buckets(i)
          }
          i += 1
        }
        sz = sz + that.size
        
        afterCombine(other)
        
        this
      } else sys.error("Unexpected combiner type.")
    } else this
    
  }
  
  
}
















Other Scala examples (source code examples)

Here is a short list of links related to this Scala package.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.