From 1501a6e6c43dbbff2ff2086ea523746a828ed1c7 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Sat, 10 Jan 2026 09:55:06 -0500 Subject: [PATCH] simplifications and refinements to temporaryresource cache Signed-off-by: Steve Hawkins --- .../controller/ControllerEventSource.java | 13 ++++-- .../source/informer/EventFilterDetails.java | 34 +++++---------- .../source/informer/InformerEventSource.java | 9 ++-- .../informer/TemporaryResourceCache.java | 42 ++++++++----------- .../informer/InformerEventSourceTest.java | 6 ++- .../TemporaryPrimaryResourceCacheTest.java | 35 ++++++++++++---- 6 files changed, 77 insertions(+), 62 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index cf694af360..db80c0f4a9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -31,6 +31,7 @@ import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; @@ -138,15 +139,19 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso @Override public void onAdd(T resource) { - var obsoleteResourceVersion = temporaryResourceCache.onAddOrUpdateEvent(resource); - handleEvent(ResourceAction.ADDED, resource, null, null, obsoleteResourceVersion); + var handling = temporaryResourceCache.onAddOrUpdateEvent(resource); + handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW); } @Override public void onUpdate(T oldCustomResource, T newCustomResource) { - var knownResourceVersion = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource); + var handling = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource); handleEvent( - ResourceAction.UPDATED, newCustomResource, oldCustomResource, null, knownResourceVersion); + ResourceAction.UPDATED, + newCustomResource, + oldCustomResource, + null, + handling != EventHandling.NEW); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java index 1920c9cf3b..c3a0237726 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java @@ -17,48 +17,36 @@ import java.util.Optional; +import io.javaoperatorsdk.operator.api.reconciler.ReconcileUtils; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; class EventFilterDetails { private int activeUpdates = 0; private ResourceEvent lastEvent; - private int lastUpdatedResourceVersion = -1; - - public int getActiveUpdates() { - return activeUpdates; - } public void increaseActiveUpdates() { activeUpdates = activeUpdates + 1; } - public void decreaseActiveUpdates() { + public boolean decreaseActiveUpdates() { activeUpdates = activeUpdates - 1; + return activeUpdates == 0; } public void setLastEvent(ResourceEvent event) { lastEvent = event; } - public void setLastUpdatedResourceVersion(String version) { - var parsed = Integer.parseInt(version); - if (parsed > lastUpdatedResourceVersion) { - lastUpdatedResourceVersion = parsed; - } - } - - public Optional getLatestEventAfterLastUpdateEvent() { - if (lastEvent == null) return Optional.empty(); - if (Integer.parseInt(lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion()) - > lastUpdatedResourceVersion) { + public Optional getLatestEventAfterLastUpdateEvent(String updatedResourceVersion) { + if (lastEvent != null + && (updatedResourceVersion == null + || ReconcileUtils.validateAndCompareResourceVersions( + lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion(), + updatedResourceVersion) + > 0)) { return Optional.of(lastEvent); - } else { - return Optional.empty(); } - } - - public boolean isFilteringDone() { - return activeUpdates == 0; + return Optional.empty(); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 49b5f32d3e..247a471df2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -33,6 +33,7 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION; @@ -159,10 +160,12 @@ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldO primaryToSecondaryIndex.onAddOrUpdate(newObject); var resourceID = ResourceID.fromResource(newObject); - if (temporaryResourceCache.onAddOrUpdateEvent(newObject)) { + var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(newObject); + + if (eventHandling != EventHandling.NEW) { log.debug( - "Skipping event propagation for {}, since was a result of a reconcile action. Resource" - + " ID: {}", + "{} event propagation for {}. Resource ID: {}", + eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping", operation, ResourceID.fromResource(newObject)); } else if (eventAcceptedByFilter(operation, newObject, oldObject)) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 85455b4ce7..5d6e6d71b6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -61,6 +61,12 @@ public class TemporaryResourceCache { private final Map activeUpdates = new HashMap<>(); + public enum EventHandling { + DEFER, + OBSOLETE, + NEW + } + public TemporaryResourceCache(boolean comparableResourceVersions) { this.comparableResourceVersions = comparableResourceVersions; } @@ -79,16 +85,9 @@ public synchronized Optional doneEventFilterModify( return Optional.empty(); } var ed = activeUpdates.get(resourceID); - ed.decreaseActiveUpdates(); - if (updatedResourceVersion != null) { - ed.setLastUpdatedResourceVersion(updatedResourceVersion); - } - if (ed.getActiveUpdates() == 0) { - var latestEventAfterUpdate = ed.getLatestEventAfterLastUpdateEvent(); - if (latestEventAfterUpdate.isPresent()) { - activeUpdates.remove(resourceID); - } - return latestEventAfterUpdate; + if (ed.decreaseActiveUpdates()) { + activeUpdates.remove(resourceID); + return ed.getLatestEventAfterLastUpdateEvent(updatedResourceVersion); } else { return Optional.empty(); } @@ -101,13 +100,13 @@ public void onDeleteEvent(T resource, boolean unknownState) { /** * @return true if the resourceVersion was obsolete */ - public boolean onAddOrUpdateEvent(T resource) { + public EventHandling onAddOrUpdateEvent(T resource) { return onEvent(resource, false, false); } - private synchronized boolean onEvent(T resource, boolean unknownState, boolean delete) { + private synchronized EventHandling onEvent(T resource, boolean unknownState, boolean delete) { if (!comparableResourceVersions) { - return false; + return EventHandling.NEW; } var resourceId = ResourceID.fromResource(resource); @@ -121,7 +120,7 @@ private synchronized boolean onEvent(T resource, boolean unknownState, boolean d latestResourceVersion = resource.getMetadata().getResourceVersion(); } var cached = cache.get(resourceId); - boolean obsoleteEvent = false; + EventHandling result = EventHandling.NEW; int comp = 0; if (cached != null) { comp = ReconcileUtils.validateAndCompareResourceVersions(resource, cached); @@ -130,26 +129,21 @@ private synchronized boolean onEvent(T resource, boolean unknownState, boolean d // we propagate event only for our update or newer other can be discarded since we know we // will receive // additional event - obsoleteEvent = false; + result = comp == 0 ? EventHandling.OBSOLETE : EventHandling.NEW; } else { - obsoleteEvent = true; + result = EventHandling.OBSOLETE; } } var ed = activeUpdates.get(resourceId); - if (ed != null) { + if (ed != null && result != EventHandling.OBSOLETE) { ed.setLastEvent( delete ? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState) : new ResourceEvent( ResourceAction.UPDATED, resourceId, resource)); // todo true action - if (ed.isFilteringDone() && ed.getLatestEventAfterLastUpdateEvent().isPresent()) { - activeUpdates.remove(resourceId); - return false; - } else { - return true; - } + return EventHandling.DEFER; } else { - return obsoleteEvent; + return result; } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index f54e47304b..0fc721cccb 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -36,6 +36,7 @@ import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; +import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; @@ -98,7 +99,7 @@ void skipsEventPropagation() { when(temporaryResourceCacheMock.getResourceFromCache(any())) .thenReturn(Optional.of(testDeployment())); - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(true); + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.OBSOLETE); informerEventSource.onAdd(testDeployment()); informerEventSource.onUpdate(testDeployment(), testDeployment()); @@ -108,6 +109,7 @@ void skipsEventPropagation() { @Test void processEventPropagationWithoutAnnotation() { + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW); informerEventSource.onUpdate(testDeployment(), testDeployment()); verify(eventHandlerMock, times(1)).handleEvent(any()); @@ -115,6 +117,7 @@ void processEventPropagationWithoutAnnotation() { @Test void processEventPropagationWithIncorrectAnnotation() { + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW); informerEventSource.onAdd( new DeploymentBuilder(testDeployment()) .editMetadata() @@ -131,6 +134,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() { cachedDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); when(temporaryResourceCacheMock.getResourceFromCache(any())) .thenReturn(Optional.of(cachedDeployment)); + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW); informerEventSource.onUpdate(cachedDeployment, testDeployment()); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index 6f423b1cf5..f8e3d74680 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -28,6 +28,7 @@ import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import static org.assertj.core.api.Assertions.assertThat; @@ -133,7 +134,7 @@ void lockedEventBeforePut() throws Exception { temporaryResourceCache.putResource(testResource); assertThat(result.isDone()).isFalse(); temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "3"); - assertThat(result.get(10, TimeUnit.SECONDS)).isTrue(); + assertThat(result.get(10, TimeUnit.SECONDS)).isEqualTo(EventHandling.NEW); } finally { ex.shutdownNow(); } @@ -145,15 +146,15 @@ void putBeforeEvent() { // first ensure an event is not known var result = temporaryResourceCache.onAddOrUpdateEvent(testResource); - assertThat(result).isFalse(); + assertThat(result).isEqualTo(EventHandling.NEW); var nextResource = testResource(); nextResource.getMetadata().setResourceVersion("3"); temporaryResourceCache.putResource(nextResource); - // the result is false since the put was not part of event filtering update + // the result is obsolete result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); - assertThat(result).isFalse(); + assertThat(result).isEqualTo(EventHandling.OBSOLETE); } @Test @@ -162,7 +163,7 @@ void putBeforeEventWithEventFiltering() { // first ensure an event is not known var result = temporaryResourceCache.onAddOrUpdateEvent(testResource); - assertThat(result).isFalse(); + assertThat(result).isEqualTo(EventHandling.NEW); var nextResource = testResource(); nextResource.getMetadata().setResourceVersion("3"); @@ -172,9 +173,29 @@ void putBeforeEventWithEventFiltering() { temporaryResourceCache.putResource(nextResource); temporaryResourceCache.doneEventFilterModify(resourceId, "3"); - // the result is false since the put was not part of event filtering update + // the result is obsolete result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); - assertThat(result).isTrue(); + assertThat(result).isEqualTo(EventHandling.OBSOLETE); + } + + @Test + void putAfterEventWithEventFiltering() { + var testResource = testResource(); + + // first ensure an event is not known + var result = temporaryResourceCache.onAddOrUpdateEvent(testResource); + assertThat(result).isEqualTo(EventHandling.NEW); + + var nextResource = testResource(); + nextResource.getMetadata().setResourceVersion("3"); + var resourceId = ResourceID.fromResource(testResource); + + temporaryResourceCache.startEventFilteringModify(resourceId); + result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); + // the result is deferred + assertThat(result).isEqualTo(EventHandling.DEFER); + temporaryResourceCache.putResource(nextResource); + temporaryResourceCache.doneEventFilterModify(resourceId, "3"); } @Test