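This series analyzes the architecture and internals of the EventBus framework, based on the latest version, 3.1.0.
The EventBus open-source library is hosted at https://github.com/greenrobot/EventBus
This is the fourth article on EventBus; it walks through the flow of posting an event.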
1 Recap
Let's recall how EventBus is used to post events:

```java
EventBus.getDefault().post(messageEvent);
EventBus.getDefault().postSticky(messageEvent);
```

Here we analyze the post flow. This is also the last article in the series.
2 EventBus
2.1 post
Posts a regular event:

```java
public void post(Object event) {
    // Each thread has its own posting state and event queue.
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);

    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            // Drain the queue, delivering one event at a time.
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}
```
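The logic is not complicated: the event is appended to the current thread's eventQueue, and if this thread is not already in the middle of posting, the queue is drained by calling postSingleEvent for each event in turn. A post made while the thread is already posting (for example from inside a subscriber) is only queued and gets delivered by the loop that is already running.
isMainThread just checks whether the current thread is the main thread, so we won't go into it.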
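To make the queueing behaviour concrete, here is a minimal sketch of the same pattern outside EventBus. The class `PostQueueSketch`, its `State` holder and the `handler` callback are hypothetical stand-ins (the handler plays the role of postSingleEvent); only the per-thread queue and the isPosting guard mirror the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for EventBus.post(): one event queue per thread,
// drained only by the outermost post() call on that thread.
class PostQueueSketch {
    private static final class State {
        final List<Object> queue = new ArrayList<>();
        boolean isPosting;
    }

    private final ThreadLocal<State> state = ThreadLocal.withInitial(State::new);
    private final Consumer<Object> handler; // plays the role of postSingleEvent

    PostQueueSketch(Consumer<Object> handler) {
        this.handler = handler;
    }

    void post(Object event) {
        State s = state.get();
        s.queue.add(event);
        if (!s.isPosting) { // a nested post() only enqueues and returns
            s.isPosting = true;
            try {
                while (!s.queue.isEmpty()) {
                    handler.accept(s.queue.remove(0));
                }
            } finally {
                s.isPosting = false;
            }
        }
    }

    // Posts "A"; while "A" is being handled, the handler posts "B" and "C".
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        PostQueueSketch[] bus = new PostQueueSketch[1];
        bus[0] = new PostQueueSketch(e -> {
            log.add("start " + e);
            if (e.equals("A")) {
                bus[0].post("B");
                bus[0].post("C");
            }
            log.add("end " + e);
        });
        bus[0].post("A");
        return log;
    }

    public static void main(String[] args) {
        // prints [start A, end A, start B, end B, start C, end C]
        System.out.println(demo());
    }
}
```

The nested posts do not interrupt the handling of "A"; they are delivered afterwards, in the order they were posted.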
2.1.1 postSingleEvent
Posts a single event:

```java
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        // Deliver to subscribers of the event class and of all its supertypes.
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        // Avoid an endless loop: never post a NoSubscriberEvent about these two types.
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}
```
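EventBus keeps a hash map named eventTypesCache:

```java
private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>();
```

The key is the event class, and the value is a list holding that class, its superclasses, and all of the interfaces they implement.
When event inheritance is enabled, a subscriber to a superclass or interface must also receive the event (polymorphism), so all superclasses and interfaces have to be collected.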
2.1.1.1 lookupAllEventTypes
Looks up all event types for an event class:

```java
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
    synchronized (eventTypesCache) {
        List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
        if (eventTypes == null) {
            eventTypes = new ArrayList<>();
            Class<?> clazz = eventClass;
            // Walk up the class hierarchy, collecting classes and their interfaces.
            while (clazz != null) {
                eventTypes.add(clazz);
                addInterfaces(eventTypes, clazz.getInterfaces());
                clazz = clazz.getSuperclass();
            }
            eventTypesCache.put(eventClass, eventTypes);
        }
        return eventTypes;
    }
}
```
The main logic of this code is:
- add eventClass to the eventTypes list that is stored in eventTypesCache;
- walk up the hierarchy and add every superclass to the same eventTypes list;
- for each class, add all interfaces it implements, directly or indirectly, to the same eventTypes list.
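2.1.1.2 addInterfaces
Adds a set of interfaces, that is, all interfaces implemented by the event class:

```java
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
    for (Class<?> interfaceClass : interfaces) {
        if (!eventTypes.contains(interfaceClass)) {
            eventTypes.add(interfaceClass);
            // Recurse into the interfaces this interface extends.
            addInterfaces(eventTypes, interfaceClass.getInterfaces());
        }
    }
}
```

The logic is simple: each interface not yet in the list is added, and then its own super-interfaces are added recursively.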
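As an illustration of what the collected list looks like, the sketch below runs the same traversal on a small, made-up hierarchy. The types `Trackable`, `Loggable`, `BaseEvent` and `LoginEvent` are assumptions invented for this example, and the cache is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

class EventTypesSketch {
    // Hypothetical event hierarchy, used only for this illustration.
    interface Trackable {}
    interface Loggable extends Trackable {}
    static class BaseEvent implements Loggable {}
    static class LoginEvent extends BaseEvent implements Runnable {
        @Override public void run() {}
    }

    // Same traversal as lookupAllEventTypes, without the eventTypesCache.
    static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
        List<Class<?>> types = new ArrayList<>();
        for (Class<?> c = eventClass; c != null; c = c.getSuperclass()) {
            types.add(c);
            addInterfaces(types, c.getInterfaces());
        }
        return types;
    }

    static void addInterfaces(List<Class<?>> types, Class<?>[] interfaces) {
        for (Class<?> i : interfaces) {
            if (!types.contains(i)) {
                types.add(i);
                addInterfaces(types, i.getInterfaces());
            }
        }
    }

    static List<String> names(Class<?> eventClass) {
        List<String> names = new ArrayList<>();
        for (Class<?> c : lookupAllEventTypes(eventClass)) {
            names.add(c.getSimpleName());
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [LoginEvent, Runnable, BaseEvent, Loggable, Trackable, Object]
        System.out.println(names(LoginEvent.class));
    }
}
```

So with event inheritance enabled, posting a `LoginEvent` in this sketch would also reach subscribers of `BaseEvent`, `Loggable`, `Trackable`, `Runnable` and `Object`.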
2.1.2 postSingleEventForEventType

```java
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            // Stop delivering to the remaining subscribers once delivery is canceled.
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}
```

The method is simple: it looks up the subscriptions registered for eventClass, posts the event to each of them in turn, and stops early if delivery was canceled. It returns true if at least one subscription existed.
2.2.1 postToSubscription - thread mode handling
Dispatches the event, choosing a poster according to the thread mode of the subscriber method:

```java
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}
```
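Here we see four thread modes, each handled differently: POSTING invokes the subscriber directly; MAIN invokes directly when already on the main thread and otherwise enqueues to mainThreadPoster; BACKGROUND enqueues to backgroundPoster when on the main thread and otherwise invokes directly; ASYNC always enqueues to asyncPoster.
We also see an important data structure, Poster:

```java
private final Poster mainThreadPoster;
private final BackgroundPoster backgroundPoster;
private final AsyncPoster asyncPoster;
```

mainThreadPoster is created in AndroidHandlerMainThreadSupport:

```java
@Override
public Poster createPoster(EventBus eventBus) {
    return new HandlerPoster(eventBus, looper, 10);
}
```

The posters themselves are covered in section 4.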
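The routing rules of postToSubscription can be condensed into a small pure function. This is only a sketch: the `ThreadModeSketch` class, its local `ThreadMode` enum and the returned labels are made up for illustration and are not part of EventBus.

```java
class ThreadModeSketch {
    // Local copy of the four modes handled in the switch shown earlier.
    enum ThreadMode { POSTING, MAIN, BACKGROUND, ASYNC }

    // Returns a label describing how the subscriber would be reached.
    static String route(ThreadMode mode, boolean isMainThread) {
        switch (mode) {
            case POSTING:
                return "invoke directly";
            case MAIN:
                return isMainThread ? "invoke directly" : "mainThreadPoster";
            case BACKGROUND:
                return isMainThread ? "backgroundPoster" : "invoke directly";
            case ASYNC:
                return "asyncPoster";
            default:
                throw new IllegalStateException("Unknown thread mode: " + mode);
        }
    }

    public static void main(String[] args) {
        for (ThreadMode mode : ThreadMode.values()) {
            System.out.println(mode + " from main thread:   " + route(mode, true));
            System.out.println(mode + " from worker thread: " + route(mode, false));
        }
    }
}
```

Only MAIN and BACKGROUND depend on the posting thread; POSTING never switches threads and ASYNC always does.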
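2.2.2 invokeSubscriber
Delivers the event to the subscription, that is, calls the subscriber's method to handle the event:

```java
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        // Call the subscriber method via reflection.
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        // The subscriber method itself threw; unwrap and handle the real cause.
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}
```

The method is simple: the subscriber method is invoked through reflection, and an exception thrown by the subscriber is unwrapped and passed to handleSubscriberException.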
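For readers less familiar with reflection, here is a small self-contained sketch of the same invoke-and-unwrap pattern. `InvokeSketch`, its `Subscriber` class and the returned status strings are hypothetical; in EventBus the failure path goes to handleSubscriberException instead of returning a string.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class InvokeSketch {
    // Hypothetical subscriber; in EventBus the method would be annotated with @Subscribe.
    public static class Subscriber {
        String last;

        public void onEvent(String event) {
            if (event.isEmpty()) {
                throw new IllegalArgumentException("empty event");
            }
            last = event;
        }
    }

    // Invokes the method reflectively and reports what happened.
    static String invoke(Object subscriber, Method method, Object event) {
        try {
            method.invoke(subscriber, event);
            return "ok";
        } catch (InvocationTargetException e) {
            // getCause() is the exception thrown inside the subscriber method.
            return "subscriber threw " + e.getCause().getClass().getSimpleName();
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

    static String[] demo() {
        try {
            Subscriber s = new Subscriber();
            Method m = Subscriber.class.getMethod("onEvent", String.class);
            String first = invoke(s, m, "hello");
            String second = invoke(s, m, "");
            return new String[] { first, second, s.last };
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints ok | subscriber threw IllegalArgumentException | hello
        System.out.println(String.join(" | ", demo()));
    }
}
```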
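2.2 postSticky
Posts a sticky event. As the code shows, the event is first saved in the stickyEvents map and then posted like a regular event:

```java
public void postSticky(Object event) {
    synchronized (stickyEvents) {
        // Only the most recent event of each class is kept.
        stickyEvents.put(event.getClass(), event);
    }
    post(event);
}
```

As analyzed in the earlier article on register, sticky events are dispatched immediately when a subscriber registers.
So we won't repeat that here.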
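The "keep the latest event per class, replay it on registration" idea can be sketched as follows. This is a deliberately simplified model, not EventBus code: `StickySketch` and its `Consumer`-based subscribers are assumptions, and unlike EventBus it ignores event types and the sticky flag when replaying.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class StickySketch {
    private final Map<Class<?>, Object> stickyEvents = new ConcurrentHashMap<>();
    private final List<Consumer<Object>> subscribers = new ArrayList<>();

    void postSticky(Object event) {
        stickyEvents.put(event.getClass(), event); // newest event of a class wins
        post(event);
    }

    void post(Object event) {
        for (Consumer<Object> s : subscribers) {
            s.accept(event);
        }
    }

    // Simplified: replays every retained sticky event to the new subscriber.
    void register(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
        for (Object e : stickyEvents.values()) {
            subscriber.accept(e);
        }
    }

    static List<Object> demo() {
        StickySketch bus = new StickySketch();
        bus.postSticky("v1");        // nobody is registered yet
        bus.postSticky("v2");        // replaces "v1" as the retained String event
        List<Object> received = new ArrayList<>();
        bus.register(received::add); // the late subscriber gets "v2" right away
        bus.postSticky(42);          // delivered live, and retained for Integer
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [v2, 42]
    }
}
```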
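3 PostingThreadState
This class holds the posting state of a thread. EventBus keeps it in a ThreadLocal member variable:

```java
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};
```

so every thread has its own posting state!
3.1 Member variables

```java
final static class PostingThreadState {
    final List<Object> eventQueue = new ArrayList<>(); // events waiting to be posted on this thread
    boolean isPosting;         // true while this thread is draining eventQueue
    boolean isMainThread;      // whether the posting thread is the main thread
    Subscription subscription; // subscription currently being delivered to
    Object event;              // event currently being delivered
    boolean canceled;          // set when delivery of the current event is canceled
}
```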
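A quick way to convince yourself that the state really is per thread is the sketch below. `ThreadStateSketch` and its `State` class are hypothetical; `ThreadLocal.withInitial` is used as a shorter equivalent of overriding initialValue.

```java
class ThreadStateSketch {
    static final class State {
        int posted;
    }

    // Each thread lazily gets its own State instance on first access.
    static final ThreadLocal<State> current = ThreadLocal.withInitial(State::new);

    // Returns { value seen by the calling thread, value seen by a worker thread }.
    static int[] demo() {
        current.get().posted = 5;
        int[] seenByWorker = new int[1];
        Thread worker = new Thread(() -> seenByWorker[0] = current.get().posted);
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { current.get().posted, seenByWorker[0] };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1]); // prints 5 0
    }
}
```

The worker thread sees a fresh state, which is why post() needs no locking around eventQueue or isPosting.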
4 Poster
A poster performs the final dispatch of an event to a subscription. All posters implement the following interface:

```java
interface Poster {
    void enqueue(Subscription subscription, Object event);
}
```

Let's go through them one by one:
4.1 HandlerPoster
Handles event dispatch on the main thread:
4.1.1 Member variables

```java
public class HandlerPoster extends Handler implements Poster {
    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;
    // ...
}
```

maxMillisInsideHandleMessage is the maximum time, in milliseconds, that handleMessage may spend in one pass. The value passed in is 10:

```java
// AndroidHandlerMainThreadSupport.java
@Override
public Poster createPoster(EventBus eventBus) {
    return new HandlerPoster(eventBus, looper, 10);
}
```

AndroidHandlerMainThreadSupport creates a HandlerPoster, which becomes a member variable of the EventBus singleton.
4.1.2 enqueue
Adds a post to the queue:

```java
public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        // Only wake the handler if it is not already processing the queue.
        if (!handlerActive) {
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}
```

HandlerPoster is essentially a Handler: it sends itself a message, and the queue is processed when that message is handled.
4.1.3 handleMessage

```java
@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // Check again, this time synchronized with enqueue.
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            eventBus.invokeSubscriber(pendingPost);
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                // Time budget used up: reschedule and yield the main thread.
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}
```

As we can see, the dispatch strategy on the main thread is:
- process as many posts from the PendingPostQueue as possible in a single pass;
- if the total time spent in this handleMessage call reaches 10 ms (measured from the start of the call, not per post), send itself a new message and leave the while loop, so that other work on the main thread gets a chance to run before the remaining posts are handled.
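The time-budget idea can be tried without Android. The sketch below is an assumption-laden model: `TimeSliceSketch` uses an injected `LongSupplier` clock instead of SystemClock so the result is deterministic, returns a boolean instead of sending a Handler message, and is single-threaded, so it leaves out the synchronization that HandlerPoster needs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.LongSupplier;

class TimeSliceSketch {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final long maxMillis;
    private final LongSupplier clock; // injected so the example is deterministic

    TimeSliceSketch(long maxMillis, LongSupplier clock) {
        this.maxMillis = maxMillis;
        this.clock = clock;
    }

    void enqueue(Runnable task) {
        queue.add(task);
    }

    // Runs tasks until the queue is empty (returns false) or the time budget
    // for this pass is used up (returns true: the caller should reschedule).
    boolean drainOnce() {
        long started = clock.getAsLong();
        while (true) {
            Runnable task = queue.poll();
            if (task == null) {
                return false;
            }
            task.run();
            if (clock.getAsLong() - started >= maxMillis) {
                return true;
            }
        }
    }

    // Five tasks that each "take" 4 ms of fake time, with a 10 ms budget per pass.
    static List<Integer> demo() {
        long[] now = {0};
        int[] ran = {0};
        TimeSliceSketch poster = new TimeSliceSketch(10, () -> now[0]);
        for (int i = 0; i < 5; i++) {
            poster.enqueue(() -> { now[0] += 4; ran[0]++; });
        }
        List<Integer> perPass = new ArrayList<>();
        boolean more;
        do {
            int before = ran[0];
            more = poster.drainOnce();
            perPass.add(ran[0] - before);
        } while (more);
        return perPass;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [3, 2]
    }
}
```

The first pass stops after the third task because 12 ms have elapsed since the pass began, even though no single task took 10 ms.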
4.2 BackgroundPoster
Handles event dispatch on a background thread:
4.2.1 Member variables

```java
final class BackgroundPoster implements Runnable, Poster {
    private final PendingPostQueue queue;
    private final EventBus eventBus;
    private volatile boolean executorRunning;
    // ...
}
```

As we can see, BackgroundPoster is a Runnable.
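4.2.2 enqueue
Adds a post to the poster:

```java
public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        // Submit the poster to the executor only if it is not already running.
        if (!executorRunning) {
            executorRunning = true;
            eventBus.getExecutorService().execute(this);
        }
    }
}
```

This block is locked because post can be called from multiple threads, and the check-and-set of executorRunning must be atomic so that only one runner is submitted.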
4.2.3 run

```java
@Override
public void run() {
    try {
        try {
            while (true) {
                // Wait up to 1000 ms for a new post before giving up.
                PendingPost pendingPost = queue.poll(1000);
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time synchronized with enqueue.
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            executorRunning = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
            }
        } catch (InterruptedException e) {
            eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
        }
    } finally {
        executorRunning = false;
    }
}
```

Despite the while (true), the thread does not spin: queue.poll(1000) waits up to 1000 ms for a new post. If the queue is still empty after the second check under the lock, executorRunning is reset and run returns. As long as posts keep arriving, a single thread delivers them one after another, in order.
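Here is a minimal sketch of this "single runner that drains a queue" pattern using only the JDK. Everything in it is an assumption made for illustration: `SerialRunnerSketch` uses a `LinkedBlockingQueue` in place of PendingPostQueue, plain `Runnable` tasks in place of pending posts, and assumes tasks do not throw.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class SerialRunnerSketch implements Runnable {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final ExecutorService executor;
    private boolean running; // guarded by "this"

    SerialRunnerSketch(ExecutorService executor) {
        this.executor = executor;
    }

    void enqueue(Runnable task) {
        synchronized (this) {
            queue.add(task);
            if (!running) { // at most one runner is active at a time
                running = true;
                executor.execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable task = queue.poll(1000, TimeUnit.MILLISECONDS);
                if (task == null) {
                    synchronized (this) {
                        task = queue.poll(); // re-check under the lock
                        if (task == null) {
                            running = false;
                            return;
                        }
                    }
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            synchronized (this) {
                running = false;
            }
        }
    }

    static List<String> demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        SerialRunnerSketch poster = new SerialRunnerSketch(pool);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            int n = i;
            poster.enqueue(() -> { log.add("task " + n); done.countDown(); });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // interrupts the runner waiting in poll(...)
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because only one runner is ever scheduled while posts keep coming, the tasks run sequentially in the order they were enqueued.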
4.3 AsyncPoster
Handles event dispatch for the async thread mode:
4.3.1 Member variables

```java
class AsyncPoster implements Runnable, Poster {
    private final PendingPostQueue queue;
    private final EventBus eventBus;
    // ...
}
```

4.3.2 enqueue
Adds a post to the poster:

```java
public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    queue.enqueue(pendingPost);
    eventBus.getExecutorService().execute(this);
}
```

Interestingly, there is no lock here, unlike in BackgroundPoster. None is needed: there is no shared flag such as executorRunning to protect, PendingPostQueue.enqueue is itself synchronized (see 7.2), and every enqueue schedules its own run on the executor.
4.3.3 run

```java
@Override
public void run() {
    // Each run handles exactly one pending post.
    PendingPost pendingPost = queue.poll();
    if (pendingPost == null) {
        throw new IllegalStateException("No pending post available");
    }
    eventBus.invokeSubscriber(pendingPost);
}
```
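The "one execute per enqueue, one post per run" contract is easy to model. In this sketch `AsyncSketch` is hypothetical: a `ConcurrentLinkedQueue` stands in for the synchronized PendingPostQueue, and handled events are simply recorded in a set.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncSketch implements Runnable {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>(); // thread-safe on its own
    private final ExecutorService executor;
    final Set<String> handled = ConcurrentHashMap.newKeySet();

    AsyncSketch(ExecutorService executor) {
        this.executor = executor;
    }

    void enqueue(String event) {
        queue.add(event);       // enqueue first...
        executor.execute(this); // ...then schedule exactly one run() for it
    }

    @Override
    public void run() {
        // Each run takes one event; it may not be the one whose enqueue scheduled it.
        String event = queue.poll();
        if (event == null) {
            throw new IllegalStateException("No pending post available");
        }
        handled.add(event);
    }

    // Enqueues eight events on a pool of four threads and returns how many were handled.
    static int demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AsyncSketch poster = new AsyncSketch(pool);
        for (int i = 0; i < 8; i++) {
            poster.enqueue("event-" + i);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return poster.handled.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 8
    }
}
```

Since every run was preceded by an enqueue, the number of runs never exceeds the number of queued posts, so poll never comes back empty. Unlike BackgroundPoster, the events may be handled concurrently and in any order.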
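6 PendingPost
Represents a post that is pending dispatch.
6.1 Member variables

```java
final class PendingPost {
    // Pool of recycled PendingPost objects, shared by all posters.
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

    Object event;
    Subscription subscription;
    PendingPost next; // next node in the PendingPostQueue linked list
    // ...
}
```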
6.2 obtainPendingPost
Obtains a PendingPost:

```java
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
    synchronized (pendingPostPool) {
        int size = pendingPostPool.size();
        if (size > 0) {
            // Reuse a pooled instance instead of allocating a new one.
            PendingPost pendingPost = pendingPostPool.remove(size - 1);
            pendingPost.event = event;
            pendingPost.subscription = subscription;
            pendingPost.next = null;
            return pendingPost;
        }
    }
    return new PendingPost(event, subscription);
}
```

Access to the pool is locked here, since the pool is static and shared across threads.
6.3 releasePendingPost
Once a post has been delivered, the PendingPost object is returned to the pool:

```java
static void releasePendingPost(PendingPost pendingPost) {
    pendingPost.event = null;
    pendingPost.subscription = null;
    pendingPost.next = null;
    synchronized (pendingPostPool) {
        // Don't let the pool grow indefinitely.
        if (pendingPostPool.size() < 10000) {
            pendingPostPool.add(pendingPost);
        }
    }
}
```

As we can see, pendingPostPool never holds more than 10000 objects.
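The obtain/release pair is a classic object pool. Below is a small instance-based sketch of the same idea; `PoolSketch`, its `Post` class and the configurable `maxSize` are assumptions for illustration (EventBus uses a static pool capped at 10000).

```java
import java.util.ArrayList;
import java.util.List;

class PoolSketch {
    static final class Post {
        Object event;
    }

    private final List<Post> pool = new ArrayList<>();
    private final int maxSize;

    PoolSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    Post obtain(Object event) {
        synchronized (pool) {
            int size = pool.size();
            if (size > 0) {
                Post post = pool.remove(size - 1); // reuse the most recently released object
                post.event = event;
                return post;
            }
        }
        Post post = new Post(); // pool is empty: allocate
        post.event = event;
        return post;
    }

    void release(Post post) {
        post.event = null; // drop the reference so the event can be garbage collected
        synchronized (pool) {
            if (pool.size() < maxSize) {
                pool.add(post);
            }
        }
    }

    int size() {
        synchronized (pool) {
            return pool.size();
        }
    }

    public static void main(String[] args) {
        PoolSketch pool = new PoolSketch(2);
        Post a = pool.obtain("x");
        pool.release(a);
        Post b = pool.obtain("y");
        System.out.println(a == b); // prints true: the released object was reused
    }
}
```

Clearing the fields on release matters as much as the reuse itself: a pooled object that still referenced its event would keep that event alive.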
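7 PendingPostQueue
This is a queue of pending posts, implemented as a linked list!
7.1 Member variables

```java
final class PendingPostQueue {
    private PendingPost head;
    private PendingPost tail;
    // ...
}
```

Internally it has just two fields, the head and the tail of the queue.
The enqueue and poll methods of this class are synchronized.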
7.2 enqueue
Puts a PendingPost into the queue, appending it at the tail. The method is synchronized:

```java
synchronized void enqueue(PendingPost pendingPost) {
    if (pendingPost == null) {
        throw new NullPointerException("null cannot be enqueued");
    }
    if (tail != null) {
        tail.next = pendingPost;
        tail = pendingPost;
    } else if (head == null) {
        head = tail = pendingPost;
    } else {
        throw new IllegalStateException("Head present, but no tail");
    }
    // Wake up any thread waiting in poll(int).
    notifyAll();
}
```
7.3 poll
Removes a PendingPost from the head of the queue:

```java
synchronized PendingPost poll() {
    PendingPost pendingPost = head;
    if (head != null) {
        head = head.next;
        if (head == null) {
            tail = null;
        }
    }
    return pendingPost;
}

synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
    // If the queue is empty, wait for enqueue's notifyAll or for the timeout.
    if (head == null) {
        wait(maxMillisToWait);
    }
    return poll();
}
```
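To see the linked queue and the timed poll working together, here is a generic sketch of the same structure. `LinkedQueueSketch` and its `Node` class are hypothetical; the wait/notifyAll logic follows the code above, including the single `if` before `wait`, which means a caller must tolerate a null result (BackgroundPoster does so with its second check).

```java
import java.util.ArrayList;
import java.util.List;

class LinkedQueueSketch<T> {
    private static final class Node<T> {
        final T value;
        Node<T> next;

        Node(T value) {
            this.value = value;
        }
    }

    private Node<T> head;
    private Node<T> tail;

    synchronized void enqueue(T value) {
        Node<T> node = new Node<>(value);
        if (tail != null) {
            tail.next = node;
        } else {
            head = node;
        }
        tail = node;
        notifyAll(); // wake up a consumer blocked in poll(int)
    }

    synchronized T poll() {
        Node<T> node = head;
        if (node == null) {
            return null;
        }
        head = node.next;
        if (head == null) {
            tail = null;
        }
        return node.value;
    }

    synchronized T poll(int maxMillisToWait) throws InterruptedException {
        if (head == null) {
            wait(maxMillisToWait); // releases the lock while waiting
        }
        return poll();
    }

    static List<String> demo() {
        LinkedQueueSketch<String> queue = new LinkedQueueSketch<>();
        List<String> out = new ArrayList<>();
        queue.enqueue("a");
        queue.enqueue("b");
        out.add(queue.poll());                 // FIFO: "a" first
        out.add(queue.poll());                 // then "b"
        out.add(String.valueOf(queue.poll())); // empty queue yields null
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                // fall through and enqueue anyway
            }
            queue.enqueue("late");
        });
        producer.start();
        try {
            // Blocks until the producer enqueues, well before the 2000 ms timeout.
            out.add(String.valueOf(queue.poll(2000)));
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```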
8 Summary
And with that, we are done with EventBus, and the series comes to an end~~