3850 字
19 分钟
RocketMQ生产者与消费者完整封装技术文档
RocketMQ生产者与消费者完整封装技术文档
概述
本文档提供了基于Spring Boot的RocketMQ完整解决方案,包括消息生产者和消费者的封装实现。支持同步发送、异步发送、延迟消息、定时消息、有序消息以及并发消费、顺序消费等多种模式。
依赖配置
在项目的 pom.xml中添加RocketMQ Spring Boot Starter依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.3.3</version></dependency>配置文件
application.yml配置
在 application.yml中添加RocketMQ配置:
rocketmq: # NameServer地址,多个用分号分隔 name-server: localhost:9876
producer: # 生产者组名,同一个组内的生产者发送相同Topic的消息 group: prodProduceGroup
# 是否启用消息轨迹,用于消息的发送和消费轨迹追踪 # 开启后可在控制台查看消息的完整链路,但会影响性能 enable-msg-trace: false
# 访问密钥,用于RocketMQ的权限验证 access-key: '00phgp34'
# 秘密密钥,与access-key配对使用 secret-key: 'errtteeet3!'
# 发送消息超时时间(毫秒),默认3000ms send-message-timeout: 10000
# 发送失败重试次数,默认2次 retry-times-when-send-failed: 3
# 消息最大大小(字节),默认4MB max-message-size: 4194304
consumer: # 是否启用消息轨迹 enable-msg-trace: false
# 消费者访问密钥 access-key: '00phgp34'
# 消费者秘密密钥 secret-key: 'errtteeet3!'
# 拉取消息超时时间(毫秒) pull-timeout: 10000broker.conf配置
RocketMQ Broker服务端配置文件:
# Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.
# Broker对外服务的IP地址,客户端连接的IPbrokerIP1=公网IP
# NameServer的地址,如果有多个的话,使用分号分隔开namesrvAddr=公网IP:30001
# Broker端口号,默认10911listenPort=30002
# store的存储路径storePathRootDir=/home/rocketmq/data/store
# commitLog的存储路径storePathCommitLog=/home/rocketmq/data/store/commitlog
# 消费队列的存储路径storePathConsumeQueue=/home/rocketmq/data/store/consumequeue
# 消息索引的存储路径storePathIndex=/home/rocketmq/data/store/index
# checkpoint文件的存储路径storeCheckpoint=/home/rocketmq/data/store/checkpoint
# abort文件的存储路径abortFile=/home/rocketmq/data/store/abort
# 集群名称brokerClusterName = DefaultCluster
# Broker名称brokerName = broker-a
# BrokerID,0表示Master,大于0表示SlavebrokerId = 0
# 该时间清理过期数据deleteWhen = 04
# 文件超过N小时没改变算是过期(小时)fileReservedTime = 72
# Broker角色:ASYNC_MASTER异步复制Master、SYNC_MASTER同步双写Master、SLAVEbrokerRole = ASYNC_MASTER
# 延迟消息时间级别(delayLevel 1对应1s, 2对应5s,依此类推)# 默认18个级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmessageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
# 发送队列等待时间waitTimeMillsInSendQueue=999999
# 刷盘方式:SYNC_FLUSH同步刷盘、ASYNC_FLUSH异步刷盘flushDiskType = SYNC_FLUSH
# 存储消息时使用可重入锁useReentrantLockWhenPutMessage=true
# 消息最大大小,默认4MB,这里设置为10MBmaxMessageSize=4194304
# 消息属性最大大小,默认32KB,这里设置为64KBmaxPropertySize=65536
# 启用访问控制列表(ACL)权限验证aclEnable=true
# ACL配置文件路径aclConfig=/home/rocketmq/rocketmq-all-5.1.2-bin-release/conf/plain_acl.ymlplain_acl.yml配置
RocketMQ访问控制列表(ACL)权限配置文件:
# Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.
# 全局白名单IP地址,这些IP可以无需认证访问RocketMQglobalWhiteRemoteAddresses: - 127.0.0.1 # 本地回环地址 - 公网IP # RocketMQ服务器公网IP - 192.168.1.100 # RocketMQ服务器内网IP
# 用户账户配置accounts: - accessKey: 00phgp34 # 访问密钥,与客户端配置保持一致 secretKey: errtteeet3! # 秘密密钥,与客户端配置保持一致 whiteRemoteAddress: 192.168.1.* # 该用户允许访问的IP范围 admin: true # 是否为管理员账户部署脚本
启动脚本 start.sh
#!/bin/bashlogsDir=/home/rocketmq/logsinstallDir=/home/rocketmq/rocketmq-all-5.1.2-bin-releasemkdir -p $logsDir
# 设置 JVM 启动参数,仅输出 ERROR 日志export JAVA_OPT="$JAVA_OPT -Drocketmq.log.level=ERROR"
# 启动 NameServernohup sh $installDir/bin/mqnamesrv -c $installDir/conf/namesrv.properties > $logsDir/mqnamesrv.out 2>&1 &
# 启动 Broker,指定 NameServer 地址nohup sh $installDir/bin/mqbroker -n 公网IP:9876 -c $installDir/conf/broker.conf > $logsDir/broker.out 2>&1 &
# 启动管理控制台(可选)#nohup java -jar /home/rocketmq/console/rocketmq-dashboard-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=公网IP:9876 > $logsDir/dashboard.out 2>&1 &停止脚本 stop.sh
#!/bin/bashcd /home/rocketmq/rocketmq-all-5.1.2-bin-release/bin
# 停止 Brokersh mqshutdown broker
# 停止 NameServersh mqshutdown namesrv
# 停止管理控制台jarname='rocketmq-dashboard-1.0.0.jar'pid=`ps aux | grep $jarname | grep -v grep | awk '{print $2}'`echo $pidkill -9 $pid脚本说明
启动脚本功能:
- 日志目录创建:自动创建日志存储目录
- JVM参数设置:配置日志级别为ERROR,减少日志输出
- NameServer启动:使用默认9876端口启动注册中心
- Broker启动:指定NameServer地址启动消息服务
- 控制台启动:可选的Web管理界面(注释状态)
停止脚本功能:
- 服务停止:按顺序停止Broker和NameServer
- 进程清理:强制杀死管理控制台进程
- 安全关闭:确保所有RocketMQ相关进程完全退出
使用方法:
# 赋予执行权限chmod +x start.sh stop.sh
# 启动RocketMQ./start.sh
# 停止RocketMQ./stop.sh注意事项:
- 确保脚本中的路径与实际安装路径一致
- NameServer端口9876与broker.conf中的配置保持一致
- 日志文件会输出到指定的logs目录便于排查问题
消息生产者封装
接口定义
IMQProducer 接口
import org.apache.rocketmq.client.producer.SendResult;
/** * 消息队列生产者 */public interface IMQProducer<T> {
/** * 异步发送消息 如果要发送指定TAG,topic参数格式TOPIC:TAG */ void asyncSend(String topic, T msg, int delayLevel);
/** * 同步发送消息 如果要发送指定TAG,topic参数格式TOPIC:TAG */ SendResult syncSend(String topic, T msg, int delayLevel);
/** * 同步发送消息 如果要发送指定TAG,topic参数格式TOPIC:TAG */ SendResult syncSend(String topic, T msg);
/** * 定时投递消息 如果要发送指定TAG,topic参数格式TOPIC:TAG */ SendResult syncSendDeliverTimeMills(String topic, T msg, long pushTime);
/** * 有序发送消息 */ SendResult syncSendOrderly(String topic, T msg, String hashKey);}生产者实现
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Service;
import jakarta.annotation.Resource;
@Service@Slf4jpublic class RocketMQProducer<T> implements IMQProducer<T> {
private final String LOG_SIGN = "生产者消息";
@Resource private RocketMQTemplate rocketMQTemplate;
@Override public void asyncSend(String topic, T msg, int delayLevel) { try { Message<T> message = MessageBuilder.withPayload(msg).build(); rocketMQTemplate.asyncSend(topic, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("【{}】【{}】发送成功, 状态码【{}】", LOG_SIGN, message, sendResult.getSendStatus()); }
@Override public void onException(Throwable throwable) { log.warn("【{}】【{}】发送异常", LOG_SIGN, message, throwable); } }, 10000, delayLevel); } catch (Exception e) { log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e); } }
@Override public SendResult syncSend(String topic, T msg, int delayLevel) { try { Message<T> message = MessageBuilder.withPayload(msg).build(); SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 10000, delayLevel); log.info("【{}】【{}】发送结果 状态码【{}】 MsgId【{}】", LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId()); return sendResult; } catch (Exception e) { log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e); } return null; }
@Override public SendResult syncSend(String topic, T msg) { try { Message<T> message = MessageBuilder.withPayload(msg).build(); SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 10000); log.info("【{}】【{}】发送结果 状态码【{}】 MsgId【{}】", LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId()); return sendResult; } catch (Exception e) { log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e); } return null; }
@Override public SendResult syncSendDeliverTimeMills(String topic, T msg, long pushTime) { try { Message<T> message = MessageBuilder.withPayload(msg).build(); SendResult sendResult = rocketMQTemplate.syncSendDeliverTimeMills(topic, message, pushTime); log.info("【{}】【{}】发送定时消息结果 状态码【{}】 MsgId【{}】", LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId()); return sendResult; } catch (Exception e) { log.error("【{}】【{}】发送定时消息异常", LOG_SIGN, msg, e); } return null; }
@Override public SendResult syncSendOrderly(String topic, T msg, String hashKey) { try { Message<T> message = MessageBuilder.withPayload(msg).build(); SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic, message, hashKey); log.info("【{}】【{}】发送结果 状态码【{}】 MsgId【{}】", LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId()); return sendResult; } catch (Exception e) { log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e); } return null; }}延迟消息级别说明
RocketMQ默认支持18个延迟级别:
| Level | 延迟时间 | Level | 延迟时间 | Level | 延迟时间 |
|---|---|---|---|---|---|
| 1 | 1s | 7 | 3m | 13 | 9m |
| 2 | 5s | 8 | 4m | 14 | 10m |
| 3 | 10s | 9 | 5m | 15 | 20m |
| 4 | 30s | 10 | 6m | 16 | 30m |
| 5 | 1m | 11 | 7m | 17 | 1h |
| 6 | 2m | 12 | 8m | 18 | 2h |
使用示例:
// 发送延迟5秒的消息(delayLevel = 2)mqProducer.syncSend("TOPIC", "message", 2);消息消费者封装
1. 并发消费者(高吞吐量场景)
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.ConsumeMode;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.context.annotation.Profile;import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/** * 并发消费者 - 高吞吐量场景 * 适用于: 日志处理、数据统计、通知发送等对顺序不敏感的场景 */@Profile({"prod"})@Slf4j@Service@RocketMQMessageListener( consumeThreadMax = 10, consumeMode = ConsumeMode.CONCURRENTLY, topic = "CONCURRENT_TOPIC", consumerGroup = "CONCURRENT_CONSUMER_GROUP")public class ConcurrentMQConsumer implements RocketMQListener<MessageExt> {
@Override public void onMessage(MessageExt messageExt) { try { // 获取消息基本信息 String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String keys = messageExt.getKeys(); String msgId = messageExt.getMsgId(); int reconsumeTimes = messageExt.getReconsumeTimes();
// 获取消息体 String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("并发消费消息 - Topic: {}, Tags: {}, Keys: {}, MsgId: {}, ReconsumeTimes: {}", topic, tags, keys, msgId, reconsumeTimes);
// 业务处理逻辑 processMessage(messageBody, messageExt);
log.info("并发消费成功 - MsgId: {}", msgId);
} catch (Exception e) { log.error("并发消费失败 - MsgId: {}, 错误信息: {}", messageExt.getMsgId(), e.getMessage(), e);
// 抛出异常会触发重试机制 throw new RuntimeException("消息处理失败", e); } }
/** * 业务消息处理 */ private void processMessage(String messageBody, MessageExt messageExt) { // 根据消息内容进行业务处理 // 例如:数据入库、调用外部API、发送通知等 log.info("处理消息内容: {}", messageBody);
// 具体业务逻辑 // ... }}2. 顺序消费者(严格顺序场景)
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.ConsumeMode;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.context.annotation.Profile;import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/** * 顺序消费者 - 严格顺序场景 * 适用于: 订单状态变更、账户余额变动、状态机流转等需要严格顺序的场景 */@Profile({"prod"})@Slf4j@Service@RocketMQMessageListener( consumeThreadMax = 10, consumeMode = ConsumeMode.ORDERLY, topic = "ORDERLY_TOPIC", consumerGroup = "ORDERLY_CONSUMER_GROUP")public class OrderlyMQConsumer implements RocketMQListener<MessageExt> {
@Override public void onMessage(MessageExt messageExt) { try { // 获取消息基本信息 String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String keys = messageExt.getKeys(); String msgId = messageExt.getMsgId(); int queueId = messageExt.getQueueId(); long queueOffset = messageExt.getQueueOffset();
// 获取消息体 String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("顺序消费消息 - Topic: {}, Tags: {}, Keys: {}, MsgId: {}, QueueId: {}, QueueOffset: {}", topic, tags, keys, msgId, queueId, queueOffset);
// 业务处理逻辑 processOrderlyMessage(messageBody, messageExt);
log.info("顺序消费成功 - MsgId: {}, QueueId: {}, QueueOffset: {}", msgId, queueId, queueOffset);
} catch (Exception e) { log.error("顺序消费失败 - MsgId: {}, QueueId: {}, 错误信息: {}", messageExt.getMsgId(), messageExt.getQueueId(), e.getMessage(), e);
// 顺序消费失败时,当前队列会被暂停消费 throw new RuntimeException("顺序消息处理失败", e); } }
/** * 顺序业务消息处理 */ private void processOrderlyMessage(String messageBody, MessageExt messageExt) { // 顺序消费的业务处理 // 例如:订单状态流转、账户变动等需要严格顺序的操作
String orderKey = messageExt.getKeys(); log.info("处理订单消息 - OrderKey: {}, Content: {}", orderKey, messageBody);
// 模拟状态机处理 processOrderStatus(orderKey, messageBody); }
/** * 订单状态处理示例 */ private void processOrderStatus(String orderKey, String messageBody) { // 解析消息内容,处理订单状态变更 // 保证同一订单的消息按顺序处理 log.info("订单状态更新完成 - OrderKey: {}", orderKey);
// 具体业务逻辑 // ... }}3. 带TAG过滤的消费者
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.ConsumeMode;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.annotation.SelectorType;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.context.annotation.Profile;import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/** * 带TAG过滤的消费者 * 只消费指定TAG的消息 */@Profile({"prod"})@Slf4j@Service@RocketMQMessageListener( consumeThreadMax = 10, consumeMode = ConsumeMode.CONCURRENTLY, topic = "USER_EVENT_TOPIC", consumerGroup = "USER_EVENT_CONSUMER_GROUP", selectorType = SelectorType.TAG, selectorExpression = "LOGIN || LOGOUT || REGISTER")public class UserEventMQConsumer implements RocketMQListener<MessageExt> {
@Override public void onMessage(MessageExt messageExt) { try { String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String msgId = messageExt.getMsgId(); String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("用户事件消费 - Topic: {}, Tags: {}, MsgId: {}", topic, tags, msgId);
// 根据不同TAG处理不同业务 switch (tags) { case "LOGIN": handleUserLogin(messageBody); break; case "LOGOUT": handleUserLogout(messageBody); break; case "REGISTER": handleUserRegister(messageBody); break; default: log.warn("未知的用户事件类型: {}", tags); }
log.info("用户事件处理成功 - MsgId: {}, Tags: {}", msgId, tags);
} catch (Exception e) { log.error("用户事件处理失败 - MsgId: {}, 错误信息: {}", messageExt.getMsgId(), e.getMessage(), e); throw new RuntimeException("用户事件处理失败", e); } }
private void handleUserLogin(String messageBody) { log.info("处理用户登录事件: {}", messageBody); // 用户登录相关业务处理 }
private void handleUserLogout(String messageBody) { log.info("处理用户登出事件: {}", messageBody); // 用户登出相关业务处理 }
private void handleUserRegister(String messageBody) { log.info("处理用户注册事件: {}", messageBody); // 用户注册相关业务处理 }}@RocketMQMessageListener 注解参数详解
核心参数说明
| 参数 | 类型 | 必填 | 说明 | 示例值 |
|---|---|---|---|---|
consumerGroup | String | ✓ | 消费者组名,同组消费者共同消费Topic | ”ORDER_CONSUMER_GROUP” |
topic | String | ✓ | 要消费的Topic名称 | ”ORDER_TOPIC” |
consumeMode | ConsumeMode | ✗ | 消费模式:CONCURRENTLY(并发)、ORDERLY(顺序) | ConsumeMode.CONCURRENTLY |
consumeThreadMax | int | ✗ | 最大消费线程数,默认64 | 10 |
过滤参数
| 参数 | 类型 | 默认值 | 说明 | 示例值 |
|---|---|---|---|---|
selectorType | SelectorType | TAG | 过滤类型:TAG、SQL92 | SelectorType.TAG |
selectorExpression | String | ”*“ | 过滤表达式 | ”TagA|| TagB” |
重试和超时参数
| 参数 | 类型 | 默认值 | 说明 | 最佳实践 |
|---|---|---|---|---|
maxReconsumeTimes | int | -1(16次) | 最大重试次数 | 3-5次 |
consumeTimeout | long | 15 | 消费超时时间(分钟) | 10-20分钟 |
性能优化参数
| 参数 | 类型 | 默认值 | 说明 | 建议值 |
|---|---|---|---|---|
consumeMessageBatchMaxSize | int | 1 | 批量消费大小 | 1-32 |
pullInterval | int | 0 | 拉取间隔(毫秒) | 0(实时) |
pullBatchSize | int | 32 | 拉取批次大小 | 16-64 |
使用示例
生产者使用示例
@Servicepublic class MessageService {
@Autowired private IMQProducer<String> mqProducer;
public void sendMessage() { // 同步发送普通消息 SendResult result = mqProducer.syncSend("ORDER_TOPIC", "订单创建消息");
// 异步发送延迟消息 mqProducer.asyncSend("ORDER_TOPIC", "延迟处理消息", 3);
// 发送定时消息(1小时后执行) long deliverTime = System.currentTimeMillis() + 3600 * 1000; mqProducer.syncSendDeliverTimeMills("ORDER_TOPIC", "定时处理消息", deliverTime);
// 发送有序消息 mqProducer.syncSendOrderly("ORDER_TOPIC", "订单状态变更", "orderKey123");
// 发送带TAG的消息 mqProducer.syncSend("ORDER_TOPIC:CREATE", "订单创建消息"); }}消费者使用示例
// 并发消费示例@Profile({"prod"})@Slf4j@Service@RocketMQMessageListener( consumeThreadMax = 10, consumeMode = ConsumeMode.CONCURRENTLY, topic = "ORDER_TOPIC", consumerGroup = "ORDER_CONSUMER_GROUP")public class OrderMQConsumer implements RocketMQListener<MessageExt> {
@Override public void onMessage(MessageExt messageExt) { String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8); log.info("处理订单消息: {}", messageBody); // 业务处理逻辑 }}使用场景对比
并发消费 vs 顺序消费
| 特性 | 并发消费 | 顺序消费 |
|---|---|---|
| 吞吐量 | 高 | 相对较低 |
| 顺序性 | 不保证 | 严格保证 |
| 适用场景 | 日志、通知、统计 | 订单、账户、状态机 |
| 线程模型 | 多线程并发 | 单线程顺序 |
| 重试影响 | 不影响其他消息 | 阻塞后续消息 |
| 参数设置 | consumeThreadMax = 10 | consumeThreadMax = 10 |
最佳实践建议
1. 生产者最佳实践
- 异常处理: 完善的异常处理和日志记录
- 重试机制: 合理设置重试次数,避免无效重试
- 消息大小: 单条消息不超过4MB
- 批量发送: 高并发场景考虑批量发送
2. 消费者最佳实践
- 幂等处理: 确保消息处理的幂等性
- 异常处理: 根据业务需要决定是否重试
- 线程数配置: 根据业务特点合理设置线程数
- 监控告警: 监控消费延迟和积压情况
3. Topic和Tag设计
- Topic设计: 按业务模块划分Topic
- TAG使用: 用于消息分类和过滤
- 命名规范: 使用清晰的命名规范
总结
本文档提供了完整的RocketMQ消息队列解决方案,包含生产者和消费者的封装实现。通过合理的参数配置和代码设计,可以满足不同业务场景的需求:
- 生产者: 支持同步、异步、延迟、定时、有序等多种发送模式
- 消费者: 支持并发和顺序两种消费模式,参数配置灵活
- 配置: 详细的yml配置说明,包含权限认证等高级特性
选择使用建议:
- 高吞吐量场景: 使用并发消费,设置
consumeThreadMax = 10 - 严格顺序场景: 使用顺序消费,确保消息按序处理
- 业务隔离: 不同业务使用不同的Topic和ConsumerGroup