消息事务
参考: https://github.com/zjpjohn/ReliableMeageSystem
消息记录和业务DB在一个库中,业务DB操作完成后(事务完成后),开始发送消息。需要用到Spring中的事务同步TransactionSynchronizationAdapter
MessageProducer
public void sendMessage(Map<String, String> data) {
Preconditions.checkArgument(data != null && data.size() != 0, "message must not be empty...");
transactionSynchronize();
QMessage message = convertMessage(data);
int result = qMessageService.addQMessage(message);
if (result != 0) {
MessageHolder.set(message.getMessageId());
}
}
/**
* 添加事务同步
*/
private void transactionSynchronize() {
MessageTransactionSynchronizationAdapter synchronizationAdapter = new MessageTransactionSynchronizationAdapter();
synchronizationAdapter.setqMessageService(qMessageService);
synchronizationAdapter.setTransactionMessageProducer(transactionMessageProducer);
TransactionSynchronizationManager.registerSynchronization(synchronizationAdapter);
}
MessageTransactionSynchronizationAdapter
@Override
public void afterCompletion(int status) {
try {
if (STATUS_COMMITTED == status) {
List<String> messageIds = MessageHolder.get();
sendMessageToBroker(messageIds);
} else if (STATUS_ROLLED_BACK == status) {
log.warn("事务提交失败,数据库回滚后,清空缓存中的消息:{}", MessageHolder.get());
}
} finally {
MessageHolder.clear();
}
}
TransactionMessageProducer
public void sendMessage(QMessage qMessage) {
Session session = null;
try {
session = connection.createSession(qMessage.getTransaction() != 0, ActiveMQSession.AUTO_ACKNOWLEDGE);
//????????
Queue queue = session.createQueue(qMessage.getDestination());
//?????????????
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(qMessage.getPersistent() != 0 ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
//???????
MapMessage message = session.createMapMessage();
message.setString("messageId", qMessage.getMessageId());
message.setString("data", qMessage.getMessageContent());
message.setString("timeStamp", String.valueOf(qMessage.getTimeStamp()));
//????????n2????????????
if (qMessage.getN2() != 0) {
if (StringUtils.isNotBlank(qMessage.getBusinessMark())) {
message.setString("businessMark", qMessage.getBusinessMark());
} else {
//???n2??????????businessMark?????????
throw new RuntimeException("n2 level message require businessMark not empty...");
}
}
//???????
producer.send(message);
//????????????
if (qMessage.getTransaction() != 0) {
session.commit();
}
//?????broker??????
messageCallback.onSuccess(qMessage.getMessageId());
} catch (JMSException e) {
log.error("send message to broker error:{}", e);
messageCallback.onFail(e,qMessage.getMessageId());
} finally {
if (session != null) {
try {
session.close();
} catch (JMSException e) {
log.error("close session error:{}", e);
}
}
}
}