URL: https://dev.to/bingulhan/i-rebuilt-the-core-of-my-data-engine-heres-what-changed-in-v130-35od


I Rebuilt the Core of My Data Engine — Here's What Changed in v1.3.0

What is Nexus Core?

Nexus Core is a standalone Java application — a central data orchestration engine that sits between your distributed server infrastructure and your persistence layer. All data operations (reads, writes, deletes, increments) are routed through it via a Redis pub/sub message bus. Your servers stay thin. Your data logic lives in one place.

This release focused on three things: live health signaling, intelligent cache management, and thread isolation.


🫀 Live Protocol — Heartbeat Every Second

The most important addition in v1.3.0 is LIVE — a new RequestType in DataAddon that represents Nexus announcing its own presence on the network.

Every second, RedisDataContainer schedules a broadcast to the darkland_nexus_live channel:

// RedisDataContainer.java

// Scheduled once; the heartbeat then runs every second.
rm.scheduleTask(this::sendNetworkLiveBroadcast, 1, 1, TimeUnit.SECONDS);

private void sendNetworkLiveBroadcast() {
    NexusApplication.getApplication().getRedisManager().processTask(() -> {
        NexusJsonDataContainer jsonDataContainer = new NexusJsonDataContainer();
        jsonDataContainer.set("type", "LIVE");
        jsonDataContainer.set("source", "nexus");
        // Unix time in seconds, not milliseconds.
        jsonDataContainer.set("time", System.currentTimeMillis() / 1000L);

        NexusApplication.getApplication().getRedisManager()
                .publish("darkland_nexus_live", jsonDataContainer.toFullJson());
    });
}

Any subscriber listening to darkland_nexus_live can now verify that Nexus is alive before sending requests. If the heartbeat stops, clients know the engine is down and can react accordingly — no more firing requests into silence.
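
On the client side, "react accordingly" usually comes down to remembering when the last heartbeat arrived and comparing it to a timeout. The sketch below is one possible shape for that check and is not part of Nexus Core: the class name HeartbeatMonitor, its methods, and the 3-second timeout are all assumptions made for illustration. It expects the subscriber callback to pass in the "time" field of each LIVE message.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: tracks the latest LIVE heartbeat and reports liveness. */
class HeartbeatMonitor {
    private final long timeoutSeconds;
    // Epoch seconds of the newest heartbeat seen; -1 means none yet.
    private final AtomicLong lastSeenSeconds = new AtomicLong(-1);

    HeartbeatMonitor(long timeoutSeconds) {
        this.timeoutSeconds = timeoutSeconds;
    }

    /** Call from the subscriber callback with the "time" field of a LIVE message. */
    void onHeartbeat(long epochSeconds) {
        // Keep the newest timestamp even if messages arrive out of order.
        lastSeenSeconds.accumulateAndGet(epochSeconds, Math::max);
    }

    /** True if a heartbeat was seen no more than timeoutSeconds before nowSeconds. */
    boolean isAlive(long nowSeconds) {
        long last = lastSeenSeconds.get();
        return last >= 0 && nowSeconds - last <= timeoutSeconds;
    }
}

class HeartbeatDemo {
    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(3); // assumed timeout
        long now = System.currentTimeMillis() / 1000L;
        System.out.println(monitor.isAlive(now));      // no heartbeat seen yet
        monitor.onHeartbeat(now);
        System.out.println(monitor.isAlive(now + 2));  // inside the timeout window
        System.out.println(monitor.isAlive(now + 10)); // heartbeat has gone stale
    }
}
```

Because the timestamp comes from the Nexus host, this comparison is sensitive to clock skew between machines. Recording the local receive time in onHeartbeat instead would avoid that, at the cost of ignoring the payload.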

The LIVE type is also formally part of the RequestType enum in DataAddon:

public enum RequestType {
    SET_DATA, GET_DATA, UPDATE_DATA, REMOVE_DATA,
    BROADCAST, LOAD_CACHE, INCREMENT_DATA, LIVE
}

🗄️ Cache — Dynamic TTL per DataAddon

Previously there was no way to define per-module cache lifetimes. v1.3.0 adds an abstract method to DataAddon:

public abstract int getCacheTTL();

Every addon now declares its own TTL. When a key is written to Redis, that TTL is applied directly:

// RedisManager.java
public void setData(String key, String json, DataAddon addon) {
    processTask(() -> {
        try (Jedis jedis = pool.getResource()) {
            // SET key value EX <seconds>: value and TTL are written in one command.
            SetParams params = new SetParams().ex(addon.getCacheTTL());
            jedis.set(key, json, params);
        }
    });
}

And on every cache hit, the TTL is renewed (touch-to-renew / sliding expiration):

// DataAddon.java, inside getData()
if (app.getRedisManager().exists(keyTag)) {
    String redisJson = app.getRedisManager().getData(keyTag).get();
    DataModel m = new DataModel(keyTag, UUID.randomUUID().toString(),
            modelInitComp(redisJson), this, specificValue);
    app.getDataContainer().addModelFix(keyTag, m);
    app.getRedisManager().renewTTL(keyTag, getCacheTTL()); // sliding expiration
    return Optional.of(m);
}

// RedisManager.java
public void renewTTL(String key, int seconds) {
    processTask(() -> {
        try (Jedis jedis = pool.getResource()) {
            // EXPIRE resets the remaining lifetime to the full TTL.
            jedis.expire(key, seconds);
        }
    });
}

Active data stays warm. Unused data evicts itself. No manual intervention needed.
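
To make the sliding-expiration behaviour concrete, here is a small in-memory model of it. This is only a sketch of the semantics, not how Nexus stores data: SlidingTtlCache is a hypothetical class, it is not thread-safe, and time is passed in explicitly so the behaviour is easy to follow. In Nexus itself the TTL lives in Redis and is renewed with EXPIRE.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory model of sliding expiration (single-threaded sketch). */
class SlidingTtlCache {
    private static final class Entry {
        final String value;
        long expiresAt; // absolute time in seconds

        Entry(String value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long ttlSeconds;

    SlidingTtlCache(long ttlSeconds) {
        this.ttlSeconds = ttlSeconds;
    }

    /** Stores the value with a fresh TTL, like SET with EX. */
    void put(String key, String value, long nowSeconds) {
        entries.put(key, new Entry(value, nowSeconds + ttlSeconds));
    }

    /** Returns the value if it has not expired and pushes its expiry forward. */
    Optional<String> get(String key, long nowSeconds) {
        Entry e = entries.get(key);
        if (e == null) {
            return Optional.empty();
        }
        if (nowSeconds >= e.expiresAt) {
            entries.remove(key); // expired: behave as if the key never existed
            return Optional.empty();
        }
        e.expiresAt = nowSeconds + ttlSeconds; // renew on hit, like EXPIRE
        return Optional.of(e.value);
    }
}
```

With a TTL of 10 seconds, a key written at t=0 and read at t=8 survives until t=18, so a read at t=15 still hits. A key that is never read disappears 10 seconds after it was written.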


🔄 L1 / L2 Auto-Invalidation via Keyspace Notifications

When Nexus connects to Redis, it automatically enables keyspace notifications:

// RedisManager.java
public void enableKeyspaceNotifications() {
    try (Jedis jedis = pool.getResource()) {
        // "E" = key-event notifications, "x" = expired events only.
        jedis.configSet("notify-keyspace-events", "Ex");
        System.out.println("Nexus: Keyspace Notifications (Expired) enabled via Jedis.");
    }
}

A dedicated thread (Nexus-L1-Sync-Thread) subscribes to the expiry channel and immediately purges the in-process L1 cache when a key expires in Redis:

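// RedisDataContainer.java
public void startL1SyncListener() {
    new Thread(() -> {
        // Key-event channel for database 0; the message body is the expired key.
        String expiredChannel = "__keyevent@0__:expired";
        while (!Thread.currentThread().isInterrupted()) {
            try (Jedis jedis = pool.getResource()) {
                // subscribe() blocks for as long as the subscription is active.
                jedis.subscribe(new JedisPubSub() {
                    @Override
                    public void onMessage(String channel, String message) {
                        removeModel(message);
                        LOGGER.warning("[Nexus] Key expired: " + message
                                + ", Removing from L1 cache to maintain data integrity.");
                    }
                }, expiredChannel);
            } catch (Exception e) {
                // Log instead of failing silently, then retry after 5 seconds.
                LOGGER.warning("[Nexus] L1 sync listener failed, retrying in 5s: " + e);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    break;
                }
            }
        }
    }, "Nexus-L1-Sync-Thread").start();
}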

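The result: L1 (RAM) follows L2 (Redis) expiry closely, and most stale reads are gone. Two caveats: Redis keyspace notifications are fire-and-forget, so events published while the listener is disconnected (for example during the 5-second retry pause) are not replayed, and Redis emits the expired event when it actually deletes the key, which can be slightly later than the moment the TTL reaches zero.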
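
The purge step itself is small. The sketch below shows the idea with a plain ConcurrentHashMap standing in for the L1 cache. L1ExpiryMirror and its methods are hypothetical names, not Nexus classes, and the onMessage method only mirrors the shape of a pub/sub callback: for key-event notifications the channel names the event and the message carries the key.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical L1 cache that drops an entry when Redis reports the key as expired. */
class L1ExpiryMirror {
    private final ConcurrentHashMap<String, String> l1 = new ConcurrentHashMap<>();

    void put(String key, String json) {
        l1.put(key, json);
    }

    Optional<String> get(String key) {
        return Optional.ofNullable(l1.get(key));
    }

    /**
     * Intended to be called from a pub/sub callback.
     * Returns true only if an L1 entry was actually removed.
     */
    boolean onMessage(String channel, String message) {
        if (!channel.endsWith(":expired")) {
            return false; // not an expiry event, leave L1 untouched
        }
        return l1.remove(message) != null;
    }
}
```

Returning a boolean makes it easy to log only real evictions, since many expired keys may never have been loaded into L1 in the first place.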


⚡ Dual-Queue Worker Architecture

Before v1.3.0, inbound messages and outbound tasks competed on the same thread pool. v1.3.0 separates them completely with two independent LinkedBlockingQueue instances:

// RedisManager.java
private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>(50000);
private final BlockingQueue<Runnable> internalTaskQueue = new LinkedBlockingQueue<>(50000);

Inbound workers (one per CPU core) pull from messageQueue and handle incoming Redis messages:

private void startInboundWorkers() {
    int cores = Runtime.getRuntime().availableProcessors();
    for (int i = 0; i < cores; i++) {
        new Thread(() -> {
            // One receiver per worker thread.
            NexusReceiver receiver = new NexusReceiver(this);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String msg = messageQueue.take(); // blocks until a message arrives
                    receiver.handleSyncMessage(msg);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (RuntimeException e) {
                    // Keep the worker alive if a single message fails.
                    System.err.println("Nexus inbound worker error: " + e);
                }
            }
        }, "Nexus-Inbound-Worker-" + i).start();
    }
}

Outbound workers pull from internalTaskQueue and handle Redis/Mongo write operations:

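private void startOutboundWorkers() {
    // At least two workers, even on a single-core host.
    int workers = Math.max(2, Runtime.getRuntime().availableProcessors());
    for (int i = 0; i < workers; i++) {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Runnable task = internalTaskQueue.take(); // blocks until a task arrives
                    task.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (RuntimeException e) {
                    // Keep the worker alive if a single task fails.
                    System.err.println("Nexus outbound worker error: " + e);
                }
            }
        }, "Nexus-Outbound-Worker-" + i).start();
    }
}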

An inbound spike can no longer delay outbound responses. If the outbound queue is full, the task falls back to the ScheduledExecutorService:

public void processTask(Runnable task) {
    // offer() returns false immediately when the bounded queue is full.
    if (!internalTaskQueue.offer(task)) {
        scheduler.execute(task); // fallback
    }
}
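
The offer-then-fallback pattern can be tried in isolation with a tiny queue. The sketch below is not the Nexus implementation: BoundedDispatcher, its counters, and the capacity are assumptions for illustration, and the fallback is any java.util.concurrent.Executor rather than the actual scheduler.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical dispatcher: bounded queue first, fallback executor when it is full. */
class BoundedDispatcher {
    private final BlockingQueue<Runnable> queue;
    private final Executor fallback;
    private final AtomicInteger fallbackCount = new AtomicInteger();

    BoundedDispatcher(int capacity, Executor fallback) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.fallback = fallback;
    }

    /** Returns true if the task was queued, false if it went to the fallback. */
    boolean submit(Runnable task) {
        if (queue.offer(task)) { // non-blocking; false when the queue is full
            return true;
        }
        fallbackCount.incrementAndGet();
        fallback.execute(task);
        return false;
    }

    int queued() {
        return queue.size();
    }

    int fallbacks() {
        return fallbackCount.get();
    }
}
```

Counting fallbacks is worth the extra field because it is the only signal that the queue is saturated. If the fallback is a standard ScheduledThreadPoolExecutor, its own work queue is unbounded, so sustained overload moves there rather than being rejected.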

🔍 Reflection Caching

DataAddon uses annotations (@DbDataModels) to define its schema. Previously these fields were scanned via reflection on every request. v1.3.0 caches the result with double-checked locking:

// DataAddon.java
private volatile Field[] cachedAnnotatedFields = null;
private final Object fieldCacheLock = new Object();

private Field[] getAnnotatedFields() {
    // Fast path: no locking once the cache is populated.
    if (cachedAnnotatedFields != null) return cachedAnnotatedFields;
    synchronized (fieldCacheLock) {
        // Second check: another thread may have filled the cache while we waited.
        if (cachedAnnotatedFields != null) return cachedAnnotatedFields;
        cachedAnnotatedFields = java.util.Arrays.stream(getClass().getDeclaredFields())
                .filter(f -> f.isAnnotationPresent(DbDataModels.class))
                .peek(f -> f.setAccessible(true))
                .toArray(Field[]::new);
    }
    return cachedAnnotatedFields;
}

Same pattern for the ID field — scanned once, cached for all subsequent requests:

private volatile String cachedIdFieldName = null;
private volatile Class<?> cachedIdClass = null;
private final Object idCacheLock = new Object();

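Reflection runs once per addon instance, because the cache lives in instance fields; with a single instance per addon class, that means once per class. Every request after that hits the cache directly.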
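
A self-contained way to see the effect is to count how often the reflective scan actually runs. The sketch below uses a hypothetical @Persisted annotation in place of @DbDataModels and a scan counter that exists only for the demonstration. It also reads the volatile field into a local variable, a common refinement of double-checked locking that avoids repeated volatile reads.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for @DbDataModels. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Persisted {}

class FieldCacheDemo {
    @Persisted private String name = "steve";
    @Persisted private int coins = 5;
    private String scratch = "not persisted";

    // Demo-only counter: how many times the reflective scan ran on this instance.
    final AtomicInteger scans = new AtomicInteger();

    private volatile Field[] cached;
    private final Object lock = new Object();

    Field[] annotatedFields() {
        Field[] local = cached; // single volatile read on the fast path
        if (local != null) {
            return local;
        }
        synchronized (lock) {
            local = cached; // re-check under the lock
            if (local == null) {
                scans.incrementAndGet();
                local = Arrays.stream(getClass().getDeclaredFields())
                        .filter(f -> f.isAnnotationPresent(Persisted.class))
                        .toArray(Field[]::new);
                for (Field f : local) {
                    f.setAccessible(true);
                }
                cached = local; // publish only after the array is fully prepared
            }
        }
        return local;
    }
}
```

Calling annotatedFields() repeatedly on one instance scans once and returns the same array each time. A second instance performs its own scan, which is the per-instance behaviour of an instance-level cache.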


Architecture Summary

Distributed Servers
  Server #1 ──┐
  Server #2 ──┼──► darkland_nexus (Redis) ──► Nexus-Inbound-Worker-N
  Server #3 ──┘                                         │
                                        NexusReceiver.handleSyncMessage()
                                                        │
                                             Nexus-Outbound-Worker-N
                                                        │
                                         ┌──────────────┴──────────────┐
                                         │ L1 Cache (ConcurrentHashMap)│
                                         │ L2 Cache (Redis + TTL)      │
                                         │ MongoDB (persistence)       │
                                         └─────────────────────────────┘
                                                        │
                                               darkland_nexus_live ◄── heartbeat every 1s

Source → github.com/mustafabinguldev/nexus-core