🐠组件异步层面的线程池
版本支持:v2.13.0+
LiteFlow中有很多编排场景,都涉及到了异步。分别为:
1.WHEN场景
<chain id="chain1">
WHEN(a, b, c);
</chain>
2.异步循环场景
<chain name="chain1">
FOR(2).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain2">
WHILE(x).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain3">
ITERATOR(x).parallel(true).DO(THEN(a,b,c));
</chain>
3.WHEN线程池隔离场景
关于这个功能的介绍请参照开启WHEN线程池隔离。
liteflow.when-thread-pool-isolate=true
# 全局线程池
默认情况下,以上这些场景都共用一个线程池,即全局线程池。全局线程池有默认提供实现方式,可以在配置项里进行配置线程池大小和队列大小。
#全局异步节点线程池大小,默认为64
liteflow.global-thread-pool-size: 64
#全局异步节点线程池队列大小,默认为512
liteflow.global-thread-pool-queue-size: 512
当然框架也提供自定义的全局线程池的实现,你需要在配置项里的配置:
#全局异步节点线程池自定义Builder,LiteFlow提供了默认的线程池Builder
liteflow.global-thread-pool-executor-class: com.yomahub.liteflow.thread.LiteFlowDefaultGlobalExecutorBuilder
你可以替换默认实现,自己实现的线程池提供类需要实现ExecutorBuilder
接口:
public class CustomThreadBuilder implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
...
}
}
当然你定义了自定义的线程池实现类,liteflow.global-thread-pool-size
和liteflow.global-thread-pool-queue-size
失效。
# Chain层面的线程池
如果你不想所有的都用同一个全局线程池,想为某个chain单独定义线程池,就需要如下定义:
<chain name="chain1" thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
WHEN(a,b);
</chain>
如果你这样定义了,那么这个chain内的所有异步场景,都会走你定义的线程池。
同样的,这个自定义的线程池提供类需要你来实现:
public class CustomChainThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
...
}
}
# 表达式层面的线程池
如果你还想更细化点来定义你的线程池,LiteFlow还支持表达式层面的线程池自定义关键字:
<chain name="chain1">
WHEN(c, d).threadPool("com.yomahub.liteflow.test.customWhenThreadPool.CustomThreadExecutor");
</chain>
也可以用于异步循环场景:
<chain name="chain1">
FOR(2).parallel(true).DO(THEN(a,b,c)).threadPool("com.yomahub.liteflow.test.customWhenThreadPool.CustomThreadExecutor");
</chain>
<chain name="chain2">
WHILE(x).parallel(true).DO(THEN(a,b,c)).threadPool("com.yomahub.liteflow.test.customWhenThreadPool.CustomThreadExecutor");
</chain>
<chain name="chain3">
ITERATOR(x).parallel(true).DO(THEN(a,b,c)).threadPool("com.yomahub.liteflow.test.customWhenThreadPool.CustomThreadExecutor");
</chain>
当然这里循环必须要为异步parallel(true)
,如果不是异步的,那么threadPool
即使加了也没任何意义。
同样的,这个自定义的线程池提供类需要你来实现:
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
...
}
}
# 各个维度线程池的优先级
如果全局线程池,Chain层面线程池,表达式层面线程池共存,优先级顺序依次为:
表达式层面线程池 > Chain层面线程池 > 全局线程池
# 默认线程池的丢弃策略
这里提一嘴,在2.13.0之后的默认线程池中的队列丢弃策略是ThreadPoolExecutor.CallerRunsPolicy()
。
这意味着将不会丢弃线程,如果你的并发数量特别多,建议扩大线程池大小,或者自己定义。
帮助我们改善此文档 (opens new window)
上次更新: 2025/02/22, 13:47:36