![]() |
VOOZH | about |
We’re so glad you’re here. You can expect all the best TNS content to arrive Monday through Friday to keep you on top of the news and at the top of your game.
Check your inbox for a confirmation email where you can adjust your preferences and even join additional groups.
Follow TNS on your favorite social media networks.
Become a TNS follower on LinkedIn.
Check out the latest featured and trending stories while you wait for your first TNS newsletter.
KafkaShareConsumer class includes a configuration property that allows applications fine-grained control over the life cycle and acknowledgement of each message. By setting the property share.acknowledgment.mode to explicit, the client can now use the values of the AcknowledgmentType enumeration to specify how a message moves through the state machine of the share partition.
👁 Kafka KIP-932 share groupKafkaShareConsumer with the ACCEPT acknowledgement type to move this message to the Acknowledged stage of the life cycle. The real control here comes from exception cases. The developer can determine where to RELEASE or REJECT the ConsumerRecord based on the type of exception that occurred during processing. For errors that can be retried, use RELEASE to (potentially) send the message back to the Available state. If the error isn’t recoverable or the message is erroneous, use REJECT to send the message to the Archived state and move on with processing others.
Here’s how this looks in practice:
KafkaShareConsumer.subscribe to one or more topics.ConsumerRecords.doProcessing() method.acknowledge() that record with the AcknowledgementType.ACCEPT.acknowledge() that record with the AcknowledgementType.RELEASE.KafkaShareConsumer.commitSync() method will commit the message-level acknowledgements from this batch. Otherwise, the acknowledgements are automatically committed. public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-share-group");
props.put("share.acknowledgement.mode", "explicit");
KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(
props, new StringDeserializer(), new StringDeserializer());
// subscribe to a topic, joining the share group
consumer.subscribe(List.of("foo"));
while (true) {
// Fetch a batch of records acquired for this consumer...
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
try {
doProcessing(record);
// Acknowledge this event was handled without error.
consumer.acknowledge(record, AcknowledgeType.ACCEPT);
} catch (Exception e) {
logger.error("Consumer: Error handling event: {}", e.getMessage(), e);
// NOTE: Choosing to RELEASE on any exception here.
consumer.acknowledge(record, AcknowledgeType.RELEASE);
}
});
// OPTIONALLY: Commit the Acknowledgements of this batch of records.
//consumer.commitSync();
}
}
client_credentials grant type. An alternative was building and maintaining some custom integrations with Kafka’s pluggable authentication interface, perhaps with an identity provider like Okta, AWS IAM, Google Cloud IAM or Auth0. These resulted in long-lived, static credentials or required organizations to build and maintain a complex, often unsupported bespoke solution to integrate Kafka clients with their modern identity provider.
With KIP-1139, Kafka clients now support the jwt-bearer token grant type, enabling clients to authenticate securely using industry-standard JSON Web Tokens (JWTs) from their existing OAuth 2.o and OpenID Connect (OIDC) identity providers. This eliminates the need for static credentials. Enterprises with strong security requirements can use this native support of jwt-bearer for all Kafka client applications: producers, consumers and Kafka connectors.
Consult your cloud provider’s documentation on how to obtain the connection information needed by your clients and brokers. To learn how to configure your identity provider to leverage the functionality of jwt-bearer token grants, see this Confluent documentation.
group.protocol=streams, that helps differentiate between different types of consumers in your event streaming ecosystem. There are new operations in the Admin API to describe these streams groups, as well as new command line interface (CLI) scripts.
As for monitoring, new broker-side metrics also separate these protocols — be it a traditional consumer group, a share group or a streams application. Increased task-level visibility in your Kafka Streams application will help with debugging and troubleshooting scenarios.
Since this is for an early access feature, it’s strictly for testing purposes and not yet ready for production uses. Kafka Streams developers are encouraged to experiment and provide feedback that will help shape the future of this functionality.