Java源码示例:com.aliyun.openservices.ons.api.transaction.TransactionProducer
示例1
public static void statsSendMessage(Long startDeliverTime, Map<String, Object> consumerContainer, RocketMessage rocketMessage, Object message, byte[] bytes, ApplicationContext applicationContex) throws RocketException {
if (message instanceof CommonMessage) {
CommonMessage commonMessage = (CommonMessage) message;
Producer producer = ProducerConsumerFactory.getProducer(consumerContainer, rocketMessage, commonMessage);
SendMessageFactory.sendMessage(startDeliverTime,producer, commonMessage, bytes, applicationContex);
return;
}
if (message instanceof OrderMessage) {
OrderMessage orderMessage = (OrderMessage) message;
OrderProducer orderProducer = ProducerConsumerFactory.getProducer(consumerContainer, rocketMessage, orderMessage);
SendMessageFactory.sendMessage(orderProducer, orderMessage, bytes);
return;
}
if (message instanceof TransactionMessage) {
TransactionMessage transactionMessage = (TransactionMessage) message;
TransactionProducer transactionProducer = ProducerConsumerFactory.getProducer(consumerContainer, rocketMessage, transactionMessage);
SendMessageFactory.sendMessage(transactionProducer, transactionMessage, bytes, applicationContex);
}
}
示例2
/**
* 获取消息的 Producer
*
* @return Producer
*/
@Override
public void afterPropertiesSet() {
Properties properties = new Properties();
// 您在控制台创建的 Group ID 注意:事务消息的 Group ID 不能与其他类型消息的 Group ID 共用
properties.put(PropertyKeyConst.GROUP_ID, mqGroupId);
// 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, mqAccessKey);
// 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, mqSecretKey);
// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
properties.put(PropertyKeyConst.NAMESRV_ADDR, mqNameSrvAddr);
// 初始化事务消息Producer时,需要注册一个本地事务状态的Checker
TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
transactionProducer.start();
this.transactionProducer = transactionProducer;
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
this.producer = producer;
log.info("init done");
}
示例3
public static void sendMessage(TransactionProducer transactionProducer, TransactionMessage transactionMessage, byte[] bytes, ApplicationContext applicationContext) {
Message message = MessageFactory.createMessage (transactionMessage, bytes);
transactionProducer.send (message, ApplicationContextUtils.getLocalTransactionExecuter (applicationContext, transactionMessage.executer ()), null);
}
示例4
public static TransactionProducer createTransactionProducer(RocketMessage rocketMessage, RocketProperties rocketProperties, LocalTransactionChecker localTransactionChecker) {
Properties properties = ProducerPropertiesFactory.createProducerProperties(rocketMessage, rocketProperties);
properties.put(PropertyKeyConst.CheckImmunityTimeInSeconds, rocketProperties.getCheckImmunityTimeInSeconds());
return ONSFactory.createTransactionProducer(properties, localTransactionChecker);
}
示例5
public static TransactionProducer getProducer(Map<String, Object> consumerContainer, RocketMessage rocketMessage, TransactionMessage transactionMessage){
String producerConsumerKey = ProducerConsumerFactory.getProducerConsumerKey(rocketMessage, transactionMessage);
return (TransactionProducer) consumerContainer.get(producerConsumerKey);
}