RabbitMQService.java
9.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package com.ecommerce.notification.service;
import com.ecommerce.notification.model.Notification;
import com.ecommerce.notification.model.dto.NotificationRequest;
import com.ecommerce.notification.event.NotificationEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitMQService {
private final RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper;
private final ApplicationEventPublisher eventPublisher;
public void sendNotificationEvent(Notification notification) {
try {
Map<String, Object> message = new HashMap<>();
message.put("eventType", "NOTIFICATION_SENT");
message.put("notificationId", notification.getNotificationId());
message.put("type", notification.getType());
message.put("channel", notification.getChannel());
message.put("status", notification.getStatus());
message.put("userId", notification.getUserId());
message.put("timestamp", System.currentTimeMillis());
rabbitTemplate.convertAndSend("notification.exchange", "notification.sent", message);
log.info("Notification event sent: {}", notification.getNotificationId());
} catch (Exception e) {
log.error("Failed to send notification event: {}", notification.getNotificationId(), e);
}
}
public void sendNotificationFailedEvent(Notification notification) {
try {
Map<String, Object> message = new HashMap<>();
message.put("eventType", "NOTIFICATION_FAILED");
message.put("notificationId", notification.getNotificationId());
message.put("type", notification.getType());
message.put("channel", notification.getChannel());
message.put("failureReason", notification.getFailureReason());
message.put("userId", notification.getUserId());
message.put("timestamp", System.currentTimeMillis());
rabbitTemplate.convertAndSend("notification.exchange", "notification.failed", message);
log.info("Notification failed event sent: {}", notification.getNotificationId());
} catch (Exception e) {
log.error("Failed to send notification failed event: {}", notification.getNotificationId(), e);
}
}
public void processOrderCreatedEvent(String message) {
try {
Map<String, Object> event = objectMapper.readValue(message, Map.class);
String orderId = (String) event.get("orderId");
String orderNumber = (String) event.get("orderNumber");
Long userId = (Long) event.get("userId");
String email = (String) event.get("guestEmail");
// Send order confirmation notification
NotificationRequest request = new NotificationRequest();
request.setUserId(userId);
request.setEmail(email);
request.setType("EMAIL");
request.setChannel("ORDER");
request.setTemplateName("order-confirmation");
request.setReferenceType("ORDER");
request.setReferenceId(orderNumber);
Map<String, Object> variables = new HashMap<>();
variables.put("orderNumber", orderNumber);
variables.put("orderId", orderId);
request.setVariables(variables);
// 使用事件发布代替直接调用
eventPublisher.publishEvent(new NotificationEvent(this, request));
log.info("Order confirmation event published for order: {}", orderNumber);
} catch (Exception e) {
log.error("Failed to process order created event: {}", e.getMessage());
}
}
public void processPaymentSuccessEvent(String message) {
try {
Map<String, Object> event = objectMapper.readValue(message, Map.class);
String orderNumber = (String) event.get("orderNumber");
Long userId = (Long) event.get("userId");
String paymentId = (String) event.get("paymentId");
// Send payment success notification
NotificationRequest request = new NotificationRequest();
request.setUserId(userId);
request.setType("EMAIL");
request.setChannel("PAYMENT");
request.setTemplateName("payment-success");
request.setReferenceType("PAYMENT");
request.setReferenceId(paymentId);
Map<String, Object> variables = new HashMap<>();
variables.put("orderNumber", orderNumber);
variables.put("paymentId", paymentId);
request.setVariables(variables);
// 使用事件发布代替直接调用
eventPublisher.publishEvent(new NotificationEvent(this, request));
log.info("Payment success event published for payment: {}", paymentId);
} catch (Exception e) {
log.error("Failed to process payment success event: {}", e.getMessage());
}
}
public void processLowStockEvent(String message) {
try {
Map<String, Object> event = objectMapper.readValue(message, Map.class);
String sku = (String) event.get("sku");
String productName = (String) event.get("productName");
Integer currentStock = (Integer) event.get("currentStock");
Integer safetyStock = (Integer) event.get("safetyStock");
// Send low stock notification to admin
NotificationRequest request = new NotificationRequest();
request.setEmail("admin@ecommerce.com"); // Admin email
request.setType("EMAIL");
request.setChannel("INVENTORY");
request.setSubject("Low Stock Alert");
request.setContent(String.format(
"Product %s (SKU: %s) is running low on stock. Current: %d, Safety: %d",
productName, sku, currentStock, safetyStock
));
// 使用事件发布代替直接调用
eventPublisher.publishEvent(new NotificationEvent(this, request));
log.info("Low stock event published for product: {}", sku);
} catch (Exception e) {
log.error("Failed to process low stock event: {}", e.getMessage());
}
}
public void processOrderStatusUpdatedEvent(String message) {
try {
Map<String, Object> event = objectMapper.readValue(message, Map.class);
String orderNumber = (String) event.get("orderNumber");
Long userId = (Long) event.get("userId");
String newStatus = (String) event.get("status");
String customerEmail = (String) event.get("customerEmail");
// Send order status update notification
NotificationRequest request = new NotificationRequest();
request.setUserId(userId);
request.setEmail(customerEmail);
request.setType("EMAIL");
request.setChannel("ORDER");
request.setTemplateName("order-status-update");
request.setReferenceType("ORDER");
request.setReferenceId(orderNumber);
Map<String, Object> variables = new HashMap<>();
variables.put("orderNumber", orderNumber);
variables.put("status", newStatus);
request.setVariables(variables);
// 使用事件发布代替直接调用
eventPublisher.publishEvent(new NotificationEvent(this, request));
log.info("Order status update event published for order: {}", orderNumber);
} catch (Exception e) {
log.error("Failed to process order status updated event: {}", e.getMessage());
}
}
public void processShippingUpdatedEvent(String message) {
try {
Map<String, Object> event = objectMapper.readValue(message, Map.class);
String orderNumber = (String) event.get("orderNumber");
Long userId = (Long) event.get("userId");
String trackingNumber = (String) event.get("trackingNumber");
String shippingCarrier = (String) event.get("shippingCarrier");
String customerEmail = (String) event.get("customerEmail");
// Send shipping update notification
NotificationRequest request = new NotificationRequest();
request.setUserId(userId);
request.setEmail(customerEmail);
request.setType("EMAIL");
request.setChannel("SHIPPING");
request.setTemplateName("shipping-update");
request.setReferenceType("ORDER");
request.setReferenceId(orderNumber);
Map<String, Object> variables = new HashMap<>();
variables.put("orderNumber", orderNumber);
variables.put("trackingNumber", trackingNumber);
variables.put("shippingCarrier", shippingCarrier);
request.setVariables(variables);
// 使用事件发布代替直接调用
eventPublisher.publishEvent(new NotificationEvent(this, request));
log.info("Shipping update event published for order: {}", orderNumber);
} catch (Exception e) {
log.error("Failed to process shipping updated event: {}", e.getMessage());
}
}
}