|
文章目录1.调整MQ的配置1.进入bin目录2.关闭broker和namesrv3.查看进程确认关闭4.编辑配置文件broker.conf,配置brokerIP15.开放端口109116.重新启动1.进入bin目录2.启动mqnamesrv和mqbroker1.启动NameServer并将输出重定向到mqnamesrv.log2.**启动Broker并将输出重定向到****mqbroker.log**3.**实时监控NameServer的日志文件**4.**实时监控Broker的日志文件**5.查看进程2.项目集成MQ1.domain引入mq依赖2.sun-club-application-mq引入domain依赖,用于消费mq3.sun-club-starter引入mq层4.application.yml配置mq5.SubjectController.java1.依赖注入RocketMQTemplate2.编写controller,作为消息生产者6.TestConsumer.java测试消费5.测试3.点赞业务优化为MQ处理1.SubjectLikedMessage.java点赞消息实体2.sun-club-domain同步点赞数据1.SubjectLikedDomainService.java2.SubjectLikedDomainServiceImpl.java3.add方法逻辑修改4.测试1.调整MQ的配置1.进入bin目录cd/usr/local/soft/rocketmq-all-4.8.0-bin-release/bin12.关闭broker和namesrvshmqshutdownbroker&shmqshutdownnamesrv13.查看进程确认关闭ps-ef|grepNamesrvStartup&ps-ef|grepBrokerStartup14.编辑配置文件broker.conf,配置brokerIP1vim/usr/local/soft/rocketmq-all-4.8.0-bin-release/conf/broker.conf1#NameServer地址(开端口)namesrvAddr=#brokerIP1指定了Broker对外提供服务的IP地址brokerIP1=#listenPort指定了Broker监听客户端连接的端口(开端口)listenPort=10911#当这个选项设置为true时,如果客户端尝试向一个不存在的主题发送消息,Broker会自动创建这个主题autoCreateTopicEnable=true12345678910115.开放端口10911systemctlstartfirewalld&firewall-cmd--permanent--add-port=10911/tcp&firewall-cmd--reload&firewall-cmd--query-port=10911/tcp16.重新启动1.进入bin目录cd/usr/local/soft/rocketmq-all-4.8.0-bin-release/bin12.启动mqnamesrv和mqbroker1.启动NameServer并将输出重定向到mqnamesrv.lognohupshmqnamesrv>mqnamesrv.log2>&1&12.启动Broker并将输出重定向到mqbroker.lognohupshmqbroker-c../conf/broker.conf>mqbroker.log2>&1&13.实时监控NameServer的日志文件tail-fmqnamesrv.log&14.实时监控Broker的日志文件tail-fmqbroker.log&15.查看进程ps-ef|grepNamesrvStartup&ps-ef|grepBrokerStartup12.项目集成MQ1.domain引入mq依赖org.apache.rocketmq2.1.11234562.sun-club-application-mq引入domain依赖,用于消费mqcom.sun.club1.0-SNAPSHOT1234563.sun-club-starter引入mq层com.sun.club1.0-SNAPSHOT1234564.application.yml配置mq#mq配置rocketmq:name-server:#作用是服务注册和发现,会自动发现brokerproducer:group:test-group1234565.SubjectController.java1.依赖注入RocketMQTemplate@ResourceprivateRocketMQTemplaterocketMQTemplate;122.编写controller,作为消息生产者/***测试mq发送*@return*/@GetMapping("/pushMessage")publicResultpushMessage(@Param("id")intid){rocketMQTemplate.convertAndSend("first-topic","hello"+id);returnResult.ok();}1234567896.TestConsumer.java测试消费packagecom.sunxiansheng.subject.application.mq;importcom.sun.media.jfxmedia.logging.Logger;importgroovy.util.logging.Slf4j;importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.apache.rocketmq.spring.core.RocketMQListener;importorg.slf4j.LoggerFactory;importorg.springframework.stereotype.Component;/***Description:*@Authorsun*@Create2024/7/1213:24*@Version1.0*/@Component//topic:主题,就是生产者那里指定的主题//consumerGroup:消费组,在application.yml文件中配置的//RocketMQListener:这里的泛型就是消息的类型@RocketMQMessageListener(topic="first-topic",consumerGroup="test-group")@Slf4jpublicclassTestConsumerimplementsRocketMQListener{privatestaticfinalorg.slf4j.Loggerlog=LoggerFactory.getLogger(TestConsumer.class);/***对消息进行消费*@params*/@OverridepublicvoidonMessage(Strings){log.info("接受到消息了:{}",s);}}12345678910111213141516171819202122232425262728293031323334355.测试3.点赞业务优化为MQ处理1.SubjectLikedMessage.java点赞消息实体packagecom.sunxiansheng.subject.domain.entity;importlombok.Data;importlombok.experimental.Accessors;importjava.io.Serializable;/***题目点赞消息*/@Data@Accessors(chain=true)//支持链式调用publicclassSubjectLikedMessageimplementsSerializable{/***题目id*/privateLongsubjectId;/***点赞人id*/privateStringlikeUserId;/***点赞状态1点赞0不点赞*/privateIntegerstatus;}1234567891011121314151617181920212223242526272829302.sun-club-domain同步点赞数据1.SubjectLikedDomainService.java/***MQ同步点赞数据*@paramsubjectLikedBO*/voidsyncLikedByMsg(SubjectLikedBOsubjectLikedBO);123452.SubjectLikedDomainServiceImpl.java@OverridepublicvoidsyncLikedByMsg(SubjectLikedBOsubjectLikedBO){SubjectLikedsubjectLiked=newSubjectLiked();subjectLiked.setSubjectId(subjectLikedBO.getSubjectId());subjectLiked.setLikeUserId(subjectLikedBO.getLikeUserId());subjectLiked.setStatus(subjectLikedBO.getStatus());subjectLiked.setIsDeleted(IsDeleteFlagEnum.UN_DELETED.getCode());subjectLikedService.insert(subjectLiked);}1234567893.add方法逻辑修改4.测试
|
|