Skip to content

Commit 1e77de6

Browse files
committed
Lambda working
1 parent 6b7fc67 commit 1e77de6

File tree

4 files changed

+192
-186
lines changed

4 files changed

+192
-186
lines changed

src/main/java/in/erail/amazon/lambda/AWSLambda.java

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.amazonaws.services.lambda.runtime.Context;
44
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
5+
import com.google.common.base.Preconditions;
56
import com.google.common.base.Strings;
67
import com.google.common.io.ByteStreams;
78
import in.erail.glue.Glue;
@@ -10,88 +11,99 @@
1011
import io.reactivex.schedulers.Schedulers;
1112
import io.vertx.core.buffer.Buffer;
1213
import io.vertx.core.json.JsonObject;
13-
import io.vertx.reactivex.core.eventbus.Message;
1414
import java.io.IOException;
1515
import java.io.InputStream;
1616
import java.io.OutputStream;
1717
import java.io.OutputStreamWriter;
18-
import java.util.concurrent.CompletableFuture;
19-
import java.util.concurrent.ExecutionException;
20-
import java.util.logging.Level;
21-
import java.util.logging.Logger;
22-
import static in.erail.common.FrameworkConstants.RoutingContext.Json.*;
18+
import in.erail.model.RequestEvent;
19+
import java.util.ArrayList;
20+
import java.util.HashSet;
21+
import java.util.List;
22+
import java.util.Optional;
23+
import java.util.Set;
24+
import org.apache.logging.log4j.LogManager;
25+
import org.apache.logging.log4j.Logger;
2326

2427
/**
2528
*
2629
* @author vinay
2730
*/
2831
public class AWSLambda implements RequestStreamHandler {
2932

33+
protected Logger log = LogManager.getLogger(AWSLambda.class.getCanonicalName());
3034
private static final String SERVICE_ENV = "SERVICE";
3135
private static final String SERVICE_SYS_PROP = "service";
3236
private final RESTService mService;
37+
private final List<String> allowedFields = new ArrayList<>();
3338

3439
public AWSLambda() {
40+
41+
allowedFields.add("isBase64Encoded");
42+
allowedFields.add("statusCode");
43+
allowedFields.add("headers");
44+
allowedFields.add("multiValueHeaders");
45+
allowedFields.add("body");
46+
3547
String component = System.getenv(SERVICE_ENV);
3648
if (Strings.isNullOrEmpty(component)) {
3749
component = System.getProperty(SERVICE_SYS_PROP);
3850
}
51+
3952
mService = Glue.instance().resolve(component);
4053
}
4154

4255
@Override
4356
public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
44-
45-
JsonObject requestJson = new JsonObject(Buffer.buffer(ByteStreams.toByteArray(inputStream)));
46-
JsonObject responseJson = handleMessage(requestJson);
47-
4857
try (OutputStreamWriter writer = new OutputStreamWriter(outputStream, "UTF-8")) {
49-
writer.write(responseJson.toString());
58+
JsonObject requestJson = new JsonObject(Buffer.buffer(ByteStreams.toByteArray(inputStream)));
59+
log.debug(() -> requestJson.toString());
60+
String resp = handleMessage(requestJson).blockingGet();
61+
log.debug(() -> resp);
62+
writer.write(resp);
5063
}
5164
}
5265

53-
public JsonObject handleMessage(JsonObject pJsonObject) {
54-
55-
JsonObject msg;
66+
public Single<String> handleMessage(JsonObject pRequest) {
67+
return Single
68+
.just(pRequest)
69+
.subscribeOn(Schedulers.computation())
70+
.map(this::convertBodyToBase64)
71+
.map(reqJson -> reqJson.mapTo(RequestEvent.class))
72+
.map(req -> getService().process(req))
73+
.map(resp -> JsonObject.mapFrom(resp))
74+
.map(this::sanatizeResponse)
75+
.map(respJson -> respJson.toString());
76+
}
5677

57-
try {
58-
CompletableFuture<JsonObject> result = new CompletableFuture<>();
59-
MessageAWSLambda<JsonObject> message = new MessageAWSLambda<>(result, pJsonObject);
78+
protected JsonObject sanatizeResponse(JsonObject pResp) {
79+
Preconditions.checkNotNull(pResp);
6080

61-
Single
62-
.just(message)
63-
.map(m -> new Message<JsonObject>(message))
64-
.subscribeOn(Schedulers.computation())
65-
.subscribe(m -> getService().process(m));
81+
Set<String> keys = new HashSet(pResp.fieldNames());
6682

67-
msg = result.get();
68-
} catch (InterruptedException | ExecutionException ex) {
69-
msg = new JsonObject();
70-
Logger.getLogger(AWSLambda.class.getName()).log(Level.SEVERE, null, ex);
71-
}
83+
keys
84+
.stream()
85+
.filter(key -> !allowedFields.contains(key))
86+
.forEach(key -> pResp.remove(key));
7287

73-
return transform(msg);
88+
return pResp;
7489
}
7590

76-
public JsonObject transform(JsonObject pMsg) {
77-
78-
pMsg.put(IS_BASE64_ENCODED, Boolean.TRUE);
91+
protected JsonObject convertBodyToBase64(JsonObject pRequest) {
7992

80-
if (!pMsg.containsKey(STATUS_CODE)) {
81-
pMsg.put(STATUS_CODE, "200");
82-
}
83-
84-
if (!pMsg.containsKey(HEADERS)) {
85-
pMsg.put(HEADERS, new JsonObject());
86-
}
93+
Boolean isBase64Encoded
94+
= Optional
95+
.ofNullable(pRequest.getBoolean("isBase64Encoded"))
96+
.orElse(Boolean.FALSE);
8797

88-
if (pMsg.containsKey(BODY)) {
89-
pMsg.put(BODY, pMsg.getString(BODY));
90-
} else {
91-
pMsg.put(BODY, "");
98+
if (isBase64Encoded == false && pRequest.containsKey("body")) {
99+
Optional<String> body = Optional.ofNullable(pRequest.getString("body"));
100+
body.ifPresent((t) -> {
101+
pRequest.remove("body");
102+
pRequest.put("body", body.get().getBytes());
103+
});
92104
}
93105

94-
return pMsg;
106+
return pRequest;
95107
}
96108

97109
public RESTService getService() {

src/main/java/in/erail/amazon/lambda/MessageAWSLambda.java

Lines changed: 0 additions & 79 deletions
This file was deleted.

0 commit comments

Comments
 (0)