消息事务

2017/03/28 midware

消息事务

参考: https://github.com/zjpjohn/ReliableMeageSystem

消息记录和业务DB在一个库中,业务DB操作完成后(事务完成后),开始发送消息。需要用到Spring中的事务同步TransactionSynchronizationAdapter

MessageProducer

    public void sendMessage(Map<String, String> data) {
        Preconditions.checkArgument(data != null && data.size() != 0, "message must not be empty...");
        transactionSynchronize();
        QMessage message = convertMessage(data);
        int result = qMessageService.addQMessage(message);
        if (result != 0) {
            MessageHolder.set(message.getMessageId());
        }
    }
    /**
     * 添加事务同步
     */
    private void transactionSynchronize() {
        MessageTransactionSynchronizationAdapter synchronizationAdapter = new MessageTransactionSynchronizationAdapter();
        synchronizationAdapter.setqMessageService(qMessageService);
        synchronizationAdapter.setTransactionMessageProducer(transactionMessageProducer);
        TransactionSynchronizationManager.registerSynchronization(synchronizationAdapter);
    }

MessageTransactionSynchronizationAdapter

    @Override
    public void afterCompletion(int status) {
        try {
            if (STATUS_COMMITTED == status) {
                List<String> messageIds = MessageHolder.get();
                sendMessageToBroker(messageIds);
            } else if (STATUS_ROLLED_BACK == status) {
                log.warn("事务提交失败,数据库回滚后,清空缓存中的消息:{}", MessageHolder.get());
            }
        } finally {
            MessageHolder.clear();
        }
    }

TransactionMessageProducer

   public void sendMessage(QMessage qMessage) {
        Session session = null;
        try {
            session = connection.createSession(qMessage.getTransaction() != 0, ActiveMQSession.AUTO_ACKNOWLEDGE);
            //????????
            Queue queue = session.createQueue(qMessage.getDestination());
            //?????????????
            MessageProducer producer = session.createProducer(queue);
            producer.setDeliveryMode(qMessage.getPersistent() != 0 ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
            //???????
            MapMessage message = session.createMapMessage();
            message.setString("messageId", qMessage.getMessageId());
            message.setString("data", qMessage.getMessageContent());
            message.setString("timeStamp", String.valueOf(qMessage.getTimeStamp()));
            //????????n2????????????
            if (qMessage.getN2() != 0) {
                if (StringUtils.isNotBlank(qMessage.getBusinessMark())) {
                    message.setString("businessMark", qMessage.getBusinessMark());
                } else {
                    //???n2??????????businessMark?????????
                    throw new RuntimeException("n2 level message require businessMark not empty...");
                }
            }
            //???????
            producer.send(message);
            //????????????
            if (qMessage.getTransaction() != 0) {
                session.commit();
            }
            //?????broker??????
            messageCallback.onSuccess(qMessage.getMessageId());
        } catch (JMSException e) {
            log.error("send message to broker error:{}", e);
            messageCallback.onFail(e,qMessage.getMessageId());
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    log.error("close session error:{}", e);
                }
            }
        }
    }

Search

    Table of Contents