Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ibm.gse.orderms.domain.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import ibm.gse.orderms.domain.model.order.ShippingOrder;
import ibm.gse.orderms.infra.api.dto.ShippingOrderReference;
import ibm.gse.orderms.infra.repository.OrderCreationException;
Expand All @@ -10,23 +11,34 @@
import org.slf4j.LoggerFactory;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@ApplicationScoped
public class ShippingOrderService {

//BK
@Inject
@Channel("orders-status-out")
public Emitter<Record<String, String>> emitter;

static final Logger logger = LoggerFactory.getLogger(ShippingOrderService.class);

private ShippingOrderRepository orderRepository = null;

public ShippingOrderService() {
this.orderRepository = new ShippingOrderRepositoryMock();
}


public void createOrder(ShippingOrder order) throws OrderCreationException {
this.orderRepository.addOrUpdateNewShippingOrder(order);
this.orderRepository.addOrUpdateNewShippingOrder(order);
}

public List<ShippingOrderReference> getAllOrders() {
Expand All @@ -38,17 +50,27 @@ public List<ShippingOrderReference> getAllOrders() {
order.getProductID(),
order.getVoyageID(),
order.getContainerID(),
order.getStatus());
order.getStatus());
orderReferences.add(ref);
}
return orderReferences;
return orderReferences;
}

public Optional<ShippingOrder> getOrderByOrderID(String orderId) {
public Optional<ShippingOrder> getOrderByOrderID(String orderId) {
return this.orderRepository.getOrderByOrderID(orderId);
}

public void updateOrder(ShippingOrder order) throws OrderUpdateException {
public void updateOrder(ShippingOrder order) throws OrderUpdateException {
this.orderRepository.updateShippingOrder(order);
}
}

public void sendMessages(ShippingOrder order) {
ObjectMapper mapper = new ObjectMapper();
try {
String orderjson = mapper.writeValueAsString(order);
emitter.send(Record.of(order.getOrderID(), orderjson));
} catch (Exception e) {
logger.error(String.format("Error sending message to Kafka %s", order.getStatus()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public class ShippingOrderResource {
JMSQueueWriter<EventBase> jmsQueueWriter;

//BK
@Inject
@Channel("orders-status-out")
public Emitter<Record<String, String>> emitter;
//@Inject
//@Channel("orders-status-out")
//public Emitter<Record<String, String>> emitter;

public ShippingOrderResource() {
}
Expand Down Expand Up @@ -105,7 +105,8 @@ public Response createShippingOrder(String createOrderParameters) throws Excepti
OrderEvent orderEvent = new OrderEvent(System.currentTimeMillis(), EventBase.ORDER_CREATED_TYPE, "1.0", orderEventPayload);
jmsQueueWriter.sendMessage(orderEvent, String.valueOf(System.getenv("VOYAGE_REQUEST_QUEUE")));
String orderjson = mapper.writeValueAsString(order);
emitter.send(Record.of(order.getOrderID(), orderjson));
//emitter.send(Record.of(order.getOrderID(), orderjson));
shippingOrderService.sendMessages(order);

} catch (Exception e) {
logger.error("Error writing message to the " + System.getenv("VOYAGE_REQUEST_QUEUE") +
Expand All @@ -115,8 +116,9 @@ public Response createShippingOrder(String createOrderParameters) throws Excepti
order.setStatus(ShippingOrder.CANCELLED_STATUS);
shippingOrderService.updateOrder(order);
//BK - emit status to Kafka
String orderjson = mapper.writeValueAsString(order);
emitter.send(Record.of(order.getOrderID(), orderjson));
//String orderjson = mapper.writeValueAsString(order);
//emitter.send(Record.of(order.getOrderID(), orderjson));
shippingOrderService.sendMessages(order);


OrderCancelAndRejectPayload payload = new OrderCancelAndRejectPayload(order, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ public class FreezerResponseListener extends AbstractConsumer {
@Inject
JMSQueueWriter<EventBase> jmsQueueWriter;

//@Inject
//@Channel("orders-status-out")
public Emitter<Record<String, String>> emitter;


@Override
Expand Down Expand Up @@ -61,8 +58,7 @@ public void processMessage(String rawMessageBody) {
shippingOrder.assignContainer(freezerAllocatedEvent.getPayload());
shippingOrderService.updateOrder(shippingOrder);

String orderjson = mapper.writeValueAsString(shippingOrder);
emitter.send(Record.of(shippingOrder.getOrderID(), orderjson));
shippingOrderService.sendMessages(shippingOrder);

} else if(rawEvent.getString("type").equals(EventBase.TYPE_CONTAINER_NOT_FOUND)
|| rawEvent.getString("type").equals(EventBase.TYPE_CONTAINER_CANCELED)) {
Expand All @@ -75,8 +71,7 @@ public void processMessage(String rawMessageBody) {

shippingOrder.setStatus(ShippingOrder.CANCELLED_STATUS);
shippingOrderService.updateOrder(shippingOrder);
String orderjson = mapper.writeValueAsString(shippingOrder);
emitter.send(Record.of(shippingOrder.getOrderID(), orderjson));
shippingOrderService.sendMessages(shippingOrder);


jmsQueueWriter.sendMessage(freezerNotFoundEvent, getRequestQueue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
import ibm.gse.orderms.infra.jms.producer.JMSQueueWriter;
import io.smallrye.reactive.messaging.kafka.Record;
import io.vertx.core.json.JsonObject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;


import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
Expand All @@ -26,9 +25,7 @@ public class VoyageResponseListener extends AbstractConsumer {
@Inject
JMSQueueWriter<EventBase> jmsQueueWriter;

// @Inject
// @Channel("orders-status-out")
public Emitter<Record<String, String>> emitter;



@Override
Expand Down Expand Up @@ -61,8 +58,7 @@ public void processMessage(String rawMessageBody) {
ShippingOrder shippingOrder = shippingOrderService.getOrderByOrderID(orderId).orElseThrow();
shippingOrder.cancelVoyage();
shippingOrderService.updateOrder(shippingOrder);
String orderjson = mapper.writeValueAsString(shippingOrder);
emitter.send(Record.of(shippingOrder.getOrderID(), orderjson));
shippingOrderService.sendMessages(shippingOrder);


} else if(rawEvent.getString("type").equals(EventBase.TYPE_VOYAGE_CANCELED)){
Expand All @@ -78,8 +74,7 @@ public void processMessage(String rawMessageBody) {

shippingOrder.cancelVoyage();
shippingOrderService.updateOrder(shippingOrder);
String orderjson = mapper.writeValueAsString(shippingOrder);
emitter.send(Record.of(shippingOrder.getOrderID(), orderjson));
shippingOrderService.sendMessages(shippingOrder);


} else if(rawEvent.getString("type").equals(EventBase.TYPE_VOYAGE_ASSIGNED)) {
Expand All @@ -96,8 +91,8 @@ public void processMessage(String rawMessageBody) {
shippingOrder.assign(voyageAssignedEvent.getPayload());
shippingOrderService.updateOrder(shippingOrder);

String orderjson = mapper.writeValueAsString(shippingOrder);
emitter.send(Record.of(shippingOrder.getOrderID(), orderjson));

shippingOrderService.sendMessages(shippingOrder);

if(shippingOrder.getProductID().equals("REFEER_FAILS")) {
voyageAssignedEvent.getPayload().setProductId("REFEER_FAILS");
Expand Down