Java源码示例:com.aliyun.openservices.ons.api.transaction.TransactionProducer

示例1
public static void statsSendMessage(Long startDeliverTime, Map<String, Object> consumerContainer, RocketMessage rocketMessage, Object message, byte[] bytes, ApplicationContext applicationContex) throws RocketException {
	if (message instanceof CommonMessage) {
		CommonMessage commonMessage = (CommonMessage) message;
		Producer producer = ProducerConsumerFactory.getProducer(consumerContainer, rocketMessage, commonMessage);
		SendMessageFactory.sendMessage(startDeliverTime,producer, commonMessage, bytes, applicationContex);
		return;
	}
	if (message instanceof OrderMessage) {
		OrderMessage orderMessage = (OrderMessage) message;
		OrderProducer orderProducer = ProducerConsumerFactory.getProducer(consumerContainer, rocketMessage, orderMessage);
		SendMessageFactory.sendMessage(orderProducer, orderMessage, bytes);
		return;
	}
	if (message instanceof TransactionMessage) {
		TransactionMessage transactionMessage = (TransactionMessage) message;
		TransactionProducer transactionProducer = ProducerConsumerFactory.getProducer(consumerContainer, rocketMessage, transactionMessage);
		SendMessageFactory.sendMessage(transactionProducer, transactionMessage, bytes, applicationContex);
	}
}
 
示例2
/**
 * 获取消息的 Producer
 *
 * @return Producer
 */
@Override
public void afterPropertiesSet() {
    Properties properties = new Properties();

    // 您在控制台创建的 Group ID 注意:事务消息的 Group ID 不能与其他类型消息的 Group ID 共用
    properties.put(PropertyKeyConst.GROUP_ID, mqGroupId);
    // 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.AccessKey, mqAccessKey);
    // 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.SecretKey, mqSecretKey);
    // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
    properties.put(PropertyKeyConst.NAMESRV_ADDR, mqNameSrvAddr);
    // 初始化事务消息Producer时,需要注册一个本地事务状态的Checker
    TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
    transactionProducer.start();
    this.transactionProducer = transactionProducer;
    Producer producer = ONSFactory.createProducer(properties);
    // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
    producer.start();
    this.producer = producer;
    log.info("init done");
}
 
示例3
public static void sendMessage(TransactionProducer transactionProducer, TransactionMessage transactionMessage, byte[] bytes, ApplicationContext applicationContext) {
    Message message = MessageFactory.createMessage (transactionMessage, bytes);
    transactionProducer.send (message, ApplicationContextUtils.getLocalTransactionExecuter (applicationContext, transactionMessage.executer ()), null);
}
 
示例4
public static TransactionProducer createTransactionProducer(RocketMessage rocketMessage, RocketProperties rocketProperties, LocalTransactionChecker localTransactionChecker) {
	Properties properties = ProducerPropertiesFactory.createProducerProperties(rocketMessage, rocketProperties);
	properties.put(PropertyKeyConst.CheckImmunityTimeInSeconds, rocketProperties.getCheckImmunityTimeInSeconds());
	return ONSFactory.createTransactionProducer(properties, localTransactionChecker);
}
 
示例5
public static TransactionProducer getProducer(Map<String, Object> consumerContainer, RocketMessage rocketMessage, TransactionMessage transactionMessage){
	String producerConsumerKey = ProducerConsumerFactory.getProducerConsumerKey(rocketMessage, transactionMessage);
	return (TransactionProducer) consumerContainer.get(producerConsumerKey);
}