{"msg":"操作成功","code":200,"data":{"createBy":"admin","createTime":"2020-11-21 18:02:25","updateBy":"admin","updateTime":"2020-11-21 18:02:25","remark":null,"id":41,"articleTitle":"ActiveMQ（三）持久化机制","articleUrl":"activemq_persistentence","articleThumbnail":"https://www.asumimoe.com/imgfiles/20220910/73ccf56bfe814abeb684840abb28eee5.jpg","articleFlag":"0","draftStatus":"1","reprintStatement":"1","articleSummary":"为避免意外宕机之后丢失信息，需要做到重启后可以恢复消息队列，消息系统一般都会采用持久化机制。AMQ的消息持久化机制有JDBC、AMQ、kahaDB、LevelDB，无论哪种持久化方式，消息的存储逻辑都是一致的。","articleContent":"## AMQ持久化机制\n\n为避免意外宕机之后丢失信息，需要做到重启后可以恢复消息队列，消息系统一般都会采用持久化机制。\n\nAMQ的消息持久化机制有JDBC、AMQ、kahaDB、LevelDB，无论哪种持久化方式，消息的存储逻辑都是一致的。\n\n就是在发送者将消息发送出去后，消息中心首先将消息存到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接受者，成功则将消息从存储中删除，失败则继续尝试发送。\n\n消息中心启动后首先要检查指定的存储位置，如果有未发送成功的消息，则需要把消息发送出去。\n\n \n\n### 1.AMQ Message Store \n\n是一种文件存储的形式，它具有写入速度快和容易恢复的特点。消息存储在一个个文件中，文件的默认大小为32M，当一个文件中的消息已经全部被消费，那么这个文件将被标识为可删除，在下一个清除阶段，这个文件将被删除。AMQ适用于ActiveMQ5.3之前的版本。现在基本已被淘汰。\n\n \n\n### 2.kahaDB消息存储（默认）\n\n基于日志文件，从5.4版本开始默认的持久化插件，activemq.xml配置文件中有说明：\n\n官网参考地址：http://activemq.apache.org/kahadb\n\n```xml\n<!--\n  Configure message persistence for the broker. The default persistence\n  mechanism is the KahaDB store (identified by the kahaDB tag).\n  For more information, see:\n  http://activemq.apache.org/persistence.html\n-->\n<persistenceAdapter>\n    <kahaDB directory=\"${activemq.data}/kahadb\"/>\n</persistenceAdapter>\n```\n\nKahaDB是目前默认的存储方式，可适用于任何场景，提高了性能和恢复能力。消息存储使用一个事务日志和仅仅使用一个索引文件来存储它的所有地址。是一个专门针对消息持久化的解决方案，它对典型的消息使用模式进行了优化。数据被追加到data logs中。当不在需要log文件中的数据的时候，log文件会被丢弃。\n\n \n\n**存储原理**：\n\nkahadb在消息保存目录中只有4类文件（log、data、redo、free）和一个lock\n\ndb-\\<number\\>.log 存储消息的预定义大小的数据文件，数据已满时会创建一个新的文件，number数也随之递增，随着消息的数量增多而增加。当不在有引用到数据文件中的任何消息时，文件会被删除或归档。\n\ndb.data 该文件包含了持久化的BTree索引，索引了消息数据记录中的消息，它是消息的索引文件，本质上时B-Tree，使用B-Tree作为索引指向db-\\<number\\>.log里面存储的消息。\n\ndb.free 当前db.data文件里哪些页面是空闲的，文件具体内容是所有空闲页的ID。\n\ndb.redo 用来进行消息恢复，如果kahaDB消息存储在强制退出后启动，用户恢复BTree索引。\n\nlock文件锁，表示当前获得kahadb读写权限的broker。\n\n \n\n### 3.LevelDB\n\n这种文件系统是在5.8版本后引进的，它和KahaDB非常相似，也是基于文件的本地数据库储存形式，但是它提供比KahaDB更快的持久性。但它不使用自定义B-Tree实现索引预写日志，而是基于LevelDB的索引\n\n默认配置：\n\n```xml\n<persistenceAdapter>\n   <levelDB directory=\"activemq-data\"/>\n</persistenceAdapter>\n```\n\n### 4.JDBC Message store\n\n配置参考http://activemq.apache.org/persistence\n\n1. 安装MQ+MySQL\n\n2. 添加MySQL数据库的驱动包到lib文件夹\n\n   mysql-connector-java-2.1.38.jar\n\n3. jdbcPersistenceAdapter配置\n\n   ```xml\n   <persistenceAdapter>\n       <jdbcPersistenceAdapter dataSource=\"#mysql-ds\"/>\n   </persistenceAdapter>\n   ```\n\n   dataSource指定将要引用的持久化数据库的bean名称，createTablesOnStartup是否在启动的时候创建数据表，默认值是true，这样每次启动都会去创建数据表了，一般是第一次启动的时候设置为true之后改为false。\n\n4. 数据库连接池配置（注意标签位置）\n\n   ```xml\n   </broker>\n   \n   <bean id=\"mysql-ds\" class=\"org.apache.commons.dbcp2.BasicDataSource\" destroy-method=\"close\"> \n     <property name=\"driverClassName\" value=\"com.mysql.jdbc.Driver\"/> \n     <property name=\"url\" value=\"jdbc:mysql://ip:3306/activemq?relaxAutoCommit=true\"/> \n     <property name=\"username\" value=\"activemq\"/> \n     <property name=\"password\" value=\"activemq\"/> \n     <property name=\"maxTotal\" value=\"200\"/>\n     <property name=\"poolPreparedStatements\" value=\"true\"/> \n   </bean> \n   \n   <import resource=\"jetty.xml\"/>\n   ```\n\n5. 建库建表\n\n   ```css\n   创建一个名为activemq的数据库；\n   \n   三张表：ACTIVEMQ_MSGS ACTIVEMQ_ACKS ACTIVEMQ_LOCK\n   \n   ACTIVEMQ_MSGS（queue和topic都存在里面）\n   \tID：主键\n       CONTAINER：消息的Destination\n       MSGID_PROD：消息发送者的主键\n       MSG_SEQ：发送消息的顺序，MSGID_PROD+MSG_SEQ可以组成JMS的MessageID\n       EXPIRATION：消息的过期时间，存储的是1970-01-01到现在的毫秒数\n       MSG：消息本体的java序列化对象的二进制数据\n       PRIORITY：优先级，0-9，数字越大优先级越高\n   \n   ACTIVEMQ_ACKS（用于存储订阅关系，如果是持久化Topic，订阅者和服务器的订阅关系在这个表保存）\n   \tID：主键\n       CONTAINER：消息的Destination\n       SUB_DEST：如果是使用static集群，这个字段会有集群其他系统的信息\n       CLIENT_ID：每个订阅者都有一个唯一的客户端ID用以区分\n       SUB_NAME：订阅者名称\n       SELECTOR：选择器，可以选择至消费满足条件的消息。条件可以用自定义属性实现，可支持多属性AND和OR操作\n       LAST_ACKED_ID：记录消费过的消息的ID\n   \n   ACTIVEMQ_LOCK（在集群环境中才有用，只有一个Broker可以获得消息，称为Master Broker，其他只能作为备份等待Master Broker不可用，才可能成为下一个Master Broker。这个表用于记录那个Broker是当前的Master Broker）\n       ID：主键\n       Broker Name：带有lock的broker名字\n   ```\n\n6. 代码验证\n\n   开启持久化messageProducer.setDeliveryMode(DeliveryMode PERSISTENCE)\n\n7. 数据库情况\n\n   点对点：消息生产者开启持久化后，消息被保存在数据库中，消息一旦被Consumer消费就从Broker中删除；\n\n   发布/订阅：ack表保存订阅者信息，消息保存在数据库中，消费后也不会被删除\n\n8. 系统中主机名不能带有下划线\n\n### 5.JDBC Message store with ActiveMQ Journal\n\n克服了JDBC Store的不足，JDBC每次消息过来都会写库和读库.ActiveMQ Journal使用高速缓存写入技术，大大提高了性能。当消费者的消费速度能够及时跟上生产者生产消息的速度时，journal文件能够大大减少需要写到DB中的消息。\n\n例如：有1000条消息生产之后，这些消息会保存到journal文件中，如果消费者消费速度很快，已经消费了90%，那么journal文件只需要同步剩余10%消息到DB，如果消费者消费速度很慢，这个时候journal文件以批量方式写入DB。\n\n配置文件：\n\n```xml\n<persistenceFacory>\n\t<journalPersistenceAdapterFactory\n\t\tjournalLogFiles=\"4\"\n\t\tjournalLogFileSize=\"32768\"\n\t\tuseJournal=\"true\"\n\t\tuseQuickJournal=\"true\"\n\t\tdataDirectory=\"activemq-data\"\n\t\tdataSource=\"#mysql-ds\"/>\n</persistenceFactory>\n\n```","categoryId":2,"viewCount":838,"categoryName":"中间件","author":"球接子","authorAvatar":null,"tagIds":[14,15],"tagNames":["中间件","ActiveMQ"]}}