微服务
springcloud-RestTemplate
说明
微服务中分消费者(调用服务的)和提供者(被调用的服务),角色是相对的,一个服务既可以是提供者也可以是消费者
要注意版本的问题
微服务项目中,当一个模块需要向另一个模块数据和操作时,微服务远程调用
spring中提供了一个工具restTemplate
1
2
3
4
5
6<!--spring cloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2021.0.3</version>
</dependency>配置
在配置类中(启动类也属于配置类)注册bean对象1
2
3
4
public RestTemplate restTemplate() {
return new RestTemplate();
}使用
在需要发送请求的服务中,使用RestTemplate对象发送请求即可,列如:
如下场景中,购物车服务cart-service
需要向商品服务item-service
查询商品信息,此时需要远程调用商品服务,使用RestTemplate:2.1~2.3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class CartServiceImpl extends ServiceImpl<CartMapper, Cart> implements ICartService {
// 1.注入restTemplate
private RestTemplate restTemplate;
// 2.查询我的购物车列表
public List<CartVO> queryMyCarts() {
// 1.查询我的购物车列表
List<Cart> carts = lambdaQuery().eq(Cart::getUserId, 1L/*TODO UserContext.getUser()*/).list();
if (CollUtils.isEmpty(carts)) {
return CollUtils.emptyList();
}
// 2.转换VO
List<CartVO> vos = BeanUtils.copyList(carts, CartVO.class);
// 3.处理VO中的商品信息
handleCartItems(vos);
// 4.返回
return vos;
}
// 3.远程调用商品服务
private void handleCartItems(List<CartVO> vos) {
//TODO 1.获取商品id
Set<Long> itemIds = vos.stream().map(CartVO::getItemId).collect(Collectors.toSet());
/* // 2.查询商品 ,为单体项目时,直接调用商品查询即可,现在拆分为微服务模块
List<ItemDTO> items = itemService.queryItemByIds(itemIds);
if (CollUtils.isEmpty(items)) {
return;
}*/
//2.查询商品
//2.1 发送商品管理服务(item-service)远端请求
ResponseEntity<List<ItemDTO>> response = restTemplate.exchange(
"http://localhost:8081/items?ids={ids}", // url 根据商品id查询商品列表
HttpMethod.GET, // 请求方式
null, // 请求实体
new ParameterizedTypeReference<List<ItemDTO>>() {}, // 返回值类型
Map.of("ids", CollUtil.join(itemIds, ","))
);
//2.2 解析响应,判断是否成功
if (!response.getStatusCode().is2xxSuccessful()) {
// 失败
return;
}
//2.3 解析响应,获取数据
List<ItemDTO> items = response.getBody();
if (CollUtils.isEmpty(items)) {
return;
}
// 3.转为 id 到 item的map
Map<Long, ItemDTO> itemMap = items.stream().collect(Collectors.toMap(ItemDTO::getId, Function.identity()));
// 4.写入vo
for (CartVO v : vos) {
ItemDTO item = itemMap.get(v.getItemId());
if (item == null) {
continue;
}
v.setNewPrice(item.getPrice());
v.setStatus(item.getStatus());
v.setStock(item.getStock());
}
}
}总结
在这个过程中,item-service
提供了查询接口,cart-service
利用Http请求调用该接口。因此item-service可以称为服务的提供者,而cart-service则称为服务的消费者或服务调用者
Eureka
eureka的作用
消费者该如何获取服务提供者具体信息?
- 服务提供者启动时向eureka注册自己的信息
- eureka保存这些信息
- 消费者根据服务名称向eureka拉取提供者信息
如果有多个服务提供者,消费者该如何选择?
- 服务消费者利用负载均衡算法,从服务列表中挑选一个
消费者如何感知服务提供者健康状态?
- 服务提供者会每隔30秒向EurekaServer发送心跳请求,报告健康状态
- eureka会更新记录服务列表信息,心跳不正常会被剔除
- 消费者就可以拉取到最新的信息
搭建EurekaServer
eureka也是个微服务,eureka在启动的时候也会把自己注册到eureka上
1.创建maven项目,引入spring-cloud-starter-netflix-eureka-server的依赖1
2
3
4
5<dependency>
<!--eureka服务端依赖-->
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
2.编写启动类,添加@EnableEurekaServer注解1
2
3
4
5
6
7
public class EurekaApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaApplication.class, args);
}
}
3.添加application.yml文件,编写下面的配置:1
2
3
4
5
6
7
8
9
10server:
port: 10086 #服务端口
spring:
application:
name: eurekaserver #服务名称
eureka:
client:
service-url: #eurek的地址信息
defaultZone: http://127.0.0.1:10086/eureka #注册中心地址
服务注册:注册user-service
1.在user-service项目下引入spring-cloud-starter-netflix-eureka-client的依赖1
2
3
4
5<dependency>
<!--eureka客户端端依赖-->
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
2.在application.yml文件,编写下面的配置1
2
3
4
5
6
7
8
9spring:
application:
name: userservice #服务名称
eureka:
client:
service-url: #eurek的地址信息
defaultZone: http://127.0.0.1:10086/eureka #注册中心地址
我们还可以将一个服务多次启动,来模拟多实例部署,但为了避免端口冲突,需要修改端口设置:
- 在idea下方Services(服务)窗口中
- 右键目标服务,选择CopConfiguration(复制配置)
- 旧版idea,就在Environment项下的VM options里输入:
-Dserver.port=端口号
- 新版idea中,选择修改配置,在修改选项中勾选允许多个实例即可,按alter+p 覆盖配置属性 server.port 8082
前面发送http请求时,远程调用的url地址是写死了的,现在就可以用eureka完解决该问题
服务发现(拉取)
前面orderService中,因为需要获取完整的订单的信息,所以通过resrtemplate发送http请求来获取对应的user信息
服务拉取是基于服务名称获取服务列表,然后在对服务列表做负载均衡
1.修改OrderService的代码,修改访问的url路径,用服务名代替ip、端口:String url="http://userservice/user/"+order.getUserId()
- 在order-service项目的启动类OrderApplication中的RestTemplate添加负载均衡注解:
1
2
3
4
5
public RestTemplate restTemplate() {
return new RestTemplate();
}
oderservice需要调用userservice,而userservice有多个实例时,可以通过定义IRule实现来修改负载均衡规则,有两种方式
1.代码方式:在消费者的启动类中,定义一个新的IRule(作用于全体)1
2
3
4
public IRule randomRule() {
return new RandomRule();
}
2.配置文件方式:在消费者的applicatiuon.yml文件中,添加新的配置也可以修改规则(针对某个微服务):1
2
3
4userservice:
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule #负载均衡规则
饥饿加载
ribbon默认是采用懒加载,即第一次访问时才会去创建LoadBalanceClient,请求时间会很长。而饥饿加载则会在项目启动时创建,降低第一次访问的耗时,可以通过下面的配置开启饥饿加载1
2
3
4
5ribbon:
eager-load:
enabled: true #开启饥饿加载
clients:
- userservice #指定饥饿加载的服务
Nacos
官网:nacos.io
启动:在bin目录进入cmd,执行startup.cmd -m standalone
浏览器输入运行成功的 Console
的值即可看到nacos控制台界面
默认为8848端口:localhost:88848/nacos
可以在conf下的application.properties下修改
服务注册到Nacos
1.在cloud-demo父工程中添加spring-cloud-alilbaba的管理依赖:1
2
3
4
5
6
7
8<!-- nacos的管理依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.5.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
2.注释之前的服务中的eureka依赖
3.添加Nacos的客户端依赖1
2
3
4
5<!-- nacos客户端依赖包-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
4.修改application.yml文件(并注释掉eureka的配置)1
2
3
4
5spring:
cloud:
nacos:
discovery:
server-addr: localhost:8848 #nacos服务地址
Nacos服务分级模型
- Nacos服务分级存储模型
① 一级是服务,例如userservice
②二级是集群,例如杭州或上海
③三级是实例,例如杭州机房的某台部署了userservice的服务器 - 如何设置实例的集群属性
① 修改application.yml文件,添加spring.cloud.nacos.discovery.cluster-name
属性即可,如:1
2
3
4
5
6spring:
cloud:
nacos:
discovery:
server-addr: localhost:8848 #nacos服务地址
cluster-name: HN #集群名称 HN代指湖南
Nacos负载均衡
使服务优先访问集群内的服务
举例现在基于上面的配置,现在有一个order服务,,一个实例,属于集群HN,一个user服务,3个实例,2个属于HN集群,1个属于HZ集群。现在用order服务向user服务发送多次请求,发现三个实例均被调用到(能观察到都有日志输出)
现在对order服务中application.yml,添加如下负载均衡策略1
2
3userservice: #要做配置的微服务名称
ribbon:
NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule #负载均衡规则
重启order服务,再次发送请求,可以看到,只有属于同一个集群的两个user服务响应了。即:优先选择本地集群,再在本地集群的多个服务当中随机访问(注意:不是轮询访问)。如果本地服务没有,或者本地服务挂了,才会跨集群访问(注意:不是跨集群就不能访问了)
总结Nacos负载均衡策略
1.优先选择同集群服务实例列表
2.本地集群找不到提供者,才去其它集群寻找,并且会报警告
3.确定了可用实例列表后,再采用随机负载均衡挑选实例
根据权重负载均衡
1.在nacos控制台可以点击实例后的编辑按钮即可修改权重
2.权重值为0~1,将权重配置为0.1即可大大降低被访问的频率
3.如果权重调成0,将不会被访问到
环境隔离-namespace(命名空间)
1.在Nacos控制台可以创建namespace,用来隔离不同的环境
- 在左侧菜单栏点击命名空间,再点击新建命名空间
- 填写命名空间名和描述信息
2.在application.yml配置文件中添加命名空间,值为命名空间id1
2
3
4
5spring:
cloud:
nacos:
discovery:
namespace: #dev环境
每个namespace都有唯一id,不同namespace下的服务不可见
nacos中的实例默认为临时实例,当实例状态为不健康时会直接在服务列表里干掉,设置为非临时实例,nacos则会每隔一段时间会查询服务的健康状态。
设置如下配置即可将配置该为非临时实例。1
2
3
4
5spring:
cloud:
nacos:
discovery:
ephemeral: false #是否是临时实例
Nacos于Eureka对比
Nacos与eureka的共同点
- 都支持服务注册和服务拉取
- 都支持服务提供者心跳方式做健康检测
Nacos与Eureka的区别
- Nacos支持服务端主动检测提供者状态:临时实例采用心跳模式,非临时实例采用主动检测模式
- 临时实例心跳不正常会被剔除,非临时实例则不会被剔除
- Nacos支持服务列表变更的消息推送模式,服务列表更新更及时
- Nacos集群默认采用AP方式,当集群中存在非临时
实例时,采用CP模式;Eureka采用AP方式
Nacos配置管理
配置更改热更新
将配置交给nacos管理
在nacos的配置管理的配置列表中点击创建配置
- Data ID: 唯一,服务名称-profile(运行环境).yaml,如、userservice-dev.yaml
- Group: 一般默认即可
- 描述:写配置文件作用,如、userservice的开发配置文件
- 配置格式: yaml
- 配置内容: 一般写那些需要更改的配置,
- 再点击发布即可
配置如下信息便于下面验证:1
2pattern:
dateformat: yyyy-MM-dd HH:mm:ss
让微服务读取nacos中配置文件
配置获取的步骤如下:
项目启动->读取nacos中配置文件->读取本地配置文件application.yml->创建spring容器->加载bean
1.引入Nacos的配置管理客户端依赖:1
2
3
4
5<!--nacos的配置管理依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
2.在userservice中的resource目录添加一个bootstrap.yml文件,这个文件是引导文件,优先级高于application.yml1
2
3
4
5
6
7
8
9
10spring:
application:
name: userservice # 服务名称
profiles:
active: dev #环境
cloud:
nacos:
server-addr: localhost:8848 # nacos 地址
config:
file-extension: yaml # 文件后缀名
在user服务里读取配置来验证是否成功1
2
3
4
5
6
7
private String dateformat;
public String now(){
return LocalDateTime.now().format(DateTimeFormatter.ofPattern(dateformat));
}
多环境配置共享
微服务会从nacos中读取配置文件:
1.[服务名]-[spring.profile.active].yaml,环境配置
2.[服务名].yaml,默认配置,多环境共享
我们需要了解配置文件的优先级:
[服务名]-[环境].yaml>[服务名].yaml>本地配置
如:userservice-dev.yaml>userservice.yaml>本地的application.yaml
所以我们可以在nacos中新建一个服务名.yaml
的配置文件,在里面填写我们的多环境共享配置。
统一服务在不同环境下能访问到公共环境配置,但如果当配置属性相同时,按优先级来
Nacos简化理解
在微服务远程调用的过程中,包括两个角色:
- 服务提供者:提供接口供其它微服务访问,比如item-service
- 服务消费者:调用其它微服务提供的接口,比如cart-service
在大型微服务项目中,服务提供者的数量会非常多,为了管理这些服务就引入了注册中心的概念。注册中心、服务提供者、服务消费者三者间关系如下:
目前开源的注册中心框架有很多,国内比较常见的有:
- Eureka:Netflix公司出品,目前被集成在SpringCloud当中,一般用于Java应用
- Nacos:Alibaba公司出品,目前被集成在SpringCloudAlibaba中,一般用于Java应用
- Consul:HashiCorp公司出品,目前集成在SpringCloud中,不限制微服务语言
nacos官网如下
如果在虚拟机上安装并启动好nacos,访问 http://192.168.40.101:8848/nacos/ ,注意将192.168.40.101替换为你自己的虚拟机IP地址。
服务注册
引入依赖
1
2
3
4
5
6<!--nacos 服务注册发现-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2021.0.4.0</version>
</dependency>注册nacos
1
2
3
4
5
6spring:
application:
name: item-service # 服务名称这里为商品管理
cloud:
nacos:
server-addr: 192.168.40.101:8848 # nacos地址访问nacos控制台,在nacos控制台的服务管理/服务列表中可以看到
服务发现
服务的消费者要去nacos订阅服务,这个过程就是服务发现,步骤如下:
- 引入依赖
- 配置Nacos地址
- 发现并调用服务
服务发现除了要引入nacos依赖以外,由于还需要负载均衡,因此要引入SpringCloud提供的LoadBalancer依赖。
1
2
3
4
5
6<!--nacos 服务注册发现-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2021.0.4.0</version>
</dependency>1
2
3
4
5
6spring:
application:
name: cart-service #消费者,购物车服务
cloud:
nacos:
server-addr: 192.168.40.101:8848 # nacos地址调用服务
上面的RestTemplate中,发送请求的地址为硬编码,而我们现在使用了nacos,就可以从naocs中获取服务地址,从而实现负载均衡。
步骤:根据服务名获取实例列表,设置负载均衡策略,获取实例,再发送请求。
服务发现需要用到一个工具,DiscoveryClient
,SpringCloud已经帮我们自动装配,我们可以直接注入使用:
见下方示例代码36~42行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class CartServiceImpl extends ServiceImpl<CartMapper, Cart> implements ICartService {
// 1.注入restTemplate
private RestTemplate restTemplate;
// 注入DiscoveryClient
private DiscoveryClient discoveryClient;
// 2.查询我的购物车列表
public List<CartVO> queryMyCarts() {
// 1.查询我的购物车列表
List<Cart> carts = lambdaQuery().eq(Cart::getUserId, 1L/*TODO UserContext.getUser()*/).list();
if (CollUtils.isEmpty(carts)) {
return CollUtils.emptyList();
}
// 2.转换VO
List<CartVO> vos = BeanUtils.copyList(carts, CartVO.class);
// 3.处理VO中的商品信息
handleCartItems(vos);
// 4.返回
return vos;
}
// 3.远程调用商品服务
private void handleCartItems(List<CartVO> vos) {
//TODO 1.获取商品id
Set<Long> itemIds = vos.stream().map(CartVO::getItemId).collect(Collectors.toSet())
//根据服务名称发现实例列表
List<ServiceInstance> instances = discoveryClient.getInstances("item-service");
if(CollUtil.isEmpty(instances)){
return;
}
//设置负均衡,随机获取一个实例
ServiceInstance serviceInstance = instances.get(RandomUtil.randomInt(instances.size()));
//2.查询商品
//2.1 发送商品管理服务(item-service)远端请求
ResponseEntity<List<ItemDTO>> response = restTemplate.exchange(
serviceInstance.getUri()+"/items?ids={ids}", // url 根据商品id查询商品列表
HttpMethod.GET, // 请求方式
null, // 请求实体
new ParameterizedTypeReference<List<ItemDTO>>() {}, // 返回值类型
Map.of("ids", CollUtil.join(itemIds, ","))
);
//2.2 解析响应,判断是否成功
if (!response.getStatusCode().is2xxSuccessful()) {
// 失败
return;
}
//2.3 解析响应,获取数据
List<ItemDTO> items = response.getBody();
if (CollUtils.isEmpty(items)) {
return;
}
// 3.转为 id 到 item的map
Map<Long, ItemDTO> itemMap = items.stream().collect(Collectors.toMap(ItemDTO::getId, Function.identity()));
// 4.写入vo
for (CartVO v : vos) {
ItemDTO item = itemMap.get(v.getItemId());
if (item == null) {
continue;
}
v.setNewPrice(item.getPrice());
v.setStatus(item.getStatus());
v.setStock(item.getStock());
}
}
}
http客户端Feign-OpenFeign
在上面,我们利用Nacos实现了服务的治理,利用RestTemplate实现了服务的远程调用。但是远程调用的代码太复杂了
而且这种调用方式,与原本的本地方法调用差异太大,编程时的体验也不统一,一会儿远程调用,一会儿本地调用。
因此,我们必须想办法改变远程调用的开发模式,让远程调用像本地方法调用一样简单。而这就要用到OpenFeign组件了。
其实远程调用的关键点就在于四个:
- 请求方式
- 请求路径
- 请求参数
- 返回值类型
所以,OpenFeign就利用SpringMVC的相关注解来声明上述4个参数,然后基于动态代理帮我们生成远程调用的代码,而无需我们手动再编写,非常方便。
定义和使用Feign客户端
引入依赖
1
2
3
4
5
6
7
8
9
10
11
12<!--openFeign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>2021.0.4.0</version>
</dependency>
<!--负载均衡器-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
<version>3.1.3</version>
</dependency>在cart-service的启动类上添加注解开启Feign的功能
@EnableFeignClients// 开启feign
编写Feign客户端:
主要是基于SpringMVC的注解来声明远程调用的信息,比如:- 服务名称:item-service
- 请求方式:GET
- 请求路径:/items
- 请求参数:Collection
ids - 返回值类型:List
1
2
3
4
5
6// 调用的服务名
public interface ItemClient {
// 调用的服务地址
List<ItemDTO> queryItemByIds(; Collection<Long> ids)
}
对比下面两个
不再需要RestTemplate
和DiscoveryClient
了,原本的复杂的代码也删掉了,只需要调用Feign客户端即可。并且代码更简洁,更易读。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class CartServiceImpl extends ServiceImpl<CartMapper, Cart> implements ICartService {
private ItemClient itemClient;
public List<CartVO> queryMyCarts() {
// 1.查询我的购物车列表
List<Cart> carts = lambdaQuery().eq(Cart::getUserId, 1L/*TODO UserContext.getUser()*/).list();
if (CollUtils.isEmpty(carts)) {
return CollUtils.emptyList();
}
// 2.转换VO
List<CartVO> vos = BeanUtils.copyList(carts, CartVO.class);
// 3.处理VO中的商品信息
handleCartItems(vos);
// 4.返回
return vos;
}
private void handleCartItems(List<CartVO> vos) {
//TODO 1.获取商品id
Set<Long> itemIds = vos.stream().map(CartVO::getItemId).collect(Collectors.toSet());
// 2.查询商品
List<ItemDTO> items = itemClient.queryItemByIds(itemIds);
if(CollUtil.isEmpty(items)){
return;
}
// 3.转为 id 到 item的map
Map<Long, ItemDTO> itemMap = items.stream().collect(Collectors.toMap(ItemDTO::getId, Function.identity()));
// 4.写入vo
for (CartVO v : vos) {
ItemDTO item = itemMap.get(v.getItemId());
if (item == null) {
continue;
}
v.setNewPrice(item.getPrice());
v.setStatus(item.getStatus());
v.setStock(item.getStock());
}
}
}
可以看到接口和我们在cart-service中使用FeignClient
声明的接口是一致的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ItemController {
private final IItemService itemService;
public List<ItemDTO> queryItemByIds({ List<Long> ids)
return itemService.queryItemByIds(ids);
}
}
自定义Feign的配置
Feign运行自定义配置来覆盖默认配置,可以修改的配置如下:
类型 | 作用 | 说明 |
---|---|---|
feign.Logger.Level | 修改日志级别 | 包含四种不同的级别:NONE、BASIC、HEADERS、FULL |
feign.codec.Decoder | 响应结果的解析器 | http远程调用的结果做解析,例如解析json字符串为java对象 |
feign.codec.Encoder | 请求参数编码 | 将请求参数编码,便于通过http请求发送 |
feign.Contract | 支持的注解格式 | 默认是SpringMVC的注解 |
feign.Retryer | 失败重试机制 | 请求失败的重试机制,默认是没有,不过会使用Ribbon的重试 |
一般我们需要配置的就是日志级别。配置Feign日志有两种方式:
1 | feign: |
首先声明一个Bean:1
2
3
4
5
6public class DefaultFeignConfiguration {
public Logger.Level feignLoggerLevel() {
return Logger.Level.BASIC;// 日志级别
}
}
该配置类暂时不会生效,使它生效的话
如果要全局配置,则把它放到@EnableFeignClients
注解里
如:@EnableFeignClients(defaultConfiguration = DefaultFeignConfiguration.class)// 开启feign
如果要局部配置,则把它放到@FeignClient
注解里
如:@FeignClient(value = "userservice",configuration = DefaultFeignConfiguration.class)// 调用userservice服务
那么该日志就只针对userservice服务
Feign的性能优化-连接池和日志
优化feign的性能主要包括:
1.默认底层实现的每次访问都需要去创建一个新的请求,使用连接池代替默认的URLConnection,依赖于其他框架,主要有下面三种- HttpURLConnection: 默认实现,不支持连接池
- Apache HttpClient: 支持连接池
- OKHttp: 支持连接池
2.OpenFeign只会在FeignClient所在包的日志级别为DEBUG时,才会输出日志。而且其日志级别有4级:
- NONE:不记录任何日志信息,这是默认值。
- BASIC:仅记录请求的方法,URL以及响应状态码和执行时间
- HEADERS:在BASIC的基础上,额外记录了请求和响应的头信息
- FULL:记录所有请求和响应的明细,包括头信息、请求体、元数据。
Feign默认的日志级别就是NONE,所以默认我们看不到请求日志。
feign性能优化-连接池配置
- 引入
okhttp
依赖1
2
3
4
5<!--OK http 的依赖 -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-okhttp</artifactId>
</dependency> - 开启连接池
1
2
3feign:
okhttp:
enabled: true # 开启OKHttp功能
- 引入
feign性能优化-日志级别配置
- 定义一个Feign配置类,定义Feign的日志级别:
1
2
3
4
5
6
7
8
9import feign.Logger;
import org.springframework.context.annotation.Bean;
public class DefaultFeignConfig {
public Logger.Level logLevel() {
return Logger.Level.FULL;
}
} - 接下来,要让日志级别生效,还需要配置这个类。有两种方式:
- 局部生效:在某个FeignClient中配置,只对当前FeignClient生效
1
- 全局生效:在
@EnableFeignClients
注解中配置,对所有FeignClient生效1
- 局部生效:在某个FeignClient中配置,只对当前FeignClient生效
- 定义一个Feign配置类,定义Feign的日志级别:
统一网关Gateway
网关功能有:身份认证和权限校验、服务路由、负载均衡、请求限流…
在SpringCloud中网关的实现包括两种:gateway
和 zuul
Zuul是基于Servlet的实现,属于阻塞式编程。而SpringCloudGateway则是基于Spring5中提供的WebFlux,属于响应式编程的实现,具备更好的性能。
由于网关本身也是一个独立的微服务,因此也需要创建一个模块开发功能。
搭建网关服务
创建新的module,引入SpringCloudGateway的依赖和nacos的服务发现依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15<!--网关-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--nacos discovery-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--负载均衡-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>创建启动类
1
2
3
4
5
6
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
}创建
application.yml
编辑路由配置以及nacos地址1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29server:
port: 8080 # 网关端口
spring:
application:
name: gateway # 服务名称 网关本身也是一个服务将注册进naocs
cloud:
nacos:
discovery:
server-addr: 192.168.40.101:8848 #配置Nacos地址
gateway:
routes: # 网关路由配置
- id: user # 路由规则id,自定义,唯一
uri: lb://user-service # 路由的目标地址,支持lb和http两种格式,lb代表负载均衡,会从注册中心拉取服务列表
predicates: # 路由断言,判断请求是否符合规范,符合则路由到目标服务
- Path=/user/** # 路径断言,判断路径是否以/user开头,符合则转发到目标地址
- id: item
uri: lb://item-service
predicates:
- Path=/items/**,/search/**
- id: user
uri: lb://user-service
predicates:
- Path=/users/**,/addresses/**
- id: trade
uri: lb://trade-service
predicates:
- Path=/orders/**
- id: pay
uri: lb://pay-service启动测试
启动UserApplication、GatewayApplication
启动网关,访问localhost:8080/items/page?pageNo=1&pageSize=1
即被转发为locahost:8081/items/page?pageNo=1&pageSize=1
路由过滤器GatewayFilter
GatewayFilter是网关提供的一种过滤器,可以对进入网关的请求和微服务返回的响应做处理:
官网:https://docs.spring.io/spring-cloud-gateway/reference/4.1-SNAPSHOT/spring-cloud-gateway.html ,现在已有近37种过滤配置
下面只做几个演示
给所有进入xx服务的请求添加一个请求头
局部服务添加:在网关的user-service中添加过滤器1
2
3
4
5
6
7
8gateway:
routes:
- id: user-service # 路由标识,必须唯一
uri: lb://user-service # 路由的目标地址
predicates: # 路由断言,判断请求是否符合规范
- Path=/user/** # 路径断言,判断路径是否以/user开头,符合则转发到目标地址
filters: # 过滤器
- AddRequestHeader=Truth,Facts are the only test of truth! # 添加请求头,key为Truth,value为Facts are the only test of truth!
在userservice的服务中获取请求头并打印中控制台来验证1
2
3
4
5
6
public User queryById( Long id,
{ String truth)
System.out.println("Truth : "+truth);
return userService.queryById(id);
}
当调用到该服务时可以看到Truth : Facts are the only test of truth!
被输出在控制台
给所有微服务添加:在于routers
的平级位置下方添加default-filters
1
2
3
4
5
6
7
8
9
10
11
12gateway:
routes:
- id: user-service # 路由标识,必须唯一
uri: lb://userservice # 路由的目标地址
predicates: # 路由断言,判断请求是否符合规范
- Path=/user/** # 路径断言,判断路径是否以/user开头,符合则转发到目标地址
- id: order-service
uri: lb://orderservice
predicates:
- Path=/order/**
default-filters: # 全局过滤器
- AddRequestHeader=Truth,Facts are the only test of truth!
全局过滤器 GlobalFilter
全局过滤器的作用是处理一切进入网关的请求和微服务响应,于GatewayFilter的作用一样,区别在于GatewayFilter通过配置定义,处理逻辑是固定的。而GlobalFilter的逻辑需要自己写代码实现
实现方式为实现GlobalFilter
接口1
2
3
4
5
6
7
8
9
10public interface GlobalFilter{
/**
* 处理当前请求,有必要的话通过{@link GatewayFilterChain}将请求交给下一个过滤器处理
* @param exchange 请求上下文,里面可以获取Request、Response等信息
* @param chain 用来把请求委托给下一个过滤器
* @return {@code Mono<Void>}返回标示当前过滤器业务结束
*/
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
通过一个模拟需求来演示如何实现:定义全局过滤器,拦截并判断用户身份
需求:定义全局过滤器,拦截请求,判断请求的参数是否满足以下条件:
- 参数中是否有authorization,
- authorizationca参数值是否为admin
如果满足就放行,否则则拦截
在gateway中新建一个类AuthorizeFilter
- 1.实现GlobalFilter接口
- 2.添加@Order注解或实现Ordered接口
- 3.编写处理逻辑
1 | //值越小,优先级越高,也可以实现Ordered接口来实现优先级 |
可以访问http://localhost:10010/order/107?Authorization=admin
来验证,当不带Authorization时会返回401
执行顺序
路由过滤器,defaultFilter,全局过滤器的执行顺序:
- 1.order值越小,优先级越高
- 2.当order值一样时,顺序是defaultFilter最先,然后是局部路由过滤器,最后是全局过滤器
跨域问题处理
跨域:域名不一致就是跨域,主要包括:
域名不同:www.taobao.com 和www.taobao.org 和www.jd.com 和miaosha.jd.com
域名相同,端口不同:localhost:8080和localhost8081
跨域问题:浏览器禁止请求的发起者与服务端发生跨域ajax请求,请求被浏览器拦截的问题
解决方案:CORS
1 | spring: |
路由断言工厂Route Predicate Factory
上面的配置文件中了解到predicates
是路由断言,是判断规则。而配置文件中写的断言规则只是字符串,这些字符串会被Predicate Factory
读取并处理,转变为路由判断的条件。
Spring提供了11种基本的Predicate工厂:
名称 | 说明 | 示例 |
---|---|---|
After | 是某个时间点后的请求 | - After=2037-01-20T17:42:47.789-07:00[America/Denver] |
Before | 是某个时间点之前的请求 | - Before=2031-04-13T15:14:47.433+08:00[Asia/Shanghai] |
Between | 是某两个时间点之前的请求 | - Between=2037-01-20T17:42:47.789-07:00[America/Denver], 2037-01-21T17:42:47.789-07:00[America/Denver] |
Cookie | 请求必须包含某些cookie | - Cookie=chocolate, ch.p |
Header | 请求必须包含某些header | - Header=X-Request-ld, \d+ |
Host | 请求必须是访问某个host(域名) | - Host=.somehost.org, .anotherhost.org |
Method | 请求方式必须是指定方式 | -Method=GET, POST |
Path | 请求路径必须符合指定规则 | - Path=/red/(segment),/blue/** |
Query | 请求参数必须包含指定参数 | - Query=name, Jack或者-Query=name |
RemoteAddr | 请求者的ip必须是指定范围 | -RemoteAddr=192.168.1.1/24 |
Weight | 权重处理 |
Docker
镜像:Docker将应用程序及其所需的依赖,函数库,环境,配置文件等打包在一起,称为镜像。
容器:镜像中的应用程序运行后形成的进程就是容器,只是Docker会做容器做隔离,对外不可见。
docker架构
Docker是一个CS架构的程序,由两部分组成:
服务端(server):Docker守护进程,负责处理Docker指令,管理镜像,容器等
客户端(client):通过命令或RestAPI向Docker服务端发送指令。可以在本地或远程向服务端发送指令。
DockerHub
一个镜像托管的服务器,类似的还有阿里云镜像服务,统称为DockerRegistry
在CentOS上安装Docker
关于centos,可以看我往期linux的文章:
如果之前安装过旧版本的Docker,可以使用下面命令卸载:
1 | yum remove docker \ |
然后更新本地镜像源(阿里):
1 | 设置docker镜像源 |
更新yum,建立缓存
1 | sudo yum makecache fast |
最后,执行命令,安装Docker
1 | yum install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin |
docker-ce为社区免费版本。稍等片刻,docker即可安装成功。
启动docker
Docker应用需要用到各种端口,逐一去修改防火墙设置。非常麻烦,因此建议大家直接关闭防火墙!
1 | # 关闭 |
通过命令启动docker:
1 | systemctl start docker # 启动docker服务 |
1 | systemctl status docker # 查看docker服务状态 |
配置镜像加速
docker官方镜像仓库网速较差,我们需要设置国内镜像服务:
最新请参考阿里云的镜像加速文档:https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors
通过修改daemon配置文件/etc/docker/daemon.json来使用加速器
1 | sudo mkdir -p /etc/docker #创建文件夹 |
Docker基本操作
镜像操作
镜像名称一般由两部分组成:[repository]:[tag]
。如mysql:5.7和mysql:5.6就是两个不同的镜像
如果不写tag,则默认拉取最新版本的,tag即为latest
操作 | 命令 |
---|---|
Dockerfile构建镜像到本地: | docker build |
local(本地)查看镜像: | docker images |
local(本地)删除镜像: | docker rmi 后面接镜像名 |
从Docker Registry镜像服务器拉取镜像到本地: | docker pull 后接[repository]:[tag] |
本地保存镜像为一个压缩包: | docker save 后接 -o 压缩包名 镜像名 |
加载压缩包为镜像: | docker load 后面接 压缩包名 |
更多命令请查看帮助文档: | docker --helper |
查看具体命令: | docker command --helpe |
例:拉取nignx的镜像:docker pull nginx
更多请前往:https://hub.docker.com/
嘘!更多有关docker镜像:
操作示例:1
2
3
4
5docker pull redis # 拉取最新的redis镜像
docker images # 查看本地镜像
docker save -o redis.tar redis:latest # 将redis:latest镜像保存为redis.tar压缩包
docker rmi redis:latest # 删除redis:latest镜像
docker load -i redis.tar # 将reids.tar压缩包加载为镜像
容器操作
镜像运行为容器:docker run
当不知道运行某个容器时,建议去dockerHub官网上查看:https://hub.docker.com/
容器常用操作 | 命令 |
---|---|
运行到暂停 | docker pause |
暂停到运行 | docker unpasue |
运行到停止 | docker stop |
停止到运行 | docker start |
查看容器运行日志 | docker logs |
查看所有运行的容器及状态 | docker ps |
进入容器执行命令 | docker exec |
删除指定容器 | docker rm |
容器操作示例:拉取nginx镜像,创建一个nginx容器,运行nginx容器,进入nginx容器,修改html内容,添加”阿徐到此一游“ 运行一个名nginx的容器,取名为mn 去docker hub 查看Nginx的容器运行命令 运行完返回的一长串字符是容器id(CONTAINER ID) 进入容器: 可以看到界面左侧变成了: 切换路径: 替代原有的内容: 输入 输入 查看容器操作示例
docker pull nginx
:拉取最新的nginx镜像docker run --name mn -p 80:80 -d nginx
命令解读:docker run
: 创建并运行一个容器--name
: 给容器取一个名字,这里叫mn-p
:将宿主机端口与容器端口映射,冒号左侧是宿主端口,右侧是容器端口-d
:后台运行容器nginx
:镜像名称,列如nginx
如,我的虚拟机的ip为192.168.255.128
,运行完后访问192.168.255.128:80
即可看到nginx的运行界面docker logs mn
:查看日志docker exec -it mn bash
命令解读:docker exec
:进入容器内部,执行一个命令-it
:给当前容器创建一个标准输入,输出终端,允许我们与容器交互mn
:要进入的容器的名称,这里前面我们创建的nginx容器叫mnbash
:进入容器后执行的命令,bash是一个linux终端交互命令容器id:/#
的格式
输入pwd
发现在根目录/
,输入ls
可以看到熟悉的home lib64 bin media root sys usr var
等目录,这就是一个阉割的linux
系统,我们需要找到nginx
在哪个目录,访问https://hub.docker.com/_/nginx
:官网可知静态资源应该放在/usr/share/nginx/html
目录下。cd /usr/share/nginx/html
,ls可看到50x.html
和index.html
1
2sed -i 's#Welcome to nginx#阿徐到此一游#g' index.html
sed -i 's#<head>#<head><meta charset="utf-8">#g' index.html
刷新网页即可看到改动。exit
退出容器,停止容器:docker stop mn
docker ps -a
即可查看所有容器状态,包括已经停止的容器
数据卷(容器数据管理)
容器现在与数据十分耦合,导致不便于修改,数据不可复用,升级维护困难
数据卷(volume)是一个虚拟目录,指向宿主机文件系统中的某个目录
数据操作的基本语法为:docker volume [COMMAND]
docker volume命令是数据卷操作,根据命令后跟随的command来确定下一步操作:
- create 创建一个volume
- inspect 显示一个或多个volume的信息
- ls 列出所有的的volume
- prune 删除未使用的volume
- rm 删除一个或多个指定的volume
操作示例:创建一个数据卷,并查看数据卷在宿主机的目录位置 创建数据卷: 查看所有数据: 查看数据卷详细信息卷: 查看操作示例
docker volume create html
docker volume ls
docker volume inspect html
挂载数据卷
我们在创建容器时,可以通过-v
参数来挂载一个数据卷到某个容器目录
操作示例:创建一个nginx容器,修改容器内的html目录内的index.html内容 1.创建一个叫mn的nginx容器并运行它,同时挂载html数据卷到容器内的/usr/share/nginx/html目录上,并把nginx的80端口暴露在虚拟机80端口。 注:如果挂载时数据卷不存在,docker会自动创建该数据卷 2.查看数据卷html信息 3.进入数据卷所在位置,并修改html内容 现在我们可以直接在 注:如果用finashell在目录栏输入这个地址一直在加载的话,在finashell连接编辑中,把用户名改为root 右键index.html文件,用系统相关打开(如用vscode),用vscode直接编辑即可,保存刷新网页即可看到效果 查看操作示例挂载数据卷
docker run --name mn -p 80:80 -v html:/usr/share/nginx/html -d nginx
docker inspect html
"Mountpoint": "/var/lib/docker/volumes/html/_data",
,得知挂载点在这个目录里/var/lib/docker/volumes/html/_data
cd /var/lib/docker/volumes/html/_data
ls
发现里面存在了50x.html和index.html证明我们已经将容器的/usr/share/nginx/html目录挂载到了html数据卷的真实目录下/var/lib/docker/volumes/html/_data
这个目录下对文件进行修改
我们也可以直接将宿主机的目录挂载到容器内的目录:-v [宿主机目录]:[容器内目录]
也可以将宿主机文件挂载到容器内文件:-v [宿主机文件]:[容器内文件]
Dockerfile自定义镜像
- Dockerfile的本质是一个文件,通过指令描述镜像的构建过程
- Dockerfile的第一行必须是FROM,从一个基础镜像来构建
- 基础镜像可以是基本操作系统,如Ubuntu。也可以是其他人制作好的镜像,例如:java:8-alpine
DockerCompose
DockerCompose的详细语法参考官网:https://docs.docker.com/compose/compose-file/
DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。
Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。
作用:帮助我们快速部署分布式应用,无需一个个微服务去构建镜像和部署。
下面为示例文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44version: '3'
services:
mysql:
image: mysql:5.7 # 指定镜像名
container_name: mysql # 指定容器名
ports:
- "3306:3306" # 映射端口
environment: # 设置环境变量
TZ: Asia/Shanghai
MYSQL_ROOT_PASSWORD: root
volumes: # 挂载数据卷
- "./mysql/data:/var/lib/mysql"
- "./mysql/conf:/etc/mysql/cnf.d"
- "./mysql/init:/docker-entrypoint-initdb.d"
networks: # 指定网络
- hm-net
hmall:
build:
context: . # 指定构建上下文,当前目录构建镜像
dockerfile: Dockerfile # 指定构建文件
container_name: hmall
ports:
- "8080:8080"
networks: # 指定网络
- hm-net
depends_on: # 依赖,可不写,写了会先创建依赖的容器
- mysql
nginx:
images: nginx
container_name: nginx
ports:
- "18080:18080"
- "18081:18081"
volumes:
- "./nginx/nginx.conf:/etc/nginx/nginx.conf"
- "./nginx/html:/usr/share/nginx/html"
depend_on:
- hmall
networks:
- hm-net
networks: # 创建网络
hm-net:
name: hmall
运行dockercompose命令:docker compose up -d
创建并后台运行所有service容器
Docker镜像仓库
搭建私有镜像厂库
配置Docker信任地址
我们的私服采用的是http协议,默认不被Docker信任,所以需要做一个配置:
ip地址记得换成自己虚拟机的ip。1
2
3
4
5
6
7
8# 打开要修改的文件
vi /etc/docker/daemon.json
# 添加内容:
"insecure-registries":["http://192.168.255.128:8080"]
# 重加载
systemctl daemon-reload
# 重启docker
systemctl restart docker
带有图形化界面版本
1.创建一个docker镜像仓库存放的文件夹如:mkdir registry-ui
(在哪个目录创建都差不多,这里是/tmp/)
2.进入该文件夹:cd registry-ui/
3.创建一个docker-compose文件:touch docker-compose.yml
4.将下面命令粘贴到文件内:
使用DockerCompose部署带有图象界面的DockerRegistry,命令如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15version: '3.0'
services:
registry:
image: registry
volumes:
- ./registry-data:/var/lib/registry
ui:
image: joxit/docker-registry-ui:static
ports:
- 8080:80
environment:
- REGISTRY_TITLE=阿杰学习私有仓库
- REGISTRY_URL=http://registry:5000
depends_on:
- registry
5.执行docker-compose up -d
。之后查看日志docker-compose logs -f
可以看到已经启动了,访问http://192.168.255.128:8080/
即可看到我们部署的私有仓库
向私有仓库推送或拉取镜像
推送镜像到私有镜像服务前必须先tag,步骤如下:
本地有一个nginx:latest镜像,重新tag镜像,名称前缀为私有仓库的地址:
192.168.255.128:8080/
docker tag nginx:latest 192.168.255.128:8080/nginx:1.0
推送镜像
docker push 192.168.255.128:8080/nginx:1.0
刷新仓库界面,可以看到镜像上传到成功私有仓库拉取镜像
docker pull 192.168.255.128:8080/nginx:1.0
服务异步通讯-RabbitMQ
MQ,中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/
我们在Centos7虚拟机中使用Docker来安装。
在线拉取
1 | docker pull rabbitmq:3-management |
执行下面的命令来运行MQ容器:
1 | docker run \ |
访问:http://192.168.255.128:15672
创建一个mq-demo父工程,管理consumer和publisher两个子模块,后面以该项目来演示1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.demo</groupId>
<artifactId>mq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mq-demo</artifactId>
<groupId>cn.itcast.demo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>consumer</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mq-demo</artifactId>
<groupId>cn.itcast.demo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>publisher</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>
基本消息队列(BasicQueue)
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接收并缓存消息
- consumer:订阅队列,处理队列的消息
consumer中创建测试类ConsumerTest1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.255.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xnj");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
publisher中创建测试类PublisherTest1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32public class PublisherTest {
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.255.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xnj");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
基本消息队列的消息发送流程
建立connection,建立channel,利用channel声明队列,使用channel向队列发送消息
debug运行PublisherTest,可以在rabbitMq看到,依次建立了连接,然后建立通道,有了通道后就可以向队列中发送消息,之后创建了一个simple.queue
的队列,点开该队列,再点GetMessage(s),可以看到发送的消息内容:hello, rabbitmq!
,并且可以注意到,发送了消息之后,发送者就已经关闭了通道和连接。
基本消息队列的消息接收流程
建立connection,创建channel,利用channel声明队列,定义consumer的消费行为handleDelivery(),利用channel将消费者与队列绑定
运行ConsumerTest,可以看到rabbitMq有新的连接出现,之后创建通道,再声明队列(但并没有新的队列产生),但它预防了建立连接时队列不存在的情况,接收消息是回调机制,消费者回调函数与队列绑定后,等待rabitMq把消息投递回来后,回调函数才会执行,所以先打印:等待接收消息。。。。
后接收到消息
,接收完消息后,rabbitMq上就会删除消息(阅后即焚)
官方的比较繁琐。下面使用SpringAOP来实现基础消息队列功能
在父工程引入依赖
1
2
3
4
5<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>在publisher中的application.yml配置文件中,添加mq连接信息
1
2
3
4
5
6
7spring:
rabbitmq:
host: 192.168.255.128 #主机名
port: 5672 #端口号
virtual-host: / #虚拟主机
username: xnj #用户名
password: 123456 #密码在publisher服务中新建一个测试类,编写测试方法:
1
2
3
4
5
6
7
8
9
10
11
12
public class SpringAmqpTest {
private RabbitTemplate rabbitTemplate;
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
}在consumer中配置文件,添加mq连接信息
1
2
3
4
5
6
7spring:
rabbitmq:
host: 192.168.255.128 #主机名
port: 5672 #端口号
virtual-host: / #虚拟主机
username: xnj #用户名
password: 123456 #密码在consumer服务中新建一个类,编写消费逻辑:
1
2
3
4
5
6
7
8
public class SpringRabbitListener {
public void listenSimpleQueueMessage(String msg) throws InterruptedException{
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
消息一旦消费就会从队列中删除,RabbitMQ没有消息回溯功能
工作消息队列(WorkQueue)
工作队列可以提高消息处理速度,避免队列消息堆积
发送者1
2
3
4
5
6
7
8
9
public void testsendMessageWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, message__!";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName, message+i);
Thread.sleep(20);
}
}
消费者1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class SpringRabbitListener {
public void listenWork1QueueMessage(String msg) throws InterruptedException{
System.out.println("消费者1接收到消息:【" + msg + "】"+ LocalDateTime.now());
Thread.sleep(20);
}
public void listenWork2QueueMessage(String msg) throws InterruptedException{
System.err.println("消费者2接收到消息:【" + msg + "】"+ LocalDateTime.now());
Thread.sleep(200);
}
}
修改application.yml,设置preFetch这个值,可以控制预取消息的上限:1
2
3
4
5
6
7
8
9
10spring:
rabbitmq:
host: 192.168.255.128 #主机名
port: 5672 #端口号
virtual-host: / #虚拟主机
username: xnj #用户名
password: 123456 #密码
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成之后才能获取下一个消息
发布订阅(Publish、Subscribe)
发布和订阅模式与前面的区别是允许将同一消息发送给多个消费者,实现方式是加了exchange(交换机)
交换机能接收publisher发送的消息,将消息按规则路由到与之绑定的队列,不能缓存消息,路由失败,消息丢失。
根据交换机的类型可以分为三种:
- Fanout Exchange: 广播
- Direct Exchange: 路由
- Topic Exchange: 主题
Fanout Exchange
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue
在consumer服务创建一个类,添加@Configuration注解,并声明FanoutExchange,Queue和绑定关系对象Binding:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FanoutConfig {
//声明FanoutExchange交换机
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hnit.fanout");
}
//声明第一个队列
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
//绑定队列1到交换机
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//....略,以相同方式声明第2个队列,并完成绑定
}
接收1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SpringRabbitListener {
public void listenFanoutQueue1(String msg) {
System.err.println("消费者接收到fanout.queue1的消息:【" + msg + "】"+ LocalDateTime.now());
}
public void listenFanoutQueue2(String msg) {
System.err.println("消费者接收到fanout.queue2的消息:【" + msg + "】"+ LocalDateTime.now());
}
}
在publisher的测试类中发送1
2
3
4
5
6
7
8
9
public void testsendFanoutExchange() throws InterruptedException {
// 交换机名称
String exchangeName = "hnit.fanout";
//消息
String message = "hello, everybody!";
//发送消息,第一个参数是交换机名称,第二个参数是路由键,第三个参数是消息
rabbitTemplate.convertAndSend(exchangeName,"", message);
}
运行能看到一次发送,多个消费者都能接收到
Direct Exchange
Direct Exchange会将消息根据规则路由到指定的Queue,因此称为路由模式(routes)
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
在消费者SpringRabbitListener里添加如下,运行后能在RabbitMQ中看到direct.queue1和direct.queue2
两个队列1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void listenDirectQueue1(String msg) {
System.err.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
public void listenDirectQueue2(String msg) {
System.err.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
在发送者里指定发送的routerKey=blue1
2
3
4
5
6
7
8
9
public void testsendDirectExchange() {
// 交换机名称
String exchangeName = "hnit.direct";
//消息
String message = "hello, blue!";
//发送消息,第一个参数是交换机名称,第二个参数是路由键,第三个参数是消息
rabbitTemplate.convertAndSend(exchangeName,"blue", message);
}
consumer控制台输出 消费者接收到direct.queue1的消息:【hello, blue!】
将路由键改为yellow 消费者接收到direct.queue2的消息:【hello, yellow!】
将路由键改为red :
消费者接收到direct.queue2的消息:【hello, red!】
消费者接收到direct.queue1的消息:【hello, red!】
TopicExchange
TopicExchange和DirectExchange类似,区别在于routingKey必须是多个单词列表,并且以.
分隔
如:china.news 代表中国的新闻消息、china.weather 代表中国天气消息、japan.news、japan.weather
Queue于Exchange指定BindingKey时可以使用通配符:#
代指0个或多个单词,*
:代指一个单词
下面示例
消费者:1
2
3
4
5
6
7
8
public void listenTopicQueue(String msg) {
System.err.println("消费者接收到topic.queue的消息:【" + msg + "】");
}
生产者:1
2
3
4
5
6
7
8
9
public void testsendTopicExchange() {
// 交换机名称
String exchangeName = "hnit.topic";
//消息
String message = "今天的天气真不错!";
//发送消息,第一个参数是交换机名称,第二个参数是路由键,第三个参数是消息
rabbitTemplate.convertAndSend(exchangeName,"china.weather", message);
}
SpringAMQP—消息转换器
前面我们发送的方法中,消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
我们在consumer中利用@Bean声明一个队列1
2
3
4
public Queue objectMessageQueue(){
return new Queue("object.queue");
}
在publisher中发送消息以测试1
2
3
4
5
6
7
8
9
10
11
12
public void testsendObjectMessage() {
// 队列名称
String queueName = "object.queue";
//准备消息
Map<String, Object> msg = new HashMap<>();
msg.put("name", "张三");
msg.put("age", 20);
msg.put("address", "北京市");
// 发送消息
rabbitTemplate.convertAndSend(queueName, msg);
}
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于IDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用ISON方式序列化,步骤如下:
1.在publisher服务引入依赖1
2
3
4
5<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
2.在publisher服务声明1
2
3
4
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
接收json格式消息
1.在consumer服务引入Jackson依赖1
2
3
4
5<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
2.在consumer服务定义MessageConverter1
2
3
4
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
3.接收消息1
2
3
4
5
6
7
public class SpringRabbitListener {
public void listenObjectQueue(Map<String,Object> msg) {
System.err.println("消费者接收到object.queue的消息:【" + msg + "】");
}
}
控制台输出:消费者接收到object.queue的消息:【{address=北京市, name=张三, age=20}】