diff --git a/src/main/java/ibm/gse/orderms/domain/service/ShippingOrderService.java b/src/main/java/ibm/gse/orderms/domain/service/ShippingOrderService.java index 115f86d..a7b151f 100644 --- a/src/main/java/ibm/gse/orderms/domain/service/ShippingOrderService.java +++ b/src/main/java/ibm/gse/orderms/domain/service/ShippingOrderService.java @@ -1,5 +1,6 @@ package ibm.gse.orderms.domain.service; +import com.fasterxml.jackson.databind.ObjectMapper; import ibm.gse.orderms.domain.model.order.ShippingOrder; import ibm.gse.orderms.infra.api.dto.ShippingOrderReference; import ibm.gse.orderms.infra.repository.OrderCreationException; @@ -10,23 +11,34 @@ import org.slf4j.LoggerFactory; import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import io.smallrye.reactive.messaging.kafka.Record; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; + @ApplicationScoped public class ShippingOrderService { + + //BK + @Inject + @Channel("orders-status-out") + public Emitter> emitter; + static final Logger logger = LoggerFactory.getLogger(ShippingOrderService.class); - + private ShippingOrderRepository orderRepository = null; - + public ShippingOrderService() { this.orderRepository = new ShippingOrderRepositoryMock(); } - - + + public void createOrder(ShippingOrder order) throws OrderCreationException { - this.orderRepository.addOrUpdateNewShippingOrder(order); + this.orderRepository.addOrUpdateNewShippingOrder(order); } public List getAllOrders() { @@ -38,17 +50,27 @@ public List getAllOrders() { order.getProductID(), order.getVoyageID(), order.getContainerID(), - order.getStatus()); + order.getStatus()); orderReferences.add(ref); } - return orderReferences; + return orderReferences; } - public Optional getOrderByOrderID(String orderId) { + public Optional getOrderByOrderID(String orderId) { return this.orderRepository.getOrderByOrderID(orderId); } - public void updateOrder(ShippingOrder order) throws OrderUpdateException { + public void updateOrder(ShippingOrder order) throws OrderUpdateException { this.orderRepository.updateShippingOrder(order); - } + } + + public void sendMessages(ShippingOrder order) { + ObjectMapper mapper = new ObjectMapper(); + try { + String orderjson = mapper.writeValueAsString(order); + emitter.send(Record.of(order.getOrderID(), orderjson)); + } catch (Exception e) { + logger.error(String.format("Error sending message to Kafka %s", order.getStatus())); + } + } } diff --git a/src/main/java/ibm/gse/orderms/infra/api/ShippingOrderResource.java b/src/main/java/ibm/gse/orderms/infra/api/ShippingOrderResource.java index 3381df0..ab5e724 100644 --- a/src/main/java/ibm/gse/orderms/infra/api/ShippingOrderResource.java +++ b/src/main/java/ibm/gse/orderms/infra/api/ShippingOrderResource.java @@ -60,9 +60,9 @@ public class ShippingOrderResource { JMSQueueWriter jmsQueueWriter; //BK - @Inject - @Channel("orders-status-out") - public Emitter> emitter; + //@Inject + //@Channel("orders-status-out") + //public Emitter> emitter; public ShippingOrderResource() { } @@ -105,7 +105,8 @@ public Response createShippingOrder(String createOrderParameters) throws Excepti OrderEvent orderEvent = new OrderEvent(System.currentTimeMillis(), EventBase.ORDER_CREATED_TYPE, "1.0", orderEventPayload); jmsQueueWriter.sendMessage(orderEvent, String.valueOf(System.getenv("VOYAGE_REQUEST_QUEUE"))); String orderjson = mapper.writeValueAsString(order); - emitter.send(Record.of(order.getOrderID(), orderjson)); + //emitter.send(Record.of(order.getOrderID(), orderjson)); + shippingOrderService.sendMessages(order); } catch (Exception e) { logger.error("Error writing message to the " + System.getenv("VOYAGE_REQUEST_QUEUE") + @@ -115,8 +116,9 @@ public Response createShippingOrder(String createOrderParameters) throws Excepti order.setStatus(ShippingOrder.CANCELLED_STATUS); shippingOrderService.updateOrder(order); //BK - emit status to Kafka - String orderjson = mapper.writeValueAsString(order); - emitter.send(Record.of(order.getOrderID(), orderjson)); + //String orderjson = mapper.writeValueAsString(order); + //emitter.send(Record.of(order.getOrderID(), orderjson)); + shippingOrderService.sendMessages(order); OrderCancelAndRejectPayload payload = new OrderCancelAndRejectPayload(order, e.getMessage()); diff --git a/src/main/java/ibm/gse/orderms/infra/jms/consumer/FreezerResponseListener.java b/src/main/java/ibm/gse/orderms/infra/jms/consumer/FreezerResponseListener.java index ba60ed8..034d24a 100644 --- a/src/main/java/ibm/gse/orderms/infra/jms/consumer/FreezerResponseListener.java +++ b/src/main/java/ibm/gse/orderms/infra/jms/consumer/FreezerResponseListener.java @@ -26,9 +26,6 @@ public class FreezerResponseListener extends AbstractConsumer { @Inject JMSQueueWriter jmsQueueWriter; - //@Inject - //@Channel("orders-status-out") - public Emitter> emitter; @Override @@ -61,8 +58,7 @@ public void processMessage(String rawMessageBody) { shippingOrder.assignContainer(freezerAllocatedEvent.getPayload()); shippingOrderService.updateOrder(shippingOrder); - String orderjson = mapper.writeValueAsString(shippingOrder); - emitter.send(Record.of(shippingOrder.getOrderID(), orderjson)); + shippingOrderService.sendMessages(shippingOrder); } else if(rawEvent.getString("type").equals(EventBase.TYPE_CONTAINER_NOT_FOUND) || rawEvent.getString("type").equals(EventBase.TYPE_CONTAINER_CANCELED)) { @@ -75,8 +71,7 @@ public void processMessage(String rawMessageBody) { shippingOrder.setStatus(ShippingOrder.CANCELLED_STATUS); shippingOrderService.updateOrder(shippingOrder); - String orderjson = mapper.writeValueAsString(shippingOrder); - emitter.send(Record.of(shippingOrder.getOrderID(), orderjson)); + shippingOrderService.sendMessages(shippingOrder); jmsQueueWriter.sendMessage(freezerNotFoundEvent, getRequestQueue()); diff --git a/src/main/java/ibm/gse/orderms/infra/jms/consumer/VoyageResponseListener.java b/src/main/java/ibm/gse/orderms/infra/jms/consumer/VoyageResponseListener.java index 40708e8..d154aab 100644 --- a/src/main/java/ibm/gse/orderms/infra/jms/consumer/VoyageResponseListener.java +++ b/src/main/java/ibm/gse/orderms/infra/jms/consumer/VoyageResponseListener.java @@ -11,8 +11,7 @@ import ibm.gse.orderms.infra.jms.producer.JMSQueueWriter; import io.smallrye.reactive.messaging.kafka.Record; import io.vertx.core.json.JsonObject; -import org.eclipse.microprofile.reactive.messaging.Channel; -import org.eclipse.microprofile.reactive.messaging.Emitter; + import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; @@ -26,9 +25,7 @@ public class VoyageResponseListener extends AbstractConsumer { @Inject JMSQueueWriter jmsQueueWriter; - // @Inject - // @Channel("orders-status-out") - public Emitter> emitter; + @Override @@ -61,8 +58,7 @@ public void processMessage(String rawMessageBody) { ShippingOrder shippingOrder = shippingOrderService.getOrderByOrderID(orderId).orElseThrow(); shippingOrder.cancelVoyage(); shippingOrderService.updateOrder(shippingOrder); - String orderjson = mapper.writeValueAsString(shippingOrder); - emitter.send(Record.of(shippingOrder.getOrderID(), orderjson)); + shippingOrderService.sendMessages(shippingOrder); } else if(rawEvent.getString("type").equals(EventBase.TYPE_VOYAGE_CANCELED)){ @@ -78,8 +74,7 @@ public void processMessage(String rawMessageBody) { shippingOrder.cancelVoyage(); shippingOrderService.updateOrder(shippingOrder); - String orderjson = mapper.writeValueAsString(shippingOrder); - emitter.send(Record.of(shippingOrder.getOrderID(), orderjson)); + shippingOrderService.sendMessages(shippingOrder); } else if(rawEvent.getString("type").equals(EventBase.TYPE_VOYAGE_ASSIGNED)) { @@ -96,8 +91,8 @@ public void processMessage(String rawMessageBody) { shippingOrder.assign(voyageAssignedEvent.getPayload()); shippingOrderService.updateOrder(shippingOrder); - String orderjson = mapper.writeValueAsString(shippingOrder); - emitter.send(Record.of(shippingOrder.getOrderID(), orderjson)); + + shippingOrderService.sendMessages(shippingOrder); if(shippingOrder.getProductID().equals("REFEER_FAILS")) { voyageAssignedEvent.getPayload().setProductId("REFEER_FAILS");