"""This module contains a model element Diagram.
Diagrams can be visualized and edited.
"""
from __future__ import annotations
import logging
from collections.abc import Callable, Collection, Iterable, Iterator, Sequence
from dataclasses import dataclass
from functools import lru_cache
from typing import (
Protocol,
TypeVar,
overload,
runtime_checkable,
)
import gaphas
from cairo import Context as CairoContext
from gaphor.core.modeling.base import Base, Id, RepositoryProtocol, generate_id
from gaphor.core.modeling.collection import collection
from gaphor.core.modeling.event import (
AssociationAdded,
AssociationDeleted,
DiagramUpdateRequested,
)
from gaphor.core.modeling.presentation import Presentation
from gaphor.core.modeling.properties import (
association,
attribute,
relation_many,
)
from gaphor.core.modeling.stylesheet import StyleSheet
from gaphor.core.styling import Style, StyleNode
from gaphor.i18n import translation
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class UpdateContext:
"""Context used when updating items (Presentation's).
Style contains the base style, no style alterations due to view
state (focus, hover, etc.).
"""
style: Style
@dataclass(frozen=True)
class DrawContext:
"""Special context for draw()'ing the item.
The draw-context contains stuff like the cairo context and flags
like selected and focused.
"""
cairo: CairoContext
style: Style
selected: bool
focused: bool
hovered: bool
dropzone: bool
@lru_cache
def attrname(obj, lower_name):
"""Look up a real attribute name based on a lower case (normalized)
name."""
return next((name for name in dir(obj) if name.lower() == lower_name), lower_name)
NO_ATTR = object()
def rgetattr(obj, names):
"""Recursively get a name, based on a list of names."""
name, *tail = names
v = getattr(obj, attrname(obj, name), NO_ATTR)
if isinstance(v, collection | list | tuple):
if tail and not v:
yield NO_ATTR
if tail:
for m in v:
yield from rgetattr(m, tail)
else:
yield from v
elif tail:
yield from rgetattr(v, tail)
elif v is not None:
yield v
def attrstr(obj):
"""Returns lower-case string representation of an attribute."""
if isinstance(obj, str):
return obj.lower()
elif isinstance(obj, bool | int):
return "true" if obj else ""
elif isinstance(obj, Base):
return obj.__class__.__name__.lower()
log.warning(
f'Can not make a string out of {obj}, returning "". Please raise an issue.'
)
return ""
def lookup_attribute(element: Base, name: str) -> str | None:
"""Look up an attribute from an element.
Attributes can be nested, e.g. ``owner.name``.
Returns ``""`` if the value is empty,
``None`` if the attribute does not exist.
"""
fields = name.split(".")
values = list(rgetattr(element, fields))
attr_values = [v for v in values if v is not NO_ATTR]
if not attr_values and NO_ATTR in values:
return None
return " ".join(map(attrstr, attr_values)).strip()
class StyledDiagram:
def __init__(
self,
diagram: Diagram,
selection: gaphas.selection.Selection | None = None,
):
self.diagram = diagram
self.selection = selection
self.pseudo: str | None = None
def name(self) -> str:
return "diagram"
def parent(self) -> StyleNode | None:
return None
def children(self) -> Iterator[StyleNode]:
return (
StyledItem(item, self.selection)
for item in self.diagram.get_all_items()
if not item.parent
)
def classes(self) -> Sequence[str]:
return []
def attribute(self, name: str) -> str | None:
return lookup_attribute(self.diagram, name)
def state(self) -> Sequence[str]:
return ()
def __hash__(self):
return hash((self.diagram, self.state()))
def __eq__(self, other):
return (
isinstance(other, StyledDiagram)
and self.diagram == other.diagram
and self.state() == other.state()
)
class StyledItem:
"""Wrapper to allow style information to be retrieved.
For convenience, a selection can be added. The selection instance
will provide pseudo-classes for the item (focus, hover, etc.).
"""
def __init__(
self,
item: Presentation,
selection: gaphas.selection.Selection | None = None,
):
assert item.diagram
self.item = item
self.diagram = item.diagram
self.selection = selection
self.pseudo: str | None = None
self._state = (
(
"active" if item in selection.selected_items else "",
"focus" if item is selection.focused_item else "",
"hover" if item is selection.hovered_item else "",
"drop" if item is selection.dropzone_item else "",
"disabled" if item in selection.grayed_out_items else "",
)
if selection
else ()
)
def name(self) -> str:
return css_name(self.item)
def parent(self) -> StyleNode | None:
parent = self.item.parent
# Do not propagate `selection`: item style should not be based on pseudo styles.
return StyledItem(parent) if parent else StyledDiagram(self.diagram)
def children(self) -> Iterator[StyleNode]:
item = self.item
yield from (node.style_node(self) for node in item.css_nodes())
selection = self.selection
yield from (StyledItem(child, selection) for child in item.children)
def classes(self) -> Sequence[str]:
return ["item"]
def attribute(self, name: str) -> str | None:
if item_value := lookup_attribute(self.item, name):
return item_value
if (
self.item.subject
and (subject_value := lookup_attribute(self.item.subject, name)) is not None
):
return subject_value
return item_value
def state(self) -> Sequence[str]:
return self._state
def __hash__(self):
return hash((self.item, self.state()))
def __eq__(self, other):
return (
isinstance(other, StyledItem)
and self.item == other.item
and self.state() == other.state()
)
def css_name(item) -> str:
return type(item).__name__.removesuffix("Item").lower()
P = TypeVar("P", bound=Presentation)
[文档]
class Diagram(Base):
"""Diagrams may contain :obj:`Presentation` elements.
If diagrams need to be owned, the modeling language (e.g. UML)
should subclass ``Diagram`` and add ownership relationships and rules.
"""
name: attribute[str] = attribute("name", str)
diagramType: attribute[str] = attribute("diagramType", str)
def __init__(self, id: Id | None = None, model: RepositoryProtocol | None = None):
"""Initialize the diagram with an optional id and element model."""
super().__init__(id, model)
self._connections = gaphas.connections.Connections()
self._connections.add_handler(self._on_constraint_solved)
self._registered_views: set[gaphas.model.View] = set()
self._dirty_items: set[gaphas.Item] = set()
self._watcher = self.watcher()
self._watcher.watch("ownedPresentation", self._owned_presentation_changed)
self._watcher.watch("ownedPresentation.parent", self._order_owned_presentation)
ownedPresentation: relation_many[Presentation] = association(
"ownedPresentation", Presentation, composite=True, opposite="diagram"
)
def _owned_presentation_changed(self, event):
if isinstance(event, AssociationDeleted) and event.old_value:
self._update_dirty_items(removed_items={event.old_value})
elif isinstance(event, AssociationAdded):
self._order_owned_presentation()
def _order_owned_presentation(self, event=None):
if event and event.property is not Presentation.parent:
return
owned_presentation = self.ownedPresentation
def traverse_items(parent=None) -> Iterable[Presentation]:
for item in owned_presentation:
if item.parent is parent:
yield item
yield from traverse_items(item)
new_order = sorted(
traverse_items(), key=lambda e: int(isinstance(e, gaphas.Line))
)
self.ownedPresentation.order(new_order.index)
def gettext(self, message: str) -> str:
"""Translate a message to the language used in the model."""
style_sheet = self.model.style_sheet
if style_sheet and style_sheet.naturalLanguage:
return translation(style_sheet.naturalLanguage).gettext(message)
return message
def postload(self):
"""Handle post-load functionality for the diagram."""
self._order_owned_presentation()
super().postload()
[文档]
def create(
self,
type_: type[P],
parent: Presentation | None = None,
subject: Base | None = None,
) -> P:
"""Create a new diagram item on the diagram.
It is created with a unique ID, and it is attached to the
diagram's root item. The type parameter is the element class to
create. The new element also has an optional parent and
subject.
"""
return self.create_as(type_, generate_id(), parent, subject)
def create_as(
self,
type_: type[P],
id: Id,
parent: Presentation | None = None,
subject: Base | None = None,
) -> P:
assert isinstance(self.model, PresentationRepositoryProtocol)
item = self.model.create_as(type_, id, diagram=self)
if not isinstance(item, gaphas.Item):
raise TypeError(f"Type {type_} does not comply with Item protocol")
if subject:
item.subject = subject
if parent:
item.parent = parent
self.update({item})
return item
[文档]
def lookup(self, id: Id) -> Presentation | None:
"""Find a presentation item by id.
Returns a presentation in this diagram or return ``None``.
"""
return next((item for item in self.get_all_items() if item.id == id), None)
def unlink(self) -> None:
"""Unlink all canvas items then unlink this diagram."""
for item in self.ownedPresentation:
self.connections.remove_connections_to_item(item)
self._watcher.unsubscribe_all()
super().unlink()
@overload
def select(
self, expression: Callable[[Presentation], bool]
) -> Iterator[Presentation]: ...
@overload
def select(self, expression: type[P]) -> Iterator[P]: ...
@overload
def select(self, expression: None) -> Iterator[Presentation]: ...
[文档]
def select(self, expression=None):
"""Return an iterator of all canvas items that match expression."""
if expression is None:
yield from self.get_all_items()
elif isinstance(expression, type):
yield from (e for e in self.get_all_items() if isinstance(e, expression))
else:
yield from (e for e in self.get_all_items() if expression(e))
[文档]
def update(self, dirty_items: Collection[Presentation] = ()) -> None:
"""Update the diagram.
All items that requested an update via :meth:`request_update`
are now updates. If an item has an ``update(context: UpdateContext)``
method, it's invoked. Constraints are solved.
"""
if not self._model:
return
def dirty_items_with_ancestors():
for item in self._dirty_items:
yield item
yield from gaphas.canvas.ancestors(self, item)
self._update_dirty_items(dirty_items)
style_sheet = self.model.style_sheet or StyleSheet()
style_sheet.clear_caches()
for item in reversed(list(self.sort(dirty_items_with_ancestors()))):
if update := getattr(item, "update", None):
update(UpdateContext(style=style_sheet.compute_style(StyledItem(item))))
self._connections.solve()
self._dirty_items.clear()
# gaphas.model.Model protocol:
@property
def connections(self) -> gaphas.connections.Connections:
return self._connections
def get_all_items(self) -> Iterable[Presentation]:
"""Get all items owned by this diagram, ordered depth-first."""
yield from self.ownedPresentation
def get_parent(self, item: Presentation) -> Presentation | None:
return item.parent
def get_children(self, item: Presentation) -> Iterable[Presentation]:
return iter(item.children)
def sort(self, items: Sequence[Presentation]) -> Iterable[Presentation]:
items_set = set(items)
return (n for n in self.get_all_items() if n in items_set)
[文档]
def request_update(self, item: gaphas.item.Item) -> None:
"""Schedule an item for updating.
No update is done at this point, it's only added to the set of
to-be updated items.
This method is part of the :obj:`gaphas.model.Model` protocol.
"""
if item in self.ownedPresentation:
self._update_dirty_items(dirty_items={item})
def update_now(self, _dirty_items: Collection[Presentation]) -> None:
# We skip forced updates, since they happen outside of transactions
pass
def register_view(self, view: gaphas.model.View[Presentation]) -> None:
self._registered_views.add(view)
def unregister_view(self, view: gaphas.model.View[Presentation]) -> None:
self._registered_views.discard(view)
def _update_dirty_items(self, dirty_items=(), removed_items=()):
"""Send an update notification to all registered views."""
should_emit = not bool(self._dirty_items)
if dirty_items:
self._dirty_items.update(dirty_items)
if removed_items:
self._dirty_items.difference_update(removed_items)
if should_emit:
self.handle(DiagramUpdateRequested(self))
# We can directly request updates on the view, since those happen asynchronously
for v in self._registered_views:
v.request_update(dirty_items, removed_items)
def _on_constraint_solved(self, cinfo: gaphas.connections.Connection) -> None:
dirty_items = set()
if cinfo.item:
dirty_items.add(cinfo.item)
if cinfo.connected:
dirty_items.add(cinfo.connected)
if dirty_items:
self._update_dirty_items(dirty_items)
@runtime_checkable
class PresentationRepositoryProtocol(Protocol):
def create_as(self, type: type[P], id: Id, diagram: Diagram) -> P: ...