// RabbitMQService.java (9.79 KB)
package com.ecommerce.notification.service;

import com.ecommerce.notification.model.Notification;
import com.ecommerce.notification.model.dto.NotificationRequest;
import com.ecommerce.notification.event.NotificationEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitMQService {
    
    private final RabbitTemplate rabbitTemplate;
    private final ObjectMapper objectMapper;
    private final ApplicationEventPublisher eventPublisher;
    
    public void sendNotificationEvent(Notification notification) {
        try {
            Map<String, Object> message = new HashMap<>();
            message.put("eventType", "NOTIFICATION_SENT");
            message.put("notificationId", notification.getNotificationId());
            message.put("type", notification.getType());
            message.put("channel", notification.getChannel());
            message.put("status", notification.getStatus());
            message.put("userId", notification.getUserId());
            message.put("timestamp", System.currentTimeMillis());
            
            rabbitTemplate.convertAndSend("notification.exchange", "notification.sent", message);
            log.info("Notification event sent: {}", notification.getNotificationId());
        } catch (Exception e) {
            log.error("Failed to send notification event: {}", notification.getNotificationId(), e);
        }
    }
    
    public void sendNotificationFailedEvent(Notification notification) {
        try {
            Map<String, Object> message = new HashMap<>();
            message.put("eventType", "NOTIFICATION_FAILED");
            message.put("notificationId", notification.getNotificationId());
            message.put("type", notification.getType());
            message.put("channel", notification.getChannel());
            message.put("failureReason", notification.getFailureReason());
            message.put("userId", notification.getUserId());
            message.put("timestamp", System.currentTimeMillis());
            
            rabbitTemplate.convertAndSend("notification.exchange", "notification.failed", message);
            log.info("Notification failed event sent: {}", notification.getNotificationId());
        } catch (Exception e) {
            log.error("Failed to send notification failed event: {}", notification.getNotificationId(), e);
        }
    }
    
    public void processOrderCreatedEvent(String message) {
        try {
            Map<String, Object> event = objectMapper.readValue(message, Map.class);
            String orderId = (String) event.get("orderId");
            String orderNumber = (String) event.get("orderNumber");
            Long userId = (Long) event.get("userId");
            String email = (String) event.get("guestEmail");
            
            // Send order confirmation notification
            NotificationRequest request = new NotificationRequest();
            request.setUserId(userId);
            request.setEmail(email);
            request.setType("EMAIL");
            request.setChannel("ORDER");
            request.setTemplateName("order-confirmation");
            request.setReferenceType("ORDER");
            request.setReferenceId(orderNumber);
            
            Map<String, Object> variables = new HashMap<>();
            variables.put("orderNumber", orderNumber);
            variables.put("orderId", orderId);
            request.setVariables(variables);
            
            // 使用事件发布代替直接调用
            eventPublisher.publishEvent(new NotificationEvent(this, request));
            
            log.info("Order confirmation event published for order: {}", orderNumber);
            
        } catch (Exception e) {
            log.error("Failed to process order created event: {}", e.getMessage());
        }
    }
    
    public void processPaymentSuccessEvent(String message) {
        try {
            Map<String, Object> event = objectMapper.readValue(message, Map.class);
            String orderNumber = (String) event.get("orderNumber");
            Long userId = (Long) event.get("userId");
            String paymentId = (String) event.get("paymentId");
            
            // Send payment success notification
            NotificationRequest request = new NotificationRequest();
            request.setUserId(userId);
            request.setType("EMAIL");
            request.setChannel("PAYMENT");
            request.setTemplateName("payment-success");
            request.setReferenceType("PAYMENT");
            request.setReferenceId(paymentId);
            
            Map<String, Object> variables = new HashMap<>();
            variables.put("orderNumber", orderNumber);
            variables.put("paymentId", paymentId);
            request.setVariables(variables);
            
            // 使用事件发布代替直接调用
            eventPublisher.publishEvent(new NotificationEvent(this, request));
            
            log.info("Payment success event published for payment: {}", paymentId);
            
        } catch (Exception e) {
            log.error("Failed to process payment success event: {}", e.getMessage());
        }
    }
    
    public void processLowStockEvent(String message) {
        try {
            Map<String, Object> event = objectMapper.readValue(message, Map.class);
            String sku = (String) event.get("sku");
            String productName = (String) event.get("productName");
            Integer currentStock = (Integer) event.get("currentStock");
            Integer safetyStock = (Integer) event.get("safetyStock");
            
            // Send low stock notification to admin
            NotificationRequest request = new NotificationRequest();
            request.setEmail("admin@ecommerce.com"); // Admin email
            request.setType("EMAIL");
            request.setChannel("INVENTORY");
            request.setSubject("Low Stock Alert");
            request.setContent(String.format(
                    "Product %s (SKU: %s) is running low on stock. Current: %d, Safety: %d",
                    productName, sku, currentStock, safetyStock
            ));
            
            // 使用事件发布代替直接调用
            eventPublisher.publishEvent(new NotificationEvent(this, request));
            
            log.info("Low stock event published for product: {}", sku);
            
        } catch (Exception e) {
            log.error("Failed to process low stock event: {}", e.getMessage());
        }
    }
    
    public void processOrderStatusUpdatedEvent(String message) {
        try {
            Map<String, Object> event = objectMapper.readValue(message, Map.class);
            String orderNumber = (String) event.get("orderNumber");
            Long userId = (Long) event.get("userId");
            String newStatus = (String) event.get("status");
            String customerEmail = (String) event.get("customerEmail");
            
            // Send order status update notification
            NotificationRequest request = new NotificationRequest();
            request.setUserId(userId);
            request.setEmail(customerEmail);
            request.setType("EMAIL");
            request.setChannel("ORDER");
            request.setTemplateName("order-status-update");
            request.setReferenceType("ORDER");
            request.setReferenceId(orderNumber);
            
            Map<String, Object> variables = new HashMap<>();
            variables.put("orderNumber", orderNumber);
            variables.put("status", newStatus);
            request.setVariables(variables);
            
            // 使用事件发布代替直接调用
            eventPublisher.publishEvent(new NotificationEvent(this, request));
            
            log.info("Order status update event published for order: {}", orderNumber);
            
        } catch (Exception e) {
            log.error("Failed to process order status updated event: {}", e.getMessage());
        }
    }
    
    public void processShippingUpdatedEvent(String message) {
        try {
            Map<String, Object> event = objectMapper.readValue(message, Map.class);
            String orderNumber = (String) event.get("orderNumber");
            Long userId = (Long) event.get("userId");
            String trackingNumber = (String) event.get("trackingNumber");
            String shippingCarrier = (String) event.get("shippingCarrier");
            String customerEmail = (String) event.get("customerEmail");
            
            // Send shipping update notification
            NotificationRequest request = new NotificationRequest();
            request.setUserId(userId);
            request.setEmail(customerEmail);
            request.setType("EMAIL");
            request.setChannel("SHIPPING");
            request.setTemplateName("shipping-update");
            request.setReferenceType("ORDER");
            request.setReferenceId(orderNumber);
            
            Map<String, Object> variables = new HashMap<>();
            variables.put("orderNumber", orderNumber);
            variables.put("trackingNumber", trackingNumber);
            variables.put("shippingCarrier", shippingCarrier);
            request.setVariables(variables);
            
            // 使用事件发布代替直接调用
            eventPublisher.publishEvent(new NotificationEvent(this, request));
            
            log.info("Shipping update event published for order: {}", orderNumber);
            
        } catch (Exception e) {
            log.error("Failed to process shipping updated event: {}", e.getMessage());
        }
    }
}