diff --git a/graffiti/graph/processor.go b/graffiti/graph/processor.go new file mode 100644 index 0000000000000000000000000000000000000000..1e0f1d914c0d00138323f35e769b9ae3699d4a73 --- /dev/null +++ b/graffiti/graph/processor.go @@ -0,0 +1,97 @@ +package graph + +import ( + "github.com/safchain/insanelock" +) + +// NodeAction is a callback to perform on a node. The action is kept +// active as long as it returns true. +type NodeAction interface { + ProcessNode(g *Graph, n *Node) bool +} + +// deferred represents a node action with additional info if needed +// for cancellation. +type deferred struct { + action NodeAction +} + +// Processor encapsulates an indexer that will process NodeActions +// on the nodes that filter +type Processor struct { + insanelock.RWMutex + DefaultGraphListener + *MetadataIndexer + actions map[string][]deferred +} + +// NewProcessor creates a Processor on the graph g, a stream of +// events controlled by listenerHandler, that match a first set +// of metadata m. Actions will be associated to a given set +// of values for indexes. +func NewProcessor(g *Graph, listenerHandler ListenerHandler, m ElementMatcher, indexes ...string) (processor *Processor) { + processor = &Processor{ + MetadataIndexer: NewMetadataIndexer(g, listenerHandler, m, indexes...), + actions: make(map[string][]deferred), + } + processor.AddEventListener(processor) + return +} + +// DoAction will perform the action for nodes matching values. +func (processor *Processor) DoAction(action NodeAction, values ...interface{}) { + nodes, _ := processor.Get(values...) + kont := true + for _, node := range nodes { + kont = action.ProcessNode(processor.graph, node) + if !kont { + break + } + } + if kont { + act := deferred{action: action} + hash := Hash(values...) + processor.Lock() + if actions, ok := processor.actions[hash]; ok { + processor.actions[hash] = append(actions, act) + } else { + actions := []deferred{act} + processor.actions[hash] = actions + } + processor.Unlock() + } +} + +// Cancel the actions attached to a given set of values. +func (processor *Processor) Cancel(values ...interface{}) { + processor.Lock() + delete(processor.actions, Hash(values...)) + processor.Unlock() +} + +// OnNodeAdded event +func (processor *Processor) OnNodeAdded(n *Node) { + if vValues, err := getFieldsAsArray(n, processor.indexes); err == nil { + for _, values := range vValues { + hash := Hash(values...) + processor.RLock() + actions, ok := processor.actions[hash] + processor.RUnlock() + if ok { + var keep []deferred + for _, action := range actions { + if action.action.ProcessNode(processor.graph, n) { + keep = append(keep, action) + } + } + processor.Lock() + if len(keep) == 0 { + delete(processor.actions, hash) + } else { + processor.actions[hash] = keep + } + processor.Unlock() + } + } + } +}