From a115d18595e748dc1deddc5fd12a56bb0f747014 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 2 May 2020 22:26:41 +0100 Subject: [PATCH] Refactor the multicast code a bit --- src/multicast/multicast.go | 275 ++++++++++++++++++------------------- 1 file changed, 134 insertions(+), 141 deletions(-) diff --git a/src/multicast/multicast.go b/src/multicast/multicast.go index 93bf94b..6ceef0a 100644 --- a/src/multicast/multicast.go +++ b/src/multicast/multicast.go @@ -15,40 +15,57 @@ import ( "golang.org/x/net/ipv6" ) +const ( + // GroupAddr contains the multicast group and port used for multicast packets. + GroupAddr = "[ff02::114]:9001" +) + // Multicast represents the multicast advertisement and discovery mechanism used // by Yggdrasil to find peers on the same subnet. When a beacon is received on a // configured multicast interface, Yggdrasil will attempt to peer with that node // automatically. type Multicast struct { phony.Inbox - core *yggdrasil.Core - config *config.NodeState - log *log.Logger - sock *ipv6.PacketConn - groupAddr string - listeners map[string]*listenerInfo - listenPort uint16 - isOpen bool - announcer *time.Timer - platformhandler *time.Timer + core *yggdrasil.Core + config *config.NodeState + log *log.Logger + sock *ipv6.PacketConn + groupAddr *net.UDPAddr + listeners map[string]*multicastInterface + listenPort uint16 + isOpen bool + interfaceMonitor *time.Timer + announcer *time.Timer + platformhandler *time.Timer } -type listenerInfo struct { +type multicastInterface struct { + phony.Inbox + sock *ipv6.PacketConn + destAddr net.UDPAddr listener *yggdrasil.TcpListener - time time.Time + zone string + timer *time.Timer interval time.Duration + send chan<- beacon + stop chan interface{} +} + +type beacon struct { + llAddr string + zone string } // Init prepares the multicast interface for use. -func (m *Multicast) Init(core *yggdrasil.Core, state *config.NodeState, log *log.Logger, options interface{}) error { +func (m *Multicast) Init(core *yggdrasil.Core, state *config.NodeState, log *log.Logger, options interface{}) (err error) { m.core = core m.config = state m.log = log - m.listeners = make(map[string]*listenerInfo) + m.listeners = make(map[string]*multicastInterface) current := m.config.GetCurrent() m.listenPort = current.LinkLocalTCPPort - m.groupAddr = "[ff02::114]:9001" - return nil + m.groupAddr, err = net.ResolveUDPAddr("udp6", GroupAddr) + return } // Start starts the multicast interface. This launches goroutines which will @@ -71,7 +88,7 @@ func (m *Multicast) _start() error { return nil } m.log.Infoln("Starting multicast module") - addr, err := net.ResolveUDPAddr("udp", m.groupAddr) + addr, err := net.ResolveUDPAddr("udp", GroupAddr) if err != nil { return err } @@ -91,7 +108,7 @@ func (m *Multicast) _start() error { m.isOpen = true go m.listen() m.Act(m, m.multicastStarted) - m.Act(m, m.announce) + m.Act(m, m.monitorInterfaceChanges) return nil } @@ -118,9 +135,14 @@ func (m *Multicast) Stop() error { func (m *Multicast) _stop() error { m.log.Infoln("Stopping multicast module") m.isOpen = false - if m.announcer != nil { - m.announcer.Stop() - } + /* + if m.monitorInterfaceChanges != nil { + m.monitorInterfaceChanges.Stop() + } + if m.sendBeacons != nil { + m.sendBeacons.Stop() + } + */ if m.platformhandler != nil { m.platformhandler.Stop() } @@ -156,6 +178,83 @@ func (m *Multicast) _updateConfig(config *config.NodeConfig) { m.log.Debugln("Reloaded multicast configuration successfully") } +func (m *Multicast) monitorInterfaceChanges() { + interfaces := m.Interfaces() + + // Look for interfaces we don't know about yet. + for name, intf := range interfaces { + if _, ok := m.listeners[name]; !ok { + // Look up interface addresses. + addrs, err := intf.Addrs() + if err != nil { + continue + } + // Find the first link-local address. + for _, addr := range addrs { + addrIP, _, _ := net.ParseCIDR(addr.String()) + // Join the multicast group. + m.sock.JoinGroup(&intf, m.groupAddr) + // Construct a listener on this address. + listenaddr := fmt.Sprintf("[%s%%%s]:%d", addrIP, intf.Name, m.listenPort) + listener, err := m.core.ListenTCP(listenaddr) + if err != nil { + m.log.Warnln("Not multicasting on", name, "due to error:", err) + continue + } + // This is a new interface. Start an announcer for it. + multicastInterface := &multicastInterface{ + sock: m.sock, + destAddr: *m.groupAddr, + listener: listener, + stop: make(chan interface{}), + zone: name, + } + multicastInterface.Act(multicastInterface, multicastInterface.announce) + m.listeners[name] = multicastInterface + m.log.Infoln("Started multicasting on", name) + break + } + } + } + // Look for interfaces we knew about but are no longer there. + for name, intf := range m.listeners { + if _, ok := interfaces[name]; !ok { + // This is a disappeared interface. Stop the announcer. + close(intf.stop) + delete(m.listeners, name) + m.log.Infoln("Stopped multicasting on", name) + } + } + // Queue the next check. + m.interfaceMonitor = time.AfterFunc(time.Second, func() { + m.Act(m, m.monitorInterfaceChanges) + }) +} + +func (m *multicastInterface) announce() { + // Check if the multicast interface has been stopped. This will happen + // if it disappears from the system or goes down. + select { + case <-m.stop: + return + default: + } + // Send the beacon. + lladdr := m.listener.Listener.Addr().String() + if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { + a.Zone = "" + msg := []byte(a.String()) + m.sock.WriteTo(msg, nil, &m.destAddr) + } + // Queue the next beacon. + if m.interval.Seconds() < 15 { + m.interval += time.Second + } + m.timer = time.AfterFunc(m.interval, func() { + m.Act(m, m.announce) + }) +} + // GetInterfaces returns the currently known/enabled multicast interfaces. It is // expected that UpdateInterfaces has been called at least once before calling // this method. @@ -183,6 +282,19 @@ func (m *Multicast) Interfaces() map[string]net.Interface { // Ignore point-to-point interfaces continue } + addrs, _ := iface.Addrs() + hasLLAddr := false + for _, addr := range addrs { + addrIP, _, _ := net.ParseCIDR(addr.String()) + if addrIP.To4() == nil && addrIP.IsLinkLocalUnicast() { + hasLLAddr = true + break + } + } + if !hasLLAddr { + // Ignore interfaces without link-local addresses + continue + } for _, expr := range exprs { // Compile each regular expression e, err := regexp.Compile(expr) @@ -198,127 +310,8 @@ func (m *Multicast) Interfaces() map[string]net.Interface { return interfaces } -func (m *Multicast) announce() { - groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) - if err != nil { - panic(err) - } - destAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) - if err != nil { - panic(err) - } - interfaces := m.Interfaces() - // There might be interfaces that we configured listeners for but are no - // longer up - if that's the case then we should stop the listeners - for name, info := range m.listeners { - // Prepare our stop function! - stop := func() { - info.listener.Stop() - delete(m.listeners, name) - m.log.Debugln("No longer multicasting on", name) - } - // If the interface is no longer visible on the system then stop the - // listener, as another one will be started further down - if _, ok := interfaces[name]; !ok { - stop() - continue - } - // It's possible that the link-local listener address has changed so if - // that is the case then we should clean up the interface listener - found := false - listenaddr, err := net.ResolveTCPAddr("tcp6", info.listener.Listener.Addr().String()) - if err != nil { - stop() - continue - } - // Find the interface that matches the listener - if intf, err := net.InterfaceByName(name); err == nil { - if addrs, err := intf.Addrs(); err == nil { - // Loop through the addresses attached to that listener and see if any - // of them match the current address of the listener - for _, addr := range addrs { - if ip, _, err := net.ParseCIDR(addr.String()); err == nil { - // Does the interface address match our listener address? - if ip.Equal(listenaddr.IP) { - found = true - break - } - } - } - } - } - // If the address has not been found on the adapter then we should stop - // and clean up the TCP listener. A new one will be created below if a - // suitable link-local address is found - if !found { - stop() - } - } - // Now that we have a list of valid interfaces from the operating system, - // we can start checking if we can send multicasts on them - for _, iface := range interfaces { - // Find interface addresses - addrs, err := iface.Addrs() - if err != nil { - panic(err) - } - for _, addr := range addrs { - addrIP, _, _ := net.ParseCIDR(addr.String()) - // Ignore IPv4 addresses - if addrIP.To4() != nil { - continue - } - // Ignore non-link-local addresses - if !addrIP.IsLinkLocalUnicast() { - continue - } - // Join the multicast group - m.sock.JoinGroup(&iface, groupAddr) - // Try and see if we already have a TCP listener for this interface - var info *listenerInfo - if nfo, ok := m.listeners[iface.Name]; !ok || nfo.listener.Listener == nil { - // No listener was found - let's create one - listenaddr := fmt.Sprintf("[%s%%%s]:%d", addrIP, iface.Name, m.listenPort) - if li, err := m.core.ListenTCP(listenaddr); err == nil { - m.log.Debugln("Started multicasting on", iface.Name) - // Store the listener so that we can stop it later if needed - info = &listenerInfo{listener: li, time: time.Now()} - m.listeners[iface.Name] = info - } else { - m.log.Warnln("Not multicasting on", iface.Name, "due to error:", err) - } - } else { - // An existing listener was found - info = m.listeners[iface.Name] - } - // Make sure nothing above failed for some reason - if info == nil { - continue - } - if time.Since(info.time) < info.interval { - continue - } - // Get the listener details and construct the multicast beacon - lladdr := info.listener.Listener.Addr().String() - if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { - a.Zone = "" - destAddr.Zone = iface.Name - msg := []byte(a.String()) - m.sock.WriteTo(msg, nil, destAddr) - } - if info.interval.Seconds() < 15 { - info.interval += time.Second - } - break - } - } - m.announcer = time.AfterFunc(time.Second, func() { - m.Act(m, m.announce) - }) -} - func (m *Multicast) listen() { - groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) + groupAddr, err := net.ResolveUDPAddr("udp6", GroupAddr) if err != nil { panic(err) }