diff --git a/src/multicast/multicast.go b/src/multicast/multicast.go index 93bf94b..fc7b137 100644 --- a/src/multicast/multicast.go +++ b/src/multicast/multicast.go @@ -15,6 +15,11 @@ import ( "golang.org/x/net/ipv6" ) +const ( + // GroupAddr contains the multicast group and port used for multicast packets. + GroupAddr = "[ff02::114]:9001" +) + // Multicast represents the multicast advertisement and discovery mechanism used // by Yggdrasil to find peers on the same subnet. When a beacon is received on a // configured multicast interface, Yggdrasil will attempt to peer with that node @@ -25,30 +30,42 @@ type Multicast struct { config *config.NodeState log *log.Logger sock *ipv6.PacketConn - groupAddr string - listeners map[string]*listenerInfo + groupAddr *net.UDPAddr + listeners map[string]*multicastInterface listenPort uint16 isOpen bool - announcer *time.Timer + monitor *time.Timer platformhandler *time.Timer + _interfaces map[string]net.Interface + _interfaceAddrs map[string]addrInfo } -type listenerInfo struct { +type addrInfo struct { + addrs []net.Addr + time time.Time +} + +type multicastInterface struct { + phony.Inbox + sock *ipv6.PacketConn + destAddr net.UDPAddr listener *yggdrasil.TcpListener - time time.Time + zone string + timer *time.Timer interval time.Duration + stop chan interface{} } // Init prepares the multicast interface for use. -func (m *Multicast) Init(core *yggdrasil.Core, state *config.NodeState, log *log.Logger, options interface{}) error { +func (m *Multicast) Init(core *yggdrasil.Core, state *config.NodeState, log *log.Logger, options interface{}) (err error) { m.core = core m.config = state m.log = log - m.listeners = make(map[string]*listenerInfo) + m.listeners = make(map[string]*multicastInterface) current := m.config.GetCurrent() m.listenPort = current.LinkLocalTCPPort - m.groupAddr = "[ff02::114]:9001" - return nil + m.groupAddr, err = net.ResolveUDPAddr("udp6", GroupAddr) + return } // Start starts the multicast interface. This launches goroutines which will @@ -71,7 +88,7 @@ func (m *Multicast) _start() error { return nil } m.log.Infoln("Starting multicast module") - addr, err := net.ResolveUDPAddr("udp", m.groupAddr) + addr, err := net.ResolveUDPAddr("udp", GroupAddr) if err != nil { return err } @@ -90,8 +107,8 @@ func (m *Multicast) _start() error { m.isOpen = true go m.listen() - m.Act(m, m.multicastStarted) - m.Act(m, m.announce) + m.Act(nil, m._multicastStarted) + m.Act(nil, m._monitorInterfaceChanges) return nil } @@ -118,8 +135,9 @@ func (m *Multicast) Stop() error { func (m *Multicast) _stop() error { m.log.Infoln("Stopping multicast module") m.isOpen = false - if m.announcer != nil { - m.announcer.Stop() + for name := range m.listeners { + close(m.listeners[name].stop) + delete(m.listeners, name) } if m.platformhandler != nil { m.platformhandler.Stop() @@ -134,7 +152,7 @@ func (m *Multicast) _stop() error { // and then signals the various module goroutines to reconfigure themselves if // needed. func (m *Multicast) UpdateConfig(config *config.NodeConfig) { - m.Act(m, func() { m._updateConfig(config) }) + m.Act(nil, func() { m._updateConfig(config) }) } func (m *Multicast) _updateConfig(config *config.NodeConfig) { @@ -156,10 +174,92 @@ func (m *Multicast) _updateConfig(config *config.NodeConfig) { m.log.Debugln("Reloaded multicast configuration successfully") } +func (m *Multicast) _monitorInterfaceChanges() { + m._updateInterfaces() // update interfaces and interfaceAddrs + + // Look for interfaces we don't know about yet. + for name, intf := range m._interfaces { + if _, ok := m.listeners[name]; !ok { + // Look up interface addresses. + addrs := m._interfaceAddrs[intf.Name].addrs + // Find the first link-local address. + for _, addr := range addrs { + addrIP, _, _ := net.ParseCIDR(addr.String()) + // Join the multicast group. + m.sock.JoinGroup(&intf, m.groupAddr) + // Construct a listener on this address. + listenaddr := fmt.Sprintf("[%s%%%s]:%d", addrIP, intf.Name, m.listenPort) + listener, err := m.core.ListenTCP(listenaddr) + if err != nil { + m.log.Warnln("Not multicasting on", name, "due to error:", err) + continue + } + // This is a new interface. Start an announcer for it. + multicastInterface := &multicastInterface{ + sock: m.sock, + destAddr: *m.groupAddr, + listener: listener, + stop: make(chan interface{}), + zone: name, + } + multicastInterface.Act(m, multicastInterface._announce) + m.listeners[name] = multicastInterface + m.log.Debugln("Started multicasting on", name) + break + } + } + } + // Look for interfaces we knew about but are no longer there. + for name, intf := range m.listeners { + if _, ok := m._interfaces[name]; !ok { + // This is a disappeared interface. Stop the announcer. + close(intf.stop) + delete(m.listeners, name) + m.log.Debugln("Stopped multicasting on", name) + } + } + // Queue the next check. + m.monitor = time.AfterFunc(time.Second, func() { + m.Act(nil, m._monitorInterfaceChanges) + }) +} + +func (m *multicastInterface) _announce() { + // Check if the multicast interface has been stopped. This will happen + // if it disappears from the system or goes down. + select { + case <-m.stop: + return + default: + } + // Send the beacon. + lladdr := m.listener.Listener.Addr().String() + if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { + a.Zone = "" + msg := []byte(a.String()) + m.sock.WriteTo(msg, nil, &m.destAddr) + } + // Queue the next beacon. + if m.interval.Seconds() < 15 { + m.interval += time.Second + } + m.timer = time.AfterFunc(m.interval, func() { + m.Act(nil, m._announce) + }) +} + // GetInterfaces returns the currently known/enabled multicast interfaces. It is // expected that UpdateInterfaces has been called at least once before calling // this method. func (m *Multicast) Interfaces() map[string]net.Interface { + var interfaces map[string]net.Interface + phony.Block(m, func() { + interfaces = m._interfaces + }) + return interfaces +} + +func (m *Multicast) _updateInterfaces() { interfaces := make(map[string]net.Interface) // Get interface expressions from config current := m.config.GetCurrent() @@ -170,6 +270,7 @@ func (m *Multicast) Interfaces() map[string]net.Interface { panic(err) } // Work out which interfaces to announce on + interfaceAddrs := make(map[string]addrInfo) for _, iface := range allifaces { if iface.Flags&net.FlagUp == 0 { // Ignore interfaces that are down @@ -183,6 +284,26 @@ func (m *Multicast) Interfaces() map[string]net.Interface { // Ignore point-to-point interfaces continue } + var aInfo addrInfo + var isIn bool + if aInfo, isIn = m._interfaceAddrs[iface.Name]; isIn && time.Since(aInfo.time) < time.Minute { + // don't call iface.Addrs, it's unlikely things have changed + } else { + aInfo.addrs, _ = iface.Addrs() + aInfo.time = time.Now() + } + lladdrs := aInfo.addrs[:0] + for _, addr := range aInfo.addrs { + addrIP, _, _ := net.ParseCIDR(addr.String()) + if addrIP.To4() == nil && addrIP.IsLinkLocalUnicast() { + lladdrs = append(lladdrs, addr) + } + } + aInfo.addrs = lladdrs + if len(lladdrs) == 0 { + // Ignore interfaces without link-local addresses + continue + } for _, expr := range exprs { // Compile each regular expression e, err := regexp.Compile(expr) @@ -192,133 +313,16 @@ func (m *Multicast) Interfaces() map[string]net.Interface { // Does the interface match the regular expression? Store it if so if e.MatchString(iface.Name) { interfaces[iface.Name] = iface + interfaceAddrs[iface.Name] = aInfo } } } - return interfaces -} - -func (m *Multicast) announce() { - groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) - if err != nil { - panic(err) - } - destAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) - if err != nil { - panic(err) - } - interfaces := m.Interfaces() - // There might be interfaces that we configured listeners for but are no - // longer up - if that's the case then we should stop the listeners - for name, info := range m.listeners { - // Prepare our stop function! - stop := func() { - info.listener.Stop() - delete(m.listeners, name) - m.log.Debugln("No longer multicasting on", name) - } - // If the interface is no longer visible on the system then stop the - // listener, as another one will be started further down - if _, ok := interfaces[name]; !ok { - stop() - continue - } - // It's possible that the link-local listener address has changed so if - // that is the case then we should clean up the interface listener - found := false - listenaddr, err := net.ResolveTCPAddr("tcp6", info.listener.Listener.Addr().String()) - if err != nil { - stop() - continue - } - // Find the interface that matches the listener - if intf, err := net.InterfaceByName(name); err == nil { - if addrs, err := intf.Addrs(); err == nil { - // Loop through the addresses attached to that listener and see if any - // of them match the current address of the listener - for _, addr := range addrs { - if ip, _, err := net.ParseCIDR(addr.String()); err == nil { - // Does the interface address match our listener address? - if ip.Equal(listenaddr.IP) { - found = true - break - } - } - } - } - } - // If the address has not been found on the adapter then we should stop - // and clean up the TCP listener. A new one will be created below if a - // suitable link-local address is found - if !found { - stop() - } - } - // Now that we have a list of valid interfaces from the operating system, - // we can start checking if we can send multicasts on them - for _, iface := range interfaces { - // Find interface addresses - addrs, err := iface.Addrs() - if err != nil { - panic(err) - } - for _, addr := range addrs { - addrIP, _, _ := net.ParseCIDR(addr.String()) - // Ignore IPv4 addresses - if addrIP.To4() != nil { - continue - } - // Ignore non-link-local addresses - if !addrIP.IsLinkLocalUnicast() { - continue - } - // Join the multicast group - m.sock.JoinGroup(&iface, groupAddr) - // Try and see if we already have a TCP listener for this interface - var info *listenerInfo - if nfo, ok := m.listeners[iface.Name]; !ok || nfo.listener.Listener == nil { - // No listener was found - let's create one - listenaddr := fmt.Sprintf("[%s%%%s]:%d", addrIP, iface.Name, m.listenPort) - if li, err := m.core.ListenTCP(listenaddr); err == nil { - m.log.Debugln("Started multicasting on", iface.Name) - // Store the listener so that we can stop it later if needed - info = &listenerInfo{listener: li, time: time.Now()} - m.listeners[iface.Name] = info - } else { - m.log.Warnln("Not multicasting on", iface.Name, "due to error:", err) - } - } else { - // An existing listener was found - info = m.listeners[iface.Name] - } - // Make sure nothing above failed for some reason - if info == nil { - continue - } - if time.Since(info.time) < info.interval { - continue - } - // Get the listener details and construct the multicast beacon - lladdr := info.listener.Listener.Addr().String() - if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { - a.Zone = "" - destAddr.Zone = iface.Name - msg := []byte(a.String()) - m.sock.WriteTo(msg, nil, destAddr) - } - if info.interval.Seconds() < 15 { - info.interval += time.Second - } - break - } - } - m.announcer = time.AfterFunc(time.Second, func() { - m.Act(m, m.announce) - }) + m._interfaces = interfaces + m._interfaceAddrs = interfaceAddrs } func (m *Multicast) listen() { - groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) + groupAddr, err := net.ResolveUDPAddr("udp6", GroupAddr) if err != nil { panic(err) } @@ -351,6 +355,7 @@ func (m *Multicast) listen() { if addr.IP.String() != from.IP.String() { continue } + // Note that m.Interfaces would block if it was being run by the actor itself if _, ok := m.Interfaces()[from.Zone]; ok { addr.Zone = "" if err := m.core.CallPeer("tcp://"+addr.String(), from.Zone); err != nil { diff --git a/src/multicast/multicast_darwin.go b/src/multicast/multicast_darwin.go index 4cfef9e..e020106 100644 --- a/src/multicast/multicast_darwin.go +++ b/src/multicast/multicast_darwin.go @@ -31,7 +31,7 @@ import ( var awdlGoroutineStarted bool -func (m *Multicast) multicastStarted() { +func (m *Multicast) _multicastStarted() { C.StopAWDLBrowsing() for intf := range m.Interfaces() { if intf == "awdl0" { @@ -40,7 +40,7 @@ func (m *Multicast) multicastStarted() { } } m.platformhandler = time.AfterFunc(time.Minute, func() { - m.Act(m, m.multicastStarted) + m.Act(m, m._multicastStarted) }) } diff --git a/src/multicast/multicast_other.go b/src/multicast/multicast_other.go index 16ea15d..dfcf625 100644 --- a/src/multicast/multicast_other.go +++ b/src/multicast/multicast_other.go @@ -4,7 +4,7 @@ package multicast import "syscall" -func (m *Multicast) multicastStarted() { +func (m *Multicast) _multicastStarted() { } diff --git a/src/multicast/multicast_unix.go b/src/multicast/multicast_unix.go index 9d6a01a..1ff48b1 100644 --- a/src/multicast/multicast_unix.go +++ b/src/multicast/multicast_unix.go @@ -5,7 +5,7 @@ package multicast import "syscall" import "golang.org/x/sys/unix" -func (m *Multicast) multicastStarted() { +func (m *Multicast) _multicastStarted() { } diff --git a/src/multicast/multicast_windows.go b/src/multicast/multicast_windows.go index 7a846b1..3666faa 100644 --- a/src/multicast/multicast_windows.go +++ b/src/multicast/multicast_windows.go @@ -5,7 +5,7 @@ package multicast import "syscall" import "golang.org/x/sys/windows" -func (m *Multicast) multicastStarted() { +func (m *Multicast) _multicastStarted() { }