From b97b0474d9fddc758852d43f48b54a5a3dd899da Mon Sep 17 00:00:00 2001 From: wanfeng Date: Thu, 23 Oct 2025 15:28:24 +0800 Subject: [PATCH] add StructSpeaker, a Speaker able to handle Struct Message and Request/Reply calls --- graffiti/websocket/message.go | 232 ++++++++++++++++++++++++++++++++++ 1 file changed, 232 insertions(+) diff --git a/graffiti/websocket/message.go b/graffiti/websocket/message.go index bf98c471..e7f67e15 100644 --- a/graffiti/websocket/message.go +++ b/graffiti/websocket/message.go @@ -297,3 +297,235 @@ func newStructSpeakerPoolEventDispatcher(pool SpeakerPool) *structSpeakerPoolEve } } +// StructSpeaker is a Speaker able to handle Struct Message and Request/Reply calls. +type StructSpeaker struct { + Speaker + *structSpeakerEventDispatcher + nsSubscribed map[string]bool + replyChanMutex insanelock.RWMutex + replyChan map[string]chan *StructMessage + logger logging.Logger +} + +// SendMessage sends a message according to the namespace. +func (s *StructSpeaker) SendMessage(m Message) error { + if msg, ok := m.(*StructMessage); ok { + if _, ok := s.nsSubscribed[msg.Namespace]; !ok { + if _, ok := s.nsSubscribed[WildcardNamespace]; !ok { + return nil + } + } + } + + return s.Speaker.SendMessage(m) +} + +func (s *StructSpeaker) onReply(m *StructMessage) bool { + s.replyChanMutex.RLock() + ch, ok := s.replyChan[m.UUID] + if ok { + ch <- m + } + s.replyChanMutex.RUnlock() + + return ok +} + +// Request sends a Struct message request waiting for a reply using the given timeout. +func (s *StructSpeaker) Request(m *StructMessage, timeout time.Duration) (*StructMessage, error) { + ch := make(chan *StructMessage, 1) + + s.replyChanMutex.Lock() + s.replyChan[m.UUID] = ch + s.replyChanMutex.Unlock() + + defer func() { + s.replyChanMutex.Lock() + delete(s.replyChan, m.UUID) + close(ch) + s.replyChanMutex.Unlock() + }() + + s.SendMessage(m) + + select { + case resp := <-ch: + return resp, nil + case <-time.After(timeout): + return nil, errors.New("Timeout") + } +} + +// OnMessage checks that the Message comes from a StructSpeaker. It parses +// the Struct message and then dispatch the message to the proper listeners according +// to the namespace. +func (s *StructSpeaker) OnMessage(c Speaker, m Message) { + if c, ok := c.(*StructSpeaker); ok { + // m is a rawmessage at this point + bytes, _ := m.Bytes(RawProtocol) + + var structMsg StructMessage + if err := structMsg.unmarshalByProtocol(bytes, c.GetClientProtocol()); err != nil { + s.logger.Error(err) + return + } + s.structSpeakerEventDispatcher.dispatchMessage(c, &structMsg) + } +} + +func newStructSpeaker(c Speaker, logger logging.Logger) *StructSpeaker { + s := &StructSpeaker{ + Speaker: c, + structSpeakerEventDispatcher: newStructSpeakerEventDispatcher(), + nsSubscribed: make(map[string]bool), + replyChan: make(map[string]chan *StructMessage), + logger: logger, + } + + // subscribing to itself so that the StructSpeaker can get Message and can convert them + // to StructMessage and then forward them to its own even listeners. + s.AddEventHandler(s) + return s +} + +// UpgradeToStructSpeaker a WebSocket client to a StructSpeaker +func (c *Client) UpgradeToStructSpeaker() *StructSpeaker { + s := newStructSpeaker(c, c.logger) + c.Lock() + c.wsSpeaker = s + s.nsSubscribed[WildcardNamespace] = true + c.Unlock() + return s +} + +func (c *wsIncomingClient) upgradeToStructSpeaker() *StructSpeaker { + s := newStructSpeaker(c, c.logger) + c.Lock() + c.wsSpeaker = s + c.Unlock() + return s +} + +// StructSpeakerPool is the interface of a pool of StructSpeakers. +type StructSpeakerPool interface { + SpeakerPool + SpeakerStructMessageDispatcher + Request(host string, request *StructMessage, timeout time.Duration) (*StructMessage, error) +} + +// StructClientPool is a ClientPool able to send StructMessage. +type StructClientPool struct { + *ClientPool + *structSpeakerPoolEventDispatcher +} + +// AddClient adds a Client to the pool. +func (s *StructClientPool) AddClient(c Speaker) error { + if wc, ok := c.(*Client); ok { + speaker := wc.UpgradeToStructSpeaker() + s.ClientPool.AddClient(speaker) + s.structSpeakerPoolEventDispatcher.AddStructSpeaker(speaker) + } else { + return errors.New("wrong client type") + } + return nil +} + +// Request sends a Request Struct message to the Speaker of the given remote host. +func (s *StructClientPool) Request(host string, request *StructMessage, timeout time.Duration) (*StructMessage, error) { + c, err := s.ClientPool.GetSpeakerByRemoteHost(host) + if err != nil { + return nil, err + } + + return c.(*StructSpeaker).Request(request, timeout) +} + +// NewStructClientPool returns a new StructClientPool. +func NewStructClientPool(name string, opts PoolOpts) *StructClientPool { + pool := NewClientPool(name, opts) + return &StructClientPool{ + ClientPool: pool, + structSpeakerPoolEventDispatcher: newStructSpeakerPoolEventDispatcher(pool), + } +} + +// StructServer is a Server able to handle StructSpeaker. +type StructServer struct { + *Server + *structSpeakerPoolEventDispatcher +} + +// Request sends a Request Struct message to the Speaker of the given remote host. +func (s *StructServer) Request(host string, request *StructMessage, timeout time.Duration) (*StructMessage, error) { + c, err := s.Server.GetSpeakerByRemoteHost(host) + if err != nil { + return nil, err + } + + return c.(*StructSpeaker).Request(request, timeout) +} + +// OnMessage websocket event. +func (s *StructServer) OnMessage(c Speaker, m Message) { +} + +// OnConnected websocket event. +func (s *StructServer) OnConnected(c Speaker) error { + return nil +} + +// OnDisconnected removes the Speaker from the incomer pool. +func (s *StructServer) OnDisconnected(c Speaker) { +} + +// NewStructServer returns a new StructServer +func NewStructServer(server *Server) *StructServer { + s := &StructServer{ + Server: server, + structSpeakerPoolEventDispatcher: newStructSpeakerPoolEventDispatcher(server), + } + + s.Server.incomerPool.AddEventHandler(s) + + // This incomerHandler upgrades the incomers to StructSpeaker thus being able to parse StructMessage. + // The server set also the StructSpeaker with the proper namspaces it subscribes to thanks to the + // headers. + s.Server.incomerHandler = func(conn *websocket.Conn, r *auth.AuthenticatedRequest, promoter clientPromoter) (Speaker, error) { + // the default incomer handler creates a standard wsIncomingClient that we upgrade to a StructSpeaker + // being able to handle the StructMessage + uc, err := s.Server.newIncomingClient(conn, r, func(ic *wsIncomingClient) (Speaker, error) { + c := ic.upgradeToStructSpeaker() + + // from headers + if namespaces, ok := r.Header["X-Websocket-Namespace"]; ok { + for _, ns := range namespaces { + c.nsSubscribed[ns] = true + } + } + + // from parameter, useful for browser client + if namespaces, ok := r.URL.Query()["x-websocket-namespace"]; ok { + for _, ns := range namespaces { + c.nsSubscribed[ns] = true + } + } + + // if empty use wildcard for backward compatibility + if len(c.nsSubscribed) == 0 { + c.nsSubscribed[WildcardNamespace] = true + } + + s.structSpeakerPoolEventDispatcher.AddStructSpeaker(c) + + return c, nil + }) + if err != nil { + return nil, err + } + + return uc, nil + } + + return s +} -- Gitee