消息中间件activemq的helloworld程序

weblog 920 0 0
pom依赖
    <!--  activemq  所需要的jar 包-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.9</version>
    </dependency>

    <!--  activemq 和 spring 整合的基础包 -->
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>3.16</version>
    </dependency>
生产者
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Test {
	public static final String ACTIVEMQ_URL = "tcp://192.168.166.138:61616";
	public static final String QUEUE_NAME = "queue_cluster";

	public static void main(String[] args) throws Exception {

		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
		Connection connection = activeMQConnectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Queue queue = session.createQueue(QUEUE_NAME);
		MessageProducer messageProducer = session.createProducer(queue);
		for (int i = 1; i < 4; i++) {
			TextMessage textMessage = session.createTextMessage("msg--" + i);
			messageProducer.send(textMessage);
		}
		messageProducer.close();
		session.close();
		connection.close();
		// session.commit();
		System.out.println("  **** 消息发送到MQ完成 ****");
	}
}
消费者
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;

public class JmsConsumer  {
	public static final String ACTIVEMQ_URL = "tcp://192.168.166.138:61616";
    public static final String QUEUE_NAME = "queue_cluster";

	public static void main(String[] args) throws Exception {
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
		RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
		redeliveryPolicy.setMaximumRedeliveries(3);
		// 当你第四次消费消息的时候就会将消息加入死信队列
		activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

		Connection connection = activeMQConnectionFactory.createConnection();

		connection.start();

		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Queue queue = session.createQueue(QUEUE_NAME);
		MessageConsumer messageConsumer = session.createConsumer(queue);
		/**
		 * 第一种方法
		while(true) {
			TextMessage t=(TextMessage) messageConsumer.receive();	
			if(t!=null) {
				System.out.println(t.getText());
			}else{
				break;
			}
		}
		**/
		messageConsumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				try {
					if (null != message && message instanceof TextMessage) {
						TextMessage textMessage = (TextMessage) message;
						System.out.println("****cluster msg:" + textMessage.getText());
					}
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}); 
		System.in.read();
		messageConsumer.close();
		session.close();
		connection.close();
		System.out.println("///");
	}
}

 


猜你喜欢
weblog 923 pom依赖!--activemq所需要jar包--dependencygroupIdorg.apache.activemq/groupIdartifactIdactivemq-all
框架 1504 activemq下载地址:http://activemq.apache.org/activemq-5140-release.html把下载tar.gz文放在linux系统/opt/文夹下,解
java基础 3301 1.什么是线通信线通信两个基本问题是互斥和同步。线同步是指线所具有一种制约关系,一个线执行依赖另一个线,当它没有得到另一个线时应等待,直到到达时才被唤醒
official 1249 发送确认在使用mq发送时候,由于一些不确定因素,可能会导致发送失败,比如网络问题,服务器问题,或mq本身问题都可能会导致发送失败。那么当发送成功或失败后如何感知呢?那就
其他 4001 1.什么是钩子hook(钩子)是windows提供一种处理机制平台,是指在正常运行接受信之前预先启动函数,用来检查和修改传给该,(钩子)实际上是一个处理段,通
official 1231 上一篇《(mq)rabbitmq安装延时队列插实现延时1》文章介绍了rabbitmq安装延时队列插。本编将继续结合代码来实现延时队列(基于springboot项目)。下方所有源代码均已上传
official 1290 :AcknowledgeMode.NONE:不确认AcknowledgeMode.AUTO:自动确认AcknowledgeMode.MANUAL:手动确认rabbitmq默认使用自动确认。如果业务代码出现了异常
official 1245 续介绍在rabbitmq容器安装rabbitmq_delayed_message_exchange插并使用方法。安装插下载与rabbitmq版本相符,下载地址:https://github
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。