From d2b75b860710299372f5f2df1f5c3dc6f19b7335 Mon Sep 17 00:00:00 2001 From: Kirill Kostin Date: Mon, 10 Nov 2025 16:57:19 +0100 Subject: [PATCH 1/2] (feat): Add a minimum polling interval for CHANGE_ON_EVENT subscription type --- .../api/messages/PlcSubscriptionRequest.java | 14 ++++++++++++- .../utils/S7PlcSubscriptionRequest.java | 21 +++++++------------ .../DefaultPlcSubscriptionRequest.java | 8 ++++++- .../java/utils/cache/LeasedPlcConnection.java | 5 +++++ 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java index 862286c2570..d4e1b8f8027 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java @@ -67,7 +67,7 @@ interface Builder extends PlcRequestBuilder { * Adds a new tag to the to be constructed request which should be polled cyclically. * * @param name alias of the tag. - * @param tag tag instance for accessing the tag. + * @param tag tag instance for accessing the tag. * @param pollingInterval interval, in which the tag should be polled. * @return builder. */ @@ -126,6 +126,18 @@ interface Builder extends PlcRequestBuilder { */ PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Consumer consumer); + /** + * Adds a new tag to the to be constructed request which should be updated as soon as + * a value changes in the PLC. + * + * @param name alias of the tag. + * @param tag tag instance for accessing the tag. + * @param consumer consumer for receiving update events for a given tag only. + * @param minInterval min interval for updates (Limits the number of events for high frequency changes). + * @return builder. + */ + PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Consumer consumer, Duration minInterval); + /** * Adds a new subscription to the to be constructed request which should be updated * as soon as an event occurs. diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java index f317cf2a490..0f458a4e09a 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java @@ -21,25 +21,18 @@ import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent; import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; -import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse; import org.apache.plc4x.java.api.model.PlcSubscriptionTag; import org.apache.plc4x.java.api.model.PlcTag; -import org.apache.plc4x.java.api.types.PlcResponseCode; import org.apache.plc4x.java.api.types.PlcSubscriptionType; import org.apache.plc4x.java.spi.connection.PlcTagHandler; -import org.apache.plc4x.java.spi.generation.SerializationException; -import org.apache.plc4x.java.spi.generation.WriteBuffer; import org.apache.plc4x.java.spi.messages.utils.DefaultPlcTagItem; import org.apache.plc4x.java.spi.messages.utils.PlcTagItem; import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionTag; -import org.apache.plc4x.java.spi.utils.Serializable; import java.time.Duration; import java.util.*; -import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.plc4x.java.s7.readwrite.TimeBase; import org.apache.plc4x.java.s7.readwrite.tag.S7SubscriptionTag; @@ -247,18 +240,20 @@ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag ta return addChangeOfStateTag(name, tag, null); } - /* - * - */ @Override public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Consumer consumer) { + return addChangeOfStateTag(name, tag, consumer, null); + } + + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Consumer consumer, Duration minInterval) { if (tags.containsKey(name)) { throw new PlcRuntimeException(CONST_DUPLICATE_TAG + " '" + name + "'"); } - if (!(tag instanceof S7SubscriptionTag)){ + if (!(tag instanceof S7SubscriptionTag)) { throw new PlcRuntimeException(CONST_INVALID_TYPE); - } - tags.put(name, new BuilderItem(() -> tag, PlcSubscriptionType.CHANGE_OF_STATE, consumer)); + } + tags.put(name, new BuilderItem(() -> tag, PlcSubscriptionType.CHANGE_OF_STATE, minInterval, consumer)); return this; } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java index 1e1e8a98aa1..6fc6be2ee55 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java @@ -200,10 +200,16 @@ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag ta @Override public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Consumer consumer) { + return addChangeOfStateTag(name, tag, consumer, null); + } + + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Consumer consumer, + Duration minInterval) { if (tags.containsKey(name)) { throw new PlcRuntimeException("Duplicate tag definition '" + name + "'"); } - tags.put(name, new BuilderItem(() -> tag, PlcSubscriptionType.CHANGE_OF_STATE, consumer)); + tags.put(name, new BuilderItem(() -> tag, PlcSubscriptionType.CHANGE_OF_STATE, minInterval, consumer)); return this; } diff --git a/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java b/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java index 3b8b46ce35e..9225d4807c7 100644 --- a/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java +++ b/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java @@ -404,6 +404,11 @@ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag ta return innerBuilder.addChangeOfStateTag(name, tag, consumer); } + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Consumer consumer, Duration minInterval) { + return innerBuilder.addChangeOfStateTag(name, tag, consumer, minInterval); + } + @Override public PlcSubscriptionRequest.Builder addEventTagAddress(String name, String tagAddress) { return innerBuilder.addEventTagAddress(name, tagAddress); From 338be871199464cab73fb0b481ace09d76ce4147 Mon Sep 17 00:00:00 2001 From: Kirill Kostin Date: Mon, 10 Nov 2025 17:36:35 +0100 Subject: [PATCH 2/2] (feat): added method with DURATION: addChangeOfStateTagAddress (Without consumer) addChangeOfStateTagAddress (With consumer) addChangeOfStateTag (Without consumer) --- .../api/messages/PlcSubscriptionRequest.java | 34 +++++++++++++++++++ .../utils/S7PlcSubscriptionRequest.java | 28 ++++++++++----- .../DefaultPlcSubscriptionRequest.java | 25 ++++++++++---- .../java/utils/cache/LeasedPlcConnection.java | 15 ++++++++ 4 files changed, 88 insertions(+), 14 deletions(-) diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java index d4e1b8f8027..f1a0fe23bd2 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java @@ -105,6 +105,29 @@ interface Builder extends PlcRequestBuilder { */ PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress, Consumer consumer); + /** + * Adds a new tag to the to be constructed request which should be updated as soon as + * a value changes in the PLC. + * + * @param name alias of the tag. + * @param tagAddress tag address string for accessing the tag. + * @param minInterval min interval for updates (Limits the number of events for high frequency changes). + * @return builder. + */ + PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress, Duration minInterval); + + /** + * Adds a new tag to the to be constructed request which should be updated as soon as + * a value changes in the PLC. + * + * @param name alias of the tag. + * @param tagAddress tag address string for accessing the tag. + * @param consumer consumer for receiving update events for a given tag only. + * @param minInterval min interval for updates (Limits the number of events for high frequency changes). + * @return builder. + */ + PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress, Consumer consumer, Duration minInterval); + /** * Adds a new tag to the to be constructed request which should be updated as soon as * a value changes in the PLC. @@ -126,6 +149,17 @@ interface Builder extends PlcRequestBuilder { */ PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Consumer consumer); + /** + * Adds a new tag to the to be constructed request which should be updated as soon as + * a value changes in the PLC. + * + * @param name alias of the tag. + * @param tag tag instance for accessing the tag. + * @param minInterval min interval for updates (Limits the number of events for high frequency changes). + * @return builder. + */ + PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Duration minInterval); + /** * Adds a new tag to the to be constructed request which should be updated as soon as * a value changes in the PLC. diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java index 0f458a4e09a..d0f58ea1b27 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java @@ -218,26 +218,38 @@ public PlcSubscriptionRequest.Builder addCyclicTag(String name, PlcTag tag, Dura @Override public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress) { - return addChangeOfStateTagAddress(tagAddress, name); + return addChangeOfStateTagAddress(name, tagAddress, null, null); + } + + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress, Duration minInterval) { + return addChangeOfStateTagAddress(name, tagAddress, null, minInterval); } - /* - * - */ @Override public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress, Consumer consumer) { + return addChangeOfStateTagAddress(name, tagAddress, consumer, null); + } + + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress, Consumer consumer, Duration minInterval) { if (tags.containsKey(name)) { throw new PlcRuntimeException(CONST_DUPLICATE_TAG + " '" + name + "'"); } - S7Tag[] s7tags = new S7Tag[]{S7Tag.of(tagAddress)}; - S7SubscriptionTag tag = new S7SubscriptionTag(S7SubscriptionType.CYCLIC_SUBSCRIPTION, s7tags, TimeBase.B01SEC, (short) 1); - tags.put(name, new BuilderItem(() -> tag, PlcSubscriptionType.CHANGE_OF_STATE, consumer)); + S7Tag[] s7tags = new S7Tag[]{S7Tag.of(tagAddress)}; + S7SubscriptionTag tag = new S7SubscriptionTag(S7SubscriptionType.CYCLIC_SUBSCRIPTION, s7tags, TimeBase.B01SEC, (short) 1); + tags.put(name, new BuilderItem(() -> tag, PlcSubscriptionType.CHANGE_OF_STATE, minInterval, consumer)); return this; } @Override public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag) { - return addChangeOfStateTag(name, tag, null); + return addChangeOfStateTag(name, tag, null, null); + } + + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Duration minInterval) { + return addChangeOfStateTag(name, tag, null, minInterval); } @Override diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java index 6fc6be2ee55..f8506fc64f1 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java @@ -179,23 +179,36 @@ public PlcSubscriptionRequest.Builder addCyclicTag(String name, PlcTag tag, Dura @Override public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress) { - addChangeOfStateTagAddress(name, tagAddress, null); - return this; + return addChangeOfStateTagAddress(name, tagAddress, null, null); + } + + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress, Duration minInterval) { + return addChangeOfStateTagAddress(name, tagAddress, null, minInterval); } @Override public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress, Consumer consumer) { + return addChangeOfStateTagAddress(name, tagAddress, consumer, null); + } + + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress, Consumer consumer, Duration minInterval) { if (tags.containsKey(name)) { throw new PlcRuntimeException("Duplicate tag definition '" + name + "'"); } - tags.put(name, new BuilderItem(() -> tagHandler.parseTag(tagAddress), PlcSubscriptionType.CHANGE_OF_STATE, consumer)); - return null; + tags.put(name, new BuilderItem(() -> tagHandler.parseTag(tagAddress), PlcSubscriptionType.CHANGE_OF_STATE, minInterval, consumer)); + return this; } @Override public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag) { - addChangeOfStateTag(name, tag, null); - return this; + return addChangeOfStateTag(name, tag, null, null); + } + + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Duration minInterval) { + return addChangeOfStateTag(name, tag, null, minInterval); } @Override diff --git a/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java b/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java index 9225d4807c7..2f17ee53715 100644 --- a/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java +++ b/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java @@ -394,6 +394,16 @@ public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, St return innerBuilder.addChangeOfStateTagAddress(name, tagAddress, consumer); } + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress, Duration minInterval) { + return innerBuilder.addChangeOfStateTagAddress(name, tagAddress, minInterval); + } + + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress, Consumer consumer, Duration minInterval) { + return innerBuilder.addChangeOfStateTagAddress(name, tagAddress, consumer, minInterval); + } + @Override public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag) { return innerBuilder.addChangeOfStateTag(name, tag); @@ -404,6 +414,11 @@ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag ta return innerBuilder.addChangeOfStateTag(name, tag, consumer); } + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Duration minInterval) { + return innerBuilder.addChangeOfStateTag(name, tag, minInterval); + } + @Override public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag, Consumer consumer, Duration minInterval) { return innerBuilder.addChangeOfStateTag(name, tag, consumer, minInterval);