RocketMq-半消息(十)-创新互联

概念: 
半消息: 在原有队列消息执行后的逻辑,如果后面的本地逻辑出错,则不发送该消息,如果通过则告知rocketmq发送

操作步骤 :
1.(生产者)发送-【半消息】
2.(生产者)本地监听-【半消息】处理结果
3.(消费者)处理-【半消息】

1.(生产者)发送-【半消息】

创新互联专注为客户提供全方位的互联网综合服务,包含不限于成都网站设计、成都网站建设、麻城网络推广、微信小程序、麻城网络营销、麻城企业策划、麻城品牌公关、搜索引擎seo、人物专访、企业宣传片、企业代运营等,从售前售中售后,我们都将竭诚为您服务,您的肯定,是我们大的嘉奖;创新互联为所有大学生创业者提供麻城建站搭建服务,24小时服务热线:18980820575,官方网址:www.cdcxhl.com
// 消息体
@Data
@Builder
@ToString
public class UserMoneyParams {
    int userId;
    String act;
    double money;
    String info;
    String infoParams;
}

// 发送消息
// 发送-队列半消息: rocketMQ
@RequestMapping("rocketMQHalf")
public ApiResult rocketMQHalf() {
        int orderId = 2;
        double money = 10;

        // 用户余额变更-参数体
        UserMoneyParams userMoneyParams = UserMoneyParams.builder()
                        .act("pay-order")
                        .userId(orderId)
                        .money(money)
                        .build();

        // 用户数据变更-参数
        UserOrder userOrder = this.userOrderMapper.selectByPrimaryKey(1);

        log.info("发送前参数: "+userMoneyParams.toString());

        rocketMQTemplate.sendMessageInTransaction(
                        // 半消息-分组
                        "tsca-group-half",
                        // 半消息-topic
                        "member-change-money-half-topic",
                        // 半消息-数据体
                        MessageBuilder
                                        .withPayload(userMoneyParams)
                                        .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID())
                                        .build(),
                        userOrder
        );

        return ApiResult.success("发送队列-半消息");
}

2.(生产者)本地监听-【半消息】处理结果

@RocketMQTransactionListener(txProducerGroup = "tsca-group-half")
@RequiredArgsConstructor
@Slf4j
public class UserMoneyHalfListener implements RocketMQLocalTransactionListener {

    @Autowired
    RedisUtil redisUtil;

    @Autowired
    UserOrderService userOrderService;

    // 生产者-消息处理完毕,继续执行本地方法(含事务)
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        try {
            Object userMoneyParams=message.getPayload();
            log.info("消息-args:"+arg);
            // 消息主体加密无法获取
            log.info("消息-主体:"+ JSON.toJSONString(userMoneyParams));
            log.info("消息-主体-头部:"+message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID));
            log.info("半消息-本地-处理完成");
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.warn("半消息-本地-发生异常,回滚: "+e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    // 生产者-消息处理超时
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // 查询消息是否已经处理
        String messageID = String.valueOf(message.getHeaders().get("tsca-half-message-id"));
        Object messageData = this.redisUtil.getValue(messageID, String.class);
        if (messageData != null && messageData.equals("ok")) {
            // 超时且消息已经处理完毕
            log.info("半消息-本地消息超时-且已经处理完毕");
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            log.info("半消息-本地消息超时-且未处理完毕");
            // 超时且消息未处理完毕
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

3.(消费者)处理-【半消息】

@Service
@RocketMQMessageListener(consumerGroup = "tsca-group-half", topic = "member-change-money-half-topic")
@Slf4j
public class UserMoneyHalfListener implements RocketMQListener {
//    @Autowired
//    UserMoneyService memberOrderService;

    @Override
    public void onMessage(UserMoneyParams memberMoneyMessage) {
        log.info("收到-用户余额变动-半消息");
        try {
        } catch (Exception e) {
            log.info("更改余额错误: "+e.getMessage());
            e.printStackTrace();
        }
        log.info(JSON.toJSONString(memberMoneyMessage));
    }
}

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


名称栏目:RocketMq-半消息(十)-创新互联
浏览路径:http://ybzwz.com/article/degddp.html