【应用】SpringBoot -- Webflux + R2DBC 操作 MySQL
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
SpringBoot -- Webflux + R2DBC 操作 MySQL
Webflux 概述
简单来说Webflux 是响应式编程的框架与其对等的概念是 SpringMVC。两者的不同之处在于 Webflux 框架是异步非阻塞的其可以通过较少的线程处理高并发请求。
Webflux 的框架底层采用了 Reactor 响应式编程框架以及 Netty关于这两部分内容可以参看我之前的学习笔记
作为一个异步框架来说必须保证整个程序链中的每一步都是异步操作如果在某一步出现了同步阻塞如等待数据库 IO则整个程序还是回出现阻塞的问题。因此本文主要介绍 Webflux 框架的基本使用并通过异步数据库驱动 R2DBC 实现了对 MySQL 数据库的异步操作。
注意单纯使用 Webflux 框架并不一定会提高接口的响应速度其作用是提高系统的吞吐量。具体接口的响应速度还要看我们本身的业务逻辑。
Webflux 基本使用
首先创建 maven 项目在项目的 pom 文件中引入相应的依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
</dependencies>
创建项目的启动类
@SpringBootApplication
public class WebfluxDemoApplication {
public static void main(String[] args) {
SpringApplication.run(WebfluxDemoApplication.class);
}
}
此时我们就可以编写一个简单的 Controller 来感受一下 Webflux 框架异步相应的概念
@RestController
@RequestMapping("/test")
public class TestController {
@GetMapping("/hello")
public String hello() {
long start = System.currentTimeMillis();
String helloStr = getHelloStr();
System.out.println("普通接口耗时" + (System.currentTimeMillis() - start));
return helloStr;
}
@GetMapping("/helloWebFlux")
public Mono<String> hello0() {
long start = System.currentTimeMillis();
Mono<String> hello0 = Mono.fromSupplier(this::getHelloStr);
System.out.println("WebFlux 接口耗时" + (System.currentTimeMillis() - start));
return hello0;
}
private String getHelloStr() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}
}
上述代码中我们定义了一个普通的接口和一个异步响应的接口启动程序调用相应接口观察两个接口的耗时可以发现异步相应接口在处理任务时不会阻塞而是直接向下运行当业务产生结果后再将结果通过“预留的通道”反向推送到请求者而普通接口的整个过过程都是同步的。
同时观察 Postman 调用接口的接口响应时间我们可以发现无论是普通接口还是异步接口其接口相应时间均为 2s 多。这也印证了 Webflux 框架并不一定会提高接口的响应时间起主要作用是提高系统的吞吐量。
Webflux + R2DBC 操作 MySQL
R2DBC 是一个异步操作数据库的驱动区别于传统的同步数据库驱动 JDBCR2DBC 与数据库的各种操作也是异步的这将大量节省高并发系统的线程数量。
首先创建一个 User 实体类用于测试同时在 MySQL 中创建相应的数据库以及表结构
@Data
@AllArgsConstructor
@NoArgsConstructor
@Table("webflux_user")
public class User {
@Id
private int id;
private String username;
private String password;
}
编写数据仓库层使用 Spring-data 封装好的简单 CRUD 接口用法类似 JPA
public interface UserRepository extends ReactiveCrudRepository<User, Integer> {
}
此时就可以调用封装好的 CRUD 方法进行简单的增删改查操作了。在 Webflux 框架中我们可以使用 SpringMVC 中 Controller + Service 的模式进行开发也可以使用 Webflux 中 route + handler 的模式进行开发。
Controller + Service
编写 Service 调用 UserRepository
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
public Mono<User> addUser(User user) {
return userRepository.save(user);
}
public Mono<ResponseEntity<Void>> delUser(int id) {
return userRepository.findById(id)
.flatMap(user -> userRepository.delete(user).then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
.defaultIfEmpty(new ResponseEntity<Void>(HttpStatus.NOT_FOUND));
}
public Mono<ResponseEntity<User>> updateUser(User user) {
return userRepository.findById(user.getId())
.flatMap(user0 -> userRepository.save(user))
.map(user0 -> new ResponseEntity<User>(user0, HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<User>(HttpStatus.NOT_FOUND));
}
public Flux<User> getAllUser() {
return userRepository.findAll();
}
}
编写 Controller 进行测试
@RestController
@RequestMapping("/user")
public class UserController {
@Autowired
private UserService userService;
@PostMapping
public Mono<User> addUser(@RequestBody User user) {
return userService.addUser(user);
}
@DeleteMapping("/{id}")
public Mono<ResponseEntity<Void>> delUser(@PathVariable int id) {
return userService.delUser(id);
}
@PutMapping
public Mono<ResponseEntity<User>> updateUser(@RequestBody User user) {
return userService.updateUser(user);
}
@GetMapping
public Flux<User> getAllUser() {
return userService.getAllUser();
}
}
Route + Handler
handler 就相当于定义很多处理器其中不同的方法负责处理不同路由的请求其对应的是传统的 Service 层
@Component
public class UserHandler {
@Autowired
private UserRepository userRepository;
public Mono<ServerResponse> addUser(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userRepository.saveAll(request.bodyToMono(User.class)), User.class);
}
public Mono<ServerResponse> delUser(ServerRequest request) {
return userRepository.findById(Integer.parseInt(request.pathVariable("id")))
.flatMap(user -> userRepository.delete(user).then(ServerResponse.ok().build()))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> updateUser(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userRepository.saveAll(request.bodyToMono(User.class)), User.class);
}
public Mono<ServerResponse> getAllUser(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userRepository.findAll(), User.class);
}
public Mono<ServerResponse> getAllUserStream(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(userRepository.findAll(), User.class);
}
}
route 就是路由配置其规定路由的分发规则将不同的请求路由分发给相应的 handler 进行业务逻辑的处理其对应的就是传统的 Controller 层
@Configuration
public class RouteConfig {
@Bean
RouterFunction<ServerResponse> userRoute(UserHandler userHandler) {
return RouterFunctions.nest(
RequestPredicates.path("/userRoute"),
RouterFunctions.route(RequestPredicates.POST(""), userHandler::addUser)
.andRoute(RequestPredicates.DELETE("/{id}"), userHandler::delUser)
.andRoute(RequestPredicates.PUT(""), userHandler::updateUser)
.andRoute(RequestPredicates.GET(""), userHandler::getAllUser)
.andRoute(RequestPredicates.GET("/stream"), userHandler::getAllUserStream)
);
}
}