Skip to content

Commit 8b4a34d

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Introduce TailRetentionEventCompactor to compact and retain the tail of the event stream
Provide a way to manage the size of an event stream Specifically, it: * Keeps the retentionSize most recent events raw. * Compacts all events that never compacted and older than the retained tail, including the most recent compaction events, into a new summary event. * Appends this new summary event to the end of the event stream. PiperOrigin-RevId: 859936701
1 parent 2e59550 commit 8b4a34d

File tree

2 files changed

+435
-0
lines changed

2 files changed

+435
-0
lines changed
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.summarizer;
18+
19+
import static com.google.common.base.Preconditions.checkArgument;
20+
21+
import com.google.adk.events.Event;
22+
import com.google.adk.events.EventCompaction;
23+
import com.google.adk.sessions.BaseSessionService;
24+
import com.google.adk.sessions.Session;
25+
import com.google.genai.types.Content;
26+
import io.reactivex.rxjava3.core.Completable;
27+
import io.reactivex.rxjava3.core.Maybe;
28+
import java.util.ArrayList;
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.ListIterator;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
/**
36+
* This class performs event compaction by retaining the tail of the event stream.
37+
*
38+
* <ul>
39+
* <li>Keeps the {@code retentionSize} most recent events raw.
40+
* <li>Compacts all events that never compacted and older than the retained tail, including the
41+
* most recent compaction event, into a new summary event in a sliding window fashion.
42+
* <li>The new summary event is generated by the {@link BaseEventSummarizer}.
43+
* <li>Appends this new summary event to the end of the event stream.
44+
* </ul>
45+
*/
46+
public final class TailRetentionEventCompactor implements EventCompactor {
47+
48+
private static final Logger logger = LoggerFactory.getLogger(TailRetentionEventCompactor.class);
49+
50+
private final BaseEventSummarizer summarizer;
51+
private final int retentionSize;
52+
53+
public TailRetentionEventCompactor(BaseEventSummarizer summarizer, int retentionSize) {
54+
this.summarizer = summarizer;
55+
this.retentionSize = retentionSize;
56+
}
57+
58+
@Override
59+
public Completable compact(Session session, BaseSessionService sessionService) {
60+
checkArgument(summarizer != null, "Missing BaseEventSummarizer for event compaction");
61+
logger.debug("Running tail retention event compaction for session {}", session.id());
62+
63+
return Completable.fromMaybe(
64+
getCompactionEvents(session.events())
65+
.flatMap(summarizer::summarizeEvents)
66+
.flatMapSingle(e -> sessionService.appendEvent(session, e)));
67+
}
68+
69+
/**
70+
* Identifies events to be compacted based on the tail retention strategy.
71+
*
72+
* <p>This method iterates backwards through the event list to find the most recent compaction
73+
* event (if any) and collects all uncompacted events that occurred after the range covered by
74+
* that compaction. It then applies the retention policy, excluding the most recent {@code
75+
* retentionSize} events from the summary.
76+
*
77+
* <p><b>Example Scenario:</b>
78+
*
79+
* <p>Consider a case where retention size is 3. An event (E4) appears before an older compaction
80+
* event (C1) in the list due to previous retention, but is not covered by C1. Later, a newer
81+
* compaction (C2) occurs covering E2 and E3.
82+
*
83+
* <ul>
84+
* <li>T=1: E1
85+
* <li>T=2: E2
86+
* <li>T=3: E3
87+
* <li>T=4: E4
88+
* <li>T=5: C1 (Covers T=1). <i>List: E2, E3, E4 </i> are preserved.
89+
* <li>T=6: E6
90+
* <li>T=7: E7
91+
* <li>T=8: C2 (Covers T=2 to T=3). <i>List: E4, E6, E7</i> are preserved. The compaction events
92+
* in this round is <i>List: C1, E2, E3</i>
93+
* <li>T=9: E9.
94+
* </ul>
95+
*
96+
* <p><b>Execution with Retention = 3:</b>
97+
*
98+
* <ol>
99+
* <li>The method scans backward: E7, C2, E6, E5, C1, E4...
100+
* <li><b>C2</b> is identified as the most recent compaction event (end timestamp T=3).
101+
* <li><b>E7, E6, E5</b> are collected as they are newer than T=3.
102+
* <li><b>C1</b> is ignored as we only care about the boundary set by the latest compaction.
103+
* <li><b>E4</b> (T=4) is collected because it is newer than T=3.
104+
* <li>Scanning stops at E3 (or earlier) as it is covered by C2 (timestamp <= T=3).
105+
* <li>The initial list of events to summarize (reversed back to chronological order): <b>[C2,
106+
* E4, E5, E6, E7]</b>.
107+
* <li>Applying retention (keep last 3): <b>E5, E6, E7</b> are removed from the summary list.
108+
* <li><b>Final Output:</b> {@code [C2, E4]}. E4 and the previous summary C2 will be compacted
109+
* together.
110+
* </ol>
111+
*/
112+
private Maybe<List<Event>> getCompactionEvents(List<Event> events) {
113+
// If there are not enough events to summarize, we can return early.
114+
if (events.size() <= retentionSize) {
115+
return Maybe.empty();
116+
}
117+
118+
long compactionEndTimestamp = Long.MIN_VALUE;
119+
Event lastCompactionEvent = null;
120+
List<Event> eventsToSummarize = new ArrayList<>();
121+
122+
// Iterate backwards from the end of the window to summarize.
123+
ListIterator<Event> iter = events.listIterator(events.size());
124+
while (iter.hasPrevious()) {
125+
Event event = iter.previous();
126+
127+
if (!isCompactEvent(event)) {
128+
// Only include events that are strictly after the last compaction range.
129+
if (event.timestamp() > compactionEndTimestamp) {
130+
eventsToSummarize.add(event);
131+
continue;
132+
} else {
133+
// Exit early if we have reached the last event of last compaction range.
134+
break;
135+
}
136+
}
137+
EventCompaction compaction = event.actions().compaction().orElse(null);
138+
139+
// We only rely on the most recent compaction event to set the boundary. Older compaction
140+
// events are ignored.
141+
if (lastCompactionEvent == null) {
142+
compactionEndTimestamp = compaction.endTimestamp();
143+
lastCompactionEvent = event;
144+
}
145+
}
146+
147+
if (eventsToSummarize.size() <= retentionSize) {
148+
return Maybe.empty();
149+
}
150+
151+
// Add the last compaction event to the list of events to summarize.
152+
// This is to ensure that the last compaction event is included in the summary.
153+
if (lastCompactionEvent != null) {
154+
// Use the compacted content for the compaction event.
155+
Content content = lastCompactionEvent.actions().compaction().get().compactedContent();
156+
eventsToSummarize.add(lastCompactionEvent.toBuilder().content(content).build());
157+
}
158+
159+
Collections.reverse(eventsToSummarize);
160+
161+
// Apply retention: keep the most recent 'retentionSize' events out of the summary.
162+
// We do this by removing them from the list of events to be summarized.
163+
eventsToSummarize
164+
.subList(eventsToSummarize.size() - retentionSize, eventsToSummarize.size())
165+
.clear();
166+
return Maybe.just(eventsToSummarize);
167+
}
168+
169+
private static boolean isCompactEvent(Event event) {
170+
return event.actions() != null && event.actions().compaction().isPresent();
171+
}
172+
}

0 commit comments

Comments
 (0)