{"msg":"操作成功","code":200,"data":{"createBy":"admin","createTime":"2020-10-21 10:03:51","updateBy":"admin","updateTime":"2020-10-21 10:03:51","remark":null,"id":39,"articleTitle":"ActiveMQ（一）之activemq介绍","articleUrl":"activemq_introduction","articleThumbnail":"https://www.asumimoe.com/imgfiles/20220910/73ccf56bfe814abeb684840abb28eee5.jpg","articleFlag":"0","draftStatus":"1","reprintStatement":"1","articleSummary":"MQ全称为Message Queue,  消息队列（MQ）是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据（消息）来通信，而无需专用连接来链接它们。","articleContent":"## 一、消息中间件\n\n### 1.MQ简介\n\nMQ全称为Message Queue,  消息队列（MQ）是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据（消息）来通信，而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信，而不是通过直接调用彼此来通信，直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求\n\n### 2.应用场景\n\n#### 1）异步处理\n\n**异步处理：**调用者发起请求后，调用者不会立刻得到结果，也无需等待结果，继续执行其他业务逻辑。提高了效率但存在异步请求失败的隐患，适用于非核心业务逻辑处理。\n **同步处理：**调用者发起请求后，调用者必须等待直到返回结果，再根据返回的结果执行其他业务逻辑。效率虽然没有异步处理高，但能保证业务逻辑可控性，适用于核心业务逻辑处理。\n\n#### 2）系统耦合和事务的最终一致性\n\n分布式系统是若干个独立的计算机（系统）集合。每个计算机负责自己的模块，实现系统的解耦，也避免单点故障对整个系统的影响。每个系统还可以做一个集群，进一步降低故障的发生概率。\n\n#### 3）流量削峰\n\n流量削锋也称限流。在秒杀，抢购的活动中，为了不影响整个系统的正常使用，一般会通过消息中间件做限流，避免流量突增压垮系统，前端页面可以提示\"排队等待\"，即便用户体验很差，也不能让系统垮掉。\n\n### 3.MQ的消息传输模式\n\n1）点对点模式\n\n点对点(P2P)模式有三个角色：消息队列（Queue），发送者(Sender)，接收者(Receiver)。发送者将消息发送到一个特定的队列中，等待接收者从队列中获取消息消耗。\n\nP2P的三个特点：\n\n1. 每个消息只能被一个接收者消费，且消息被消费后默认从队列中删掉（也可以通过其他签收机制重复消费）。\n2. 发送者和接收者之间没有依赖性，生产者发送消息和消费者接收消息并不要求同时运行。\n3. 接收者在成功接受消息之后需向队列发送接收成功的确认消息。\n\n2）发布订阅模式\n\n发布订阅(Pub/Sub)模式也有三个角色：主题（Topic），发布者（Publisher），订阅者（Subscriber）。发布者将消息发送到主题队列中，系统再将这些消息传递给订阅者。\n\nPub/Sub的特点：\n\n1. 每个消息可以被多个订阅者消费。\n2. 发布者和订阅者之间存在依赖性。订阅者必须先订阅主题后才能接收到消息，在订阅前发布的消息，订阅者是接收不到的。\n3. 非持久化订阅：如果订阅者不在线，此时发布的消息订阅者是也接收不到，即便订阅者重新上线也接收不到。\n4. 持久化订阅：订阅者订阅主题后，即便订阅者不在线，此时发布的消息可以在订阅者重新上线后接收到的。\n\n3）双向应答模式\n\n双向应答模式并不是消息中间件提供的一种通信模式，它是由于实际生成环境的需要，在原有的基础上做了改良。即消息的发送者也是消息的接收者。消息的接收者也是消息的发送者。\n\n## 二、ActiveMQ安装与使用\n\n### linux系统下安装activemq\n\n1. 下载\n\n   登录官网[http://activemq.apache.org/](http://activemq.apache.org/)下载Activemq安装包。\n\n2. 将安装包上传至服务器\n\n3. 创建安装目录并将安装包解压到该目录\n\n   ```bash\n   mkdir /opt/avtivemq\n   tar -xvf apache-activemq-5.15.9-bin.tar.gz -C /opt/avtivemq\n   ```\n\n### 配置文件介绍\n\n配置文件为conf目录下的activemq.xml\n\n```xml\n<beans\n  xmlns=\"http://www.springframework.org/schema/beans\"\n  xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n  xsi:schemaLocation=\"http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd\n  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd\">\n\n    <!-- Allows us to use system properties as variables in this configuration file -->\n    <!--连接到ActiveMQ的用户认证-->\n    <bean class=\"org.springframework.beans.factory.config.PropertyPlaceholderConfigurer\">\n        <property name=\"locations\">\n            <value>file:${activemq.conf}/credentials.properties</value>\n        </property>\n    </bean>\n\n   <!-- Allows accessing the server log -->\n    <bean id=\"logQuery\" class=\"io.fabric8.insight.log.log4j.Log4jLogQuery\"\n          lazy-init=\"false\" scope=\"singleton\"\n          init-method=\"start\" destroy-method=\"stop\">\n    </bean>\n\n    <!--\n        The <broker> element is used to configure the ActiveMQ broker.\n    -->\n    <!--broker节点：见下文说明1-->\n    <broker xmlns=\"http://activemq.apache.org/schema/core\" brokerName=\"localhost\" dataDirectory=\"${activemq.data}\">\n\n        <!--目的地策略，见下文说明2-->\n        <destinationPolicy>\n            <policyMap>\n              <policyEntries>\n                <!--订阅/发布-->\n                <policyEntry topic=\">\" >\n                    <!-- The constantPendingMessageLimitStrategy is used to prevent\n                         slow topic consumers to block producers and affect other consumers\n                         by limiting the number of messages that are retained\n                         For more information, see:\n\n                         http://activemq.apache.org/slow-consumer-handling.html\n\n                    -->\n                  <!--消息限制策略等，见下文说明3-->\n                  <pendingMessageLimitStrategy>\n                    <constantPendingMessageLimitStrategy limit=\"1000\"/>\n                  </pendingMessageLimitStrategy>\n                </policyEntry>\n              </policyEntries>\n            </policyMap>\n        </destinationPolicy>\n\n\n        <!--\n            The managementContext is used to configure how ActiveMQ is exposed in\n            JMX. By default, ActiveMQ uses the MBean server that is started by\n            the JVM. For more information, see:\n\n            http://activemq.apache.org/jmx.html\n        -->\n        <managementContext>\n            <managementContext createConnector=\"false\"/>\n        </managementContext>\n\n        <!--\n            Configure message persistence for the broker. The default persistence\n            mechanism is the KahaDB store (identified by the kahaDB tag).\n            For more information, see:\n\n            http://activemq.apache.org/persistence.html\n        -->\n        <!--持久化存储，见下文说明4-->\n        <persistenceAdapter>\n            <kahaDB directory=\"${activemq.data}/kahadb\"/>\n        </persistenceAdapter>\n\n\n          <!--\n            The systemUsage controls the maximum amount of space the broker will\n            use before disabling caching and/or slowing down producers. For more information, see:\n            http://activemq.apache.org/producer-flow-control.html\n          -->\n          <!--系统内存和磁盘空间使用量\n\t\t  broker一直没有可使用空间将有可能导致消息生产者的send()方法无限阻塞，一种替代方式是使用sendFailIfNoSpace=\"true\"，这时send()方法将会失败并抛出一个javax.jms.ResourceAllocationException异常。\n        更好的解决方式如下，客户端会首先等待3000毫秒，然后再次尝试，如果此时broker依然没有足够的空间可用，才抛出异常\n          <systemUsage sendFailIfNoSpaceAfterTimeout=\"3000\" sendFailIfNoSpace=\"true\"> -->\n          <systemUsage>\n            <systemUsage>\n                <!--非持久化消息最大占用内存大小-->\n                <memoryUsage>\n                    <memoryUsage percentOfJvmHeap=\"70\" />\n                </memoryUsage>\n                <!--持久化消息最大占用硬盘大小-->\n                <storeUsage>\n                    <storeUsage limit=\"100 gb\"/>\n                </storeUsage>\n                <!--临时消息最大占用硬盘大小-->\n                <tempUsage>\n                    <tempUsage limit=\"50 gb\"/>\n                </tempUsage>\n            </systemUsage>\n        </systemUsage>\n\n        <!--\n            The transport connectors expose ActiveMQ over a given protocol to\n            clients and other brokers. For more information, see:\n\n            http://activemq.apache.org/configuring-transports.html\n        -->\n        <!--传输器配置，一般会干掉我们不需要的传输协议，见下文说明5-->\n        <transportConnectors>\n            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->\n            <transportConnector name=\"openwire\" uri=\"tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600\"/>\n            <transportConnector name=\"amqp\" uri=\"amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600\"/>\n            <transportConnector name=\"stomp\" uri=\"stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600\"/>\n            <transportConnector name=\"mqtt\" uri=\"mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600\"/>\n            <transportConnector name=\"ws\" uri=\"ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600\"/>\n        </transportConnectors>\n\n        <!-- destroy the spring context on shutdown to stop jetty -->\n        <shutdownHooks>\n            <bean xmlns=\"http://www.springframework.org/schema/beans\" class=\"org.apache.activemq.hooks.SpringContextHook\" />\n        </shutdownHooks>\n\n    </broker>\n\n    <!--\n        Enable web consoles, REST and Ajax APIs and demos\n        The web consoles requires by default login, you can disable this in the jetty.xml file\n\n        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details\n    -->\n    <import resource=\"jetty.xml\"/>\n\n</beans>\n<!-- END SNIPPET: example -->\n\n```\n\n**说明1：broker节点**\n\n|            参数名：类型            |  默认值   |                             说明                             |\n| :--------------------------------: | :-------: | :----------------------------------------------------------: |\n|           brokerName:str           | localhost |                       机器名网络内唯一                       |\n|         persistent:boolean         |   true    | 是否持久化<br />true表示需要持久化，需与元素persistentAdapter结合使用<br />false表示不持久化，重启后消息丢失且十分消耗内存 |\n|         dataDirectory:str          |           |                       默认持久化的目录                       |\n| deleteAllMessagesOnStartup:boolean |   false   |                  启动时是否清空持久化的消息                  |\n|      enableStatistics:boolean      |   true    |                       是否启用数据收集                       |\n\n**说明2：目的地策略**\n\npolicyEntry节点：\n\ntopic：匹配的主题，自定义，可以使用wildcards[http://activemq.apache.org/wildcards.html](http://activemq.apache.org/wildcards.html)\n\nproducerFlowControl：是否对producer进行控制，如果ActiveMQ服务端的底层性能和消费端的性能足够高可以设置为false，否则设置为true，同时使用memoryLimit限制对列使用的内存大小。\n\n```xml\n<policyEntries>\n  <policyEntry topic=\">\" producerFlowControl=\"true\" optimizedDispatch=\"true\"  memoryLimit=\"16mb\">\n</policyEntries>\n```\n\n**说明3.1：消息限制策略**\n\n面向Slow Consumer，此策略只对topic有效，只对nondurable订阅者有效，当通道中有大量消息积压时，broker可以保留的消息量。为防止topic中有慢速消费者，导致整个通道消息积压。\n\n```xml\n<pendingMessageLimitStrategy>\n  <!--ConstantPendingMessageLimitStrategy: 保留固定条数的消息，如果消息量超过limit，将使用“MessageEvictionStrategy”移除消息PrefetchRatePendingMessageLimitStrategy: 保留prefetchSize倍数条消息。-->\n  <!-- 如果prefetchSize为100，则保留10 * 100条消息 -->  \n  <prefetchRatePendingMessageLimitStrategy multiplier=\"10\"/>  \n</pendingMessageLimitStrategy>\n```\n\n**说明3.2：消息剔除策略**\n\n```xml\n<!--消息剔除策略 面向Slow Consumer的\n配合PendingMessageLimitStrategy，只对Topic有效,只对nondurable订阅者有效。当PendingMessage的数量超过限制时，broker该如何剔除多余的消息。当Topic接收到信息消息后，会将消息“Copy”给每个订阅者，在保存这个消息时(保存策略\"PendingSubscriberMessageStoragePolicy\")，将会检测pendingMessages的数量是否超过限制(由\"PendingMessageLimitStrategy\"来检测)，如果超过限制，将会在pendingMessages中使用MessageEvicationStrategy移除多余的消息，此后将新消息保存在PendingMessages中。-->\n<messageEvictionStrategy>\n<!--OldestMessageEvictionStrategy: 移除旧消息，默认策略。\nOldestMessageWithLowestPriorityEvictionStrategy: 旧数据中权重较低的消息，将会被移除。\nUniquePropertyMessageEvictionStrategy: 移除具有指定property的旧消息。开发者可以指定property的名称，从此属性值相同的消息列表中移除较旧的（根据消息的创建时间）。-->\n  <OldestMessageWithLowestPriorityEvictionStrategy />  \n</messageEvictionStrategy>\n```\n\n**说明3.3：转发策略**\n\n```xml\n<dispatchPolicy>\n<!--RoundRobinDispatchPolicy: “轮询”，消息将依次发送给每个“订阅者”。“订阅者”列表默认按照订阅的先后顺序排列，在转发消息时，对于匹配消息的第一个订阅者，将会被移动到“订阅者”列表的尾部，这也意味着“下一条”消息，将会较晚的转发给它。\nStrictOrderDispatchPolicy: 严格有序，消息依次发送给每个订阅者，按照“订阅者”订阅的时间先后。它和RoundRobin最大的区别是，没有移动“订阅者”顺序的操作。\nPriorityDispatchPolicy: 基于“property”权重对“订阅者”排序。它要求开发者首先需要对每个订阅者指定priority，默认每个consumer的权重都一样。\nSimpleDispatchPolicy: 默认值，按照当前“订阅者”列表的顺序。其中PriorityDispatchPolicy是其子类。-->\n  <strictOrderDispatchPolicy/>\n</dispatchPolicy>\n```\n\n**说明3.4：恢复策略**\n\n```xml\n<!--恢复策略 ActiveMQ重启如何恢复数据-->\n<subscriptionRecoveryPolicy>\n<!--\nFixedSizedSubscriptionRecoveryPolicy: 保存一定size的消息，broker将为此Topic开辟定额的RAM用来保存最新的消息。使用maximumSize属性指定保存的size数量\nFixedCountSubscriptionRecoveryPolicy: 保存一定条数的消息。 使用maximumSize属性指定保存的size数量\nLastImageSubscriptionRecoveryPolicy: 只保留最新的一条数据\nQueryBasedSubscriptionRecoveryPolicy: 符合置顶selector的消息都将被保存，具体能够“恢复”多少消息，由底层存储机制决定；比如对于非持久化消息，只要内存中还存在，则都可以恢复。\nTimedSubscriptionRecoveryPolicy: 保留最近一段时间的消息。使用recoverDuration属性指定保存时间 单位毫秒\nNoSubscriptionRecoveryPolicy: 关闭“恢复机制”。默认值。-->\n  <!--恢复最近30分钟内的信息-->\n  <timedSubscriptionRecoveryPolicy recoverDuration=\"1800000\"/>\n</subscriptionRecoveryPolicy> \n```\n\n**说明4：消息持久化存储**\n\nActiveMQ支持多种消息持久化方式：AMQ Message Store、leveldb、kahadb、JDBC Message store等，这些会在以后的文章中进行介绍\n\n```xml\n<!--Kaha Persistence 是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。在 Kaha 中，数据被追加到 data logs 中。当不再需要 log文件中的数据的时候，log 文件会被丢弃。-->\n<persistenceAdapter>\n  <kahaDB directory=\"${activemq.data}/kahadb\"/>\n</persistenceAdapter>\n```\n\n**说明5：消息传输协议**\n\n```xml\n <transportConnectors>\n<!--ActiveMQ支持的传输协议 http://activemq.apache.org/configuring-transports.html\nopenwire:activemq自定义的一种协议 具体请阅读http://activemq.apache.org/openwire.html\namqp:即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息，并不受客户端/中间件不同产品，不同开发语言等条件的限制。\nstomp:STOMP，Streaming Text Orientated Message Protocol，是流文本定向消息协议，是一种为MOM(Message Oriented Middleware，面向消息的中间件)设计的简单文本协议。\nmqtt:MQTT（Message Queuing Telemetry Transport，消息队列遥测传输）是IBM开发的一个即时通讯协议，有可能成为物联网的重要组成部分。该协议支持所有平台，几乎可以把所有联网物品和外部连接起来，被用来当做传感器和致动器（比如通过Twitter让房屋联网）的通信协议。另外它还支持tcp、udp、xmpp等协议，http://activemq.apache.org/protocols.html\n\nuri格式:scheme://ip:port?k1=v1&k2=v2 参考:http://activemq.apache.org/tcp-transport-reference.html-->    \n  <transportConnector name=\"tcp+nio\" uri=\"tcp+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600\" discoveryUri=\"multicast://default\" />\n  <transportConnector name=\"mqtt+nio\" uri=\"mqtt+nio://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600\" discoveryUri=\"multicast://default\"/>\n</transportConnectors> \n```\n### ActiveMQ启动与控制台\n\n1. 启动\n\n   ```bash\n   切换到bin目录下\n   cd /bin\n   ./activemq start\n   ```\n\n2. 查看是否启动成功\n\n   ```bash\n   netstat -anp | grep 61616\n   tcp6       0      0 :::61616                :::*                    LISTEN      6505/java           \n   ```\n\n3. 控制台访问\n\n   ```xml\n   vim jetty.xml\n   <bean id=\"jettyPort\" class=\"org.apache.activemq.web.WebConsolePort\" init-met\n   hod=\"start\">\n     <!-- the default port number for the web console -->\n     <!--允许所有主机访问-->\n     <property name=\"host\" value=\"0.0.0.0\"/>\n     <!--默认管理端口-->\n     <property name=\"port\" value=\"8161\"/>\n   </bean>\n   ```\n\n   登录http://127.0.0.1:8161默认账号与密码均为admin\n![](https://www.asumimoe.com/imgfiles/20220908/60e1134a50f74441bc55dc7e12dfc791.png)","categoryId":2,"viewCount":1212,"categoryName":"中间件","author":"球接子","authorAvatar":null,"tagIds":[14,15],"tagNames":["中间件","ActiveMQ"]}}