From da91b742e3424287195cf92c599b16d088ff7842 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 15 Jan 2026 15:53:26 +0100 Subject: [PATCH 1/4] Event filtering now records resource action and previous resource MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is important to have a correct further event propagation Signed-off-by: Attila Mészáros --- .../processing/event/EventProcessor.java | 2 +- .../processing/event/EventSourceManager.java | 2 +- .../{controller => }/ResourceAction.java | 2 +- .../controller/ControllerEventSource.java | 7 +++- .../controller/ResourceDeleteEvent.java | 1 + .../source/controller/ResourceEvent.java | 1 + .../informer/ExtendedResourceEvent.java | 42 +++++++++++++++++++ .../source/informer/InformerEventSource.java | 27 +++++------- .../informer/ManagedInformerEventSource.java | 22 +++++++--- .../informer/TemporaryResourceCache.java | 19 +++++---- .../processing/event/EventProcessorTest.java | 2 +- .../event/ResourceStateManagerTest.java | 2 +- .../controller/ControllerEventSourceTest.java | 1 + .../informer/InformerEventSourceTest.java | 15 ++++--- .../TemporaryPrimaryResourceCacheTest.java | 37 +++++++++++----- 15 files changed, 130 insertions(+), 52 deletions(-) rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/{controller => }/ResourceAction.java (90%) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 3685b509aa..b476c39614 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -37,7 +37,7 @@ import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; import io.javaoperatorsdk.operator.processing.event.source.Cache; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 411fc10e31..62e19394c8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -37,9 +37,9 @@ import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceAction.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java similarity index 90% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceAction.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java index 33c4c5a2d6..fff8680913 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceAction.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.javaoperatorsdk.operator.processing.event.source.controller; +package io.javaoperatorsdk.operator.processing.event.source; public enum ResourceAction { ADDED, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index db80c0f4a9..c92d8a0c5b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -28,6 +28,7 @@ import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.MDCUtils; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; @@ -139,13 +140,15 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso @Override public void onAdd(T resource) { - var handling = temporaryResourceCache.onAddOrUpdateEvent(resource); + var handling = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, resource, null); handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW); } @Override public void onUpdate(T oldCustomResource, T newCustomResource) { - var handling = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource); + var handling = + temporaryResourceCache.onAddOrUpdateEvent( + ResourceAction.UPDATED, newCustomResource, oldCustomResource); handleEvent( ResourceAction.UPDATED, newCustomResource, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java index ac21250051..6219207faf 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java @@ -19,6 +19,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; /** * Extends ResourceEvent for informer Delete events, it holds also information if the final state is diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java index 395f3755fb..88f9bf8716 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java @@ -21,6 +21,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; public class ResourceEvent extends Event { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java new file mode 100644 index 0000000000..4ae476a3de --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java @@ -0,0 +1,42 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Optional; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; + +/** Used only for resource event filtering. */ +public class ExtendedResourceEvent extends ResourceEvent { + + private HasMetadata previousResource; + + public ExtendedResourceEvent( + ResourceAction action, + ResourceID resourceID, + HasMetadata latestResource, + HasMetadata previousResource) { + super(action, resourceID, latestResource); + this.previousResource = previousResource; + } + + public Optional getPreviousResource() { + return Optional.ofNullable(previousResource); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 247a471df2..2cb81dede4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -32,7 +32,7 @@ import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION; @@ -107,7 +107,7 @@ public void onAdd(R newResource) { resourceType().getSimpleName(), newResource.getMetadata().getResourceVersion()); } - onAddOrUpdate(Operation.ADD, newResource, null); + onAddOrUpdate(ResourceAction.ADDED, newResource, null); } @Override @@ -120,7 +120,7 @@ public void onUpdate(R oldObject, R newObject) { newObject.getMetadata().getResourceVersion(), oldObject.getMetadata().getResourceVersion()); } - onAddOrUpdate(Operation.UPDATE, newObject, oldObject); + onAddOrUpdate(ResourceAction.UPDATED, newObject, oldObject); } @Override @@ -156,27 +156,27 @@ public synchronized void start() { manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate); } - private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject) { + private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R oldObject) { primaryToSecondaryIndex.onAddOrUpdate(newObject); var resourceID = ResourceID.fromResource(newObject); - var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(newObject); + var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject); if (eventHandling != EventHandling.NEW) { log.debug( "{} event propagation for {}. Resource ID: {}", eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping", - operation, + action, ResourceID.fromResource(newObject)); - } else if (eventAcceptedByFilter(operation, newObject, oldObject)) { + } else if (eventAcceptedByFilter(action, newObject, oldObject)) { log.debug( "Propagating event for {}, resource with same version not result of a reconciliation." + " Resource ID: {}", - operation, + action, resourceID); propagateEvent(newObject); } else { - log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID); + log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID); } } @@ -251,11 +251,11 @@ public boolean allowsNamespaceChanges() { return configuration().followControllerNamespaceChanges(); } - private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) { + private boolean eventAcceptedByFilter(ResourceAction operation, R newObject, R oldObject) { if (genericFilter != null && !genericFilter.accept(newObject)) { return false; } - if (operation == Operation.ADD) { + if (operation == ResourceAction.ADDED) { return onAddFilter == null || onAddFilter.accept(newObject); } else { return onUpdateFilter == null || onUpdateFilter.accept(newObject, oldObject); @@ -266,9 +266,4 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) { return (onDeleteFilter == null || onDeleteFilter.accept(resource, b)) && (genericFilter == null || genericFilter.accept(resource)); } - - private enum Operation { - ADD, - UPDATE - } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 620edd729e..4a33d23bfd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -42,7 +42,7 @@ import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.*; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; @SuppressWarnings("rawtypes") @@ -90,6 +90,7 @@ public void changeNamespaces(Set namespaces) { * Also makes sure that the even produced by this update is filtered, thus does not trigger the * reconciliation. */ + @SuppressWarnings("unchecked") public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator updateMethod) { ResourceID id = ResourceID.fromResource(resourceToUpdate); if (log.isDebugEnabled()) { @@ -110,12 +111,21 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< res.ifPresent( r -> { R latestResource = (R) r.getResource().orElseThrow(); - // for update we need to have a historic resource, this might be improved to mimic more - // realistic scenario + + // as previous resource version we use the one from successful update, since + // we process new event here only if that is more recent then the event from our update. + // Note that this is equivalent with the scenario when an informer watch connection + // would + // reconnect and loose some events in between. + // If that update was not successful we still record the previous version from the + // actual + // event in the ExtendedResourceEvent. + R extendedResourcePrevVersion = + (r instanceof ExtendedResourceEvent) + ? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null) + : null; R prevVersionOfResource = - updatedForLambda != null - ? updatedForLambda - : (r.getAction() == ResourceAction.UPDATED ? latestResource : null); + updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion; handleEvent( r.getAction(), latestResource, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index f8254c1bf4..7e46dcf060 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -27,7 +27,7 @@ import io.javaoperatorsdk.operator.api.reconciler.ReconcileUtils; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -94,17 +94,23 @@ public synchronized Optional doneEventFilterModify( } public void onDeleteEvent(T resource, boolean unknownState) { - onEvent(resource, unknownState, true); + onEvent(ResourceAction.DELETED, resource, null, unknownState, true); } /** * @return true if the resourceVersion was obsolete */ - public EventHandling onAddOrUpdateEvent(T resource) { - return onEvent(resource, false, false); + public EventHandling onAddOrUpdateEvent( + ResourceAction action, T resource, T prevResourceVersion) { + return onEvent(action, resource, prevResourceVersion, false, false); } - private synchronized EventHandling onEvent(T resource, boolean unknownState, boolean delete) { + private synchronized EventHandling onEvent( + ResourceAction action, + T resource, + T prevResourceVersion, + boolean unknownState, + boolean delete) { if (!comparableResourceVersions) { return EventHandling.NEW; } @@ -139,8 +145,7 @@ private synchronized EventHandling onEvent(T resource, boolean unknownState, boo ed.setLastEvent( delete ? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState) - : new ResourceEvent( - ResourceAction.UPDATED, resourceId, resource)); // todo true action + : new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion)); return EventHandling.DEFER; } else { return result; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index ac187d7eb9..bff9ef3dbd 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -38,8 +38,8 @@ import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java index 25e93a813c..d480dd06f8 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java @@ -20,7 +20,7 @@ import org.junit.jupiter.api.Test; import io.javaoperatorsdk.operator.TestUtils; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; import static org.assertj.core.api.Assertions.assertThat; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java index baef2110df..ef3e56ce8b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java @@ -34,6 +34,7 @@ import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 0fc721cccb..84fa15ac51 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -99,7 +99,8 @@ void skipsEventPropagation() { when(temporaryResourceCacheMock.getResourceFromCache(any())) .thenReturn(Optional.of(testDeployment())); - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.OBSOLETE); + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + .thenReturn(EventHandling.OBSOLETE); informerEventSource.onAdd(testDeployment()); informerEventSource.onUpdate(testDeployment(), testDeployment()); @@ -109,7 +110,8 @@ void skipsEventPropagation() { @Test void processEventPropagationWithoutAnnotation() { - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW); + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + .thenReturn(EventHandling.NEW); informerEventSource.onUpdate(testDeployment(), testDeployment()); verify(eventHandlerMock, times(1)).handleEvent(any()); @@ -117,7 +119,8 @@ void processEventPropagationWithoutAnnotation() { @Test void processEventPropagationWithIncorrectAnnotation() { - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW); + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + .thenReturn(EventHandling.NEW); informerEventSource.onAdd( new DeploymentBuilder(testDeployment()) .editMetadata() @@ -134,12 +137,14 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() { cachedDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); when(temporaryResourceCacheMock.getResourceFromCache(any())) .thenReturn(Optional.of(cachedDeployment)); - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW); + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + .thenReturn(EventHandling.NEW); informerEventSource.onUpdate(cachedDeployment, testDeployment()); verify(eventHandlerMock, times(1)).handleEvent(any()); - verify(temporaryResourceCacheMock, times(1)).onAddOrUpdateEvent(testDeployment()); + verify(temporaryResourceCacheMock, times(1)) + .onAddOrUpdateEvent(any(), eq(testDeployment()), any()); } @Test diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index 4c5d137fd3..0d58b45a29 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -28,6 +28,7 @@ import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import static org.assertj.core.api.Assertions.assertThat; @@ -61,7 +62,9 @@ void updateNotAddsTheResourceIntoCacheIfLaterVersionKnown() { var testResource = testResource(); temporaryResourceCache.onAddOrUpdateEvent( - testResource.toBuilder().editMetadata().withResourceVersion("3").endMetadata().build()); + ResourceAction.ADDED, + testResource.toBuilder().editMetadata().withResourceVersion("3").endMetadata().build(), + null); temporaryResourceCache.putResource(testResource); @@ -101,11 +104,13 @@ void removesResourceFromCache() { ConfigMap testResource = propagateTestResourceToCache(); temporaryResourceCache.onAddOrUpdateEvent( + ResourceAction.ADDED, new ConfigMapBuilder(testResource) .editMetadata() .withResourceVersion("3") .endMetadata() - .build()); + .build(), + null); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isNotPresent(); @@ -130,7 +135,11 @@ void lockedEventBeforePut() throws Exception { ExecutorService ex = Executors.newSingleThreadExecutor(); try { - var result = ex.submit(() -> temporaryResourceCache.onAddOrUpdateEvent(testResource)); + var result = + ex.submit( + () -> + temporaryResourceCache.onAddOrUpdateEvent( + ResourceAction.ADDED, testResource, null)); temporaryResourceCache.putResource(testResource); assertThat(result.isDone()).isFalse(); @@ -146,7 +155,8 @@ void putBeforeEvent() { var testResource = testResource(); // first ensure an event is not known - var result = temporaryResourceCache.onAddOrUpdateEvent(testResource); + var result = + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); assertThat(result).isEqualTo(EventHandling.NEW); var nextResource = testResource(); @@ -154,7 +164,7 @@ void putBeforeEvent() { temporaryResourceCache.putResource(nextResource); // the result is obsolete - result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); + result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null); assertThat(result).isEqualTo(EventHandling.OBSOLETE); } @@ -163,7 +173,8 @@ void putBeforeEventWithEventFiltering() { var testResource = testResource(); // first ensure an event is not known - var result = temporaryResourceCache.onAddOrUpdateEvent(testResource); + var result = + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); assertThat(result).isEqualTo(EventHandling.NEW); var nextResource = testResource(); @@ -175,7 +186,7 @@ void putBeforeEventWithEventFiltering() { temporaryResourceCache.doneEventFilterModify(resourceId, "3"); // the result is obsolete - result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); + result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null); assertThat(result).isEqualTo(EventHandling.OBSOLETE); } @@ -184,7 +195,8 @@ void putAfterEventWithEventFilteringNoPost() { var testResource = testResource(); // first ensure an event is not known - var result = temporaryResourceCache.onAddOrUpdateEvent(testResource); + var result = + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); assertThat(result).isEqualTo(EventHandling.NEW); var nextResource = testResource(); @@ -192,7 +204,9 @@ void putAfterEventWithEventFilteringNoPost() { var resourceId = ResourceID.fromResource(testResource); temporaryResourceCache.startEventFilteringModify(resourceId); - result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); + result = + temporaryResourceCache.onAddOrUpdateEvent( + ResourceAction.UPDATED, nextResource, testResource); // the result is deferred assertThat(result).isEqualTo(EventHandling.DEFER); temporaryResourceCache.putResource(nextResource); @@ -213,7 +227,8 @@ void putAfterEventWithEventFilteringWithPost() { // completing with the 3 rv. var nextResource = testResource(); nextResource.getMetadata().setResourceVersion("4"); - var result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); + var result = + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, nextResource, null); assertThat(result).isEqualTo(EventHandling.DEFER); var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId, "3"); @@ -225,7 +240,7 @@ void putAfterEventWithEventFilteringWithPost() { void rapidDeletion() { var testResource = testResource(); - temporaryResourceCache.onAddOrUpdateEvent(testResource); + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); temporaryResourceCache.onDeleteEvent( new ConfigMapBuilder(testResource) .editMetadata() From 43fcce366f88a2a1eb222e1b2f4e4d8322e88a49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Fri, 16 Jan 2026 11:50:10 +0100 Subject: [PATCH 2/4] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../processing/event/source/informer/InformerEventSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 2cb81dede4..98f230255b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -251,11 +251,11 @@ public boolean allowsNamespaceChanges() { return configuration().followControllerNamespaceChanges(); } - private boolean eventAcceptedByFilter(ResourceAction operation, R newObject, R oldObject) { + private boolean eventAcceptedByFilter(ResourceAction action, R newObject, R oldObject) { if (genericFilter != null && !genericFilter.accept(newObject)) { return false; } - if (operation == ResourceAction.ADDED) { + if (action == ResourceAction.ADDED) { return onAddFilter == null || onAddFilter.accept(newObject); } else { return onUpdateFilter == null || onUpdateFilter.accept(newObject, oldObject); From 8e3ad5075f319e9b8feaf747c9ba76a88356cf2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Fri, 16 Jan 2026 16:47:20 +0100 Subject: [PATCH 3/4] tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../controller/ControllerEventSource.java | 2 +- .../source/informer/InformerEventSource.java | 2 +- .../informer/ManagedInformerEventSource.java | 7 +- .../informer/InformerEventSourceTest.java | 247 ++++++++++++++++-- 4 files changed, 234 insertions(+), 24 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index c92d8a0c5b..b4784e1b6d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -84,7 +84,7 @@ public synchronized void start() { } @Override - public synchronized void handleEvent( + protected synchronized void handleEvent( ResourceAction action, T resource, T oldResource, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 98f230255b..24a95e7f67 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -139,7 +139,7 @@ public synchronized void onDelete(R resource, boolean b) { } @Override - public void handleEvent( + protected void handleEvent( ResourceAction action, R resource, R oldResource, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 4a33d23bfd..f198184468 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -130,14 +130,15 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< r.getAction(), latestResource, prevVersionOfResource, - !(r instanceof ResourceDeleteEvent) - || ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown(), + (r instanceof ResourceDeleteEvent) + ? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown() + : null, false); }); } } - public abstract void handleEvent( + protected abstract void handleEvent( ResourceAction action, R resource, R oldResource, diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 84fa15ac51..57a69f6182 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -15,16 +15,22 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.time.Duration; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.config.BaseConfigurationService; @@ -35,17 +41,24 @@ import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -56,9 +69,11 @@ class InformerEventSourceTest { private static final String PREV_RESOURCE_VERSION = "0"; private static final String DEFAULT_RESOURCE_VERSION = "1"; + ExecutorService executorService = Executors.newSingleThreadExecutor(); + private InformerEventSource informerEventSource; private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class); - private final TemporaryResourceCache temporaryResourceCacheMock = + private TemporaryResourceCache temporaryResourceCache = mock(TemporaryResourceCache.class); private final EventHandler eventHandlerMock = mock(EventHandler.class); private final InformerEventSourceConfiguration informerEventSourceConfiguration = @@ -74,11 +89,12 @@ void setup() { when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class); informerEventSource = - new InformerEventSource<>(informerEventSourceConfiguration, clientMock) { - // mocking start - @Override - public synchronized void start() {} - }; + spy( + new InformerEventSource<>(informerEventSourceConfiguration, clientMock) { + // mocking start + @Override + public synchronized void start() {} + }); var mockControllerConfig = mock(ControllerConfiguration.class); when(mockControllerConfig.getConfigurationService()).thenReturn(new BaseConfigurationService()); @@ -91,15 +107,15 @@ public synchronized void start() {} when(secondaryToPrimaryMapper.toPrimaryResourceIDs(any())) .thenReturn(Set.of(ResourceID.fromResource(testDeployment()))); informerEventSource.start(); - informerEventSource.setTemporalResourceCache(temporaryResourceCacheMock); + informerEventSource.setTemporalResourceCache(temporaryResourceCache); } @Test void skipsEventPropagation() { - when(temporaryResourceCacheMock.getResourceFromCache(any())) + when(temporaryResourceCache.getResourceFromCache(any())) .thenReturn(Optional.of(testDeployment())); - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) .thenReturn(EventHandling.OBSOLETE); informerEventSource.onAdd(testDeployment()); @@ -110,7 +126,7 @@ void skipsEventPropagation() { @Test void processEventPropagationWithoutAnnotation() { - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) .thenReturn(EventHandling.NEW); informerEventSource.onUpdate(testDeployment(), testDeployment()); @@ -119,7 +135,7 @@ void processEventPropagationWithoutAnnotation() { @Test void processEventPropagationWithIncorrectAnnotation() { - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) .thenReturn(EventHandling.NEW); informerEventSource.onAdd( new DeploymentBuilder(testDeployment()) @@ -135,22 +151,21 @@ void processEventPropagationWithIncorrectAnnotation() { void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() { Deployment cachedDeployment = testDeployment(); cachedDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); - when(temporaryResourceCacheMock.getResourceFromCache(any())) + when(temporaryResourceCache.getResourceFromCache(any())) .thenReturn(Optional.of(cachedDeployment)); - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) .thenReturn(EventHandling.NEW); informerEventSource.onUpdate(cachedDeployment, testDeployment()); verify(eventHandlerMock, times(1)).handleEvent(any()); - verify(temporaryResourceCacheMock, times(1)) - .onAddOrUpdateEvent(any(), eq(testDeployment()), any()); + verify(temporaryResourceCache, times(1)).onAddOrUpdateEvent(any(), eq(testDeployment()), any()); } @Test void genericFilterForEvents() { informerEventSource.setGenericFilter(r -> false); - when(temporaryResourceCacheMock.getResourceFromCache(any())).thenReturn(Optional.empty()); + when(temporaryResourceCache.getResourceFromCache(any())).thenReturn(Optional.empty()); informerEventSource.onAdd(testDeployment()); informerEventSource.onUpdate(testDeployment(), testDeployment()); @@ -162,7 +177,7 @@ void genericFilterForEvents() { @Test void filtersOnAddEvents() { informerEventSource.setOnAddFilter(r -> false); - when(temporaryResourceCacheMock.getResourceFromCache(any())).thenReturn(Optional.empty()); + when(temporaryResourceCache.getResourceFromCache(any())).thenReturn(Optional.empty()); informerEventSource.onAdd(testDeployment()); @@ -172,7 +187,7 @@ void filtersOnAddEvents() { @Test void filtersOnUpdateEvents() { informerEventSource.setOnUpdateFilter((r1, r2) -> false); - when(temporaryResourceCacheMock.getResourceFromCache(any())).thenReturn(Optional.empty()); + when(temporaryResourceCache.getResourceFromCache(any())).thenReturn(Optional.empty()); informerEventSource.onUpdate(testDeployment(), testDeployment()); @@ -182,13 +197,207 @@ void filtersOnUpdateEvents() { @Test void filtersOnDeleteEvents() { informerEventSource.setOnDeleteFilter((r, b) -> false); - when(temporaryResourceCacheMock.getResourceFromCache(any())).thenReturn(Optional.empty()); + when(temporaryResourceCache.getResourceFromCache(any())).thenReturn(Optional.empty()); informerEventSource.onDelete(testDeployment(), true); verify(eventHandlerMock, never()).handleEvent(any()); } + @Test + void handlesPrevResourceVersionForUpdate() { + withRealTemporaryResourceCache(); + var deployment = testDeployment(); + CountDownLatch latch = new CountDownLatch(1); + + executorService.submit( + () -> + informerEventSource.eventFilteringUpdateAndCacheResource( + deployment, + r -> { + var resp = testDeployment(); + incResourceVersion(resp, 1); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return resp; + })); + informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 2)); + latch.countDown(); + + await() + .untilAsserted( + () -> { + verify(informerEventSource, times(1)) + .handleEvent( + eq(ResourceAction.UPDATED), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("3"); + return true; + }), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("2"); + return true; + }), + isNull(), + eq(false)); + }); + } + + @Test + void handlesPrevResourceVersionForUpdateInCaseOfException() { + withRealTemporaryResourceCache(); + + withRealTemporaryResourceCache(); + var deployment = testDeployment(); + CountDownLatch latch = new CountDownLatch(1); + + executorService.submit( + () -> + informerEventSource.eventFilteringUpdateAndCacheResource( + deployment, + r -> { + try { + latch.await(); + throw new KubernetesClientException("fake"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + })); + informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 1)); + latch.countDown(); + + await() + .untilAsserted( + () -> { + verify(informerEventSource, times(1)) + .handleEvent( + eq(ResourceAction.UPDATED), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("2"); + return true; + }), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("1"); + return true; + }), + isNull(), + eq(false)); + }); + } + + @Test + void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { + withRealTemporaryResourceCache(); + + withRealTemporaryResourceCache(); + var deployment = testDeployment(); + CountDownLatch latch = new CountDownLatch(1); + + executorService.submit( + () -> + informerEventSource.eventFilteringUpdateAndCacheResource( + deployment, + r -> { + var resp = testDeployment(); + incResourceVersion(resp, 1); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return resp; + })); + + informerEventSource.onUpdate( + incResourceVersion(testDeployment(), 1), incResourceVersion(testDeployment(), 2)); + informerEventSource.onUpdate( + incResourceVersion(testDeployment(), 2), incResourceVersion(testDeployment(), 3)); + latch.countDown(); + + await() + .untilAsserted( + () -> { + verify(informerEventSource, times(1)) + .handleEvent( + eq(ResourceAction.UPDATED), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("4"); + return true; + }), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("2"); + return true; + }), + isNull(), + eq(false)); + }); + } + + @Test + void doesNotPropagateEventIfReceivedBeforeUpdate() { + withRealTemporaryResourceCache(); + var deployment = testDeployment(); + CountDownLatch latch = new CountDownLatch(1); + + executorService.submit( + () -> + informerEventSource.eventFilteringUpdateAndCacheResource( + deployment, + r -> { + var resp = testDeployment(); + incResourceVersion(resp, 2); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return resp; + })); + informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 1)); + latch.countDown(); + + await() + .pollDelay(Duration.ofMillis(100)) + .untilAsserted( + () -> { + verify(informerEventSource, never()) + .handleEvent(any(), any(), any(), any(), anyBoolean()); + }); + } + + private void withRealTemporaryResourceCache() { + temporaryResourceCache = new TemporaryResourceCache<>(true); + informerEventSource.setTemporalResourceCache(temporaryResourceCache); + } + + R incResourceVersion(R resource, int increment) { + var v = resource.getMetadata().getResourceVersion(); + if (v == null) { + throw new IllegalArgumentException("Resource version is null"); + } + resource.getMetadata().setResourceVersion(versionPlus(v, increment)); + return resource; + } + + String versionPlus(String resourceVersion, int increment) { + return "" + (Integer.parseInt(resourceVersion) + increment); + } + @Test void informerStoppedHandlerShouldBeCalledWhenInformerStops() { final var exception = new RuntimeException("Informer stopped exceptionally!"); From 074d42028fdcef8fd721de4d9db399a3c89fb24f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Fri, 16 Jan 2026 20:17:15 +0100 Subject: [PATCH 4/4] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 5 +++-- .../informer/InformerEventSourceTest.java | 21 ++++++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index f198184468..fa04f6c03f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -108,7 +108,7 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< id, updatedResource == null ? null : updatedResource.getMetadata().getResourceVersion()); var updatedForLambda = updatedResource; - res.ifPresent( + res.ifPresentOrElse( r -> { R latestResource = (R) r.getResource().orElseThrow(); @@ -134,7 +134,8 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< ? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown() : null, false); - }); + }, + () -> log.debug("No new event present after the filtering update; id: {}", id)); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 57a69f6182..8807088082 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; @@ -64,12 +65,13 @@ import static org.mockito.Mockito.when; @SuppressWarnings({"rawtypes", "unchecked"}) +@TestInstance(value = TestInstance.Lifecycle.PER_METHOD) class InformerEventSourceTest { private static final String PREV_RESOURCE_VERSION = "0"; private static final String DEFAULT_RESOURCE_VERSION = "1"; - ExecutorService executorService = Executors.newSingleThreadExecutor(); + ExecutorService executorService = Executors.newCachedThreadPool(); private InformerEventSource informerEventSource; private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class); @@ -205,7 +207,7 @@ void filtersOnDeleteEvents() { } @Test - void handlesPrevResourceVersionForUpdate() { + void handlesPrevResourceVersionForUpdate() throws InterruptedException { withRealTemporaryResourceCache(); var deployment = testDeployment(); CountDownLatch latch = new CountDownLatch(1); @@ -224,7 +226,10 @@ void handlesPrevResourceVersionForUpdate() { } return resp; })); - informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 2)); + Thread.sleep(50); + informerEventSource.onUpdate( + incResourceVersion(deployment, 1), incResourceVersion(testDeployment(), 2)); + latch.countDown(); await() @@ -251,7 +256,7 @@ void handlesPrevResourceVersionForUpdate() { } @Test - void handlesPrevResourceVersionForUpdateInCaseOfException() { + void handlesPrevResourceVersionForUpdateInCaseOfException() throws InterruptedException { withRealTemporaryResourceCache(); withRealTemporaryResourceCache(); @@ -270,6 +275,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() { throw new RuntimeException(e); } })); + Thread.sleep(50); informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 1)); latch.countDown(); @@ -297,7 +303,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() { } @Test - void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { + void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() throws InterruptedException { withRealTemporaryResourceCache(); withRealTemporaryResourceCache(); @@ -318,7 +324,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { } return resp; })); - + Thread.sleep(50); informerEventSource.onUpdate( incResourceVersion(testDeployment(), 1), incResourceVersion(testDeployment(), 2)); informerEventSource.onUpdate( @@ -349,7 +355,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { } @Test - void doesNotPropagateEventIfReceivedBeforeUpdate() { + void doesNotPropagateEventIfReceivedBeforeUpdate() throws InterruptedException { withRealTemporaryResourceCache(); var deployment = testDeployment(); CountDownLatch latch = new CountDownLatch(1); @@ -368,6 +374,7 @@ void doesNotPropagateEventIfReceivedBeforeUpdate() { } return resp; })); + Thread.sleep(50); informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 1)); latch.countDown();