JVM并发编程模型概述

 2024-02-19 05:02:38  阅读 0

本文介绍并回顾了 JVM 上的 Pool、Rx、async-await、Fiber、Actor 等并发编程模型。 本人经验有限,难免有粗糙之处,还请各位专家多多指教。

我们知道程序分为同步风格和异步风格。

为什么异步

异步的目的:充分利用计算资源。

同步导致线程阻塞,导致等待。

异步是非阻塞的,不需要等待。

如果发生不必要的等待,就会浪费资源,程序也会变慢。

例如这个程序:

val res1 = get("http://server1")
val res2 = get("http://server2")
compute(res1, res2)

根据同步编程风格,必须先获取res1,然后才能开始获取res2。

根据异步编程风格,res1和res2是相互独立的。 发起res1的获取后,无需等待结果。 相反,会立即开始获取 res2。 当它到达时,需要阻塞等待这两个数据。

这是一种“顺序解耦”。 有时我们并不要求某些操作按顺序执行! 那么为什么要强行执行它的命令呢? 异步风格让我们放弃强制,释放资源,减少不必要的等待。

如果异步操作能够并行化,程序性能将会得到提高。 如果它们不能并行化,程序性能将不会提高。 在当今的硬件条件下,并行性一般是可以实现的,因此异步已经成为一种趋势。

什么样的并行方法? 这要从计算机体系结构开始。 我们把任何具有处理能力的硬件想象成一个处理单元——CPU显然是主要处理单元,I/O设备也是处理单元,比如网卡、内存控制器、硬盘控制器等。 CPU可以向一个或多个I/O设备发出请求。 当设备准备数据时,CPU可以做其他事情(当设备准备好时,将使用中断来通知CPU)。 此时,n个硬件正在并行运行! 而且CPU本质上是多核的,可以进行并行计算。 另外,在分布式系统中,可以同时调动多台计算机来完成任务,这也是并行的。

因此,让CPU等待、一次只请求一个I/O设备、不利用多核、不利用其他空闲计算机都是浪费的。

我们来分析一下常见的并发编程模型。

基本模型

这是最简单的模型。 创建一个线程来执行任务,完成后销毁该线程。 当任务数量较多时,会创建大量线程。

大家都知道大量线程会降低性能,但是你真的知道性能开销在哪里吗? 让我尝试列出它们:

创建线程非常耗时。 它需要请求操作系统、分配堆栈空间、初始化等工作。

操作系统的基本概念大家都知道,不再赘述。 值得注意的是,有状态线程(主要出现在 I/O 等待中)很少被调度,因此它们不会导致太多的上下文切换。

大量线程频繁切换,不可避免地会访问不同的数据,破坏空间局部性,导致CPU缓存未命中增加,并且需要频繁访问较慢的内存,这将显着影响CPU密集型程序的性能。 恐怕你没想到这一点。 。

线程会增加内存使用量。 线程的堆栈空间通常占用1MB,1000就是1GB。 而且,很多对象都是在栈上引用的,暂时无法回收。 你认为有多少GB?

一些有限的资源,比如锁、数据库连接、文件句柄等,当线程被挂起或阻塞时,暂时没有人可用,这是一种浪费! 还有死锁的风险!

那么应该分配多少个线程呢?

传统的网络程序每个会话占用一个连接和一个线程。 I/O多路复用(I/O:多个会话共享一个连接)是为了应对C10K问题而诞生的,即10000个连接。 10000个连接就消耗了大量的系统资源,更不用说10000个线程了。 从上面的分析可知,I/O复用可以从C1K开始。

水池

在池中保留一些可重用的线程来重复接受任务。 线程的数量可以是固定的,也可以在一定范围内变化,具体取决于所选的线程池实现。

这种模型非常常用,例如使用线程池来处理请求。

注意——尽量不要阻塞任务线程; 如果不可避免,就多开一个线程——每有一个线程被阻塞,线程池就会少一个可用线程。

Java中典型的线程池有`.` `.` `.` `.ol`等,也可以直接使用`new`(可以指定线程数的上下限)。

Scala并没有添加新的线程池类型,但是有一个方法可以告诉线程池某个调用将会阻塞,需要临时添加一个线程。

它是一个将来有价值的对象,相当于一个占位符(送货凭证!)。

当把一个任务放入线程池执行时,可以给该任务绑定一个,这样以后就可以获取任务执行结果。 未来是什么时候? 这是通过检查内部状态得知的——当任务完成时该状态将被修改,并且执行结果将被存储在其中。

原始代码示例可以重写为:

// 两个future是并行的
val f1 = Future { get("http://server1") }
val f2 = Future { get("http://server2") }
compute(f1.get(), f2.get())

高级模型接收

Rx()是响应式编程的一种特定形式。 **反应式编程是一种面向数据流和变更传播的编程模型。 **

我们知道Java 8提供了类型,它代表有限或无限的数据流,并且可以应用于map、 等操作。 Rx类似于有限或无限的数据流,但数据操作可以委托给线程池异步执行。 (Rx也像生产者/消费者模型的扩展,增加了分发和转换的能力。数据流被连接和组合,这里生产,那里分发和转换,不断地交给消费者。)

例如:

Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);

例如:

Flux.fromIterable(getSomeLongList())
    .mergeWith(Flux.interval(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResumeWith(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .subscribe(System.out::println);

从代码中可以看出,数据流的操作和集合的功能操作非常相似,都是异步且异步有返回值。

主流的实现有Akka、Akka、Akka,API不同。 但它们都在向常态靠拢,并且肯定会变得越来越相似。

异步等待

async-await 是一种特殊的语法,可以自动将同步风格代码转换为异步风格代码。 使用得当,代码在阻塞时可以自动放弃控制。

C# 内置的 async-await 是最完整的实现。 Scala 通过 Async 库提供此语法。 代码大致是这样的:

val future = async {
  println("Begin blocking")
  await {
    async {Thread.sleep(1000)}
  }
  println("End blocking")	
}

并行计算java_并行编程语言_java函数式编程 并行化

代码会自动转换成各种组合。 不需要特殊处理,可以并行的部分会自动并行。

纤维

Fiber 是协程的模仿。 一般来说,多线程就是抢占式调度。 如果一个任务运行得很好,你会突然被暂停。 协程是协作的。 当一个任务被阻塞或完成时,必须主动放弃控制权,让调度器切换到另一个任务。

async-await自动将代码转换成可以自动放弃控制的形式,已经具备了协程的雏形。 光纤更智能。 您甚至不需要 async-await 语法。 你只需要在 Fiber 中编写代码,它就会像在其中编写一样自动异步。

async-await只能临时存储当前作用域(转换为闭包),而Fiber可以临时存储整个执行堆栈(每个作用域只是一个堆栈帧)。 当然,使用嵌套的async-await也可以临时存储整个执行堆栈。 我更喜欢这个,因为它可以更好地控制内存使用。

JVM上的主流实现是通过java-agent重写字节码。 当需要放弃控制时,抛出异常来中断控制流(不必担心异常的性能开销),保存执行堆栈,然后交换到另一个。 任务。

Java 示例:

new Fiber<V>() {
  @Override
  protected V run() throws SuspendExecution, InterruptedException {
    // your code
  }
}.start();

例子:

fiber @Suspendable {
  // your code
}

代码中任何调用的阻塞方法都必须标记@,让你知道调用该方法时,必须暂停当前Fiber并执行另一个Fiber,并且必须使用另一个线程池来执行阻塞方法。

演员

一种源于电信领域的编程模型。 Actor是任务处理单元:每个Actor只处理一个任务,每个任务同时只由一个Actor处理(如果有大任务,必须将其分解为小任务)。 消息用于参与者之间的通信。

中,每个actor都是一个轻量级的进程,拥有独立的内存空间(所以通信只能依靠消息),所以有独立的垃圾回收,不会stop the world。

Actor 可以发送消息并忽略它(告诉),这是典型的异步; 它还可以发送消息并等待响应(询问)。 返回值为一。 事实上,它创建了一个新的演员并静静地等待回应。 它仍然是异步的。

Actor可以透明地分布在不同的机器上,消息可以发送到本地Actor或远程Actor。

JVM 上唯一成熟的实现是 Akka。 JVM 无法为每个 Actor 提供独立的内存,垃圾收集仍然可能会导致世界停止。

演员显然是一个具有状态和行为的对象。

演员也可以被认为是一个闭包,具有函数和上下文(整个对象的状态就是上下文)。

参与者一次可以接收并处理一条消息。 在处理过程中,它可以向自己或另一个参与者发送消息,然后挂起或结束。

为什么要给自己发消息? 因为在处理消息时不能挂起,所以只能在“一条消息之后和下一条消息之前”的间隙挂起。

假设你收到一条A消息,执行前半部分业务逻辑,进行一次I/O,然后执行后半部分业务逻辑。 在做I/O的时候,应该结束当前的处理,当IO完成后给自己发送一个B消息,让你在下次处理B消息的同时完成剩下的业务逻辑。 前后台逻辑必须分开编写,共享变量必须声明为actor的对象字段。

伪代码如下:

class MyActor extends BasicActor {
  var halfDoneResult: XXX = None
  def receive(): Receive = {
    case A => {
      halfDoneResult = 前半段逻辑()
      doIO(halfDoneResult).onComplete {
        self ! B()
      }
    }
    case B => 后半段逻辑(halfDoneResult)
  }
}

并行计算java_并行编程语言_java函数式编程 并行化

当actor的状态需要完全改变时,可以使用操作来彻底改变actor的行为。 从面向对象编程设计模式的角度来看,这就是状态。 从函数式编程的角度来看,这就是将一个函数转换为另一个函数。

可以看出,Actor模型将函数表示为更容易控制的对象,以便满足一些并发或分布架构的约束。

如果把这个逻辑改写为async-await或者fibre,伪代码如下,就简单多了:

def logicInAsync() = async {
  val halfDoneResult = 前半段逻辑()
  await { doIO(halfDoneResult) }
  后半段逻辑(halfDoneResult)
}
def logicInFiber() = fiber {
  val halfDoneResult = 前半段逻辑()
  doIO(halfDoneResult)
  后半段逻辑(halfDoneResult)
}

参与者和分布式架构

可以看出,相比于async-await或者Fiber,actor是一个状态机,是一种比较底层且难于使用的编程模型。 但参与者具有成熟的分布式能力。

我觉得actor就像一个异步版本的EJB。 EJB中有bean和bean,actor也可以通过 和 来分类。

支付系统基于 Akka,并为此目的编写了 Squbs 框架并开源。 业务逻辑仍然使用actor来实现,Squbs只是增加了集成和运维的支持(这也很重要)。 不过,我对这种技术路线持谨慎态度(业务逻辑是基于参与者的)。 接下来我就分门别类的阐述一下我的看法:

无状态分布式架构

在我看来,这种架构只需要三种通信模型:消息队列、同步RPC、异步RPC。

消息队列和同步RPC不需要Akka出现,有各种MQ和RPC框架来解决。 对于异步RPC,GRPC是一个跨语言的RPC框架,也可以构建基于协议的RPC框架。 如果不需要跨语言,也可以使用Akka,但它并不是直接基于Akka编程——而是在Akka之上构建了一个RPC层。 如果你技术高的话,可以直接基于Netty构建RPC层。

当参与者执行“请求-响应”往返通信时,请求参与者在接收响应之前必须暂停并临时存储在内存中。 当协程进行这种通信时,请求者的执行堆栈被挂起并暂时存储在内存中。

有状态的分布式架构

这是演员茁壮成长的地方,也是最适合使用他们的地方。

以即时聊天(IM)为例,如何使用actor来实现?

因此,我们选择第一种实现方式:每个actor对应一个人,actor必须记住自己对应的是哪个人以及消息是如何交换的。 这就是“状态”! 如果有10万个用户在线,就需要10万个连接(这和IO复用无关吧?)。 显然,单台机器扛不住,需要多台机器。 如果actor A和actor B不在同一台机器上,则需要远程通信。 对于基于 Akka 的程序,本地或远程通信是透明的,太棒了!

事实上,无需演员也能实现。 所有状态和关系都可以使用数据结构来表达,但参与者可能更方便。

总而言之,Akka 模仿并精心设计了与业务无关的 Actor 的概念。 然而,与业务无关的参与者的概念设计得越仔细,就越有可能无法满足不断变化的业务需求:)。 如果你问我是否用演员,我只能说,看情况。 也希望哪位英雄能够介绍一两个需要演员的场景。

再和RPC对比一下

现在,假设有一个微服务架构。 众多服务中,有A、B、C三个服务,调用顺序为A->B->C。 RPC只能向A->B->C方向请求,然后向CC->B->A方向响应; actor可以允许C直接向A发送响应。但是如果C想要直接回复A,它需要与A建立连接,这会使网络拓扑和依赖关系管理变得复杂——不要让它变得不必要的复杂。

为了避免这种情况,使用 MQ 发送响应? MQ 就像一个聊天服务,允许分布式服务相互聊天。 IM、Actor、MQ,一切都是互联的。 你感受到美妙的意境了吗?

但压力集中在MQ上,网络多了一跳(->->),对性能有影响。

结论

本文对JVM上各种常见的并发模型进行了介绍和点评,并尝试建立模型之间的联系。 最后以分布式架构为例进行分析。

那么如何编写应用程序呢? 查看文档。 各种库或者框架都希望有人会用,所以满足他们吧!

如本站内容信息有侵犯到您的权益请联系我们删除,谢谢!!


Copyright © 2020 All Rights Reserved 京ICP5741267-1号 统计代码