API模块接收请求,推送到消息队列
router模块消费消息,分发到各个模块
每个模块消费消息,在推回API模块,因为api模块需要知道最终执行结果
API模块配置:
spring:
cloud:
stream:
bindings:
outbound-agent-state-list.destination: outbound.agent-state-list #生产
agent-state-list-reply-channel: #消费回调回来的消息
destination: outbound.agent-state-list-reply
group: ${nodeNo:0}
durableSubscription: false
consumer.maxPriority: 10
rabbit.bindings:
outbound-agent-state-list.producer.routing-key-expression: '''router'''
agent-state-list-reply-channel.consumer.durableSubscription: false
@Component
public interface OutboundOutputChannels {
String OUTBOUND_AGENT_STATE_LIST = "outbound-agent-state-list";
@Output(value = OUTBOUND_AGENT_STATE_LIST)
MessageChannel agentStateListOutput();
}
@Component
public interface OutboundInputChannels {
String OUTBOUND_AGENT_STATE_LIST_REPLY = "agent-state-list-reply-channel";
@Input(value = OUTBOUND_AGENT_STATE_LIST_REPLY)
SubscribableChannel agentStateListReply();
}
接收回调消息并出来业务逻辑
@Slf4j
@RequiredArgsConstructor
@EnableBinding(OutboundInputChannels.class)
public class OutboundReplyMonitor extends DestroyableMonitor {
@StreamListener(OutboundInputChannels.OUTBOUND_AGENT_STATE_LIST_REPLY)
public void agentStateListReply(AgentStateListReplyDTO payload) {
countUp();
agentStateService.processAgentStateListReply(payload);
countDown();
}
}
API接口入口
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api")
@Api(value = "外呼接口", tags = "精准智能请求人工及人工接起")
public class AgentStateController {
private final AgentStateService agentStateService;
/**
* 5.9 精准智能请求人工及人工接起
* 调用商路通服务,获取所有坐席首页数据
*
* @param agentStateRequestDTO 请求参数
* @return 返回ResponseServer
*/
@ApiOperation(value = "用商路通服务,获取所有坐席首页数据", httpMethod = "POST")
@PostMapping("/agentStateList")
public ApiResponse agentStateList(@Validated @RequestBody AgentStateListRequestDTO agentStateRequestDTO, @RequestHeader(value = WebConstant.PRIORITY, required = false, defaultValue = "2") Integer priority) throws Exception {
log.info("请求:调用商路通服务,获取所有坐席首页数据,param:{}", agentStateRequestDTO.toString());
return agentStateService.agentStateList(agentStateRequestDTO, priority);
}
}
API接收请求后,数据校验,发送请求到消息队列,并等待消息的响应
@Slf4j
@Service
@RequiredArgsConstructor
public class AgentStateService {
private final MessageService messageService;
private final CallSupplierTypeRelationService callSupplierTypeRelationService;
private final CallSupplierService callSupplierService;
private final CallTypeService callTypeService;
protected final ApplicationProperties properties;
private final Map<String, CompletableFuture<ApiResponse>> queryFutureMap = new ConcurrentHashMap<>();
public ApiResponse agentStateList(AgentStateListRequestDTO queue, Integer priority) throws Exception {
long startTime = System.currentTimeMillis();
String businessId = SltBusinessEnum.getNameByCallType(queue.getCallType());
if (StringUtils.isEmpty(businessId)) {
throw new BusinessException("商路通不支持的外呼类型:" + queue.getCallType());
}
CallSupplierPO supplierPO = callSupplierService.findByCode(queue.getSupplier());
if (supplierPO == null) {
throw new BusinessException("供应商不存在:" + queue.getSupplier());
}
CallTypePO callTypePO = callTypeService.findByCode(queue.getCallType());
if (callTypePO == null) {
throw new BusinessException("外呼类型不存在:" + queue.getCallType());
}
CallSupplierTypeRelationPO callSupplierTypeRelationPo = callSupplierTypeRelationService.findBySupplierIdAndCallTypeId(supplierPO.getId(), callTypePO.getId());
if (null == callSupplierTypeRelationPo) {
throw new BusinessException("供应商外呼类型关系不存在");
}
queue.setCallType(callTypePO.getCode());
queue.setSupplierCode(supplierPO.getCode());
queue.setQueueId(StringCheckUtil.uuid());
messageService.sendToAgentStateListChannel(queue, priority);
CompletableFuture<ApiResponse> future = new CompletableFuture<>();
queryFutureMap.put(queue.getQueueId(), future);
long maxWaitMillis = properties.getMaxWaitMillis().toMillis();
Duration duration = properties.getMaxWaitMillis().minusMillis(System.currentTimeMillis() - startTime);
if (duration.isNegative() || duration.isZero()) {
log.info("调用商路通服务,获取所有坐席首页数据,已超过最大等待时间{}ms,param:{}", properties.getMaxWaitMillis().toMillis(), queue.toString());
throw new RequestTimeoutException(maxWaitMillis, queue.getQueueId());
}
log.debug("等待返回调用商路通服务,获取所有坐席首页数据请求结果,允许等待时间{}ms", duration.toMillis());
try {
return future.get(duration.toMillis(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new RequestTimeoutException(maxWaitMillis, queue.getQueueId());
} finally {
queryFutureMap.remove(queue.getQueueId());
}
}
public void processAgentStateListReply(AgentStateListReplyDTO payload) {
String queueId = payload.getQueueId();
log.info("API模块-接收-调用商路通服务,获取所有坐席首页数据结果,{}", payload.toString());
if (queryFutureMap.containsKey(queueId)) {
queryFutureMap.get(queueId).complete(ApiResponse.success(payload.getRows()));
} else {
log.debug("未找到queueId记录:{}", queueId);
}
}
}
停止服务前,检查是否有消息正在处理
@Slf4j
public class DestroyableMonitor {
private static final Integer MAX_WAIT_COUNT = 20;
private AtomicLong messageCount = new AtomicLong();
protected Long countUp() {
return messageCount.incrementAndGet();
}
protected Long countDown() {
return messageCount.decrementAndGet();
}
@PreDestroy
private void tearDown() throws InterruptedException {
int waitCount = 0;
while (messageCount.get() > 0 && waitCount++ < MAX_WAIT_COUNT) {
log.info("正在关闭消息监听程序{},等待3秒[{}/{}]...", this.getClass().getCanonicalName(), waitCount, MAX_WAIT_COUNT);
Thread.sleep(3000L);
}
if (messageCount.get() > 0) {
log.warn("应用非安全关闭,当前仍有{}条正在处理的消息", messageCount.get());
}
}
}
router模块队列配置:
spring.cloud.stream:
bindings:
agent-state-list-input-channel:
destination: outbound.agent-state-list
group: router
consumer:
maxAttempts: 1
concurrency: 10
agent-state-list-slt-acv-channel.destination: outbound.agent-state-list
agent-state-list-slt-ivr-channel.destination: outbound.agent-state-list
rabbit.bindings:
agent-state-list-input-channel.consumer.bindingRoutingKey: router
agent-state-list-slt-acv-channel.producer.routing-key-expression: '''slt-acv'''
agent-state-list-slt-ivr-channel.producer.routing-key-expression: '''slt-ivr'''
@Component
public interface RouterInputChannels {
String AGENT_STATE_LIST_INPUT_CHANNEL = "agent-state-list-input-channel";
@Input(value = AGENT_STATE_LIST_INPUT_CHANNEL)
SubscribableChannel agentStateListInput();
}
@Component
public interface RouterOutputChannels {
String AGENT_STATE_LIST_SLT_ACV = "agent-state-list-slt-acv-channel";
String AGENT_STATE_LIST_SLT_IVR = "agent-state-list-slt-ivr-channel";
@Output(AGENT_STATE_LIST_SLT_ACV)
MessageChannel agentStateListSltACR();
@Output(AGENT_STATE_LIST_SLT_IVR)
MessageChannel agentStateListSltIVR();
}
router 模块监听
@Slf4j
@Component
@EnableBinding(RouterInputChannels.class)
@RequiredArgsConstructor
public class AgentStatusListMonitor extends DestroyableMonitor {
private final AgentStateListService agentStateListService;
@StreamListener(RouterInputChannels.AGENT_STATE_LIST_INPUT_CHANNEL)
public void onMessage(AgentStateListRequestDTO req) {
countUp();
try {
log.info("AgentStatusListMonitor收到请求,调用商路通服务,获取所有坐席首页数据:查询参数:{}", req.toString());
agentStateListService.process(req);
} finally {
countDown();
}
}
}
router业务处理和消息分发
@Slf4j
@Service
@RequiredArgsConstructor
public class AgentStateListService {
private final AgentStateListMessageService agentStateListMessageService;
private final RouterOutputChannels routerOutputChannels;
public void process(AgentStateListRequestDTO payload) {
agentStateListMessageService.send(payload);
Message<AgentStateListRequestDTO> message = MessageBuilder.withPayload(payload).build();
switch (payload.getSupplier()) {
case OutboundSupplierConstant.SLT:
switch (payload.getCallType()) {
case OutboundTypeConstant.IVR:
routerOutputChannels.agentStateListSltIVR().send(message);
return;
case OutboundTypeConstant.ACV:
routerOutputChannels.agentStateListSltACR().send(message);
return;
default:
routerOutputChannels.agentStateListSltACR().send(message);
routerOutputChannels.agentStateListSltIVR().send(message);
return;
}
default:
log.error("调用商路通服务,获取所有坐席首页数据的消息转发失败,供应商不支持,[payload:{}]", payload);
}
}
}
分发后的模块
base模块
application-slt.yml
spring.cloud.stream:
bindings:
agent-state-list-reply-channel.destination: outbound.agent-state-list-reply
@Component
public interface SltBoundInputChannels {
String AGENT_STATE_LIST_CHANNEL = "agent-state-list-channel";
@Input(value = AGENT_STATE_LIST_CHANNEL)
SubscribableChannel agentStateListSltAcvInput();
}
@Component
public interface SltBoundOutputChannels {
String AGENT_STATE_LIST_REPLY_CHANNEL = "agent-state-list-reply-channel";
@Output(AGENT_STATE_LIST_REPLY_CHANNEL)
MessageChannel agentStateListReplyOutput();
}
base模块的monitor
@Slf4j
@Component
@RequiredArgsConstructor
@EnableBinding(SltBoundInputChannels.class)
public class SltAgentInfoMonitor extends DestroyableMonitor {
private final SltAgentStateListService sltAgentStateListService;
@StreamListener(SltBoundInputChannels.AGENT_STATE_LIST_CHANNEL)
public void process(Message<AgentStateListRequestDTO> message) {
log.info("外呼平台:SLT,代理模块收到消息,调用商路通服务,获取所有坐席首页数据, params: {}", message.getPayload());
countUp();
sltAgentStateListService.process(message.getPayload());
countDown();
}
}
service实现了一些slt请求的方法
@Slf4j
@Service
public class SltAgentStateListService extends AbstractSltRequestServiceTemplate<AgentStateListRequestDTO, AgentStateListResponseDTO> {
private final SltBoundOutputChannels sltBoundOutputChannels;
public SltAgentStateListService(ApplicationProperties properties, ObjectMapper objectMapper, RestTemplateService restTemplateService, SltBoundOutputChannels sltBoundOutputChannels) {
super(properties, objectMapper, restTemplateService);
this.sltBoundOutputChannels = sltBoundOutputChannels;
}
@Override
protected void handleException(Exception exception, AgentStateListRequestDTO payload) {
log.info("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-指令,调用异常, params: {}", payload, exception);
}
@Override
public RequestDTO generateRequest(AgentStateListRequestDTO payload) {
log.info("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-指令,创建请求,params:{}", payload);
SupplierRequestInfoDTO requestInfo = properties.getRequestInfo(payload.getSupplierCode());
RequestDTO request = new RequestDTO();
request.setAction(OUTBOUND_AGENT_STATE_LIST_ACTION);
request.setStartTime(payload.getStartTime());
request.setEndTime(payload.getEndTime());
request.setBusinessID(SltBusinessEnum.getNameByCallType(payload.getCallType()));
request.setBaseUrl(requestInfo.getBaseUrl());
request.setUrl(requestInfo.getBaseUrl().concat(properties.getRequestUrlMap().get(OUTBOUND_AGENT_STATE_LIST_SERVLET)));
request.setLoginUser(requestInfo.getUser());
request.setLoginPwd(requestInfo.getPassword());
return request;
}
@Override
public void handleResponse(AgentStateListResponseDTO responseDTO, AgentStateListRequestDTO payload) {
if (SltResponseCode.RESPONSE_SUCCESS_STR.equals(responseDTO.getReturnCode())) {
processReply(responseDTO, payload);
} else {
log.warn("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-处理响应,获取失败,params:{}, result:{}", payload, responseDTO);
}
}
@Override
protected TypeReference<AgentStateListResponseDTO> instanceReference() {
return new TypeReference<>() {
};
}
//消息写会API模块
private void processReply(AgentStateListResponseDTO responseDTO, AgentStateListRequestDTO payload) {
List<AgentStateListJsonTO> rowsJsonObject = responseDTO.getRows();
List<AgentStateListDTO> rowsList = new ArrayList<>();
for (AgentStateListJsonTO e : rowsJsonObject) {
AgentStateListDTO agentStateListDTO = new AgentStateListDTO();
BeanUtils.copyProperties(e, agentStateListDTO);
rowsList.add(agentStateListDTO);
}
AgentStateListReplyDTO reply = new AgentStateListReplyDTO();
reply.setReturnCode(responseDTO.getReturnCode());
reply.setMessage(responseDTO.getReturnMessage());
reply.setRows(rowsList);
reply.setQueueId(payload.getQueueId());
reply.setTotal(responseDTO.getTotal());
sltBoundOutputChannels.agentStateListReplyOutput().send(MessageBuilder.withPayload(reply).build());
}
}
@Slf4j
public abstract class AbstractSltRequestServiceTemplate<T extends BaseQueue, K extends ResponseDTO> extends SltSendRequestService<K>
implements IEncapsulationRequestEntityInterface<T, RequestDTO>, IParsingResponseBodyInterface<T, K> {
public AbstractSltRequestServiceTemplate(ApplicationProperties properties, ObjectMapper objectMapper, RestTemplateService restTemplateService) {
super(properties, objectMapper, restTemplateService);
}
public void process(T payload) {
try {
RequestDTO requestDTO = generateRequest(payload);
K response = sendRequest(requestDTO);
handleResponse(response, payload);
} catch (Exception e) {
handleException(e, payload);
}
}
/**
* 处理异常
*
* @param exception 异常对象
* @param payload 传递数据
*/
protected abstract void handleException(Exception exception, T payload);
}
下面几个类是对SLT的所有请求的封装
@Slf4j
@Component
@RequiredArgsConstructor
public class SltSendRequestService<T extends ResponseDTO> implements ISendRequestInterface<RequestDTO, T> {
protected final ApplicationProperties properties;
protected final ObjectMapper objectMapper;
protected final RestTemplateService restTemplateService;
ThreadLocal<Boolean> checkLogin = new ThreadLocal<>();
/**
* 发送请求
*
* @param request 请求参数
* @return 响应实体
* @throws Exception 异常
*/
@Override
public T sendRequest(RequestDTO request) throws Exception {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
String requestParam = "param=".concat(objectMapper.writeValueAsString(request));
HttpEntity<String> requestEntity = new HttpEntity<>(requestParam, headers);
long startTime = System.currentTimeMillis();
ResponseEntity<String> responseEntity = null;
log.info("请求:[{}],header: {}", requestParam, headers.toString());
try {
responseEntity = restTemplateService.post(request.getUrl(), requestEntity, properties.getRequestTimeoutMaximum(), String.class);
Assert.hasText(responseEntity.getBody(), "接口调用异常:商路通接口响应体为空");
if (responseEntity.getStatusCode().equals(HttpStatus.OK)) {
T response = objectMapper.readValue(responseEntity.getBody(), instanceReference());
return checkResponse(response, request);
} else {
log.error("商路通外呼请求:接口调用失败 [statusCode:{},body:{}]", responseEntity.getStatusCodeValue(), responseEntity.getBody());
throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(), "商路通外呼请求:接口调用失败");
}
} finally {
checkLogin.remove();
long elapsedTime = System.currentTimeMillis() - startTime;
if (responseEntity == null) {
log.error("商路通外呼请求详情:[请求地址:[{}],请求体:[{}],响应体:null]", request.getUrl(), requestParam);
} else {
log.info("商路通外呼请求详情:[请求地址:[{}],请求时间:{},请求体:[{}],响应体:[{}]]",
request.getUrl(), elapsedTime, requestParam, responseEntity.getBody());
}
}
}
protected TypeReference<T> instanceReference() {
return new TypeReference<T>() {
};
}
/**
* 校验响应体是否合法
*
* @param response 响应体对象
* @param request 请求体对象
* @return 响应体对象
* @throws Exception 异常
*/
T checkResponse(T response, RequestDTO request) throws Exception {
if (StringUtils.equals(SltResponseCode.RESPONSE_TIME_OUT_STR, response.getReturnCode()) &&
StringUtils.equals("超时", response.getReturnMessage())) {
log.info("商路通外呼请求:接口请求响应码超时:[{}],调用登录接口", response.toString());
return login(request);
} else {
return response;
}
}
/**
* 商路通登陆操作
*
* @param request 请求参数对象
* @return 响应体对象
* @throws Exception 异常
*/
private T login(RequestDTO request) throws Exception {
checkLogin();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
String loginParam = "userId=".concat(request.getLoginUser()).concat("&md5pwd=").concat(request.getLoginPwd());
HttpEntity<String> requestEntity = new HttpEntity<>(loginParam, headers);
ResponseEntity<String> loginResponseEntity = restTemplateService.post(request.getBaseUrl().concat(properties.getRequestUrlMap().get(OUTBOUND_LOGIN)), requestEntity);
log.info("商路通外呼请求:登陆接口响应:[body:{}]", loginResponseEntity.getBody());
Assert.hasText(loginResponseEntity.getBody(), "接口调用异常:商路通登录接口响应体为空");
LoginResponseDTO loginResponse = objectMapper.readValue(loginResponseEntity.getBody(), LoginResponseDTO.class);
if (SltResponseCode.RESPONSE_SUCCESS.equals(loginResponse.getReturnCode())) {
return sendRequest(request);
} else {
throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(),
"商路通外呼请求:登录接口调用失败:".concat(Objects.requireNonNull(loginResponseEntity.getBody())));
}
}
/**
* 校验请求而否二次登陆
*/
private void checkLogin() {
if (checkLogin.get() == null || !checkLogin.get()) {
checkLogin.set(true);
} else {
throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(), "操作失败: 商路通登录接口已经调用过");
}
}
}
public interface ISendRequestInterface<T extends BaseRequestEntity,K extends BaseResponseEntity> {
/**
* 发送请求
*
* @param request 请求实体
* @return 响应实体
* @throws Exception 异常
*/
K sendRequest(T request) throws Exception;
}
public interface IEncapsulationRequestEntityInterface<Q extends BaseQueue,T extends BaseRequestEntity> {
/**
* 初始化请求对象
*
* @param payload 消息队列传递请求相关参数信息
* @return 请求参数
*/
T generateRequest(Q payload) throws Exception;
}
@Data
public class BaseRequestEntity {
}
下面是IVR模块的示例,ACV模块省略
配置队列:
spring.cloud.stream:
bindings:
agent-state-list-channel:
destination: outbound.agent-state-list
group: slt-ivr
consumer:
maxAttempts: 1
concurrency: 10
rabbit.bindings:
agent-state-list-channel.consumer.bindingRoutingKey: slt-ivr
相关推荐
### RabbitMQ 使用总结 #### 一、安装与配置 **1. 安装顺序** - **ERLANG安装**: 首先确保安装了Erlang环境。安装时注意选择带有`windowdll`选项的版本,这有助于后续RabbitMQ的稳定运行。 - **RabbitMQ安装**: ...
RabbitMq 使用手册 本文档为 RabbitMq 使用手册,介绍了 RabbitMq 的应用场景和开发指导。RabbitMq 是一个由 Erlang 开发的 AMQP(Advanced Message Queue)流行的开源消息队列系统。RabbitMq 的结构图如下: ...
最后,文档中还涵盖了一些RabbitMQ使用过程中的常见问题,以及它们的解决方法,来帮助用户更好地理解和运用RabbitMQ。 值得注意的是,文档中提到的某些部分可能由于OCR技术导致识别错误或遗漏,但基于上述描述,...
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开 发中应用...
这份"RabbitMQ使用参考-YS"文档,很可能是为了帮助用户理解和掌握RabbitMQ的基本概念、安装配置、使用方法以及最佳实践。 首先,我们需要了解RabbitMQ的基本概念。RabbitMQ的核心是消息 broker,它接收、存储和转发...
RabbitMQ适用于多种操作系统,文档中提及了基于华为云使用CentOS 7.6(64位)系统部署RabbitMQ 3.8.2版本的实例。该服务软件的安装和配置被记录在官方手册中,为用户提供了一个较为完整的指南。 安装RabbitMQ通常...
RabbitMQ 使用手册 RabbitMQ 是一个流行的消息队列系统,它提供了多种交换机类型,每种交换机类型都有其特点和用途。下面我们将详细介绍 RabbitMQ 中的 Fanout Exchange、Direct Exchange 和 Topic Exchange。 ...
消息中间件RabbitMQ使用
RabbitMQ使用规范 概述 RabbitMQ是基于AMQP高性能的开源消息队列服务器,可以用来实现消息的发布、订阅和routing。该规范旨在为RabbitMQ的使用提供一个统一的标准,确保RabbitMQ的使用符合公司的技术架构和安全要求...
本文将详细介绍如何在C++和QT环境下集成并使用RabbitMQ,以便于开发者构建实时通信和高并发的应用。 首先,安装RabbitMQ需要在系统上设置Erlang环境,因为RabbitMQ是用Erlang语言编写的。确保已下载并安装了Erlang ...
本篇将详细介绍如何在Spring Boot项目中集成并使用RabbitMQ。 首先,我们需要在Spring Boot项目中添加RabbitMQ的相关依赖。在`pom.xml`文件中,加入以下Maven依赖: ```xml <groupId>org.springframework.boot ...
RabbitMQ 和消息传递系统通常会使用特定的术语来描述其中的关键组件。 - **生产者(Producer)**:生产者是指发送消息的程序或应用。在RabbitMQ 的图形表示中,我们通常用 "P" 来标记它。 - **队列(Queue)**:队列是...
在本文中,我们将深入探讨如何在C#环境中封装和使用RabbitMQ,这是一个基于AMQP(Advanced Message Queuing Protocol)的消息中间件。首先,我们来看看RabbitMQ是什么以及它的核心功能。 RabbitMQ是一个开源的消息...
而RabbitMQ则是一种广泛使用的开源消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,能够实现异步通信和解耦系统组件。将Spring Boot与RabbitMQ整合,可以让我们在微服务架构中轻松地...
在IT行业中,RabbitMQ是一个广泛使用的开源消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中高效处理和路由消息。要使用RabbitMQ,首先需要确保你的环境已经正确...
此外,RabbitMQ默认使用5672端口,如果该端口已被其他服务占用,需要更改RabbitMQ的配置文件(rabbitmq.config)来指定一个未使用的端口。同时,记得检查你的防火墙设置,确保允许RabbitMQ服务的入站和出站通信。 ...
RabbitMQ客户连接池的Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...
在JavaScript中使用RabbitMQ,首先需要一个客户端库,如`amqplib`,这是一个官方支持的JavaScript客户端,可以与RabbitMQ服务器建立连接。安装该库可以通过npm(Node.js的包管理器)执行以下命令: ```bash npm ...
**四、RabbitMQ使用基础** 1. 生产者与消费者:RabbitMQ的核心概念包括生产者(发送消息的客户端)、消费者(接收消息的客户端)以及队列(存储消息的数据结构)。生产者将消息发布到队列,消费者从队列中获取并...
在本方案中,我们将利用RabbitMQ的延迟队列特性来实现在订单和库存系统中的分布式事务最终一致性。 RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)的消息中间件,它提供了一种可靠的消息传递机制,使得...