一、消息队列介绍
1、问:什么是消息队列?
答:是一个消息的链表,是一个异步处理的数据处理引擎。
2、问:有什么好处?
答:不仅能够提高系统的负荷,还能够改善因网络阻塞导致的数据缺失。
3、问:用途有哪些?
答:邮件发送、手机短信发送,数据表单提交、图片生成、视频转换、日志储存等。
4、问:有哪些软件?
答:RabbitMQ、ZeroMQ、Posix、SquirrelMQ、Redis、QDBM、Tokyo Tyrant、HTTPSQS等(linux平台下)。
5、问:怎么实现?
答:顾名思义,先入队,后出队;先把数据丢到消息队列(入队),后根据相应的key来获取数据(出队)。出入队为先进先出。
6、问:Redis可以做消息队列?
答:首先,redis设计用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列,它有几个阻塞式的API可以使用,正是这些阻塞式的API让其有能力做消息队列;另外,做消息队列的其他特性例如FIFO(先入先出)也很容易实现,只需要一个list对象从头取数据,从尾部塞数据即可;redis能做消息队列还得益于其list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口,它们都是阻塞版的,所以可以用来做消息队列。
二、Redis与RabbitMQ作为消息队列的比较
RabbitMQ
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
Redis
是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。
1、可靠消费
Redis:没有相应的机制保证消息的消费,当消费者消费失败的时候,消息体丢失,需要手动处理
RabbitMQ:具有消息消费确认,即使消费者消费失败,也会自动使消息体返回原队列,同时可全程持久化,保证消息体被正确消费
2、可靠发布
Reids:不提供,需自行实现
RabbitMQ:具有发布确认功能,保证消息被发布到服务器
3、高可用
Redis:采用主从模式,读写分离,但是故障转移还没有非常完善的官方解决方案
RabbitMQ:集群采用磁盘、内存节点,任意单点故障都不会影响整个队列的操作
4、持久化
Redis:将整个Redis实例持久化到磁盘
RabbitMQ:队列,消息,都可以选择是否持久化
5、消费者负载均衡
Redis:不提供,需自行实现
RabbitMQ:根据消费者情况,进行消息的均衡分发
6、队列监控
Redis:不提供,需自行实现
RabbitMQ:后台可以监控某个队列的所有信息,(内存,磁盘,消费者,生产者,速率等)
7、流量控制
Redis:不提供,需自行实现
RabbitMQ:服务器过载的情况,对生产者速率会进行限制,保证服务可靠性
8、出入队性能
对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。
测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。
注:此数据来源于互联网
应用场景分析
Redis:轻量级,高并发,延迟敏感
即时数据分析、秒杀计数器、缓存等
RabbitMQ:重量级,高并发,异步
批量数据异步处理、并行任务串行化,高负载任务的负载均衡等
三、Redis实现消息队列应用
1、配置Redis 消息监听器
<!-- Redis消息监听器 , 注意:不能填写ID值,否则会影响到HttpSessionListener --> <bean class="org.springframework.data.redis.listener.RedisMessageListenerContainer"> <property name="connectionFactory" ref="jedisConnectionFactory"></property> <property name="messageListeners"> <map> <entry key-ref="cacheMessageListener"> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="${redis.cache.topic}" /> </bean> </entry> <entry key-ref="systemCacheMessageListener"> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="${redis.system.topic}" /> </bean> </entry> </map> </property> </bean>
2、应用中,我们应用到的缓存监听器,在缓存发生变化时,进行更新。
<!– 缓存监听器 –>
<bean id=”cacheMessageListener” class=”com.legendshop.cache.listener.CacheMessageListener”>
<constructor-arg index=”0″ name=”redisTemplate” ref=”jedisTemplate”></constructor-arg>
<constructor-arg index=”1″ name=”redisCaffeineCacheManager” ref=”cacheManager”></constructor-arg>
</bean>
<bean id=”systemCacheMessageListener” class=”com.legendshop.cache.listener.SystemCacheMessageListener”>
<constructor-arg index=”0″ name=”redisTemplate” ref=”jedisTemplate”></constructor-arg>
</bean>
3、更新缓存时触发消息推送
private static String getTopic(){ if(topic == null){ CacheRedisCaffeineProperties cacheRedisCaffeineProperties = (CacheRedisCaffeineProperties) ContextServiceLocator.getInstance().getBean(CacheRedisCaffeineProperties.class); topic = cacheRedisCaffeineProperties.getRedis().getSystemTopic(); } return topic; }
public static void refreshLocalCache(String key, Object value, boolean setLocal) { if(setLocal){ commonObject.put(key, value); }else{ String topic = getTopic(); if (AppUtils.isNotBlank(topic)) { getRedisTemplate().convertAndSend(topic, new CacheMessage(SystemCacheEnum.PROPERTIES.value(), new KeyValueObj(key, value))); } } }
4、监听器代码
public class SystemCacheMessageListener implements MessageListener
@Override public void onMessage(Message message, byte[] pattern) { if (message.getBody() == null) { return; } ReentrantLock lock = new ReentrantLock(); try { lock.lock(); CacheMessage cacheMessage = (CacheMessage) redisTemplate.getValueSerializer().deserialize(message.getBody()); logger.debug("recevice a redis topic message, clear local cache, the cacheName is {}, the key is {}", cacheMessage.getCacheName(), cacheMessage.getKey()); //更新本地缓存 if(cacheMessage != null){ logger.warn("notify other node to update their cache {} ", cacheMessage.getCacheName()); updateCache.onMessage(cacheMessage); } } catch (Exception e) { logger.error("", e); }finally { lock.unlock(); } }