URL: https://egkatzioura.com/category/java/

Java – Emmanouil Gkatziouras


I love rich models, I enjoy having something cohesive that can be re-used and have the much needed logic at the right place.

But we don’t live in a perfect world: Introducing autogeneration.

You have jOOQ, then you get Swagger-generated models. You can certainly plug in some custom code, but that can be cumbersome.

If you’ve been into Scala, like I used to be, you know that you can add a method to an existing type using implicit classes.
And that’s what I wished for when I stumbled on generated models that needed some extra logic.

This is where Lombok’s ExtensionMethod comes to the rescue.

First things first: the Lombok dependency is needed.

<dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 <version>1.18.24</version>
 <scope>provided</scope>
</dependency>

Now assuming we have a class imported from a library

package com.egkatzioura.external.models;

public class CustomModel {

...
}

Since the class resides in an imported jar, we cannot add any method to it.
Instead we shall create a class with a static method whose first parameter is the model of interest.

package com.egkatzioura.utils;

import com.egkatzioura.external.models.CustomModel;

public class CustomModelUtil {

 // The first parameter is the type being extended
 public static void youCustomMethod(CustomModel customModel) {
 System.out.println("custom method called");
 }
}

So far we did not do anything with Lombok, so you might wonder how we apply it.
Essentially you need to add the annotation to each class in which you want to call the method as if it were part of the original class.

package com.egkatzioura.models;

import lombok.experimental.ExtensionMethod;
import com.egkatzioura.external.models.CustomModel;
import com.egkatzioura.utils.CustomModelUtil;

@ExtensionMethod({CustomModelUtil.class})
public class Application {

 public static void main(String[] args) {
 CustomModel customModel = new CustomModel();
 // Lombok rewrites this into CustomModelUtil.youCustomMethod(customModel)
 customModel.youCustomMethod();
 }
}
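
To make it clearer what the annotation buys us, here is a minimal sketch in plain Java, without Lombok, of what an extension method call boils down to: a static call with the receiver passed as the first argument. The `describe` method and the `name` field are made up for the illustration and are not part of the example above.

```java
public class ExtensionMethodDesugared {

    // Stand-in for the library class we cannot modify (the field is hypothetical).
    public static class CustomModel {
        public final String name;

        public CustomModel(String name) {
            this.name = name;
        }
    }

    // Same shape as CustomModelUtil: a static method whose first parameter is the extended type.
    public static class CustomModelUtil {
        public static String describe(CustomModel customModel) {
            return "custom method called on " + customModel.name;
        }
    }

    public static void main(String[] args) {
        CustomModel customModel = new CustomModel("order");
        // With @ExtensionMethod(CustomModelUtil.class) one would write customModel.describe();
        // the call below is the plain static form it stands for.
        System.out.println(CustomModelUtil.describe(customModel));
    }
}
```

Since the call is static underneath, nothing is added to the generated class itself; the extra method only exists in the classes that carry the annotation.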

Now you can have that rich model you craved 😉 But hold on a sec, we just fixed an issue introduced by generation with more generation :/.
Well, whether this is good or bad, I will leave it up to you.

Spring is a powerful framework which serves billions of requests worldwide every minute. One of the things that made it special is its dependency injection capabilities. At the start of the application Spring scans the classpath and identifies the configuration classes as well as the classes carrying bean-related annotations. Conditionals play a pivotal role in how that environment gets created.

There are many reasons why you need to use Conditionals.

Think about your modular application. It has to operate on various environments: Development, Staging, UAT, Production etc. Your code should be as close to production as it can be, yet there can always be some variations.

What if your application has to work on multiple clouds? In that case a broker class might need different implementations. For example on AWS you want to dispatch messages using SQS, on Azure you shall do so using storage Queues.

Usually we do this by creating an interface specifying the functionality we want (plain old strategy pattern)

public interface MessagePublisher {
 
 void publish(String message);
 
}

SQS implementation

public class SqsMessagePublisher implements MessagePublisher {
 
 public void publish(String message) {
 ... 
 }
 
}

Azure implementation

public class AzureStorageQueuePublisher implements MessagePublisher {
 
 public void publish(String message) {
 ... 
 }
 
}

You could define the implementation to use with a @Configuration bean that checks the defined properties.

@Configuration
public class MessagePublisherConfig {

 @Value("${message.publisher.type:sqs}")
 private String publisherType;

 @Bean
 public MessagePublisher messagePublisher() {
 if (publisherType != null && publisherType.equalsIgnoreCase("azure")) {
 return new AzureStorageQueuePublisher();
 } else {
 // Default to SQS or handle other cases
 return new SqsMessagePublisher();
 }
 }
}

This code works; however, thanks to Spring, there is no need for that if statement.

Conditional on Property

Spring has this problem already sorted with conditional beans.
We shall change our classes by using the ConditionalOnProperty annotation.

@Component
@ConditionalOnProperty(name = "message.publisher.type", havingValue = "azure")
public class AzureStorageQueuePublisher implements MessagePublisher {
 public void publish(String message) {
 ... 
 }
}
@Component
@ConditionalOnProperty(name = "message.publisher.type", havingValue = "sqs")
public class SqsMessagePublisher implements MessagePublisher {
 
 public void publish(String message) {
 ... 
 }
 
}

Conditional on class

Now you might think that having both implementations in one jar is kinda bloated. A lean jar is likely better: one jar built with the AWS dependencies and one jar built with the Azure dependencies. Beyond the capabilities of the build tool used (for example profiles on Maven), our codebase should be able to handle any class loading issues.
There is a conditional annotation based on the presence of classes.

@Component
@ConditionalOnClass(QueueServiceClient.class)
public class AzureStorageQueuePublisher implements MessagePublisher {

 public void publish(String message) {
 }

}
@Component
@ConditionalOnClass(SqsClient.class)
public class SqsMessagePublisher implements MessagePublisher {

 public void publish(String message) {
 }

}

Behind the scenes Spring inspects the class definition and checks whether the required class exists on the classpath before proceeding with instantiation. This option enables us to have a jar with fewer dependencies that will instantiate the right bean implementations based on the environment.
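
Roughly speaking, a presence check like this amounts to trying to load the class by name without initializing it. The sketch below is plain JDK code written for illustration; it is not Spring's actual implementation, and the `isPresent` helper is a name I made up.

```java
public class ClassPresenceCheck {

    // Returns true when the named class can be loaded by the given class loader.
    // initialize=false avoids running static initializers just to test for presence.
    public static boolean isPresent(String className, ClassLoader classLoader) {
        try {
            Class.forName(className, false, classLoader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader classLoader = ClassPresenceCheck.class.getClassLoader();
        System.out.println(isPresent("java.lang.String", classLoader));
        // The result of the next check depends on whether the Azure SDK is on the classpath.
        System.out.println(isPresent("com.azure.storage.queue.QueueServiceClient", classLoader));
    }
}
```

Checking by name is what makes the lean jar possible: the condition can be evaluated even when the referenced dependency was never packaged.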

ConditionalOnMissingBean

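Regardless of the environment we might want to spin up a default implementation of the MessagePublisher, in case certain criteria are not fulfilled. In that case the ConditionalOnMissingBean annotation can help. Keep in mind that the condition only sees the bean definitions processed so far, which is why it is most reliable on auto-configuration classes.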

@Component
@ConditionalOnMissingBean(MessagePublisher.class)
public class DefaultPublisher implements MessagePublisher {
 @Override
 public void publish(String message) {

 }
}

Simplify

As we can see Conditionals are powerful, yet they do bring a configuration overhead which can be error prone or cumbersome. Instead of using the same configuration all over again we can simplify it by defining a conditional annotation with the configurations preset.

@ConditionalOnProperty(name = "message.publisher.type", havingValue = "azure")
@Retention(RetentionPolicy.RUNTIME)
public @interface AzurePublisherEnabled {
}

In the above example we can use the AzurePublisherEnabled annotation for the Azure-only implementation.

Customization

So far conditionals have a wide range of options and ways to simplify them, but what if you want something more complex that the existing annotations cannot fulfil? In that case you can create your own conditions. The condition can be evaluated based on information retrieved from the ConditionContext, whether that is environment variables or other aspects of the running program.

package com.gkatzioura.broker;

import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class LocalBrokerCondition implements Condition {
 
 @Override
 public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
 //Logic
 return false;
 }
 
}

To use this conditional handler you just have to configure it in the annotation.

@Conditional(LocalBrokerCondition.class)
public class LocalPublisher implements MessagePublisher {

 @Override
 public void publish(String message) {

 }
}

Testing

So far so good, so what about testing? One option is to spin up a Spring context using @SpringBootTest and check whether certain bean implementations have been instantiated. Another handy way is to use the ApplicationContextRunner:

 @Test
 public void testAzurePublisherIsCreated() {
 ApplicationContextRunner runner = new ApplicationContextRunner()
 .withConfiguration(UserConfigurations.of(AzureStorageQueuePublisher.class));
 runner.withPropertyValues("message.publisher.type=azure")
 .run(context -> assertThat(context.getBean(MessagePublisher.class)).isInstanceOf(AzureStorageQueuePublisher.class));
 }

This is very elegant and removes the need to create a complex spring environment for unit tests.

So that’s it about conditionals, pretty sure you are gonna stumble on them on most spring based open source projects! Happy hacking 😉

Public-key cryptography, or asymmetric cryptography, is something we use every day; take for example the TLS on this very website.

From Wikipedia:

In a public-key encryption system, anyone with a public key can encrypt a message, yielding a ciphertext, but only those who know the corresponding private key can decrypt the ciphertext to obtain the original message.
For example, a journalist can publish the public key of an encryption key pair on a web site so that sources can send secret messages to the news organization in ciphertext.

Wikipedia gives a good example. Here is another one:
You have a process sending data from your premises to another datacenter. Apart from encryption in transit, you want to go an extra step and encrypt the contents, ensuring that only certain workloads in the datacenter can decrypt the data. This could be done with a symmetric key. The issue with a symmetric key is that both parties have access to the same key and are able to both encrypt and decrypt information. Both parties might be uneasy about sharing the same key; plus, we have a one-directional flow: from the on-premises datacenter to the external datacenter.
This is where asymmetric encryption can be used. A public key encrypts the data before they get dispatched and the receiver of the data has the right private key to decrypt the data.

Let’s generate some RSA keys

#generate the private key
openssl genrsa -out keypair.pem 2048
#generate the public key
openssl rsa -in keypair.pem -pubout -out publickey.crt

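We have a public key that we can use to encrypt data and a private key to decrypt the encrypted data. With OpenSSL 3.x, genrsa writes the private key in PKCS#8 format (-----BEGIN PRIVATE KEY-----), which is what the Java snippet below expects. Older versions write PKCS#1 (-----BEGIN RSA PRIVATE KEY-----); in that case the key can be converted with openssl pkcs8 -topk8 -nocrypt.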

The following Java snippet reads the keys, encrypts a string and decrypts it:

import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.*;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.MGF1ParameterSpec;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

public class Main {

 public static void main(String[] args) throws IOException, NoSuchAlgorithmException, InvalidKeySpecException, NoSuchPaddingException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException, InvalidAlgorithmParameterException {
 final String privateKeyText;
 final String publicKeyText;

 try(InputStream privateKeyInputstream = new FileInputStream( "/path/to/keypair.pem");
 InputStream publicKeyInputStream = new FileInputStream("/path/to/publickey.crt");) {
 privateKeyText = new String(privateKeyInputstream.readAllBytes());
 publicKeyText = new String(publicKeyInputStream.readAllBytes());
 }

 String privateKeyBase64 = privateKeyText.replaceAll("\\n", "").replace("-----BEGIN PRIVATE KEY-----", "").replace("-----END PRIVATE KEY-----", "");
 String publicKeyBase64 = publicKeyText.replaceAll("\\n", "").replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "");

 byte[] privateKeyBytes = Base64.getDecoder().decode(privateKeyBase64);
 byte[] publicKeyBytes = Base64.getDecoder().decode(publicKeyBase64);

 KeyFactory keyFactory = KeyFactory.getInstance("RSA");

 PKCS8EncodedKeySpec privateKeySpec = new PKCS8EncodedKeySpec(privateKeyBytes);
 PrivateKey privateKey = keyFactory.generatePrivate(privateKeySpec);

 X509EncodedKeySpec publicKeySpec = new X509EncodedKeySpec(publicKeyBytes);
 PublicKey publicKey = keyFactory.generatePublic(publicKeySpec);

 String unEncryptedText = "un-encrypted text";
 byte[] message = unEncryptedText.getBytes(StandardCharsets.UTF_8);

 Cipher encryptCypher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
 OAEPParameterSpec oaepParams =
 new OAEPParameterSpec(
 "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);
 encryptCypher.init(Cipher.ENCRYPT_MODE, publicKey, oaepParams);
 encryptCypher.update(message);

 byte[] ciphertext = encryptCypher.doFinal();

 Cipher cipher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
 cipher.init(Cipher.DECRYPT_MODE, privateKey, oaepParams);
 cipher.update(ciphertext);

 byte[] decrypted = cipher.doFinal();
 String decryptedText = new String(decrypted, StandardCharsets.UTF_8);

 assert decryptedText.equals(unEncryptedText);
 }

}
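
The header stripping in the snippet above can be factored into a small helper that also tolerates Windows line endings and other whitespace. This is only a sketch: `pemToDer` is a name I made up, and the sample payload in `main` is the text "hello world", not a real key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PemDecoder {

    // Drops the "-----BEGIN ...-----" and "-----END ...-----" markers and all whitespace,
    // then Base64-decodes the rest into the DER bytes that the key specs expect.
    public static byte[] pemToDer(String pem) {
        String base64 = pem
                .replaceAll("-----(BEGIN|END) [A-Z ]+-----", "")
                .replaceAll("\\s", "");
        return Base64.getDecoder().decode(base64);
    }

    public static void main(String[] args) {
        // Not a real key: the Base64 body simply encodes "hello world".
        String pem = "-----BEGIN PUBLIC KEY-----\r\naGVsbG8g\r\nd29ybGQ=\r\n-----END PUBLIC KEY-----\r\n";
        System.out.println(new String(pemToDer(pem), StandardCharsets.US_ASCII));
    }
}
```

The helper only removes the PEM armor. It does not convert between key formats, so a PKCS#1 private key still has to be converted before PKCS8EncodedKeySpec can read it.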

To wrap up the above example we read the contents of the keys. The keys are stored in a Base64 format thus we decode them and we create the keys using the decoded bytes. Then using the cipher and the keys we encrypt and decrypt the payload.
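
If you just want to try the round trip without key files, the key pair can be generated in memory. The sketch below uses the same transformation and OAEP parameters as above; the class and method names are mine. It also shows the size limit of RSA-OAEP: the plaintext can be at most the modulus size minus twice the hash size minus two bytes, so 190 bytes for a 2048-bit key with SHA-256.

```java
import javax.crypto.Cipher;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.spec.MGF1ParameterSpec;

public class RsaOaepRoundTrip {

    private static final String TRANSFORMATION = "RSA/ECB/OAEPWithSHA-256AndMGF1Padding";
    private static final OAEPParameterSpec OAEP_PARAMS = new OAEPParameterSpec(
            "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);

    // Largest plaintext RSA-OAEP accepts: modulus bytes - 2 * hash bytes - 2.
    public static int maxPlaintextBytes(int keySizeBits, int hashSizeBytes) {
        return keySizeBits / 8 - 2 * hashSizeBytes - 2;
    }

    // Encrypts with a freshly generated public key and decrypts with the matching private key.
    public static String roundTrip(String text) {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            KeyPair keyPair = generator.generateKeyPair();

            Cipher encrypt = Cipher.getInstance(TRANSFORMATION);
            encrypt.init(Cipher.ENCRYPT_MODE, keyPair.getPublic(), OAEP_PARAMS);
            byte[] ciphertext = encrypt.doFinal(text.getBytes(StandardCharsets.UTF_8));

            Cipher decrypt = Cipher.getInstance(TRANSFORMATION);
            decrypt.init(Cipher.DECRYPT_MODE, keyPair.getPrivate(), OAEP_PARAMS);
            return new String(decrypt.doFinal(ciphertext), StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("un-encrypted text"));
        System.out.println(maxPlaintextBytes(2048, 32)); // 256 - 64 - 2 = 190
    }
}
```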

On the public cloud we have KMS.

We can use the AWS or the GCP KMS to create a private-public key pair.

Let’s start with the GCP example first. Once we have created an asymmetric key in KMS, we can download the public key.

The private key never leaves KMS, so decryption can only happen through the KMS API. The public key can be found through the console.

We shall place the key on the classpath with the name gcp.crt.

We shall encrypt the data using the downloaded public key and decrypt them using the KMS API:

import com.google.cloud.kms.v1.*;
import com.google.protobuf.ByteString;

import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.*;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.MGF1ParameterSpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

public class GCPKMS {

 public static void main(String[] args) throws IOException, NoSuchAlgorithmException, InvalidKeySpecException, NoSuchPaddingException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException, InvalidAlgorithmParameterException {
 try (KeyManagementServiceClient client = KeyManagementServiceClient.create()) {
 String name = "full-gcp-key-name";

 String unEncryptedText = "un-encrypted text";

 InputStream publicKeyInputStream = GCPKMS.class.getClassLoader().getResourceAsStream("gcp.crt");
 String publicKeyText = new String(publicKeyInputStream.readAllBytes());
 String publicKeyBase64 = publicKeyText.replaceAll("\\n", "").replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "");
 byte[] publicKeyBytes = Base64.getDecoder().decode(publicKeyBase64);

 X509EncodedKeySpec keySpec = new X509EncodedKeySpec(publicKeyBytes);
 java.security.PublicKey publicKey = KeyFactory.getInstance("RSA").generatePublic(keySpec);

 Cipher encryptCypher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
 OAEPParameterSpec oaepParams =
 new OAEPParameterSpec(
 "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);
 encryptCypher.init(Cipher.ENCRYPT_MODE, publicKey, oaepParams);
 byte[] ciphertext = encryptCypher.doFinal(unEncryptedText.getBytes(StandardCharsets.UTF_8));


 AsymmetricDecryptResponse response = client.asymmetricDecrypt(AsymmetricDecryptRequest.newBuilder()
 .setCiphertext(ByteString.copyFrom(ciphertext))
 .setName(name)
 .build());

 assert response.getPlaintext().toStringUtf8().equals(unEncryptedText);
 }
 }

}

Pretty similar with AWS KMS

Once we have created the asymmetric key we can download the public key from the console.

The key shall be placed on the classpath with the name `aws.crt`

We shall encrypt the data using the downloaded public key and decrypt them using the KMS API:

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.model.DecryptRequest;
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;

import javax.crypto.Cipher;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.KeyFactory;
import java.security.spec.MGF1ParameterSpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

public class AWSKMS {

 public static void main(String[] args) throws Exception {
 String keyId = "kms-arn";

 KmsClient kmsClient = KmsClient.builder()
 .region(Region.of("us-east-1")) // use the region your key lives in
 .credentialsProvider(DefaultCredentialsProvider.builder()
 .build())
 .build();

 String unEncryptedText = "un-encrypted text";

 InputStream publicKeyInputStream = AWSKMS.class.getClassLoader().getResourceAsStream("aws.crt");
 String publicKeyText = new String(publicKeyInputStream.readAllBytes());
 String publicKeyBase64 = publicKeyText.replaceAll("\\n", "").replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "");
 byte[] publicKeyBytes = Base64.getDecoder().decode(publicKeyBase64);

 X509EncodedKeySpec keySpec = new X509EncodedKeySpec(publicKeyBytes);
 java.security.PublicKey publicKey = KeyFactory.getInstance("RSA").generatePublic(keySpec);

 Cipher encryptCypher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
 OAEPParameterSpec oaepParams =
 new OAEPParameterSpec(
 "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);
 encryptCypher.init(Cipher.ENCRYPT_MODE, publicKey, oaepParams);
 byte[] ciphertext = encryptCypher.doFinal(unEncryptedText.getBytes(StandardCharsets.UTF_8));

 var resp = kmsClient.decrypt(DecryptRequest.builder()
 .keyId(keyId)
 .encryptionAlgorithm(EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256)
 .ciphertextBlob(SdkBytes.fromByteArray(ciphertext))
 .build());

 assert resp.plaintext().asUtf8String().equals(unEncryptedText);
 }

}

That’s it! We did asymmetric encryption and decryption locally using the RSA keys we created. We did asymmetric encryption using a GCP KMS public key and decrypted using the GCP KMS API. Lastly we did asymmetric encryption using an AWS KMS public key and decrypted using the AWS KMS API.

GCP’s Pub/Sub is one of my favourite services for streaming data. It is seamless and comes without operational overhead. Pub/Sub comes with GCP Schemas, which can be used to define the schema of the data flowing through the topics.

Interacting with the Pub/Sub Schemas is streamlined; you only have to include the Pub/Sub dependency:

 <dependency>
 <groupId>com.google.cloud</groupId>
 <artifactId>google-cloud-pubsub</artifactId>
 <version>1.134.0</version>
 </dependency>

Then we can proceed with listing the schemas:

 String projectId = "your-gcp-project-id";

 try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
 var response = schemaServiceClient.listSchemas(ProjectName.of(projectId));

 for (var s : response.iterateAll()) {
 System.out.println(s.getName()+ " "+s.getRevisionId());

 Schema schema = schemaServiceClient.getSchema(s.getName());
 System.out.println(schema.getDefinition());
 }

 }

Take note that when listing the schemas you only fetch a listing of the names; the actual definition is not fetched even though the same model is used.

To fetch a schema’s definition you need to get the schema through a separate call.

Through the API you can also submit a schema:

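 String projectId = "your-gcp-project-id";
 String schemaId = "example-schema";

 String schemaDefinition = "{ \"type\": \"record\", \"name\": \"TestRecord\", \"fields\": [ { \"name\": \"name\", \"type\": \"string\" } ] }";
 CreateSchemaRequest createSchemaRequest = CreateSchemaRequest.newBuilder()
 // the parent is the full project resource name, projects/<project-id>
 .setParent(ProjectName.of(projectId).toString())
 .setSchemaId(schemaId)
 .setSchema(Schema.newBuilder()
 .setName(SchemaName.of(projectId, schemaId).toString())
 .setType(Schema.Type.AVRO)
 .setDefinition(schemaDefinition)
 .build())
 .build();

 try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
 // building the request is not enough, it has to be sent
 Schema created = schemaServiceClient.createSchema(createSchemaRequest);
 System.out.println("Schema created: " + created.getName());
 }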

Now you might be wondering what happens when a message that does not match the schema is published to a topic bound to that schema. In that case the request to publish the message will fail. The validation happens on the server side.

 Publisher publisher = Publisher.newBuilder(TopicName.parse("projects/test-project/topics/test-pub-sub")).build();
 var mes = publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFrom("invalid-format".getBytes())).build()).get();
...
Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
	

When you subscribe to a topic with a schema attached, every message received carries metadata describing which schema was used to create the message.

MessageReceiver receiver =
 (PubsubMessage message, AckReplyConsumer consumer) -> {
 String name = message.getAttributesMap().get("googclient_schemaname");
 String revision = message.getAttributesMap().get("googclient_schemarevisionid");
 };
...
}

The schema name should be the full schema name including the project id, the revision contains the revision of the schema. In case of multiple revisions you can fetch the revision of interest by specifying it in the getSchema request. Now receiving the schema in a message is handy since it gives you the ability to have a dynamic handling of the various messages received by subscriptions.
For example you can read an Avro message received from a subscription by fetching the schema on demand.

MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
 String name = message.getAttributesMap().get("googclient_schemaname");

 Schema schema = schemaServiceClient.getSchema(SchemaName.parse(name));
 if (schema.getType().equals(Schema.Type.AVRO)) {
 String definition = schema.getDefinition();
 org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(definition);

 // the reader needs the writer schema in order to decode the record
 GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);

 // assumes the topic uses binary encoding; JSON-encoded messages need a JSON decoder
 try (InputStream inputStream = new ByteArrayInputStream(message.getData().toByteArray())) {
 BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);
 GenericRecord genericRecord = datumReader.read(null, decoder);
 } catch (IOException e) {
 // the receiver callback cannot throw checked exceptions
 throw new UncheckedIOException(e);
 }
 }

};
...
}
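
As for fetching a specific revision, my understanding is that Pub/Sub addresses a revision by appending `@` and the revision id to the full schema name. The helper below is a plain-Java sketch under that assumption; `withRevision` is a hypothetical name, and the result would be the name passed to the getSchema call.

```java
public class SchemaRevisionName {

    // Builds "projects/<project>/schemas/<schema>@<revision>" from the two message attributes.
    // Falls back to the plain schema name when no revision is available.
    public static String withRevision(String schemaName, String revisionId) {
        if (revisionId == null || revisionId.isBlank()) {
            return schemaName;
        }
        return schemaName + "@" + revisionId;
    }

    public static void main(String[] args) {
        // Both values are made up; in a receiver they come from the message attributes.
        System.out.println(withRevision("projects/test-project/schemas/example-schema", "113aca6b"));
    }
}
```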

You might want your codebase to benefit further from the registry and even generate the corresponding classes for the various schemas. We have done that in a previous post.

In that case we can use the gcp-schemas-maven-plugin.

You can combine the plugin with a schema model generation plugin.
For example you can download Avro schemas from a GCP project, and generate the Java classes from those Avro schemas.

 <plugin>
 <groupId>io.github.gkatzioura.gcp</groupId>
 <artifactId>gcp-schemas-maven-plugin</artifactId>
 <version>1.0</version>
 <executions>
 <execution>
 <id>one</id>
 <phase>generate-sources</phase>
 <goals>
 <goal>download</goal>
 </goals>
 <configuration>
 <project>gcp-project-1</project>
 <outputDirectory>src/main/avro</outputDirectory>
 <schemaType>AVRO</schemaType>
 </configuration>
 </execution>
 </executions>
 </plugin>
 <plugin>
 <groupId>org.apache.avro</groupId>
 <artifactId>avro-maven-plugin</artifactId>
 <version>1.12.0</version>
 <executions>
 <execution>
 <phase>generate-sources</phase>
 <goals>
 <goal>schema</goal>
 </goals>
 <configuration>
 <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
 <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
 </configuration>
 </execution>
 </executions>
 </plugin>

If you want to be selective about the schemas to use, you can even specify the exact schema names or versions:

 <plugin>
 <groupId>io.github.gkatzioura.gcp</groupId>
 <artifactId>gcp-schemas-maven-plugin</artifactId>
 <version>1.0</version>
 <executions>
 <execution>
 <id>one</id>
 <phase>generate-sources</phase>
 <goals>
 <goal>download</goal>
 </goals>
 <configuration>
 <project>gcp-project-1</project>
 <outputDirectory>src/main/avro</outputDirectory>
 <schemaType>AVRO</schemaType>
 <subjectPatterns>schema1,schema2,schema3</subjectPatterns>
 <versions>113aca6b,69965687,e6dc8d46</versions> 
 </configuration>
 </execution>
 </executions>
 </plugin>

That’s it, hope this streamlines your data pipelines development!

I’ve been using Flink for some time and I started to prefer it over GCP Dataflow. There are various reasons for that and I will get into details in future blogs.

Regardless, I like how easy it is to deploy it to Kubernetes, as well as running it in various modes (streaming vs batch). It’s great to have such a powerful framework which can be applied to a variety of cases.

Many Flink tutorials focus on a no code approach mainly driven by SQL. I would like to do something different. For example making your own Sink and Sources, as well as a more Java focused approach.

It all starts with our initial project which is what this blog is about.

I will use maven on this one and take advantage of the maven archetype:

mvn archetype:generate \
 -DarchetypeGroupId=org.apache.flink \
 -DarchetypeArtifactId=flink-quickstart-java \
 -DarchetypeVersion=1.20.0 \
 -DgroupId=com.egkatzioura.flink \
 -DartifactId=flinking-around \
 -Dversion=1.0.0-SNAPSHOT
 

After running this, we get a simple class that runs a job.

package com.egkatzioura.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamJob {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.execute("Flink Java API Skeleton");
	}
}

I will enhance this example by emitting some integers and filtering out the ones that exceed a threshold. To do so I will create a filter function: given a threshold, the function will drop the elements that exceed it.

I will start with a test, therefore I have to update some of the dependencies.

...
	<dependencies>
		<dependency>
			<groupId>org.junit.jupiter</groupId>
			<artifactId>junit-jupiter-engine</artifactId>
			<version>5.9.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-test-utils</artifactId>
			<version>${flink.version}</version>
			<scope>test</scope>
		</dependency>
...
	</dependencies>

Onwards we shall implement our test class. We will start with a failing test, which we will turn into a passing test by implementing the missing functionality.

package com.egkatzioura.flink;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.junit.ClassRule;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.collect.Lists;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

class FilterAboveThresholdTest {

 @ClassRule
 public static MiniClusterWithClientResource flinkCluster =
 new MiniClusterWithClientResource(
 new MiniClusterResourceConfiguration.Builder()
 .setNumberSlotsPerTaskManager(2)
 .setNumberTaskManagers(1)
 .build());

 @Test
 void filterAboveTen() throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);

 var dataStream = env.fromData(1,12,3,4,15,6);

 try (CloseableIterator<Integer> iterator = dataStream.executeAndCollect()) {
 List<Integer> results = Lists.newArrayList(iterator);
 assertThat(results)
 .containsExactlyInAnyOrder(1,3,4,6);
 }
 }

}

As expected our test fails thus we need to implement the filter.

package com.egkatzioura.flink;

import org.apache.flink.api.common.functions.FilterFunction;

public class RemoveAboveThreshold implements FilterFunction<Integer> {

 private final Integer threshold;

 public RemoveAboveThreshold(Integer threshold) {
 this.threshold = threshold;
 }

 @Override
 public boolean filter(Integer integer) throws Exception {
 // keep the elements that do not exceed the threshold
 return integer <= threshold;
 }

}
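
The predicate itself is plain Java, so its behaviour can be checked without any cluster. The sketch below uses java.util.stream as a stand-in for the Flink pipeline; it is not Flink code and the class and method names are mine. Here a value equal to the threshold is kept, since it does not exceed it.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ThresholdFilterSketch {

    // Keeps values that do not exceed the threshold, mirroring the intent of the filter.
    public static Predicate<Integer> notAbove(int threshold) {
        return value -> value <= threshold;
    }

    // Applies the predicate to a list, the way the job applies the filter to a stream.
    public static List<Integer> apply(List<Integer> input, int threshold) {
        return input.stream().filter(notAbove(threshold)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(apply(List.of(1, 12, 3, 4, 15, 6), 10)); // [1, 3, 4, 6]
    }
}
```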

Now let’s integrate the filter with our test.

package com.egkatzioura.flink;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.junit.ClassRule;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.collect.Lists;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

class RemoveAboveThresholdTest {

 @ClassRule
 public static MiniClusterWithClientResource flinkCluster =
 new MiniClusterWithClientResource(
 new MiniClusterResourceConfiguration.Builder()
 .setNumberSlotsPerTaskManager(2)
 .setNumberTaskManagers(1)
 .build());

 @Test
 void filterAboveTen() throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);

 var dataStream = env.fromData(1,12,3,4,15,6);

 RemoveAboveThreshold removeAboveThreshold = new RemoveAboveThreshold(10);
 try (CloseableIterator<Integer> iterator = dataStream.filter(removeAboveThreshold).executeAndCollect()) {
 List<Integer> results = Lists.newArrayList(iterator);
 assertThat(results)
 .containsExactlyInAnyOrder(1,3,4,6);
 }
 }

}

That’s it, we created our first get-started project with Flink and followed a TDD approach.

Virtual threads have been around for a while. The talk of the town is how asynchronous applications will benefit from virtual threads.
Why is asynchronous such a big deal? Mainly because blocking code is a pain: you keep the thread busy waiting for a response.
What is asynchronous I/O?
I will go for a full explanation in other blogs, but here’s a TL;DR.
The network calls are dispatched from your Java app to the native networking implementation through system calls. Bytes are exchanged and results are sent back to the application. While this is being done, a blocking call keeps your thread waiting for the results.

Take for example an HTTP request: you block for a response until all the interactions that go through the wire are finished, including creating the socket, establishing the connection, sending the request and receiving the response.

There is an alternative: you can execute the network based requests and poll for the results of the network interactions once they are complete. Overall your codebase for network calls will be a set of chained callbacks.
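
To give a feel for what chained callbacks look like, here is a small sketch with CompletableFuture. The `fetch` method is a stand-in I made up: it completes the future on a background thread, whereas a real non-blocking client would complete it from an I/O event.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackChainSketch {

    // Stand-in for a non-blocking network call (hypothetical, no real I/O happens here).
    public static CompletableFuture<String> fetch(String url) {
        return CompletableFuture.supplyAsync(() -> "response from " + url);
    }

    // Each step is registered as a callback and runs once the previous stage completes.
    public static CompletableFuture<Integer> responseLength(String url) {
        return fetch(url)
                .thenApply(String::trim)
                .thenApply(String::length);
    }

    public static void main(String[] args) {
        // join() is only used here to keep the demo alive until the chain has finished.
        responseLength("https://example.org").thenAccept(System.out::println).join();
    }
}
```

The calling thread is free while the work is in flight, but the logic is now spread across callbacks, which is the readability cost virtual threads try to remove.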

Java has NIO, the networking API that takes advantage of this functionality. Netty is a library that uses NIO behind the scenes, and many reactive applications are based on it.

Virtual threads are run by a pool of native threads. On a network call the virtual thread can be parked, freeing the native thread, and resumed once the network interaction is complete. Certain blocking networking APIs in Java are configured in non-blocking mode when they run in a virtual thread.

An example is the HttpUrlConnection.

This code block runs on the main Thread, thus a native Thread:

 public static void main(String[] args) {
 try {
 var connection = (HttpURLConnection) new URL("https://google.com").openConnection();
 connection.setRequestMethod("GET");
 connection.connect();

 byte[] bytes = connection.getInputStream().readAllBytes();

 System.out.println(new String(bytes));
 } catch (IOException e) {
 throw new RuntimeException(e);
 }
 }

It is way different behind the scenes than this block, running in a virtual Thread:

Thread vThread = Thread.ofVirtual().start(() -> {
 try {
 var connection = (HttpURLConnection) new URL("https://google.com").openConnection();
 connection.setRequestMethod("GET");
 connection.connect();

 byte[] bytes = connection.getInputStream().readAllBytes();

 System.out.println(new String(bytes));
 } catch (IOException e) {
 throw new RuntimeException(e);
 }
 });

 vThread.join();

As you know from previous blogs I have the tendency to dive deeper.

With some digging we end up at the creation of the NioSocketImpl. This is where the check is made whether the call is executed in a virtual thread, in which case the socket is switched to non-blocking mode.

 private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
 throws IOException
 {
 if (!nonBlocking
 && (timed || Thread.currentThread().isVirtual())) {
 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
 IOUtil.configureBlocking(fd, false);
 nonBlocking = true;
 }
 }

NIO has a notification mechanism, the Poller, which continuously checks for completed I/O interactions.

 private void pollerLoop() {
 try {
 for (;;) {
 poll(-1);
 }
 } catch (Exception e) {
 e.printStackTrace();
 }
 }

Once an update is received, the virtual thread is un-parked and continues from where it left off.

 @Override
 void unpark() {
 if (!getAndSetParkPermit(true) && currentThread() != this) {
 int s = state();

 // unparked while parked
 if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
 submitRunContinuation();
 return;
 }
...

On the next I/O call the Virtual Thread will be parked.

 private void park(FileDescriptor fd, int event, long nanos) throws IOException {
 Thread t = Thread.currentThread();
 if (t.isVirtual()) {
 Poller.poll(fdVal(fd), event, nanos, this::isOpen);
 if (t.isInterrupted()) {
 throw new InterruptedIOException();

So think about how this plays out:

  • Code blocks inside virtual threads are executed by native threads.
  • When an I/O call is initiated, the network APIs turn it into a non-blocking one, since it runs on a virtual thread.
  • Instead of blocking and waiting for the I/O call to finish, the virtual thread is parked.
  • The native thread that was running the virtual thread continues with other operations.
  • Once the I/O is finished, the Poller picks up the event and unparks the virtual thread.
  • A native thread resumes executing the code block in the virtual thread.
...
() -> {
 try {
 var connection = (HttpURLConnection) new URL("https://google.com").openConnection();
 connection.setRequestMethod("GET");
 connection.connect(); // Open the NIO socket, set it to non-blocking and park the virtual thread while the connection is established

 byte[] bytes = connection.getInputStream().readAllBytes(); // The Poller un-parked the virtual thread; proceed to the next network call and park the virtual thread once more

 System.out.println(new String(bytes)); // The call finished and the virtual thread is un-parked
 } catch (IOException e) {
 throw new RuntimeException(e);
 }
}
...

The way this mechanism is wrapped together gives the sense that execution is sequential, just like the code above. Behind the scenes it is a combination of the NIO APIs and the native threads that the virtual threads are executed upon.
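
To see the effect without touching the network, here is a rough sketch that assumes Java 21 or later. Thread.sleep stands in for a blocking network call, and the class and method names are made up for the example.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Requires Java 21+. Thread.sleep is a stand-in for a blocking network call.
class VirtualThreadParkingSketch {

    // Runs `tasks` blocking tasks, one virtual thread each, and returns how many completed.
    static int runBlockingTasks(int tasks, Duration blockFor) {
        AtomicInteger completed = new AtomicInteger();
        // close() (called by try-with-resources) waits for all submitted tasks to finish.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    try {
                        // The virtual thread is parked here; its carrier thread is free to run others.
                        Thread.sleep(blockFor);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    completed.incrementAndGet();
                });
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int done = runBlockingTasks(1_000, Duration.ofMillis(200));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(done + " tasks finished in " + elapsedMs + " ms");
    }
}
```

The code reads as plain sequential, blocking code, yet a thousand of these tasks can be in flight at once because parked virtual threads do not hold on to a native thread.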

Previously we had an introduction to Atomic Variables and how they work. We found out that they are based on CAS operations and using them does not introduce context switching.


Based on previous blogs on visibility and the happens before guarantee we might wonder about visibility with regards to atomic variables. Eventually we found out behind the scenes that atomic variables wrap corresponding volatile values.

As we have seen when we use an atomic variable visibility is guaranteed.

AtomicLong as well as AtomicInteger use the Unsafe class: jdk.internal.misc.Unsafe in recent JDKs, sun.misc.Unsafe in older ones.

public class AtomicInteger extends Number implements java.io.Serializable {
 private static final long serialVersionUID = 6214790243416807050L;

 /*
 * This class intended to be implemented using VarHandles, but there
 * are unresolved cyclic startup dependencies.
 */
 private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
 private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");

 

Unsafe provides various methods like compareAndSetLong and compareAndSetInt. Those methods are mapped to the corresponding native calls, depending on the JRE implementation.

However if we look at the AtomicIntegerArray implementation we see the usage of VarHandle.

public class AtomicIntegerArray implements java.io.Serializable {
 private static final long serialVersionUID = 2862133569453604235L;
 private static final VarHandle AA
 = MethodHandles.arrayElementVarHandle(int[].class);
 private final int[] array;

VarHandle also ends up in native calls, so it can become confusing what is used when. Essentially, VarHandle was introduced to replace the usage of the Unsafe class.

For the scalar values we see a volatile field declared, but in the AtomicLongArray implementation we see a plain array of longs and a VarHandle operating upon that array.

For example:

 public final long get(int i) {
 return (long)AA.getVolatile(this.array, i);
 }

 public final void set(int i, long newValue) {
 AA.setVolatile(this.array, i, newValue);
 }

So how does this affect visibility?

From the documentation:

setVolatile: Sets the value of a variable to the newValue, with memory semantics of setting as if the variable was declared volatile.
getVolatile: Returns the value of a variable, with memory semantics of reading as if the variable was declared volatile.

Using setVolatile and getVolatile gives each array element the same visibility guarantees as a volatile field, so we effectively read from and write to the main memory. VarHandle is also used for CAS operations on primitives and object references.
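
As a rough illustration, here is a stripped-down sketch of the same idea. The class name is made up, and this is not the JDK implementation, just the three VarHandle calls applied to a plain long array (Java 9+).

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical class: a plain long[] accessed with volatile semantics through a VarHandle.
class VolatileArrayAccessSketch {
    // A VarHandle that can access the elements of any long[].
    private static final VarHandle LONGS = MethodHandles.arrayElementVarHandle(long[].class);

    private final long[] values;

    VolatileArrayAccessSketch(int length) {
        values = new long[length];
    }

    long get(int i) {
        // The cast tells the compiler which return type to use for this polymorphic call.
        return (long) LONGS.getVolatile(values, i);
    }

    void set(int i, long newValue) {
        LONGS.setVolatile(values, i, newValue);
    }

    boolean compareAndSet(int i, long expected, long newValue) {
        return LONGS.compareAndSet(values, i, expected, newValue);
    }
}
```

The array itself is not volatile; it is the access mode chosen on each call that provides the memory semantics.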

It seems we missed one class from the java.util.concurrent.atomic package: AtomicReferenceFieldUpdater. This class is a wrapper on Unsafe methods and enables atomic updates to the volatile reference fields of a class. It can be used in atomic data structures when you want a smaller memory footprint than AtomicReference would give you. AtomicReferenceFieldUpdater is rarely used. VarHandle is the more recent class for atomic operations and is going to replace the usage of Unsafe in the java.util.concurrent.atomic package.
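
For reference, this is roughly what using an AtomicReferenceFieldUpdater looks like. The node class and its label field are made up for the example.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical node type: the updater is shared by all instances, so each node
// only pays for a plain volatile field instead of a separate AtomicReference object.
class UpdaterNodeSketch {
    private static final AtomicReferenceFieldUpdater<UpdaterNodeSketch, String> LABEL =
            AtomicReferenceFieldUpdater.newUpdater(UpdaterNodeSketch.class, String.class, "label");

    // The field must be volatile and accessible to the code that creates the updater.
    private volatile String label = "initial";

    // Atomically replaces the label if it is still the expected reference.
    boolean relabel(String expected, String newLabel) {
        return LABEL.compareAndSet(this, expected, newLabel);
    }

    String label() {
        return label;
    }
}
```

The saving comes from having one static updater per class rather than one wrapper object per field per instance.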

In the following blogs we shall have a look at VarHandle and its usages.

ConcurrentLinkedQueue, ConcurrentHashMap, rate limiters: they all have a secret sauce in common. That secret sauce is the usage of atomic variables. Atomic variables are based on CAS operations. CAS stands for compare and swap: it is an atomic instruction where the content of a memory location is compared with a given value and, if they are the same, the content is replaced with a new value. An atomic operation is an operation that runs without interruption, so other threads can never observe it half done.


What we can understand from the above is that these operations avoid the context switch that waiting on a lock can cause.
The instruction is a simple operation on a specific location in memory that changes the value only if the previous value matches an expected value.

Multiple threads and a CAS operation

Let me explain a bit further how multiple threads can use a CAS operation without the need for synchronization. Previously we checked the volatile modifier in Java objects: the threads would write directly to a variable in memory and would also read directly from memory. In the case of a CAS operation a thread will try to write directly to that variable. If the variable still holds the value the thread expects, then the value will be updated. The confirmation of the outcome can take various forms depending on the implementation. It can be a boolean response, or the value the variable held before the operation, which tells the caller whether the operation succeeded.

We can take for example atomic_cmpxchg, an OpenCL C built-in function.

 Read the 32-bit value (referred to as old) stored at location pointed by p. Compute (old
 == cmp) ? val : old and store result at location pointed by p. The function returns old.

 A 64-bit version of this function, atom_cmpxchg(3clc), is enabled by
 cl_khr_int64_base_atomics(3clc).

Let us assume a scenario of two threads trying to change the same variable.


Thread A will be successful since the existing value of the variable matched the expected value, so Thread A could replace the contents of the variable.


Thread B will not be successful. When Thread B tried to set the new value, the variable no longer held the expected value since it had been changed by Thread A. Thus Thread B received back the current value, which was the one set by Thread A.

Regardless of the number of operations, since CAS is an atomic instruction there won’t be any synchronization problems. A change occurs only if the current state is the one the thread expects.

While both threads executed this operation, there was no need for a lock. One of the two operations succeeds and the other one does not. Since a lock is not being used, we also avoid the context switch that a lock may cause.
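
The scenario above can be replayed in a few lines. This is only a sketch: the two "threads" are simulated sequentially so the outcome is deterministic, the values 10, 20 and 30 are arbitrary, and compareAndExchange requires Java 9 or later.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class replaying the Thread A / Thread B scenario on a single thread.
class TwoThreadCasSketch {

    static String replay() {
        AtomicInteger shared = new AtomicInteger(10);

        // Both "threads" read the variable before either of them writes.
        int seenByA = shared.get();
        int seenByB = shared.get();

        // Thread A: expected 10, found 10, so the value becomes 20.
        boolean aSucceeded = shared.compareAndSet(seenByA, 20);

        // Thread B: expected 10, found 20, so nothing is written.
        // compareAndExchange returns the value it actually found (the witness value).
        int witnessForB = shared.compareAndExchange(seenByB, 30);

        return aSucceeded + ":" + witnessForB + ":" + shared.get();
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints true:20:20
    }
}
```

Thread B gets back 20, the value written by Thread A, and can decide whether to retry with that fresh value.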

Let’s see what Java offers when it comes to Atomic variables.

Java Atomic Variables for Primitive Data Types

The atomic package for Java offers classes to perform atomic operations for Primitive Data Types: AtomicBoolean, AtomicInteger, AtomicLong. Those classes have a volatile variable of a primitive type declared and perform CAS operations on that variable by using the Unsafe class. We stumbled upon the Unsafe class previously when we were examining the internals of Lock Support.

//java.util.concurrent.atomic.AtomicInteger
...
 public final boolean compareAndSet(int expectedValue, int newValue) {
 return U.compareAndSetInt(this, VALUE, expectedValue, newValue);
 }
...
//jdk.internal.misc.Unsafe
...
 /**
 * ....
 * <p>This operation has memory semantics of a {@code volatile} read
 * and write. Corresponds to C11 atomic_compare_exchange_strong.
 * @return {@code true} if successful
 */
 public final native boolean compareAndSetInt(Object o, long offset,
 int expected,
 int x);
...

Eventually a native call is made that performs the operation directly on memory, based on the primitive type used.
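
In application code, compareAndSet is typically used inside a retry loop. Below is a small sketch of that pattern; the class, the addCapped operation and the helper that spawns threads are all made up for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counter that adds a delta but never goes above a cap.
class CasRetryLoopSketch {
    private final AtomicInteger value = new AtomicInteger();

    int addCapped(int delta, int cap) {
        while (true) {
            int current = value.get();
            int next = Math.min(current + delta, cap);
            if (value.compareAndSet(current, next)) {
                return next;
            }
            // Another thread changed the value in between: loop and retry with the fresh value.
        }
    }

    int get() {
        return value.get();
    }

    // Increments one shared counter from several threads and returns the final value.
    static int incrementFromThreads(int threads, int perThread) {
        CasRetryLoopSketch counter = new CasRetryLoopSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.addCapped(1, Integer.MAX_VALUE);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(incrementFromThreads(4, 10_000)); // prints 40000
    }
}
```

No lock is taken anywhere: a thread that loses the race simply reads the new value and tries again.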

Java Atomic Variables for arrays of Primitive Data Types

We can find classes such as AtomicIntegerArray and AtomicLongArray. These classes contain an array of the corresponding data type. In the constructor either the array or the array length is specified. Then on each operation you have to specify the array index on which the operation takes place. Behind the scenes the value of the element is set with the same memory semantics as if the variable was declared volatile.

// java.util.concurrent.atomic.AtomicLongArray
 /**
 * Atomically sets the element at index {@code i} to {@code newValue}
 * if the element's current value {@code == expectedValue},
 * with memory effects as specified by {@link VarHandle#compareAndSet}.
 * ...
 */
 public final boolean compareAndSet(int i, long expectedValue, long newValue) {
 return AA.compareAndSet(array, i, expectedValue, newValue);
 }

...
// java.lang.invoke.VarHandle
 /**
 * Atomically sets the value of a variable to the {@code newValue} with the
 * memory semantics of {@link #setVolatile} if the variable's current value,
 * referred to as the <em>witness value</em>, {@code ==} the
 * {@code expectedValue}, as accessed with the memory semantics of
 * {@link #getVolatile}.
 *....
 */
 public final native
 @MethodHandle.PolymorphicSignature
 @IntrinsicCandidate
 boolean compareAndSet(Object... args);
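
From the caller's side, using AtomicLongArray could look like the sketch below. The idea of per-slot hit counters and all the names are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical per-slot hit counters backed by an AtomicLongArray.
class SlotCountersSketch {
    private final AtomicLongArray slots;

    SlotCountersSketch(int numberOfSlots) {
        slots = new AtomicLongArray(numberOfSlots); // every element starts at 0
    }

    // Atomically increments one element and returns the new value.
    long hit(int slot) {
        return slots.incrementAndGet(slot);
    }

    // Resets the slot only if it still holds the expected count.
    boolean resetIf(int slot, long expectedCount) {
        return slots.compareAndSet(slot, expectedCount, 0L);
    }

    long read(int slot) {
        return slots.get(slot);
    }
}
```

Every operation takes the index first, and each element behaves like its own independent atomic variable.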

Java Atomic Reference

In the case of AtomicReference, we keep a reference to the object, thus the position of the object in the heap memory. Essentially, what is compared and swapped is the object reference, not the contents of the object. In the upcoming blogs we shall go into further detail on atomic variables and their usages.
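
A short sketch makes the reference comparison visible. The strings are created with new String on purpose, so that we get two distinct objects with equal content; the class name and values are made up.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class showing that AtomicReference compares references, not contents.
class ReferenceCasSketch {

    // Returns { result with the lookalike, result with the original }.
    static boolean[] demo() {
        String original = new String("config-v1");  // one object on the heap
        String lookalike = new String("config-v1"); // equal content, different reference
        AtomicReference<String> ref = new AtomicReference<>(original);

        boolean withLookalike = ref.compareAndSet(lookalike, "config-v2"); // false: references differ
        boolean withOriginal = ref.compareAndSet(original, "config-v2");   // true: same reference

        return new boolean[] { withLookalike, withOriginal };
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println(results[0] + " " + results[1]); // prints false true
    }
}
```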

In the previous blog we examined the visibility of variables between two threads that use the same lock. Systems nowadays are multi-processor systems and each processor has a different cache. When a thread operates on a variable those changes are applied in the cache and then they are written to the main memory. Hence when two threads operate on the same variable the values in their caches would be different. When using a lock the changes are published to the main memory and the threads reaching that lock will receive the updated values from the main memory.

What if there was the option for threads to read the latest value by reading straight from the main memory? This is where the volatile keyword kicks in. When we use the volatile modifier on a variable, we declare that this variable will not be cached in the CPU cache. Instead threads will have to fetch or modify the variable directly from the main memory.
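
A classic use of this is a stop flag. The sketch below uses made-up names: a worker spins until another thread flips a volatile boolean, and because the flag is volatile the worker is guaranteed to see the change.

```java
// Hypothetical class: a worker loop controlled by a volatile stop flag.
class VolatileFlagSketch {
    private volatile boolean stopRequested;
    private long iterations;

    Thread startWorker() {
        Thread worker = new Thread(() -> {
            // Each check reads the latest value written by any thread.
            while (!stopRequested) {
                iterations++;
            }
        });
        worker.start();
        return worker;
    }

    void requestStop() {
        stopRequested = true;
    }

    // Starts a worker, requests a stop and reports whether the worker ended in time.
    static boolean stopsWithin(long timeoutMillis) {
        VolatileFlagSketch sketch = new VolatileFlagSketch();
        Thread worker = sketch.startWorker();
        sketch.requestStop();
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + stopsWithin(5_000));
    }
}
```

Without the volatile modifier there would be no guarantee that the worker ever observes the write.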


We have stumbled on the volatile keyword before when using the double checked locking pattern to create a singleton.

From the documentation

Using volatile variables reduces the risk of memory consistency errors, because any write to a volatile variable establishes a happens-before relationship with subsequent reads of that same variable. This means that changes to a volatile variable are always visible to other threads. What’s more, it also means that when a thread reads a volatile variable, it sees not just the latest change to the volatile, but also the side effects of the code that led up the change.

Let’s look at an example. We have the following piece of code that tracks the duration of calls. In case the average duration exceeds a certain limit we shall stop tracking.

public class DurationTracker {

 public static final int DURATION_LIMIT = 20;
 private int avgDuration;

 private volatile boolean reachedPeak = false;

 public void addDuration(int duration) {
 if (reachedPeak) {
 return;
 }

 avgDuration = (avgDuration+duration)/2;

 if (avgDuration> DURATION_LIMIT) {
 reachedPeak = true;
 }

 }

 public boolean reachedPeak() {
 return reachedPeak;
 }

}

When we call addDuration we calculate the avgDuration and if it exceeds the limit we set the reachedPeak to true essentially marking the end of tracking the duration.

So let’s examine what happens if this code is run by multiple threads.

reachedPeak is marked as volatile on purpose. When threads check this value they always read it from the main memory, and updates to it are immediately visible. The avgDuration variable, on the other hand, is updated in the CPU cache. Since there is no locking around these calculations, they are not going to be accurate: an update may be visible in one CPU's cache but not yet in the main memory. Duration tracking will eventually stop as soon as the value computed by one of the threads, based on what it sees in its cache, exceeds the duration limit. If accuracy is required, the block that reads and updates avgDuration should be guarded by a lock.

Thus the codebase will transform to this block.

public class DurationTracker {

 public static final int DURATION_LIMIT = 20;
 private int avgDuration;

 private volatile boolean reachedPeak = false;

 private static final Object lock = new Object();

 public void addDuration(int duration) {
 if (reachedPeak) {
 return;
 }

 synchronized (lock) {
 avgDuration = (avgDuration + duration) / 2;

 // Check inside the lock so that we read the value we just wrote
 if (avgDuration > DURATION_LIMIT) {
 reachedPeak = true;
 }
 }
 }

 public boolean reachedPeak() {
 return reachedPeak;
 }

}

Now the results will be accurate since modifications to avgDuration are guarded by a lock.
If we exceed the duration limit we set reachedPeak to true.
There is only one operation going on there: setting reachedPeak to true. Since reachedPeak is initially false, the only state change that can happen is from false to true. Because reachedPeak is volatile and writes go straight to main memory, it might give the false impression that there is no need for synchronization when using volatile variables. This is not the case. Volatile solves the problem of visibility, but the problem of synchronization remains.
We shall modify our use case: instead of tracking whether we reached the max duration through the reachedPeak variable, we shall store the average duration in a volatile variable.

public class DurationTracker {

 public static final int DURATION_LIMIT = 20;

 private volatile int avgDuration;

 public void addDuration(int duration) {
 if (avgDuration > DURATION_LIMIT) {
 return;
 }

 avgDuration = (avgDuration + duration)/2;
 }

 public boolean reachedPeak() {
 return avgDuration > DURATION_LIMIT;
 }

}

Checking if the avgDuration is bigger than the duration limit should work fine. After all, the value is fetched from the main memory. The operation that updates the avgDuration is the problematic one. Because it is a one-liner it gives the sense that it is one atomic operation, but it is not.
If we break it down, it comes to the following steps:

  1. load the avgDuration from the main memory
  2. calculate the sum of the duration and the avgDuration
  3. divide the result by two
  4. store the result in the avgDuration variable in the main memory

These are four operations. When executed by multiple threads, they will cause inconsistent results. For example, the value we loaded in step 1 might be out of date if another thread updated avgDuration while we were on step 3.
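
The same effect is easier to observe with a counter than with an average, so the sketch below swaps the average for a simple increment; everything in it is made up for illustration. Both counters are incremented the same number of times, one as a volatile field and one under a lock.

```java
// Hypothetical class contrasting a volatile increment with a synchronized one.
class LostUpdateSketch {
    private volatile int volatileCounter;
    private int lockedCounter;
    private final Object lock = new Object();

    void incrementBoth() {
        volatileCounter++;          // load, add, store: visible to others, but not atomic
        synchronized (lock) {
            lockedCounter++;        // the whole read-modify-write happens under the lock
        }
    }

    // Returns { volatileCounter, lockedCounter } after all threads have finished.
    static int[] run(int threads, int perThread) {
        LostUpdateSketch sketch = new LostUpdateSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    sketch.incrementBoth();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { sketch.volatileCounter, sketch.lockedCounter };
    }

    public static void main(String[] args) {
        int[] result = run(4, 50_000);
        System.out.println("volatile: " + result[0] + ", locked: " + result[1]);
    }
}
```

The locked counter always ends at threads × perThread, while the volatile one can end up lower whenever two threads interleave between the load and the store.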

Thus using volatile does not remove the need for synchronization.

Our example should utilize a lock.

public class DurationTracker {

 public static final int DURATION_LIMIT = 20;

 private volatile int avgDuration;

 private static final Object lock = new Object();
 
 public void addDuration(int duration) {
 if (avgDuration > DURATION_LIMIT) {
 return;
 }

 synchronized (lock) {
 avgDuration = (avgDuration + duration) / 2;
 }
 }

 public boolean reachedPeak() {
 return avgDuration > DURATION_LIMIT;
 }

}

If you think about it, we took advantage of reading from main memory: once the limit is exceeded, threads return early without trying to acquire a lock, which is costly.
Now someone might look at the code snippets and argue that atomic operations would be better. We will get to that in another blog 😉

Previously we examined the happens before guarantee in java. This guarantee gives us confidence when we write multithreaded programs with regards to the re-ordering of statements that can happen. In this post we shall focus on variable visibility between two threads and what happens when we change a variable that is shared.

👁 Image

Let’s examine the following code snippet:

import java.util.Date;

public class UnSynchronizedCountDown {
 private int number = Integer.MAX_VALUE;

 public Thread countDownUntilAsync(final int threshold) {
 return new Thread(() -> {
 while (number>threshold) {
 number--;
 System.out.println("Decreased "+number +" at "+ new Date());
 }
 });
 }

 private void waitUntilThresholdReached(int threshold) {
 while (number>threshold) {
 }
 }

 public static void main(String[] args) {
 int threshold = 2125840327;

 UnSynchronizedCountDown unSynchronizedCountDown = new UnSynchronizedCountDown();
 unSynchronizedCountDown.countDownUntilAsync(threshold).start();

 unSynchronizedCountDown.waitUntilThresholdReached(threshold);
 System.out.println("Threshold reached at "+new Date());
 }
}

This is a bad piece of code: two threads operate on the same variable, number, without any synchronization. The code will likely run forever! Regardless of when the countdown thread reaches the goal, the main thread may never pick up the new value, which is below the threshold. This is because the changes made to the number variable have not been made visible to the main thread. So it’s not only about synchronizing and issuing thread-safe operations, but also about ensuring that the changes a thread has made are visible.

Intrinsic locking in java guarantees that one thread can see the changes of another thread. So when we use synchronized the changes of a thread become visible to the other thread that has stumbled on the synchronized block.

Let’s change our example and showcase this:

package com.gkatzioura.concurrency.visibility;


public class SynchronizedCountDown {
 private int number = Integer.MAX_VALUE;
 private String message = "Nothing changed";
 private static final Object lock = new Object();

 private int getNumber() {
 synchronized (lock) {
 return number;
 }
 }

 public Thread countDownUntilAsync(final int threshold) {
 return new Thread(() -> {
 message = "Count down until "+threshold;
 while (number>threshold) {
 synchronized (lock) {
 number--;
 if(number<=threshold) {
 }
 }
 }
 });
 }

 private void waitUntilThresholdReached(int threshold) {
 while (getNumber()>threshold) {
 }
 }

 public static void main(String[] args) {
 int threshold = 2147270516;

 SynchronizedCountDown synchronizedCountDown = new SynchronizedCountDown();
 synchronizedCountDown.countDownUntilAsync(threshold).start();
 System.out.println(synchronizedCountDown.message);

 synchronizedCountDown.waitUntilThresholdReached(threshold);
 System.out.println(synchronizedCountDown.message);
 }
}

Access to the number variable is protected by a lock. Also modifying the variable is synchronized using the same lock.
Eventually the program will terminate as expected since we will reach the threshold. Every time the main thread enters the synchronized block, the changes made by the countdown thread become visible to it. This applies not only to the variables involved in a synchronized block but also to the other variables the countdown thread modified before releasing the lock. Thus, although the message variable was not inside any synchronized block, by the end of the program its altered value had been published and we saw the right value printed.
