Redis实现简单的消息队列

一、消息队列介绍

1、问:什么是消息队列?

   答:是一个消息的链表,是一个异步处理的数据处理引擎。

2、问:有什么好处?

     答:不仅能够提高系统的负荷,还能够改善因网络阻塞导致的数据缺失。

3、问:用途有哪些?

     答:邮件发送、手机短信发送,数据表单提交、图片生成、视频转换、日志储存等。

4、问:有哪些软件?

     答:RabbitMQ、ZeroMQ、Posix、SquirrelMQ、Redis、QDBM、Tokyo Tyrant、HTTPSQS等(linux平台下)。

5、问:怎么实现?

     答:顾名思义,先入队,后出队;先把数据丢到消息队列(入队),后根据相应的key来获取数据(出队)。出入队为先进先出。

6、问:Redis可以做消息队列?

     答:首先,redis设计用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列,它有几个阻塞式的API可以使用,正是这些阻塞式的API让其有能力做消息队列;另外,做消息队列的其他特性例如FIFO(先入先出)也很容易实现,只需要一个list对象从头取数据,从尾部塞数据即可;redis能做消息队列还得益于其list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口,它们都是阻塞版的,所以可以用来做消息队列。

二、Redis与RabbitMQ作为消息队列的比较

RabbitMQ
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

Redis
是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。

1、可靠消费
Redis:没有相应的机制保证消息的消费,当消费者消费失败的时候,消息体丢失,需要手动处理
RabbitMQ:具有消息消费确认,即使消费者消费失败,也会自动使消息体返回原队列,同时可全程持久化,保证消息体被正确消费

2、可靠发布
Reids:不提供,需自行实现
RabbitMQ:具有发布确认功能,保证消息被发布到服务器

3、高可用
Redis:采用主从模式,读写分离,但是故障转移还没有非常完善的官方解决方案
RabbitMQ:集群采用磁盘、内存节点,任意单点故障都不会影响整个队列的操作

4、持久化
Redis:将整个Redis实例持久化到磁盘
RabbitMQ:队列,消息,都可以选择是否持久化

5、消费者负载均衡
Redis:不提供,需自行实现
RabbitMQ:根据消费者情况,进行消息的均衡分发

6、队列监控
Redis:不提供,需自行实现
RabbitMQ:后台可以监控某个队列的所有信息,(内存,磁盘,消费者,生产者,速率等)

7、流量控制
Redis:不提供,需自行实现
RabbitMQ:服务器过载的情况,对生产者速率会进行限制,保证服务可靠性

8、出入队性能
对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。
测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。

注:此数据来源于互联网

应用场景分析
Redis:轻量级,高并发,延迟敏感
即时数据分析、秒杀计数器、缓存等

RabbitMQ:重量级,高并发,异步
批量数据异步处理、并行任务串行化,高负载任务的负载均衡等

三、Redis实现消息队列应用

1、配置Redis 消息监听器

<!-- Redis消息监听器 , 注意:不能填写ID值,否则会影响到HttpSessionListener -->
<bean class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
   <property name="connectionFactory" ref="jedisConnectionFactory"></property>
   <property name="messageListeners">
      <map>
         <entry key-ref="cacheMessageListener">
            <bean class="org.springframework.data.redis.listener.ChannelTopic">
               <constructor-arg value="${redis.cache.topic}" />
            </bean>
         </entry>
         <entry key-ref="systemCacheMessageListener">
            <bean class="org.springframework.data.redis.listener.ChannelTopic">
               <constructor-arg value="${redis.system.topic}" />
            </bean>
         </entry>
      </map>
   </property>
</bean>

2、应用中,我们应用到的缓存监听器,在缓存发生变化时,进行更新。

     <!– 缓存监听器 –>
    <bean id=”cacheMessageListener” class=”com.legendshop.cache.listener.CacheMessageListener”>
        <constructor-arg index=”0″ name=”redisTemplate” ref=”jedisTemplate”></constructor-arg>
        <constructor-arg index=”1″ name=”redisCaffeineCacheManager” ref=”cacheManager”></constructor-arg>
    </bean>
   
    <bean id=”systemCacheMessageListener” class=”com.legendshop.cache.listener.SystemCacheMessageListener”>
        <constructor-arg index=”0″ name=”redisTemplate” ref=”jedisTemplate”></constructor-arg>
    </bean>

3、更新缓存时触发消息推送

private static String getTopic(){ if(topic == null){ CacheRedisCaffeineProperties cacheRedisCaffeineProperties = (CacheRedisCaffeineProperties) ContextServiceLocator.getInstance().getBean(CacheRedisCaffeineProperties.class); topic = cacheRedisCaffeineProperties.getRedis().getSystemTopic(); } return topic; }

public static void refreshLocalCache(String key, Object value, boolean setLocal) { if(setLocal){ commonObject.put(key, value); }else{ String topic = getTopic(); if (AppUtils.isNotBlank(topic)) { getRedisTemplate().convertAndSend(topic, new CacheMessage(SystemCacheEnum.PROPERTIES.value(), new KeyValueObj(key, value))); } } }

4、监听器代码

public class SystemCacheMessageListener implements MessageListener
@Override
public void onMessage(Message message, byte[] pattern) {
   if (message.getBody() == null) {
      return;
   }
   ReentrantLock lock = new ReentrantLock();
   try {
      lock.lock();
      CacheMessage cacheMessage = (CacheMessage) redisTemplate.getValueSerializer().deserialize(message.getBody());
      logger.debug("recevice a redis topic message, clear local cache, the cacheName is {}, the key is {}", cacheMessage.getCacheName(),
            cacheMessage.getKey());
      //更新本地缓存
      if(cacheMessage != null){
         logger.warn("notify other node to update their cache {} ", cacheMessage.getCacheName());
         updateCache.onMessage(cacheMessage);
      }      
   } catch (Exception e) {
      logger.error("", e);
   }finally {
      lock.unlock();
   }
}

如何安装Zookeeper集群

Zookeeper集群方案

主机IP消息端口通信端口节点目录/usr/local/下
192.168.1.121812888:3888zookeeper
192.168.1.221812888:3888zookeeper
192.168.1.321812888:3888zookeeper

安装

安装JDK 略

以下操作3台机器同时操作

新建用户

useradd zookeeper

passwd zookeeper

设置密码

下载

官网下载zookeeper-3.4.6
Linux wget http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

解压

tar -zxvf zookeeper-3.4.6.tar.gz —C /usr/local

改名

cd /usr/local

mv zookeeper-3.4.6 zookeeper

建立以下文件夹

cd /usr/local/zookeeper

mkdir data

mkdir logs

修改配置文件

将/conf目录下的zoo_sample.cfg文件拷贝一份, 命名为为zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# 2888 端口号是 zookeeper 服务之间通信的端口。
# 3888 是 zookeeper 与其他应用程序通信的端口。
server.0=192.168.1.1:2888:3888
server.1=192.168.1.2:2888:3888
server.2=192.168.1.3:2888:3888

解释配置文件

initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不
是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到
Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。
当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没
有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是
5*2000=10 秒。

syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时
间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 2*2000=4
秒。

server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务
器的 IP 地址或/etc/hosts 文件中映射了 IP 的主机名;C 表示的是这个服务器与
集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务
器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是
用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是
一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同
的端口号。

新建myid文件

cd /usr/local/zookeeper/data

  • 192.168.1.1机器: echo 0 >> myid
  • 192.168.1.2机器: echo 1 >> myid
  • 192.168.1.3机器: echo 2 >> myid

编辑.bash_profile文件

添加

export ZOOKEEPER_HOME=/usr/local/zookeeper
# zookeeper env
export PATH=$ZOOKEEPER_HOME/bin:$PATH

使配置文件生效
$ source /home/zookeeper/.bash_profile

添加防火墙规则

在防火墙中打开要用到的端口2181、2888、3888

切换到 root 用户权限,执行以下命令:
chkconfig iptables on
service iptables start
vim /etc/sysconfig/iptables
增加以下 3 行:
-A INPUT -m state --state NEW -m tcp -p tcp --dport 2181 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 2888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 3888 -j ACCEPT
重启防火墙:

service iptables restart

zk操作

启动zk

zkServer.sh start

查看有无启动成功

jps

1456 QuorumPeerMain
1475 Jps

启动成功

查看状态

zkServer.shstatus

查看日志文件

$ tail -500f /usr/local/zookeeper/logs/zookeeper.out

停止zookeeper进程:

zkServer.sh stop

配置zookeeper开机使用xubin用户启动:

编辑/etc/rc.local 文件,加入:

su – zookeeper -c ‘/usr/local/zookeeper/bin/zkServer.sh start’

zookeeper查看插件

eclipse

idea插件

直接插件库中搜索zookeeper

不过我不推荐使用
插件开发成那样还上线…(不过是免费的 想用就用吧)

Spring Cloud服务发现框架Eureka配置说明

服务治理是微服务架构中最为核心和基础的模块,它主要用来实现各个微服务实例的自动化注册和发现。

在 SpringCloud 中使用了大量的Netflix 的开源项目,而其中 Eureka 属于Netflix 提供的发现服务组件,所有的微服务在使用之中全部向 Eureka 之中进行注册,而后客户端直接利用 Eureka 进行服务信息的获取。


Eureka服务治理体系

服务注册

在服务治理框架中,通常都会构建一个注册中心,每个服务单元向注册中心登记自己提供的服务,包括服务的主机与端口号、服务版本号、通讯协议等一些附加信息。注册中心按照服务名分类组织服务清单,同时还需要以心跳检测的方式去监测清单中的服务是否可用,若不可用需要从服务清单中剔除,以达到排除故障服务的效果。

服务发现

在服务治理框架下,服务间的调用不再通过指定具体的实例地址来实现,而是通过服务名发起请求调用实现。服务调用方通过服务名从服务注册中心的服务清单中获取服务实例的列表清单,通过指定的负载均衡策略取出一个服务实例位置来进行服务调用。

Eureka服务端配置

1、新建的是一个maven新项目 , 在pom.xml追加相应的依赖支持

<dependencies>
<!--SpringCloud eureka-server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
</project>

2、修改yml 配置 文件,主要进行 eureka 服务的定义。

###服务端口号
server:
port: 8888
###eureka 基本信息配置
eureka:
instance:
###注册到eurekaip地址
hostname: 127.0.0.1
client:
serviceUrl:
defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
###因为自己是为注册中心,不需要自己注册自己
register-with-eureka: false
###因为自己是为注册中心,不需要检索服务
fetch-registry: false
#server:
# 测试时关闭自我保护机制,保证不可用服务及时踢出
#enable-self-preservation: false
#eviction-interval-timer-in-ms: 2000

3、修改 Eureka 程序启动类,添加 Eureka 服务声明

/**
* eureka服务端启动
* @author zibin
*
*/
@SpringBootApplication
@EnableEurekaServer
public class EurekaApp {

/**
* The main method.
*
* @param args the args
*/
public static void main(String[] args) {
SpringApplication.run(EurekaApp.class, args);
}
}

4、启动 Eureka

IDEA启动SpringBoot项目无法访问JSP页面

Springboot项目,兼容JSP后 使用IDEA就一直报  
找不到的网页 ,报404错误,

application.yml 已经配置
spring:
application:
name:
legendshop-web-admin
mvc:
view:
# 页面默认前缀目录
prefix: /WEB-INF/JSP/
# 响应页面默认后缀
suffix: .jsp
但感觉IDEA 没有加载JSP成功。参考网上资料发现是要改两处:

1、添加 web.xml 在src\main\webapp\WEB-INF\web.xml

2、直接添加 Web Resources Directory 默认到项目,不用改。

3、在 Edit Configuration 修改运行的目录

重新启动就行了 。

Maven POM元素继承

为了减少重复代码的编写,我们需要创建POM的父子结构,然后在POM中申明一些配置供子POM继承,以实现“一处申明,多处使用的”目的。以之前的模块中的结构为基础,

创建父模块,创建一个pom.xml文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.legendshop</groupId>
  <artifactId>legendshop_parent</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>pom</packaging>
  <modules>
   <module>legendshop_basic</module>
   <module>legendshop_common</module>
   <module>legendshop_service_api</module>
   <module>legendshop_service</module>
  </modules>
  
  <!-- springBoot版本依赖2.0.7 -->
  <parent>
    <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.0.7.RELEASE</version>
  </parent>
  
  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
      <java.version>1.8</java.version>
   </properties>

需要注意的是它的packaging的值必须为pom,这一点与模块聚合一样,作为父模块的POM,其打包类型也必须为pom。由于父模块只是为了消除配置的重复,因此也就不需要src/main/java等目录了。

有了父模块就让其他子模块来继承它,修改子类型的pom文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>cn.legendshop</groupId>
    <artifactId>legendshop_parent</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <artifactId>legendshop_basic</artifactId>
  <packaging>pom</packaging>
  <modules>
   <module>legendshop_eureka</module>
   <module>legendshop_apollo</module>
   <module>legendshop_gateway</module>
   <module>legendshop-zipkin</module>
  </modules>
</project>

当前我们会再按类型建立自己的父子点

子项目再引用类型的父节点

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>cn.legendshop</groupId>
    <artifactId>legendshop_basic</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <artifactId>legendshop_eureka</artifactId>
  
   <dependencies>
   <!--SpringCloud eureka-server -->
   <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
       <version>2.0.2.RELEASE</version>
   </dependency>
  </dependencies>
</project>

使用HTTPS访问配置Harbour

由于Harbour不附带任何证书,它默认使用HTTP来提供注册表请求。但是,强烈建议为任何生产环境启用安全性。Harbour有一个Nginx实例作为所有服务的反向代理,您可以使用准备脚本来配置Nginx来启用https。

获得证书

编辑https://help.aliyun.com/video_detail/54216.html?spm=a2c4g.11186623.4.2.Jd2zGG

配置和安装

编辑

获取yourdomain.com.crtyourdomain.com.key文件后,可以将它们放入如下目录中/root/cert/:

  cp yourdomain.com.crt /root/cert/
  cp yourdomain.com.key /root/cert/ 

接下来,编辑文件make / harbor.cfg,更新主机名和协议,并更新属性ssl_cert和ssl_cert_key:

  #set hostname
  hostname = reg.yourdomain.com
  #set ui_url_protocol
  ui_url_protocol = https
  ......
  #The path of cert and key files for nginx, they are applied only the protocol is set to https 
  ssl_cert = /root/cert/yourdomain.com.crt
  ssl_cert_key = /root/cert/yourdomain.com.key

为Harbour生成配置文件:

  ./prepare

如果Harbor已经运行,请停止并删除现有的实例。您的图像数据保留在文件系统中

  docker-compose down  

最后重启Harbour:

  docker-compose up -d

为Harbour设置HTTPS后,您可以通过以下步骤验证它:

  1. 打开浏览器并输入地址:https//reg.yourdomain.com。它应该显示Harbor的用户界面。
  2. 在具有Docker守护进程的机器上,确保选项“-insecure-registry”不存在,并且您必须将上述步骤中生成的ca.crt复制到/etc/docker/certs.d/reg.yourdomain.com(或您的注册表主机IP),如果该目录不存在,请创建它。如果您将nginx端口443映射到另一个端口,则应该创建目录/etc/docker/certs.d/reg.yourdomain.com:port(或您的注册表主机IP:端口)。然后运行任何docker命令来验证设置,例如
  docker login reg.yourdomain.com

如果您已将nginx 443端口映射到另一个端口,则需要添加要登录的端口,如下所示:

  docker login reg.yourdomain.com:port

故障排除

编辑

  1. 您可能会从证书颁发者处获得中间证书。在这种情况下,您应该将中间证书与您自己的证书合并以创建证书包。您可以通过以下命令来实现此目的:cat intermediate-certificate.pem >> yourdomain.com.crt
  2. 在运行docker守护程序的某些系统上,您可能需要在操作系统级别信任该证书。
    在Ubuntu上,这可以通过以下命令完成:cp youdomain.com.crt /usr/local/share/ca-certificates/reg.yourdomain.com.crt 更新CA证书在Red Hat(CentOS等)上,命令是:cp yourdomain.com.crt /etc/pki/ca-trust/source/anchors/reg.yourdomain.com.crt 更新CA信任