VOOZH about

URL: https://www.javacodegeeks.com/2023/04/java-concurrency-condition.html

⇱ Java Concurrency: Condition - Java Code Geeks


Previously we checked on ReentRantLock and its fairness. One of the things we can stumble upon is the creation of a Condition. By using Condition we can create mechanisms that allow threads to wait for specific conditions to be met before proceeding with their execution.

The condition provides the following methods:

public interface Condition {

 void await() throws InterruptedException;

 void awaitUninterruptibly();

 long awaitNanos(long nanosTimeout) throws InterruptedException;

 boolean await(long time, TimeUnit unit) throws InterruptedException;

 boolean awaitUntil(Date deadline) throws InterruptedException;

 void signal();

 void signalAll();
}

The closest we came to that so far is the wait Object Monitor method.
A Condition is bound to a Lock and a thread cannot interact with a condition and its methods if it does not have a hold on that lock.
Also Condition uses the underlying lock mechanisms, for example signal and signalAll will use the underlying Queue of the threads that is maintained by the Lock and will notify them to wake up.

One of the obvious things to implement using Conditions is a BlockingQueue. Worker threads processing data and publisher threads dispatching data. Data are published on a queue, worker threads will process data from the queue and then they should wait if there is no data in the queue.

For a worker thread, if the condition is met the flow is the following:

  • Acquire the lock
  • Check the condition
  • Process Data
  • Release the lock

If the condition is not met, the flow would slightly change to this:

  • Acquire the lock
  • Check the condition
  • Wait until the condition is met.
  • Re-acquire the lock
  • Process Data
  • Release the lock

The publisher thread whenever it adds a message it should notify the threads waiting on the condition.

The workflow would be like this.

  • Acquire the lock
  • Publish data
  • Notify the workers
  • Release the lock

Obviously this functionality already exists through the BlockingQueue interface and the LinkedBlockingDeque and ArrayBlockingQueue implementation.
We will proceed with an implementation for the shake of the example.

Let’s see the Message Queue:

package com.gkatzioura.concurrency.lock.condition;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MessageQueue<T> {

 private Queue<T> queue = new LinkedList<>();
 private Lock lock = new ReentrantLock();
 private Condition hasMessages = lock.newCondition();

 public void publish(T message) {
 lock.lock();
 try {
 queue.offer(message);
 hasMessages.signal(); 
 } finally {
 lock.unlock();
 }
 }

 public T receive() throws InterruptedException {
 lock.lock();
 try {
 while (queue.isEmpty()) {
 hasMessages.await();
 }
 return queue.poll();
 } finally {
 lock.unlock();
 }
 }

}

Now let’s put it into action:

MessageQueue<String> messageQueue = new MessageQueue<>();

 @Test
 void testPublish() throws InterruptedException {
 Thread publisher = new Thread(() -> {
 for (int i = 0; i < 10; i++) {
 String message = "Sending message num: " + i;
 log.info("Sending [{}]", message);
 messageQueue.publish(message);
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 throw new RuntimeException(e);
 }
 }
 });

 Thread worker1 = new Thread(() -> {
 for (int i = 0; i < 5; i++) {
 try {
 String message = messageQueue.receive();
 log.info("Received: [{}]", message);
 } catch (InterruptedException e) {
 throw new RuntimeException(e);
 }
 }
 });

 Thread worker2 = new Thread(() -> {
 for (int i = 0; i < 5; i++) {
 try {
 String message = messageQueue.receive();
 log.info("Received: [{}]", message);
 } catch (InterruptedException e) {
 throw new RuntimeException(e);
 }
 }
 });

 publisher.start();
 worker1.start();
 worker2.start();

 publisher.join();
 worker1.join();
 worker2.join();
 }

That’s it! Our workers processed the expected messages and waited when the queue was empty.

Published on Java Code Geeks with permission by Emmanouil Gkatziouras, partner at our JCG program. See the original article here: Java Concurrency: Condition

Opinions expressed by Java Code Geeks contributors are their own.

Do you want to know how to develop your skillset to become a Java Rockstar?
Subscribe to our newsletter to start Rocking right now!
To get you started we give you our best selling eBooks for FREE!
1. JPA Mini Book
2. JVM Troubleshooting Guide
3. JUnit Tutorial for Unit Testing
4. Java Annotations Tutorial
5. Java Interview Questions
6. Spring Interview Questions
7. Android UI Design
and many more ....
I agree to the Terms and Privacy Policy

Thank you!

We will contact you soon.

πŸ‘ Photo of Emmanouil Gkatziouras
Emmanouil Gkatziouras
April 28th, 2023Last Updated: April 27th, 2023
1 936 2 minutes read

Emmanouil Gkatziouras

He is a versatile software engineer with experience in a wide variety of applications/services.He is enthusiastic about new projects, embracing new technologies, and getting to know people in the field of software.
Subscribe

This site uses Akismet to reduce spam. Learn how your comment data is processed.

1 Comment
Oldest
Newest Most Voted
3 years ago

The Wait Object Monitor method is now quite popular and applied by many people.

0
Reply
Back to top button
Close
wpDiscuz