{"msg":"操作成功","code":200,"data":{"createBy":"admin","createTime":"2020-11-16 22:36:40","updateBy":"admin","updateTime":"2020-11-16 22:36:40","remark":null,"id":40,"articleTitle":"ActiveMQ（二）之生产者与消费者","articleUrl":"activemq_producer_consumer","articleThumbnail":"https://www.asumimoe.com/imgfiles/20220910/73ccf56bfe814abeb684840abb28eee5.jpg","articleFlag":"0","draftStatus":"1","reprintStatement":"1","articleSummary":"生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯，而通过阻塞队列来进行通讯，所以生产者生产完数据之后不用等待消费者处理，直接扔给阻塞队列，消费者不找生产者要数据，而是直接从阻塞队列里取，阻塞队列就相当于一个缓冲区，平衡了生产者和消费者的处理能力。","articleContent":"## JMS（Java Message Service）介绍\n\nJMS是一组Java应用程序接口，它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法，类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API，使得Java程序能够与不同厂商的消息组件很好地进行通信。\n\n### JMS的消息模型\n\n在JMS  API出现之前，大部分产品使用“点对点”和“发布/订阅”中的任一方式来进行消息通讯。JMS定义了这两种消息发送模型的规范，它们相互独立。任何JMS的提供者可以实现其中的一种或两种模型，这是它们自己的选择。JMS规范提供了通用接口保证我们基于JMS API编写的程序适用于任何一种模型。\n\n1. 点对点通信模型\n   - 每个消息只要一个消费者\n   - 发送者和接收者在时间上是没有时间的约束，也就是说发送者在发送完消息之后，不管接收者有没有接受消息，都不会影响发送方发送消息到消息队列中。\n   - 发送方不管是否在发送消息，接收方都可以从消息队列中去到消息（The receiver can fetch message whether it is running or not when the sender sends the message）\n   - 接收方在接收完消息之后，需要向消息队列应答成功\n2. 发布/订阅通信模型\n   - 一个消息可以传递个多个订阅者（即：一个消息可以有多个接受方）\n   - 发布者与订阅者具有时间约束，针对某个主题（Topic）的订阅者，它必须创建一个订阅者之后，才能消费发布者的消息，而且为了消费消息，订阅者必须保持运行的状态。\n   - 为了缓和这样严格的时间相关性，JMS允许订阅者创建一个可持久化的订阅。这样，即使订阅者没有被激活（运行），它也能接收到发布者的消息。\n\n### JMS接收消息\n\n在JMS中，对于消费来说，JMS可以通过两种方式来消费消息。\n\n- 同步(Synchronous)\n\n  在同步消费信息模式模式中，订阅者/接收方通过调用 receive（）方法来接收消息。在receive（）方法中，线程会阻塞直到消息到达或者到指定时间后消息仍未到达。\n\n- 异步(Asynchronous)\n\n  使用异步方式接收消息的话，消息订阅者需注册一个消息监听者，类似于事件监听器，只要消息到达，JMS服务提供者会通过调用监听器的onMessage()递送消息。\n\n### JMS消息组成\n\n**1.JMS消息头**\n\n- JMSDestination\n\n  消息发送目的地，主要指Queue和Topic\n\n- JMSDeliveryMode\n\n  持久(PERSISTENT)或非持久模式(NON_PERSISTENT)\n\n  持久的消息：应该被传送“一次仅仅一次”，这就意味着如果JMS提供者出现故障，该消息并不会丢失，它会在服务器恢复之后再次传递。\n\n  非持久的消息：最多会传送一次，这意味着服务器出现故障，该消息将永远丢失。\n\n- JMSExpiration\n\n  消息过期时间，等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。如果timeToLive值为0，则JMSExpiration被设为零，表示该消息永不过期。\n\n  如果发送后，在消息过期时间之后消息还没有被发送到目的地，则消息被清除。\n\n- JMSPriority\n\n  0-4消息优先级，0-9十个级别，0-4是普通消息，5-9是加急消息。\n\n  JMS不要求严格按照这十个优先级发送消息，但必须保证加急消息要优先于普通消息到达。默认是4级。\n\n- JMSMessageID\n\n  唯一识别某条消息的ID，由MQ产生。\n\n**2.JMS消息体**\n\nJMS共有5中消息体格式\n\n- TextMessage：普通字符串消息，包含一个String\n- MapMessage：一个Map类型消息，key为string类型，value为java的基本类型\n- BytesMessage：二进制数组消息，包含一个byte[]\n- StreamMessage：java数据流消息，用标准流操作来顺序填充和读取\n- ObjectMessage：对象消息，包含一个可序列化的java对象\n\n**3.JMS消息属性**\n\n如果需要除消息头字段以外的值，可以使用消息属性。是以属性名和属性值对的形式指定的。可以将属性视为消息头的扩展，属性指定一些消息头没有包括的附加信息，比如可以在属性里指定消息选择器。\n\n识别/去重/重点标注等操作非常有用的方法。\n\n### JMS开发的基本步骤\n\n1. 创建一个connection factory\n2. 通过connection factory来创建JMS connection\n3. 启动JMS connection\n4. 通过JMS connection创建JMS session\n5. 创建JMS destination\n6. 创建JMS producer 或者创建JMS message并设置destination\n7. 创建JMS consumer 或者是注册一个JMS message listener\n8. 发送或接收JMS message\n9. 关闭所有资源（connection、session、producer、consumer等）\n\n## 消息生产者与消费者\n\n在POM文件中引入activemq依赖\n\n```xml\n<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<project xmlns=\"http://maven.apache.org/POM/4.0.0\"\n         xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n         xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd\">\n    <modelVersion>4.0.0</modelVersion>\n\n    <groupId>com.lt.mq</groupId>\n    <artifactId>activemq</artifactId>\n    <version>1.0-SNAPSHOT</version>\n    <build>\n        <plugins>\n            <plugin>\n                <groupId>org.apache.maven.plugins</groupId>\n                <artifactId>maven-compiler-plugin</artifactId>\n                <configuration>\n                    <source>8</source>\n                    <target>8</target>\n                </configuration>\n            </plugin>\n        </plugins>\n    </build>\n\n    <dependencies>\n        <dependency>\n            <groupId>org.apache.activemq</groupId>\n            <artifactId>activemq-all</artifactId>\n            <version>5.15.9</version>\n        </dependency>\n        <dependency>\n            <groupId>org.apache.xbean</groupId>\n            <artifactId>xbean-spring</artifactId>\n            <version>3.16</version>\n        </dependency>\n    </dependencies>\n\n</project>\n```\n\n### 1.Queue通信方式\n\n生产者Producer：\n\n```java\npackage com.lt.mq.queue;\n\nimport org.apache.activemq.ActiveMQConnectionFactory;\nimport javax.jms.*;\npublic class Producer {\n\n    private static final String ACTIVEMQ_URL = \"tcp://192.168.52.233:61616\";\n    private static final String QUEUE_NAME = \"Queue01\";\n\n    public static void main(String[] args) throws JMSException {\n\n        //1.创建连接工厂，按照给定的url地址，采用默认的用户名和密码\n        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);\n        //2.通过连接工厂，获得连接connection并启动\n        Connection connection = activeMQConnectionFactory.createConnection();\n        connection.start();\n        //3.创建会话session\n        //两个参数：第一个叫事务，第二个叫签收\n        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\n        //4.创建目的地（具体是队列还是主题）\n        Queue queue = session.createQueue(QUEUE_NAME);\n        //5.创建消息的生产者\n        MessageProducer messageProducer = session.createProducer(queue);\n        //6.通过使用消息生产者MessageProducer生产3条消息发送到MQ的队列中\n        for (int i = 1; i <4 ; i++) {\n            //7.创建消息\n            TextMessage textMessage = session.createTextMessage(\"msg-\" + i);//理解为一个字符串\n            //8.通过MessageProducer发送给MQ\n            messageProducer.send(textMessage);\n        }\n        //9.关闭资源\n        messageProducer.close();\n        session.close();\n        connection.close();\n\n        System.out.println(\"***消息已发送到MQ***\");\n    }\n}\n```\n\n运行生产者代码，可以在activemq控制台查看到入队消息为3，由于还没有消费者，待消费消息数量同样为3。\n\n消费者Consumer：\n\n```java\npackage com.lt.mq.queue;\n\nimport org.apache.activemq.ActiveMQConnectionFactory;\n\nimport javax.jms.*;\nimport java.io.IOException;\n\npublic class Customer {\n    private static final String ACTIVEMQ_URL = \"tcp://192.168.52.233:61616\";\n    private static final String QUEUE_NAME = \"Queue01\";\n\n    public static void main(String[] args) throws JMSException, IOException {\n\n        //1.创建连接工厂，按照给定的url地址，采用默认的用户名和密码\n        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);\n        //2.通过连接工厂，获得连接connection并启动\n        Connection connection = activeMQConnectionFactory.createConnection();\n        connection.start();\n        //3.创建会话session\n        //两个参数：第一个叫事务，第二个叫签收\n        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\n        //4.创建目的地（具体是队列还是主题）\n        Queue queue = session.createQueue(QUEUE_NAME);\n        //5.创建消息的消费者\n        MessageConsumer messageConsumer = session.createConsumer(queue);\n        /*同步阻塞方式（receive()）\n        订阅者或接受者调用MessageConsumer的receive()方法来接收消息，receive方法能够在接收到消息之前（或者超时之前）将一直阻塞。*/\n\n        while (true) {\n            //消费者接收的消息类型要与生产者发送的消息类型一致，receive方法可以指定等待超时时间。\n            TextMessage textMessage = (TextMessage) messageConsumer.receive(4000L);\n            if (null != textMessage)\n            {\n                System.out.println(\"消费者接收到消息：\"+textMessage.getText());\n            } else {\n                break;\n            }\n        }\n        //6.关闭资源\n        System.in.read();\n        messageConsumer.close();\n        session.close();\n        connection.close();\n    }\n}\n```\n\n异步监听方式接收消息：\n\n```java\n        MessageConsumer messageConsumer = session.createConsumer(queue);\n\t\t/*通过监听方式消费消息\n        异步非阻塞方式（监听器onMessage()）\n        订阅者或接受者通过MessageConsumer的setMessageListener(MessageListener Listener)注册一个消息监听器，\n        当消息到达之后，系统自动调用监听器MessageListener的onMessage(Message message)方法*/\n        messageConsumer.setMessageListener(new MessageListener() {\n            @Override\n            public void onMessage(Message message) {\n                if (null != message && message instanceof TextMessage) {\n                    TextMessage textMessage = (TextMessage) message;\n                    try {\n                        System.out.println(\"消费者接收到消息：\"+textMessage.getText());\n                    } catch (JMSException e) {\n                        e.printStackTrace();\n                    }\n                }\n            }\n        });\n\t\tSystem.in.read();\n```\n\n### 2.Topic通信方式\n\n发布者Publisher：\n\n```java\npackage com.lt.mq.topic;\n\nimport org.apache.activemq.ActiveMQConnectionFactory;\n\nimport javax.jms.*;\n\npublic class Publisher {\n\n    private static final String ACTIVEMQ_URL = \"tcp://192.168.52.233:61616\";\n    private static final String TOPIC_NAME = \"Topic01\";\n\n    public static void main(String[] args) throws JMSException {\n\n        //1.创建连接工厂，按照给定的url地址，采用默认的用户名和密码\n        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);\n        //2.通过连接工厂，获得连接connection并启动\n        Connection connection = activeMQConnectionFactory.createConnection();\n        connection.start();\n        //3.创建会话session\n        //两个参数：第一个叫事务，第二个叫签收\n        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\n        //4.创建目的地（具体是队列还是主题）\n        Topic topic = session.createTopic(TOPIC_NAME);\n        //5.创建消息的生产者\n        MessageProducer messageProducer = session.createProducer(topic);\n        //6.通过使用消息生产者MessageProducer生产3条消息发送到MQ的队列中\n        for (int i = 1; i <4 ; i++) {\n            //7.创建消息\n            TextMessage textMessage = session.createTextMessage(\"msg-\" + i);//理解为一个字符串\n            //8.通过MessageProducer发送给MQ\n            messageProducer.send(textMessage);\n        }\n        //9.关闭资源\n        messageProducer.close();\n        session.close();\n        connection.close();\n    }\n}\n```\n\n订阅者Subscriber：\n\n```java\npackage com.lt.mq.topic;\n\nimport org.apache.activemq.ActiveMQConnectionFactory;\n\nimport javax.jms.*;\nimport java.io.IOException;\n\npublic class Subscriber {\n    private static final String ACTIVEMQ_URL = \"tcp://192.168.52.233:61616\";\n    private static final String TOPIC_NAME = \"Topic01\";\n\n    public static void main(String[] args) throws JMSException, IOException {\n        System.out.println(\"1号订阅者\");\n        //1.创建连接工厂，按照给定的url地址，采用默认的用户名和密码\n        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);\n        //2.通过连接工厂，获得连接connection并启动\n        Connection connection = activeMQConnectionFactory.createConnection();\n        connection.start();\n        //3.创建会话session\n        //两个参数：第一个叫事务，第二个叫签收\n        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\n        //4.创建目的地（具体是队列还是主题）\n        Topic topic = session.createTopic(TOPIC_NAME);\n        //5.创建消息的消费者\n        MessageConsumer messageConsumer = session.createConsumer(topic);\n\n        messageConsumer.setMessageListener((message) -> {\n            if (null != message && message instanceof TextMessage) {\n                TextMessage textMessage = (TextMessage) message;\n                try {\n                    System.out.println(\"消费者接收到消息：\"+textMessage.getText());\n                } catch (JMSException e) {\n                    e.printStackTrace();\n                }\n            }\n        });\n        System.in.read();\n        messageConsumer.close();\n        session.close();\n        connection.close();\n    }\n}\n```","categoryId":2,"viewCount":1283,"categoryName":"中间件","author":"球接子","authorAvatar":null,"tagIds":[14,15],"tagNames":["中间件","ActiveMQ"]}}