消息中间件activemq的helloworld程序 ( topic )
pom依赖
<!-- activemq 所需要的jar 包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- activemq 和 spring 整合的基础包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
生产者
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicP {
public static final String ACTIVEMQ_URL = "tcp://192.168.166.138:61616";
public static final String TOPIC_NAME = "topic";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i < 4; i++) {
TextMessage textMessage = session.createTextMessage("msg--" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
// session.commit();
System.out.println(" **** 消息发送到MQ完成 ****");
}
}
消费者
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
public class TopicC {
public static final String ACTIVEMQ_URL = "tcp://192.168.166.138:61616";
public static final String TOPIC_NAME = "topic";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
// 当你第四次消费消息的时候就会将消息加入死信队列
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer messageConsumer = session.createConsumer(topic);
/**
* 第一种方法
while(true) {
TextMessage t=(TextMessage) messageConsumer.receive();
if(t!=null) {
System.out.println(t.getText());
}else{
break;
}
}
**/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("****cluster msg:" + textMessage.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
System.out.println("///");
}
}
猜你喜欢
weblog
916
pom依赖!--activemq所需要的jar包--dependencygroupIdorg.apache.activemq/groupIdartifactIdactivemq-all
框架
1504
activemq下载地址:http://activemq.apache.org/activemq-5140-release.html把下载的tar.gz文件放在linux系统的/opt/文件夹下,解
blog
mqtt 协议的概念和理解
mqtt协议
1589
应用程序的消息;(4)断开与服务器连接。mqtt服务器MQTT服务器以称为”消息代理”(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:(1)接受来自客户的网络连
rabbitmq,springboot
2276
客户端应用程序。
为了消费事件,应用程序需要声明一个队列,并绑定到一个系统指定的交换器去消费消息。
插件在默认的虚拟主机上声明了一个topic类型的exchange(交换器
前端
1451
例如有一个含有固定格式字符串,/test/zzh/00a7700/dev/invock,而且/zzh/和它下一个/是一个一个固定格式。那么想获取/zzh/和它下一个/之间的内容方法如下
blog
线程之间的通讯
java基础
3301
1.什么是线程通信线程之间通信的两个基本问题是互斥和同步。线程同步是指线程之间所具有的一种制约关系,一个线程的执行依赖另一个线程的消息,当它没有得到另一个线程的消息时应等待,直到消息到达时才被唤醒
official
1249
消息发送确认在使用mq发送消息的时候,由于一些不确定因素,可能会导致消息发送失败,比如网络的问题,服务器问题,或mq本身的问题都可能会导致消息发送失败。那么当消息发送成功或失败后程序如何感知呢?那就
其他
4001
1.什么是钩子程序hook(钩子)是windows提供的一种消息处理机制平台,是指在程序正常运行中接受信息之前预先启动的函数,用来检查和修改传给该程序的信息,(钩子)实际上是一个处理消息的程序段,通
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。