# Source code for gaphor.core.modeling.diagram

"""This module contains a model element Diagram.

Diagrams can be visualized and edited.
"""
from __future__ import annotations

import logging
from collections.abc import Callable, Collection, Iterable, Iterator, Sequence
from dataclasses import dataclass
from functools import lru_cache
from typing import (
    Protocol,
    TypeVar,
    overload,
    runtime_checkable,
)

import gaphas
from cairo import Context as CairoContext

from gaphor.core.modeling.collection import collection
from gaphor.core.modeling.element import (
    Element,
    Id,
    RepositoryProtocol,
    generate_id,
    self_and_owners,
)
from gaphor.core.modeling.event import (
    AssociationAdded,
    AssociationDeleted,
    DiagramUpdateRequested,
)
from gaphor.core.modeling.presentation import Presentation
from gaphor.core.modeling.properties import (
    association,
    attribute,
    relation_many,
    relation_one,
)
from gaphor.core.modeling.stylesheet import StyleSheet
from gaphor.core.styling import CompiledStyleSheet, Style, StyleNode
from gaphor.i18n import translation

log = logging.getLogger(__name__)

# Style returned by ``Diagram.style()`` when no compiled style sheet is
# available for the model.
#
# Not all styles are required: "background-color", "font-weight",
# "text-color", and "text-decoration" are optional (can default to None)
FALLBACK_STYLE: Style = {
    "color": (0, 0, 0, 1),
    "font-family": "sans",
    "font-size": 14,
}


@dataclass(frozen=True)
class UpdateContext:
    """Context used when updating items (Presentation's).

    An instance is passed to an item's ``update()`` method by
    ``Diagram.update()``.

    Style contains the base style, no style alterations due to view
    state (focus, hover, etc.).
    """

    # Style computed for the item without a selection, i.e. without any
    # state-dependent pseudo-classes applied.
    style: Style


@dataclass(frozen=True)
class DrawContext:
    """Special context for draw()'ing the item.

    The draw-context contains stuff like the cairo context and flags
    like selected and focused. Instances are immutable (frozen dataclass).
    """

    # Cairo context to draw on.
    cairo: CairoContext
    # Style to draw the item with.
    # NOTE(review): presumably includes view-state alterations, unlike
    # UpdateContext.style -- confirm against the code that builds DrawContext.
    style: Style
    # View-state flags for the item being drawn.
    # NOTE(review): these look like the counterparts of the "active", "focus",
    # "hover" and "drop" states computed by StyledItem -- confirm with caller.
    selected: bool
    focused: bool
    hovered: bool
    dropzone: bool


@lru_cache()
def attrname(obj, lower_name):
    """Resolve a lower-case (normalized) name to the real attribute name.

    The first entry of ``dir(obj)`` whose lower-case form equals
    ``lower_name`` is returned. If nothing matches, ``lower_name`` itself
    is returned. Results are memoized by ``lru_cache``, so ``obj`` has to
    be hashable.
    """
    for candidate in dir(obj):
        if candidate.lower() == lower_name:
            return candidate
    return lower_name


# Sentinel yielded by rgetattr() for an attribute that could not be found.
NO_ATTR = object()


def rgetattr(obj, names):
    """Yield the value(s) found by following a chain of attribute names.

    ``names`` is a non-empty list of (lower-case) attribute names. The
    first name is resolved on ``obj`` via ``attrname``; the remaining names
    are resolved recursively on the result. Collections (``collection``,
    ``list``, ``tuple``) are fanned out: every member is traversed.

    ``NO_ATTR`` is yielded when an attribute is missing, and also when an
    empty collection is encountered while names remain to be resolved.
    A final value of ``None`` yields nothing.
    """
    head, *rest = names
    value = getattr(obj, attrname(obj, head), NO_ATTR)

    if not isinstance(value, (collection, list, tuple)):
        if rest:
            yield from rgetattr(value, rest)
        elif value is not None:
            yield value
        return

    if not rest:
        # End of the chain: the members themselves are the values.
        yield from value
        return

    if not value:
        # Nothing to descend into, so the remaining names can not be resolved.
        yield NO_ATTR
    for member in value:
        yield from rgetattr(member, rest)


def attrstr(obj):
    """Return the lower-case string representation of an attribute value.

    * ``str``: the lower-cased string.
    * ``bool`` / ``int``: ``"true"`` when truthy, ``""`` otherwise.
    * ``Element``: the lower-cased class name of the element.

    Any other value is logged as a warning and results in ``""``.
    """
    if isinstance(obj, str):
        return obj.lower()
    if isinstance(obj, (bool, int)):
        return "true" if obj else ""
    if isinstance(obj, Element):
        return obj.__class__.__name__.lower()
    # Logger.warn() is a deprecated alias that emits a DeprecationWarning;
    # Logger.warning() is the supported spelling.
    log.warning(
        f'Can not make a string out of {obj}, returning "". Please raise an issue.'
    )
    return ""


def lookup_attribute(element: Element, name: str) -> str | None:
    """Look up a (possibly nested) attribute value of an element.

    ``name`` may be a dotted path, e.g. ``owner.name``. All values found
    along the path are converted with ``attrstr`` and joined by spaces.

    Returns ``""`` if the value is empty,
    ``None`` if the attribute does not exist.
    """
    found = []
    missing = False
    for value in rgetattr(element, name.split(".")):
        if value is NO_ATTR:
            missing = True
        else:
            found.append(value)

    # Only report "does not exist" when no real value was found at all.
    if missing and not found:
        return None
    return " ".join(attrstr(value) for value in found).strip()


def qualifiedName(element: Element) -> list[str]:
    """Return the qualified name of the element as a list of names.

    The names of the element and its owners are collected in the order
    produced by ``self_and_owners`` and returned reversed. An object
    without a ``name`` attribute contributes ``"??"``.
    """
    names = [getattr(owner, "name", "??") for owner in self_and_owners(element)]
    return names[::-1]


class StyledDiagram:
    """Style node wrapper for a diagram.

    It is the root of the style node tree: it has no parent and its
    children are the diagram's items that have no parent themselves.
    """

    def __init__(
        self,
        diagram: Diagram,
        selection: gaphas.selection.Selection | None = None,
        dark_mode: bool | None = None,
    ):
        self.diagram = diagram
        self.selection = selection
        self.pseudo: str | None = None
        self.dark_mode = dark_mode

    def name(self) -> str:
        """Node name of a diagram: always ``"diagram"``."""
        return "diagram"

    def parent(self) -> StyleNode | None:
        """A diagram is the root node, so there is no parent."""
        return None

    def children(self) -> Iterator[StyleNode]:
        """Iterate over style nodes for all parent-less items of the diagram."""
        top_level = (
            candidate
            for candidate in self.diagram.get_all_items()
            if not candidate.parent
        )
        return (
            StyledItem(candidate, self.selection, dark_mode=self.dark_mode)
            for candidate in top_level
        )

    def attribute(self, name: str) -> str | None:
        """Look up attribute ``name`` on the diagram."""
        return lookup_attribute(self.diagram, name)

    def state(self) -> Sequence[str]:
        """A diagram has no state: always an empty tuple."""
        return ()

    def __hash__(self):
        key = (self.diagram, self.state(), self.dark_mode)
        return hash(key)

    def __eq__(self, other):
        if not isinstance(other, StyledDiagram):
            return False
        return (
            self.diagram == other.diagram
            and self.state() == other.state()
            and self.dark_mode == other.dark_mode
        )


class StyledItem:
    """Style node wrapper around a presentation item.

    An optional selection can be supplied. When present, it determines
    the state (pseudo-classes such as focus and hover) reported for the
    item; without a selection the state is empty.
    """

    def __init__(
        self,
        item: Presentation,
        selection: gaphas.selection.Selection | None = None,
        dark_mode: bool | None = None,
    ):
        assert item.diagram
        self.item = item
        self.diagram = item.diagram
        self.selection = selection
        self.pseudo: str | None = None
        self.dark_mode = dark_mode

        if selection:
            # One slot per state; a slot is "" when the state does not apply.
            flags = (
                ("active", item in selection.selected_items),
                ("focus", item is selection.focused_item),
                ("hover", item is selection.hovered_item),
                ("drop", item is selection.dropzone_item),
                ("disabled", item in selection.grayed_out_items),
            )
            self._state = tuple(label if on else "" for label, on in flags)
        else:
            self._state = ()

    def name(self) -> str:
        """Lower-cased class name of the item, without an ``Item`` suffix."""
        class_name = type(self.item).__name__
        return class_name.removesuffix("Item").lower()

    def parent(self) -> StyleNode | None:
        """Style node of the parent item, or of the diagram for a root item."""
        owner = self.item.parent
        if owner:
            return StyledItem(owner, self.selection, dark_mode=self.dark_mode)
        return StyledDiagram(self.diagram, self.selection, self.dark_mode)

    def children(self) -> Iterator[StyleNode]:
        """Iterate over the item's CSS nodes, followed by its child items."""
        wrapped = self.item
        for css_node in wrapped.css_nodes():
            yield css_node.style_node(self)

        active_selection = self.selection
        for nested in wrapped.children:
            yield StyledItem(nested, active_selection, dark_mode=self.dark_mode)

    def attribute(self, name: str) -> str | None:
        """Look up ``name`` on the item, falling back to the item's subject."""
        value = lookup_attribute(self.item, name)
        if value is not None or not self.item.subject:
            return value
        return lookup_attribute(self.item.subject, name)

    def state(self) -> Sequence[str]:
        """State computed from the selection at construction time."""
        return self._state

    def __hash__(self):
        key = (self.item, self.state(), self.dark_mode)
        return hash(key)

    def __eq__(self, other):
        if not isinstance(other, StyledItem):
            return False
        return (
            self.item == other.item
            and self.state() == other.state()
            and self.dark_mode == other.dark_mode
        )


# Type variable for Presentation subclasses; lets methods such as
# ``Diagram.create()`` and ``Diagram.select()`` return the requested item type.
P = TypeVar("P", bound=Presentation)


class Diagram(Element):
    """Diagrams may contain :obj:`Presentation` elements and can be owned by
    any element."""

    name: attribute[str] = attribute("name", str)
    diagramType: attribute[str] = attribute("diagramType", str)
    element: relation_one[Element]

    def __init__(self, id: Id | None = None, model: RepositoryProtocol | None = None):
        """Initialize the diagram with an optional id and element model."""
        super().__init__(id, model)
        self._connections = gaphas.connections.Connections()
        self._connections.add_handler(self._on_constraint_solved)
        # Compiled lazily by style() and dropped again on every update().
        self._compiled_style_sheet: CompiledStyleSheet | None = None
        self._registered_views: set[gaphas.model.View] = set()
        # Items waiting for the next update().
        self._dirty_items: set[gaphas.Item] = set()

        self._watcher = self.watcher()
        self._watcher.watch("ownedPresentation", self._owned_presentation_changed)
        self._watcher.watch("ownedPresentation.parent", self._order_owned_presentation)

    ownedPresentation: relation_many[Presentation] = association(
        "ownedPresentation", Presentation, composite=True, opposite="diagram"
    )

    @property
    def qualifiedName(self) -> list[str]:
        """Returns the qualified name of the element as a list of names."""
        return qualifiedName(self)

    def _owned_presentation_changed(self, event):
        """React to items being added to or removed from the diagram.

        A removed item is dropped from the dirty items (and reported to the
        views as removed); an added item triggers a reordering of
        ``ownedPresentation``.
        """
        if isinstance(event, AssociationDeleted) and event.old_value:
            self._update_dirty_items(removed_items={event.old_value})
        elif isinstance(event, AssociationAdded):
            self._order_owned_presentation()

    def _order_owned_presentation(self, event=None):
        """Reorder ``ownedPresentation``.

        Items are visited depth-first, starting with the items that have no
        parent. That sequence is then stably sorted so that
        :obj:`gaphas.Line` instances come after all other items.

        When called with an ``event`` that does not concern
        ``Presentation.parent``, nothing is done.
        """
        if event and event.property is not Presentation.parent:
            return

        ownedPresentation = self.ownedPresentation

        def traverse_items(parent=None) -> Iterable[Presentation]:
            for item in ownedPresentation:
                if item.parent is parent:
                    yield item
                    yield from traverse_items(item)

        new_order = sorted(
            traverse_items(), key=lambda e: int(isinstance(e, gaphas.Line))
        )
        self.ownedPresentation.order(new_order.index)

    @property
    def styleSheet(self) -> StyleSheet | None:
        """The first :obj:`StyleSheet` found in the model, or ``None``."""
        return next(self.model.select(StyleSheet), None)

    def style(self, node: StyleNode) -> Style:
        """Compute the style for a style node.

        The compiled style sheet is created on first use and cached. If the
        model has no style sheet, ``FALLBACK_STYLE`` is returned.
        """
        if not (compiled_style_sheet := self._compiled_style_sheet):
            style_sheet = self.styleSheet
            compiled_style_sheet = self._compiled_style_sheet = (
                style_sheet.new_compiled_style_sheet() if style_sheet else None
            )
        return (
            compiled_style_sheet.compute_style(node)
            if compiled_style_sheet
            else FALLBACK_STYLE
        )

    def gettext(self, message: str) -> str:
        """Translate a message to the language used in the model."""
        style_sheet = self.styleSheet
        if style_sheet and style_sheet.naturalLanguage:
            return translation(style_sheet.naturalLanguage).gettext(message)
        return message

    def postload(self):
        """Handle post-load functionality for the diagram."""
        self._order_owned_presentation()
        super().postload()

    def create(
        self,
        type_: type[P],
        parent: Presentation | None = None,
        subject: Element | None = None,
    ) -> P:
        """Create a new diagram item on the diagram.

        It is created with a unique ID, and it is attached to the
        diagram's root item. The type parameter is the element class to
        create. The new element also has an optional parent and
        subject.
        """
        return self.create_as(type_, generate_id(), parent, subject)

    def create_as(
        self,
        type_: type[P],
        id: Id,
        parent: Presentation | None = None,
        subject: Element | None = None,
    ) -> P:
        """Create a new diagram item with a given id.

        The item is created through the model, optionally gets a subject
        and a parent assigned, and the diagram is updated for the new item.

        Raises:
            AssertionError: The model does not provide ``create_as()``
                (see ``PresentationRepositoryProtocol``).
            TypeError: The created item is not a :obj:`gaphas.Item`.
        """
        assert isinstance(self.model, PresentationRepositoryProtocol)
        item = self.model.create_as(type_, id, diagram=self)
        if not isinstance(item, gaphas.Item):
            raise TypeError(f"Type {type_} does not comply with Item protocol")
        if subject:
            item.subject = subject
        if parent:
            item.parent = parent
        self.update({item})
        return item

    def lookup(self, id: Id) -> Presentation | None:
        """Find a presentation item by id.

        Returns a presentation in this diagram or return ``None``.
        """
        return next((item for item in self.get_all_items() if item.id == id), None)

    def unlink(self):
        """Unlink all canvas items then unlink this diagram."""
        for item in self.ownedPresentation:
            self.connections.remove_connections_to_item(item)
        self._watcher.unsubscribe_all()
        super().unlink()

    @overload
    def select(
        self, expression: Callable[[Presentation], bool]
    ) -> Iterator[Presentation]: ...

    @overload
    def select(self, expression: type[P]) -> Iterator[P]: ...

    # The implementation defaults ``expression`` to None, so the overload
    # needs that default too for ``diagram.select()`` to type-check.
    @overload
    def select(self, expression: None = None) -> Iterator[Presentation]: ...

    def select(self, expression=None):
        """Return an iterator of all canvas items that match expression.

        ``expression`` is either ``None`` (all items), a type (items that
        are instances of it) or a predicate called with each item.
        """
        if expression is None:
            yield from self.get_all_items()
        elif isinstance(expression, type):
            yield from (e for e in self.get_all_items() if isinstance(e, expression))
        else:
            yield from (e for e in self.get_all_items() if expression(e))

    def update(self, dirty_items: Collection[Presentation] = ()) -> None:
        """Update the diagram.

        All items that requested an update via :meth:`request_update`
        are now updated. If an item has an ``update(context: UpdateContext)``
        method, it's invoked. Constraints are solved.
        """
        self._update_dirty_items(dirty_items)

        # Clear our (cached) style sheet first
        self._compiled_style_sheet = None

        def dirty_items_with_ancestors():
            for item in self._dirty_items:
                yield item
                yield from gaphas.canvas.ancestors(self, item)

        # Items are updated in reverse diagram order.
        for item in reversed(list(self.sort(dirty_items_with_ancestors()))):
            if update := getattr(item, "update", None):
                update(UpdateContext(style=self.style(StyledItem(item))))

        self._connections.solve()
        self._dirty_items.clear()

    # gaphas.model.Model protocol:

    @property
    def connections(self) -> gaphas.connections.Connections:
        """The connections (constraints) registry of this diagram."""
        return self._connections

    def get_all_items(self) -> Iterable[Presentation]:
        """Get all items owned by this diagram, in ``ownedPresentation`` order."""
        yield from self.ownedPresentation

    def get_parent(self, item: Presentation) -> Presentation | None:
        """Return the parent of an item, if any."""
        return item.parent

    def get_children(self, item: Presentation) -> Iterable[Presentation]:
        """Return an iterator over the children of an item."""
        return iter(item.children)

    def sort(self, items: Iterable[Presentation]) -> Iterable[Presentation]:
        """Return the given items in diagram order.

        Duplicates are dropped, as are items not owned by this diagram.
        """
        items_set = set(items)
        return (n for n in self.get_all_items() if n in items_set)

    def request_update(self, item: gaphas.item.Item) -> None:
        """Schedule an item for updating.

        No update is done at this point, it's only added to the set of
        to-be updated items. Items not owned by this diagram are ignored.

        This method is part of the :obj:`gaphas.model.Model` protocol.
        """
        if item in self.ownedPresentation:
            self._update_dirty_items(dirty_items={item})

    def update_now(self, _dirty_items: Collection[Presentation]) -> None:
        """Do nothing.

        NOTE(review): presumably only present to satisfy the
        :obj:`gaphas.model.Model` protocol -- confirm.
        """
        pass

    def register_view(self, view: gaphas.model.View[Presentation]) -> None:
        """Register a view that should be notified of update requests."""
        self._registered_views.add(view)

    def unregister_view(self, view: gaphas.model.View[Presentation]) -> None:
        """Unregister a view; unknown views are ignored."""
        self._registered_views.discard(view)

    def _update_dirty_items(self, dirty_items=(), removed_items=()):
        """Record dirty/removed items and notify all registered views.

        A ``DiagramUpdateRequested`` event is emitted only if no items were
        dirty before this call.
        """
        should_emit = not self._dirty_items
        if dirty_items:
            self._dirty_items.update(dirty_items)
        if removed_items:
            self._dirty_items.difference_update(removed_items)
        if should_emit:
            self.handle(DiagramUpdateRequested(self))
        # We can directly request updates on the view, since those happen asynchronously
        for v in self._registered_views:
            v.request_update(dirty_items, removed_items)

    def _on_constraint_solved(self, cinfo: gaphas.connections.Connection) -> None:
        """Mark both ends of a solved connection as dirty."""
        dirty_items = set()
        if cinfo.item:
            dirty_items.add(cinfo.item)
        if cinfo.connected:
            dirty_items.add(cinfo.connected)
        if dirty_items:
            self._update_dirty_items(dirty_items)
@runtime_checkable
class PresentationRepositoryProtocol(Protocol):
    """Structural type for a model that can create presentation items.

    ``Diagram.create_as()`` asserts that its model is an instance of this
    protocol before asking it to create an item.
    """

    def create_as(self, type: type[P], id: str, diagram: Diagram) -> P: ...