[docs]defevent_handler(*event_types):"""Mark a function/method as an event handler for a particular type of event. Given a custom event type: >>> class CustomEvent: ... def __str__(self): ... return type(self).__name__ You can apply this to a handler method or function: >>> @event_handler(CustomEvent) ... def custom_handler(event: CustomEvent): ... print(event) This will allow you to let the even be handled by an event manager: >>> event_manager = EventManager() >>> event_manager.subscribe(custom_handler) >>> event_manager.handle(CustomEvent()) CustomEvent """defwrapper(func):func.__event_types__=event_typesreturnfuncreturnwrapper
[docs]classEventManager(Service):"""The Event Manager provides a flexible way to dispatch events. Event dispatching is a central component in Gaphor. It allows components in Gaphor to react to changes in the application. Events are dispatched by type. """def__init__(self)->None:self._events=_Manager()self._priority=_Manager()self._queue:deque[Event]=deque()self._async_tasks:set[asyncio.Task]=set()self._handling=Falsedefshutdown(self)->None:fortaskinset(self._async_tasks):task.cancel()defsubscribe(self,handler:Handler)->None:"""Register a handler. Handlers are triggered (executed) when events are emitted through the :obj:`~gaphor.core.eventmanager.EventManager.handle` method. """ifinspect.iscoroutinefunction(handler):defasync_handler(event:Event)->None:coro=handler(event)task=asyncio.create_task(coro)self._async_tasks.add(task)task.add_done_callback(self._task_done)self._subscribe(self._events,async_handler,getattr(handler,"__event_types__",None))else:self._subscribe(self._events,handler)asyncdefgather_tasks(self):ifself._async_tasks:awaitasyncio.gather(*self._async_tasks)def_task_done(self,task:asyncio.Task)->None:self._async_tasks.remove(task)defpriority_subscribe(self,handler:Handler)->None:"""Register a handler. Priority handlers are executed directly. They should not raise other events, cause that can cause a problem in the exection order. It's basically to make sure that all events are recorded by the undo manager. """self._subscribe(self._priority,handler)def_subscribe(self,manager:_Manager,handler:Handler,event_types=None)->None:event_types=getattr(handler,"__event_types__",event_types)ifnotevent_types:raiseException(f"No event types provided for function {handler}")foretinevent_types:manager.subscribe(handler,et)defunsubscribe(self,handler:Handler)->None:"""Unregister a previously registered handler."""event_types=getattr(handler,"__event_types__",None)ifnotevent_types:raiseException(f"No event types provided for function {handler}")foretinevent_types:self._priority.unsubscribe(handler,et)self._events.unsubscribe(handler,et)defhandle(self,*events:Event)->None:"""Send event notifications to registered handlers."""queue=self._queuequeue.extendleft(events)foreventinevents:self._priority.handle(event)ifnotself._handling:self._handling=Truetry:whilequeue:self._events.handle(queue.pop())finally:self._handling=False