package concurrent
Common constants, factory methods and objects used throughout OPAL when performing concurrent computations.
- Source
- package.scala
- Alphabetic
- By Inheritance
- concurrent
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Type Members
- class ConcurrentExceptions extends Exception
An exception that is used to signal that some exceptions occured concurrently.
An exception that is used to signal that some exceptions occured concurrently. Those exceptions are added to this exception by the underlying framework and can then be queried using the standard methods.
This exception is not associated with a stack trace, because it is not the root cause of the problem.
- final class ConcurrentTasks[T] extends Tasks[T]
Executes the given function
process
for each submitted value of typeT
.Executes the given function
process
for each submitted value of typeT
. Theprocess
function can add further values that should be processed.val tasks = Tasks[T] { (tasks : Tasks[T], t : T) => // do something with t if (<some condition>) { tasks.submit(nextT) } } val exceptions = tasks.join()
Example: - trait Locking extends AnyRef
A basic facility to model shared and exclusive access to some functionality/data structure.
A basic facility to model shared and exclusive access to some functionality/data structure.
Usage
To use this generic locking facility you should mix in this trait.
- class OPALBoundedThreadPoolExecutor extends ThreadPoolExecutor
A ThreadPool that knows the
ThreadGroup
associated with its threads and that catches exceptions if a thread crashes and reports them using the OPALLogger facility.A ThreadPool that knows the
ThreadGroup
associated with its threads and that catches exceptions if a thread crashes and reports them using the OPALLogger facility.If the root cause of the exception should be related to the OPALLogger then the error is written to
System.err
.The pool uses demon threads to make sure that these threads never prevent the JVM from regular termination.
- final class SequentialTasks[T] extends Tasks[T]
- T
Type of the processed data.
- sealed trait Tasks[T] extends AnyRef
Value Members
- def BoundedExecutionContext(name: String, n: Int): ExecutionContext
- def BoundedThreadPool(name: String, n: Int): OPALBoundedThreadPoolExecutor
- final val NumberOfThreadsForCPUBoundTasks: Int
The number of threads that should be used by parallelized computations that are CPU bound (which do not use IO).
The number of threads that should be used by parallelized computations that are CPU bound (which do not use IO). This number is always larger than 0. This number is intended to reflect the number of physical cores (not hyperthreaded ones).
- final val NumberOfThreadsForIOBoundTasks: Int
The size of the thread pool used by OPAL for IO bound tasks.
The size of the thread pool used by OPAL for IO bound tasks. The size should be at least as large as the number of physical cores and is ideally between 1 and 3 times larger than the number of (hyperthreaded) cores. This enables the efficient execution of IO bound tasks.
- implicit final val OPALHTBoundedExecutionContext: ExecutionContext
The ExecutionContext used by OPAL.
The ExecutionContext used by OPAL.
- Note
This
ExecutionContext
must not be shutdown.
- final val OPALHTBoundedExecutionContextTaskSupport: ExecutionContextTaskSupport
- final val OPALHTBoundedThreadPool: OPALBoundedThreadPoolExecutor
Thread pool which supports at most
NumberOfThreadsForIOBoundTasks
tasks.Thread pool which supports at most
NumberOfThreadsForIOBoundTasks
tasks.- Note
This thread pool must not be shutdown.
- final val OPALUnboundedExecutionContext: ExecutionContext
- final val OPALUnboundedThreadPool: ExecutorService
- final val defaultIsInterrupted: () => Boolean
- final def handleUncaughtException(t: Thread, e: Throwable): Unit
- final def handleUncaughtException(t: Throwable): Unit
- final def parForeachArrayElement[T, U](data: Array[T], parallelizationLevel: Int = NumberOfThreadsForCPUBoundTasks, isInterrupted: () => Boolean = defaultIsInterrupted)(f: Function[T, U]): Unit
Execute the given function
f
in parallel for each element of the given array.Execute the given function
f
in parallel for each element of the given array. After processing an element it is checked whether the computation should be aborted.In general – but also at most –
parallelizationLevel
many threads will be used to process the elements. The core idea is that each thread processes an element and after that grabs the next element from the array. Hence, this handles situations gracefully where the effort necessary to analyze a specific element varies widely.- Annotations
- @throws("the set of concurrently thrown suppressed exceptions ")
- Exceptions thrown
ConcurrentExceptions
if any exception occurs; the thrown exception stores all other exceptions (getSuppressed
)- Note
The given function
,f
must not make use of non-local returns; such returns will be caught and reported later.The OPALExecutionContext is used for getting the necessary threads.
- final def parForeachSeqElement[T, U](data: IndexedSeq[T], parallelizationLevel: Int = NumberOfThreadsForCPUBoundTasks, isInterrupted: () => Boolean = defaultIsInterrupted)(f: Function[T, U]): Unit
Execute the given function
f
in parallel for each element of the given indexed seq.Execute the given function
f
in parallel for each element of the given indexed seq. After processing an element it is checked whether the computation should be aborted.In general – but also at most –
parallelizationLevel
many threads will be used to process the elements. The core idea is that each thread processes an element and after that grabs the next element from the array. Hence, this handles situations gracefully where the effort necessary to analyze a specific element varies widely.- Annotations
- @throws("the set of concurrently thrown suppressed exceptions ")
- Exceptions thrown
ConcurrentExceptions
if any exception occurs; the thrown exception stores all other exceptions (getSuppressed
)- Note
The given function
,f
must not make use of non-local returns; such returns will be caught and reported later.The OPALExecutionContext is used for getting the necessary threads.
- object Locking
Defines several convenience methods related to using
(Reentrant(ReadWrite))Lock
s. - object Tasks
Factory to create Tasks objects to process value oriented tasks.