博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitMQ 在spring 的使用
阅读量:7225 次
发布时间:2019-06-29

本文共 9806 字,大约阅读时间需要 32 分钟。

hot3.png

一、准备工作

maven依赖

  
com.rabbitmq
  
amqp-client
  
4.0.2
 
  
org.springframework.amqp
  
spring-rabbit
  
1.7.9.RELEASE

创建配置文件spring-rabbitmq.xml

 

 二、配置消息生产者

1、配置连接

 注:该配置还有publisher-confirms、publisher-returns等参数,用于消息确认。

 

2、配置admin:producer中的exchange,queue会自动的利用该admin自动在spring中生成

3、定义rabbitmq模板(消息生产者通过模板类进行推送数据)

注意:

    消费发送是通过rabbitTemplate.convertAndSend()这个方法进行发送的。

    rabbitTemplate.convertAndSend()用三个参数,分别代表exchange、routing-key(queue)、message,如果调用时不写exchange,第二个参数代表queue。

    比如rabbitTemplate.convertAndSend(queue,message)表示将消息直接发布到queue队列中

    如果在模板中指定了默认exchange和queue,如果消息在发布时没指定exchange和queue,则消息直接通过默认的exchange将消息推送给对应的queue,如果只配置了queue,则表示直接将消息发布到queue队列中。

    我通常的做法是不指定exchange和queue,通过代码进行指定。

4、配置队列

5、设置exchange,并且配置与队列queue的关系(durable、auto-delete与队列参数同一个意思)

    
        
    
 
    
        
    
  
    
       
        
       
    

注意:

    路由模式需要指定key,表示exchange通过key(routing-key)将消息发布到queue中;

    主题模式是通过pattern参数来表示routing-key的

 

6、定义消息发布类

public class SpringSender {    public static void sendMessage(String exchange,String routingKey,Object  message){        //加载配置文件        AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq.xml");        //获取rabbitmqTemplate模板        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);        //发送消息        rabbitTemplate.convertAndSend(exchange,routingKey,message);    }     public static void main(String[] args) throws Exception{        SpringSender.sendMessage("myTopicExchange","warn.item","主题模式,发布警告信息");    }}

注意:

    根据上面消息的配置,我们可以知道该消息会发布到myTopicQueue_error和myTopicQueue_warn两个队列中,因为routing-key为“warn.item”,符合上述其与exchange的绑定关系。

    rabbitTemplate.convertAndSend()通过多态实现的。如果exchange为空,等同与调用rabbitTemplate.convertAndSend(routingKey,message),表示将message发送到名为“routingKey”的队列中。

    如果exchange、routingKey为空时,等同于调用rabbitTemplate.convertAndSend(message),此时rabbitmq模板需要指定默认queue。

    如果rabbitmq模板指定了默认exchange、queue,但是程序里调用发布消息的方法也指定了exchange、queue,那么以程序里面的参数为准

    如果rabbitmq模板指定了默认exchange,但是调用rabbitTemplate.convertAndSend(routingKey,message)时,routingKey(queue)跟exchange没有绑定关系,发送数据失败。

 

rabbitTemplate.convertAndSend方法说明:    

/** * 发送消息到默认的交换机和队列(不带有自定义参数) * @param messageObject 消息对象 * @return boolean 发送标记  */RabbitTemplate.convertAndSend(messageObject); /** * 发送消息到默认的交换机和队列 * @param messageObject 消息对象 * @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。 * @return boolean 发送标记  */RabbitTemplate.correlationConvertAndSend(messageObject,correlationdata); /** * 发送消息到指定的队列 * @param queue           队列名称 * @param messageObject   消息对象 * @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。 * @return boolean 发送标记  */RabbitTemplate.convertAndSend(queue, messageObject,correlationdata); /** * 发送消息到指定的交换机和队列 * @param exchange       交换机名称 * @param queue          队列名称 * @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。 * @return boolean 发送标记  */RabbitTemplate.convertAndSend(exchange,queue,messageObject,correlationdata); /** * 发送消息到默认的交换机和队列(不带有自定义参数) Send方法还有很多,此处只列举一种 * @param Message AMQP封装的消息对象 * @return void */RabbitTemplate.send(Message message); 

 

三、配置消息消费者

1、定义主题模式的两个实现类

/** * 用于接收routing-key为warn或error的消息 */public class TopicErrorReceiver implements ChannelAwareMessageListener{    @Override    public void onMessage(Message message, Channel channel) throws Exception {        try{            System.out.println("************************ddd********************************");            System.out.println("主题模式 warn/error 接收信息:"+new String(message.getBody()));            System.out.println("********************************************************");            //设置手工应答            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        }catch (Exception e){            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        }    }} /** * 用于接收routing-key为warn的消息 */public class TopicWarnReceiver implements ChannelAwareMessageListener{     @Override    public void onMessage(Message message, Channel channel) throws Exception {        try{            System.out.println("************************ddd********************************");            System.out.println("主题模式 warm 接收信息:"+new String(message.getBody()));            System.out.println("********************************************************");            //设置手工应答            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        }catch (Exception e){            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        }    }}

    注:接收类都是实现了ChannelAwareMessageListener接口,并重写了onMessage方法

 

2、定义监听,并绑定接收者与队列的关系(即接收者监听哪些队列)

    
    
    
    
    
    
    
 

注意:

    acknowledge="manual" 表示手工应答,如果值为auto则表示自动应答

    DirectReceiver、FanoutReceiver写法与TopicErrorReceiver 类似,这里不重复

 

3、测试

1aa202474321f4c6835bf34a824be2abee5.jpg

    从上面的例子可以看出,接收者跟所谓的模式没有关系,它只跟绑定的队列有关队列有数据即进行接受。至于队列的数据是通过什么模式得到的,都与接收者无关。

 

四、confirm-callback监听(用于监听exchange是否接收成功)

1、在配置工厂连接的时候,设置publisher-confirms="true"

 

2、在定义rabbitmq模板时,指定confirm-callback的实现类

3、创建实现类ConfirmCallback,实现RabbitTemplate.ConfirmCallback接口

public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {    /**     * CorrelationData 是在发送消息时传入回调方法的参数,可以用于区分消息对象。 CorrelationData对象中只有一个属性 String id。     * 通过这个参数,我们可以区分当前是发送哪一条消息时的回调,并通过ack参数来进行失败重发功能     *      * @param correlationData 回调的相关数据.     * @param ack true for ack, false for nack     * @param cause 专门给NACK准备的一个可选的原因,其他情况为null。     */    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        System.out.println("********************************************************");        System.out.println("exChange确认"+ ack +"   "+cause);        System.out.println("********************************************************");    }}

4、在配置文件中定义confirmCallback

5、测试:执行SpringSender.sendMessage("myTopicExchange","error","queue message fanout" );

    不管发送成功与否都会执行这个方法,只有在配置中找不到exchange,ack才会是false。

    发送数据是exchange为空,或者不填的时候,ack都为true即调用rabbitTemplate.convertAndSend(“”,routingKey,message)或者rabbitTemplate.convertAndSend(routingKey,message),ack都是true。

 

五、confirm-callback监听(basicpublish推送消息到queue失败时回调)

1、在定义rabbitmq模板时,指定return-callback的实现类,并且设置mandatory="true"

    注:mandatory为true表示推送消息到queue失败时调用return-callback

 

2、创建实现类ReturnCallback,实现RabbitTemplate.ReturnCallback接口

public class ReturnCallback implements RabbitTemplate.ReturnCallback {    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {        System.out.println("********************************************************");        System.out.println("失败确认:"+message+" | "+replyCode+" | "+replyText+" | "+exchange+" | "+routingKey);        System.out.println("********************************************************");    }}

3、在配置文件中定义returnedMessage

4、测试:执行SpringSender.sendMessage("myTopicExchange","error123","queue message" );,

    由于routing-key没匹配到对应的队列,所以控制台打印报错信息。

    如果执行SpringSender.sendMessage("","error123","queue message" );由于没有找到对应的队列“error123”,所以调用ReturnCallback.returnedMessage方法,confirm-callback监听ack返回false,不管是否匹配到队列,都不会执行ReturnCallback.returnedMessage方法。

    因为exchange接收数据是否,此时还没走到推送数据到队列这一步,所以不会以失败处理。

 

六、FastJsonMessageConverter转换类(可以将map自动转成json格式)

1、添加maven依赖

  
org.codehaus.jackson
  
jackson-mapper-asl
  
1.9.13

2、在定义rabbitmq模板时,指定转换器message-converter="jsonMessageConverter"

3、配置bean(也可以重写)

4、测试:(将map转成json格式)

public class SpringSender {    public static void sendMessage(String exchange,String routingKey,Object  message){        //加载配置文件        AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq.xml");        //获取rabbitmqTemplate模板        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);        //发送消息        rabbitTemplate.convertAndSend(exchange,routingKey,message);    }     public static void main(String[] args) throws Exception{        HashMap
map = new HashMap
();        map.put("message","queue message fanout");        SpringSender.sendMessage("myFanoutExchange","myFanoutQueue",map);    }}

 

输出:

********************************************************

发布/订阅 接收信息:{"message":"queue message fanout"}

********************************************************

注:如果发生的消息是字符串,接收到的信息为字符串

 

七、完整配置文件spring-rabbitmq.xml内容

     
    
        
            
                
classpath:conf/rabbitmq.properties
            
        
    
     
    
     
    
     
    
      
    
    
    
    
    
    
    
      
    
        
            
        
    
     
    
        
            
        
    
      
    
        
           
            
           
        
    
      
    
        
        
        
        
        
        
        
    
     
    
    
    
    
     
    
    
    
    
    

    注:由于接收者都是继承ChannelAwareMessageListener,实现onMessage方法,所以这里不提供相应的代码。

转载于:https://my.oschina.net/langwanghuangshifu/blog/3004803

你可能感兴趣的文章
Java-抽象类定义构造方法
查看>>
一键安装IIS的点点滴滴——For所有Microsoft的操作系统(上)
查看>>
Android 短信模块分析(二) MMS中四大组件核心功能详解
查看>>
Eclipse 工程使用相对路径导入Jar包设置
查看>>
Struts2中的 配置文件
查看>>
手动安装 MyEclipse6.5 FindBugs
查看>>
poj 3615(floyd变形)
查看>>
缓存子系统如何设计(Cachable tag, Memcache/redis support, xml config support, LRU/LFU/本地缓存命中率)...
查看>>
解决数据库 Table 'content_tags' is marked as crashed and should be repaired 表损坏问题
查看>>
算法-随手写的二分查找
查看>>
测量史上首个易语言工程测量模块
查看>>
面向对象初步总结
查看>>
分享45个设计师应该见到的新鲜的Web移动设备用户界面PSD套件
查看>>
SDL_BlitSurface
查看>>
Ubuntu12.04编译Android2.3.4
查看>>
IDA设置函数类型
查看>>
日期控件ie9失效
查看>>
群里一个高手写的url?传参执行php函数的小程序, 收藏下
查看>>
Linux桌面扩展 Docky
查看>>
Android实现图片顺时逆时旋转及拖拽显示效果
查看>>