|
2 | 2 |
|
3 | 3 | import com.amazonaws.services.lambda.runtime.Context; |
4 | 4 | import com.amazonaws.services.lambda.runtime.RequestStreamHandler; |
| 5 | +import com.google.common.base.Preconditions; |
5 | 6 | import com.google.common.base.Strings; |
6 | 7 | import com.google.common.io.ByteStreams; |
7 | 8 | import in.erail.glue.Glue; |
|
10 | 11 | import io.reactivex.schedulers.Schedulers; |
11 | 12 | import io.vertx.core.buffer.Buffer; |
12 | 13 | import io.vertx.core.json.JsonObject; |
13 | | -import io.vertx.reactivex.core.eventbus.Message; |
14 | 14 | import java.io.IOException; |
15 | 15 | import java.io.InputStream; |
16 | 16 | import java.io.OutputStream; |
17 | 17 | import java.io.OutputStreamWriter; |
18 | | -import java.util.concurrent.CompletableFuture; |
19 | | -import java.util.concurrent.ExecutionException; |
20 | | -import java.util.logging.Level; |
21 | | -import java.util.logging.Logger; |
22 | | -import static in.erail.common.FrameworkConstants.RoutingContext.Json.*; |
| 18 | +import in.erail.model.RequestEvent; |
| 19 | +import java.util.ArrayList; |
| 20 | +import java.util.HashSet; |
| 21 | +import java.util.List; |
| 22 | +import java.util.Optional; |
| 23 | +import java.util.Set; |
| 24 | +import org.apache.logging.log4j.LogManager; |
| 25 | +import org.apache.logging.log4j.Logger; |
23 | 26 |
|
24 | 27 | /** |
25 | 28 | * |
26 | 29 | * @author vinay |
27 | 30 | */ |
28 | 31 | public class AWSLambda implements RequestStreamHandler { |
29 | 32 |
|
| 33 | + protected Logger log = LogManager.getLogger(AWSLambda.class.getCanonicalName()); |
30 | 34 | private static final String SERVICE_ENV = "SERVICE"; |
31 | 35 | private static final String SERVICE_SYS_PROP = "service"; |
32 | 36 | private final RESTService mService; |
| 37 | + private final List<String> allowedFields = new ArrayList<>(); |
33 | 38 |
|
34 | 39 | public AWSLambda() { |
| 40 | + |
| 41 | + allowedFields.add("isBase64Encoded"); |
| 42 | + allowedFields.add("statusCode"); |
| 43 | + allowedFields.add("headers"); |
| 44 | + allowedFields.add("multiValueHeaders"); |
| 45 | + allowedFields.add("body"); |
| 46 | + |
35 | 47 | String component = System.getenv(SERVICE_ENV); |
36 | 48 | if (Strings.isNullOrEmpty(component)) { |
37 | 49 | component = System.getProperty(SERVICE_SYS_PROP); |
38 | 50 | } |
| 51 | + |
39 | 52 | mService = Glue.instance().resolve(component); |
40 | 53 | } |
41 | 54 |
|
42 | 55 | @Override |
43 | 56 | public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { |
44 | | - |
45 | | - JsonObject requestJson = new JsonObject(Buffer.buffer(ByteStreams.toByteArray(inputStream))); |
46 | | - JsonObject responseJson = handleMessage(requestJson); |
47 | | - |
48 | 57 | try (OutputStreamWriter writer = new OutputStreamWriter(outputStream, "UTF-8")) { |
49 | | - writer.write(responseJson.toString()); |
| 58 | + JsonObject requestJson = new JsonObject(Buffer.buffer(ByteStreams.toByteArray(inputStream))); |
| 59 | + log.debug(() -> requestJson.toString()); |
| 60 | + String resp = handleMessage(requestJson).blockingGet(); |
| 61 | + log.debug(() -> resp); |
| 62 | + writer.write(resp); |
50 | 63 | } |
51 | 64 | } |
52 | 65 |
|
53 | | - public JsonObject handleMessage(JsonObject pJsonObject) { |
54 | | - |
55 | | - JsonObject msg; |
| 66 | + public Single<String> handleMessage(JsonObject pRequest) { |
| 67 | + return Single |
| 68 | + .just(pRequest) |
| 69 | + .subscribeOn(Schedulers.computation()) |
| 70 | + .map(this::convertBodyToBase64) |
| 71 | + .map(reqJson -> reqJson.mapTo(RequestEvent.class)) |
| 72 | + .map(req -> getService().process(req)) |
| 73 | + .map(resp -> JsonObject.mapFrom(resp)) |
| 74 | + .map(this::sanatizeResponse) |
| 75 | + .map(respJson -> respJson.toString()); |
| 76 | + } |
56 | 77 |
|
57 | | - try { |
58 | | - CompletableFuture<JsonObject> result = new CompletableFuture<>(); |
59 | | - MessageAWSLambda<JsonObject> message = new MessageAWSLambda<>(result, pJsonObject); |
| 78 | + protected JsonObject sanatizeResponse(JsonObject pResp) { |
| 79 | + Preconditions.checkNotNull(pResp); |
60 | 80 |
|
61 | | - Single |
62 | | - .just(message) |
63 | | - .map(m -> new Message<JsonObject>(message)) |
64 | | - .subscribeOn(Schedulers.computation()) |
65 | | - .subscribe(m -> getService().process(m)); |
| 81 | + Set<String> keys = new HashSet(pResp.fieldNames()); |
66 | 82 |
|
67 | | - msg = result.get(); |
68 | | - } catch (InterruptedException | ExecutionException ex) { |
69 | | - msg = new JsonObject(); |
70 | | - Logger.getLogger(AWSLambda.class.getName()).log(Level.SEVERE, null, ex); |
71 | | - } |
| 83 | + keys |
| 84 | + .stream() |
| 85 | + .filter(key -> !allowedFields.contains(key)) |
| 86 | + .forEach(key -> pResp.remove(key)); |
72 | 87 |
|
73 | | - return transform(msg); |
| 88 | + return pResp; |
74 | 89 | } |
75 | 90 |
|
76 | | - public JsonObject transform(JsonObject pMsg) { |
77 | | - |
78 | | - pMsg.put(IS_BASE64_ENCODED, Boolean.TRUE); |
| 91 | + protected JsonObject convertBodyToBase64(JsonObject pRequest) { |
79 | 92 |
|
80 | | - if (!pMsg.containsKey(STATUS_CODE)) { |
81 | | - pMsg.put(STATUS_CODE, "200"); |
82 | | - } |
83 | | - |
84 | | - if (!pMsg.containsKey(HEADERS)) { |
85 | | - pMsg.put(HEADERS, new JsonObject()); |
86 | | - } |
| 93 | + Boolean isBase64Encoded |
| 94 | + = Optional |
| 95 | + .ofNullable(pRequest.getBoolean("isBase64Encoded")) |
| 96 | + .orElse(Boolean.FALSE); |
87 | 97 |
|
88 | | - if (pMsg.containsKey(BODY)) { |
89 | | - pMsg.put(BODY, pMsg.getString(BODY)); |
90 | | - } else { |
91 | | - pMsg.put(BODY, ""); |
| 98 | + if (isBase64Encoded == false && pRequest.containsKey("body")) { |
| 99 | + Optional<String> body = Optional.ofNullable(pRequest.getString("body")); |
| 100 | + body.ifPresent((t) -> { |
| 101 | + pRequest.remove("body"); |
| 102 | + pRequest.put("body", body.get().getBytes()); |
| 103 | + }); |
92 | 104 | } |
93 | 105 |
|
94 | | - return pMsg; |
| 106 | + return pRequest; |
95 | 107 | } |
96 | 108 |
|
97 | 109 | public RESTService getService() { |
|
0 commit comments