springcloud

微服务中分消费者(调用服务的)和提供者(被调用的服务),角色是相对的,一个服务既可以是提供者也可以是消费者

要注意版本的问题

微服务远程调用
spring中提供了一个工具restTemplate

在配置类中,启动类也属于配置类,注册bean对象

1
2
3
4
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}

在需要发送请求的服务中,使用RestTemplate对象发送请求即可,列如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;
@Autowired
private RestTemplate restTemplate;

public Order queryOrderById(Long orderId) {
// 1.查询订单
Order order = orderMapper.findById(orderId);
//url路径
//注:此处存在硬编码
String url="http://localhost:8081/user/"+order.getUserId();
//发送http请求,实现远程调用
User user = restTemplate.getForObject(url, User.class);
// 2.设置用户信息
order.setUser(user);
// 4.返回
return order;
}
}

Eureka

eureka的作用

消费者该如何获取服务提供者具体信息?

  • 服务提供者启动时向eureka注册自己的信息
  • eureka保存这些信息
  • 消费者根据服务名称向eureka拉取提供者信息

如果有多个服务提供者,消费者该如何选择?

  • 服务消费者利用负载均衡算法,从服务列表中挑选一个

消费者如何感知服务提供者健康状态?

  • 服务提供者会每隔30秒向EurekaServer发送心跳请求,报告健康状态
  • eureka会更新记录服务列表信息,心跳不正常会被剔除
  • 消费者就可以拉取到最新的信息

搭建EurekaServer

eureka也是个微服务,eureka在启动的时候也会把自己注册到eureka上

1.创建maven项目,引入spring-cloud-starter-netflix-eureka-server的依赖

1
2
3
4
5
<dependency>
<!--eureka服务端依赖-->
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

2.编写启动类,添加@EnableEurekaServer注解
1
2
3
4
5
6
7
@SpringBootApplication
@EnableEurekaServer
public class EurekaApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaApplication.class, args);
}
}

3.添加application.yml文件,编写下面的配置:

1
2
3
4
5
6
7
8
9
10
server:
port: 10086 #服务端口
spring:
application:
name: eurekaserver #服务名称
eureka:
client:
service-url: #eurek的地址信息
defaultZone: http://127.0.0.1:10086/eureka #注册中心地址

服务注册:注册user-service

1.在user-service项目下引入spring-cloud-starter-netflix-eureka-client的依赖

1
2
3
4
5
<dependency>
<!--eureka客户端端依赖-->
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

2.在application.yml文件,编写下面的配置

1
2
3
4
5
6
7
8
9
spring:
application:
name: userservice #服务名称

eureka:
client:
service-url: #eurek的地址信息
defaultZone: http://127.0.0.1:10086/eureka #注册中心地址

我们还可以将一个服务多次启动,来模拟多实例部署,但为了避免端口冲突,需要修改端口设置:

  • 在idea下方Services(服务)窗口中
  • 右键目标服务,选择CopConfiguration(复制配置)
  • 旧版idea,就在Environment项下的VM options里输入:-Dserver.port=端口号
  • 新版idea中,选择修改配置,在修改选项中勾选允许多个实例即可,按alter+p 覆盖配置属性 server.port 8082

前面发送http请求时,远程调用的url地址是写死了的,现在就可以用eureka完解决该问题

服务发现(拉取)

前面orderService中,因为需要获取完整的订单的信息,所以通过resrtemplate发送http请求来获取对应的user信息

服务拉取是基于服务名称获取服务列表,然后在对服务列表做负载均衡
1.修改OrderService的代码,修改访问的url路径,用服务名代替ip、端口:
String url="http://userservice/user/"+order.getUserId()

  1. 在order-service项目的启动类OrderApplication中的RestTemplate添加负载均衡注解:
    1
    2
    3
    4
    5
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
    return new RestTemplate();
    }

oderservice需要调用userservice,而userservice有多个实例时,可以通过定义IRule实现来修改负载均衡规则,有两种方式
1.代码方式:在消费者的启动类中,定义一个新的IRule(作用于全体)

1
2
3
4
@Bean
public IRule randomRule() {
return new RandomRule();
}

2.配置文件方式:在消费者的applicatiuon.yml文件中,添加新的配置也可以修改规则(针对某个微服务):
1
2
3
4
userservice:
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule #负载均衡规则

饥饿加载

ribbon默认是采用懒加载,即第一次访问时才会去创建LoadBalanceClient,请求时间会很长。而饥饿加载则会在项目启动时创建,降低第一次访问的耗时,可以通过下面的配置开启饥饿加载

1
2
3
4
5
ribbon:
eager-load:
enabled: true #开启饥饿加载
clients:
- userservice #指定饥饿加载的服务

Nacos

官网:nacos.io
启动:在bin目录进入cmd,执行startup.cmd -m standalone
浏览器输入运行成功的 Console的值即可看到nacos控制台界面
默认为8848端口,可以在conf下的application.properties下修改

服务注册到Nacos

1.在cloud-demo父工程中添加spring-cloud-alilbaba的管理依赖:

1
2
3
4
5
6
7
8
<!-- nacos的管理依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.5.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>

2.注释之前的服务中的eureka依赖

3.添加Nacos的客户端依赖

1
2
3
4
5
<!-- nacos客户端依赖包-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

4.修改application.yml文件(并注释掉eureka的配置)

1
2
3
4
5
spring:
cloud:
nacos:
discovery:
server-addr: localhost:8848 #nacos服务地址

Nacos服务分级模型

  1. Nacos服务分级存储模型
    ① 一级是服务,例如userservice
    ②二级是集群,例如杭州或上海
    ③三级是实例,例如杭州机房的某台部署了userservice的服务器
  2. 如何设置实例的集群属性
    ① 修改application.yml文件,添加spring.cloud.nacos.discovery.cluster-name属性即可,如:
    1
    2
    3
    4
    5
    6
    spring:
    cloud:
    nacos:
    discovery:
    server-addr: localhost:8848 #nacos服务地址
    cluster-name: HN #集群名称 HN代指湖南

Nacos负载均衡

使服务优先访问集群内的服务
举例现在基于上面的配置,现在有一个order服务,,一个实例,属于集群HN,一个user服务,3个实例,2个属于HN集群,1个属于HZ集群。现在用order服务向user服务发送多次请求,发现三个实例均被调用到(能观察到都有日志输出)

现在对order服务中application.yml,添加如下负载均衡策略

1
2
3
userservice: #要做配置的微服务名称
ribbon:
NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule #负载均衡规则

重启order服务,再次发送请求,可以看到,只有属于同一个集群的两个user服务响应了。即:优先选择本地集群,再在本地集群的多个服务当中随机访问(注意:不是轮询访问)。如果本地服务没有,或者本地服务挂了,才会跨集群访问(注意:不是跨集群就不能访问了)

总结Nacos负载均衡策略
1.优先选择同集群服务实例列表
2.本地集群找不到提供者,才去其它集群寻找,并且会报警告
3.确定了可用实例列表后,再采用随机负载均衡挑选实例

根据权重负载均衡

1.在nacos控制台可以点击实例后的编辑按钮即可修改权重
2.权重值为0~1,将权重配置为0.1即可大大降低被访问的频率
3.如果权重调成0,将不会被访问到

环境隔离-namespace(命名空间)

1.在Nacos控制台可以创建namespace,用来隔离不同的环境

  • 在左侧菜单栏点击命名空间,再点击新建命名空间
  • 填写命名空间名和描述信息

2.在application.yml配置文件中添加命名空间,值为命名空间id

1
2
3
4
5
spring:
cloud:
nacos:
discovery:
namespace: #dev环境

每个namespace都有唯一id,不同namespace下的服务不可见

nacos中的实例默认为临时实例,当实例状态为不健康时会直接在服务列表里干掉,设置为非临时实例,nacos则会每隔一段时间会查询服务的健康状态。
设置如下配置即可将配置该为非临时实例。

1
2
3
4
5
spring:
cloud:
nacos:
discovery:
ephemeral: false #是否是临时实例

Nacos于Eureka对比

Nacos与eureka的共同点

  • 都支持服务注册和服务拉取
  • 都支持服务提供者心跳方式做健康检测

Nacos与Eureka的区别

  • Nacos支持服务端主动检测提供者状态:临时实例采用心跳模式,非临时实例采用主动检测模式
    • 临时实例心跳不正常会被剔除,非临时实例则不会被剔除
    • Nacos支持服务列表变更的消息推送模式,服务列表更新更及时
    • Nacos集群默认采用AP方式,当集群中存在非临时
      实例时,采用CP模式;Eureka采用AP方式

Nacos配置管理

配置更改热更新

将配置交给nacos管理

在nacos的配置管理的配置列表中点击创建配置

  • Data ID: 唯一,服务名称-profile(运行环境).yaml,如、userservice-dev.yaml
  • Group: 一般默认即可
  • 描述:写配置文件作用,如、userservice的开发配置文件
  • 配置格式: yaml
  • 配置内容: 一般写那些需要更改的配置,
  • 再点击发布即可

配置如下信息便于下面验证:

1
2
pattern:
dateformat: yyyy-MM-dd HH:mm:ss

让微服务读取nacos中配置文件

配置获取的步骤如下:
项目启动->读取nacos中配置文件->读取本地配置文件application.yml->创建spring容器->加载bean

1.引入Nacos的配置管理客户端依赖:

1
2
3
4
5
<!--nacos的配置管理依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

2.在userservice中的resource目录添加一个bootstrap.yml文件,这个文件是引导文件,优先级高于application.yml

1
2
3
4
5
6
7
8
9
10
spring:
application:
name: userservice # 服务名称
profiles:
active: dev #环境
cloud:
nacos:
server-addr: localhost:8848 # nacos 地址
config:
file-extension: yaml # 文件后缀名

在user服务里读取配置来验证是否成功

1
2
3
4
5
6
7
@Value("${pattern.dateformat}")
private String dateformat;

@GetMapping("/now")
public String now(){
return LocalDateTime.now().format(DateTimeFormatter.ofPattern(dateformat));
}

多环境配置共享

微服务会从nacos中读取配置文件:
1.[服务名]-[spring.profile.active].yaml,环境配置
2.[服务名].yaml,默认配置,多环境共享

我们需要了解配置文件的优先级:
[服务名]-[环境].yaml>[服务名].yaml>本地配置

如:userservice-dev.yaml>userservice.yaml>本地的application.yaml

所以我们可以在nacos中新建一个服务名.yaml的配置文件,在里面填写我们的多环境共享配置。

统一服务在不同环境下能访问到公共环境配置,但如果当配置属性相同时,按优先级来

http客户端Feign

定义和使用Feign客户端

1.引入依赖

1
2
3
4
5
<!--feign客户端依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

2.在order-service的启动类上添加注解开启Feign的功能
@EnableFeignClients// 开启feign

3.编写Feign客户端:
主要是基于SpringMVC的注解来声明远程调用的信息,比如:

  • 服务名称:userservice
  • 请求方式:GET
  • 请求路径:/user/{id}
  • 请求参数:Long id
  • 返回值类型:User
1
2
3
4
5
6
7
@FeignClient("userservice")// 调用userservice服务
public interface UserClient {

//Get请求,返回user对象,参数id
@GetMapping("/user/{id}")
User findById(@PathVariable("id") Long id);
}

对比前后两种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;
@Autowired
private UserClient userClient;

public Order queryOrderById(Long orderId) {
// 1.查询订单
Order order = orderMapper.findById(orderId);

//2. 使用feign远程调用
User user = userClient.findById(order.getUserId());
// 3.封装user
order.setUser(user);
return order;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;
@Autowired
private RestTemplate restTemplate;

public Order queryOrderById(Long orderId) {
// 1.查询订单
Order order = orderMapper.findById(orderId);
// 2.远程调用
String url="http://userservice/user/"+order.getUserId();
User user = restTemplate.getForObject(url, User.class);
// 3.封装user
order.setUser(user);
return order;
}
}

自定义Feign的配置

Feign运行自定义配置来覆盖默认配置,可以修改的配置如下:

类型 作用 说明
feign.Logger.Level 修改日志级别 包含四种不同的级别:NONE、BASIC、HEADERS、FULL
feign.codec.Decoder 响应结果的解析器 http远程调用的结果做解析,例如解析json字符串为java对象
feign.codec.Encoder 请求参数编码 将请求参数编码,便于通过http请求发送
feign.Contract 支持的注解格式 默认是SpringMVC的注解
feign.Retryer 失败重试机制 请求失败的重试机制,默认是没有,不过会使用Ribbon的重试

一般我们需要配置的就是日志级别。配置Feign日志有两种方式:

1
2
3
4
5
feign:
client:
config:
default: #这里是default就是全局配置,如果写服务名称如userservice,那么就是对指定服务进行配置
logger-level: FULL #feign日志级别

首先声明一个Bean:

1
2
3
4
5
6
public class DefaultFeignConfiguration {
@Bean
public Logger.Level feignLoggerLevel() {
return Logger.Level.BASIC;// 日志级别
}
}

该配置类暂时不会生效,使它生效的话
如果要全局配置,则把它放到@EnableFeignClients注解里
如:@EnableFeignClients(defaultConfiguration = DefaultFeignConfiguration.class)// 开启feign

如果要局部配置,则把它放到@FeignClient注解里
如:@FeignClient(value = "userservice",configuration = DefaultFeignConfiguration.class)// 调用userservice服务
那么该日志就只针对userservice服务

Feign的性能优化

优化feign的性能主要包括:
1.使用连接池代替默认的URLConnection
2.日志级别,最好用basic或者none

feign性能优化-连接池配置
feign添加HttpClient的支持,引入依赖
该依赖已经被spring管理起来了,引入不需要管版本

1
2
3
4
5
<!-- 引入HtttpClient的依赖 -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>

配置连接池

1
2
3
4
5
feign:
httpclient:
enabled: true #开启feign对httpclient的支持
max-connections: 200 #最大连接数
max-connections-per-route: 50 #每个路最大连接数

统一网关Gateway

网关功能有:身份认证和权限校验、服务路由、负载均衡、请求限流…
在SpringCloud中网关的实现包括两种:gatewayzuul
Zuul是基于Servlet的实现,属于阻塞式编程。而SpringCloudGateway则是基于Spring5中提供的WebFlux,属于响应式编程的实现,具备更好的性能。

搭建网关服务

1.创建新的module,引入SpringCloudGateway的依赖和nacos的服务发现依赖:

1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
<!--nacos服务注册发现依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--网关gateway依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
</dependencies>

2.创建启动类
1
2
3
4
5
6
@SpringBootApplication
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
}

2.编辑路由配置以及nacos地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
server:
port: 10010 # 网关端口
spring:
application:
name: gateway # 服务名称
cloud:
nacos:
discovery:
server-addr: localhost:8848 #配置Nacos地址
gateway:
routes: # 网关路由配置
- id: user-service # 路由标识,必须唯一
uri: lb://userservice # 路由的目标地址,支持lb和http两种格式
predicates: # 路由断言,判断请求是否符合规范
- Path=/user/** # 路径断言,判断路径是否以/user开头,符合则转发到目标地址
- id: order-service
uri: lb://orderservice
predicates:
- Path=/order/**

启动网关,访问localhost:10010/user/1即被转发为locahost:8081/user/1
启动网关,访问localhost:10010/order/106即被转发为locahost:8080/order/1

路由断言工厂Route Predicate Factory

上面的配置文件中了解到predicates是路由断言,是判断规则。而配置文件中写的断言规则只是字符串,这些字符串会被Predicate Factory读取并处理,转变为路由判断的条件。
Spring提供了11种基本的Predicate工厂:

名称 说明 示例
After 是某个时间点后的请求 - After=2037-01-20T17:42:47.789-07:00[America/Denver]
Before 是某个时间点之前的请求 - Before=2031-04-13T15:14:47.433+08:00[Asia/Shanghai]
Between 是某两个时间点之前的请求 - Between=2037-01-20T17:42:47.789-07:00[America/Denver], 2037-01-21T17:42:47.789-07:00[America/Denver]
Cookie 请求必须包含某些cookie - Cookie=chocolate, ch.p
Header 请求必须包含某些header - Header=X-Request-ld, \d+
Host 请求必须是访问某个host(域名) - Host=.somehost.org, .anotherhost.org
Method 请求方式必须是指定方式 -Method=GET, POST
Path 请求路径必须符合指定规则 - Path=/red/(segment),/blue/**
Query 请求参数必须包含指定参数 - Query=name, Jack或者-Query=name
RemoteAddr 请求者的ip必须是指定范围 -RemoteAddr=192.168.1.1/24
Weight 权重处理

路由过滤器GatewayFilter

GatewayFilter是网关提供的一种过滤器,可以对进入网关的请求和微服务返回的响应做处理:
[https://docs.spring.io/spring-cloud-gateway/reference/4.1-SNAPSHOT/spring-cloud-gateway.html],现在已有近37种过滤配置
下面只做几个演示

给所有进入xx服务的请求添加一个请求头

局部服务添加:在网关的user-service中添加过滤器

1
2
3
4
5
6
7
8
gateway:
routes:
- id: user-service # 路由标识,必须唯一
uri: lb://userservice # 路由的目标地址
predicates: # 路由断言,判断请求是否符合规范
- Path=/user/** # 路径断言,判断路径是否以/user开头,符合则转发到目标地址
filters: # 过滤器
- AddRequestHeader=Truth,Facts are the only test of truth! # 添加请求头,key为Truth,value为Facts are the only test of truth!

在userservice的服务中获取请求头并打印中控制台来验证

1
2
3
4
5
6
@GetMapping("/{id}")
public User queryById(@PathVariable("id") Long id,
@RequestHeader(value = "Truth",required = false) String truth) {
System.out.println("Truth : "+truth);
return userService.queryById(id);
}

当调用到该服务时可以看到Truth : Facts are the only test of truth!被输出在控制台

给所有微服务添加:在于routers的平级位置下方添加default-filters

1
2
3
4
5
6
7
8
9
10
11
12
gateway:
routes:
- id: user-service # 路由标识,必须唯一
uri: lb://userservice # 路由的目标地址
predicates: # 路由断言,判断请求是否符合规范
- Path=/user/** # 路径断言,判断路径是否以/user开头,符合则转发到目标地址
- id: order-service
uri: lb://orderservice
predicates:
- Path=/order/**
default-filters: # 全局过滤器
- AddRequestHeader=Truth,Facts are the only test of truth!

全局过滤器 GlobalFilter

全局过滤器的作用是处理一切进入网关的请求和微服务响应,于GatewayFilter的作用一样,区别在于GatewayFilter通过配置定义,处理逻辑是固定的。而GlobalFilter的逻辑需要自己写代码实现

实现方式为实现GlobalFilter接口

1
2
3
4
5
6
7
8
9
10
public interface GlobalFilter{

/**
* 处理当前请求,有必要的话通过{@link GatewayFilterChain}将请求交给下一个过滤器处理
* @param exchange 请求上下文,里面可以获取Request、Response等信息
* @param chain 用来把请求委托给下一个过滤器
* @return {@code Mono<Void>}返回标示当前过滤器业务结束
*/
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}

通过一个模拟需求来演示如何实现:定义全局过滤器,拦截并判断用户身份
需求:定义全局过滤器,拦截请求,判断请求的参数是否满足以下条件:

  • 参数中是否有authorization,
  • authorizationca参数值是否为admin
    如果满足就放行,否则则拦截

在gateway中新建一个类AuthorizeFilter

  • 1.实现GlobalFilter接口
  • 2.添加@Order注解或实现Ordered接口
  • 3.编写处理逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Order(-1)//值越小,优先级越高,也可以实现Ordered接口来实现优先级
@Component
public class AuthorizeFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//1.获取请求参数
ServerHttpRequest request = exchange.getRequest();
MultiValueMap<String, String> params = request.getQueryParams();
//2.获取参数中的authorization参数
String auth = params.getFirst("Authorization");
//3.判断参数值是否为admin
if("admin".equals(auth)){
//4.是,放行
return chain.filter(exchange);
}

//5.否,拦截,设置状态码为401,
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
//6.拦截请求
return exchange.getResponse().setComplete();
}
}

可以访问http://localhost:10010/order/107?Authorization=admin来验证,当不带Authorization时会返回401

执行顺序

路由过滤器,defaultFilter,全局过滤器的执行顺序:

  • 1.order值越小,优先级越高
  • 2.当order值一样时,顺序是defaultFilter最先,然后是局部路由过滤器,最后是全局过滤器

跨域问题处理

跨域:域名不一致就是跨域,主要包括:
域名不同:www.taobao.com 和www.taobao.org 和www.jd.com 和miaosha.jd.com
域名相同,端口不同:localhost:8080和localhost8081
跨域问题:浏览器禁止请求的发起者与服务端发生跨域ajax请求,请求被浏览器拦截的问题
解决方案:CORS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
spring:
cloud:
gateway:
#...
globalcors: # 全局的跨域处理
add-to-simple-url-handler-mapping: true # 解决options请求被拦截问题
corsConfigurations:
'[/**]':
allowedOrigins: # 允许哪些网站的跨域请求
- "http://localhost:8090"
- "http://www.leyou.com"
allowedMethods: # 允许跨域的ajax的请求方式
- "GET"
- "POST"
- "DELETE"
- "PUT"
- "OPTIONS"
allowedHeaders: "*" # 允许在请求中携带的头信息
allowedCredentials: true # 是否允许携带cookie
maxAge: 360000 # 这次跨域检测的有效期

Docker

镜像:Docker将应用程序及其所需的依赖,函数库,环境,配置文件等打包在一起,称为镜像。
容器:镜像中的应用程序运行后形成的进程就是容器,只是Docker会做容器做隔离,对外不可见。

docker架构
Docker是一个CS架构的程序,由两部分组成:
服务端(server):Docker守护进程,负责处理Docker指令,管理镜像,容器等
客户端(client):通过命令或RestAPI向Docker服务端发送指令。可以在本地或远程向服务端发送指令。

DockerHub
一个镜像托管的服务器,类似的还有阿里云镜像服务,统称为DockerRegistry

在CentOS上安装Docker

关于centos,可以看我往期linux的文章:

如果之前安装过旧版本的Docker,可以使用下面命令卸载:

1
2
3
4
5
6
7
8
9
10
11
yum remove docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine \
docker-ce

首先需要虚拟机联网,安装yum工具

1
2
3
yum install -y yum-utils \
device-mapper-persistent-data \
lvm2 --skip-broken

然后更新本地镜像源:

1
2
3
4
5
6
7
8
# 设置docker镜像源
yum-config-manager \
--add-repo \
https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

sed -i 's/download.docker.com/mirrors.aliyun.com\/docker-ce/g' /etc/yum.repos.d/docker-ce.repo

yum makecache fast

然后输入命令:

1
yum install -y docker-ce

docker-ce为社区免费版本。稍等片刻,docker即可安装成功。

启动docker

Docker应用需要用到各种端口,逐一去修改防火墙设置。非常麻烦,因此建议大家直接关闭防火墙!

1
2
3
4
# 关闭
systemctl stop firewalld
# 禁止开机启动防火墙
systemctl disable firewalld

通过命令启动docker:

1
2
3
4
5
systemctl start docker  # 启动docker服务

systemctl stop docker # 停止docker服务

systemctl restart docker # 重启docker服务
1
2
3
systemctl status docker # 查看docker服务状态

docker -v # 查看docker版本

配置镜像加速

docker官方镜像仓库网速较差,我们需要设置国内镜像服务:

最新请参考阿里云的镜像加速文档:https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors

通过修改daemon配置文件/etc/docker/daemon.json来使用加速器

1
2
3
4
5
6
7
8
sudo mkdir -p /etc/docker #创建文件夹
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://yi6wshly.mirror.aliyuncs.com"]
}
EOF # 配置写入daemon.json文件中
sudo systemctl daemon-reload #重新加载文件
sudo systemctl restart docker #重启docker

Docker基本操作

镜像操作

镜像名称一般由两部分组成:[repository]:[tag]。如mysql:5.7和mysql:5.6就是两个不同的镜像
如果不写tag,则默认拉取最新版本的,tag即为latest

操作 命令
Dockerfile构建镜像到本地: docker build
local(本地)查看镜像: docker images
local(本地)删除镜像: docker rmi 后面接镜像名
从Docker Registry镜像服务器拉取镜像到本地: docker pull 后接[repository]:[tag]
本地保存镜像为一个压缩包: docker save 后接 -o 压缩包名 镜像名
加载压缩包为镜像: docker load 后面接 压缩包名
更多命令请查看帮助文档: docker --helper
查看具体命令: docker command --helpe

例:拉取nignx的镜像:docker pull nginx

更多请前往:https://hub.docker.com/
嘘!更多有关docker镜像:

操作示例:

1
2
3
4
5
docker pull redis  # 拉取最新的redis镜像
docker images # 查看本地镜像
docker save -o redis.tar redis:latest # 将redis:latest镜像保存为redis.tar压缩包
docker rmi redis:latest # 删除redis:latest镜像
docker load -i redis.tar # 将reids.tar压缩包加载为镜像

容器操作

镜像运行为容器:docker run
当不知道运行某个容器时,建议去dockerHub官网上查看:https://hub.docker.com/

容器常用操作 命令
运行到暂停 docker pause
暂停到运行 docker unpasue
运行到停止 docker stop
停止到运行 docker start
查看容器运行日志 docker logs
查看所有运行的容器及状态 docker ps
进入容器执行命令 docker exec
删除指定容器 docker rm

容器操作示例:拉取nginx镜像,创建一个nginx容器,运行nginx容器,进入nginx容器,修改html内容,添加”阿徐到此一游“

查看容器操作示例

运行一个名nginx的容器,取名为mn
docker pull nginx:拉取最新的nginx镜像

去docker hub 查看Nginx的容器运行命令
docker run --name mn -p 80:80 -d nginx
命令解读:

  • docker run: 创建并运行一个容器
  • --name: 给容器取一个名字,这里叫mn
  • -p:将宿主机端口与容器端口映射,冒号左侧是宿主端口,右侧是容器端口
  • -d:后台运行容器
  • nginx:镜像名称,列如nginx

运行完返回的一长串字符是容器id(CONTAINER ID)
如,我的虚拟机的ip为192.168.255.128,运行完后访问192.168.255.128:80即可看到nginx的运行界面

docker logs mn:查看日志

进入容器:docker exec -it mn bash
命令解读:

  • docker exec:进入容器内部,执行一个命令
  • -it:给当前容器创建一个标准输入,输出终端,允许我们与容器交互
  • mn:要进入的容器的名称,这里前面我们创建的nginx容器叫mn
  • bash:进入容器后执行的命令,bash是一个linux终端交互命令

可以看到界面左侧变成了:容器id:/#的格式
输入pwd发现在根目录/,输入ls可以看到熟悉的home lib64 bin media root sys usr var等目录,这就是一个阉割的linux系统,我们需要找到nginx在哪个目录,访问https://hub.docker.com/_/nginx:官网可知静态资源应该放在/usr/share/nginx/html目录下。

切换路径:cd /usr/share/nginx/html,ls可看到50x.htmlindex.html

替代原有的内容:

1
2
sed -i 's#Welcome to nginx#阿徐到此一游#g' index.html
sed -i 's#<head>#<head><meta charset="utf-8">#g' index.html

刷新网页即可看到改动。

输入exit退出容器,停止容器:docker stop mn

输入docker ps -a即可查看所有容器状态,包括已经停止的容器

数据卷(容器数据管理)

容器现在与数据十分耦合,导致不便于修改,数据不可复用,升级维护困难
数据卷(volume)是一个虚拟目录,指向宿主机文件系统中的某个目录

数据操作的基本语法为:docker volume [COMMAND]
docker volume命令是数据卷操作,根据命令后跟随的command来确定下一步操作:

  • create 创建一个volume
  • inspect 显示一个或多个volume的信息
  • ls 列出所有的的volume
  • prune 删除未使用的volume
  • rm 删除一个或多个指定的volume

操作示例:创建一个数据卷,并查看数据卷在宿主机的目录位置

查看操作示例

创建数据卷:docker volume create html

查看所有数据:docker volume ls

查看数据卷详细信息卷:docker volume inspect html

挂载数据卷
我们在创建容器时,可以通过-v参数来挂载一个数据卷到某个容器目录

操作示例:创建一个nginx容器,修改容器内的html目录内的index.html内容

查看操作示例挂载数据卷

1.创建一个叫mn的nginx容器并运行它,同时挂载html数据卷到容器内的/usr/share/nginx/html目录上,并把nginx的80端口暴露在虚拟机80端口。
docker run --name mn -p 80:80 -v html:/usr/share/nginx/html -d nginx

注:如果挂载时数据卷不存在,docker会自动创建该数据卷

2.查看数据卷html信息docker inspect html
"Mountpoint": "/var/lib/docker/volumes/html/_data",,得知挂载点在这个目录里/var/lib/docker/volumes/html/_data

3.进入数据卷所在位置,并修改html内容
cd /var/lib/docker/volumes/html/_data
ls发现里面存在了50x.html和index.html证明我们已经将容器的/usr/share/nginx/html目录挂载到了html数据卷的真实目录下

现在我们可以直接在/var/lib/docker/volumes/html/_data这个目录下对文件进行修改

注:如果用finashell在目录栏输入这个地址一直在加载的话,在finashell连接编辑中,把用户名改为root

右键index.html文件,用系统相关打开(如用vscode),用vscode直接编辑即可,保存刷新网页即可看到效果

我们也可以直接将宿主机的目录挂载到容器内的目录:-v [宿主机目录]:[容器内目录]
也可以将宿主机文件挂载到容器内文件:-v [宿主机文件]:[容器内文件]

Dockerfile自定义镜像

  1. Dockerfile的本质是一个文件,通过指令描述镜像的构建过程
  2. Dockerfile的第一行必须是FROM,从一个基础镜像来构建
  3. 基础镜像可以是基本操作系统,如Ubuntu。也可以是其他人制作好的镜像,例如:java:8-alpine

DockerCompose

DockerCompose的详细语法参考官网:https://docs.docker.com/compose/compose-file/
DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。
Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。

作用:帮助我们快速部署分布式应用,无需一个个微服务去构建镜像和部署。

Docker镜像仓库

搭建私有镜像厂库

配置Docker信任地址

我们的私服采用的是http协议,默认不被Docker信任,所以需要做一个配置:
ip地址记得换成自己虚拟机的ip。

1
2
3
4
5
6
7
8
# 打开要修改的文件
vi /etc/docker/daemon.json
# 添加内容:
"insecure-registries":["http://192.168.255.128:8080"]
# 重加载
systemctl daemon-reload
# 重启docker
systemctl restart docker

带有图形化界面版本

1.创建一个docker镜像仓库存放的文件夹如:mkdir registry-ui(在哪个目录创建都差不多,这里是/tmp/)
2.进入该文件夹:cd registry-ui/
3.创建一个docker-compose文件:touch docker-compose.yml
4.将下面命令粘贴到文件内:
使用DockerCompose部署带有图象界面的DockerRegistry,命令如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
version: '3.0'
services:
registry:
image: registry
volumes:
- ./registry-data:/var/lib/registry
ui:
image: joxit/docker-registry-ui:static
ports:
- 8080:80
environment:
- REGISTRY_TITLE=阿杰学习私有仓库
- REGISTRY_URL=http://registry:5000
depends_on:
- registry

5.执行docker-compose up -d。之后查看日志docker-compose logs -f可以看到已经启动了,访问http://192.168.255.128:8080/即可看到我们部署的私有仓库

向私有仓库推送或拉取镜像

推送镜像到私有镜像服务前必须先tag,步骤如下:

  1. 本地有一个nginx:latest镜像,重新tag镜像,名称前缀为私有仓库的地址:192.168.255.128:8080/
    docker tag nginx:latest 192.168.255.128:8080/nginx:1.0

  2. 推送镜像
    docker push 192.168.255.128:8080/nginx:1.0
    刷新仓库界面,可以看到镜像上传到成功私有仓库

  3. 拉取镜像
    docker pull 192.168.255.128:8080/nginx:1.0

服务异步通讯-RabbitMQ

MQ,中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/
我们在Centos7虚拟机中使用Docker来安装。
在线拉取

1
docker pull rabbitmq:3-management

执行下面的命令来运行MQ容器:

1
2
3
4
5
6
7
8
9
docker run \
-e RABBITMQ_DEFAULT_USER=xnj \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

访问:http://192.168.255.128:15672

创建一个mq-demo父工程,管理consumer和publisher两个子模块,后面以该项目来演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>cn.itcast.demo</groupId>
<artifactId>mq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/>
</parent>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mq-demo</artifactId>
<groupId>cn.itcast.demo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>consumer</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mq-demo</artifactId>
<groupId>cn.itcast.demo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>publisher</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>

基本消息队列(BasicQueue)

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接收并缓存消息
  • consumer:订阅队列,处理队列的消息

consumer中创建测试类ConsumerTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ConsumerTest {

public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.255.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xnj");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();

// 2.创建通道Channel
Channel channel = connection.createChannel();

// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);

// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}

publisher中创建测试类PublisherTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.255.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xnj");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();

// 2.创建通道Channel
Channel channel = connection.createChannel();

// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);

// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");

// 5.关闭通道和连接
channel.close();
connection.close();

}
}

基本消息队列的消息发送流程
建立connection,建立channel,利用channel声明队列,使用channel向队列发送消息
debug运行PublisherTest,可以在rabbitMq看到,依次建立了连接,然后建立通道,有了通道后就可以向队列中发送消息,之后创建了一个simple.queue的队列,点开该队列,再点GetMessage(s),可以看到发送的消息内容:hello, rabbitmq!,并且可以注意到,发送了消息之后,发送者就已经关闭了通道和连接。

基本消息队列的消息接收流程
建立connection,创建channel,利用channel声明队列,定义consumer的消费行为handleDelivery(),利用channel将消费者与队列绑定
运行ConsumerTest,可以看到rabbitMq有新的连接出现,之后创建通道,再声明队列(但并没有新的队列产生),但它预防了建立连接时队列不存在的情况,接收消息是回调机制,消费者回调函数与队列绑定后,等待rabitMq把消息投递回来后,回调函数才会执行,所以先打印:等待接收消息。。。。接收到消息,接收完消息后,rabbitMq上就会删除消息(阅后即焚)

官方的比较繁琐。下面使用SpringAOP来实现基础消息队列功能

  1. 在父工程引入依赖

    1
    2
    3
    4
    5
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. 在publisher中的application.yml配置文件中,添加mq连接信息

    1
    2
    3
    4
    5
    6
    7
    spring:
    rabbitmq:
    host: 192.168.255.128 #主机名
    port: 5672 #端口号
    virtual-host: / #虚拟主机
    username: xnj #用户名
    password: 123456 #密码
  3. 在publisher服务中新建一个测试类,编写测试方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {
    String queueName = "simple.queue";
    String message = "hello, spring amqp!";
    rabbitTemplate.convertAndSend(queueName, message);
    }
    }
  4. 在consumer中配置文件,添加mq连接信息

    1
    2
    3
    4
    5
    6
    7
    spring:
    rabbitmq:
    host: 192.168.255.128 #主机名
    port: 5672 #端口号
    virtual-host: / #虚拟主机
    username: xnj #用户名
    password: 123456 #密码
  5. 在consumer服务中新建一个类,编写消费逻辑:

    1
    2
    3
    4
    5
    6
    7
    8
    @Component
    public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException{
    System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
    }

消息一旦消费就会从队列中删除,RabbitMQ没有消息回溯功能

工作消息队列(WorkQueue)

工作队列可以提高消息处理速度,避免队列消息堆积

发送者

1
2
3
4
5
6
7
8
9
@Test
public void testsendMessageWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, message__!";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName, message+i);
Thread.sleep(20);
}
}

消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class SpringRabbitListener {

@RabbitListener(queues = "simple.queue")
public void listenWork1QueueMessage(String msg) throws InterruptedException{
System.out.println("消费者1接收到消息:【" + msg + "】"+ LocalDateTime.now());
Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWork2QueueMessage(String msg) throws InterruptedException{
System.err.println("消费者2接收到消息:【" + msg + "】"+ LocalDateTime.now());
Thread.sleep(200);
}
}

修改application.yml,设置preFetch这个值,可以控制预取消息的上限:

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
host: 192.168.255.128 #主机名
port: 5672 #端口号
virtual-host: / #虚拟主机
username: xnj #用户名
password: 123456 #密码
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成之后才能获取下一个消息

发布订阅(Publish、Subscribe)

发布和订阅模式与前面的区别是允许将同一消息发送给多个消费者,实现方式是加了exchange(交换机)
交换机能接收publisher发送的消息,将消息按规则路由到与之绑定的队列,不能缓存消息,路由失败,消息丢失。
根据交换机的类型可以分为三种:

  • Fanout Exchange: 广播
  • Direct Exchange: 路由
  • Topic Exchange: 主题

Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue

在consumer服务创建一个类,添加@Configuration注解,并声明FanoutExchange,Queue和绑定关系对象Binding:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class FanoutConfig {
//声明FanoutExchange交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hnit.fanout");
}
//声明第一个队列
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
//绑定队列1到交换机
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//....略,以相同方式声明第2个队列,并完成绑定
}

接收
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class SpringRabbitListener {

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.err.println("消费者接收到fanout.queue1的消息:【" + msg + "】"+ LocalDateTime.now());
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.err.println("消费者接收到fanout.queue2的消息:【" + msg + "】"+ LocalDateTime.now());
}
}


在publisher的测试类中发送
1
2
3
4
5
6
7
8
9
@Test
public void testsendFanoutExchange() throws InterruptedException {
// 交换机名称
String exchangeName = "hnit.fanout";
//消息
String message = "hello, everybody!";
//发送消息,第一个参数是交换机名称,第二个参数是路由键,第三个参数是消息
rabbitTemplate.convertAndSend(exchangeName,"", message);
}

运行能看到一次发送,多个消费者都能接收到

Direct Exchange

Direct Exchange会将消息根据规则路由到指定的Queue,因此称为路由模式(routes)

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

在消费者SpringRabbitListener里添加如下,运行后能在RabbitMQ中看到direct.queue1和direct.queue2两个队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name="hnit.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue1(String msg) {
System.err.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name="hnit.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void listenDirectQueue2(String msg) {
System.err.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

在发送者里指定发送的routerKey=blue

1
2
3
4
5
6
7
8
9
@Test
public void testsendDirectExchange() {
// 交换机名称
String exchangeName = "hnit.direct";
//消息
String message = "hello, blue!";
//发送消息,第一个参数是交换机名称,第二个参数是路由键,第三个参数是消息
rabbitTemplate.convertAndSend(exchangeName,"blue", message);
}

consumer控制台输出 消费者接收到direct.queue1的消息:【hello, blue!】
将路由键改为yellow 消费者接收到direct.queue2的消息:【hello, yellow!】
将路由键改为red :
消费者接收到direct.queue2的消息:【hello, red!】
消费者接收到direct.queue1的消息:【hello, red!】

TopicExchange

TopicExchange和DirectExchange类似,区别在于routingKey必须是多个单词列表,并且以.分隔
如:china.news 代表中国的新闻消息、china.weather 代表中国天气消息、japan.news、japan.weather
Queue于Exchange指定BindingKey时可以使用通配符:#代指0个或多个单词,*:代指一个单词

下面示例
消费者:

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue"),
exchange = @Exchange(name="hnit.topic",type = ExchangeTypes.TOPIC),
key = {"china.#"}
))
public void listenTopicQueue(String msg) {
System.err.println("消费者接收到topic.queue的消息:【" + msg + "】");
}

生产者:
1
2
3
4
5
6
7
8
9
@Test
public void testsendTopicExchange() {
// 交换机名称
String exchangeName = "hnit.topic";
//消息
String message = "今天的天气真不错!";
//发送消息,第一个参数是交换机名称,第二个参数是路由键,第三个参数是消息
rabbitTemplate.convertAndSend(exchangeName,"china.weather", message);
}

SpringAMQP—消息转换器

前面我们发送的方法中,消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
我们在consumer中利用@Bean声明一个队列

1
2
3
4
@Bean
public Queue objectMessageQueue(){
return new Queue("object.queue");
}

在publisher中发送消息以测试
1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void testsendObjectMessage() {
// 队列名称
String queueName = "object.queue";
//准备消息
Map<String, Object> msg = new HashMap<>();
msg.put("name", "张三");
msg.put("age", 20);
msg.put("address", "北京市");
// 发送消息
rabbitTemplate.convertAndSend(queueName, msg);
}

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于IDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用ISON方式序列化,步骤如下:
1.在publisher服务引入依赖
1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>

2.在publisher服务声明

1
2
3
4
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

接收json格式消息
1.在consumer服务引入Jackson依赖

1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>

2.在consumer服务定义MessageConverter

1
2
3
4
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

3.接收消息

1
2
3
4
5
6
7
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg) {
System.err.println("消费者接收到object.queue的消息:【" + msg + "】");
}
}

控制台输出:消费者接收到object.queue的消息:【{address=北京市, name=张三, age=20}】