From 7ea4e9575e611192ec4b81b51dfc13e89e200e74 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 28 Mar 2019 16:13:14 +0000 Subject: [PATCH] Break out multicast into a separate package --- cmd/yggdrasil/main.go | 11 +++- src/{yggdrasil => multicast}/multicast.go | 63 +++++++++++-------- .../multicast_darwin.go | 8 +-- .../multicast_other.go | 6 +- .../multicast_unix.go | 6 +- .../multicast_windows.go | 6 +- src/yggdrasil/admin.go | 4 +- src/yggdrasil/core.go | 30 ++++++--- src/yggdrasil/mobile.go | 4 +- src/yggdrasil/tcp.go | 36 +++++------ 10 files changed, 103 insertions(+), 71 deletions(-) rename src/{yggdrasil => multicast}/multicast.go (78%) rename src/{yggdrasil => multicast}/multicast_darwin.go (86%) rename src/{yggdrasil => multicast}/multicast_other.go (53%) rename src/{yggdrasil => multicast}/multicast_unix.go (75%) rename src/{yggdrasil => multicast}/multicast_windows.go (75%) diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index cec7705..fe84849 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -19,6 +19,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/yggdrasil-network/yggdrasil-go/src/config" + "github.com/yggdrasil-network/yggdrasil-go/src/multicast" "github.com/yggdrasil-network/yggdrasil-go/src/tuntap" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" ) @@ -27,8 +28,9 @@ type nodeConfig = config.NodeConfig type Core = yggdrasil.Core type node struct { - core Core - tun tuntap.TunAdapter + core Core + tun tuntap.TunAdapter + multicast multicast.Multicast } func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeConfig { @@ -254,6 +256,11 @@ func main() { logger.Errorln("An error occurred during startup") panic(err) } + // Start the multicast interface + n.multicast.Init(&n.core, cfg, logger) + if err := n.multicast.Start(); err != nil { + logger.Errorln("An error occurred starting multicast:", err) + } // The Stop function ensures that the TUN/TAP adapter is correctly shut down // before the program exits. defer func() { diff --git a/src/yggdrasil/multicast.go b/src/multicast/multicast.go similarity index 78% rename from src/yggdrasil/multicast.go rename to src/multicast/multicast.go index ab1b158..71bdbc8 100644 --- a/src/yggdrasil/multicast.go +++ b/src/multicast/multicast.go @@ -1,4 +1,4 @@ -package yggdrasil +package multicast import ( "context" @@ -7,23 +7,31 @@ import ( "regexp" "time" + "github.com/gologme/log" + + "github.com/yggdrasil-network/yggdrasil-go/src/config" + "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" "golang.org/x/net/ipv6" ) -type multicast struct { - core *Core +type Multicast struct { + core *yggdrasil.Core + config *config.NodeConfig + log *log.Logger reconfigure chan chan error sock *ipv6.PacketConn groupAddr string - listeners map[string]*tcpListener + listeners map[string]*yggdrasil.TcpListener listenPort uint16 } -func (m *multicast) init(core *Core) { +func (m *Multicast) Init(core *yggdrasil.Core, config *config.NodeConfig, log *log.Logger) { m.core = core + m.config = config + m.log = log m.reconfigure = make(chan chan error, 1) - m.listeners = make(map[string]*tcpListener) - current, _ := m.core.config.Get() + m.listeners = make(map[string]*yggdrasil.TcpListener) + current := m.config //.Get() m.listenPort = current.LinkLocalTCPPort go func() { for { @@ -34,15 +42,15 @@ func (m *multicast) init(core *Core) { m.groupAddr = "[ff02::114]:9001" // Check if we've been given any expressions if count := len(m.interfaces()); count != 0 { - m.core.log.Infoln("Found", count, "multicast interface(s)") + m.log.Infoln("Found", count, "multicast interface(s)") } } -func (m *multicast) start() error { +func (m *Multicast) Start() error { if len(m.interfaces()) == 0 { - m.core.log.Infoln("Multicast discovery is disabled") + m.log.Infoln("Multicast discovery is disabled") } else { - m.core.log.Infoln("Multicast discovery is enabled") + m.log.Infoln("Multicast discovery is enabled") addr, err := net.ResolveUDPAddr("udp", m.groupAddr) if err != nil { return err @@ -67,9 +75,10 @@ func (m *multicast) start() error { return nil } -func (m *multicast) interfaces() map[string]net.Interface { +func (m *Multicast) interfaces() map[string]net.Interface { // Get interface expressions from config - current, _ := m.core.config.Get() + //current, _ := m.config.Get() + current := m.config exprs := current.MulticastInterfaces // Ask the system for network interfaces interfaces := make(map[string]net.Interface) @@ -106,7 +115,7 @@ func (m *multicast) interfaces() map[string]net.Interface { return interfaces } -func (m *multicast) announce() { +func (m *Multicast) announce() { groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) if err != nil { panic(err) @@ -122,9 +131,9 @@ func (m *multicast) announce() { for name, listener := range m.listeners { // Prepare our stop function! stop := func() { - listener.stop <- true + listener.Stop <- true delete(m.listeners, name) - m.core.log.Debugln("No longer multicasting on", name) + m.log.Debugln("No longer multicasting on", name) } // If the interface is no longer visible on the system then stop the // listener, as another one will be started further down @@ -135,7 +144,7 @@ func (m *multicast) announce() { // It's possible that the link-local listener address has changed so if // that is the case then we should clean up the interface listener found := false - listenaddr, err := net.ResolveTCPAddr("tcp6", listener.listener.Addr().String()) + listenaddr, err := net.ResolveTCPAddr("tcp6", listener.Listener.Addr().String()) if err != nil { stop() continue @@ -184,17 +193,18 @@ func (m *multicast) announce() { // Join the multicast group m.sock.JoinGroup(&iface, groupAddr) // Try and see if we already have a TCP listener for this interface - var listener *tcpListener - if l, ok := m.listeners[iface.Name]; !ok || l.listener == nil { + var listener *yggdrasil.TcpListener + if l, ok := m.listeners[iface.Name]; !ok || l.Listener == nil { // No listener was found - let's create one listenaddr := fmt.Sprintf("[%s%%%s]:%d", addrIP, iface.Name, m.listenPort) - if li, err := m.core.link.tcp.listen(listenaddr); err == nil { - m.core.log.Debugln("Started multicasting on", iface.Name) + //if li, err := m.core.link.tcp.listen(listenaddr); err == nil { + if li, err := m.core.ListenTCP(listenaddr); err == nil { + m.log.Debugln("Started multicasting on", iface.Name) // Store the listener so that we can stop it later if needed m.listeners[iface.Name] = li listener = li } else { - m.core.log.Warnln("Not multicasting on", iface.Name, "due to error:", err) + m.log.Warnln("Not multicasting on", iface.Name, "due to error:", err) } } else { // An existing listener was found @@ -205,7 +215,7 @@ func (m *multicast) announce() { continue } // Get the listener details and construct the multicast beacon - lladdr := listener.listener.Addr().String() + lladdr := listener.Listener.Addr().String() if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { a.Zone = "" destAddr.Zone = iface.Name @@ -219,7 +229,7 @@ func (m *multicast) announce() { } } -func (m *multicast) listen() { +func (m *Multicast) listen() { groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) if err != nil { panic(err) @@ -251,8 +261,9 @@ func (m *multicast) listen() { continue } addr.Zone = "" - if err := m.core.link.call("tcp://"+addr.String(), from.Zone); err != nil { - m.core.log.Debugln("Call from multicast failed:", err) + //if err := m.core.link.call("tcp://"+addr.String(), from.Zone); err != nil { + if err := m.core.CallPeer("tcp://"+addr.String(), from.Zone); err != nil { + m.log.Debugln("Call from multicast failed:", err) } } } diff --git a/src/yggdrasil/multicast_darwin.go b/src/multicast/multicast_darwin.go similarity index 86% rename from src/yggdrasil/multicast_darwin.go rename to src/multicast/multicast_darwin.go index 5364611..900354c 100644 --- a/src/yggdrasil/multicast_darwin.go +++ b/src/multicast/multicast_darwin.go @@ -1,6 +1,6 @@ // +build darwin -package yggdrasil +package multicast /* #cgo CFLAGS: -x objective-c @@ -31,11 +31,11 @@ import ( var awdlGoroutineStarted bool -func (m *multicast) multicastStarted() { +func (m *Multicast) multicastStarted() { if awdlGoroutineStarted { return } - m.core.log.Infoln("Multicast discovery will wake up AWDL if required") + m.log.Infoln("Multicast discovery will wake up AWDL if required") awdlGoroutineStarted = true for { C.StopAWDLBrowsing() @@ -49,7 +49,7 @@ func (m *multicast) multicastStarted() { } } -func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error { +func (m *Multicast) multicastReuse(network string, address string, c syscall.RawConn) error { var control error var reuseport error var recvanyif error diff --git a/src/yggdrasil/multicast_other.go b/src/multicast/multicast_other.go similarity index 53% rename from src/yggdrasil/multicast_other.go rename to src/multicast/multicast_other.go index e20bbda..16ea15d 100644 --- a/src/yggdrasil/multicast_other.go +++ b/src/multicast/multicast_other.go @@ -1,13 +1,13 @@ // +build !linux,!darwin,!netbsd,!freebsd,!openbsd,!dragonflybsd,!windows -package yggdrasil +package multicast import "syscall" -func (m *multicast) multicastStarted() { +func (m *Multicast) multicastStarted() { } -func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error { +func (m *Multicast) multicastReuse(network string, address string, c syscall.RawConn) error { return nil } diff --git a/src/yggdrasil/multicast_unix.go b/src/multicast/multicast_unix.go similarity index 75% rename from src/yggdrasil/multicast_unix.go rename to src/multicast/multicast_unix.go index 3da1ab4..9d6a01a 100644 --- a/src/yggdrasil/multicast_unix.go +++ b/src/multicast/multicast_unix.go @@ -1,15 +1,15 @@ // +build linux netbsd freebsd openbsd dragonflybsd -package yggdrasil +package multicast import "syscall" import "golang.org/x/sys/unix" -func (m *multicast) multicastStarted() { +func (m *Multicast) multicastStarted() { } -func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error { +func (m *Multicast) multicastReuse(network string, address string, c syscall.RawConn) error { var control error var reuseport error diff --git a/src/yggdrasil/multicast_windows.go b/src/multicast/multicast_windows.go similarity index 75% rename from src/yggdrasil/multicast_windows.go rename to src/multicast/multicast_windows.go index 3e07f6c..7a846b1 100644 --- a/src/yggdrasil/multicast_windows.go +++ b/src/multicast/multicast_windows.go @@ -1,15 +1,15 @@ // +build windows -package yggdrasil +package multicast import "syscall" import "golang.org/x/sys/windows" -func (m *multicast) multicastStarted() { +func (m *Multicast) multicastStarted() { } -func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error { +func (m *Multicast) multicastReuse(network string, address string, c syscall.RawConn) error { var control error var reuseaddr error diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 002f974..2db7ad4 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -209,13 +209,13 @@ func (a *admin) init(c *Core) { }, nil } })*/ - a.addHandler("getMulticastInterfaces", []string{}, func(in admin_info) (admin_info, error) { + /*a.addHandler("getMulticastInterfaces", []string{}, func(in admin_info) (admin_info, error) { var intfs []string for _, v := range a.core.multicast.interfaces() { intfs = append(intfs, v.Name) } return admin_info{"multicast_interfaces": intfs}, nil - }) + })*/ a.addHandler("getAllowedEncryptionPublicKeys", []string{}, func(in admin_info) (admin_info, error) { return admin_info{"allowed_box_pubs": a.getAllowedEncryptionPublicKeys()}, nil }) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index a583d58..461e8e9 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -40,9 +40,9 @@ type Core struct { dht dht admin admin searches searches - multicast multicast - link link - log *log.Logger + //multicast multicast + link link + log *log.Logger } func (c *Core) init() error { @@ -82,7 +82,7 @@ func (c *Core) init() error { c.searches.init(c) c.dht.init(c) c.sessions.init(c) - c.multicast.init(c) + //c.multicast.init(c) c.peers.init(c) c.router.init(c) c.switchTable.init(c) // TODO move before peers? before router? @@ -137,7 +137,7 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) { c.router.cryptokey.reconfigure, c.switchTable.reconfigure, c.link.reconfigure, - c.multicast.reconfigure, + //c.multicast.reconfigure, } for _, component := range components { @@ -228,10 +228,10 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - if err := c.multicast.start(); err != nil { + /*if err := c.multicast.start(); err != nil { c.log.Errorln("Failed to start multicast interface") return err - } + }*/ if err := c.router.tun.Start(c.router.addr, c.router.subnet); err != nil { c.log.Errorln("Failed to start TUN/TAP") @@ -251,6 +251,11 @@ func (c *Core) Stop() { c.admin.close() } +// ListenOn starts a new listener +func (c *Core) ListenTCP(uri string) (*TcpListener, error) { + return c.link.tcp.listen(uri) +} + // Generates a new encryption keypair. The encryption keys are used to // encrypt traffic and to derive the IPv6 address/subnet of the node. func (c *Core) NewEncryptionKeys() (*crypto.BoxPubKey, *crypto.BoxPrivKey) { @@ -303,11 +308,20 @@ func (c *Core) SetLogger(log *log.Logger) { } // Adds a peer. This should be specified in the peer URI format, i.e. -// tcp://a.b.c.d:e, udp://a.b.c.d:e, socks://a.b.c.d:e/f.g.h.i:j +// tcp://a.b.c.d:e, udp://a.b.c.d:e, socks://a.b.c.d:e/f.g.h.i:j. This adds the +// peer to the peer list, so that they will be called again if the connection +// drops. func (c *Core) AddPeer(addr string, sintf string) error { return c.admin.addPeer(addr, sintf) } +// Calls a peer. This should be specified in the peer URI format, i.e. +// tcp://a.b.c.d:e, udp://a.b.c.d:e, socks://a.b.c.d:e/f.g.h.i:j. This calls the +// peer once, and if the connection drops, it won't be called again. +func (c *Core) CallPeer(addr string, sintf string) error { + return c.link.call(addr, sintf) +} + // Adds an allowed public key. This allow peerings to be restricted only to // keys that you have selected. func (c *Core) AddAllowedEncryptionPublicKey(boxStr string) error { diff --git a/src/yggdrasil/mobile.go b/src/yggdrasil/mobile.go index 81aa47f..bad1424 100644 --- a/src/yggdrasil/mobile.go +++ b/src/yggdrasil/mobile.go @@ -115,7 +115,7 @@ func (c *Core) GetSigPubKeyString() string { // dummy adapter in place of real TUN - when this call returns a packet, you // will probably want to give it to the OS to write to TUN. func (c *Core) RouterRecvPacket() ([]byte, error) { - packet := <-c.router.tun.recv + packet := <-c.router.tun.Recv return packet, nil } @@ -125,6 +125,6 @@ func (c *Core) RouterRecvPacket() ([]byte, error) { // Yggdrasil. func (c *Core) RouterSendPacket(buf []byte) error { packet := append(util.GetBytes(), buf[:]...) - c.router.tun.send <- packet + c.router.tun.Send <- packet return nil } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index f25ac41..b9681fc 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -36,14 +36,14 @@ type tcp struct { link *link reconfigure chan chan error mutex sync.Mutex // Protecting the below - listeners map[string]*tcpListener + listeners map[string]*TcpListener calls map[string]struct{} conns map[linkInfo](chan struct{}) } -type tcpListener struct { - listener net.Listener - stop chan bool +type TcpListener struct { + Listener net.Listener + Stop chan bool } // Wrapper function to set additional options for specific connection types. @@ -64,7 +64,7 @@ func (t *tcp) getAddr() *net.TCPAddr { t.mutex.Lock() defer t.mutex.Unlock() for _, l := range t.listeners { - return l.listener.Addr().(*net.TCPAddr) + return l.Listener.Addr().(*net.TCPAddr) } return nil } @@ -76,7 +76,7 @@ func (t *tcp) init(l *link) error { t.mutex.Lock() t.calls = make(map[string]struct{}) t.conns = make(map[linkInfo](chan struct{})) - t.listeners = make(map[string]*tcpListener) + t.listeners = make(map[string]*TcpListener) t.mutex.Unlock() go func() { @@ -103,7 +103,7 @@ func (t *tcp) init(l *link) error { t.mutex.Lock() if listener, ok := t.listeners[d[6:]]; ok { t.mutex.Unlock() - listener.stop <- true + listener.Stop <- true } else { t.mutex.Unlock() } @@ -129,7 +129,7 @@ func (t *tcp) init(l *link) error { return nil } -func (t *tcp) listen(listenaddr string) (*tcpListener, error) { +func (t *tcp) listen(listenaddr string) (*TcpListener, error) { var err error ctx := context.Background() @@ -138,9 +138,9 @@ func (t *tcp) listen(listenaddr string) (*tcpListener, error) { } listener, err := lc.Listen(ctx, "tcp", listenaddr) if err == nil { - l := tcpListener{ - listener: listener, - stop: make(chan bool), + l := TcpListener{ + Listener: listener, + Stop: make(chan bool), } go t.listener(&l, listenaddr) return &l, nil @@ -150,7 +150,7 @@ func (t *tcp) listen(listenaddr string) (*tcpListener, error) { } // Runs the listener, which spawns off goroutines for incoming connections. -func (t *tcp) listener(l *tcpListener, listenaddr string) { +func (t *tcp) listener(l *TcpListener, listenaddr string) { if l == nil { return } @@ -158,7 +158,7 @@ func (t *tcp) listener(l *tcpListener, listenaddr string) { t.mutex.Lock() if _, isIn := t.listeners[listenaddr]; isIn { t.mutex.Unlock() - l.listener.Close() + l.Listener.Close() return } else { t.listeners[listenaddr] = l @@ -167,20 +167,20 @@ func (t *tcp) listener(l *tcpListener, listenaddr string) { // And here we go! accepted := make(chan bool) defer func() { - t.link.core.log.Infoln("Stopping TCP listener on:", l.listener.Addr().String()) - l.listener.Close() + t.link.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String()) + l.Listener.Close() t.mutex.Lock() delete(t.listeners, listenaddr) t.mutex.Unlock() }() - t.link.core.log.Infoln("Listening for TCP on:", l.listener.Addr().String()) + t.link.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String()) for { var sock net.Conn var err error // Listen in a separate goroutine, as that way it does not block us from // receiving "stop" events go func() { - sock, err = l.listener.Accept() + sock, err = l.Listener.Accept() accepted <- true }() // Wait for either an accepted connection, or a message telling us to stop @@ -192,7 +192,7 @@ func (t *tcp) listener(l *tcpListener, listenaddr string) { return } go t.handler(sock, true, nil) - case <-l.stop: + case <-l.Stop: return } }