举个栗子:营销系统判断该用户是否有权限领券,在判断权限的过程中,需要做这几件事情:
- 请求风控系统判断该用户是否在黑名单中
- 请求业务系统A用户是否有开通某领券服务(领券的前置条件),etc
那么对于这几件事情,可以认为这些调用都是互相没有关系的,所以我们可以舍弃之前的串行调用方式,改用并行调用,本章即是讨论并行调用的几种方式,并简要分析。
1. 线程池处理
因为需要获取返回值,所以普通的Runnable一定是不行的,需要去实现Callable接口才可以,然后将远程执行的RPC任务放到线程池中,轮训每次的调用结果,并将结果放到集合中,如果所有的异步任务处理完成,则进行汇总,然后返回到调用方,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public static void testWithThread(List<String> ret) { List<CallableTask> rpcs = ImmutableList.of(new CallableTask("1"), new CallableTask("2"), new CallableTask("3"),new CallableTask("4")); Queue<Future<String>> list = rpcs.stream() .map(ThreadPoolConfig.EXECUTOR::submit) .collect(Collectors.toCollection(Lists::newLinkedList)); while (!list.isEmpty()) { String ans = null; Future<String> poll = list.poll(); try { ans = poll.get(100, TimeUnit.MILLISECONDS); } catch (TimeoutException | InterruptedException | ExecutionException e) { System.out.println("time out"); list.offer(poll); } if (ans != null) { ret.add(ans); } } }
public class CallableTask implements Callable<String> {
private String ans;
public CallableTask (String ans) { this.ans = ans; } @Override public String call() throws Exception { Thread.sleep(1000L); return ans; } }
|
但是如上所示,由调用者去轮训且将未完成的Future放到队列中重新轮训去获得结果显然不够优雅,所以还需要对线程的编排进一步优化
2. CompletableFuture
正是因为线程编排和获取返回值太过于麻烦,所以Java8之后有了CompletableFuture
类去协助我们做线程编排
1 2 3 4 5 6 7 8
| public static List<String> testWithCompleteFuture(List<String> ret) { List<CallableTask> rpcs = ImmutableList.of(new CallableTask("1"), new CallableTask("2"), new CallableTask("3"),new CallableTask("4")); List<CompletableFuture<String>> collect = rpcs.stream() .map(e -> CompletableFuture.supplyAsync(e, ThreadPoolConfig.EXECUTOR).whenComplete(((s, throwable) -> ret.add(s)))) .collect(Collectors.toList()); CompletableFuture.allOf(collect.toArray(new CompletableFuture[]{})).join(); }
|
但是CompletableFuture最大的作用并不是如上所述简单的并行调度,它更强大的功能是各种任务调用的编排,譬如先并行执行a,b,c三个任务,等任务执行完之后,再执行d任务等,在此只是抛砖引用,具体的高阶功能不再赘述
3. One More Thing
其实上文中说的主要是需要获取并行调用结果的情况,还有一种情况也很常见,就是后置通知的流程,它不需要获取Provider的响应来提供给上游,仍然举个栗子:用户开通某服务之后,服务管理系统需要处理这几件事情:
- 将用户的资料存入用户信息系统
- 通知关联业务用户已经开通
- 将用户的附加信息进行标记,etc
注意:下面的讨论,建立在这些后置处理的事情,是和主业务保持最终一致性,而不是强一致性,譬如不存在用户资料存储失败,就算是开通失败
对于这些处理,上游的调用方是不需要感知到的,一般有两种处理方式,第一种是本地系统直接发送消息,由下游系统订阅并处理,第二种方式是本地系统直接调用下游的RPC服务,直接处理流程。本段主要讨论第二种方式。
并发调用的时候,可以使用上文中提到的线程池或者直接用CompletableFuture进行线程编排。另一种方式是使用Spring Event来完成后置通知,但是对于Spring Event来说,它本身是同步的,需要结合Spring Async的能力进行异步化。
尤其要注意的是,不管使用哪种方式,一定要考虑调用失败的场景(如服务找不到,调用超时,调用服务内部异常等),因为是要保证最终一致性,可以把调用失败的上下文落库,等待定时调度任务重试即可
不过要说明的是,不能只用Spring的Async注解,因为Spring的异步能力默认是一个方法新开一个线程,当异步Listener过多的是时候,容易导致线程的OOM,所以此时需要我们自定义线程池去复用,一个简单的例子如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Component public class Client { @Autowired private ApplicationEventPublisher publisher;
public void invoke() { SpringEvent springEvent = new SpringEvent(); publisher.publishEvent(springEvent); }
}
@Component public class Listener {
@EventListener @Async public void test1(SpringEvent event) { String name = Thread.currentThread().getName(); System.out.println(event + name + " 111 " + System.currentTimeMillis()); }
@EventListener @Async public void test2(SpringEvent event) { String name = Thread.currentThread().getName(); System.out.println(event + name + " 222 " + System.currentTimeMillis()); } }
@EnableAsync @Configuration public class AsyncConfig implements AsyncConfigurer {
@Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(2); taskExecutor.setMaxPoolSize(10); taskExecutor.setQueueCapacity(15); taskExecutor.setThreadNamePrefix("async-thread-"); taskExecutor.initialize(); return taskExecutor; }
@Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return this::processException; }
private void processException(Throwable throwable, Method method, Object... objects) { } }
|