URL: https://thenewstack.io/simplified-data-pipelines-with-pulsar-transformation-functions/

Simplified Data Pipelines with Pulsar Transformation Functions


They provide a low-code way to develop basic processing and routing of data using existing Pulsar features.
Apr 18th, 2023 8:53am by Christophe Bornet
DataStax sponsored this post.
Using functions in the cloud is a very efficient way of creating repeatable workflows that can transform data, analyze source code, make platform configurations, and do many other useful jobs. As you develop a function, you will quickly realize you need a solid foundation of utilities and formatting. A typical function in production isn’t just one simple class doing one simple job; that’s usually hello world :). As with microservices, functions have boilerplate code and need standardized processes, and writing that boilerplate can feel like valuable time wasted on a seemingly mindless task. You bring value to a project by creating its core logic, not by writing JSON-parsing methods. The streaming engineering team at DataStax experienced this first hand while building the DataStax Astra Streaming platform, and we decided to do something about it by creating Transformation Functions. These are pre-made Pulsar functions that can serve as the boilerplate processes in a data pipeline. Before Transformation Functions, if you wanted to build data pipelines on top of Apache Pulsar, you had limited options:
  • Build your own service with one of the Pulsar clients that consumes from a topic, processes each message and publishes the result to another topic. This requires writing a lot of boilerplate code.
  • Use a full-fledged stream processing engine such as Apache Flink or Apache Spark. These technologies are very advanced and support SQL, so you don’t need to write a lot of code. But that’s another technology to deploy in your stack, with its own maintenance burden and cost of acquisition. Flink and Spark are useful for complex real-time analytics, but they are overkill for simple cases such as removing or renaming a field in a structured message.
Transformation Functions add a new option for your data pipelines. The goals of the project were simple:
  • Provide a low-code solution to develop basic processing and routing of data.
  • Rely only on existing Pulsar features, so nothing beyond standard Pulsar is required.
  • Be able to run in memory, in front of a sink, so you don’t need an intermediate topic. (This feature comes from PIP 193.)

About Transformation Functions

A Transformation Function is essentially a regular Pulsar Function written in Java that implements a suite of commonly used operations. Like connectors and other Pulsar artifacts, Transformation Functions are packaged as a NAR and can be deployed in a Pulsar cluster using the pulsar-admin CLI or as a built-in function. Steps can be “connected” together to perform multistep processing, and each step can include a “when” condition that skips it for records that don’t match. Because it’s a Pulsar Function, no add-ons or extensions are needed to use it: it can be deployed quickly to a Pulsar standalone instance or to a full production cluster. When you create an instance of the function, you pass a JSON-formatted configuration containing the list of operations to apply in series to the data. As a low-code solution, the only “language” you need to know is the basic DSL (domain-specific language) used by the configuration.
Figure: A transformation function that doubles the input value
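The execution model described above (a list of steps applied in series, each optionally guarded by a “when” condition) can be sketched in a few lines of Python. This is a hypothetical illustration, not the project’s Java implementation: records are modeled as plain dicts, the `HANDLERS` registry and its `double` step (echoing the figure) are invented for the example, and “when” is a Python predicate rather than an expression-language string.

```python
from typing import Any, Callable, Optional

# Assumption: a record is a plain dict standing in for a Pulsar message.
Record = dict[str, Any]
Handler = Callable[[Record, dict], Optional[Record]]


def double(record: Record, step: dict) -> Optional[Record]:
    # Hypothetical step type echoing the figure: doubles the value.
    return {**record, "value": record["value"] * 2}


def drop(record: Record, step: dict) -> Optional[Record]:
    # Returning None means "drop this record from further processing".
    return None


# Hypothetical registry mapping a step "type" to its handler.
HANDLERS: dict[str, Handler] = {"double": double, "drop": drop}


def run_steps(record: Record, steps: list[dict]) -> Optional[Record]:
    """Apply each step in series, skipping steps whose "when" is false."""
    for step in steps:
        when = step.get("when")  # assumption: a Python predicate, not a DSL string
        if when is not None and not when(record):
            continue  # condition is false: skip this step only
        result = HANDLERS[step["type"]](record, step)
        if result is None:
            return None  # record dropped: later steps never run
        record = result
    return record


steps = [
    {"type": "double"},
    {"type": "drop", "when": lambda r: r["value"] < 10},
]
print(run_steps({"value": 21}, steps))  # {'value': 42}
print(run_steps({"value": 3}, steps))   # None
```

A false condition skips only its own step, while a drop ends processing for that record, which is why the second call never reaches any later step.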

Function Operations

Available operations include:
  • Cast: modifies the key or value schema to a target-compatible schema.
  • Drop-fields: drops fields from structured data.
  • Merge-key-value: merges the fields of key-value records where both the key and value are structured data with the same schema type.
  • Unwrap-key-value: if the record is a key-value, extracts the key-value’s key or value and makes it the record value.
  • Flatten: flattens structured data.
  • Drop: drops a record from further processing.
  • Compute: computes new field values on the fly or replaces existing ones.
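To give a feel for what “flatten” does, here is a minimal Python sketch operating on a nested dict. The `_` delimiter and the parent-name prefixing scheme are assumptions made for illustration; check the project’s documentation for the exact naming rules and options.

```python
from typing import Any


def flatten(data: dict[str, Any], delimiter: str = "_", prefix: str = "") -> dict[str, Any]:
    """Recursively lift nested fields to the top level, joining field names."""
    flat: dict[str, Any] = {}
    for name, field in data.items():
        # Assumption: a nested field is named parent + delimiter + child.
        full_name = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(field, dict):
            flat.update(flatten(field, delimiter, full_name))
        else:
            flat[full_name] = field
    return flat


user = {"userId": "u1", "address": {"city": "Paris", "geo": {"lat": 48.8}}}
print(flatten(user))
# {'userId': 'u1', 'address_city': 'Paris', 'address_geo_lat': 48.8}
```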

Example Configuration

Here is an example of connecting multiple functions together in series, to manipulate message data:
{
  "steps": [
    {"type": "drop-fields", "fields": "password", "part": "value"},
    {"type": "merge-key-value"},
    {"type": "unwrap-key-value"},
    {"type": "cast", "schema-type": "STRING"}
  ]
}
Say, for example, the Pulsar function using this transformation has a `KeyValue<AVRO, AVRO>` input type, where the key contains a “userId” field and the value contains firstName, lastName and password fields. The function would automatically perform the following steps on the message data:
  1. Drop the “password” field from the value
  2. Merge the key’s userId field into the value’s fields
  3. Unwrap the value out of the key-value record
  4. Cast the value to a string and return it
The output of the function would be a JSON-formatted string with the fields userId, firstName and lastName.
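The four steps can be traced by hand with a Python sketch that models the `KeyValue<AVRO, AVRO>` record as a dict with `key` and `value` parts. The helper names (`drop_fields`, `merge_key_value`, and so on) are hypothetical stand-ins for the operations, and letting value fields win on a name clash during the merge is an assumption of this sketch.

```python
import json
from typing import Any

# Assumption: a KeyValue record is a dict with "key" and "value" parts,
# each holding structured (Avro-like) data as a plain dict.
Record = dict[str, Any]

record: Record = {
    "key": {"userId": "42"},
    "value": {"firstName": "Ada", "lastName": "Lovelace", "password": "s3cret"},
}


def drop_fields(rec: Record, fields: set[str], part: str = "value") -> Record:
    # Step 1: remove the listed fields from the chosen part of the record.
    kept = {name: v for name, v in rec[part].items() if name not in fields}
    return {**rec, part: kept}


def merge_key_value(rec: Record) -> Record:
    # Step 2: copy the key's fields into the value (value wins on a clash here).
    return {**rec, "value": {**rec["key"], **rec["value"]}}


def unwrap_key_value(rec: Record) -> dict[str, Any]:
    # Step 3: keep only the value part, so the record is no longer a key-value.
    return rec["value"]


def cast_to_string(data: dict[str, Any]) -> str:
    # Step 4: serialize the structured value to a JSON string.
    return json.dumps(data)


out = cast_to_string(unwrap_key_value(merge_key_value(drop_fields(record, {"password"}))))
print(out)  # {"userId": "42", "firstName": "Ada", "lastName": "Lovelace"}
```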

Transformation Function Compute Operation

Among all the operation types, one that is particularly powerful is the “compute” operation. It creates or updates message values, properties or metadata using an expression, which can take input from fixed values, message values, properties or metadata. The expression language supports:
  • Arithmetic operations: +, - (binary), *, / and div, % and mod, - (unary)
  • Logical operations: and, &&, or, ||, not, !
  • Relational operations: ==, eq, !=, ne, <, lt, >, gt, <=, le, >=, ge
  • Utility functions, including uppercase, contains, trim, concat, coalesce, now, dateadd
  • Referencing values from: key (for key-value), value, messageKey, topicName, destinationTopic, eventTime, properties
  • Referencing nested values of structured key and value (such as `value.my_value_field`)
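Because each relational operator has a symbolic and a word form, `value < 1000` and `value lt 1000` mean the same thing. The toy evaluator below shows that aliasing; it is not the real expression parser. It only accepts the hypothetical shape `NAME OP NUMBER` and coerces both sides to `float`, an assumption made so that string values can be compared numerically.

```python
import operator
from typing import Any

# Symbolic and word forms are aliases for the same relational operator.
RELATIONAL = {
    "==": operator.eq, "eq": operator.eq,
    "!=": operator.ne, "ne": operator.ne,
    "<": operator.lt, "lt": operator.lt,
    ">": operator.gt, "gt": operator.gt,
    "<=": operator.le, "le": operator.le,
    ">=": operator.ge, "ge": operator.ge,
}


def eval_comparison(expression: str, context: dict[str, Any]) -> bool:
    """Evaluate a hypothetical 'NAME OP NUMBER' expression, e.g. 'value lt 1000'."""
    name, op, literal = expression.split()
    # Assumption: both sides are coerced to float before comparing.
    return RELATIONAL[op](float(context[name]), float(literal))


ctx = {"value": "999.5"}
print(eval_comparison("value < 1000", ctx), eval_comparison("value lt 1000", ctx))  # True True
```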
For instance, here is the configuration for a transformation function to route messages to topics based on their message key:
{
  "steps": [
    {
      "type": "compute",
      "fields": [
        {
          "name": "destinationTopic",
          "expression": "fn:concat('routed-', messageKey)"
        }
      ]
    }
  ]
}


For example, a message with the key “foo” is published to the topic “routed-foo”, and a message with the key “bar” is published to the topic “routed-bar”.
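In effect, the compute step sets the `destinationTopic` field from the message key. A minimal Python sketch of that behavior, with the record modeled as a dict (an assumption; the real function works on Pulsar records):

```python
from typing import Any


def compute_destination_topic(record: dict[str, Any]) -> dict[str, Any]:
    # Mirrors fn:concat('routed-', messageKey); other fields are left untouched.
    return {**record, "destinationTopic": "routed-" + record["messageKey"]}


for key in ("foo", "bar"):
    routed = compute_destination_topic({"messageKey": key, "value": "payload"})
    print(key, "->", routed["destinationTopic"])
# foo -> routed-foo
# bar -> routed-bar
```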

Taking Transformation Functions Further

Let’s take a concrete example and see how Transformation Functions make things much easier. For this example, we’ll refer to the use cases from a previous blog post, “Developing and Running Serverless Apache Pulsar Functions.” That post walked through three functions:
  • enricher: takes a byte array input, converts it to string and adds an “EUR” suffix.
  • filter: takes a String input, extracts the first word (up to a space), converts it to double and drops values below a configurable threshold.
  • content-based router: takes a Double input and routes values below 1,000 to the topic `cbr-low` and values above 1,000 to the topic `cbr-high` after converting them to String.
Transformation Functions simplify this previous work. Consider the following function configurations that replace each of those examples. The “enricher” function:
{
  "steps": [
    {
      "type": "compute",
      "fields": [{"name": "value", "expression": "fn:concat(value, ' EUR')"}]
    }
  ]
}
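For comparison with the hand-written enricher, the transformation boils down to the following Python sketch. Decoding the original byte-array input as UTF-8 is an assumption; with Transformation Functions, how the value is read depends on the topic schema.

```python
def enrich(value: bytes) -> str:
    # Assumption: the byte array holds UTF-8 text.
    text = value.decode("utf-8")
    # Equivalent of fn:concat(value, ' EUR').
    return text + " EUR"


print(enrich(b"150"))  # 150 EUR
```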
The “filter” function:
{
  "steps": [
    {
      "type": "compute",
      "fields": [{"name": "value", "expression": "fn:replace(value, ' .*', '')"}]
    },
    {
      "type": "drop",
      "when": "value < 123.45"
    }
  ]
}
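The two filter steps can be read as the following Python sketch. The regular-expression semantics of `fn:replace` are inferred from the `' .*'` pattern, and the explicit `float()` conversion before comparing with 123.45 is an assumption about how the string value is compared.

```python
import re
from typing import Optional

THRESHOLD = 123.45  # the value used in the "when" condition above


def filter_step(value: str) -> Optional[str]:
    # Step 1, like fn:replace(value, ' .*', ''): remove everything from the first space.
    first_word = re.sub(r" .*", "", value)
    # Step 2, like a "drop" step with when "value < 123.45" (float() is an assumption).
    if float(first_word) < THRESHOLD:
        return None  # None stands for "record dropped"
    return first_word


print(filter_step("150 EUR"))  # 150
print(filter_step("99 EUR"))   # None
```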
The “cbr” function (assuming we set the output topic for the function to `cbr-high`):
{
  "steps": [
    {
      "type": "compute",
      "fields": [{"name": "destinationTopic", "expression": "'persistent://cbornet-examples/default/cbr-low'"}],
      "when": "value < 1000"
    }
  ]
}
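The routing logic of the “cbr” configuration is equivalent to the Python sketch below: when the condition is true, the compute step overrides the destination topic; when it is false, the step is skipped and the record goes to the function’s output topic, `cbr-high` in this example. Converting the string value with `float()` is an assumption of the sketch.

```python
LOW_TOPIC = "persistent://cbornet-examples/default/cbr-low"
HIGH_TOPIC = "persistent://cbornet-examples/default/cbr-high"  # the function's output topic


def route(value: str) -> str:
    # The compute step only runs when value < 1000 and sets destinationTopic.
    if float(value) < 1000:
        return LOW_TOPIC
    # Step skipped: the record falls through to the configured output topic.
    return HIGH_TOPIC


for v in ("150", "1000", "2500.5"):
    print(v, "->", route(v))
```

Note that a value of exactly 1000 fails `value < 1000` and therefore lands in `cbr-high`.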
Please note that there are small differences between these functions and the ones from the earlier post. For instance, the filter function from that post logs a message for values that are below the threshold. Apart from such details, these functions are much simpler to create.

Deploying the Functions on Astra Streaming

Transformation Functions are built into Astra Streaming, DataStax’s managed Pulsar platform. You deploy them as standard functions, declaring the function type as transforms. Continuing with the example functions, we can deploy these transformations with the following pulsar-admin CLI commands. The “enricher” function:
bin/pulsar-admin functions create \
 --function-type transforms \
 --name enricher \
 --inputs cbornet-examples/default/enricher-in \
 --output cbornet-examples/default/enricher-out \
 --user-config "{\"steps\": [{ \"type\": \"compute\", \"fields\": [{ \"name\": \"value\", \"expression\": \"fn:concat(value, ' EUR')\" }] }] }" \
 --tenant cbornet-examples \
 --namespace default \
 --auto-ack true
The “filter” function:
bin/pulsar-admin functions create \
 --function-type transforms \
 --name filter \
 --inputs cbornet-examples/default/enricher-out \
 --output cbornet-examples/default/filter-out \
 --user-config "{\"steps\": [{ \"type\": \"compute\", \"fields\": [{ \"name\": \"value\", \"expression\": \"fn:replace(value, ' .*', '')\"}] }, { \"type\": \"drop\", \"when\": \"value < 123.45\" } ]}" \
 --tenant cbornet-examples \
 --namespace default \
 --auto-ack true
The “cbr” function:
bin/pulsar-admin functions create \
 --function-type transforms \
 --name cbr \
 --inputs cbornet-examples/default/filter-out \
 --output cbornet-examples/default/cbr-high \
 --user-config "{\"steps\": [{\"type\": \"compute\", \"fields\": [{\"name\": \"destinationTopic\", \"expression\": \"'persistent://cbornet-examples/default/cbr-low'\"}], \"when\": \"value < 1000\"} ]}" \
 --tenant cbornet-examples \
 --namespace default \
 --auto-ack true
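Hand-escaping JSON inside `--user-config` is error-prone. One option, sketched here in Python, is to build the argument list programmatically and let `json.dumps` produce the configuration string. The helper `create_transform_cmd` is hypothetical and only assembles the flags used in the commands above; it does not run anything.

```python
import json
import shlex


def create_transform_cmd(name: str, inputs: str, output: str, steps: list[dict]) -> list[str]:
    """Build a pulsar-admin argv; json.dumps produces the --user-config string."""
    return [
        "bin/pulsar-admin", "functions", "create",
        "--function-type", "transforms",
        "--name", name,
        "--inputs", inputs,
        "--output", output,
        "--user-config", json.dumps({"steps": steps}),
        "--tenant", "cbornet-examples",
        "--namespace", "default",
        "--auto-ack", "true",
    ]


enricher_steps = [
    {"type": "compute", "fields": [{"name": "value", "expression": "fn:concat(value, ' EUR')"}]}
]
cmd = create_transform_cmd(
    "enricher",
    "cbornet-examples/default/enricher-in",
    "cbornet-examples/default/enricher-out",
    enricher_steps,
)
# Print a shell-safe command line; shlex.join quotes each argument as needed.
print(shlex.join(cmd))
```

Passing the list to `subprocess.run(cmd, check=True)` instead avoids shell quoting altogether, since each element reaches the process as a single argument.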

Getting Started with Transformation Functions

This first set of operations is based on the use cases we have seen in the field, and we know there are many more operations that could be added. Please provide your feedback and make suggestions in the project’s issue tracker. If you want to test this feature quickly, you can get a free Pulsar instance in under a minute with Astra Streaming. This instance has Transformation Functions built in, so you can immediately create transformation instances with the pulsar-admin CLI. The feature is already used in production by customers. Astra Streaming and Luna Streaming 2.10 also support bundling the function with a sink so that the transformation is done in memory (see PIP 193). This is a great way to reduce storage costs and improve latency, because it avoids the use of an intermediate topic. The ability to bundle a function with a sink will also be part of the Apache Pulsar project, starting with version 3.0.
Christophe Bornet is a senior software engineer at DataStax and an Apache Pulsar committer. Through his love for open source, he's also a core team member of the JHipster and OpenAPI Generator projects.