
URL: https://egkatzioura.com/category/google-cloud/

Google Cloud – Emmanouil Gkatziouras



Public-key cryptography, or asymmetric cryptography, is something we use daily; take for example the TLS on this very website.

From Wikipedia:

In a public-key encryption system, anyone with a public key can encrypt a message, yielding a ciphertext, but only those who know the corresponding private key can decrypt the ciphertext to obtain the original message.
For example, a journalist can publish the public key of an encryption key pair on a web site so that sources can send secret messages to the news organization in ciphertext.


Wikipedia gives a good example. Here is another one:
You have a process sending data from your premises to another datacenter. Apart from encryption in transit, you want to go an extra step and encrypt the contents, ensuring that only certain workloads in the datacenter can decrypt the data. This could be done with a symmetric key. The issue with a symmetric key is that both parties have access to the same key and are therefore able to both encrypt and decrypt information. Both parties might be uneasy about sharing the same key, plus we have a one-directional flow: from the on-premises datacenter to the external datacenter.
This is where asymmetric encryption can be used. A public key encrypts the data before it gets dispatched, and the receiver of the data holds the matching private key to decrypt it.

Let’s generate some RSA keys

#generate the private key
openssl genrsa -out keypair.pem 2048
#generate the public key
openssl rsa -in keypair.pem -pubout -out publickey.crt

We have a public key that we can use to encrypt data and a private key to use to decrypt the encrypted data.
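For readers who prefer to stay inside the JVM, the same kind of key pair can be produced without openssl. The following is a minimal sketch, not part of the original setup: the class name `RsaKeyPairSketch` and the `toPem` helper are made up for illustration, and it assumes the default JDK security provider.

```java
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper class, named for this illustration only.
final class RsaKeyPairSketch {

    // Generates a 2048-bit RSA key pair, the same key size passed to openssl above.
    static KeyPair generate() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (NoSuchAlgorithmException e) {
            // RSA is a standard algorithm on the JDK, so this is not expected to happen.
            throw new IllegalStateException(e);
        }
    }

    // Wraps DER bytes in PEM armor: Base64 in 64-character lines between BEGIN/END markers.
    static String toPem(String label, byte[] der) {
        Base64.Encoder encoder = Base64.getMimeEncoder(64, new byte[] {'\n'});
        return "-----BEGIN " + label + "-----\n"
                + encoder.encodeToString(der)
                + "\n-----END " + label + "-----\n";
    }

    public static void main(String[] args) {
        KeyPair pair = generate();
        // getEncoded() yields X.509 (SubjectPublicKeyInfo) bytes for the public key
        // and PKCS#8 bytes for the private key.
        System.out.print(toPem("PUBLIC KEY", pair.getPublic().getEncoded()));
        System.out.print(toPem("PRIVATE KEY", pair.getPrivate().getEncoded()));
    }
}
```

The `PUBLIC KEY` and `PRIVATE KEY` labels correspond to the X.509 and PKCS#8 encodings. As far as I know, older OpenSSL releases write the private key with a `BEGIN RSA PRIVATE KEY` header (PKCS#1) instead, in which case it would need converting to PKCS#8 before Java's `PKCS8EncodedKeySpec` can read it.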

The following Java snippet reads the keys, encrypts a string and decrypts it:

import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.*;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.MGF1ParameterSpec;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

public class Main {

 public static void main(String[] args) throws IOException, NoSuchAlgorithmException, InvalidKeySpecException, NoSuchPaddingException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException, InvalidAlgorithmParameterException {
 final String privateKeyText;
 final String publicKeyText;

 try(InputStream privateKeyInputstream = new FileInputStream( "/path/to/keypair.pem");
 InputStream publicKeyInputStream = new FileInputStream("/path/to/publickey.crt");) {
 privateKeyText = new String(privateKeyInputstream.readAllBytes());
 publicKeyText = new String(publicKeyInputStream.readAllBytes());
 }

 String privateKeyBase64 = privateKeyText.replaceAll("\\n", "").replace("-----BEGIN PRIVATE KEY-----", "").replace("-----END PRIVATE KEY-----", "");
 String publicKeyBase64 = publicKeyText.replaceAll("\\n", "").replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "");

 byte[] privateKeyBytes = Base64.getDecoder().decode(privateKeyBase64);
 byte[] publicKeyBytes = Base64.getDecoder().decode(publicKeyBase64);

 KeyFactory keyFactory = KeyFactory.getInstance("RSA");

 PKCS8EncodedKeySpec privateKeySpec = new PKCS8EncodedKeySpec(privateKeyBytes);
 PrivateKey privateKey = keyFactory.generatePrivate(privateKeySpec);

 X509EncodedKeySpec publicKeySpec = new X509EncodedKeySpec(publicKeyBytes);
 PublicKey publicKey = keyFactory.generatePublic(publicKeySpec);

 String unEncryptedText = "un-encrypted text";
 byte[] message = unEncryptedText.getBytes(StandardCharsets.UTF_8);

 Cipher encryptCypher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
 OAEPParameterSpec oaepParams =
 new OAEPParameterSpec(
 "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);
 encryptCypher.init(Cipher.ENCRYPT_MODE, publicKey, oaepParams);
 encryptCypher.update(message);

 byte[] ciphertext = encryptCypher.doFinal();

 Cipher cipher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
 cipher.init(Cipher.DECRYPT_MODE, privateKey, oaepParams);
 cipher.update(ciphertext);

 byte[] decrypted = cipher.doFinal();
 String decryptedText = new String(decrypted, StandardCharsets.UTF_8);

 assert decryptedText.equals(unEncryptedText);
 }

}

To wrap up the above example: we read the contents of the keys. The keys are stored in a Base64 (PEM) format, so we strip the header and footer lines, decode the Base64 body and create the keys from the decoded bytes. Then, using the cipher and the keys, we encrypt and decrypt the payload.
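The PEM handling described above can be isolated into a small helper. The sketch below is an illustration under my own assumptions rather than the code from the listing: `PemRoundTripSketch`, `pemToDer`, `toPem` and `roundTrip` are hypothetical names, and the key pair is generated in memory so that the example does not depend on files. It strips all whitespace instead of only `\n`, so PEM files with Windows line endings decode as well.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.spec.MGF1ParameterSpec;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;

// Hypothetical helper class, named for this illustration only.
final class PemRoundTripSketch {

    private static final String TRANSFORMATION = "RSA/ECB/OAEPWithSHA-256AndMGF1Padding";
    private static final OAEPParameterSpec OAEP = new OAEPParameterSpec(
            "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);

    // Removes the BEGIN/END marker lines and every whitespace character
    // (so both \n and \r\n line endings work), then decodes the Base64 body.
    static byte[] pemToDer(String pem) {
        String body = pem.replaceAll("-----(BEGIN|END) [A-Z ]+-----", "").replaceAll("\\s", "");
        return Base64.getDecoder().decode(body);
    }

    // Demo-only helper: wraps DER bytes in PEM armor using Windows line endings.
    static String toPem(String label, byte[] der) {
        String body = Base64.getMimeEncoder(64, new byte[] {'\r', '\n'}).encodeToString(der);
        return "-----BEGIN " + label + "-----\r\n" + body + "\r\n-----END " + label + "-----\r\n";
    }

    // Generates a throwaway key pair, sends both keys through PEM text and back,
    // then encrypts with the public key and decrypts with the private key.
    static String roundTrip(String text) {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            KeyPair pair = generator.generateKeyPair();

            String publicPem = toPem("PUBLIC KEY", pair.getPublic().getEncoded());
            String privatePem = toPem("PRIVATE KEY", pair.getPrivate().getEncoded());

            KeyFactory keyFactory = KeyFactory.getInstance("RSA");
            PublicKey publicKey =
                    keyFactory.generatePublic(new X509EncodedKeySpec(pemToDer(publicPem)));
            PrivateKey privateKey =
                    keyFactory.generatePrivate(new PKCS8EncodedKeySpec(pemToDer(privatePem)));

            Cipher encrypt = Cipher.getInstance(TRANSFORMATION);
            encrypt.init(Cipher.ENCRYPT_MODE, publicKey, OAEP);
            byte[] ciphertext = encrypt.doFinal(text.getBytes(StandardCharsets.UTF_8));

            Cipher decrypt = Cipher.getInstance(TRANSFORMATION);
            decrypt.init(Cipher.DECRYPT_MODE, privateKey, OAEP);
            return new String(decrypt.doFinal(ciphertext), StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("un-encrypted text")); // prints: un-encrypted text
    }
}
```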

On the public cloud we have KMS.

We can use the AWS or the GCP KMS to create a private-public key pair.

Let’s start with the GCP example first. Once we have created an asymmetric key on KMS, we can download the public key.


Should we want to decrypt any data, it will only happen through the KMS API. The public key can be found through the console.

We shall place the key on the classpath with the name gcp.crt.

We shall encrypt the data using the downloaded public key and decrypt it using the KMS API:

import com.google.cloud.kms.v1.*;
import com.google.protobuf.ByteString;

import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.*;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.MGF1ParameterSpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

public class GCPKMS {

 public static void main(String[] args) throws IOException, NoSuchAlgorithmException, InvalidKeySpecException, NoSuchPaddingException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException, InvalidAlgorithmParameterException {
 try (KeyManagementServiceClient client = KeyManagementServiceClient.create()) {
 String name = "full-gcp-key-name";

 String unEncryptedText = "un-encrypted text";

 InputStream publicKeyInputStream = GCPKMS.class.getClassLoader().getResourceAsStream("gcp.crt");
 String publicKeyText = new String(publicKeyInputStream.readAllBytes());
 String publicKeyBase64 = publicKeyText.replaceAll("\\n", "").replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "");
 byte[] publicKeyBytes = Base64.getDecoder().decode(publicKeyBase64);

 X509EncodedKeySpec keySpec = new X509EncodedKeySpec(publicKeyBytes);
 java.security.PublicKey publicKey = KeyFactory.getInstance("RSA").generatePublic(keySpec);

 Cipher encryptCypher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
 OAEPParameterSpec oaepParams =
 new OAEPParameterSpec(
 "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);
 encryptCypher.init(Cipher.ENCRYPT_MODE, publicKey, oaepParams);
 byte[] ciphertext = encryptCypher.doFinal(unEncryptedText.getBytes(StandardCharsets.UTF_8));


 AsymmetricDecryptResponse response = client.asymmetricDecrypt(AsymmetricDecryptRequest.newBuilder()
 .setCiphertext(ByteString.copyFrom(ciphertext))
 .setName(name)
 .build());

 assert response.getPlaintext().toStringUtf8().equals(unEncryptedText);
 }
 }

}

The approach is pretty similar with AWS KMS.

Once we have created the asymmetric key, we can download the public key from the console.

The key shall be placed on the classpath with the name `aws.crt`.

We shall encrypt the data using the downloaded public key and decrypt it using the KMS API:

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.model.DecryptRequest;
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;

import javax.crypto.Cipher;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.KeyFactory;
import java.security.spec.MGF1ParameterSpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

public class AWSKMS {

 public static void main(String[] args) throws Exception {
 String keyId = "kms-arn";

 KmsClient kmsClient = KmsClient.builder()
 .region(Region.of("us-east-1")) // the region that hosts your KMS key
 .credentialsProvider(DefaultCredentialsProvider.builder()
 .build())
 .build();

 String unEncryptedText = "un-encrypted text";

 InputStream publicKeyInputStream = AWSKMS.class.getClassLoader().getResourceAsStream("aws.crt");
 String publicKeyText = new String(publicKeyInputStream.readAllBytes());
 String publicKeyBase64 = publicKeyText.replaceAll("\\n", "").replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "");
 byte[] publicKeyBytes = Base64.getDecoder().decode(publicKeyBase64);

 X509EncodedKeySpec keySpec = new X509EncodedKeySpec(publicKeyBytes);
 java.security.PublicKey publicKey = KeyFactory.getInstance("RSA").generatePublic(keySpec);

 Cipher encryptCypher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
 OAEPParameterSpec oaepParams =
 new OAEPParameterSpec(
 "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);
 encryptCypher.init(Cipher.ENCRYPT_MODE, publicKey, oaepParams);
 byte[] ciphertext = encryptCypher.doFinal(unEncryptedText.getBytes(StandardCharsets.UTF_8));

 var resp = kmsClient.decrypt(DecryptRequest.builder()
 .keyId(keyId)
 .encryptionAlgorithm(EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256)
 .ciphertextBlob(SdkBytes.fromByteArray(ciphertext))
 .build());

 assert resp.plaintext().asUtf8String().equals(unEncryptedText);
 }

}

That’s it! We did asymmetric encryption and decryption locally using the RSA keys we created. We did asymmetric encryption using a GCP KMS public key and decrypted using the GCP KMS API. Lastly, we did asymmetric encryption using an AWS KMS public key and decrypted using the AWS KMS API.
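One practical limit applies to all three variants: RSA with OAEP can only encrypt a small payload directly. With a modulus of k bytes and a hash of h bytes the plaintext can be at most k - 2h - 2 bytes, which is 190 bytes for a 2048-bit key with SHA-256. The sketch below checks this against the default JDK provider; the class and method names are hypothetical.

```java
import java.security.GeneralSecurityException;
import java.security.KeyPairGenerator;
import java.security.PublicKey;
import java.security.spec.MGF1ParameterSpec;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;

// Hypothetical helper class, named for this illustration only.
final class OaepLimitSketch {

    // Largest plaintext RSA-OAEP accepts: modulus bytes - 2 * hash bytes - 2.
    static int maxPlaintextBytes(int modulusBits, int hashBits) {
        return modulusBits / 8 - 2 * (hashBits / 8) - 2;
    }

    // Creates a throwaway RSA public key of the requested size.
    static PublicKey newPublicKey(int modulusBits) {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(modulusBits);
            return generator.generateKeyPair().getPublic();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true when a plaintext of the given length can be encrypted in one RSA-OAEP operation.
    static boolean fits(PublicKey key, int plaintextLength) {
        Cipher cipher;
        try {
            cipher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new OAEPParameterSpec(
                    "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
        try {
            cipher.doFinal(new byte[plaintextLength]);
            return true;
        } catch (IllegalBlockSizeException | BadPaddingException e) {
            // The provider rejects input that is longer than the padding scheme allows.
            return false;
        }
    }

    public static void main(String[] args) {
        PublicKey key = newPublicKey(2048);
        int max = maxPlaintextBytes(2048, 256);
        System.out.println(max);                // prints: 190
        System.out.println(fits(key, max));     // prints: true
        System.out.println(fits(key, max + 1)); // prints: false
    }
}
```

For larger payloads a common approach is to encrypt the data with a freshly generated symmetric key and to encrypt only that key with the public key. That pattern is outside the scope of the examples above.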

GCP’s Pub/Sub is one of my favourite services for streaming data: it is seamless to use and comes without operational overhead. Pub/Sub comes with GCP Schemas, which can be used to define the schema of the data flowing through the topics.


Interacting with the Pub/Sub Schemas is streamlined; you only have to include the Pub/Sub dependency:

 <dependency>
 <groupId>com.google.cloud</groupId>
 <artifactId>google-cloud-pubsub</artifactId>
 <version>1.134.0</version>
 </dependency>

Then we can proceed to listing the schemas:

 String projectId = "your-gcp-project-id";

 try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
 var response = schemaServiceClient.listSchemas(ProjectName.of(projectId));

 for (var s : response.iterateAll()) {
 System.out.println(s.getName()+ " "+s.getRevisionId());

 Schema schema = schemaServiceClient.getSchema(s.getName());
 System.out.println(schema.getDefinition());
 }

 }

Take note that when listing the schemas you only fetch a listing of the names; the actual schema definition is not fetched, even though the same model is used.

To fetch a schema’s definition you need to get the schema through a separate call.

Through the API you can also submit a schema:

 String projectId = "your-gcp-project-id";
 String schemaId = "example-schema";

 String schemaDefinition = "{ \"type\": \"record\", \"name\": \"TestRecord\", \"fields\": [ { \"name\": \"name\", \"type\": \"string\" } ] }";
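 try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
 CreateSchemaRequest createSchemaRequest = CreateSchemaRequest.newBuilder()
 // The parent is the full project resource name: projects/{project}
 .setParent(ProjectName.of(projectId).toString())
 .setSchemaId(schemaId)
 .setSchema(Schema.newBuilder()
 .setName(SchemaName.of(projectId, schemaId).toString())
 .setType(Schema.Type.AVRO)
 .setDefinition(schemaDefinition)
 .build())
 .build();

 // Building the request does not create anything; this call submits it
 Schema created = schemaServiceClient.createSchema(createSchemaRequest);
 System.out.println("Schema created: " + created.getName());
 }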

Now you might be wondering what happens when a message that does not match the schema is published to a topic bound to that schema. In that case the request to publish the message will fail. The validation happens on the server side.

 Publisher publisher = Publisher.newBuilder(TopicName.parse("projects/test-project/topics/test-pub-sub")).build();
 var mes = publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFrom("invalid-format".getBytes())).build()).get();
...
Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
	

When you subscribe to a topic with a schema attached, every message received carries metadata describing which schema was used to create the message.

MessageReceiver receiver =
 (PubsubMessage message, AckReplyConsumer consumer) -> {
 String name = message.getAttributesMap().get("googclient_schemaname");
 String revision = message.getAttributesMap().get("googclient_schemarevisionid");
 }
...
}

The schema name is the full schema name, including the project id, and the revision attribute contains the revision of the schema. In case of multiple revisions you can fetch the revision of interest by specifying it in the getSchema request. Receiving the schema name with each message is handy, since it lets you handle the various messages received by subscriptions dynamically.
For example, you can read an Avro message received from a subscription by fetching the schema on demand.

MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
 String name = message.getAttributesMap().get("googclient_schemaname");


 Schema schema = schemaServiceClient.getSchema(SchemaName.parse(name));
 if(schema.getType().equals(Type.AVRO)) {
 String definition = schema.getDefinition();
 org.apache.avro.Schema avroSchema =new org.apache.avro.Schema.Parser().parse(definition);

 GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);

 try(InputStream inputStream = new ByteArrayInputStream(message.getData().toByteArray())) {
 BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);
 GenericRecord genericRecord = datumReader.read(null, decoder);
 }
 }

}
...
}
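Fetching the schema on demand, as in the receiver above, means one API call per message unless the result is cached. A minimal caching sketch follows, with several assumptions: `SchemaDefinitionCache` is a hypothetical class, the `fetcher` function stands in for a call such as `schemaServiceClient.getSchema(name).getDefinition()`, and the `name@revision` form for addressing a specific revision is my understanding of the Pub/Sub naming rather than something shown in the listings above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical cache, named for this illustration only.
final class SchemaDefinitionCache {

    private final ConcurrentMap<String, String> definitions = new ConcurrentHashMap<>();
    private final Function<String, String> fetcher;

    // The fetcher is a stand-in for the real lookup against the schema service.
    SchemaDefinitionCache(Function<String, String> fetcher) {
        this.fetcher = fetcher;
    }

    // Assumption: a specific revision is addressed as "<schema name>@<revision id>".
    static String revisionName(String schemaName, String revisionId) {
        if (revisionId == null || revisionId.isEmpty()) {
            return schemaName;
        }
        return schemaName + "@" + revisionId;
    }

    // Returns the cached definition, calling the fetcher only on the first request per revision.
    String definitionFor(String schemaName, String revisionId) {
        return definitions.computeIfAbsent(revisionName(schemaName, revisionId), fetcher);
    }

    public static void main(String[] args) {
        SchemaDefinitionCache cache =
                new SchemaDefinitionCache(name -> "definition of " + name);
        System.out.println(cache.definitionFor("projects/p/schemas/s", "abc"));
    }
}
```

Inside a `MessageReceiver`, the two arguments would come from the `googclient_schemaname` and `googclient_schemarevisionid` attributes. A given revision's definition should not change once published, so caching by revision avoids repeated lookups for messages that share it.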

You might want your codebase to benefit further from the registry and even generate the corresponding classes for the various schemas. We have done that in a previous post.

In that case we can use the gcp-schemas-maven-plugin.

You can combine the plugin with a schema model generation plugin.
For example, you can download Avro schemas from a GCP project and generate the Java classes from those Avro schemas.

 <plugin>
 <groupId>io.github.gkatzioura.gcp</groupId>
 <artifactId>gcp-schemas-maven-plugin</artifactId>
 <version>1.0</version>
 <executions>
 <execution>
 <id>one</id>
 <phase>generate-sources</phase>
 <goals>
 <goal>download</goal>
 </goals>
 <configuration>
 <project>gcp-project-1</project>
 <outputDirectory>src/main/avro</outputDirectory>
 <schemaType>AVRO</schemaType>
 </configuration>
 </execution>
 </executions>
 </plugin>
 <plugin>
 <groupId>org.apache.avro</groupId>
 <artifactId>avro-maven-plugin</artifactId>
 <version>1.12.0</version>
 <executions>
 <execution>
 <phase>generate-sources</phase>
 <goals>
 <goal>schema</goal>
 </goals>
 <configuration>
 <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
 <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
 </configuration>
 </execution>
 </executions>
 </plugin>

If you want to be selective about the schemas to use, you can even specify the exact schema names or versions:

 <plugin>
 <groupId>io.github.gkatzioura.gcp</groupId>
 <artifactId>gcp-schemas-maven-plugin</artifactId>
 <version>1.0</version>
 <executions>
 <execution>
 <id>one</id>
 <phase>generate-sources</phase>
 <goals>
 <goal>download</goal>
 </goals>
 <configuration>
 <project>gcp-project-1</project>
 <outputDirectory>src/main/avro</outputDirectory>
 <schemaType>AVRO</schemaType>
 <subjectPatterns>schema1,schema2,schema3</subjectPatterns>
 <versions>113aca6b,69965687,e6dc8d46</versions> 
 </configuration>
 </execution>
 </executions>
 </plugin>

That’s it, hope this streamlines your data pipelines development!

Pub/Sub is a messaging solution provided by GCP.

Before we dive into the actual configuration we need to be aware that Spring Cloud for GCP is now managed by the Google Cloud Team. Therefore the latest code can be found here.


Our application will receive messages from Pub/Sub and expose them using an endpoint.
Let’s start with the dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.gkatzioura</groupId>
 <artifactId>spring-cloud-pubsub-example</artifactId>
 <version>1.0-SNAPSHOT</version>

 <parent>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-parent</artifactId>
 <version>2.4.1</version>
 <relativePath/>
 </parent>

 <properties>
 <maven.compiler.source>11</maven.compiler.source>
 <maven.compiler.target>11</maven.compiler.target>
 </properties>

 <dependencyManagement>
 <dependencies>
 <dependency>
 <groupId>com.google.cloud</groupId>
 <artifactId>spring-cloud-gcp-dependencies</artifactId>
 <version>2.0.4</version>
 <type>pom</type>
 <scope>import</scope>
 </dependency>
 </dependencies>
 </dependencyManagement>

 <dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>com.google.cloud</groupId>
 <artifactId>spring-cloud-gcp-pubsub</artifactId>
 </dependency>
 <dependency>
 <groupId>com.google.cloud</groupId>
 <artifactId>spring-cloud-gcp-autoconfigure</artifactId>
 </dependency>

 <dependency>
 <groupId>org.springframework.integration</groupId>
 <artifactId>spring-integration-core</artifactId>
 </dependency>
 </dependencies>

</project>

Quick note: with a few tweaks you can use the PubSub emulator available from the Google Cloud Team.

The first class will hold the Pub/Sub messages received. It is backed by a bounded queue that holds a limited number of messages.

package com.gkatzioura.pubsub.example;

import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.stereotype.Component;

@Component
public class LatestUpdates {

 LinkedBlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100);

 public void addUpdate(String update) {
 boundedQueue.add(update);
 }

 public String fetch() {
 return boundedQueue.poll();
 }

}
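One detail of the bounded queue is worth spelling out: `add` on a full `LinkedBlockingQueue` throws an `IllegalStateException`, while `offer` returns `false`. The sketch below shows the difference along with one possible policy of dropping the oldest entry to make room. `BoundedQueueSketch` and `addEvictingOldest` are hypothetical names, and the eviction policy is an assumption of mine, not something the class above implements.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper class, named for this illustration only.
final class BoundedQueueSketch {

    // Keeps only the newest entries: when the queue is full, the oldest one is dropped.
    static void addEvictingOldest(LinkedBlockingQueue<String> queue, String update) {
        while (!queue.offer(update)) {
            queue.poll();
        }
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.add("a");
        queue.add("b");

        try {
            queue.add("c"); // the queue is full, so add throws
        } catch (IllegalStateException e) {
            System.out.println("add failed on a full queue");
        }

        System.out.println(queue.offer("c")); // prints: false

        addEvictingOldest(queue, "c");
        System.out.println(queue);            // prints: [b, c]
    }
}
```

Which behaviour fits depends on whether the endpoint should serve the oldest unread updates or the most recent ones.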

The Pub/Sub configuration will initiate the listener and will use Spring Integration.

We define a message channel.

 @Bean
 public MessageChannel pubsubInputChannel() {
 return new DirectChannel();
 }

Then we add the inbound channel adapter. The ack mode will be set to manual.

 @Bean
 public PubSubInboundChannelAdapter messageChannelAdapter(
 @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
 PubSubTemplate pubSubTemplate) {
 PubSubInboundChannelAdapter adapter =
 new PubSubInboundChannelAdapter(pubSubTemplate, "your-subscription");
 adapter.setOutputChannel(inputChannel);
 adapter.setAckMode(AckMode.MANUAL);
 adapter.setPayloadType(String.class);
 return adapter;
 }

Then we add a listener method. How acknowledgements are handled is up to the developer. If an exception occurs in that block, it will be caught and sent to an error channel, so messages will continue to be pulled.

 @ServiceActivator(inputChannel = "pubsubInputChannel")
 public void messageReceiver(String payload,
 @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
 latestUpdates.addUpdate(message.getPubsubMessage().getData().toStringUtf8());
 message.ack();
 }
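Since the acknowledgement strategy is left to the developer, one option is to ack only after the payload has been processed and to nack when processing fails. The sketch below illustrates that decision in isolation. `AckSketch` and its `Acknowledgeable` interface are stand-ins I made up so the example runs without Spring; in the listener above the `ack()` and `nack()` calls would be made on the `BasicAcknowledgeablePubsubMessage`.

```java
import java.util.function.Consumer;

// Hypothetical helper class, named for this illustration only.
final class AckSketch {

    // Stand-in for the acknowledgement calls of the real Pub/Sub message type.
    interface Acknowledgeable {
        void ack();
        void nack();
    }

    // Acks only after successful processing; nacks on failure so the message is redelivered.
    static void handle(String payload, Acknowledgeable message, Consumer<String> processor) {
        try {
            processor.accept(payload);
            message.ack();
        } catch (RuntimeException e) {
            message.nack();
        }
    }

    public static void main(String[] args) {
        Acknowledgeable printing = new Acknowledgeable() {
            @Override public void ack() { System.out.println("ack"); }
            @Override public void nack() { System.out.println("nack"); }
        };

        handle("fine", printing, payload -> { });  // prints: ack
        handle("broken", printing, payload -> {
            throw new IllegalArgumentException(payload);
        });                                         // prints: nack
    }
}
```

A message that is neither acked nor nacked should be redelivered once its acknowledgement deadline expires, so an explicit nack mainly shortens the wait.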

The entire Pub/Sub configuration

package com.gkatzioura.pubsub.example;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;

import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.AckMode;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;

@Configuration
public class PubSubConfiguration {

 private final LatestUpdates latestUpdates;

 public PubSubConfiguration(LatestUpdates latestUpdates) {
 this.latestUpdates = latestUpdates;
 }

 @Bean
 public MessageChannel pubsubInputChannel() {
 return new DirectChannel();
 }

 @Bean
 public PubSubInboundChannelAdapter messageChannelAdapter(
 @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
 PubSubTemplate pubSubTemplate) {
 PubSubInboundChannelAdapter adapter =
 new PubSubInboundChannelAdapter(pubSubTemplate, "your-subscription");
 adapter.setOutputChannel(inputChannel);
 adapter.setAckMode(AckMode.MANUAL);
 adapter.setPayloadType(String.class);
 return adapter;
 }

 @ServiceActivator(inputChannel = "pubsubInputChannel")
 public void messageReceiver(String payload,
 @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
 latestUpdates.addUpdate(message.getPubsubMessage().getData().toStringUtf8());
 message.ack();
 }

}

The controller will just pull from the internal Queue.

package com.gkatzioura.pubsub.example;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UpdatesController {

 private LatestUpdates latestUpdates;

 public UpdatesController(LatestUpdates latestUpdates) {
 this.latestUpdates = latestUpdates;
 }

 @GetMapping("/update")
 public String getLatestUpdate() {
 return latestUpdates.fetch();
 }

}

Next step is to define an application for Spring

package com.gkatzioura.pubsub.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ExampleApplication {


 public static void main(String[] args) {
 SpringApplication.run(ExampleApplication.class, args);
 }

}

When running the application, be aware that at least one property needs to be set:

spring.cloud.gcp.pubsub.enabled=true

The application will then fall back to your local GCP configuration to identify your credentials as well as the project to point at.

That’s it! To summarise, we managed to pull messages from Pub/Sub and expose them on an endpoint.

Previously we had an introduction to the BigQuery Storage API and we proceeded to read data using the Arrow format.
In this tutorial we shall read data using the Avro format.


What applied to the previous tutorial applies here too.

We shall create a BigQuery Storage client, create a ReadSession using the Avro format and iterate over the data of each stream.

Let’s get started by importing the dependencies, including the Avro library that we need.

 <dependencyManagement>
 <dependencies>
 <dependency>
 <groupId>com.google.cloud</groupId>
 <artifactId>libraries-bom</artifactId>
 <version>20.5.0</version>
 <type>pom</type>
 <scope>import</scope>
 </dependency>
 </dependencies>
 </dependencyManagement>

 <dependencies>
 <dependency>
 <groupId>com.google.cloud</groupId>
 <artifactId>google-cloud-bigquerystorage</artifactId>
 </dependency>
 <dependency>
 <groupId>org.apache.avro</groupId>
 <artifactId>avro</artifactId>
 <version>1.9.2</version>
 </dependency>
 </dependencies>

Our next step is to create an Avro data reader for our rows, which have the schema col1:string, col2:int. In our case we shall just print the data through System.out.

package com.gkatzioura.bigquery.storage.api.avro;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

import com.google.cloud.bigquery.storage.v1.AvroSchema;
import com.google.protobuf.ByteString;

public class AvroReader {

 private final GenericDatumReader<GenericRecord> datumReader;

 public AvroReader(AvroSchema avroSchema) {
 Schema schema = new Schema.Parser().parse(avroSchema.getSchema());
 this.datumReader = new GenericDatumReader<>(schema);
 }

 public void processRows(ByteString avroRows) throws IOException {
 try(InputStream inputStream = new ByteArrayInputStream(avroRows.toByteArray())) {
 BinaryDecoder decoder =DecoderFactory.get().binaryDecoder(inputStream, null);

 while (!decoder.isEnd()) {
 GenericRecord item = datumReader.read(null, decoder);

 System.out.println(item.get("col1")+","+item.get("col2"));
 }
 }
 }

}

Then on to our main class which is the one with any BigQuery logic needed.

package com.gkatzioura.bigquery.storage.api.avro;

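// Guava is pulled in transitively by the BigQuery Storage client; the Arrow libraries are not on this classpath
import com.google.common.base.Preconditions;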

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;

public class AvroMain {

 public static void main(String[] args) throws Exception {

 String projectId = System.getenv("PROJECT_ID");

 try (BigQueryReadClient client = BigQueryReadClient.create()) {
 String parent = String.format("projects/%s", projectId);

 String srcTable =
 String.format(
 "projects/%s/datasets/%s/tables/%s",
 projectId, System.getenv("DATASET"), System.getenv("TABLE"));

 ReadSession.Builder sessionBuilder =
 ReadSession.newBuilder()
 .setTable(srcTable)
 .setDataFormat(DataFormat.AVRO);


 CreateReadSessionRequest.Builder builder =
 CreateReadSessionRequest.newBuilder()
 .setParent(parent)
 .setReadSession(sessionBuilder)
 .setMaxStreamCount(1);
 ReadSession session = client.createReadSession(builder.build());

 Preconditions.checkState(session.getStreamsCount() > 0);

 String streamName = session.getStreams(0).getName();

 ReadRowsRequest readRowsRequest =
 ReadRowsRequest.newBuilder().setReadStream(streamName).build();

 ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);

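 // Parse the schema once and reuse the reader for every response
 AvroReader avroReader = new AvroReader(session.getAvroSchema());
 for (ReadRowsResponse response : stream) {
 avroReader.processRows(response.getAvroRows().getSerializedBinaryRows());
 }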
 }
 }

}

A BigQuery read client is created. Then we create a read session request with a maximum number of streams, specifying that the format to be used is Avro.
The response contains the initiated session, the Avro schema and the streams that we shall use to retrieve the data.
For each stream there has to be a ReadRowsRequest in order to fetch the data.
Then we pass the data to our Avro decoder.
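To make it clearer why the decoder needs the schema, it helps to look at what the serialized rows contain. Avro's binary encoding carries no field names or types: an int is written as a zig-zag variable-length integer, and a string as its length (in the same integer encoding) followed by UTF-8 bytes. The sketch below decodes rows of the col1:string, col2:int shape by hand. It is an illustration only; `AvroBinarySketch` is a hypothetical name, and it assumes both columns are non-nullable, since nullable columns are encoded as unions with an extra branch index.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, named for this illustration only.
final class AvroBinarySketch {

    // Reads an Avro int/long: 7 bits per byte, least significant group first, zig-zag encoded.
    static long readLong(ByteBuffer in) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = in.get();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        // Undo the zig-zag mapping (0, -1, 1, -2, ... are stored as 0, 1, 2, 3, ...).
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Reads an Avro string: a length followed by that many UTF-8 bytes.
    static String readString(ByteBuffer in) {
        int length = (int) readLong(in);
        byte[] utf8 = new byte[length];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // Decodes back-to-back rows; assumes a non-nullable string column followed by a non-nullable int column.
    static List<String> decodeRows(byte[] rows) {
        ByteBuffer in = ByteBuffer.wrap(rows);
        List<String> lines = new ArrayList<>();
        while (in.hasRemaining()) {
            String col1 = readString(in);
            int col2 = (int) readLong(in);
            lines.add(col1 + "," + col2);
        }
        return lines;
    }

    public static void main(String[] args) {
        // "foo" has length 3, stored as 0x06; 42 is stored as 0x54.
        byte[] rows = {0x06, 'f', 'o', 'o', 0x54};
        System.out.println(decodeRows(rows)); // prints: [foo,42]
    }
}
```

In practice the `GenericDatumReader` does this work, driven by the schema it was constructed with.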

That’s it! We have now read data from the BigQuery Storage API using both Avro and Arrow.

Previously we had an introduction on the BigQuery Storage API. As explained the storage API of BigQuery supports two formats. For this tutorial we will choose the Arrow Format.


First let’s import the dependencies. The BigQuery Storage API library does not come with a library to parse Arrow. The consumer receives the data as binaries in the Arrow format, and it is up to the consumer how to consume those binaries and which libraries to use.

 <dependencyManagement>
 <dependencies>
 <dependency>
 <groupId>com.google.cloud</groupId>
 <artifactId>libraries-bom</artifactId>
 <version>20.5.0</version>
 <type>pom</type>
 <scope>import</scope>
 </dependency>
 </dependencies>
 </dependencyManagement>

 <dependencies>
 <dependency>
 <groupId>com.google.cloud</groupId>
 <artifactId>google-cloud-bigquerystorage</artifactId>
 </dependency>
 <dependency>
 <groupId>org.apache.arrow</groupId>
 <artifactId>arrow-vector</artifactId>
 <version>4.0.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.arrow</groupId>
 <artifactId>arrow-memory-netty</artifactId>
 <version>4.0.0</version>
 </dependency>
 </dependencies>

As mentioned before, when we use Arrow we need to import a library for the memory allocation Arrow needs.

First we shall create a plain Arrow reader.
This reader will be BigQuery agnostic, which is one of the benefits of using a platform- and language-independent format.

An Arrow binary shall be submitted to the reader along with the schema, and the rows shall be printed in a tab-separated (TSV) format.

package com.gkatzioura.bigquery.storage.api.arrow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;

public class ArrowReader implements AutoCloseable {

 private final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

 private final VectorSchemaRoot root;
 private final VectorLoader loader;

 public ArrowReader(ArrowSchema arrowSchema) throws IOException {
 Schema schema =
 MessageSerializer.deserializeSchema(
 new ReadChannel(
 new ByteArrayReadableSeekableByteChannel(
 arrowSchema.getSerializedSchema().toByteArray())));

 Preconditions.checkNotNull(schema);
 List<FieldVector> vectors = new ArrayList<>();
 for (Field field : schema.getFields()) {
 vectors.add(field.createVector(allocator));
 }

 root = new VectorSchemaRoot(vectors);
 loader = new VectorLoader(root);
 }

 public void processRows(ArrowRecordBatch batch) throws IOException {
 org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
 MessageSerializer.deserializeRecordBatch(
 new ReadChannel(
 new ByteArrayReadableSeekableByteChannel(
 batch.getSerializedRecordBatch().toByteArray())),
 allocator);

 loader.load(deserializedBatch);
 deserializedBatch.close();
 System.out.println(root.contentToTSVString());
 root.clear();
 }

 @Override
 public void close() throws Exception {
 root.close();
 allocator.close();
 }

}

The constructor has the schema injected, and then the schema root is created.
Note that we receive the schema in binary form; it is up to us and our library to read it.

 Schema schema =
 MessageSerializer.deserializeSchema(
 new ReadChannel(
 new ByteArrayReadableSeekableByteChannel(
 arrowSchema.getSerializedSchema().toByteArray())));

You can find more on reading Arrow data in this tutorial.

Then on to our main class which is the one with any BigQuery logic needed.

package com.gkatzioura.bigquery.storage.api.arrow;

import org.apache.arrow.util.Preconditions;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;

public class ArrowMain {

 public static void main(String[] args) throws Exception {

 String projectId = System.getenv("PROJECT_ID");

 try (BigQueryReadClient client = BigQueryReadClient.create()) {
 String parent = String.format("projects/%s", projectId);

 String srcTable =
 String.format(
 "projects/%s/datasets/%s/tables/%s",
 projectId, System.getenv("DATASET"), System.getenv("TABLE"));

 ReadSession.Builder sessionBuilder =
 ReadSession.newBuilder()
 .setTable(srcTable)
 .setDataFormat(DataFormat.ARROW);

 CreateReadSessionRequest.Builder builder =
 CreateReadSessionRequest.newBuilder()
 .setParent(parent)
 .setReadSession(sessionBuilder)
 .setMaxStreamCount(1);
 ReadSession session = client.createReadSession(builder.build());

 try (ArrowReader reader = new ArrowReader(session.getArrowSchema())) {
 Preconditions.checkState(session.getStreamsCount() > 0);

 String streamName = session.getStreams(0).getName();

 ReadRowsRequest readRowsRequest =
 ReadRowsRequest.newBuilder().setReadStream(streamName).build();

 ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
 for (ReadRowsResponse response : stream) {
 Preconditions.checkState(response.hasArrowRecordBatch());
 reader.processRows(response.getArrowRecordBatch());
 }
 }
 }
 }

}

A BigQuery read client is created. Then we create a session request with a maximum number of streams. We have to specify that the format to be used is Arrow.
Once we get a response, it contains the initiated session, the Arrow schema and the streams that we shall use to retrieve the data.
For each stream there has to be a ReadRowsRequest in order to fetch the data.
Our next example will focus on reading data in Avro format.

BigQuery provides us with the Storage API for fast access using an RPC-based protocol. With this option you can receive the data in a binary serialized format. The alternative ways to retrieve BigQuery data are the REST API and a bulk export.

Retrieving data through the REST API is great for small result sets. For example, if the product of an aggregation is going to have a limited number of rows, it makes sense to use the REST API, retrieve the results and use them in an application like Grafana. However, when it comes to big result sets, retrieving results in JSON, serializing and storing them has an extra overhead. Exporting in binary formats helps you avoid this overhead.

Bulk data export is a good solution for exporting big result sets; however, you are limited in where the data gets stored (Google Cloud Storage), and there are daily limits on exports.

Thus the Storage API combines the flexibility of an RPC protocol, the efficiency of downloading big result sets in a binary format and the freedom to choose where the data shall be stored.

The Storage API provides two formats for streaming data: Avro and Arrow.

When using the Storage API, the first step is to create a session. The format (Avro/Arrow) should be specified. A session can have more than one stream, and the maximum number of streams can be specified.
Streams contain the data in the format specified and can be read in parallel. The session expires on its own with no need for handling.
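Since the streams of a session can be read in parallel, one way to picture it is one worker per stream. The following is only a sketch using plain JDK classes: `ParallelStreamsSketch`, `readAll` and the `readStream` callback are hypothetical names, and the callback stands in for whatever per-stream read your client performs. It does not call the BigQuery client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.ToLongFunction;

class ParallelStreamsSketch {

    // Submits one task per stream name and returns the sum of the row counts.
    // readStream is a placeholder for the real per-stream read; it returns rows read.
    static long readAll(List<String> streamNames, ToLongFunction<String> readStream) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, streamNames.size()));
        try {
            List<Future<Long>> results = new ArrayList<>();
            for (String name : streamNames) {
                Callable<Long> task = () -> readStream.applyAsLong(name);
                results.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> result : results) {
                total += result.get(); // blocks until that stream is fully read
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while reading streams", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A stream read failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Hypothetical stream names and a fake reader that reports 100 rows per stream.
        List<String> streams = List.of("stream-0", "stream-1", "stream-2");
        System.out.println(readAll(streams, name -> 100L));
    }
}
```

With a real session the stream names would come from the session response, and the callback would issue one read request per stream.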

If a session request is successful, the response contains the schema of the data and the streams to use to download the data.

For the following example we assume that the table we read from has two columns: col1 is a string and col2 is a number. An Arrow example of this schema can be found here.

In order to test the Storage API you need an account on GCP with the BigQuery Storage API enabled and a dataset created.

Let’s continue to the Arrow example.

It happens to all of us. We develop stateless applications that can scale horizontally without any effort.
However sometimes cases arise where you need to achieve some type of coordination.

You can go really advanced on this one. For example you can use a framework like Akka and its cluster capabilities. Or you can go really simple, like rolling a mechanism of your own, as long as it gives you the results needed. On another note, you can just have different node groups based on the work you need them to do. The options and the solutions can change based on the problem.

If your problem can go with a simple option, one way to do so, provided you use Google Cloud Storage, is to use its lock capabilities.
Imagine for example a scenario of 4 nodes: they do scale dynamically, but each time a new node registers you want to change its actions by having it acquire a unique configuration which does not collide with a configuration another node might have received.

The strategy can be to use a file on Google Cloud Storage for locking and a file that acts as a centralised configuration registry.

The lock file is nothing more than a file on Cloud Storage which shall be created and deleted. What gives us locking abilities is the option on GCS to create a file only if it does not exist.
Thus a process on one node will try to create the `lock` file; this action is equivalent to obtaining the lock.
Once the process is done it will delete the file; this action is equivalent to releasing the lock.
Other processes in the meantime will try to create the file (acquire the lock) and fail (file already exists) because another process has already created it.
Meanwhile the process that successfully created the file (acquired the lock) will change the centralised configuration registry and, once done, delete the file (release the lock).
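Before touching GCS, the create-if-absent idea can be tried out in memory. This is a minimal sketch, not the GCS client: `LockFileSketch` is a hypothetical class, the map plays the role of the bucket, and `putIfAbsent` plays the role of "create the file only if it does not exist". The owner argument is an extra assumption added here so that only the creator can delete the entry.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for a bucket holding a single lock file.
class LockFileSketch {

    private final ConcurrentMap<String, String> bucket = new ConcurrentHashMap<>();
    private final String lockName;

    LockFileSketch(String lockName) {
        this.lockName = lockName;
    }

    // Acquire: succeeds only for the caller that actually creates the entry.
    boolean acquire(String owner) {
        return bucket.putIfAbsent(lockName, owner) == null;
    }

    // Release: removes the entry, but only if this owner created it.
    boolean release(String owner) {
        return bucket.remove(lockName, owner);
    }

    public static void main(String[] args) {
        LockFileSketch lock = new LockFileSketch("lock");
        System.out.println(lock.acquire("node-1")); // true: the file was created
        System.out.println(lock.acquire("node-2")); // false: the file already exists
        System.out.println(lock.release("node-1")); // true: the file was deleted
        System.out.println(lock.acquire("node-2")); // true: the lock is free again
    }
}
```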

So let’s start with the lock object.

package com.gkatzioura.gcs.lock;

import java.util.Optional;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;

public class GCSLock {

	public static final String LOCK_STRING = "_lock";
	private final Storage storage;
	private final String bucket;
	private final String keyName;

	private Optional<Blob> acquired = Optional.empty();

	GCSLock(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	public boolean acquire() {
		try {
			var blobInfo = BlobInfo.newBuilder(bucket, keyName).build();
			var blob = storage.create(blobInfo, LOCK_STRING.getBytes(), Storage.BlobTargetOption.doesNotExist());
			acquired = Optional.of(blob);
			return true;
		} catch (StorageException storageException) {
			return false;
		}
	}

	public void release() {
		if(!acquired.isPresent()) {
			throw new IllegalStateException("Lock was never acquired");
		}
		storage.delete(acquired.get().getBlobId());
	}

}

As you can see, the create call writes the object only if it does not already exist. Behind the scenes this operation uses the x-goog-if-generation-match header, which is used for concurrency control.
Thus only one node will be able to acquire the lock and change the configuration file.
Afterwards it can delete the lock. If a StorageException is raised, the create operation has failed, most probably because the lock is already held by another process.

To make the example more complete let’s make the configuration file. The configuration file will be a simple JSON file holding key-value pairs.

package com.gkatzioura.gcs.lock;

import java.util.HashMap;
import java.util.Map;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.json.JSONObject;

public class GCSConfiguration {

	private final Storage storage;
	private final String bucket;
	private final String keyName;

	GCSConfiguration(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	public void addProperty(String key, String value) {
		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		final JSONObject configJson;

		if(blob==null) {
			configJson = new JSONObject();
		} else {
			configJson = new JSONObject(new String(blob.getContent()));
		}

		configJson.put(key, value);

		var blobInfo = BlobInfo.newBuilder(blobId).build();
		storage.create(blobInfo, configJson.toString().getBytes());
	}

	public Map<String,String> properties() {

		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		var map = new HashMap<String,String>();

		if(blob!=null) {
			var jsonObject = new JSONObject(new String(blob.getContent()));
			for(var key: jsonObject.keySet()) {
				map.put(key, jsonObject.getString(key));
			}
		}

		return map;
	}

}

It is a simple config util backed by GCS. Eventually it can be changed so that the locking happens inside the addProperty operation; it’s up to the user and the code. For the purpose of this blog we shall just acquire the lock, change the configuration and release the lock.
Our main class will look like this.

package com.gkatzioura.gcs.lock;

import com.google.cloud.storage.StorageOptions;

public class Application {

	public static void main(String[] args) {
		var storage = StorageOptions.getDefaultInstance().getService();

		final String bucketName = "bucketName";
		final String lockFileName = "lockFileName";
		final String configFileName = "configFileName";

		var lock = new GCSLock(storage, bucketName, lockFileName);
		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

		var lockAcquired = lock.acquire();
		if(lockAcquired) {
			gcsConfig.addProperty("testProperty", "testValue");
			lock.release();
		}

		var config = gcsConfig.properties();

		for(var key: config.keySet()) {
			System.out.println("Key "+key+" value "+config.get(key));
		}

	}

}
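The main class above releases the lock only when addProperty returns normally; if the update throws, the lock file stays behind. One possible way to guard against that is a try/finally wrapper. The sketch below uses only JDK types, and `LockedAction` and `runLocked` are hypothetical names that are not part of the original code.

```java
import java.util.function.BooleanSupplier;

class LockedAction {

    // Runs action only when tryAcquire succeeds. release always runs afterwards,
    // even if action throws. Returns false when the lock was not obtained.
    static boolean runLocked(BooleanSupplier tryAcquire, Runnable release, Runnable action) {
        if (!tryAcquire.getAsBoolean()) {
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            release.run();
        }
    }

    public static void main(String[] args) {
        boolean[] released = {false};
        try {
            // A fake lock that is always free and an update that fails.
            runLocked(() -> true, () -> released[0] = true, () -> {
                throw new IllegalStateException("update failed");
            });
        } catch (IllegalStateException e) {
            System.out.println("update failed, lock released: " + released[0]);
        }
    }
}
```

With the classes from this post the call could look like `runLocked(lock::acquire, lock::release, () -> gcsConfig.addProperty("testProperty", "testValue"))`.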

Now let’s go for some multithreading. Ten threads will try to put values, and it is expected that some of their attempts to acquire the lock will fail.

package com.gkatzioura.gcs.lock;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ApplicationConcurrent {

	private static final String bucketName = "bucketName";
	private static final String lockFileName = "lockFileName";
	private static final String configFileName = "configFileName";

	public static void main(String[] args) throws ExecutionException, InterruptedException {
		var storage = StorageOptions.getDefaultInstance().getService();

		final int threads = 10;
		var service = Executors.newFixedThreadPool(threads);
		var futures = new ArrayList<Future<?>>(threads);

		for (var i = 0; i < threads; i++) {
			futures.add(service.submit(update(storage, "property-"+i, "value-"+i)));
		}

		for (var f : futures) {
			f.get();
		}

		service.shutdown();

		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);
		var properties = gcsConfig.properties();

		for(var i=0; i < threads; i++) {
			System.out.println(properties.get("property-"+i));
		}
	}

	private static Runnable update(final Storage storage, String property, String value) {
		return () -> {
			var lock = new GCSLock(storage, bucketName, lockFileName);
			var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

			boolean lockAcquired = false;

			while (!lockAcquired) {
				lockAcquired = lock.acquire();
				// Only report a failure when the attempt actually failed.
				if (!lockAcquired) {
					System.out.println("Could not acquire lock");
				}
			}

			gcsConfig.addProperty(property, value);
			lock.release();
		};
	}
}

Obviously 10 threads are enough to display the capabilities. During thread initialization and execution some threads will try to acquire the lock simultaneously and only one will succeed, while other threads will arrive late, fail and wait until the lock is available.
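The retry loop in the example retries immediately, so every failed attempt is followed by another request right away. A gentler variant sleeps between attempts and doubles the delay each time. What follows is a sketch under assumptions: `BackoffAcquire`, `acquireWithBackoff` and the attempt and delay parameters are hypothetical and not part of the original code.

```java
import java.util.function.BooleanSupplier;

class BackoffAcquire {

    // Calls tryAcquire until it returns true or maxAttempts is reached,
    // sleeping between attempts and doubling the delay up to maxDelayMillis.
    static boolean acquireWithBackoff(BooleanSupplier tryAcquire, int maxAttempts,
                                      long initialDelayMillis, long maxDelayMillis) {
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryAcquire.getAsBoolean()) {
                return true;
            }
            if (attempt == maxAttempts) {
                break; // no point in sleeping after the last attempt
            }
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and give up
                return false;
            }
            delay = Math.min(delay * 2, maxDelayMillis);
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // A fake lock that becomes free on the third attempt.
        boolean acquired = acquireWithBackoff(() -> ++calls[0] >= 3, 5, 10, 100);
        System.out.println(acquired + " after " + calls[0] + " attempts");
    }
}
```

Since `GCSLock.acquire()` returns a boolean, `lock::acquire` can be passed as the first argument.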

In the end, all of them are expected to have their values added to the configuration.
That’s it. If your problems have a simple nature this approach might do the trick. Obviously you can use the HTTP API instead of the SDK. You can find the code on GitHub.

Pub/Sub is a nice tool provided by GCP. It is really handy and can help you with the messaging challenges your application might face. Actually, if you work with GCP, it is the managed messaging solution that you can use.

As expected, working with the actual Pub/Sub solution comes with some quota, so for development it is essential to use something which is not going to cost you.

In these cases you can use the Pub/Sub emulator. To get started with the emulator you need to install it:

gcloud components install pubsub-emulator

It is, however, more convenient to have a Docker image since it is far more portable. Unfortunately there is no official image for that from Google Cloud. In any case you can use one of the solutions available on Docker Hub.

Now let’s run it

gcloud beta emulators pubsub start --project=test-project

After that your application can connect to the Pub/Sub emulator. The default port is 8085.
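The test in this post hard-codes the address 127.0.0.1:8085. If the emulator runs on another port, one option is to read the address from the `PUBSUB_EMULATOR_HOST` environment variable (the variable printed by `gcloud beta emulators pubsub env-init`) and fall back to the default. The helper below is a hypothetical sketch; `EmulatorHost` and `resolve` are names made up for illustration.

```java
class EmulatorHost {

    static final String DEFAULT_HOST_PORT = "127.0.0.1:8085";

    // Returns the configured value when it is non-blank, otherwise the default.
    static String resolve(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return DEFAULT_HOST_PORT;
        }
        return configured.trim();
    }

    public static void main(String[] args) {
        // Prints the address a test would hand to ManagedChannelBuilder.forTarget.
        System.out.println(resolve(System.getenv("PUBSUB_EMULATOR_HOST")));
    }
}
```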

I will use a Java unit test as an example for this one.

package org.gkatzioura.pubsub;

import java.io.IOException;
import java.nio.charset.Charset;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class LocalPubSubTest {

 private static final String PROJECT = "test-project";
 private static final String SUBSCRIPTION_NAME = "SUBSCRIBER";
 private static final String TOPIC_NAME = "test-topic-id";

 private static final String hostPort = "127.0.0.1:8085";

 private ManagedChannel channel;
 private TransportChannelProvider channelProvider;
 private TopicAdminClient topicAdmin;

 private Publisher publisher;
 private SubscriberStub subscriberStub;
 private SubscriptionAdminClient subscriptionAdminClient;

 private ProjectTopicName topicName = ProjectTopicName.of(PROJECT, TOPIC_NAME);
 private ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT, SUBSCRIPTION_NAME);

 private Subscription subscription;

 @Before
 public void setUp() throws Exception {
 channel = ManagedChannelBuilder.forTarget(hostPort).usePlaintext().build();
 channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

 CredentialsProvider credentialsProvider = NoCredentialsProvider.create();

 topicAdmin = createTopicAdmin(credentialsProvider);
 topicAdmin.createTopic(topicName);

 publisher = createPublisher(credentialsProvider);
 subscriberStub = createSubscriberStub(credentialsProvider);
 subscriptionAdminClient = createSubscriptionAdmin(credentialsProvider);
 subscription = subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
 }

 @After
 public void tearDown() throws Exception {
 topicAdmin.deleteTopic(topicName);
 subscriptionAdminClient.deleteSubscription(subscription.getName());
 channel.shutdownNow();
 }

 @Test
 public void testLocalPubSub() throws Exception {
 final String messageText = "text";
 PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
 .setData(ByteString.copyFrom(messageText, Charset.defaultCharset()))
 .build();
 publisher.publish(pubsubMessage).get();

 PullRequest pullRequest = PullRequest.newBuilder()
 .setMaxMessages(1)
 .setReturnImmediately(true) // return immediately if messages are not available
 .setSubscription(subscription.getName())
 .build();

 PullResponse pullResponse = subscriberStub.pullCallable().call(pullRequest);
 String receiveMessageText = pullResponse.getReceivedMessages(0).getMessage().getData().toStringUtf8();

 Assert.assertEquals(messageText, receiveMessageText);
 }

 private TopicAdminClient createTopicAdmin(CredentialsProvider credentialsProvider) throws IOException {
 return TopicAdminClient.create(
 TopicAdminSettings.newBuilder()
 .setTransportChannelProvider(channelProvider)
 .setCredentialsProvider(credentialsProvider)
 .build()
 );
 }

 private SubscriptionAdminClient createSubscriptionAdmin(CredentialsProvider credentialsProvider) throws IOException {
 SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
 .setCredentialsProvider(credentialsProvider)
 .setTransportChannelProvider(channelProvider)
 .build();
 return SubscriptionAdminClient.create(subscriptionAdminSettings);
 }

 private Publisher createPublisher(CredentialsProvider credentialsProvider) throws IOException {
 return Publisher.newBuilder(topicName)
 .setChannelProvider(channelProvider)
 .setCredentialsProvider(credentialsProvider)
 .build();
 }

 private SubscriberStub createSubscriberStub(CredentialsProvider credentialsProvider) throws IOException {
 SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
 .setTransportChannelProvider(channelProvider)
 .setCredentialsProvider(credentialsProvider)
 .build();
 return GrpcSubscriberStub.create(subscriberStubSettings);
 }

}

That’s it. Now you can have some cost-efficient unit tests!

One of the major issues when dealing with large codebases in our teams has to do with artifact sharing and artifact storage.

There are various options out there that provide many features, such as JFrog, Nexus, Archiva etc.

I have used them, set them up and configured them, and they certainly provide you with many features. Also, having your own repository installation gives you a lot of flexibility. Furthermore, Docker has made things a lot easier and thus setting them up takes almost no time.

Now if you use a cloud provider such as Amazon, Azure or Google there is a more lightweight option which is pretty easy to set up. These providers give you cheap and easy access to storage, and the storage options they provide can also be used to host your private artifacts or even your public ones.

To do so you need a Maven wagon which is capable of communicating with the storage options of your cloud provider, and this is exactly what the CloudStorageMaven project deals with.

The CloudStorageMaven project provides you with wagons interacting with Amazon S3, Azure Blob Storage and Google Cloud Storage.

If you already use one of these cloud services, hosting your artifacts on them seems like a no-brainer, and these wagons make it a lot easier to do so.

I have compiled some tutorials on how to get started with each one of them

Happy coding!
