12-反应式编程

Charlie

1. 使用场景

  • IO密集型场景
  • 同步阻塞模型,阻塞线程多,CPU利用率不高,性能下降
  • 管理多线程,意味着更高的复杂性

2. 👍Reactor

  • Reactive Streams:Netflix、Lightbend和Pirotal于2013年开始制定的一种规范,旨在提供无阻塞回压的异步流处理标准
    • 和JDK里的Stream不是同一个东西
  • Reactor:Spring Pivotal团队提供的响应式编程的Java实现,其它类似实现:RxJava
    • 函数式、声明式,描述数据会流经的管道或流
  • Spring WebFlux:启用基于响应式编程的Web应用程序的开发。提供类似于Spring MVC的编程模型

3. 👍Java的stream与反应式的流区别

  1. Java的stream通常都是同步的,并且只能处理有限的数据集,本质上来说,它们只是使用函数来对集合进行迭代的一种方式
  2. JDK9中的 Flow API对应反应式流

4. 👍反应式流规范定义的4个接口

  • org.reactivestreams.*

image.png

  • Publisher:数据发布者
  • Subscriber:数据订阅者
  • Processor:处理器
  • Subscription:协调

处理过程是异步的

消费者驱动,消费者去请求发布者才会发布数据

5. 基本概念

  1. Flux:包含 0 到 N 个元素的异步序列
  2. Mono:包含 0 或者 1 个元素的异步序列
  3. 消息:正常的包含元素的消息、序列结束的消息和序列出错的消息
  4. 操作符(Operator):对流上元素的操作

6. 反应式流图

6.1. Flux

Flux:发送0-n个元素的异步序列

image.png

6.2. Mono

Mono:0-1个数据

image.png

7. 流的操作

  • 类型
    • 创建操作
    • 组合操作
    • 转换操作
    • 逻辑操作

7.1. 创建Flux

  • Flux的静态方法
    • just:根据对象创建
    • fromArray:根据集成创建,数组、Iterable、Java Stream

image.png

7.2. 组合Flux流

  • mergeWith
  • zip:自己决定如何合并
  • first:取两个流里面最先有的数据

7.3. 过滤Flux流

  • skip:指定个数/时间
  • take:指定个数/时间
  • filter:需要提供Predicate
  • distinct:只发布源Flux中尚未发布过的数据项

image.png

image.png

image.png

7.4. 转换Flux流

7.4.1. 👍map/flatMap

  • map:

    • 同步
    • 返回具体值
  • flatMap

    • 异步
    • 转换出来的返回结果还是一个流(Mono/Flux)
    • 可以并发处理,指定用哪个并发模型处理
      • 多个流并发处理结果合并成一个流,但结果顺序不可控
    • 扁平化
  • 并发模型(Schedulers方法)

    • .immediate()
    • .single()
    • .newSingle()
    • .elastic()
    • .parallel()

image.png

如果map返回流会怎么样?没有下游订阅者驱动,流是死的(流不被订阅就不会发送任何事情)。

java - map vs flatMap in reactor - Stack Overflow

segmentfault.com/a/1190000013229461

7.4.2. 缓冲

  • buffer,缓冲数据,bufferAndFlatMap
  • collectList,同:buffer不带参数则缓冲所有数据到列表
  • collectMap,需要提供生成key的函数

image.png

7.5. 逻辑操作

  • all
    • 要提供Predicate函数
    • 所有元素满足条件True
    • 返回Mono<Boolean>
  • any
    • 要提供Predicate函数
    • 有元素满足条件返回True
    • 返回Mono<Boolean>

image.png

8. 不同的消息

  • 反应式编程
    • Java中的一个普通对象,没有消息头和消息体
    • 只在JVM中流动
  • spring intergrestion
    • header:消息头
    • payload:消息体
    • 只在JVM中流动
  • 消息队列
    • 也有header和payload
    • 要远程传输,在网络中流动,所以需要消息转换器(序列化/反序列化)
  • 标题: 12-反应式编程
  • 作者: Charlie
  • 创建于 : 2023-11-30 18:11:00
  • 更新于 : 2024-07-05 12:55:04
  • 链接: https://chillcharlie357.github.io/posts/d938b74e/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论