OCP Chapter 18
Concurrency
Threads
- A thread is the smallest unit of execution that can be scheduled by the operating system.
- A process is a group of associated threads that execute in the same shared environment.
- A single-threaded process has only one thread.
- A multithreaded process supports more than one thread.
- Threads in a shared environment share the same memory space and can communicate directly with one another.
- A task is a single unit of work performed by a thread.
- A task can be implemented as a lambda expression.
- A thread can complete multiple independent tasks, but only one task at a time.
- For simplicity, we refer to an application that contains only a single user-defined thread as a single-threaded application, since we are usually uninterested in the system threads.
- The property of executing multiple threads and processes at the same time is referred to as concurrency.
- The operating system uses a thread scheduler to determine which threads should currently be executing.
- A thread scheduler may employ a round-robin schedule, in which each available thread receives an equal number of CPU cycles and threads are visited in circular order.
- When a thread's allotted time is complete but the thread has not finished processing, a context switch occurs. A context switch is the process of storing a thread's current state and later restoring it so the thread can continue execution.
- Thread priority is a numeric value associated with a thread that the thread scheduler takes into account when determining which threads should currently be executing. In Java it ranges from Thread.MIN_PRIORITY (1) through Thread.NORM_PRIORITY (5) to Thread.MAX_PRIORITY (10).
Thread Types
- A system thread is created by the JVM and runs in the background of the application, e.g., for garbage collection.
- A user-defined thread is one created by the application developer to accomplish a specific task.
- A daemon thread is one that will not prevent the JVM from exiting when the program finishes. Both system and user-defined threads can be marked as daemon threads.
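To make the daemon-thread definition concrete, here is a minimal sketch that assumes a hypothetical background task called backgroundTicker. The task never finishes on its own. Because the thread is marked as a daemon before start(), it does not keep the JVM alive once main() returns.

```java
public class DaemonDemo {
    // Hypothetical background task that never finishes on its own
    static Thread backgroundTicker() {
        Thread ticker = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1_000); // pretend to do periodic background work
                } catch (InterruptedException e) {
                    return; // stop if interrupted
                }
            }
        }, "ticker");
        ticker.setDaemon(true); // must be called before start()
        return ticker;
    }

    public static void main(String[] args) {
        Thread ticker = backgroundTicker();
        ticker.start();
        System.out.println("ticker is daemon? " + ticker.isDaemon());               // ticker is daemon? true
        System.out.println("main is daemon? " + Thread.currentThread().isDaemon()); // main is daemon? false
        // main ends here and the JVM exits, even though ticker never finishes
    }
}
```

If setDaemon(true) were left out, this program would never terminate.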
Task with Runnable
@FunctionalInterface public interface Runnable {
void run();
}
Summary
This chapter introduced you to threads and showed you how to process tasks in parallel using the Concurrency API. The work that a thread performs can be expressed as lambda expressions or instances of Runnable or Callable.
For the exam, you should know how to concurrently execute tasks using ExecutorService. You should also know which ExecutorService instances are available, including scheduled and pooled services.
Thread‐safety is about protecting data from being corrupted by multiple threads modifying it at the same time. Java offers many tools to keep data safe including atomic classes, synchronized methods/blocks, the Lock framework, and CyclicBarrier. The Concurrency API also includes numerous collections classes that handle multithreaded access for you. For the exam, you should also be familiar with the concurrent collections including the CopyOnWriteArrayList class, which creates a new underlying structure anytime the list is modified.
When processing tasks concurrently, a variety of potential threading issues can arise. Deadlock, starvation, and livelock can result in programs that appear stuck, while race conditions can result in unpredictable data. For the exam, you need to know only the basic theory behind these concepts. In professional software development, however, finding and resolving such problems is often quite challenging.
Finally, we discussed parallel streams and showed you how to use them to perform parallel decompositions and reductions. Parallel streams can greatly improve the performance of your application. They can also cause unexpected results since the results are no longer ordered. Remember to avoid stateful lambda expressions, especially when working with parallel streams.
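// PrintData is a class that implements Runnable (not shown)
(new Thread(new PrintData())).start(); // runs PrintData.run() on a new thread
// Will compile but not start a new thread!!
(new PrintData()).run(); // runs on the calling thread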
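Here is a minimal sketch of the difference between start() and run(). It assumes a hypothetical PrintData task that only records the name of the thread that executed it; the original PrintData class is not shown in these notes. Calling run() executes the task on the calling thread, while start() executes it on a new thread.

```java
public class StartVsRun {
    // Hypothetical PrintData: records the name of the thread that executed run()
    static class PrintData implements Runnable {
        private volatile String executedBy;

        @Override
        public void run() {
            executedBy = Thread.currentThread().getName();
        }

        String executedBy() { return executedBy; }
    }

    // Calling run() directly: the task executes on the caller's thread
    static String callRunDirectly() {
        PrintData task = new PrintData();
        task.run();
        return task.executedBy();
    }

    // Calling start(): a new thread is created, which then calls run()
    static String callStart() throws InterruptedException {
        PrintData task = new PrintData();
        Thread thread = new Thread(task, "printer");
        thread.start();
        thread.join(); // wait for the new thread to finish
        return task.executedBy();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("run():   " + callRunDirectly()); // run():   main
        System.out.println("start(): " + callStart());       // start(): printer
    }
}
```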
Polling with Sleep
Polling is the process of intermittently checking data at some fixed interval.
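public class CheckResults {
    private static volatile int counter = 0; // volatile: main must see the other thread's updates
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            for(int i = 0; i < 500; i++) CheckResults.counter++; // only this thread writes counter
        }).start();
        while(CheckResults.counter < 100) {
            System.out.println("Not reached yet");
            Thread.sleep(1000); // 1 SECOND: wait between checks instead of spinning the CPU
        }
        System.out.println("Reached!");
    }
}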
Concurrency API
- The ExecutorService interface creates and manages threads for you: you submit tasks, and the service assigns them to threads.
import java.util.concurrent.*;
public class ZooInfo {
    public static void main(String[] args) {
        ExecutorService service = null;
        Runnable task1 = () ->
            System.out.println("Printing zoo inventory");
        Runnable task2 = () -> {for(int i = 0; i < 3; i++)
            System.out.println("Printing record: "+i);};
        try {
            service = Executors.newSingleThreadExecutor();
            System.out.println("begin");
            service.execute(task1);
            service.execute(task2);
            service.execute(task1);
            System.out.println("end"); // may print before the tasks finish: main does not wait for them
        } finally {
            if(service != null) service.shutdown();
        }
    }
}
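- DON'T FORGET TO SHUT DOWN!!! A thread executor creates non-daemon threads, so an executor that is never shut down keeps the application from terminating.
- shutdown() does not stop running tasks. They run to completion, and no new tasks are accepted. To attempt to stop running tasks, use shutdownNow().
- shutdownNow() returns a List<Runnable> of tasks that were submitted to the thread executor but were never started.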
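The shutdown behavior described above can be sketched with standard ExecutorService methods; the class and method names are hypothetical. The single-thread executor makes the outcome deterministic: one task is running and the other two are still queued when shutdownNow() is called.

```java
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownDemo {
    // shutdown(): no new tasks are accepted, but already-submitted tasks still run
    static int completedAfterShutdown() throws InterruptedException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 3; i++) service.execute(done::incrementAndGet);
        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES); // wait for the queued tasks
        return done.get();
    }

    // Submitting after shutdown() is rejected
    static boolean rejectsAfterShutdown() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.shutdown();
        try {
            service.execute(() -> System.out.println("too late"));
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    // shutdownNow(): interrupts the running task and returns the never-started ones
    static int neverStartedAfterShutdownNow() throws InterruptedException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        CountDownLatch running = new CountDownLatch(1);
        service.execute(() -> {
            running.countDown();
            try {
                Thread.sleep(60_000); // occupies the only thread
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(); just finish
            }
        });
        service.execute(() -> System.out.println("never printed"));
        service.execute(() -> System.out.println("never printed"));
        running.await(); // first task is running, the other two are queued
        List<Runnable> neverStarted = service.shutdownNow();
        service.awaitTermination(1, TimeUnit.MINUTES);
        return neverStarted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(completedAfterShutdown());       // 3
        System.out.println(rejectsAfterShutdown());         // true
        System.out.println(neverStartedAfterShutdownNow()); // 2
    }
}
```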
Submitting tasks
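- execute() is fire-and-forget: it returns void, so the result of the task is not directly available to the calling thread.
- submit() returns a Future, which can be used to check whether the task has completed.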
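|Method name |Description|
|-|-|
|void execute(Runnable command)|Executes a Runnable task at some point in the future|
|Future<?> submit(Runnable task)|Executes a Runnable task at some point in the future and returns a Future representing the task|
|<T> Future<T> submit(Callable<T> task)|Executes a Callable task at some point in the future and returns a Future representing the pending result of the task|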
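Here is a minimal sketch of submit() with both a Runnable and a Callable; the class and method names are hypothetical. The Future of a Runnable returns null from get(), and the Future of a Callable returns the task's result. In both cases get() blocks until the task has finished.

```java
import java.util.concurrent.*;

public class SubmitDemo {
    // submit(Runnable) returns a Future<?> whose get() yields null once the task is done
    static Object runnableResult() throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = service.submit(() -> System.out.println("Printing zoo inventory"));
            Object value = future.get(); // blocks until the task has finished
            System.out.println("done? " + future.isDone()); // done? true
            return value;
        } finally {
            service.shutdown();
        }
    }

    // submit(Callable<T>) returns a Future<T> carrying the task's result
    static int callableResult() throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> 30 + 11;
            Future<Integer> future = service.submit(task);
            return future.get(); // blocks until the result is available
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runnableResult()); // null
        System.out.println(callableResult()); // 41
    }
}
```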
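Scheduling tasks (ScheduledExecutorService methods)
- ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit): runs the task once, after the given delay
- ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit): the same, for a Runnable
- ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): runs the task after the initial delay and then every period, measured from the start of one run to the start of the next
- ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): runs the task after the initial delay and then waits delay between the end of one run and the start of the next
- Example: service.scheduleAtFixedRate(command, 5, 1, TimeUnit.MINUTES); runs command after 5 minutes and then every minute.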
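Here is a minimal sketch of schedule() and scheduleAtFixedRate() on a ScheduledExecutorService. The delays are shortened to milliseconds so the example finishes quickly, and the names are hypothetical. A periodic task never completes by itself, so the sketch stops it with shutdownNow().

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduleDemo {
    // schedule(Callable, delay, unit): runs once after the delay
    static String delayedCallable() throws InterruptedException, ExecutionException {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<String> future =
                service.schedule(() -> "Monkey", 100, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } finally {
            service.shutdown();
        }
    }

    // scheduleAtFixedRate(command, initialDelay, period, unit): repeats until cancelled
    static int fixedRateRuns(int wanted) throws InterruptedException {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch enough = new CountDownLatch(wanted);
        service.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            enough.countDown();
        }, 0, 20, TimeUnit.MILLISECONDS);
        enough.await();        // wait until the task has run `wanted` times
        service.shutdownNow(); // periodic tasks never finish on their own
        return runs.get();     // at least `wanted`; one more run may sneak in
    }

    public static void main(String[] args) throws Exception {
        System.out.println(delayedCallable()); // Monkey
        System.out.println(fixedRateRuns(3) >= 3); // true
    }
}
```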
Concurrency with Pools
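- ExecutorService newSingleThreadExecutor(): a single worker thread that processes tasks in the order they were submitted
- ScheduledExecutorService newSingleThreadScheduledExecutor(): a single thread that can run tasks after a delay or periodically
- ExecutorService newCachedThreadPool(): creates new threads as needed but reuses previously constructed threads when they are available
- ExecutorService newFixedThreadPool(int): reuses a fixed number of threads operating off a shared unbounded queue
- ScheduledExecutorService newScheduledThreadPool(int): a pool of threads that can run tasks after a delay or periodically
- A single-thread executor waits for its only thread to become available before running the next task.
- A fixed-size pool can run the next task concurrently as long as one of its threads is free. Once all threads are busy, pending tasks wait in the queue until a thread becomes available.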
Determine the thread pool size
- For CPU-bound tasks, a common choice is to size the pool to the number of available CPUs: Runtime.getRuntime().availableProcessors()
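Here is a sketch of a fixed thread pool sized with availableProcessors(), assuming CPU-bound tasks. It also uses invokeAll(), which submits a collection of Callable tasks and waits for all of them to complete. The sum-of-squares task is only a hypothetical placeholder for real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class PoolDemo {
    // Sums 1^2 + ... + n^2 by running one Callable per term on a fixed pool
    static long sumOfSquares(int n) throws InterruptedException, ExecutionException {
        int poolSize = Runtime.getRuntime().availableProcessors(); // one thread per CPU
        ExecutorService service = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                long value = i; // effectively final copy for the lambda
                tasks.add(() -> value * value);
            }
            long total = 0;
            // invokeAll() blocks until every task has completed
            for (Future<Long> result : service.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("CPUs: " + Runtime.getRuntime().availableProcessors());
        System.out.println(sumOfSquares(10)); // 385
    }
}
```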
Writing Thread-Safe Code
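- Thread-safety is the property of an object that guarantees safe execution by multiple threads at the same time. Without it you get race conditions. For example, two threads both read a counter value of 1 and both write back 2, so one increment is lost.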
Protect Data with Atomic Classes
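Atomic is the property of an operation to be carried out as a single unit of execution without any interference from another thread.
- AtomicBoolean
- AtomicInteger
- AtomicLong
- Common methods: get(), set() and getAndSet(). AtomicInteger and AtomicLong also offer incrementAndGet(), getAndIncrement(), decrementAndGet() and getAndDecrement().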
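Here is a minimal sketch of AtomicInteger under contention; the class and method names are hypothetical. incrementAndGet() performs the read, the increment and the write as one atomic operation, so no increments are lost even though four threads share one counter. The second method shows the difference between the get-then-update and update-then-get variants.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounter {
    // Many threads increment one shared AtomicInteger; no update is lost
    static int countWithThreads(int threads, int incrementsPerThread) throws InterruptedException {
        AtomicInteger count = new AtomicInteger();
        ExecutorService service = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            service.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    count.incrementAndGet(); // read-modify-write as one atomic step
                }
            });
        }
        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);
        return count.get();
    }

    // getAndIncrement() returns the old value; incrementAndGet() returns the new one
    static String returnValues() {
        AtomicInteger value = new AtomicInteger(5);
        int before = value.getAndIncrement(); // returns 5, value becomes 6
        int after = value.incrementAndGet();  // value becomes 7, returns 7
        return before + "," + after + "," + value.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithThreads(4, 10_000)); // 40000
        System.out.println(returnValues());              // 5,7,7
    }
}
```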
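Improving Access with Synchronized Blocks
- Atomic classes protect a single variable, but they do not make a sequence of operations (for example, incrementing and then printing) happen together or in order. For that we need a monitor (also called a lock), a structure that supports mutual exclusion: at most one thread executes a particular segment of code at a given time.
- Any Java object can be used as the monitor of a synchronized block: synchronized(object) { ... }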
Synchronizing Methods
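Instead of synchronizing a block on an object, you can synchronize an entire method by adding the synchronized modifier. A synchronized instance method locks on this, and a synchronized static method locks on the class object.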
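Here is a minimal sketch comparing a synchronized block with a synchronized method; SheepManager is a hypothetical name. Both variants lock on this, so they exclude each other. If the block used a different monitor object, the two methods would not protect the counter from each other.

```java
import java.util.ArrayList;
import java.util.List;

public class SheepManager {
    private int sheepCount = 0;

    // Synchronized block: the monitor is `this`
    void incrementWithBlock() {
        synchronized (this) {
            sheepCount++;
        }
    }

    // Synchronized method: equivalent to wrapping the body in synchronized (this)
    synchronized void incrementWithMethod() {
        sheepCount++;
    }

    synchronized int getSheepCount() {
        return sheepCount;
    }

    // Half of the threads use the block, half the method; both share one monitor
    static int countWithThreads(int threads, int incrementsPerThread) throws InterruptedException {
        SheepManager manager = new SheepManager();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            boolean useBlock = t % 2 == 0;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    if (useBlock) manager.incrementWithBlock();
                    else manager.incrementWithMethod();
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) worker.join();
        return manager.getSheepCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithThreads(10, 1_000)); // 10000
    }
}
```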
Lock Framework
- ReentrantLock: a commonly used implementation of the Lock interface in java.util.concurrent.locks
// Implementation #1 with a synchronized block
Object object = new Object();
synchronized(object) {
    // Protected code
}

// Implementation #2 with a Lock
Lock lock = new ReentrantLock();
lock.lock(); // acquire before try, so finally never unlocks a lock that is not held
try {
    // Protected code
} finally {
    lock.unlock();
}
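- void lock(): requests the lock and blocks until it is acquired
- void unlock(): releases the lock
- boolean tryLock(): requests the lock and returns immediately; true if it was acquired
- boolean tryLock(long, TimeUnit): requests the lock, waiting up to the given time; true if it was acquired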
tryLock()
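- Like lock(), a successful tryLock() must be paired with unlock(), typically in a try/finally block. Only call unlock() if the lock was actually obtained: calling unlock() without holding the lock throws an IllegalMonitorStateException.
Lock lock = new ReentrantLock();
new Thread(() -> printMessage(lock)).start(); // printMessage(lock) is a helper (not shown) that also uses the lock
if(lock.tryLock()) {
    try {
        System.out.println("Lock obtained, entering protected code");
    } finally {
        lock.unlock();
    }
} else {
    System.out.println("Unable to acquire lock, doing something else");
}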
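Here is a sketch of tryLock() and tryLock(long, TimeUnit). It assumes a hypothetical helper, holdLock(), that keeps the lock busy in another thread for a given time. The plain tryLock() gives up immediately. The timed version waits long enough for the holder to release the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {
    // Hypothetical helper: another thread holds `lock` for holdMillis; returns once it holds it
    static Thread holdLock(Lock lock, long holdMillis) throws InterruptedException {
        CountDownLatch acquired = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                acquired.countDown();
                Thread.sleep(holdMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        acquired.await();
        return holder;
    }

    // tryLock() returns false immediately while another thread holds the lock
    static boolean tryLockWhileHeld() throws InterruptedException {
        Lock lock = new ReentrantLock();
        Thread holder = holdLock(lock, 500);
        boolean obtained = lock.tryLock();
        if (obtained) lock.unlock(); // only unlock what was actually obtained
        holder.join();
        return obtained;
    }

    // tryLock(timeout, unit) waits, and succeeds once the holder releases the lock
    static boolean tryLockWithTimeout() throws InterruptedException {
        Lock lock = new ReentrantLock();
        Thread holder = holdLock(lock, 100);
        boolean obtained = lock.tryLock(5, TimeUnit.SECONDS);
        if (obtained) lock.unlock();
        holder.join();
        return obtained;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("tryLock() while held: " + tryLockWhileHeld());  // false
        System.out.println("tryLock(5, SECONDS): " + tryLockWithTimeout()); // true
    }
}
```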
Duplicate Lock Requests
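- ReentrantLock counts how many times the owning thread has acquired it. A thread may request a lock it already holds, but it must call unlock() as many times as it obtained the lock before other threads can get it. It is critical to release all the locks!
- ReentrantReadWriteLock is not on the exam, but it is useful in practice!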
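Here is a minimal sketch of duplicate lock requests. It uses ReentrantLock.getHoldCount() to show that the lock stays unavailable to other threads until the owning thread has called unlock() as many times as lock(). The helper otherThreadCanLock() is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

public class HoldCountDemo {
    // Hypothetical helper: asks a different thread whether it could obtain the lock right now
    static boolean otherThreadCanLock(ReentrantLock lock) throws InterruptedException {
        boolean[] obtained = new boolean[1];
        Thread other = new Thread(() -> {
            obtained[0] = lock.tryLock();
            if (obtained[0]) lock.unlock();
        });
        other.start();
        other.join(); // join() also makes obtained[0] visible to this thread
        return obtained[0];
    }

    // Logs "holdCount:otherThreadCanLock" after each step
    static String demo() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        StringBuilder log = new StringBuilder();
        lock.lock();
        lock.lock(); // duplicate request by the same thread: hold count 2
        log.append(lock.getHoldCount()).append(':').append(otherThreadCanLock(lock)).append(' ');
        lock.unlock(); // hold count 1: this thread still owns the lock
        log.append(lock.getHoldCount()).append(':').append(otherThreadCanLock(lock)).append(' ');
        lock.unlock(); // hold count 0: released for other threads
        log.append(lock.getHoldCount()).append(':').append(otherThreadCanLock(lock));
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // 2:false 1:false 0:true
    }
}
```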
Orchestrating Tasks with a CyclicBarrier
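- With a CyclicBarrier, tasks can be grouped into phases. Every thread waits at the barrier until all threads have finished the current phase, and then they all continue with the next one.
- Pay attention to the thread pool size: it must be at least as large as the CyclicBarrier limit.
- Watch out for deadlock when configuring the barrier limit and pool size. If the pool has fewer threads than the barrier waits for, the waiting threads hang forever.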
import java.util.concurrent.*;
public class CyclicBarrierExample {
    public static final int NUMBER_OF_THREADS = 4;
    public static final int NUMBER_OF_STEPS = 3;
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(NUMBER_OF_THREADS, new Runnable() {
            @Override
            public void run() {
                System.out.println("All threads completed a step. Moving to the next step...\n");
            }
        });
        // Pool size must be at least the barrier limit, otherwise await() hangs forever
        ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            executorService.submit(new Worker(barrier));
        }
        executorService.shutdown();
    }
}
class Worker implements Runnable {
    private CyclicBarrier barrier;
    public Worker(CyclicBarrier barrier) {
        this.barrier = barrier;
    }
    @Override
    public void run() {
        try {
            for (int step = 1; step <= CyclicBarrierExample.NUMBER_OF_STEPS; step++) {
                performTask(step);
                System.out.println(Thread.currentThread().getName() + " completed step " + step);
                barrier.await(); // Wait for other threads
            }
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
    private void performTask(int step) {
        // Simulating task for each step
        System.out.println(Thread.currentThread().getName() + " is performing task for step " + step);
        try {
            Thread.sleep((long) (Math.random() * 1000)); // Simulate varying task durations
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Concurrent Collections
- Concurrent collections help avoid memory consistency errors, which occur when two threads have inconsistent views of what should be the same data.
- Even in a single thread, modifying a regular collection while iterating over it can throw a ConcurrentModificationException at runtime:
var foodData = new HashMap<String, Integer>();
foodData.put("penguin", 1);
foodData.put("flamingo", 2);
for (String key : foodData.keySet()) {
    foodData.remove(key); // ConcurrentModificationException: the keySet() iterator is not updated after the first removal
}
// This does work (separate snippet):
var foodData = new ConcurrentHashMap<String, Integer>();
foodData.put("penguin", 1);
foodData.put("flamingo", 2);
for (String key : foodData.keySet()) {
    System.out.println("remove key " + key);
    foodData.remove(key);
}
In principle, concurrent collections are used the same way as their non-concurrent counterparts.
Class name | Java Collections Framework interfaces | Elements ordered? | Sorted? | Blocking? |
---|---|---|---|---|
ConcurrentHashMap | ConcurrentMap | No | No | No |
ConcurrentLinkedQueue | Queue | Yes | No | No |
ConcurrentSkipListMap | ConcurrentMap, SortedMap, NavigableMap | Yes | Yes | No |
ConcurrentSkipListSet | SortedSet, NavigableSet | Yes | Yes | No |
CopyOnWriteArrayList | List | Yes | No | No |
CopyOnWriteArraySet | Set | No | No | No |
LinkedBlockingQueue | BlockingQueue | Yes | No | Yes |
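Here is a sketch of some classes from the table in use; the class and method names are hypothetical. It shows several threads updating one ConcurrentHashMap through merge(), which runs atomically on that class. It also removes entries while looping over a ConcurrentHashMap key set, and builds a ConcurrentSkipListSet that keeps its elements sorted.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class ConcurrentCollectionsDemo {
    // Several threads update one map; ConcurrentHashMap.merge() runs atomically
    static Map<String, Integer> countSightings(List<String> sightings, int threads)
            throws InterruptedException {
        ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService service = Executors.newFixedThreadPool(threads);
        for (String animal : sightings) {
            service.execute(() -> counts.merge(animal, 1, Integer::sum));
        }
        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);
        return counts;
    }

    // Removing entries while looping is allowed: the key-set iterator is weakly consistent
    static int removeAllWhileLooping() {
        Map<String, Integer> foodData = new ConcurrentHashMap<>(Map.of("penguin", 1, "flamingo", 2));
        for (String key : foodData.keySet()) {
            foodData.remove(key);
        }
        return foodData.size();
    }

    // A sorted concurrent set keeps its elements in natural order
    static String sortedSet() {
        var set = new ConcurrentSkipListSet<>(List.of("zebra", "lion", "gorilla"));
        return set.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countSightings(List.of("lion", "lion", "bear", "lion"), 3)); // lion=3, bear=1 (map order may vary)
        System.out.println(removeAllWhileLooping()); // 0
        System.out.println(sortedSet());             // [gorilla, lion, zebra]
    }
}
```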
Deleting when looping
List<String> birds = new CopyOnWriteArrayList<>();
birds.add("hawk");
birds.add("hawk");
birds.add("hawk");
for (String bird : birds) birds.remove(bird);
System.out.print(birds.size()); // 0
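// This also works with a regular ArrayList, as long as you remove through its Iterator
// (the Iterator of a CopyOnWriteArrayList does not support remove()):
List<String> hawks = new ArrayList<>(List.of("hawk", "hawk", "hawk"));
var iterator = hawks.iterator();
while(iterator.hasNext()) {
    iterator.next();
    iterator.remove(); // removes the element returned by next()
}
System.out.print(hawks.size()); // 0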
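Here is a minimal sketch of why the CopyOnWriteArrayList loop above terminates. The for-each loop iterates over a snapshot taken when the loop started, so elements added during the loop do not extend it. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {
    // Returns {iterations, finalSize}
    static int[] addWhileIterating() {
        List<Integer> favNumbers = new CopyOnWriteArrayList<>(List.of(4, 3, 42));
        int iterations = 0;
        for (var n : favNumbers) {
            iterations++;
            favNumbers.add(9); // writes to a new copy; this loop keeps iterating the old snapshot
        }
        return new int[] { iterations, favNumbers.size() };
    }

    public static void main(String[] args) {
        int[] result = addWhileIterating();
        System.out.println(result[0] + " iterations, size " + result[1]); // 3 iterations, size 6
    }
}
```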
BlockingQueue waiting methods
Method name | Description |
---|---|
offer(E e, long timeout, TimeUnit unit) | Adds an item to the queue, waiting the specified time and returning false if the time elapses before space is available |
poll(long timeout, TimeUnit unit) | Retrieves and removes an item from the queue, waiting the specified time and returning null if the time elapses before the item is available |
try {
    var blockingQueue = new LinkedBlockingQueue<Integer>();
    blockingQueue.offer(39);
    blockingQueue.offer(3, 4, TimeUnit.SECONDS);
    System.out.println(blockingQueue.poll());                          // 39
    System.out.println(blockingQueue.poll(10, TimeUnit.MILLISECONDS)); // 3
} catch (InterruptedException e) {
    // Handle interruption
}
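Here is a sketch of the two waiting methods from the table, using short timeouts; the class and method names are hypothetical. offer() gives up on a full bounded queue, and poll() returns null from an empty queue. In the last method, a waiting poll() picks up an item that another thread offers while it waits.

```java
import java.util.concurrent.*;

public class BlockingQueueTimeouts {
    // offer(e, timeout, unit) on a full bounded queue gives up after the timeout
    static boolean offerToFullQueue() throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1); // capacity 1
        queue.offer(1);                                   // fills the queue
        return queue.offer(2, 50, TimeUnit.MILLISECONDS); // false after about 50 ms
    }

    // poll(timeout, unit) on an empty queue returns null after the timeout
    static Integer pollEmptyQueue() throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        return queue.poll(50, TimeUnit.MILLISECONDS);     // null after about 50 ms
    }

    // A consumer waits in poll() until a producer thread offers an item
    static Integer handOff() throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                return;
            }
            queue.offer(42);
        });
        producer.start();
        Integer received = queue.poll(5, TimeUnit.SECONDS); // waits until 42 arrives
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerToFullQueue()); // false
        System.out.println(pollEmptyQueue());   // null
        System.out.println(handOff());          // 42
    }
}
```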
Obtaining Synchronized Collections
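Static methods of java.util.Collections that wrap an existing collection in one whose methods are synchronized:
- synchronizedCollection(Collection<T> c)
- synchronizedList(List<T> list)
- synchronizedMap(Map<K,V> m)
- synchronizedNavigableMap(NavigableMap<K,V> m)
- synchronizedNavigableSet(NavigableSet<T> s)
- synchronizedSet(Set<T> s)
- synchronizedSortedMap(SortedMap<K,V> m)
- synchronizedSortedSet(SortedSet<T> s)
If you know at the time of creation that your object requires synchronization, then you should use one of the concurrent collection classes listed in the table above. The synchronized wrappers are mainly useful for an existing, non-concurrent collection. Iterating over a synchronized collection is not synchronized automatically, and modifying it inside a for-each loop still throws a ConcurrentModificationException.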
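Here is a minimal sketch of Collections.synchronizedList() shared by several threads; the class and method names are hypothetical. The wrapper synchronizes each add(), but iterating still requires an explicit synchronized block on the list itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SynchronizedListDemo {
    static int addFromThreads(int threads, int perThread) throws InterruptedException {
        List<Integer> list = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) list.add(i); // each add() is synchronized
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) worker.join();
        int counted = 0;
        synchronized (list) { // iteration must be synchronized manually on the wrapper
            for (Integer ignored : list) counted++;
        }
        return counted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(addFromThreads(4, 1_000)); // 4000
    }
}
```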
Identifying Threading Problems
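- Liveness is the ability of an application to execute in a timely manner.
- Liveness problems are those in which the application becomes unresponsive or ends up in some kind of "stuck" state. Three common types:
- deadlock
- starvation
- livelock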
Deadlock
- Deadlock occurs when two or more threads are blocked forever, each waiting on the other.
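Here is a sketch that deliberately produces the deadlock described above: two threads take the same two monitors in opposite order. It is a demonstration only, not a pattern to copy. The threads are daemon threads so the stuck pair cannot keep the JVM alive, and the cycle is confirmed with ThreadMXBean.findMonitorDeadlockedThreads(). The class, method and thread names are hypothetical. The usual fix is to acquire the locks in the same order everywhere.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class DeadlockDemo {
    // Two threads take the same two monitors in opposite order and block forever
    static boolean createDeadlock() throws InterruptedException {
        Object food = new Object();
        Object water = new Object();
        CountDownLatch bothHoldOne = new CountDownLatch(2);
        Thread foxy = new Thread(() -> grab(food, water, bothHoldOne), "Foxy");
        Thread tails = new Thread(() -> grab(water, food, bothHoldOne), "Tails");
        foxy.setDaemon(true); // daemon threads: the stuck pair cannot keep the JVM alive
        tails.setDaemon(true);
        foxy.start();
        tails.start();
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        for (int i = 0; i < 50; i++) { // check for up to about 5 seconds
            if (mx.findMonitorDeadlockedThreads() != null) return true;
            Thread.sleep(100);
        }
        return false;
    }

    private static void grab(Object first, Object second, CountDownLatch bothHoldOne) {
        synchronized (first) {
            bothHoldOne.countDown();
            try {
                bothHoldOne.await(); // make sure the other thread holds its first monitor
            } catch (InterruptedException e) {
                return;
            }
            synchronized (second) {
                // never reached: each thread waits for the monitor the other one holds
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlock detected? " + createDeadlock()); // deadlock detected? true
    }
}
```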
Starvation
- Starvation occurs when a single thread is perpetually denied access to a shared resource or lock.
Livelock
- Livelock occurs when two or more threads are conceptually blocked forever, although they are each still active and trying to complete their task.
- Livelock is often the result of two threads trying to resolve a deadlock.
Race Conditions
- A race condition is an undesirable result that occurs when two tasks, which should be completed sequentially, are completed at the same time.
- For the exam, you should understand that race conditions lead to invalid data if they are not properly handled. Even the solution where both participants fail to proceed is preferable to one in which invalid data is permitted to enter the system.
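Here is a minimal sketch of a check-then-act race and one way to avoid it. It assumes a hypothetical username registry in which several threads try to claim the same name at once. A racy version that calls containsKey() and then put() could let two threads both succeed. ConcurrentHashMap.putIfAbsent() performs the check and the insert atomically, so exactly one thread wins.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class UsernameRegistry {
    private final ConcurrentMap<String, String> users = new ConcurrentHashMap<>();

    // Racy alternative: if (!users.containsKey(name)) users.put(name, owner);
    // putIfAbsent() does the check and the insert as one atomic step
    boolean register(String name, String owner) {
        return users.putIfAbsent(name, owner) == null;
    }

    // Several threads try to claim the same (hypothetical) username at the same time
    static int winners(int threads) throws InterruptedException {
        UsernameRegistry registry = new UsernameRegistry();
        AtomicInteger successes = new AtomicInteger();
        CountDownLatch go = new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            String owner = "user" + t;
            service.execute(() -> {
                try {
                    go.await(); // release all threads at roughly the same moment
                } catch (InterruptedException e) {
                    return;
                }
                if (registry.register("ZooFan", owner)) successes.incrementAndGet();
            });
        }
        go.countDown();
        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);
        return successes.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(winners(8)); // 1
    }
}
```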
Parallel Streams
- A parallel stream is a stream that is capable of processing results concurrently, using multiple threads.
Calling parallel() on an existing Stream
Stream<Integer> s1 = List.of(1,2).stream();
Stream<Integer> s2 = s1.parallel();
Calling parallelStream() on a Collection Object
Stream<Integer> s3 = List.of(1,2).parallelStream();
PERFORMING A PARALLEL DECOMPOSITION
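public static void main(String[] args) {
    // Serial: the tasks effectively run one after another
    // long start = System.currentTimeMillis();
    // List.of(1,2,3,4,5)
    //     .stream()
    //     .map(w -> doWork(w))
    //     .forEach(s -> System.out.print(s + " ")); // 1 2 3 4 5 (about 25 seconds)

    // Parallel: the tasks effectively run side by side
    long start = System.currentTimeMillis();
    List.of(1,2,3,4,5)
        .parallelStream()
        .map(w -> doWork(w))
        .forEach(s -> System.out.print(s + " ")); // order varies, e.g. 3 4 5 2 1
    System.out.println();
    var timeTaken = (System.currentTimeMillis() - start) / 1000;
    System.out.println("Time: " + timeTaken + " seconds"); // roughly 5 seconds with enough CPU cores

    // In parallel, but the results are still printed in encounter order!
    List.of(5,2,1,4,3)
        .parallelStream()
        .map(w -> doWork(w))
        .forEachOrdered(s -> System.out.print(s + " ")); // 5 2 1 4 3
}
private static int doWork(int input) {
    try {
        Thread.sleep(5000); // simulate 5 seconds of work
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }
    return input;
}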
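Here is a short sketch that complements the example above; the class and method names are hypothetical. The elements are processed by several threads, yet a terminal collect() still returns the results in encounter order, unlike forEach(). The number of threads used depends on the machine.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DecompositionDemo {
    // The work is split across threads, but collect() keeps encounter order
    static List<Integer> squaresInParallel(int n) {
        return IntStream.rangeClosed(1, n)
            .parallel()
            .map(x -> x * x)
            .boxed()
            .collect(Collectors.toList());
    }

    // Records which threads processed elements; usually more than 1 on a multi-core machine
    static int threadsUsed(int n) {
        var names = ConcurrentHashMap.<String>newKeySet();
        IntStream.rangeClosed(1, n)
            .parallel()
            .forEach(x -> names.add(Thread.currentThread().getName()));
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println(squaresInParallel(5)); // [1, 4, 9, 16, 25]
        System.out.println("threads used: " + threadsUsed(10_000));
    }
}
```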
PROCESSING PARALLEL REDUCTIONS
- Since order is not guaranteed with parallel streams, methods such as findAny() on parallel streams may result in unexpected behavior.
- Besides possibly improving performance and modifying the order of operations, using parallel streams can impact how you write your application. Reduction operations on parallel streams are referred to as parallel reductions. The results for parallel reductions can be different from what you expect when working with serial streams.
System.out.println(List.of(1,2,3,4,5,6)
    .stream().findAny().get()); // 1
System.out.println(List.of(1,2,3,4,5,6)
    .parallelStream().findAny().get()); // often 4, sometimes 1: any element may be returned
System.out.println(
    List.of(1,2,3,4,5,6).stream().unordered().findAny()); // Optional[1]
// unordered() can improve the performance of parallel streams, because encounter order no longer has to be respected
Combining Results with reduce()
System.out.println(List.of('w', 'o', 'l', 'f') .parallelStream()
.reduce("",
(s1,c) -> s1 + c,
(s2,s3) -> s2 + s3)); // wolf
Possible trace if the lambdas are extended to log each step (the order of the intermediate steps varies between runs):
- Intermediate result (s1 + c): o
- Intermediate result (s1 + c): f
- Intermediate result (s1 + c): w
- Intermediate result (s1 + c): l
- Combined result (s2 + s3): wo
- Combined result (s2 + s3): lf
- Combined result (s2 + s3): wolf
- Final result: wolf
Order Order!
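- With reduce(), the identity must truly be an identity and the accumulator must be associative. "X" is not an identity for String::concat, so the parallel result depends on how the stream is split.
System.out.println(List.of("w","o","l","f").parallelStream()
    .reduce("X", String::concat)); // e.g. XwXoXlXf
System.out.println(List.of("w","o","l","f").stream()
    .reduce("X", String::concat)); // Xwolf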
Combining Results with collect()
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner)
Stream<String> stream = Stream.of("w", "o", "l", "f").parallel();
SortedSet<String> set = stream.collect(ConcurrentSkipListSet::new,
Set::add,
Set::addAll);
System.out.println(set); // [f, l, o, w]: natural ordering, so alphabetical!
Parallel reduction on a Collector
- Every Collector instance defines a characteristics() method that returns a set of Collector.Characteristics attributes. When using a Collector to perform a parallel reduction, a number of properties must hold true. Otherwise, the collect() operation will execute in a single‐threaded fashion.
Requirements for Parallel Reduction with collect()
- The stream is parallel.
- The parameter of the collect() operation has the Characteristics.CONCURRENT characteristic.
- Either the stream is unordered or the collector has the characteristic Characteristics.UNORDERED.
stream.collect(Collectors.toSet()); // Not a parallel reduction: the Set collector is UNORDERED but does not have the CONCURRENT characteristic
The Collectors class includes two sets of static methods for retrieving collectors that are both UNORDERED and CONCURRENT:
- toConcurrentMap()
- groupingByConcurrent()
Stream<String> ohMy =
    Stream.of("lions","tigers","bears").parallel();
ConcurrentMap<Integer, String> map = ohMy
    .collect(Collectors.toConcurrentMap(String::length, k -> k,
        (s1, s2) -> s1 + "," + s2));
System.out.println(map); // {5=lions,bears, 6=tigers} (lions and bears may be swapped)
System.out.println(map.getClass());
// class java.util.concurrent.ConcurrentHashMap

// groupingByConcurrent (separate snippet)
var ohMy2 = Stream.of("lions","tigers","bears").parallel();
ConcurrentMap<Integer, List<String>> map2 = ohMy2.collect(
    Collectors.groupingByConcurrent(String::length));
System.out.println(map2); // {5=[lions, bears], 6=[tigers]} (order within a list may vary)
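Here is a small sketch that inspects Collector.characteristics() directly. It assumes a hypothetical helper, supportsParallelReduction(), that checks the collector-side requirements (CONCURRENT and UNORDERED). It confirms that Collectors.toSet() is UNORDERED but not CONCURRENT, while groupingByConcurrent() has both.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CollectorCharacteristics {
    // CONCURRENT + UNORDERED: the collector side is satisfied even if the parallel stream is ordered
    static boolean supportsParallelReduction(Collector<?, ?, ?> collector) {
        Set<Collector.Characteristics> traits = collector.characteristics();
        return traits.contains(Collector.Characteristics.CONCURRENT)
            && traits.contains(Collector.Characteristics.UNORDERED);
    }

    static Set<Collector.Characteristics> toSetTraits() {
        return Collectors.toSet().characteristics();
    }

    static boolean groupingByConcurrentQualifies() {
        Collector<String, ?, ConcurrentMap<Integer, List<String>>> byLength =
            Collectors.groupingByConcurrent(String::length);
        return supportsParallelReduction(byLength);
    }

    public static void main(String[] args) {
        System.out.println(toSetTraits().contains(Collector.Characteristics.UNORDERED)); // true
        System.out.println(supportsParallelReduction(Collectors.toSet()));              // false
        System.out.println(groupingByConcurrentQualifies());                            // true
    }
}
```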
Avoiding Stateful Operations
Side effects can appear in parallel streams if your lambda expressions are stateful. A stateful lambda expression is one whose result depends on any state that might change during the execution of a pipeline. On the other hand, a stateless lambda expression is one whose result does not depend on any state that might change during the execution of a pipeline.
public static List<Integer> addValues(IntStream source) {
    return source.filter(s -> s % 2 == 0)
        .boxed()
        .collect(Collectors.toList()); // stateless: the lambdas do not modify any shared state
}
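Here is a sketch contrasting a stateful lambda with the stateless version above; the class name is hypothetical. The stateful variant adds to a shared synchronized list from inside forEach(). On a parallel stream its elements therefore arrive in an unpredictable order. The stateless variant always returns them in encounter order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StatefulVsStateless {
    // Stateful: the lambda writes to a list outside the pipeline (avoid this pattern)
    static List<Integer> addValuesStateful(IntStream source) {
        List<Integer> data = Collections.synchronizedList(new ArrayList<>());
        source.filter(s -> s % 2 == 0)
            .forEach(i -> data.add(i)); // order depends on which thread finishes first
        return data;
    }

    // Stateless: the result depends only on the input, so parallel and serial agree
    static List<Integer> addValuesStateless(IntStream source) {
        return source.filter(s -> s % 2 == 0)
            .boxed()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(addValuesStateful(IntStream.rangeClosed(1, 10).parallel()));  // e.g. [6, 8, 10, 2, 4]
        System.out.println(addValuesStateless(IntStream.rangeClosed(1, 10).parallel())); // [2, 4, 6, 8, 10]
    }
}
```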