# Source code for gaphor.core.modeling.diagram

"""This module contains a model element Diagram.

Diagrams can be visualized and edited.
"""

from __future__ import annotations

import logging
from collections.abc import Callable, Collection, Iterable, Iterator, Sequence
from dataclasses import dataclass
from functools import lru_cache
from typing import (
    Protocol,
    TypeVar,
    overload,
    runtime_checkable,
)

import gaphas
from cairo import Context as CairoContext

from gaphor.core.modeling.base import Base, Id, RepositoryProtocol, generate_id
from gaphor.core.modeling.collection import collection
from gaphor.core.modeling.event import (
    AssociationAdded,
    AssociationDeleted,
    DiagramUpdateRequested,
)
from gaphor.core.modeling.presentation import Presentation
from gaphor.core.modeling.properties import (
    association,
    attribute,
    relation_many,
)
from gaphor.core.modeling.stylesheet import StyleSheet
from gaphor.core.styling import Style, StyleNode
from gaphor.i18n import translation

log = logging.getLogger(__name__)


@dataclass(frozen=True)
class UpdateContext:
    """Context used when updating items (Presentation's).

    Style contains the base style, no style alterations due to view
    state (focus, hover, etc.).

    Instances are immutable (the dataclass is frozen).
    """

    # Style handed to an item's ``update()`` method.
    style: Style


@dataclass(frozen=True)
class DrawContext:
    """Special context for draw()'ing the item.

    The draw-context contains stuff like the cairo context and flags
    like selected and focused.

    Instances are immutable (the dataclass is frozen).
    """

    # Cairo context used for drawing.
    cairo: CairoContext
    # Style to draw the item with.
    style: Style
    # View-state flags for the item being drawn.
    # NOTE(review): presumably these mirror the selection state of the view
    # (selected/focused/hovered/drop target) -- confirm against the caller.
    selected: bool
    focused: bool
    hovered: bool
    dropzone: bool


@lru_cache
def attrname(obj, lower_name):
    """Resolve a lower-case (normalized) name to a real attribute name.

    The names reported by ``dir(obj)`` are compared case-insensitively
    against ``lower_name``; the first match is returned. When nothing
    matches, ``lower_name`` itself is returned unchanged.

    Results are memoized, so ``obj`` must be hashable.
    """
    for candidate in dir(obj):
        if candidate.lower() == lower_name:
            return candidate
    return lower_name


# Sentinel used as the ``getattr`` default for attributes that do not exist.
# It is a unique object, so it can be told apart from ``None`` values.
NO_ATTR = object()


def rgetattr(obj, names):
    """Recursively resolve a chain of attribute names, yielding the values.

    ``names`` is a non-empty sequence of (lower-case) attribute names. The
    first name is looked up on ``obj``; remaining names are resolved on the
    result. Collections, lists and tuples are fanned out: every member is
    traversed (or yielded, for the last name in the chain).

    ``NO_ATTR`` is yielded when an attribute is missing, and also when an
    empty collection is found before the end of the chain. A final value of
    ``None`` yields nothing.
    """
    head, *rest = names
    value = getattr(obj, attrname(obj, head), NO_ATTR)

    if not isinstance(value, (collection, list, tuple)):
        # Scalar value: descend further, or emit it when it is the leaf.
        if rest:
            yield from rgetattr(value, rest)
        elif value is not None:
            yield value
        return

    if not rest:
        # End of the chain: the members themselves are the result.
        yield from value
        return

    if not value:
        # Nothing to descend into: the remainder can not be resolved.
        yield NO_ATTR
    for member in value:
        yield from rgetattr(member, rest)


def attrstr(obj):
    """Return the lower-case string representation of an attribute value.

    * strings are lower-cased;
    * booleans and integers become ``"true"`` when truthy, ``""`` otherwise;
    * model elements (``Base``) are represented by their lower-cased
      class name.

    Any other value is logged as a warning and rendered as ``""``.
    """
    if isinstance(obj, str):
        return obj.lower()

    if isinstance(obj, (bool, int)):
        if obj:
            return "true"
        return ""

    if isinstance(obj, Base):
        return obj.__class__.__name__.lower()

    log.warning(
        f'Can not make a string out of {obj}, returning "". Please raise an issue.'
    )
    return ""


def lookup_attribute(element: Base, name: str) -> str | None:
    """Look up an attribute from an element.

    Attributes can be nested, e.g. ``owner.name``. When the lookup
    produces several values, their string forms are joined by spaces.

    Returns ``""`` if the value is empty,
    ``None`` if the attribute does not exist.
    """
    values = list(rgetattr(element, name.split(".")))
    attr_values = [value for value in values if value is not NO_ATTR]

    # Only report "does not exist" when nothing but the sentinel was found.
    if attr_values or NO_ATTR not in values:
        return " ".join(attrstr(value) for value in attr_values).strip()
    return None


class StyledDiagram:
    """Style node for a diagram.

    It has no parent node; its children are the diagram's items that
    have no parent item. Two instances are equal when they wrap equal
    diagrams and report the same state.
    """

    def __init__(
        self,
        diagram: Diagram,
        selection: gaphas.selection.Selection | None = None,
    ):
        self.diagram = diagram
        self.selection = selection
        self.pseudo: str | None = None

    def name(self) -> str:
        """Return the element name used for styling."""
        return "diagram"

    def parent(self) -> StyleNode | None:
        """Return the parent node: a diagram has none."""
        return None

    def children(self) -> Iterator[StyleNode]:
        """Return style nodes for all items that have no parent item."""
        roots = (item for item in self.diagram.get_all_items() if not item.parent)
        return (StyledItem(root, self.selection) for root in roots)

    def classes(self) -> Sequence[str]:
        """Return the style classes: a diagram has none."""
        return []

    def attribute(self, name: str) -> str | None:
        """Look up a (possibly nested) attribute on the diagram."""
        return lookup_attribute(self.diagram, name)

    def state(self) -> Sequence[str]:
        """Return the pseudo-class state: always empty for a diagram."""
        return ()

    def __hash__(self):
        key = (self.diagram, self.state())
        return hash(key)

    def __eq__(self, other):
        if not isinstance(other, StyledDiagram):
            return False
        return self.diagram == other.diagram and self.state() == other.state()


class StyledItem:
    """Style node wrapping a presentation item.

    Optionally a selection can be provided. If present, it is used to
    derive the pseudo-class state of the item (active, focus, hover,
    drop, disabled). The state is determined once, at construction time.
    """

    def __init__(
        self,
        item: Presentation,
        selection: gaphas.selection.Selection | None = None,
    ):
        assert item.diagram
        self.item = item
        self.diagram = item.diagram
        self.selection = selection
        self.pseudo: str | None = None

        if selection:
            # Every flag keeps its own slot; flags that do not apply are "".
            flags = (
                ("active", item in selection.selected_items),
                ("focus", item is selection.focused_item),
                ("hover", item is selection.hovered_item),
                ("drop", item is selection.dropzone_item),
                ("disabled", item in selection.grayed_out_items),
            )
            self._state = tuple(label if is_set else "" for label, is_set in flags)
        else:
            self._state = ()

    def name(self) -> str:
        """Return the element name used for styling."""
        return css_name(self.item)

    def parent(self) -> StyleNode | None:
        """Return the node of the parent item, or of the diagram."""
        parent_item = self.item.parent
        # Do not propagate `selection`: item style should not be based on pseudo styles.
        if parent_item:
            return StyledItem(parent_item)
        return StyledDiagram(self.diagram)

    def children(self) -> Iterator[StyleNode]:
        """Yield the item's CSS nodes first, then its child items."""
        item = self.item
        for node in item.css_nodes():
            yield node.style_node(self)

        selection = self.selection
        for child in item.children:
            yield StyledItem(child, selection)

    def classes(self) -> Sequence[str]:
        """Return the style classes of this node."""
        return ["item"]

    def attribute(self, name: str) -> str | None:
        """Look up an attribute on the item, falling back to its subject.

        A non-empty value found on the item wins. Otherwise the subject
        (if any) is consulted, and its value is used unless the attribute
        does not exist there. As a last resort the item's own result
        (``""`` or ``None``) is returned.
        """
        item_value = lookup_attribute(self.item, name)
        if item_value:
            return item_value

        if self.item.subject:
            subject_value = lookup_attribute(self.item.subject, name)
            if subject_value is not None:
                return subject_value

        return item_value

    def state(self) -> Sequence[str]:
        """Return the pseudo-class state computed at construction."""
        return self._state

    def __hash__(self):
        key = (self.item, self.state())
        return hash(key)

    def __eq__(self, other):
        if not isinstance(other, StyledItem):
            return False
        return self.item == other.item and self.state() == other.state()


def css_name(item) -> str:
    """Return the CSS element name for ``item``.

    The name is the item's class name, stripped of a trailing ``Item``
    and lower-cased (e.g. a ``ClassItem`` instance maps to ``class``).
    """
    class_name = type(item).__name__
    return class_name.removesuffix("Item").lower()


# Type variable bound to Presentation: lets signatures such as
# ``create(type_: type[P]) -> P`` return the concrete item type passed in.
P = TypeVar("P", bound=Presentation)


class Diagram(Base):
    """Diagrams may contain :obj:`Presentation` elements.

    If diagrams need to be owned, the modeling language (e.g. UML)
    should subclass ``Diagram`` and add ownership relationships and rules.
    """

    name: attribute[str] = attribute("name", str)
    diagramType: attribute[str] = attribute("diagramType", str)

    def __init__(self, id: Id | None = None, model: RepositoryProtocol | None = None):
        """Initialize the diagram with an optional id and element model."""
        super().__init__(id, model)
        self._connections = gaphas.connections.Connections()
        self._connections.add_handler(self._on_constraint_solved)

        self._registered_views: set[gaphas.model.View] = set()
        self._dirty_items: set[gaphas.Item] = set()

        # Keep the item order and the dirty set in sync with ownership changes.
        self._watcher = self.watcher()
        self._watcher.watch("ownedPresentation", self._owned_presentation_changed)
        self._watcher.watch("ownedPresentation.parent", self._order_owned_presentation)

    ownedPresentation: relation_many[Presentation] = association(
        "ownedPresentation", Presentation, composite=True, opposite="diagram"
    )

    def _owned_presentation_changed(self, event):
        """Handle items being added to or removed from the diagram.

        A removed item is dropped from the dirty set; an added item
        triggers a re-ordering of the owned presentations.
        """
        if isinstance(event, AssociationDeleted) and event.old_value:
            self._update_dirty_items(removed_items={event.old_value})
        elif isinstance(event, AssociationAdded):
            self._order_owned_presentation()

    def _order_owned_presentation(self, event=None):
        """Order ``ownedPresentation`` depth-first, with lines last.

        Items are ordered parent-first (each parent directly followed by
        its descendants). ``gaphas.Line`` instances are then moved after
        all other items; the sort is stable, so the depth-first order is
        retained within both groups.

        Events for properties other than ``Presentation.parent`` are ignored.
        """
        if event and event.property is not Presentation.parent:
            return

        owned_presentation = self.ownedPresentation

        def traverse_items(parent=None) -> Iterable[Presentation]:
            for item in owned_presentation:
                if item.parent is parent:
                    yield item
                    yield from traverse_items(item)

        new_order = sorted(
            traverse_items(), key=lambda e: int(isinstance(e, gaphas.Line))
        )
        self.ownedPresentation.order(new_order.index)

    def gettext(self, message: str) -> str:
        """Translate a message to the language used in the model.

        The message is returned as is when the model has no style sheet
        or the style sheet defines no natural language.
        """
        style_sheet = self.model.style_sheet
        if style_sheet and style_sheet.naturalLanguage:
            return translation(style_sheet.naturalLanguage).gettext(message)
        return message

    def postload(self):
        """Handle post-load functionality for the diagram."""
        self._order_owned_presentation()
        super().postload()

    def create(
        self,
        type_: type[P],
        parent: Presentation | None = None,
        subject: Base | None = None,
    ) -> P:
        """Create a new diagram item on the diagram.

        It is created with a unique ID, and it is attached to the
        diagram's root item. The type parameter is the element class to
        create. The new element also has an optional parent and subject.
        """
        return self.create_as(type_, generate_id(), parent, subject)

    def create_as(
        self,
        type_: type[P],
        id: Id,
        parent: Presentation | None = None,
        subject: Base | None = None,
    ) -> P:
        """Create a new diagram item with a given id.

        The item is created through the model, optionally gets a subject
        and a parent assigned, and the diagram is updated for it.

        Raises ``TypeError`` if the created item is not a ``gaphas.Item``.
        The model is asserted to implement ``PresentationRepositoryProtocol``.
        """
        assert isinstance(self.model, PresentationRepositoryProtocol)
        item = self.model.create_as(type_, id, diagram=self)
        if not isinstance(item, gaphas.Item):
            raise TypeError(f"Type {type_} does not comply with Item protocol")
        if subject:
            item.subject = subject
        if parent:
            item.parent = parent
        self.update({item})
        return item

    def lookup(self, id: Id) -> Presentation | None:
        """Find a presentation item by id.

        Returns a presentation in this diagram or return ``None``.
        """
        return next((item for item in self.get_all_items() if item.id == id), None)

    def unlink(self) -> None:
        """Unlink all canvas items then unlink this diagram."""
        for item in self.ownedPresentation:
            self.connections.remove_connections_to_item(item)
        self._watcher.unsubscribe_all()
        super().unlink()

    @overload
    def select(
        self, expression: Callable[[Presentation], bool]
    ) -> Iterator[Presentation]: ...

    @overload
    def select(self, expression: type[P]) -> Iterator[P]: ...

    # The default makes a bare ``select()`` call match this overload.
    @overload
    def select(self, expression: None = None) -> Iterator[Presentation]: ...

    def select(self, expression=None):
        """Return an iterator of all canvas items that match expression.

        ``expression`` is either ``None`` (all items), a type (items that
        are instances of it) or a predicate called with each item.
        """
        if expression is None:
            yield from self.get_all_items()
        elif isinstance(expression, type):
            yield from (e for e in self.get_all_items() if isinstance(e, expression))
        else:
            yield from (e for e in self.get_all_items() if expression(e))

    def update(self, dirty_items: Collection[Presentation] = ()) -> None:
        """Update the diagram.

        All items that requested an update via :meth:`request_update`
        are now updated. If an item has an ``update(context: UpdateContext)``
        method, it's invoked. Constraints are solved.

        Nothing happens if the diagram is not part of a model.
        """
        if not self._model:
            return

        def dirty_items_with_ancestors():
            for item in self._dirty_items:
                yield item
                yield from gaphas.canvas.ancestors(self, item)

        self._update_dirty_items(dirty_items)
        style_sheet = self.model.style_sheet or StyleSheet()
        style_sheet.clear_caches()

        # Items are visited in reverse diagram order.
        for item in reversed(list(self.sort(dirty_items_with_ancestors()))):
            if update := getattr(item, "update", None):
                update(UpdateContext(style=style_sheet.compute_style(StyledItem(item))))

        self._connections.solve()
        self._dirty_items.clear()

    # gaphas.model.Model protocol:

    @property
    def connections(self) -> gaphas.connections.Connections:
        """The connections (constraints) between items of this diagram."""
        return self._connections

    def get_all_items(self) -> Iterable[Presentation]:
        """Get all items owned by this diagram, ordered depth-first."""
        yield from self.ownedPresentation

    def get_parent(self, item: Presentation) -> Presentation | None:
        """Return the parent of ``item``, if any."""
        return item.parent

    def get_children(self, item: Presentation) -> Iterable[Presentation]:
        """Return an iterator over the children of ``item``."""
        return iter(item.children)

    def sort(self, items: Sequence[Presentation]) -> Iterable[Presentation]:
        """Return the given items in the order of :meth:`get_all_items`.

        Items not owned by this diagram are left out; duplicates collapse.
        """
        items_set = set(items)
        return (n for n in self.get_all_items() if n in items_set)

    def request_update(self, item: gaphas.item.Item) -> None:
        """Schedule an item for updating.

        No update is done at this point, it's only added to the set of
        to-be updated items.

        This method is part of the :obj:`gaphas.model.Model` protocol.
        """
        if item in self.ownedPresentation:
            self._update_dirty_items(dirty_items={item})

    def update_now(self, _dirty_items: Collection[Presentation]) -> None:
        """Do nothing: forced updates are not supported."""
        # We skip forced updates, since they happen outside of transactions
        pass

    def register_view(self, view: gaphas.model.View[Presentation]) -> None:
        """Register a view that should be notified of update requests."""
        self._registered_views.add(view)

    def unregister_view(self, view: gaphas.model.View[Presentation]) -> None:
        """Stop notifying a view; unknown views are ignored."""
        self._registered_views.discard(view)

    def _update_dirty_items(self, dirty_items=(), removed_items=()):
        """Send an update notification to all registered views.

        The dirty set is adjusted first. A ``DiagramUpdateRequested``
        event is emitted only if the dirty set was empty on entry.
        """
        should_emit = not self._dirty_items
        if dirty_items:
            self._dirty_items.update(dirty_items)
        if removed_items:
            self._dirty_items.difference_update(removed_items)
        if should_emit:
            self.handle(DiagramUpdateRequested(self))

        # NOTE(review): the original indentation of this loop was lost; it is
        # assumed to run unconditionally, not only when the event is emitted.
        # We can directly request updates on the view, since those happen asynchronously
        for v in self._registered_views:
            v.request_update(dirty_items, removed_items)

    def _on_constraint_solved(self, cinfo: gaphas.connections.Connection) -> None:
        """Mark both ends of a solved connection as dirty."""
        dirty_items = set()
        if cinfo.item:
            dirty_items.add(cinfo.item)
        if cinfo.connected:
            dirty_items.add(cinfo.connected)
        if dirty_items:
            self._update_dirty_items(dirty_items)
@runtime_checkable
class PresentationRepositoryProtocol(Protocol):
    """Structural type for a repository that can create presentation items.

    ``Diagram.create_as`` asserts that its model satisfies this protocol
    before asking it to create an item. The protocol is runtime-checkable,
    so ``isinstance`` only verifies that a ``create_as`` member is present.
    """

    def create_as(self, type: type[P], id: Id, diagram: Diagram) -> P: ...