消息中间件activemq的helloworld程序
pom依赖
<!-- activemq 所需要的jar 包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- activemq 和 spring 整合的基础包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
生产者
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Test {
public static final String ACTIVEMQ_URL = "tcp://192.168.166.138:61616";
public static final String QUEUE_NAME = "queue_cluster";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
for (int i = 1; i < 4; i++) {
TextMessage textMessage = session.createTextMessage("msg--" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
// session.commit();
System.out.println(" **** 消息发送到MQ完成 ****");
}
}
消费者
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.166.138:61616";
public static final String QUEUE_NAME = "queue_cluster";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
// 当你第四次消费消息的时候就会将消息加入死信队列
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
/**
* 第一种方法
while(true) {
TextMessage t=(TextMessage) messageConsumer.receive();
if(t!=null) {
System.out.println(t.getText());
}else{
break;
}
}
**/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("****cluster msg:" + textMessage.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
System.out.println("///");
}
}
猜你喜欢
weblog
923
pom依赖!--activemq所需要的jar包--dependencygroupIdorg.apache.activemq/groupIdartifactIdactivemq-all
框架
1504
activemq下载地址:http://activemq.apache.org/activemq-5140-release.html把下载的tar.gz文件放在linux系统的/opt/文件夹下,解
blog
线程之间的通讯
java基础
3301
1.什么是线程通信线程之间通信的两个基本问题是互斥和同步。线程同步是指线程之间所具有的一种制约关系,一个线程的执行依赖另一个线程的消息,当它没有得到另一个线程的消息时应等待,直到消息到达时才被唤醒
official
1249
消息发送确认在使用mq发送消息的时候,由于一些不确定因素,可能会导致消息发送失败,比如网络的问题,服务器问题,或mq本身的问题都可能会导致消息发送失败。那么当消息发送成功或失败后程序如何感知呢?那就
其他
4001
1.什么是钩子程序hook(钩子)是windows提供的一种消息处理机制平台,是指在程序正常运行中接受信息之前预先启动的函数,用来检查和修改传给该程序的信息,(钩子)实际上是一个处理消息的程序段,通
official
1231
上一篇《(mq)rabbitmq安装延时队列插件实现延时消息1》文章中介绍了rabbitmq安装延时队列插件。本编将继续结合代码来实现延时队列(基于springboot项目)。下方所有源代码均已上传
official
1290
:AcknowledgeMode.NONE:不确认AcknowledgeMode.AUTO:自动确认AcknowledgeMode.MANUAL:手动确认rabbitmq默认使用的自动确认消息。如果业务代码中出现了异常
official
1245
续介绍在rabbitmq容器中安装rabbitmq_delayed_message_exchange插件并使用的方法。安装插件下载与rabbitmq版本相符的插件,下载地址:https://github
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。