消息中间件activemq的helloworld程序 ( topic )

weblog 925 0 0
pom依赖
     <!--  activemq  所需要的jar 包-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.9</version>
    </dependency>

    <!--  activemq 和 spring 整合的基础包 -->
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>3.16</version>
    </dependency>
生产者
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicP {
	public static final String ACTIVEMQ_URL = "tcp://192.168.166.138:61616";
	public static final String TOPIC_NAME = "topic";

	public static void main(String[] args) throws Exception {

		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
		Connection connection = activeMQConnectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic topic = session.createTopic(TOPIC_NAME);
		MessageProducer messageProducer = session.createProducer(topic);
		for (int i = 1; i < 4; i++) {
			TextMessage textMessage = session.createTextMessage("msg--" + i);
			messageProducer.send(textMessage);
		}
		messageProducer.close();
		session.close();
		connection.close();
		// session.commit();
		System.out.println("  **** 消息发送到MQ完成 ****");
	}
}
消费者
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;

public class TopicC  {
	public static final String ACTIVEMQ_URL = "tcp://192.168.166.138:61616";
    public static final String TOPIC_NAME = "topic";

	public static void main(String[] args) throws Exception {
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
		RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
		redeliveryPolicy.setMaximumRedeliveries(3);
		// 当你第四次消费消息的时候就会将消息加入死信队列
		activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

		Connection connection = activeMQConnectionFactory.createConnection();

		connection.start();

		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic topic = session.createTopic(TOPIC_NAME);
		MessageConsumer messageConsumer = session.createConsumer(topic);
		/**
		 * 第一种方法
		while(true) {
			TextMessage t=(TextMessage) messageConsumer.receive();	
			if(t!=null) {
				System.out.println(t.getText());
			}else{
				break;
			}
		}
		**/
		messageConsumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				try {
					if (null != message && message instanceof TextMessage) {
						TextMessage textMessage = (TextMessage) message;
						System.out.println("****cluster msg:" + textMessage.getText());
					}
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}); 
		System.in.read();
		messageConsumer.close();
		session.close();
		connection.close();
		System.out.println("///");
	}
}

 


猜你喜欢
weblog 916 pom依赖!--activemq所需要jar包--dependencygroupIdorg.apache.activemq/groupIdartifactIdactivemq-all
框架 1504 activemq下载地址:http://activemq.apache.org/activemq-5140-release.html把下载tar.gz文放在linux系统/opt/文夹下,解
mqtt协议 1589 应用;(4)断开与服务器连接。mqtt服务器MQTT服务器以称为”代理”(Broker),可以是一个应用或一台设备。它是位于发布者和订阅者之,它可以:(1)接受来自客户网络连
rabbitmq,springboot 2276 客户端应用。 为了费事,应用需要声明一个队列,并绑定到一个系统指定交换器去。 插在默认虚拟主机上声明了一个topic类型exchange(交换器
前端 1451   例如有一个含有固定格式字符串,/test/zzh/00a7700/dev/invock,而且/zzh/和它下一个/是一个一个固定格式。那么想获取/zzh/和它下一个/之内容方法如下
java基础 3301 1.什么是线通信线通信两个基本问题是互斥和同步。线同步是指线所具有一种制约关系,一个线执行依赖另一个线,当它没有得到另一个线时应等待,直到到达时才被唤醒
official 1249 发送确认在使用mq发送时候,由于一些不确定因素,可能会导致发送失败,比如网络问题,服务器问题,或mq本身问题都可能会导致发送失败。那么当发送成功或失败后如何感知呢?那就
其他 4001 1.什么是钩子hook(钩子)是windows提供一种处理机制平台,是指在正常运行接受信之前预先启动函数,用来检查和修改传给该,(钩子)实际上是一个处理段,通
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。