From 34f37ed081dfd017c08c2ba8042f5f4bc188386a Mon Sep 17 00:00:00 2001 From: wanfeng Date: Tue, 18 Nov 2025 17:15:01 +0800 Subject: [PATCH] Encapsulates a linker used to return the edges related to a node --- graffiti/graph/linker.go | 318 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 318 insertions(+) create mode 100644 graffiti/graph/linker.go diff --git a/graffiti/graph/linker.go b/graffiti/graph/linker.go new file mode 100644 index 00000000..b20f8ae2 --- /dev/null +++ b/graffiti/graph/linker.go @@ -0,0 +1,318 @@ +package graph + +import ( + "reflect" + + "github.com/safchain/insanelock" + + "github.com/skydive-project/skydive/graffiti/getter" +) + +// Linker describes an object that returns incoming edges to a node +// and outgoing edges from that node +type Linker interface { + GetABLinks(node *Node) []*Edge + GetBALinks(node *Node) []*Edge +} + +// LinkerEventListener defines the event interface for linker +type LinkerEventListener interface { + OnError(err error) +} + +type listener struct { + DefaultGraphListener + graph *Graph + newLinksFunc func(node *Node) []*Edge + existingLinksFunc func(node *Node) []*Edge + metadata Metadata + links map[Identifier]bool + resourceLinker *ResourceLinker +} + +func mapOfLinks(edges []*Edge) map[Identifier]*Edge { + m := make(map[Identifier]*Edge) + for _, edge := range edges { + m[edge.ID] = edge + } + return m +} + +func (l *listener) nodeEvent(node *Node, deleted bool) { + var newLinks map[Identifier]*Edge + if !deleted { + newLinks = mapOfLinks(l.newLinksFunc(node)) + } + existingLinks := mapOfLinks(l.existingLinksFunc(node)) + + for id, newLink := range newLinks { + for k, v := range l.metadata { + newLink.Metadata[k] = v + } + + if oldLink, found := existingLinks[id]; !found { + if err := l.graph.AddEdge(newLink); err != nil { + l.resourceLinker.notifyError(err) + } else { + l.links[newLink.ID] = true + } + } else { + if !reflect.DeepEqual(newLink.Metadata, oldLink.Metadata) { + if err := l.graph.SetMetadata(oldLink, newLink.Metadata); err != nil { + l.resourceLinker.notifyError(err) + } + } + delete(existingLinks, id) + } + } + + for _, oldLink := range existingLinks { + if _, found := l.links[oldLink.ID]; found { + if err := l.graph.DelEdge(oldLink); err != nil { + l.resourceLinker.notifyError(err) + } + delete(l.links, oldLink.ID) + } + } +} + +func (l *listener) OnNodeAdded(node *Node) { + l.nodeEvent(node, false) +} + +func (l *listener) OnNodeUpdated(node *Node, ops []PartiallyUpdatedOp) { + l.nodeEvent(node, false) +} + +func (l *listener) OnNodeDeleted(node *Node) { + l.nodeEvent(node, true) +} + +// DefaultLinker returns a linker that does nothing +type DefaultLinker struct { +} + +// GetABLinks returns all the outgoing links for a node +func (dl *DefaultLinker) GetABLinks(node *Node) []*Edge { + return nil +} + +// GetBALinks returns all the incoming links for a node +func (dl *DefaultLinker) GetBALinks(node *Node) []*Edge { + return nil +} + +// ResourceLinker returns a resource linker. It listens for events from +// 2 graph events sources to determine if resources from one source should be +// linked with resources of the other source. +type ResourceLinker struct { + insanelock.RWMutex + g *Graph + abListener *listener + baListener *listener + glhs1 []ListenerHandler + glhs2 []ListenerHandler + linker Linker + metadata Metadata + links map[Identifier]bool + eventListeners []LinkerEventListener +} + +func (rl *ResourceLinker) getLinks(node *Node, direction string) []*Edge { + metadata := Metadata{} + for k, v := range rl.metadata { + metadata[k] = v + } + metadata[direction] = string(node.ID) + return rl.g.GetNodeEdges(node, metadata) +} + +// Start linking resources by listening for graph events +func (rl *ResourceLinker) Start() error { + links := make(map[Identifier]bool) + + if len(rl.glhs1) > 0 { + rl.abListener = &listener{ + graph: rl.g, + newLinksFunc: rl.linker.GetABLinks, + existingLinksFunc: func(node *Node) (edges []*Edge) { + return rl.getLinks(node, "Parent") + }, + metadata: rl.metadata, + links: links, + resourceLinker: rl, + } + for _, handler := range rl.glhs1 { + handler.AddEventListener(rl.abListener) + } + } + + if len(rl.glhs2) > 0 { + rl.baListener = &listener{ + graph: rl.g, + newLinksFunc: rl.linker.GetBALinks, + existingLinksFunc: func(node *Node) (edges []*Edge) { + return rl.getLinks(node, "Child") + }, + metadata: rl.metadata, + links: links, + resourceLinker: rl, + } + for _, handler := range rl.glhs2 { + handler.AddEventListener(rl.baListener) + } + } + + return nil +} + +// Stop linking resources +func (rl *ResourceLinker) Stop() { + for _, handler := range rl.glhs1 { + handler.RemoveEventListener(rl.abListener) + } + + for _, handler := range rl.glhs2 { + handler.RemoveEventListener(rl.baListener) + } +} + +func (rl *ResourceLinker) notifyError(err error) { + rl.RLock() + defer rl.RUnlock() + + for _, l := range rl.eventListeners { + l.OnError(err) + } +} + +// AddEventListener subscribe a new linker listener +func (rl *ResourceLinker) AddEventListener(l LinkerEventListener) { + rl.Lock() + defer rl.Unlock() + + rl.eventListeners = append(rl.eventListeners, l) +} + +// RemoveEventListener unsubscribe a linker listener +func (rl *ResourceLinker) RemoveEventListener(l LinkerEventListener) { + rl.Lock() + defer rl.Unlock() + + for i, el := range rl.eventListeners { + if l == el { + rl.eventListeners = append(rl.eventListeners[:i], rl.eventListeners[i+1:]...) + break + } + } +} + +// NewResourceLinker returns a new resource linker +func NewResourceLinker(g *Graph, glhs1 []ListenerHandler, glhs2 []ListenerHandler, linker Linker, m Metadata) *ResourceLinker { + return &ResourceLinker{ + g: g, + glhs1: glhs1, + glhs2: glhs2, + linker: linker, + metadata: m, + } +} + +// getFieldsAsArray returns an array of corresponding values from a field list +func getFieldsAsArray(obj getter.Getter, fields []string) ([][]interface{}, error) { + values := make([][]interface{}, len(fields)) + for i, index := range fields { + v, err := obj.GetField(index) + if err != nil { + return nil, err + } + + if v2, ok := v.([]interface{}); ok { + values[i] = v2 + } else { + values[i] = []interface{}{v} + } + } + + if len(values) == 1 { + v := make([][]interface{}, len(values[0])) + for i, value := range values[0] { + v[i] = []interface{}{value} + } + return v, nil + } + + return cartN(values...), nil +} + +func cartN(a ...[]interface{}) (c [][]interface{}) { + if len(a) == 0 { + return [][]interface{}{nil} + } + + r := cartN(a[1:]...) + for _, e := range a[0] { + for _, p := range r { + c = append(c, append([]interface{}{e}, p...)) + } + } + return +} + +// MetadataIndexerLinker describes an object that links resources from one indexer +// to resources from an other indexer. +type MetadataIndexerLinker struct { + *ResourceLinker + indexer1 *MetadataIndexer + indexer2 *MetadataIndexer + edgeMetadata Metadata +} + +func (mil *MetadataIndexerLinker) genID(parent, child *Node) Identifier { + args := []string{string(parent.ID), string(child.ID)} + for k, v := range mil.edgeMetadata { + args = append(args, k, v.(string)) + } + return GenID(args...) +} + +func (mil *MetadataIndexerLinker) createEdge(node1, node2 *Node) *Edge { + return mil.g.CreateEdge(mil.genID(node1, node2), node1, node2, mil.edgeMetadata, TimeUTC(), "") +} + +// GetABLinks returns all the outgoing links for a node +func (mil *MetadataIndexerLinker) GetABLinks(node *Node) (edges []*Edge) { + if vFields, err := getFieldsAsArray(node, mil.indexer1.indexes); err == nil { + for _, fields := range vFields { + nodes, _ := mil.indexer2.Get(fields...) + for _, n := range nodes { + edges = append(edges, mil.createEdge(node, n)) + } + } + } + return +} + +// GetBALinks returns all the incoming links for a node +func (mil *MetadataIndexerLinker) GetBALinks(node *Node) (edges []*Edge) { + if vFields, err := getFieldsAsArray(node, mil.indexer2.indexes); err == nil { + for _, fields := range vFields { + nodes, _ := mil.indexer1.Get(fields...) + for _, n := range nodes { + edges = append(edges, mil.createEdge(n, node)) + } + } + } + return +} + +// NewMetadataIndexerLinker returns a new metadata based linker +func NewMetadataIndexerLinker(g *Graph, indexer1, indexer2 *MetadataIndexer, edgeMetadata Metadata) *MetadataIndexerLinker { + mil := &MetadataIndexerLinker{ + indexer1: indexer1, + indexer2: indexer2, + } + + mil.ResourceLinker = NewResourceLinker(g, []ListenerHandler{indexer1}, []ListenerHandler{indexer2}, mil, edgeMetadata) + return mil +} -- Gitee