Skip to content

Commit 18f6179

Browse files
dchakrav-githubdiwakar
andauthored
Allow for automated call graph generation to keep contexts unique (#249)
* Allow for automated call graph generation to keep contexts unique within StdCallbackContext for replay deduping. Now developers do not need to provide a name when calling services. There is a unique call graph context maintained for each service call made. The context is request aware, so different requests made will have their own independent context for dedupe Dedupe identical requests ```java ProgressEvent<Model, StdCallbackContext> result = initiator.translateToServiceRequest(m -> createRepository) .makeServiceCall((r, c) -> c.injectCredentialsAndInvokeV2( r, c.client()::createRepository)) .success(); ProgressEvent<Model, StdCallbackContext> result_2 = // make same request call initiator.translateToServiceRequest(m -> createRepository) .makeServiceCall((r, c) -> c.injectCredentialsAndInvokeV2( r, c.client()::createRepository)) .success(); assertThat(result).isEqualsTo(result_2); ``` * Prevent ConcurrentModificationExceptions in stabilize calls if they access the map and attempt to modify it Co-authored-by: diwakar <diwakar@amazon.com>
1 parent 52114e3 commit 18f6179

File tree

6 files changed

+257
-18
lines changed

6 files changed

+257
-18
lines changed

src/main/java/software/amazon/cloudformation/proxy/AmazonWebServicesClientProxy.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import software.amazon.awssdk.awscore.AwsResponse;
4141
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
4242
import software.amazon.awssdk.awscore.exception.AwsServiceException;
43+
import software.amazon.awssdk.core.SdkClient;
4344
import software.amazon.awssdk.core.exception.NonRetryableException;
4445
import software.amazon.awssdk.core.exception.RetryableException;
4546
import software.amazon.awssdk.core.pagination.sync.SdkIterable;
@@ -169,6 +170,12 @@ public RequestMaker<ClientT, ModelT, CallbackT> initiate(String callGraph) {
169170
return new CallContext<>(callGraph, client, model, callback);
170171
}
171172

173+
@Override
174+
public <
175+
RequestT> Caller<RequestT, ClientT, ModelT, CallbackT> translateToServiceRequest(Function<ModelT, RequestT> maker) {
176+
return initiate("").translateToServiceRequest(maker);
177+
}
178+
172179
@Override
173180
public ModelT getResourceModel() {
174181
return model;
@@ -192,6 +199,11 @@ public <NewModelT> Initiator<ClientT, NewModelT, CallbackT> rebindModel(NewModel
192199
Preconditions.checkNotNull(callback, "cxt can not be null");
193200
return new StdInitiator<>(client, model, callback);
194201
}
202+
203+
@Override
204+
public Logger getLogger() {
205+
return AmazonWebServicesClientProxy.this.loggerProxy;
206+
}
195207
}
196208

197209
@Override
@@ -234,6 +246,22 @@ class CallContext<ClientT, ModelT, CallbackT extends StdCallbackContext>
234246
RequestT> Caller<RequestT, ClientT, ModelT, CallbackT> translateToServiceRequest(Function<ModelT, RequestT> maker) {
235247
return new Caller<RequestT, ClientT, ModelT, CallbackT>() {
236248

249+
private final CallGraphNameGenerator<ModelT, RequestT, ClientT,
250+
CallbackT> generator = (incoming, model_, reqMaker, client_, context_) -> {
251+
final RequestT request = reqMaker.apply(model_);
252+
String objectHash = String.valueOf(Objects.hashCode(request));
253+
String serviceName = (client_ == null
254+
? ""
255+
: (client_ instanceof SdkClient)
256+
? ((SdkClient) client_).serviceName()
257+
: client_.getClass().getSimpleName());
258+
String requestName = request != null ? request.getClass().getSimpleName().replace("Request", "") : "";
259+
String callGraph = serviceName + ":" + requestName + "-" + (incoming != null ? incoming : "") + "-"
260+
+ objectHash;
261+
context_.request(callGraph, (ignored -> request)).apply(model_);
262+
return callGraph;
263+
};
264+
237265
@Override
238266
public Caller<RequestT, ClientT, ModelT, CallbackT> backoffDelay(Delay delay) {
239267
CallContext.this.delay = delay;
@@ -315,6 +343,8 @@ public ProgressEvent<ModelT, CallbackT> done(Callback<RequestT, ResponseT, Clien
315343
// stabilization
316344
// lambdas. This ensures that we call demux as necessary.
317345
//
346+
final String callGraph = generator.callGraph(CallContext.this.callGraph, model, maker,
347+
client.client(), context);
318348
Delay delay = override.getDelay(callGraph, CallContext.this.delay);
319349
Function<ModelT, RequestT> reqMaker = context.request(callGraph, maker);
320350
BiFunction<RequestT, ProxyClient<ClientT>, ResponseT> resMaker = context.response(callGraph, caller);
@@ -377,6 +407,7 @@ public ProgressEvent<ModelT, CallbackT> done(Callback<RequestT, ResponseT, Clien
377407
long remainingTime = getRemainingTimeInMillis();
378408
long localWait = next.toMillis() + 2 * elapsed + 100;
379409
if (remainingTime > localWait) {
410+
loggerProxy.log("Waiting for " + next.getSeconds() + " for call " + callGraph);
380411
Uninterruptibles.sleepUninterruptibly(next.getSeconds(), TimeUnit.SECONDS);
381412
continue;
382413
}

src/main/java/software/amazon/cloudformation/proxy/CallChain.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public interface CallChain {
5353
* @param <ModelT> the model object being worked on
5454
* @param <CallbackT> the callback context
5555
*/
56-
interface Initiator<ClientT, ModelT, CallbackT extends StdCallbackContext> {
56+
interface Initiator<ClientT, ModelT, CallbackT extends StdCallbackContext> extends RequestMaker<ClientT, ModelT, CallbackT> {
5757
/**
5858
* Each service call must be first initiated. Every call is provided a separate
5959
* name called call graph. This is essential from both a tracing perspective as
@@ -74,6 +74,11 @@ interface Initiator<ClientT, ModelT, CallbackT extends StdCallbackContext> {
7474
*/
7575
CallbackT getCallbackContext();
7676

77+
/**
78+
* @return logger associated to log messages
79+
*/
80+
Logger getLogger();
81+
7782
/**
7883
* Can rebind a new model to the call chain while retaining the client and
7984
* callback context
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
package software.amazon.cloudformation.proxy;
16+
17+
import java.util.function.Function;
18+
19+
@FunctionalInterface
20+
public interface CallGraphNameGenerator<ModelT, RequestT, ClientT, CallbackT extends StdCallbackContext> {
21+
String callGraph(String incoming, ModelT model, Function<ModelT, RequestT> reqMaker, ClientT client, CallbackT context);
22+
}

src/main/java/software/amazon/cloudformation/proxy/StdCallbackContext.java

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,13 @@
3232
import java.util.Collection;
3333
import java.util.Collections;
3434
import java.util.LinkedHashMap;
35+
import java.util.List;
3536
import java.util.Map;
37+
import java.util.Objects;
3638
import java.util.function.BiFunction;
3739
import java.util.function.Function;
40+
import java.util.function.Predicate;
41+
import java.util.stream.Collectors;
3842
import javax.annotation.concurrent.ThreadSafe;
3943

4044
/**
@@ -272,13 +276,67 @@ public <ResponseT> ResponseT response(String callGraph) {
272276
return (ResponseT) callGraphs.get(callGraph + ".response");
273277
}
274278

279+
@SuppressWarnings("unchecked")
280+
public <RequestT> RequestT findFirstRequestByContains(String contains) {
281+
return (RequestT) findFirst((key) -> key.contains(contains) && key.endsWith(".request"));
282+
}
283+
284+
@SuppressWarnings("unchecked")
285+
public <RequestT> List<RequestT> findAllRequestByContains(String contains) {
286+
return (List<RequestT>) findAll((key) -> key.contains(contains) && key.endsWith(".request"));
287+
}
288+
289+
@SuppressWarnings("unchecked")
290+
public <ResponseT> ResponseT findFirstResponseByContains(String contains) {
291+
return (ResponseT) findFirst((key) -> key.contains(contains) && key.endsWith(".response"));
292+
}
293+
294+
@SuppressWarnings("unchecked")
295+
public <ResponseT> List<ResponseT> findAllResponseByContains(String contains) {
296+
return (List<ResponseT>) findAll((key) -> key.contains(contains) && key.endsWith(".response"));
297+
}
298+
299+
Object findFirst(Predicate<String> contains) {
300+
Objects.requireNonNull(contains);
301+
return callGraphs.entrySet().stream().filter(e -> contains.test(e.getKey())).findFirst().map(Map.Entry::getValue)
302+
.orElse(null);
303+
304+
}
305+
306+
List<Object> findAll(Predicate<String> contains) {
307+
Objects.requireNonNull(contains);
308+
return callGraphs.entrySet().stream().filter(e -> contains.test(e.getKey())).map(Map.Entry::getValue)
309+
.collect(Collectors.toList());
310+
}
311+
275312
<RequestT, ResponseT, ClientT, ModelT, CallbackT extends StdCallbackContext>
276313
CallChain.Callback<RequestT, ResponseT, ClientT, ModelT, CallbackT, Boolean>
277314
stabilize(String callGraph, CallChain.Callback<RequestT, ResponseT, ClientT, ModelT, CallbackT, Boolean> callback) {
278315
return (request1, response1, client, model, context) -> {
279-
Boolean result = (Boolean) callGraphs.computeIfAbsent(callGraph + ".stabilize",
280-
(ign) -> callback.invoke(request1, response1, client, model, context) ? Boolean.TRUE : null);
281-
return result != null ? Boolean.TRUE : Boolean.FALSE;
316+
final String key = callGraph + ".stabilize";
317+
Boolean result = (Boolean) callGraphs.getOrDefault(key, Boolean.FALSE);
318+
if (!result) {
319+
//
320+
// The StdCallbackContext can be shared. However the call to stabilize for a
321+
// given content
322+
// is usually confined to one thread. If for some reason we spread that across
323+
// threads, the
324+
// worst that can happen is a double compute for stabilize. This isn't the
325+
// intended pattern.
326+
// Why are we changing it from computeIfAbsent pattern? For the callback we send
327+
// in the
328+
// StdCallbackContext which can be used to add things into context. That will
329+
// lead to
330+
// ConcurrentModificationExceptions when the compute running added things into
331+
// context when
332+
// needed
333+
//
334+
result = callback.invoke(request1, response1, client, model, context);
335+
if (result) {
336+
callGraphs.put(key, Boolean.TRUE);
337+
}
338+
}
339+
return result;
282340
};
283341
}
284342

0 commit comments

Comments
 (0)