Event Bus 设计模式

x33g5p2x  于2022-05-16 转载在 其他  
字(12.0k)|赞(0)|评价(0)|浏览(175)

一 点睛

消息中间件提供了系统之间的异步处理机制。比如在电商网站上支付订单之后,会触发库存计算,物流调度计算,甚至是营销人员绩效计算,报表统计等,诸如此类的操作一般会耗费比订单购买商品本身更多的时间,加之这样的操作没有即时的时效性要求,用户在下单之后完全没有必要等待电商后端做完所有操作才算成功。此时消息中间件是一种非常好的解决方案:用户下单支付之后即可向用户返回购买成功的通知,然后提交各种消息到消息中间件,这样注册在消息中间件的其他系统就可以顺利地接收订单通知,然后执行各自的业务逻辑。消息中间件主要用于解决进程之间的消息异步处理问题;本文借鉴同样的思想,实现一个在 Java 进程内部使用的消息总线 Event Bus,它由以下三个部分组成。

Bus 接口:对外提供几种主要的使用方式,比如 post 方法用来发送 Event,register 方法用来注册 Event 接收者(Subscriber)以接收相应事件。EventBus 采用同步的方式推送 Event,AsyncEventBus 采用异步的方式(Thread-Per-Message)推送 Event。

Registry 注册表:主要用来记录对应 Subscriber 以及受理消息的回调方法,回调方法用注解 @Subscribe 来标识。

Dispatcher:主要用来将 event 广播给注册表中监听了 topic 的 Subscriber。

二 实战

1 Bus 接口注解

package concurrent.eventbus;

/**
 * @className: Bus
 * @description: Declares every operation an event bus offers to its users
 * @date: 2022/5/13
 * @author: cakin
 */
public interface Bus {
    /**
     * Registers an object on the bus; from then on it acts as a Subscriber.
     *
     * @param subscriber the object to register
     */
    void register(Object subscriber);

    /**
     * Removes an object from the bus so that it no longer receives messages from it.
     *
     * @param subscriber the object to unregister
     */
    void unregister(Object subscriber);

    /**
     * Posts an event to the default topic.
     *
     * @param event the event to deliver
     */
    void post(Object event);

    /**
     * Posts an event to the given topic.
     *
     * @param event the event to deliver
     * @param topic the topic the event is posted to
     */
    void post(Object event, String topic);

    /**
     * Closes this bus.
     */
    void close();

    /**
     * Returns the name that identifies this bus.
     *
     * @return the bus name
     */
    String getBusName();
}

2 同步 EventBus

package concurrent.eventbus;

import java.util.concurrent.Executor;

/**
 * @className: EventBus
 * @description: Implements every Bus operation; unless a caller supplies another Executor,
 *               events are dispatched through Dispatcher.SEQ_EXECUTOR_SERVICE
 * @date: 2022/5/13
 * @author: cakin
 */
public class EventBus implements Bus {
    // Registry that keeps track of the registered subscribers
    private final Registry registry = new Registry();
    // Name of this bus; assigned once by the three-argument constructor
    private final String busName;

    // Bus name used when the caller does not provide one
    private final static String DEFAULT_BUS_NAME = "default";

    // Topic used by post(Object) when the caller does not name a topic
    private final static String DEFAULT_TOPIC = "default-topic";

    // Broadcasts posted events to the subscribers found in the registry
    private final Dispatcher dispatcher;

    /**
     * Creates a bus with the default name, no exception handler and the sequential executor.
     */
    public EventBus() {
        this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
    }

    /**
     * Creates a named bus with no exception handler and the sequential executor.
     *
     * @param busName the name reported by {@link #getBusName()}
     */
    public EventBus(String busName) {
        this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
    }

    /**
     * Creates a bus from all of its configurable parts.
     *
     * @param busName               the name reported by {@link #getBusName()}
     * @param eventExceptionHandler handler passed to the dispatcher; may be null
     * @param executor              executor passed to the dispatcher
     */
    public EventBus(String busName, EventExceptionHandler eventExceptionHandler, Executor executor) {
        this.busName = busName;
        this.dispatcher = Dispatcher.newDispatcher(eventExceptionHandler, executor);
    }

    /**
     * Creates a bus with the default name, the given exception handler and the sequential executor.
     *
     * @param eventExceptionHandler handler passed to the dispatcher
     */
    public EventBus(EventExceptionHandler eventExceptionHandler) {
        this(DEFAULT_BUS_NAME, eventExceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);
    }

    /**
     * Binds the given object in the registry.
     *
     * @param subscriber the object to register
     */
    @Override
    public void register(Object subscriber) {
        this.registry.bind(subscriber);
    }

    /**
     * Unbinds the given object from the registry.
     *
     * @param subscriber the object to unregister
     */
    @Override
    public void unregister(Object subscriber) {
        this.registry.unbind(subscriber);
    }

    /**
     * Posts the event to the default topic ("default-topic").
     *
     * @param event the event to deliver
     */
    @Override
    public void post(Object event) {
        this.post(event, DEFAULT_TOPIC);
    }

    /**
     * Hands the event to the dispatcher for delivery on the given topic.
     *
     * @param event the event to deliver
     * @param topic the topic to deliver it on
     */
    @Override
    public void post(Object event, String topic) {
        this.dispatcher.dispatch(this, registry, event, topic);
    }

    /**
     * Closes the dispatcher used by this bus.
     */
    @Override
    public void close() {
        this.dispatcher.close();
    }

    /**
     * Returns the name this bus was constructed with.
     *
     * @return the bus name
     */
    @Override
    public String getBusName() {
        // Previously returned null, which left every EventContext without a source name.
        return this.busName;
    }
}

3 异步 EventBus

package concurrent.eventbus;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @className: AsyncEventBus
 * @description: EventBus variant that hands a caller-supplied ThreadPoolExecutor to the parent bus
 * @date: 2022/5/13
 * @author: cakin
 */
public class AsyncEventBus extends EventBus {
    // Bus name applied when the caller does not provide one
    private static final String DEFAULT_ASYNC_BUS_NAME = "default_async";

    /**
     * Creates a bus with the default async name and no exception handler.
     *
     * @param executor the pool passed to the parent bus
     */
    AsyncEventBus(ThreadPoolExecutor executor) {
        this(DEFAULT_ASYNC_BUS_NAME, null, executor);
    }

    /**
     * Creates a named bus with no exception handler.
     *
     * @param busName  the name of the bus
     * @param executor the pool passed to the parent bus
     */
    AsyncEventBus(String busName, ThreadPoolExecutor executor) {
        this(busName, null, executor);
    }

    /**
     * Creates a bus with the default async name and the given exception handler.
     *
     * @param exceptionHandler handler passed to the parent bus
     * @param executor         the pool passed to the parent bus
     */
    AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
        this(DEFAULT_ASYNC_BUS_NAME, exceptionHandler, executor);
    }

    /**
     * Creates a bus from all of its configurable parts; the other constructors delegate here.
     *
     * @param busName          the name of the bus
     * @param exceptionHandler handler passed to the parent bus; may be null
     * @param executor         the pool passed to the parent bus
     */
    AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
        super(busName, exceptionHandler, executor);
    }
}

4 注册表 Registry

package concurrent.eventbus;

import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * @className: Registry
 * @description: Maintains the mapping between topics and subscribers, so that the Dispatcher can
 *               look up which subscriber instances and callback methods a posted event goes to
 * @date: 2022/5/13
 * @author: cakin
 */
public class Registry {
    // Maps each topic to the queue of subscribers bound to it
    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>();

    /**
     * Binds every eligible callback method of the given object to its topic.
     *
     * @param subscriber the object whose {@code @Subscribe} methods are registered
     */
    public void bind(Object subscriber) {
        getSubscribeMethods(subscriber).forEach(m -> tierSubscriber(subscriber, m));
    }

    /**
     * Disables every registration that belongs to the given object.
     * Entries are only flagged, not removed from their queues, to keep unbinding cheap.
     *
     * @param subscriber the object to unbind (matched by identity)
     */
    public void unbind(Object subscriber) {
        subscriberContainer.forEach((key, queue) ->
                queue.forEach(s -> {
                    if (s.getSubscribeObject() == subscriber) {
                        s.setDisable(true);
                    }
                })
        );
    }

    /**
     * Adds one (object, method) pair to the queue of the topic named by the method's annotation.
     */
    private void tierSubscriber(Object subscriber, Method method) {
        final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class);
        // computeIfAbsent returns the existing or newly created queue, so no second lookup is needed
        subscriberContainer
                .computeIfAbsent(subscribe.topic(), key -> new ConcurrentLinkedQueue<>())
                .add(new Subscriber(subscriber, method));
    }

    /**
     * Looks up the subscribers bound to a topic.
     *
     * @param topic the topic to look up
     * @return the queue for that topic, or {@code null} when nothing was ever bound to it
     */
    public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) {
        return subscriberContainer.get(topic);
    }

    /**
     * Collects the callback methods declared by the object's class and all of its superclasses.
     * A method qualifies when it is public, takes exactly one parameter and carries {@code @Subscribe}.
     */
    private List<Method> getSubscribeMethods(Object subscriber) {
        final List<Method> methods = new ArrayList<>();
        Class<?> temp = subscriber.getClass();
        // Walk up the hierarchy until getSuperclass() returns null
        while (temp != null) {
            Method[] declaredMethods = temp.getDeclaredMethods();
            // Modifier.isPublic tests only the public bit; comparing the whole bitmask with
            // Modifier.PUBLIC would reject public methods that also carry final, synchronized or static.
            Arrays.stream(declaredMethods)
                    .filter(m -> m.isAnnotationPresent(Subscribe.class)
                            && m.getParameterCount() == 1
                            && Modifier.isPublic(m.getModifiers()))
                    .forEach(methods::add);
            temp = temp.getSuperclass();
        }
        return methods;
    }

}

5 Event 广播 Dispatcher

package concurrent.eventbus;

import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**
 * @className: Dispatcher
 * @description: Broadcasts an event to the enabled subscribers of a topic, invoking each
 *               callback method reflectively through the configured Executor
 * @date: 2022/5/13
 * @author: cakin
 */
public class Dispatcher {
    // Executor every subscriber callback is submitted to
    private final Executor executorService;
    // Receives dispatch and callback failures; may be null, in which case failures are dropped
    private final EventExceptionHandler exceptionHandler;
    // Runs each callback inline on the calling thread
    public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE;
    // Starts a new thread for each callback
    public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;

    /**
     * Creates a dispatcher.
     *
     * @param executorService  executor the callbacks are submitted to
     * @param exceptionHandler handler notified about failures; may be null
     */
    public Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) {
        this.executorService = executorService;
        this.exceptionHandler = exceptionHandler;
    }

    /**
     * Delivers an event to every enabled subscriber of the topic whose callback parameter type
     * is assignable from the event's runtime class.
     * When nothing is bound to the topic, the exception handler (if any) is notified with an
     * IllegalArgumentException and nothing is delivered.
     *
     * @param bus      the bus the event was posted on; its name is used in the event context
     * @param registry the registry to look the topic up in
     * @param event    the event to deliver
     * @param topic    the topic the event was posted to
     */
    public void dispatch(Bus bus, Registry registry, Object event, String topic) {
        // Look up all subscribers bound to the topic
        ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic);
        if (null == subscribers) {
            if (exceptionHandler != null) {
                exceptionHandler.handler(new IllegalArgumentException("The topic " + topic + " not bind yet"), new BaseEventContext(bus.getBusName(), null, event));
            }
            // Return whether or not a handler exists; otherwise the stream below
            // would throw a NullPointerException on the missing queue.
            return;
        }

        // Invoke the callback of every matching subscriber via reflection
        subscribers.stream()
                .filter(subscriber -> !subscriber.isDisable())
                .filter(subscriber -> {
                    Method subscribeMethod = subscriber.getSubscribeMethod();
                    Class<?> parameterType = subscribeMethod.getParameterTypes()[0];
                    return parameterType.isAssignableFrom(event.getClass());
                }).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));
    }

    /**
     * Submits one callback invocation to the executor; an exception thrown by the invocation
     * is forwarded to the exception handler when one is configured.
     */
    private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) {
        Method subscribeMethod = subscriber.getSubscribeMethod();
        Object subscribeObject = subscriber.getSubscribeObject();
        executorService.execute(() -> {
            try {
                subscribeMethod.invoke(subscribeObject, event);
            } catch (Exception e) {
                if (null != exceptionHandler) {
                    exceptionHandler.handler(e, new BaseEventContext(bus.getBusName(), subscriber, event));
                }
            }
        });
    }

    /**
     * Shuts the executor down when it is an ExecutorService; other executors are left untouched.
     */
    public void close() {
        if (executorService instanceof ExecutorService) {
            ((ExecutorService) executorService).shutdown();
        }
    }

    /**
     * Creates a dispatcher that uses the given executor.
     */
    static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) {
        return new Dispatcher(executor, exceptionHandler);
    }

    /**
     * Creates a dispatcher that runs callbacks inline.
     */
    static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) {
        return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);
    }

    /**
     * Creates a dispatcher that starts one thread per callback.
     */
    static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) {
        return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler);
    }

    // Executor that runs every command directly in execute()
    private static class SeqExecutorService implements Executor {
        private final static SeqExecutorService INSTANCE = new SeqExecutorService();

        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Executor that starts a new thread for every command
    private static class PreThreadExecutorService implements Executor {
        private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();

        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    // Default EventContext implementation handed to the exception handler
    private static class BaseEventContext implements EventContext {
        private final String eventBusName;

        // Null when the failure is not tied to a particular subscriber (unbound topic)
        private final Subscriber subscriber;

        private final Object event;

        private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) {
            this.eventBusName = eventBusName;
            this.subscriber = subscriber;
            this.event = event;
        }

        @Override
        public String getSource() {
            return this.eventBusName;
        }

        @Override
        public Object getSubscriber() {
            return subscriber != null ? subscriber.getSubscribeObject() : null;
        }

        @Override
        public Method getSubscribe() {
            return subscriber != null ? subscriber.getSubscribeMethod() : null;
        }

        @Override
        public Object getEvent() {
            return this.event;
        }
    }
}

6 Subscriber 类

package concurrent.eventbus;

import java.lang.reflect.Method;

/**
 * @className: Subscriber
 * @description: Pairs an object instance with one of its methods marked by @Subscribe
 * @date: 2022/5/13
 * @author: cakin
 */
public class Subscriber {
    private final Object subscribeObject;
    private final Method subscribeMethod;
    // volatile: the flag is set when a subscriber is unbound and read while dispatching,
    // which may happen on different threads
    private volatile boolean disable = false;

    /**
     * Creates an enabled subscriber entry.
     *
     * @param subscribeObject the instance that owns the callback
     * @param subscribeMethod the callback method
     */
    public Subscriber(Object subscribeObject, Method subscribeMethod) {
        this.subscribeObject = subscribeObject;
        this.subscribeMethod = subscribeMethod;
    }

    /**
     * @return the instance that owns the callback
     */
    public Object getSubscribeObject() {
        return subscribeObject;
    }

    /**
     * @return the callback method
     */
    public Method getSubscribeMethod() {
        return subscribeMethod;
    }

    /**
     * @return {@code true} when this entry has been disabled
     */
    public boolean isDisable() {
        return disable;
    }

    /**
     * Enables or disables this entry.
     *
     * @param disable {@code true} to disable the entry
     */
    public void setDisable(boolean disable) {
        this.disable = disable;
    }
}

7 EventExceptionHandler 接口

package concurrent.eventbus;

/**
 * @className: EventExceptionHandler
 * @description: Callback notified when delivering an event fails
 * @date: 2022/5/13
 * @author: cakin
 */
@FunctionalInterface
public interface EventExceptionHandler {
    /**
     * Handles a failure.
     *
     * @param cause   the exception that was raised
     * @param context describes the bus, the subscriber and the event involved
     */
    void handler(Throwable cause, EventContext context);
}

8 EventContext

package concurrent.eventbus;

import java.lang.reflect.Method;

/**
 * @className: EventContext
 * @description: Read-only view of the circumstances of one event delivery
 * @date: 2022/5/13
 * @author: cakin
 */
public interface EventContext {
    /**
     * @return the source of the event
     */
    String getSource();

    /**
     * @return the event being delivered
     */
    Object getEvent();

    /**
     * @return the subscriber object involved in the delivery
     */
    Object getSubscriber();

    /**
     * @return the subscriber method involved in the delivery
     */
    Method getSubscribe();
}

9 Subscribe 注解

package concurrent.eventbus;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @className: Subscribe
 * @description: Placed on a method; a topic may be given, otherwise the default topic
 *               (default-topic) applies
 * @date: 2022/5/13
 * @author: cakin
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
    // Topic associated with the annotated method
    String topic() default "default-topic";
}

10 SimpleSubscriber1

package concurrent.eventbus;

/**
 * @className: SimpleSubscriber1
 * @description: Sample subscriber with one callback on the default topic and one on topic "test"
 * @date: 2022/5/13
 * @author: cakin
 */
public class SimpleSubscriber1 {
    // Callback for the default topic
    @Subscribe
    public void method1(String message) {
        print("==SimpleSubscriber1==method1==", message);
    }

    // Callback for the "test" topic
    @Subscribe(topic = "test")
    public void method2(String message) {
        print("==SimpleSubscriber1==method2==", message);
    }

    // Writes the tag followed by the message to standard output
    private void print(String tag, String message) {
        System.out.println(tag + message);
    }
}

11 SimpleSubscriber2

package concurrent.eventbus;

/**
 * @className: SimpleSubscriber2
 * @description: Second sample subscriber with one callback on the default topic and one on topic "test"
 * @date: 2022/5/13
 * @author: 贝医
 */
public class SimpleSubscriber2 {
    // Callback for the default topic
    @Subscribe
    public void method1(String message) {
        print("==SimpleSubscriber2==method1==", message);
    }

    // Callback for the "test" topic
    @Subscribe(topic = "test")
    public void method2(String message) {
        print("==SimpleSubscriber2==method2==", message);
    }

    // Writes the tag followed by the message to standard output
    private void print(String tag, String message) {
        System.out.println(tag + message);
    }
}

12  同步 Event Bus 测试类

package concurrent.eventbus;

// Demo: registers two subscribers on an EventBus and posts to the default topic and to "test"
public class SyncTest {
    public static void main(String[] args) {
        final Bus syncBus = new EventBus("Test");
        final Object[] listeners = {new SimpleSubscriber1(), new SimpleSubscriber2()};
        for (Object listener : listeners) {
            syncBus.register(listener);
        }

        // First post goes to the default topic
        syncBus.post("Hello");
        System.out.println("---------");
        // Second post targets the "test" topic
        syncBus.post("Hello", "test");
    }
}

13 异步 Event Bus 测试类

package concurrent.eventbus;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Demo: registers two subscribers on an AsyncEventBus backed by a fixed pool of 10 threads
public class ASyncTest {
    public static void main(String[] args) {
        Bus bus = new AsyncEventBus("Test", (ThreadPoolExecutor) Executors.newFixedThreadPool(10));
        try {
            bus.register(new SimpleSubscriber1());
            bus.register(new SimpleSubscriber2());
            bus.post("Hello");
            System.out.println("---------");
            bus.post("Hello", "test");
        } finally {
            // Without this the pool's worker threads keep the JVM alive after main returns.
            // Closing the bus shuts the pool down; callbacks already submitted still run.
            bus.close();
        }
    }
}

三 测试结果

1 同步 Event Bus 测试结果

==SimpleSubscriber1==method1==Hello

==SimpleSubscriber2==method1==Hello

---------
==SimpleSubscriber1==method2==Hello

==SimpleSubscriber2==method2==Hello

2 异步  Event Bus 测试结果


==SimpleSubscriber2==method1==Hello

==SimpleSubscriber1==method1==Hello

==SimpleSubscriber1==method2==Hello

==SimpleSubscriber2==method2==Hello

相关文章