From 219fb96553a0f647036657f99fc7ed6cfbe00d18 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 29 Dec 2018 18:51:51 +0000 Subject: [PATCH 01/22] Support notifying components for config reload, listen for SIGHUP --- cmd/yggdrasil/main.go | 12 ++++++++- src/yggdrasil/admin.go | 24 ++++++++++++++--- src/yggdrasil/core.go | 53 ++++++++++++++++++++++++++++++++------ src/yggdrasil/dht.go | 23 +++++++++++++---- src/yggdrasil/multicast.go | 19 +++++++++++--- src/yggdrasil/peer.go | 13 ++++++++++ src/yggdrasil/router.go | 33 ++++++++++++++---------- src/yggdrasil/search.go | 17 ++++++++++-- src/yggdrasil/session.go | 24 +++++++++++++++++ src/yggdrasil/switch.go | 7 +++++ 10 files changed, 189 insertions(+), 36 deletions(-) diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index 2b6d2f0..e98e623 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -314,7 +314,9 @@ func main() { logger.Printf("Your IPv6 subnet is %s", subnet.String()) // Catch interrupts from the operating system to exit gracefully. c := make(chan os.Signal, 1) + r := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) + signal.Notify(r, os.Interrupt, syscall.SIGHUP) // Create a function to capture the service being stopped on Windows. winTerminate := func() { c <- os.Interrupt @@ -322,5 +324,13 @@ func main() { minwinsvc.SetOnExit(winTerminate) // Wait for the terminate/interrupt signal. Once a signal is received, the // deferred Stop function above will run which will shut down TUN/TAP. - <-c + for { + select { + case _ = <-r: + n.core.UpdateConfig(cfg) + case _ = <-c: + goto exit + } + } +exit: } diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index bd3c905..1c8c80e 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -22,10 +22,11 @@ import ( // TODO: Add authentication type admin struct { - core *Core - listenaddr string - listener net.Listener - handlers []admin_handlerInfo + core *Core + reconfigure chan bool + listenaddr string + listener net.Listener + handlers []admin_handlerInfo } type admin_info map[string]interface{} @@ -53,6 +54,21 @@ func (a *admin) addHandler(name string, args []string, handler func(admin_info) // init runs the initial admin setup. func (a *admin) init(c *Core, listenaddr string) { a.core = c + a.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case _ = <-a.reconfigure: + a.core.configMutex.RLock() + a.core.log.Println("Notified: admin") + if a.core.config.AdminListen != a.core.configOld.AdminListen { + a.core.log.Println("AdminListen has changed!") + } + a.core.configMutex.RUnlock() + continue + } + } + }() a.listenaddr = listenaddr a.addHandler("list", []string{}, func(in admin_info) (admin_info, error) { handlers := make(map[string]interface{}) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index e38274f..9e4bb62 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -7,6 +7,7 @@ import ( "log" "net" "regexp" + "sync" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/config" @@ -17,14 +18,26 @@ import ( var buildName string var buildVersion string +type module interface { + init(*config.NodeConfig) error + start() error +} + // The Core object represents the Yggdrasil node. You should create a Core // object for each Yggdrasil node you plan to run. type Core struct { // This is the main data structure that holds everything else for a node - boxPub crypto.BoxPubKey - boxPriv crypto.BoxPrivKey - sigPub crypto.SigPubKey - sigPriv crypto.SigPrivKey + // We're going to keep our own copy of the provided config - that way we can + // guarantee that it will be covered by the mutex + config config.NodeConfig // Active config + configOld config.NodeConfig // Previous config + configMutex sync.RWMutex // Protects both config and configOld + // Core-specific config + boxPub crypto.BoxPubKey + boxPriv crypto.BoxPrivKey + sigPub crypto.SigPubKey + sigPriv crypto.SigPrivKey + // Modules switchTable switchTable peers peers sessions sessions @@ -35,8 +48,9 @@ type Core struct { multicast multicast nodeinfo nodeinfo tcp tcpInterface - log *log.Logger - ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this + // Other bits + log *log.Logger + ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this } func (c *Core) init(bpub *crypto.BoxPubKey, @@ -62,8 +76,26 @@ func (c *Core) init(bpub *crypto.BoxPubKey, c.switchTable.init(c, c.sigPub) // TODO move before peers? before router? } -// Get the current build name. This is usually injected if built from git, -// or returns "unknown" otherwise. +// UpdateConfig updates the configuration in Core and then signals the +// various module goroutines to reconfigure themselves if needed +func (c *Core) UpdateConfig(config *config.NodeConfig) { + c.configMutex.Lock() + c.configOld = c.config + c.config = *config + c.configMutex.Unlock() + + c.admin.reconfigure <- true + c.searches.reconfigure <- true + c.dht.reconfigure <- true + c.sessions.reconfigure <- true + c.multicast.reconfigure <- true + c.peers.reconfigure <- true + c.router.reconfigure <- true + c.switchTable.reconfigure <- true +} + +// GetBuildName gets the current build name. This is usually injected if built +// from git, or returns "unknown" otherwise. func GetBuildName() string { if buildName == "" { return "unknown" @@ -96,6 +128,11 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { c.log.Println("Starting up...") + c.configMutex.Lock() + c.config = *nc + c.configOld = c.config + c.configMutex.Unlock() + var boxPub crypto.BoxPubKey var boxPriv crypto.BoxPrivKey var sigPub crypto.SigPubKey diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index b52a820..3f2debd 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -65,11 +65,12 @@ type dhtReqKey struct { // The main DHT struct. type dht struct { - core *Core - nodeID crypto.NodeID - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests - callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks + core *Core + reconfigure chan bool + nodeID crypto.NodeID + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests + callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks // These next two could be replaced by a single linked list or similar... table map[crypto.NodeID]*dhtInfo imp []*dhtInfo @@ -78,6 +79,18 @@ type dht struct { // Initializes the DHT. func (t *dht) init(c *Core) { t.core = c + t.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case _ = <-t.reconfigure: + t.core.configMutex.RLock() + t.core.log.Println("Notified: dht") + t.core.configMutex.RUnlock() + continue + } + } + }() t.nodeID = *t.core.GetNodeID() t.peers = make(chan *dhtInfo, 1024) t.callbacks = make(map[dhtReqKey]dht_callbackInfo) diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index 749dfcd..3d73237 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -10,13 +10,26 @@ import ( ) type multicast struct { - core *Core - sock *ipv6.PacketConn - groupAddr string + core *Core + reconfigure chan bool + sock *ipv6.PacketConn + groupAddr string } func (m *multicast) init(core *Core) { m.core = core + m.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case _ = <-m.reconfigure: + m.core.configMutex.RLock() + m.core.log.Println("Notified: multicast") + m.core.configMutex.RUnlock() + continue + } + } + }() m.groupAddr = "[ff02::114]:9001" // Check if we've been given any expressions if len(m.core.ifceExpr) == 0 { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index a2b94b6..502ea67 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -19,6 +19,7 @@ import ( // In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. type peers struct { core *Core + reconfigure chan bool mutex sync.Mutex // Synchronize writes to atomic ports atomic.Value //map[switchPort]*peer, use CoW semantics authMutex sync.RWMutex @@ -31,6 +32,18 @@ func (ps *peers) init(c *Core) { defer ps.mutex.Unlock() ps.putPorts(make(map[switchPort]*peer)) ps.core = c + ps.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case _ = <-ps.reconfigure: + ps.core.configMutex.RLock() + ps.core.log.Println("Notified: peers") + ps.core.configMutex.RUnlock() + continue + } + } + }() ps.allowedEncryptionPublicKeys = make(map[crypto.BoxPubKey]struct{}) } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 87da882..096a978 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -37,19 +37,20 @@ import ( // The router struct has channels to/from the tun/tap device and a self peer (0), which is how messages are passed between this node and the peers/switch layer. // The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions. type router struct { - core *Core - addr address.Address - subnet address.Subnet - in <-chan []byte // packets we received from the network, link to peer's "out" - out func([]byte) // packets we're sending to the network, link to peer's "in" - toRecv chan router_recvPacket // packets to handle via recvPacket() - tun tunAdapter // TUN/TAP adapter - adapters []Adapter // Other adapters - recv chan<- []byte // place where the tun pulls received packets from - send <-chan []byte // place where the tun puts outgoing packets - reset chan struct{} // signal that coords changed (re-init sessions/dht) - admin chan func() // pass a lambda for the admin socket to query stuff - cryptokey cryptokey + core *Core + reconfigure chan bool + addr address.Address + subnet address.Subnet + in <-chan []byte // packets we received from the network, link to peer's "out" + out func([]byte) // packets we're sending to the network, link to peer's "in" + toRecv chan router_recvPacket // packets to handle via recvPacket() + tun tunAdapter // TUN/TAP adapter + adapters []Adapter // Other adapters + recv chan<- []byte // place where the tun pulls received packets from + send <-chan []byte // place where the tun puts outgoing packets + reset chan struct{} // signal that coords changed (re-init sessions/dht) + admin chan func() // pass a lambda for the admin socket to query stuff + cryptokey cryptokey } // Packet and session info, used to check that the packet matches a valid IP range or CKR prefix before sending to the tun. @@ -61,6 +62,7 @@ type router_recvPacket struct { // Initializes the router struct, which includes setting up channels to/from the tun/tap. func (r *router) init(core *Core) { r.core = core + r.reconfigure = make(chan bool, 1) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) in := make(chan []byte, 32) // TODO something better than this... @@ -124,6 +126,11 @@ func (r *router) mainLoop() { } case f := <-r.admin: f() + case _ = <-r.reconfigure: + r.core.configMutex.RLock() + r.core.log.Println("Notified: router") + r.core.configMutex.RUnlock() + continue } } } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index c85b719..f522d7b 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -42,13 +42,26 @@ type searchInfo struct { // This stores a map of active searches. type searches struct { - core *Core - searches map[crypto.NodeID]*searchInfo + core *Core + reconfigure chan bool + searches map[crypto.NodeID]*searchInfo } // Intializes the searches struct. func (s *searches) init(core *Core) { s.core = core + s.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case _ = <-s.reconfigure: + s.core.configMutex.RLock() + s.core.log.Println("Notified: searches") + s.core.configMutex.RUnlock() + continue + } + } + }() s.searches = make(map[crypto.NodeID]*searchInfo) } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 4f395b0..78b36ec 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -18,6 +18,7 @@ import ( // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { core *Core + reconfigure chan bool theirAddr address.Address theirSubnet address.Subnet theirPermPub crypto.BoxPubKey @@ -101,6 +102,7 @@ func (s *sessionInfo) timedout() bool { // Additionally, stores maps of address/subnet onto keys, and keys onto handles. type sessions struct { core *Core + reconfigure chan bool lastCleanup time.Time // Maps known permanent keys to their shared key, used by DHT a lot permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey @@ -124,6 +126,22 @@ type sessions struct { // Initializes the session struct. func (ss *sessions) init(core *Core) { ss.core = core + ss.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case newConfig := <-ss.reconfigure: + ss.core.configMutex.RLock() + ss.core.log.Println("Notified: sessions") + ss.core.configMutex.RUnlock() + + for _, sinfo := range ss.sinfos { + sinfo.reconfigure <- newConfig + } + continue + } + } + }() ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey) ss.sinfos = make(map[crypto.Handle]*sessionInfo) ss.byMySes = make(map[crypto.BoxPubKey]*crypto.Handle) @@ -271,6 +289,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { } sinfo := sessionInfo{} sinfo.core = ss.core + sinfo.reconfigure = make(chan bool, 1) sinfo.theirPermPub = *theirPermKey pub, priv := crypto.NewBoxKeys() sinfo.mySesPub = *pub @@ -539,6 +558,11 @@ func (sinfo *sessionInfo) doWorker() { } else { return } + case _ = <-sinfo.reconfigure: + sinfo.core.configMutex.RLock() + sinfo.core.log.Println("Notified: sessionInfo") + sinfo.core.configMutex.RUnlock() + continue } } } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 3c1dae6..420392b 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -162,6 +162,7 @@ type switchData struct { // All the information stored by the switch. type switchTable struct { core *Core + reconfigure chan bool key crypto.SigPubKey // Our own key time time.Time // Time when locator.tstamp was last updated drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root @@ -184,6 +185,7 @@ const SwitchQueueTotalMinSize = 4 * 1024 * 1024 func (t *switchTable) init(core *Core, key crypto.SigPubKey) { now := time.Now() t.core = core + t.reconfigure = make(chan bool, 1) t.key = key locator := switchLocator{root: key, tstamp: now.Unix()} peers := make(map[switchPort]peerInfo) @@ -808,6 +810,11 @@ func (t *switchTable) doWorker() { } case f := <-t.admin: f() + case _ = <-t.reconfigure: + t.core.configMutex.RLock() + t.core.log.Println("Notified: switchTable") + t.core.configMutex.RUnlock() + continue } } } From fa7c4117b4cce9932f20b7d28399315854296dbc Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 29 Dec 2018 19:14:26 +0000 Subject: [PATCH 02/22] Use Core.config in init functions --- src/yggdrasil/admin.go | 6 ++- src/yggdrasil/core.go | 97 +++++++++++++++++++---------------------- src/yggdrasil/switch.go | 6 +-- src/yggdrasil/tcp.go | 9 ++-- 4 files changed, 56 insertions(+), 62 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 1c8c80e..723bf8f 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -52,7 +52,7 @@ func (a *admin) addHandler(name string, args []string, handler func(admin_info) } // init runs the initial admin setup. -func (a *admin) init(c *Core, listenaddr string) { +func (a *admin) init(c *Core) { a.core = c a.reconfigure = make(chan bool, 1) go func() { @@ -69,7 +69,9 @@ func (a *admin) init(c *Core, listenaddr string) { } } }() - a.listenaddr = listenaddr + a.core.configMutex.RLock() + a.listenaddr = a.core.config.AdminListen + a.core.configMutex.RUnlock() a.addHandler("list", []string{}, func(in admin_info) (admin_info, error) { handlers := make(map[string]interface{}) for _, handler := range a.handlers { diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 9e4bb62..58d92b0 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -19,7 +19,7 @@ var buildName string var buildVersion string type module interface { - init(*config.NodeConfig) error + init(*Core, *config.NodeConfig) error start() error } @@ -32,12 +32,10 @@ type Core struct { config config.NodeConfig // Active config configOld config.NodeConfig // Previous config configMutex sync.RWMutex // Protects both config and configOld - // Core-specific config - boxPub crypto.BoxPubKey - boxPriv crypto.BoxPrivKey - sigPub crypto.SigPubKey - sigPriv crypto.SigPrivKey - // Modules + boxPub crypto.BoxPubKey + boxPriv crypto.BoxPrivKey + sigPub crypto.SigPubKey + sigPriv crypto.SigPrivKey switchTable switchTable peers peers sessions sessions @@ -48,15 +46,11 @@ type Core struct { multicast multicast nodeinfo nodeinfo tcp tcpInterface - // Other bits - log *log.Logger - ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this + log *log.Logger + ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this } -func (c *Core) init(bpub *crypto.BoxPubKey, - bpriv *crypto.BoxPrivKey, - spub *crypto.SigPubKey, - spriv *crypto.SigPrivKey) { +func (c *Core) init() error { // TODO separate init and start functions // Init sets up structs // Start launches goroutines that depend on structs being set up @@ -64,16 +58,45 @@ func (c *Core) init(bpub *crypto.BoxPubKey, if c.log == nil { c.log = log.New(ioutil.Discard, "", 0) } - c.boxPub, c.boxPriv = *bpub, *bpriv - c.sigPub, c.sigPriv = *spub, *spriv - c.admin.core = c + + boxPubHex, err := hex.DecodeString(c.config.EncryptionPublicKey) + if err != nil { + return err + } + boxPrivHex, err := hex.DecodeString(c.config.EncryptionPrivateKey) + if err != nil { + return err + } + sigPubHex, err := hex.DecodeString(c.config.SigningPublicKey) + if err != nil { + return err + } + sigPrivHex, err := hex.DecodeString(c.config.SigningPrivateKey) + if err != nil { + return err + } + + copy(c.boxPub[:], boxPubHex) + copy(c.boxPriv[:], boxPrivHex) + copy(c.sigPub[:], sigPubHex) + copy(c.sigPriv[:], sigPrivHex) + + c.admin.init(c) + c.nodeinfo.init(c) c.searches.init(c) c.dht.init(c) c.sessions.init(c) c.multicast.init(c) c.peers.init(c) c.router.init(c) - c.switchTable.init(c, c.sigPub) // TODO move before peers? before router? + c.switchTable.init(c) // TODO move before peers? before router? + + if err := c.tcp.init(c); err != nil { + c.log.Println("Failed to start TCP interface") + return err + } + + return nil } // UpdateConfig updates the configuration in Core and then signals the @@ -133,42 +156,10 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { c.configOld = c.config c.configMutex.Unlock() - var boxPub crypto.BoxPubKey - var boxPriv crypto.BoxPrivKey - var sigPub crypto.SigPubKey - var sigPriv crypto.SigPrivKey - boxPubHex, err := hex.DecodeString(nc.EncryptionPublicKey) - if err != nil { - return err - } - boxPrivHex, err := hex.DecodeString(nc.EncryptionPrivateKey) - if err != nil { - return err - } - sigPubHex, err := hex.DecodeString(nc.SigningPublicKey) - if err != nil { - return err - } - sigPrivHex, err := hex.DecodeString(nc.SigningPrivateKey) - if err != nil { - return err - } - copy(boxPub[:], boxPubHex) - copy(boxPriv[:], boxPrivHex) - copy(sigPub[:], sigPubHex) - copy(sigPriv[:], sigPrivHex) + c.init() - c.init(&boxPub, &boxPriv, &sigPub, &sigPriv) - c.admin.init(c, nc.AdminListen) - - c.nodeinfo.init(c) c.nodeinfo.setNodeInfo(nc.NodeInfo, nc.NodeInfoPrivacy) - if err := c.tcp.init(c, nc.Listen, nc.ReadTimeout); err != nil { - c.log.Println("Failed to start TCP interface") - return err - } - if nc.SwitchOptions.MaxTotalQueueSize >= SwitchQueueTotalMinSize { c.switchTable.queueTotalMaxSize = nc.SwitchOptions.MaxTotalQueueSize } @@ -201,7 +192,7 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { } } for _, source := range nc.TunnelRouting.IPv6Sources { - if c.router.cryptokey.addSourceSubnet(source); err != nil { + if err := c.router.cryptokey.addSourceSubnet(source); err != nil { panic(err) } } @@ -211,7 +202,7 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { } } for _, source := range nc.TunnelRouting.IPv4Sources { - if c.router.cryptokey.addSourceSubnet(source); err != nil { + if err := c.router.cryptokey.addSourceSubnet(source); err != nil { panic(err) } } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 420392b..f3c9512 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -182,12 +182,12 @@ type switchTable struct { const SwitchQueueTotalMinSize = 4 * 1024 * 1024 // Initializes the switchTable struct. -func (t *switchTable) init(core *Core, key crypto.SigPubKey) { +func (t *switchTable) init(core *Core) { now := time.Now() t.core = core t.reconfigure = make(chan bool, 1) - t.key = key - locator := switchLocator{root: key, tstamp: now.Unix()} + t.key = t.core.sigPub + locator := switchLocator{root: t.key, tstamp: now.Unix()} peers := make(map[switchPort]peerInfo) t.data = switchData{locator: locator, peers: peers} t.updater.Store(&sync.Once{}) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 6d92344..c986dc6 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -40,6 +40,7 @@ type tcpInterface struct { core *Core serv net.Listener tcp_timeout time.Duration + tcp_addr string mutex sync.Mutex // Protecting the below calls map[string]struct{} conns map[tcpInfo](chan struct{}) @@ -80,15 +81,15 @@ func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { } // Initializes the struct. -func (iface *tcpInterface) init(core *Core, addr string, readTimeout int32) (err error) { +func (iface *tcpInterface) init(core *Core) (err error) { iface.core = core - - iface.tcp_timeout = time.Duration(readTimeout) * time.Millisecond + iface.tcp_addr = iface.core.config.Listen + iface.tcp_timeout = time.Duration(iface.core.config.ReadTimeout) * time.Millisecond if iface.tcp_timeout >= 0 && iface.tcp_timeout < default_tcp_timeout { iface.tcp_timeout = default_tcp_timeout } - iface.serv, err = net.Listen("tcp", addr) + iface.serv, err = net.Listen("tcp", iface.tcp_addr) if err == nil { iface.calls = make(map[string]struct{}) iface.conns = make(map[tcpInfo](chan struct{})) From 2925920c703a3a743346d241097513952c80d979 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 29 Dec 2018 19:53:31 +0000 Subject: [PATCH 03/22] Use mutex in switch/tcp init --- src/yggdrasil/switch.go | 2 ++ src/yggdrasil/tcp.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index f3c9512..10c9563 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -186,7 +186,9 @@ func (t *switchTable) init(core *Core) { now := time.Now() t.core = core t.reconfigure = make(chan bool, 1) + t.core.configMutex.RLock() t.key = t.core.sigPub + t.core.configMutex.RUnlock() locator := switchLocator{root: t.key, tstamp: now.Unix()} peers := make(map[switchPort]peerInfo) t.data = switchData{locator: locator, peers: peers} diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index c986dc6..ad50d78 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -83,8 +83,10 @@ func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { // Initializes the struct. func (iface *tcpInterface) init(core *Core) (err error) { iface.core = core + iface.core.configMutex.RLock() iface.tcp_addr = iface.core.config.Listen iface.tcp_timeout = time.Duration(iface.core.config.ReadTimeout) * time.Millisecond + iface.core.configMutex.RUnlock() if iface.tcp_timeout >= 0 && iface.tcp_timeout < default_tcp_timeout { iface.tcp_timeout = default_tcp_timeout } From 7fae1c993ab5d754625336b16e13e47a3d71a807 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sun, 30 Dec 2018 12:04:42 +0000 Subject: [PATCH 04/22] Handle errors from reconfigure tasks --- src/yggdrasil/admin.go | 9 ++++----- src/yggdrasil/core.go | 26 ++++++++++++++++++-------- src/yggdrasil/dht.go | 11 ++++------- src/yggdrasil/multicast.go | 11 ++++------- src/yggdrasil/peer.go | 11 ++++------- src/yggdrasil/router.go | 11 ++++------- src/yggdrasil/search.go | 11 ++++------- src/yggdrasil/session.go | 35 ++++++++++++++++++----------------- src/yggdrasil/switch.go | 11 ++++------- 9 files changed, 64 insertions(+), 72 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 723bf8f..90fb112 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -23,7 +23,7 @@ import ( type admin struct { core *Core - reconfigure chan bool + reconfigure chan chan error listenaddr string listener net.Listener handlers []admin_handlerInfo @@ -54,18 +54,17 @@ func (a *admin) addHandler(name string, args []string, handler func(admin_info) // init runs the initial admin setup. func (a *admin) init(c *Core) { a.core = c - a.reconfigure = make(chan bool, 1) + a.reconfigure = make(chan chan error, 1) go func() { for { select { - case _ = <-a.reconfigure: + case e := <-a.reconfigure: a.core.configMutex.RLock() - a.core.log.Println("Notified: admin") if a.core.config.AdminListen != a.core.configOld.AdminListen { a.core.log.Println("AdminListen has changed!") } a.core.configMutex.RUnlock() - continue + e <- nil } } }() diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 58d92b0..435cd67 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -107,14 +107,24 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) { c.config = *config c.configMutex.Unlock() - c.admin.reconfigure <- true - c.searches.reconfigure <- true - c.dht.reconfigure <- true - c.sessions.reconfigure <- true - c.multicast.reconfigure <- true - c.peers.reconfigure <- true - c.router.reconfigure <- true - c.switchTable.reconfigure <- true + components := []chan chan error{ + c.admin.reconfigure, + c.searches.reconfigure, + c.dht.reconfigure, + c.sessions.reconfigure, + c.multicast.reconfigure, + c.peers.reconfigure, + c.router.reconfigure, + c.switchTable.reconfigure, + } + + for _, component := range components { + response := make(chan error) + component <- response + if err := <-response; err != nil { + c.log.Println(err) + } + } } // GetBuildName gets the current build name. This is usually injected if built diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 3f2debd..bba6dfc 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -66,7 +66,7 @@ type dhtReqKey struct { // The main DHT struct. type dht struct { core *Core - reconfigure chan bool + reconfigure chan chan error nodeID crypto.NodeID peers chan *dhtInfo // other goroutines put incoming dht updates here reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests @@ -79,15 +79,12 @@ type dht struct { // Initializes the DHT. func (t *dht) init(c *Core) { t.core = c - t.reconfigure = make(chan bool, 1) + t.reconfigure = make(chan chan error, 1) go func() { for { select { - case _ = <-t.reconfigure: - t.core.configMutex.RLock() - t.core.log.Println("Notified: dht") - t.core.configMutex.RUnlock() - continue + case e := <-t.reconfigure: + e <- nil } } }() diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index 3d73237..25c979c 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -11,22 +11,19 @@ import ( type multicast struct { core *Core - reconfigure chan bool + reconfigure chan chan error sock *ipv6.PacketConn groupAddr string } func (m *multicast) init(core *Core) { m.core = core - m.reconfigure = make(chan bool, 1) + m.reconfigure = make(chan chan error, 1) go func() { for { select { - case _ = <-m.reconfigure: - m.core.configMutex.RLock() - m.core.log.Println("Notified: multicast") - m.core.configMutex.RUnlock() - continue + case e := <-m.reconfigure: + e <- nil } } }() diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 502ea67..15174b7 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -19,7 +19,7 @@ import ( // In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. type peers struct { core *Core - reconfigure chan bool + reconfigure chan chan error mutex sync.Mutex // Synchronize writes to atomic ports atomic.Value //map[switchPort]*peer, use CoW semantics authMutex sync.RWMutex @@ -32,15 +32,12 @@ func (ps *peers) init(c *Core) { defer ps.mutex.Unlock() ps.putPorts(make(map[switchPort]*peer)) ps.core = c - ps.reconfigure = make(chan bool, 1) + ps.reconfigure = make(chan chan error, 1) go func() { for { select { - case _ = <-ps.reconfigure: - ps.core.configMutex.RLock() - ps.core.log.Println("Notified: peers") - ps.core.configMutex.RUnlock() - continue + case e := <-ps.reconfigure: + e <- nil } } }() diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 096a978..68fb025 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -38,7 +38,7 @@ import ( // The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions. type router struct { core *Core - reconfigure chan bool + reconfigure chan chan error addr address.Address subnet address.Subnet in <-chan []byte // packets we received from the network, link to peer's "out" @@ -62,7 +62,7 @@ type router_recvPacket struct { // Initializes the router struct, which includes setting up channels to/from the tun/tap. func (r *router) init(core *Core) { r.core = core - r.reconfigure = make(chan bool, 1) + r.reconfigure = make(chan chan error, 1) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) in := make(chan []byte, 32) // TODO something better than this... @@ -126,11 +126,8 @@ func (r *router) mainLoop() { } case f := <-r.admin: f() - case _ = <-r.reconfigure: - r.core.configMutex.RLock() - r.core.log.Println("Notified: router") - r.core.configMutex.RUnlock() - continue + case e := <-r.reconfigure: + e <- nil } } } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index f522d7b..f0af61f 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -43,22 +43,19 @@ type searchInfo struct { // This stores a map of active searches. type searches struct { core *Core - reconfigure chan bool + reconfigure chan chan error searches map[crypto.NodeID]*searchInfo } // Intializes the searches struct. func (s *searches) init(core *Core) { s.core = core - s.reconfigure = make(chan bool, 1) + s.reconfigure = make(chan chan error, 1) go func() { for { select { - case _ = <-s.reconfigure: - s.core.configMutex.RLock() - s.core.log.Println("Notified: searches") - s.core.configMutex.RUnlock() - continue + case e := <-s.reconfigure: + e <- nil } } }() diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 78b36ec..3c8e013 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -18,7 +18,7 @@ import ( // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { core *Core - reconfigure chan bool + reconfigure chan chan error theirAddr address.Address theirSubnet address.Subnet theirPermPub crypto.BoxPubKey @@ -102,7 +102,7 @@ func (s *sessionInfo) timedout() bool { // Additionally, stores maps of address/subnet onto keys, and keys onto handles. type sessions struct { core *Core - reconfigure chan bool + reconfigure chan chan error lastCleanup time.Time // Maps known permanent keys to their shared key, used by DHT a lot permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey @@ -126,19 +126,23 @@ type sessions struct { // Initializes the session struct. func (ss *sessions) init(core *Core) { ss.core = core - ss.reconfigure = make(chan bool, 1) + ss.reconfigure = make(chan chan error, 1) go func() { for { select { - case newConfig := <-ss.reconfigure: - ss.core.configMutex.RLock() - ss.core.log.Println("Notified: sessions") - ss.core.configMutex.RUnlock() - - for _, sinfo := range ss.sinfos { - sinfo.reconfigure <- newConfig + case e := <-ss.reconfigure: + responses := make(map[crypto.Handle]chan error) + for index, session := range ss.sinfos { + responses[index] = make(chan error) + session.reconfigure <- responses[index] } - continue + for _, response := range responses { + if err := <-response; err != nil { + e <- err + continue + } + } + e <- nil } } }() @@ -289,7 +293,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { } sinfo := sessionInfo{} sinfo.core = ss.core - sinfo.reconfigure = make(chan bool, 1) + sinfo.reconfigure = make(chan chan error, 1) sinfo.theirPermPub = *theirPermKey pub, priv := crypto.NewBoxKeys() sinfo.mySesPub = *pub @@ -558,11 +562,8 @@ func (sinfo *sessionInfo) doWorker() { } else { return } - case _ = <-sinfo.reconfigure: - sinfo.core.configMutex.RLock() - sinfo.core.log.Println("Notified: sessionInfo") - sinfo.core.configMutex.RUnlock() - continue + case e := <-sinfo.reconfigure: + e <- nil } } } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 10c9563..741de98 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -162,7 +162,7 @@ type switchData struct { // All the information stored by the switch. type switchTable struct { core *Core - reconfigure chan bool + reconfigure chan chan error key crypto.SigPubKey // Our own key time time.Time // Time when locator.tstamp was last updated drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root @@ -185,7 +185,7 @@ const SwitchQueueTotalMinSize = 4 * 1024 * 1024 func (t *switchTable) init(core *Core) { now := time.Now() t.core = core - t.reconfigure = make(chan bool, 1) + t.reconfigure = make(chan chan error, 1) t.core.configMutex.RLock() t.key = t.core.sigPub t.core.configMutex.RUnlock() @@ -812,11 +812,8 @@ func (t *switchTable) doWorker() { } case f := <-t.admin: f() - case _ = <-t.reconfigure: - t.core.configMutex.RLock() - t.core.log.Println("Notified: switchTable") - t.core.configMutex.RUnlock() - continue + case e := <-t.reconfigure: + e <- nil } } } From f96747181d13480046874900898b7ede6e6c8069 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sun, 30 Dec 2018 12:26:55 +0000 Subject: [PATCH 05/22] Allow updating AdminListen during runtime --- cmd/yggdrasil/main.go | 231 ++++++++++++++++++++++------------------- src/yggdrasil/admin.go | 4 +- 2 files changed, 125 insertions(+), 110 deletions(-) diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index e98e623..5ee7add 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -76,6 +76,119 @@ func generateConfig(isAutoconf bool) *nodeConfig { return &cfg } +func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeConfig { + // Use a configuration file. If -useconf, the configuration will be read + // from stdin. If -useconffile, the configuration will be read from the + // filesystem. + var config []byte + var err error + if *useconffile != "" { + // Read the file from the filesystem + config, err = ioutil.ReadFile(*useconffile) + } else { + // Read the file from stdin. + config, err = ioutil.ReadAll(os.Stdin) + } + if err != nil { + panic(err) + } + // If there's a byte order mark - which Windows 10 is now incredibly fond of + // throwing everywhere when it's converting things into UTF-16 for the hell + // of it - remove it and decode back down into UTF-8. This is necessary + // because hjson doesn't know what to do with UTF-16 and will panic + if bytes.Compare(config[0:2], []byte{0xFF, 0xFE}) == 0 || + bytes.Compare(config[0:2], []byte{0xFE, 0xFF}) == 0 { + utf := unicode.UTF16(unicode.BigEndian, unicode.UseBOM) + decoder := utf.NewDecoder() + config, err = decoder.Bytes(config) + if err != nil { + panic(err) + } + } + // Generate a new configuration - this gives us a set of sane defaults - + // then parse the configuration we loaded above on top of it. The effect + // of this is that any configuration item that is missing from the provided + // configuration will use a sane default. + cfg := generateConfig(false) + var dat map[string]interface{} + if err := hjson.Unmarshal(config, &dat); err != nil { + panic(err) + } + confJson, err := json.Marshal(dat) + if err != nil { + panic(err) + } + json.Unmarshal(confJson, &cfg) + // For now we will do a little bit to help the user adjust their + // configuration to match the new configuration format, as some of the key + // names have changed recently. + changes := map[string]string{ + "Multicast": "", + "LinkLocal": "MulticastInterfaces", + "BoxPub": "EncryptionPublicKey", + "BoxPriv": "EncryptionPrivateKey", + "SigPub": "SigningPublicKey", + "SigPriv": "SigningPrivateKey", + "AllowedBoxPubs": "AllowedEncryptionPublicKeys", + } + // Loop over the mappings aove and see if we have anything to fix. + for from, to := range changes { + if _, ok := dat[from]; ok { + if to == "" { + if !*normaliseconf { + log.Println("Warning: Deprecated config option", from, "- please remove") + } + } else { + if !*normaliseconf { + log.Println("Warning: Deprecated config option", from, "- please rename to", to) + } + // If the configuration file doesn't already contain a line with the + // new name then set it to the old value. This makes sure that we + // don't overwrite something that was put there intentionally. + if _, ok := dat[to]; !ok { + dat[to] = dat[from] + } + } + } + } + // Check to see if the peers are in a parsable format, if not then default + // them to the TCP scheme + if peers, ok := dat["Peers"].([]interface{}); ok { + for index, peer := range peers { + uri := peer.(string) + if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { + continue + } + if strings.HasPrefix(uri, "tcp:") { + uri = uri[4:] + } + (dat["Peers"].([]interface{}))[index] = "tcp://" + uri + } + } + // Now do the same with the interface peers + if interfacepeers, ok := dat["InterfacePeers"].(map[string]interface{}); ok { + for intf, peers := range interfacepeers { + for index, peer := range peers.([]interface{}) { + uri := peer.(string) + if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { + continue + } + if strings.HasPrefix(uri, "tcp:") { + uri = uri[4:] + } + ((dat["InterfacePeers"].(map[string]interface{}))[intf]).([]interface{})[index] = "tcp://" + uri + } + } + } + // Overlay our newly mapped configuration onto the autoconf node config that + // we generated above. + if err = mapstructure.Decode(dat, &cfg); err != nil { + panic(err) + } + + return cfg +} + // Generates a new configuration and returns it in HJSON format. This is used // with -genconf. func doGenconf(isjson bool) string { @@ -106,6 +219,7 @@ func main() { flag.Parse() var cfg *nodeConfig + var err error switch { case *version: fmt.Println("Build name:", yggdrasil.GetBuildName()) @@ -116,114 +230,8 @@ func main() { // port numbers, and will use an automatically selected TUN/TAP interface. cfg = generateConfig(true) case *useconffile != "" || *useconf: - // Use a configuration file. If -useconf, the configuration will be read - // from stdin. If -useconffile, the configuration will be read from the - // filesystem. - var config []byte - var err error - if *useconffile != "" { - // Read the file from the filesystem - config, err = ioutil.ReadFile(*useconffile) - } else { - // Read the file from stdin. - config, err = ioutil.ReadAll(os.Stdin) - } - if err != nil { - panic(err) - } - // If there's a byte order mark - which Windows 10 is now incredibly fond of - // throwing everywhere when it's converting things into UTF-16 for the hell - // of it - remove it and decode back down into UTF-8. This is necessary - // because hjson doesn't know what to do with UTF-16 and will panic - if bytes.Compare(config[0:2], []byte{0xFF, 0xFE}) == 0 || - bytes.Compare(config[0:2], []byte{0xFE, 0xFF}) == 0 { - utf := unicode.UTF16(unicode.BigEndian, unicode.UseBOM) - decoder := utf.NewDecoder() - config, err = decoder.Bytes(config) - if err != nil { - panic(err) - } - } - // Generate a new configuration - this gives us a set of sane defaults - - // then parse the configuration we loaded above on top of it. The effect - // of this is that any configuration item that is missing from the provided - // configuration will use a sane default. - cfg = generateConfig(false) - var dat map[string]interface{} - if err := hjson.Unmarshal(config, &dat); err != nil { - panic(err) - } - confJson, err := json.Marshal(dat) - if err != nil { - panic(err) - } - json.Unmarshal(confJson, &cfg) - // For now we will do a little bit to help the user adjust their - // configuration to match the new configuration format, as some of the key - // names have changed recently. - changes := map[string]string{ - "Multicast": "", - "LinkLocal": "MulticastInterfaces", - "BoxPub": "EncryptionPublicKey", - "BoxPriv": "EncryptionPrivateKey", - "SigPub": "SigningPublicKey", - "SigPriv": "SigningPrivateKey", - "AllowedBoxPubs": "AllowedEncryptionPublicKeys", - } - // Loop over the mappings aove and see if we have anything to fix. - for from, to := range changes { - if _, ok := dat[from]; ok { - if to == "" { - if !*normaliseconf { - log.Println("Warning: Deprecated config option", from, "- please remove") - } - } else { - if !*normaliseconf { - log.Println("Warning: Deprecated config option", from, "- please rename to", to) - } - // If the configuration file doesn't already contain a line with the - // new name then set it to the old value. This makes sure that we - // don't overwrite something that was put there intentionally. - if _, ok := dat[to]; !ok { - dat[to] = dat[from] - } - } - } - } - // Check to see if the peers are in a parsable format, if not then default - // them to the TCP scheme - if peers, ok := dat["Peers"].([]interface{}); ok { - for index, peer := range peers { - uri := peer.(string) - if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { - continue - } - if strings.HasPrefix(uri, "tcp:") { - uri = uri[4:] - } - (dat["Peers"].([]interface{}))[index] = "tcp://" + uri - } - } - // Now do the same with the interface peers - if interfacepeers, ok := dat["InterfacePeers"].(map[string]interface{}); ok { - for intf, peers := range interfacepeers { - for index, peer := range peers.([]interface{}) { - uri := peer.(string) - if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { - continue - } - if strings.HasPrefix(uri, "tcp:") { - uri = uri[4:] - } - ((dat["InterfacePeers"].(map[string]interface{}))[intf]).([]interface{})[index] = "tcp://" + uri - } - } - } - // Overlay our newly mapped configuration onto the autoconf node config that - // we generated above. - if err = mapstructure.Decode(dat, &cfg); err != nil { - panic(err) - } + // Read the configuration from either stdin or from the filesystem + cfg = readConfig(useconf, useconffile, normaliseconf) // If the -normaliseconf option was specified then remarshal the above // configuration and print it back to stdout. This lets the user update // their configuration file with newly mapped names (like above) or to @@ -327,7 +335,12 @@ func main() { for { select { case _ = <-r: - n.core.UpdateConfig(cfg) + if *useconffile != "" { + cfg = readConfig(useconf, useconffile, normaliseconf) + n.core.UpdateConfig(cfg) + } else { + logger.Println("Reloading config at runtime is only possible with -useconffile") + } case _ = <-c: goto exit } diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 90fb112..5682339 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -61,7 +61,9 @@ func (a *admin) init(c *Core) { case e := <-a.reconfigure: a.core.configMutex.RLock() if a.core.config.AdminListen != a.core.configOld.AdminListen { - a.core.log.Println("AdminListen has changed!") + a.listenaddr = a.core.config.AdminListen + a.close() + a.start() } a.core.configMutex.RUnlock() e <- nil From cb4495902bf9399b881d1a4aa6008b15715111fc Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sun, 30 Dec 2018 15:21:09 +0000 Subject: [PATCH 06/22] Allow updating Listen during runtime --- src/yggdrasil/core.go | 13 ++++++----- src/yggdrasil/multicast.go | 16 ++++++++++--- src/yggdrasil/tcp.go | 48 +++++++++++++++++++++++++++++++++----- 3 files changed, 62 insertions(+), 15 deletions(-) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 435cd67..d4a2268 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -109,13 +109,14 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) { components := []chan chan error{ c.admin.reconfigure, - c.searches.reconfigure, - c.dht.reconfigure, - c.sessions.reconfigure, + //c.searches.reconfigure, + //c.dht.reconfigure, + //c.sessions.reconfigure, + //c.peers.reconfigure, + //c.router.reconfigure, + //c.switchTable.reconfigure, + c.tcp.reconfigure, c.multicast.reconfigure, - c.peers.reconfigure, - c.router.reconfigure, - c.switchTable.reconfigure, } for _, component := range components { diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index 25c979c..218f516 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "sync" "time" "golang.org/x/net/ipv6" @@ -14,6 +15,8 @@ type multicast struct { reconfigure chan chan error sock *ipv6.PacketConn groupAddr string + myAddr *net.TCPAddr + myAddrMutex sync.RWMutex } func (m *multicast) init(core *Core) { @@ -23,6 +26,9 @@ func (m *multicast) init(core *Core) { for { select { case e := <-m.reconfigure: + m.myAddrMutex.Lock() + m.myAddr = m.core.tcp.getAddr() + m.myAddrMutex.Unlock() e <- nil } } @@ -95,13 +101,14 @@ func (m *multicast) interfaces() []net.Interface { } func (m *multicast) announce() { + var anAddr net.TCPAddr + m.myAddrMutex.Lock() + m.myAddr = m.core.tcp.getAddr() + m.myAddrMutex.Unlock() groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) if err != nil { panic(err) } - var anAddr net.TCPAddr - myAddr := m.core.tcp.getAddr() - anAddr.Port = myAddr.Port destAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) if err != nil { panic(err) @@ -113,6 +120,9 @@ func (m *multicast) announce() { if err != nil { panic(err) } + m.myAddrMutex.RLock() + anAddr.Port = m.myAddr.Port + m.myAddrMutex.RUnlock() for _, addr := range addrs { addrIP, _, _ := net.ParseCIDR(addr.String()) if addrIP.To4() != nil { diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index ad50d78..224aca0 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -38,7 +38,9 @@ const tcp_ping_interval = (default_tcp_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcpInterface struct { core *Core + reconfigure chan chan error serv net.Listener + serv_stop chan bool tcp_timeout time.Duration tcp_addr string mutex sync.Mutex // Protecting the below @@ -83,10 +85,37 @@ func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { // Initializes the struct. func (iface *tcpInterface) init(core *Core) (err error) { iface.core = core + iface.serv_stop = make(chan bool, 1) + iface.reconfigure = make(chan chan error, 1) + go func() { + for { + select { + case e := <-iface.reconfigure: + iface.core.configMutex.RLock() + updated := iface.core.config.Listen != iface.core.configOld.Listen + iface.core.configMutex.RUnlock() + if updated { + iface.serv_stop <- true + iface.serv.Close() + e <- iface.listen() + } else { + e <- nil + } + } + } + }() + + return iface.listen() +} + +func (iface *tcpInterface) listen() error { + var err error + iface.core.configMutex.RLock() iface.tcp_addr = iface.core.config.Listen iface.tcp_timeout = time.Duration(iface.core.config.ReadTimeout) * time.Millisecond iface.core.configMutex.RUnlock() + if iface.tcp_timeout >= 0 && iface.tcp_timeout < default_tcp_timeout { iface.tcp_timeout = default_tcp_timeout } @@ -96,6 +125,7 @@ func (iface *tcpInterface) init(core *Core) (err error) { iface.calls = make(map[string]struct{}) iface.conns = make(map[tcpInfo](chan struct{})) go iface.listener() + return nil } return err @@ -107,10 +137,16 @@ func (iface *tcpInterface) listener() { iface.core.log.Println("Listening for TCP on:", iface.serv.Addr().String()) for { sock, err := iface.serv.Accept() - if err != nil { - panic(err) + select { + case <-iface.serv_stop: + iface.core.log.Println("Stopping listener") + return + default: + if err != nil { + panic(err) + } + go iface.handler(sock, true) } - go iface.handler(sock, true) } } @@ -363,12 +399,12 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { themAddr := address.AddrForNodeID(themNodeID) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) - iface.core.log.Println("Connected:", themString, "source", us) + iface.core.log.Printf("Connected: %s, source: %s", themString, us) err = iface.reader(sock, in) // In this goroutine, because of defers if err == nil { - iface.core.log.Println("Disconnected:", themString, "source", us) + iface.core.log.Printf("Disconnected: %s, source: %s", themString, us) } else { - iface.core.log.Println("Disconnected:", themString, "source", us, "with error:", err) + iface.core.log.Printf("Disconnected: %s, source: %s, error: %s", themString, us, err) } return } From 80c9a1bc12d90dee70411ce1f1995328a878ae46 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sun, 30 Dec 2018 16:48:34 +0000 Subject: [PATCH 07/22] Don't track localAddr in conns as it is irrelevant --- src/yggdrasil/tcp.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 224aca0..4fb3026 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -53,8 +53,8 @@ type tcpInterface struct { type tcpInfo struct { box crypto.BoxPubKey sig crypto.SigPubKey - localAddr string remoteAddr string + remotePort string } // Wrapper function to set additional options for specific connection types. @@ -313,8 +313,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } } // Check if we already have a connection to this node, close and block if yes - info.localAddr, _, _ = net.SplitHostPort(sock.LocalAddr().String()) - info.remoteAddr, _, _ = net.SplitHostPort(sock.RemoteAddr().String()) + info.remoteAddr, info.remotePort, _ = net.SplitHostPort(sock.RemoteAddr().String()) iface.mutex.Lock() if blockChan, isIn := iface.conns[info]; isIn { iface.mutex.Unlock() From cd86c338505003dc7779423e427c0e936885f727 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sun, 30 Dec 2018 21:11:16 +0000 Subject: [PATCH 08/22] Try to tidy up a bit, move checks for if we are already calling/connected Something I noticed when working on reconfigure support for the "Listen" option is that we have some rather huge weaknesses in our multicasting design. Right now if we change our Listen address, it's not really possible for remote nodes to know whether they are still connected to us, so they start connecting in response to our changed beacons. They can't know that they already know about us until *after* the handshake but this registers in the local client log as repeated Connect/Disconnects even though the existing peerings never actually drop. --- src/yggdrasil/tcp.go | 74 +++++++++++++++++++++++++++----------------- 1 file changed, 46 insertions(+), 28 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 4fb3026..ec8bca4 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -53,8 +53,8 @@ type tcpInterface struct { type tcpInfo struct { box crypto.BoxPubKey sig crypto.SigPubKey + localAddr string remoteAddr string - remotePort string } // Wrapper function to set additional options for specific connection types. @@ -150,6 +150,22 @@ func (iface *tcpInterface) listener() { } } +// Checks if we already have a connection to this node +func (iface *tcpInterface) isAlreadyConnected(info tcpInfo) bool { + iface.mutex.Lock() + defer iface.mutex.Unlock() + _, isIn := iface.conns[info] + return isIn +} + +// Checks if we already are calling this address +func (iface *tcpInterface) isAlreadyCalling(saddr string) bool { + iface.mutex.Lock() + defer iface.mutex.Unlock() + _, isIn := iface.calls[saddr] + return isIn +} + // Checks if a connection already exists. // If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address. // If the dial is successful, it launches the handler. @@ -161,25 +177,18 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { if sintf != "" { callname = fmt.Sprintf("%s/%s", saddr, sintf) } - quit := false - iface.mutex.Lock() - if _, isIn := iface.calls[callname]; isIn { - quit = true - } else { - iface.calls[callname] = struct{}{} - defer func() { - // Block new calls for a little while, to mitigate livelock scenarios - time.Sleep(default_tcp_timeout) - time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) - iface.mutex.Lock() - delete(iface.calls, callname) - iface.mutex.Unlock() - }() - } - iface.mutex.Unlock() - if quit { + if iface.isAlreadyCalling(saddr) { return } + iface.calls[callname] = struct{}{} + defer func() { + // Block new calls for a little while, to mitigate livelock scenarios + time.Sleep(default_tcp_timeout) + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + iface.mutex.Lock() + delete(iface.calls, callname) + iface.mutex.Unlock() + }() var conn net.Conn var err error if socksaddr != nil { @@ -284,9 +293,19 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { // TODO? Block forever to prevent future connection attempts? suppress future messages about the same node? return } + remoteAddr, _, e1 := net.SplitHostPort(sock.RemoteAddr().String()) + localAddr, _, e2 := net.SplitHostPort(sock.LocalAddr().String()) + if e1 != nil || e2 != nil { + return + } info := tcpInfo{ // used as a map key, so don't include ephemeral link key - box: meta.box, - sig: meta.sig, + box: meta.box, + sig: meta.sig, + localAddr: localAddr, + remoteAddr: remoteAddr, + } + if iface.isAlreadyConnected(info) { + return } // Quit the parent call if this is a connection to ourself equiv := func(k1, k2 []byte) bool { @@ -297,14 +316,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } return true } - if equiv(info.box[:], iface.core.boxPub[:]) { + if equiv(meta.box[:], iface.core.boxPub[:]) { return } - if equiv(info.sig[:], iface.core.sigPub[:]) { + if equiv(meta.sig[:], iface.core.sigPub[:]) { return } // Check if we're authorized to connect to this key / IP - if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&info.box) { + if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&meta.box) { // Allow unauthorized peers if they're link-local raddrStr, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) raddr := net.ParseIP(raddrStr) @@ -313,14 +332,13 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } } // Check if we already have a connection to this node, close and block if yes - info.remoteAddr, info.remotePort, _ = net.SplitHostPort(sock.RemoteAddr().String()) iface.mutex.Lock() - if blockChan, isIn := iface.conns[info]; isIn { + /*if blockChan, isIn := iface.conns[info]; isIn { iface.mutex.Unlock() sock.Close() <-blockChan return - } + }*/ blockChan := make(chan struct{}) iface.conns[info] = blockChan iface.mutex.Unlock() @@ -332,7 +350,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { }() // Note that multiple connections to the same node are allowed // E.g. over different interfaces - p := iface.core.peers.newPeer(&info.box, &info.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) + p := iface.core.peers.newPeer(&meta.box, &meta.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) p.linkOut = make(chan []byte, 1) in := func(bs []byte) { p.handlePacket(bs) @@ -394,7 +412,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { }() us, _, _ := net.SplitHostPort(sock.LocalAddr().String()) them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) - themNodeID := crypto.GetNodeID(&info.box) + themNodeID := crypto.GetNodeID(&meta.box) themAddr := address.AddrForNodeID(themNodeID) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) From 1e29465af12fdd430815cc5d9b6d83469340f834 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 31 Dec 2018 12:08:15 +0000 Subject: [PATCH 09/22] Fix debug builds (hopefully) --- src/yggdrasil/debug.go | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 4a32eb6..6bd5430 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -16,6 +16,7 @@ import "fmt" import "net" import "log" import "regexp" +import "encoding/hex" import _ "net/http/pprof" import "net/http" @@ -23,6 +24,7 @@ import "runtime" import "os" import "github.com/yggdrasil-network/yggdrasil-go/src/address" +import "github.com/yggdrasil-network/yggdrasil-go/src/config" import "github.com/yggdrasil-network/yggdrasil-go/src/crypto" import "github.com/yggdrasil-network/yggdrasil-go/src/defaults" @@ -52,7 +54,17 @@ func StartProfiler(log *log.Logger) error { func (c *Core) Init() { bpub, bpriv := crypto.NewBoxKeys() spub, spriv := crypto.NewSigKeys() - c.init(bpub, bpriv, spub, spriv) + hbpub := hex.EncodeToString(bpub[:]) + hbpriv := hex.EncodeToString(bpriv[:]) + hspub := hex.EncodeToString(spub[:]) + hspriv := hex.EncodeToString(spriv[:]) + c.config = config.NodeConfig{ + EncryptionPublicKey: hbpub, + EncryptionPrivateKey: hbpriv, + SigningPublicKey: hspub, + SigningPrivateKey: hspriv, + } + c.init( /*bpub, bpriv, spub, spriv*/ ) c.switchTable.start() c.router.start() } @@ -350,7 +362,7 @@ func (c *Core) DEBUG_init(bpub []byte, bpriv []byte, spub []byte, spriv []byte) { - var boxPub crypto.BoxPubKey + /*var boxPub crypto.BoxPubKey var boxPriv crypto.BoxPrivKey var sigPub crypto.SigPubKey var sigPriv crypto.SigPrivKey @@ -358,7 +370,18 @@ func (c *Core) DEBUG_init(bpub []byte, copy(boxPriv[:], bpriv) copy(sigPub[:], spub) copy(sigPriv[:], spriv) - c.init(&boxPub, &boxPriv, &sigPub, &sigPriv) + c.init(&boxPub, &boxPriv, &sigPub, &sigPriv)*/ + hbpub := hex.EncodeToString(bpub[:]) + hbpriv := hex.EncodeToString(bpriv[:]) + hspub := hex.EncodeToString(spub[:]) + hspriv := hex.EncodeToString(spriv[:]) + c.config = config.NodeConfig{ + EncryptionPublicKey: hbpub, + EncryptionPrivateKey: hbpriv, + SigningPublicKey: hspub, + SigningPrivateKey: hspriv, + } + c.init( /*bpub, bpriv, spub, spriv*/ ) if err := c.router.start(); err != nil { panic(err) @@ -427,7 +450,8 @@ func (c *Core) DEBUG_addSOCKSConn(socksaddr, peeraddr string) { //* func (c *Core) DEBUG_setupAndStartGlobalTCPInterface(addrport string) { - if err := c.tcp.init(c, addrport, 0); err != nil { + c.config.Listen = addrport + if err := c.tcp.init(c /*, addrport, 0*/); err != nil { c.log.Println("Failed to start TCP interface:", err) panic(err) } @@ -474,7 +498,8 @@ func (c *Core) DEBUG_addKCPConn(saddr string) { func (c *Core) DEBUG_setupAndStartAdminInterface(addrport string) { a := admin{} - a.init(c, addrport) + c.config.AdminListen = addrport + a.init(c /*, addrport*/) c.admin = a } From aed3c7e7845fbde73f720d6c82f55453038fe460 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 14 Jan 2019 14:25:52 +0000 Subject: [PATCH 10/22] Give nodeconfig to tun --- src/yggdrasil/adapter.go | 8 +++++--- src/yggdrasil/core.go | 17 ++++++++--------- src/yggdrasil/tun.go | 27 ++++++++++++++++++++++++++- 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/yggdrasil/adapter.go b/src/yggdrasil/adapter.go index 7fb6a19..3ce80d2 100644 --- a/src/yggdrasil/adapter.go +++ b/src/yggdrasil/adapter.go @@ -3,9 +3,10 @@ package yggdrasil // Defines the minimum required struct members for an adapter type (this is // now the base type for tunAdapter in tun.go) type Adapter struct { - core *Core - send chan<- []byte - recv <-chan []byte + core *Core + send chan<- []byte + recv <-chan []byte + reconfigure chan chan error } // Initialises the adapter. @@ -13,4 +14,5 @@ func (adapter *Adapter) init(core *Core, send chan<- []byte, recv <-chan []byte) adapter.core = core adapter.send = send adapter.recv = recv + adapter.reconfigure = make(chan chan error, 1) } diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 7e10dbc..bee09ac 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -2,7 +2,6 @@ package yggdrasil import ( "encoding/hex" - "fmt" "io/ioutil" "log" "net" @@ -110,12 +109,13 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) { components := []chan chan error{ c.admin.reconfigure, - //c.searches.reconfigure, - //c.dht.reconfigure, - //c.sessions.reconfigure, - //c.peers.reconfigure, - //c.router.reconfigure, - //c.switchTable.reconfigure, + c.searches.reconfigure, + c.dht.reconfigure, + c.sessions.reconfigure, + c.peers.reconfigure, + c.router.reconfigure, + c.router.tun.reconfigure, + c.switchTable.reconfigure, c.tcp.reconfigure, c.multicast.reconfigure, } @@ -240,8 +240,7 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - ip := net.IP(c.router.addr[:]).String() - if err := c.router.tun.start(nc.IfName, nc.IfTAPMode, fmt.Sprintf("%s/%d", ip, 8*len(address.GetPrefix())-1), nc.IfMTU); err != nil { + if err := c.router.tun.start(); err != nil { c.log.Println("Failed to start TUN/TAP") return err } diff --git a/src/yggdrasil/tun.go b/src/yggdrasil/tun.go index 8c0f91d..0bda312 100644 --- a/src/yggdrasil/tun.go +++ b/src/yggdrasil/tun.go @@ -5,6 +5,8 @@ package yggdrasil import ( "bytes" "errors" + "fmt" + "net" "sync" "time" @@ -42,11 +44,34 @@ func getSupportedMTU(mtu int) int { func (tun *tunAdapter) init(core *Core, send chan<- []byte, recv <-chan []byte) { tun.Adapter.init(core, send, recv) tun.icmpv6.init(tun) + go func() { + for { + select { + case e := <-tun.reconfigure: + tun.core.configMutex.RLock() + updated := tun.core.config.IfName != tun.core.configOld.IfName || + tun.core.config.IfTAPMode != tun.core.configOld.IfTAPMode || + tun.core.config.IfMTU != tun.core.configOld.IfMTU + tun.core.configMutex.RUnlock() + if updated { + e <- nil + } else { + e <- nil + } + } + } + }() } // Starts the setup process for the TUN/TAP adapter, and if successful, starts // the read/write goroutines to handle packets on that interface. -func (tun *tunAdapter) start(ifname string, iftapmode bool, addr string, mtu int) error { +func (tun *tunAdapter) start() error { + tun.core.configMutex.RLock() + ifname := tun.core.config.IfName + iftapmode := tun.core.config.IfTAPMode + addr := fmt.Sprintf("%s/%d", net.IP(tun.core.router.addr[:]).String(), 8*len(address.GetPrefix())-1) + mtu := tun.core.config.IfMTU + tun.core.configMutex.RUnlock() if ifname != "none" { if err := tun.setup(ifname, iftapmode, addr, mtu); err != nil { return err From 87d393bd9f163039f01446ba968cc3e6d77b5e30 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 14 Jan 2019 17:21:15 +0000 Subject: [PATCH 11/22] Move add peer loop into Core, refresh it from active config --- cmd/yggdrasil/main.go | 22 ---------------------- src/yggdrasil/core.go | 38 +++++++++++++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index 262fb68..cc54d3d 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -12,7 +12,6 @@ import ( "regexp" "strings" "syscall" - "time" "golang.org/x/text/encoding/unicode" @@ -243,27 +242,6 @@ func main() { for _, pBoxStr := range cfg.AllowedEncryptionPublicKeys { n.core.AddAllowedEncryptionPublicKey(pBoxStr) } - // If any static peers were provided in the configuration above then we should - // configure them. The loop ensures that disconnected peers will eventually - // be reconnected with. - go func() { - if len(cfg.Peers) == 0 && len(cfg.InterfacePeers) == 0 { - return - } - for { - for _, peer := range cfg.Peers { - n.core.AddPeer(peer, "") - time.Sleep(time.Second) - } - for intf, intfpeers := range cfg.InterfacePeers { - for _, peer := range intfpeers { - n.core.AddPeer(peer, intf) - time.Sleep(time.Second) - } - } - time.Sleep(time.Minute) - } - }() // The Stop function ensures that the TUN/TAP adapter is correctly shut down // before the program exits. defer func() { diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index bee09ac..a53449b 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -7,6 +7,7 @@ import ( "net" "regexp" "sync" + "time" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/config" @@ -91,14 +92,39 @@ func (c *Core) init() error { c.router.init(c) c.switchTable.init(c) // TODO move before peers? before router? - if err := c.tcp.init(c); err != nil { - c.log.Println("Failed to start TCP interface") - return err - } - return nil } +// If any static peers were provided in the configuration above then we should +// configure them. The loop ensures that disconnected peers will eventually +// be reconnected with. +func (c *Core) addPeerLoop() { + for { + // Get the peers from the config - these could change! + c.configMutex.RLock() + peers := c.config.Peers + interfacepeers := c.config.InterfacePeers + c.configMutex.RUnlock() + + // Add peers from the Peers section + for _, peer := range peers { + c.AddPeer(peer, "") + time.Sleep(time.Second) + } + + // Add peers from the InterfacePeers section + for intf, intfpeers := range interfacepeers { + for _, peer := range intfpeers { + c.AddPeer(peer, intf) + time.Sleep(time.Second) + } + } + + // Sit for a while + time.Sleep(time.Minute) + } +} + // UpdateConfig updates the configuration in Core and then signals the // various module goroutines to reconfigure themselves if needed func (c *Core) UpdateConfig(config *config.NodeConfig) { @@ -245,6 +271,8 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } + go c.addPeerLoop() + c.log.Println("Startup complete") return nil } From 28072c9fe2f7587ec02ca212016c6e42809fd6d2 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 14 Jan 2019 17:41:08 +0000 Subject: [PATCH 12/22] Make CKR thread-safe --- src/yggdrasil/ckr.go | 66 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 58 insertions(+), 8 deletions(-) diff --git a/src/yggdrasil/ckr.go b/src/yggdrasil/ckr.go index a3df891..84d60e3 100644 --- a/src/yggdrasil/ckr.go +++ b/src/yggdrasil/ckr.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "sort" + "sync" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" @@ -16,14 +17,19 @@ import ( // allow traffic for non-Yggdrasil ranges to be routed over Yggdrasil. type cryptokey struct { - core *Core - enabled bool - ipv4routes []cryptokey_route - ipv6routes []cryptokey_route - ipv4cache map[address.Address]cryptokey_route - ipv6cache map[address.Address]cryptokey_route - ipv4sources []net.IPNet - ipv6sources []net.IPNet + core *Core + enabled bool + reconfigure chan chan error + ipv4routes []cryptokey_route + ipv6routes []cryptokey_route + ipv4cache map[address.Address]cryptokey_route + ipv6cache map[address.Address]cryptokey_route + ipv4sources []net.IPNet + ipv6sources []net.IPNet + mutexenabled sync.RWMutex // protects enabled + mutexroutes sync.RWMutex // protects ipv4routes, ipv6routes + mutexcache sync.RWMutex // protects ipv4cache, ipv6cache + mutexsources sync.RWMutex // protects ipv4sources, ipv6sources } type cryptokey_route struct { @@ -34,21 +40,43 @@ type cryptokey_route struct { // Initialise crypto-key routing. This must be done before any other CKR calls. func (c *cryptokey) init(core *Core) { c.core = core + c.reconfigure = make(chan chan error, 1) + go func() { + for { + select { + case e := <-c.reconfigure: + e <- nil + } + } + }() + + c.mutexroutes.Lock() c.ipv4routes = make([]cryptokey_route, 0) c.ipv6routes = make([]cryptokey_route, 0) + c.mutexroutes.Unlock() + + c.mutexcache.Lock() c.ipv4cache = make(map[address.Address]cryptokey_route, 0) c.ipv6cache = make(map[address.Address]cryptokey_route, 0) + c.mutexcache.Unlock() + + c.mutexsources.Lock() c.ipv4sources = make([]net.IPNet, 0) c.ipv6sources = make([]net.IPNet, 0) + c.mutexsources.Unlock() } // Enable or disable crypto-key routing. func (c *cryptokey) setEnabled(enabled bool) { + c.mutexenabled.Lock() + defer c.mutexenabled.Unlock() c.enabled = enabled } // Check if crypto-key routing is enabled. func (c *cryptokey) isEnabled() bool { + c.mutexenabled.RLock() + defer c.mutexenabled.RUnlock() return c.enabled } @@ -72,6 +100,9 @@ func (c *cryptokey) isValidSource(addr address.Address, addrlen int) bool { // Does it match a configured CKR source? if c.isEnabled() { + c.mutexsources.RLock() + defer c.mutexsources.RUnlock() + // Build our references to the routing sources var routingsources *[]net.IPNet @@ -98,6 +129,9 @@ func (c *cryptokey) isValidSource(addr address.Address, addrlen int) bool { // Adds a source subnet, which allows traffic with these source addresses to // be tunnelled using crypto-key routing. func (c *cryptokey) addSourceSubnet(cidr string) error { + c.mutexsources.Lock() + defer c.mutexsources.Unlock() + // Is the CIDR we've been given valid? _, ipnet, err := net.ParseCIDR(cidr) if err != nil { @@ -135,6 +169,9 @@ func (c *cryptokey) addSourceSubnet(cidr string) error { // Adds a destination route for the given CIDR to be tunnelled to the node // with the given BoxPubKey. func (c *cryptokey) addRoute(cidr string, dest string) error { + c.mutexroutes.Lock() + defer c.mutexroutes.Unlock() + // Is the CIDR we've been given valid? ipaddr, ipnet, err := net.ParseCIDR(cidr) if err != nil { @@ -209,6 +246,11 @@ func (c *cryptokey) addRoute(cidr string, dest string) error { // length specified in bytes) from the crypto-key routing table. An error is // returned if the address is not suitable or no route was found. func (c *cryptokey) getPublicKeyForAddress(addr address.Address, addrlen int) (crypto.BoxPubKey, error) { + c.mutexroutes.RLock() + c.mutexcache.RLock() + defer c.mutexroutes.RUnlock() + defer c.mutexcache.RUnlock() + // Check if the address is a valid Yggdrasil address - if so it // is exempt from all CKR checking if addr.IsValid() { @@ -269,6 +311,9 @@ func (c *cryptokey) getPublicKeyForAddress(addr address.Address, addrlen int) (c // Removes a source subnet, which allows traffic with these source addresses to // be tunnelled using crypto-key routing. func (c *cryptokey) removeSourceSubnet(cidr string) error { + c.mutexsources.Lock() + defer c.mutexsources.Unlock() + // Is the CIDR we've been given valid? _, ipnet, err := net.ParseCIDR(cidr) if err != nil { @@ -304,6 +349,11 @@ func (c *cryptokey) removeSourceSubnet(cidr string) error { // Removes a destination route for the given CIDR to be tunnelled to the node // with the given BoxPubKey. func (c *cryptokey) removeRoute(cidr string, dest string) error { + c.mutexroutes.Lock() + c.mutexcache.Lock() + defer c.mutexroutes.Unlock() + defer c.mutexcache.Unlock() + // Is the CIDR we've been given valid? _, ipnet, err := net.ParseCIDR(cidr) if err != nil { From bd04124e43ff42d80740bb93b413fc04245198c8 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 14 Jan 2019 18:06:41 +0000 Subject: [PATCH 13/22] Reconfigure support for crypto-key routing --- src/yggdrasil/ckr.go | 60 ++++++++++++++++++++++++++++++++++++----- src/yggdrasil/core.go | 25 ----------------- src/yggdrasil/router.go | 8 ++++++ 3 files changed, 62 insertions(+), 31 deletions(-) diff --git a/src/yggdrasil/ckr.go b/src/yggdrasil/ckr.go index 84d60e3..bf569fb 100644 --- a/src/yggdrasil/ckr.go +++ b/src/yggdrasil/ckr.go @@ -45,25 +45,73 @@ func (c *cryptokey) init(core *Core) { for { select { case e := <-c.reconfigure: - e <- nil + e <- c.configure() } } }() + if err := c.configure(); err != nil { + c.core.log.Println("CKR configuration failed:", err) + } +} + +// Configure the CKR routes +func (c *cryptokey) configure() error { + c.core.configMutex.RLock() + defer c.core.configMutex.RUnlock() + + // Set enabled/disabled state + c.setEnabled(c.core.config.TunnelRouting.Enable) + + // Clear out existing routes c.mutexroutes.Lock() - c.ipv4routes = make([]cryptokey_route, 0) c.ipv6routes = make([]cryptokey_route, 0) + c.ipv4routes = make([]cryptokey_route, 0) c.mutexroutes.Unlock() + // Add IPv6 routes + for ipv6, pubkey := range c.core.config.TunnelRouting.IPv6Destinations { + if err := c.addRoute(ipv6, pubkey); err != nil { + return err + } + } + + // Add IPv4 routes + for ipv4, pubkey := range c.core.config.TunnelRouting.IPv4Destinations { + if err := c.addRoute(ipv4, pubkey); err != nil { + return err + } + } + + // Clear out existing sources + c.mutexsources.Lock() + c.ipv6sources = make([]net.IPNet, 0) + c.ipv4sources = make([]net.IPNet, 0) + c.mutexsources.Unlock() + + // Add IPv6 sources + c.ipv6sources = make([]net.IPNet, 0) + for _, source := range c.core.config.TunnelRouting.IPv6Sources { + if err := c.addSourceSubnet(source); err != nil { + return err + } + } + + // Add IPv4 sources + c.ipv4sources = make([]net.IPNet, 0) + for _, source := range c.core.config.TunnelRouting.IPv4Sources { + if err := c.addSourceSubnet(source); err != nil { + return err + } + } + + // Wipe the caches c.mutexcache.Lock() c.ipv4cache = make(map[address.Address]cryptokey_route, 0) c.ipv6cache = make(map[address.Address]cryptokey_route, 0) c.mutexcache.Unlock() - c.mutexsources.Lock() - c.ipv4sources = make([]net.IPNet, 0) - c.ipv6sources = make([]net.IPNet, 0) - c.mutexsources.Unlock() + return nil } // Enable or disable crypto-key routing. diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index a53449b..4b00fc3 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -231,31 +231,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - c.router.cryptokey.setEnabled(nc.TunnelRouting.Enable) - if c.router.cryptokey.isEnabled() { - c.log.Println("Crypto-key routing enabled") - for ipv6, pubkey := range nc.TunnelRouting.IPv6Destinations { - if err := c.router.cryptokey.addRoute(ipv6, pubkey); err != nil { - panic(err) - } - } - for _, source := range nc.TunnelRouting.IPv6Sources { - if err := c.router.cryptokey.addSourceSubnet(source); err != nil { - panic(err) - } - } - for ipv4, pubkey := range nc.TunnelRouting.IPv4Destinations { - if err := c.router.cryptokey.addRoute(ipv4, pubkey); err != nil { - panic(err) - } - } - for _, source := range nc.TunnelRouting.IPv4Sources { - if err := c.router.cryptokey.addSourceSubnet(source); err != nil { - panic(err) - } - } - } - if err := c.admin.start(); err != nil { c.log.Println("Failed to start admin socket") return err diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 68fb025..74fff3f 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -127,6 +127,14 @@ func (r *router) mainLoop() { case f := <-r.admin: f() case e := <-r.reconfigure: + // Send reconfigure notification to cryptokey + response := make(chan error) + r.cryptokey.reconfigure <- response + if err := <-response; err != nil { + e <- err + } + + // Anything else to do? e <- nil } } From 51026d762ef9958353327f0180770b3e60e1555c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 14 Jan 2019 18:24:35 +0000 Subject: [PATCH 14/22] Make session firewall thread-safe for config updates --- src/yggdrasil/session.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 3c8e013..3cd1cf7 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -7,6 +7,7 @@ package yggdrasil import ( "bytes" "encoding/hex" + "sync" "time" "github.com/yggdrasil-network/yggdrasil-go/src/address" @@ -115,6 +116,7 @@ type sessions struct { addrToPerm map[address.Address]*crypto.BoxPubKey subnetToPerm map[address.Subnet]*crypto.BoxPubKey // Options from the session firewall + sessionFirewallMutex sync.RWMutex sessionFirewallEnabled bool sessionFirewallAllowsDirect bool sessionFirewallAllowsRemote bool @@ -157,12 +159,16 @@ func (ss *sessions) init(core *Core) { // Enable or disable the session firewall func (ss *sessions) setSessionFirewallState(enabled bool) { + ss.sessionFirewallMutex.Lock() + defer ss.sessionFirewallMutex.Unlock() ss.sessionFirewallEnabled = enabled } // Set the session firewall defaults (first parameter is whether to allow // sessions from direct peers, second is whether to allow from remote nodes). func (ss *sessions) setSessionFirewallDefaults(allowsDirect bool, allowsRemote bool, alwaysAllowsOutbound bool) { + ss.sessionFirewallMutex.Lock() + defer ss.sessionFirewallMutex.Unlock() ss.sessionFirewallAllowsDirect = allowsDirect ss.sessionFirewallAllowsRemote = allowsRemote ss.sessionFirewallAlwaysAllowsOutbound = alwaysAllowsOutbound @@ -170,17 +176,24 @@ func (ss *sessions) setSessionFirewallDefaults(allowsDirect bool, allowsRemote b // Set the session firewall whitelist - nodes always allowed to open sessions. func (ss *sessions) setSessionFirewallWhitelist(whitelist []string) { + ss.sessionFirewallMutex.Lock() + defer ss.sessionFirewallMutex.Unlock() ss.sessionFirewallWhitelist = whitelist } // Set the session firewall blacklist - nodes never allowed to open sessions. func (ss *sessions) setSessionFirewallBlacklist(blacklist []string) { + ss.sessionFirewallMutex.Lock() + defer ss.sessionFirewallMutex.Unlock() ss.sessionFirewallBlacklist = blacklist } // Determines whether the session with a given publickey is allowed based on // session firewall rules. func (ss *sessions) isSessionAllowed(pubkey *crypto.BoxPubKey, initiator bool) bool { + ss.sessionFirewallMutex.RLock() + defer ss.sessionFirewallMutex.RUnlock() + // Allow by default if the session firewall is disabled if !ss.sessionFirewallEnabled { return true @@ -286,10 +299,8 @@ func (ss *sessions) getByTheirSubnet(snet *address.Subnet) (*sessionInfo, bool) // Creates a new session and lazily cleans up old/timedout existing sessions. // This includse initializing session info to sane defaults (e.g. lowest supported MTU). func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { - if ss.sessionFirewallEnabled { - if !ss.isSessionAllowed(theirPermKey, true) { - return nil - } + if !ss.isSessionAllowed(theirPermKey, true) { + return nil } sinfo := sessionInfo{} sinfo.core = ss.core @@ -465,11 +476,14 @@ func (ss *sessions) handlePing(ping *sessionPing) { // Get the corresponding session (or create a new session) sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub) // Check the session firewall + ss.sessionFirewallMutex.RLock() if !isIn && ss.sessionFirewallEnabled { if !ss.isSessionAllowed(&ping.SendPermPub, false) { + ss.sessionFirewallMutex.RUnlock() return } } + ss.sessionFirewallMutex.RUnlock() if !isIn || sinfo.timedout() { if isIn { sinfo.close() From 9e186bdd6730d6c55c61858da429e9bf5bca410e Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 14 Jan 2019 18:34:15 +0000 Subject: [PATCH 15/22] Remove mutexes from CKR and use router goroutine/doAdmin for update config --- src/yggdrasil/ckr.go | 64 +++++++++++------------------------------ src/yggdrasil/core.go | 1 + src/yggdrasil/router.go | 8 ------ 3 files changed, 17 insertions(+), 56 deletions(-) diff --git a/src/yggdrasil/ckr.go b/src/yggdrasil/ckr.go index bf569fb..14464f6 100644 --- a/src/yggdrasil/ckr.go +++ b/src/yggdrasil/ckr.go @@ -7,7 +7,6 @@ import ( "fmt" "net" "sort" - "sync" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" @@ -17,19 +16,15 @@ import ( // allow traffic for non-Yggdrasil ranges to be routed over Yggdrasil. type cryptokey struct { - core *Core - enabled bool - reconfigure chan chan error - ipv4routes []cryptokey_route - ipv6routes []cryptokey_route - ipv4cache map[address.Address]cryptokey_route - ipv6cache map[address.Address]cryptokey_route - ipv4sources []net.IPNet - ipv6sources []net.IPNet - mutexenabled sync.RWMutex // protects enabled - mutexroutes sync.RWMutex // protects ipv4routes, ipv6routes - mutexcache sync.RWMutex // protects ipv4cache, ipv6cache - mutexsources sync.RWMutex // protects ipv4sources, ipv6sources + core *Core + enabled bool + reconfigure chan chan error + ipv4routes []cryptokey_route + ipv6routes []cryptokey_route + ipv4cache map[address.Address]cryptokey_route + ipv6cache map[address.Address]cryptokey_route + ipv4sources []net.IPNet + ipv6sources []net.IPNet } type cryptokey_route struct { @@ -45,7 +40,11 @@ func (c *cryptokey) init(core *Core) { for { select { case e := <-c.reconfigure: - e <- c.configure() + var err error + c.core.router.doAdmin(func() { + err = c.core.router.cryptokey.configure() + }) + e <- err } } }() @@ -55,7 +54,8 @@ func (c *cryptokey) init(core *Core) { } } -// Configure the CKR routes +// Configure the CKR routes - this must only ever be called from the router +// goroutine, e.g. through router.doAdmin func (c *cryptokey) configure() error { c.core.configMutex.RLock() defer c.core.configMutex.RUnlock() @@ -64,10 +64,8 @@ func (c *cryptokey) configure() error { c.setEnabled(c.core.config.TunnelRouting.Enable) // Clear out existing routes - c.mutexroutes.Lock() c.ipv6routes = make([]cryptokey_route, 0) c.ipv4routes = make([]cryptokey_route, 0) - c.mutexroutes.Unlock() // Add IPv6 routes for ipv6, pubkey := range c.core.config.TunnelRouting.IPv6Destinations { @@ -84,10 +82,8 @@ func (c *cryptokey) configure() error { } // Clear out existing sources - c.mutexsources.Lock() c.ipv6sources = make([]net.IPNet, 0) c.ipv4sources = make([]net.IPNet, 0) - c.mutexsources.Unlock() // Add IPv6 sources c.ipv6sources = make([]net.IPNet, 0) @@ -106,25 +102,19 @@ func (c *cryptokey) configure() error { } // Wipe the caches - c.mutexcache.Lock() c.ipv4cache = make(map[address.Address]cryptokey_route, 0) c.ipv6cache = make(map[address.Address]cryptokey_route, 0) - c.mutexcache.Unlock() return nil } // Enable or disable crypto-key routing. func (c *cryptokey) setEnabled(enabled bool) { - c.mutexenabled.Lock() - defer c.mutexenabled.Unlock() c.enabled = enabled } // Check if crypto-key routing is enabled. func (c *cryptokey) isEnabled() bool { - c.mutexenabled.RLock() - defer c.mutexenabled.RUnlock() return c.enabled } @@ -148,9 +138,6 @@ func (c *cryptokey) isValidSource(addr address.Address, addrlen int) bool { // Does it match a configured CKR source? if c.isEnabled() { - c.mutexsources.RLock() - defer c.mutexsources.RUnlock() - // Build our references to the routing sources var routingsources *[]net.IPNet @@ -177,9 +164,6 @@ func (c *cryptokey) isValidSource(addr address.Address, addrlen int) bool { // Adds a source subnet, which allows traffic with these source addresses to // be tunnelled using crypto-key routing. func (c *cryptokey) addSourceSubnet(cidr string) error { - c.mutexsources.Lock() - defer c.mutexsources.Unlock() - // Is the CIDR we've been given valid? _, ipnet, err := net.ParseCIDR(cidr) if err != nil { @@ -217,9 +201,6 @@ func (c *cryptokey) addSourceSubnet(cidr string) error { // Adds a destination route for the given CIDR to be tunnelled to the node // with the given BoxPubKey. func (c *cryptokey) addRoute(cidr string, dest string) error { - c.mutexroutes.Lock() - defer c.mutexroutes.Unlock() - // Is the CIDR we've been given valid? ipaddr, ipnet, err := net.ParseCIDR(cidr) if err != nil { @@ -294,11 +275,6 @@ func (c *cryptokey) addRoute(cidr string, dest string) error { // length specified in bytes) from the crypto-key routing table. An error is // returned if the address is not suitable or no route was found. func (c *cryptokey) getPublicKeyForAddress(addr address.Address, addrlen int) (crypto.BoxPubKey, error) { - c.mutexroutes.RLock() - c.mutexcache.RLock() - defer c.mutexroutes.RUnlock() - defer c.mutexcache.RUnlock() - // Check if the address is a valid Yggdrasil address - if so it // is exempt from all CKR checking if addr.IsValid() { @@ -359,9 +335,6 @@ func (c *cryptokey) getPublicKeyForAddress(addr address.Address, addrlen int) (c // Removes a source subnet, which allows traffic with these source addresses to // be tunnelled using crypto-key routing. func (c *cryptokey) removeSourceSubnet(cidr string) error { - c.mutexsources.Lock() - defer c.mutexsources.Unlock() - // Is the CIDR we've been given valid? _, ipnet, err := net.ParseCIDR(cidr) if err != nil { @@ -397,11 +370,6 @@ func (c *cryptokey) removeSourceSubnet(cidr string) error { // Removes a destination route for the given CIDR to be tunnelled to the node // with the given BoxPubKey. func (c *cryptokey) removeRoute(cidr string, dest string) error { - c.mutexroutes.Lock() - c.mutexcache.Lock() - defer c.mutexroutes.Unlock() - defer c.mutexcache.Unlock() - // Is the CIDR we've been given valid? _, ipnet, err := net.ParseCIDR(cidr) if err != nil { diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 4b00fc3..c6d6f4a 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -141,6 +141,7 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) { c.peers.reconfigure, c.router.reconfigure, c.router.tun.reconfigure, + c.router.cryptokey.reconfigure, c.switchTable.reconfigure, c.tcp.reconfigure, c.multicast.reconfigure, diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 74fff3f..68fb025 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -127,14 +127,6 @@ func (r *router) mainLoop() { case f := <-r.admin: f() case e := <-r.reconfigure: - // Send reconfigure notification to cryptokey - response := make(chan error) - r.cryptokey.reconfigure <- response - if err := <-response; err != nil { - e <- err - } - - // Anything else to do? e <- nil } } From 5cde3b5efc5ef4b72ae7e6fcba941b68f171a522 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 14 Jan 2019 18:51:49 +0000 Subject: [PATCH 16/22] Update nodeinfo in router reconfigure --- src/yggdrasil/admin.go | 5 ++++- src/yggdrasil/router.go | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 4c864b3..a44044a 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -350,7 +350,10 @@ func (a *admin) init(c *Core) { } var box_pub_key, coords string if in["box_pub_key"] == nil && in["coords"] == nil { - nodeinfo := []byte(a.core.nodeinfo.getNodeInfo()) + var nodeinfo []byte + a.core.router.doAdmin(func() { + nodeinfo = []byte(a.core.nodeinfo.getNodeInfo()) + }) var jsoninfo interface{} if err := json.Unmarshal(nodeinfo, &jsoninfo); err != nil { return admin_info{}, err diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 68fb025..b4e16a3 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -127,7 +127,9 @@ func (r *router) mainLoop() { case f := <-r.admin: f() case e := <-r.reconfigure: - e <- nil + r.core.configMutex.RLock() + e <- r.core.nodeinfo.setNodeInfo(r.core.config.NodeInfo, r.core.config.NodeInfoPrivacy) + r.core.configMutex.RUnlock() } } } From 9e486ed4fe7ea68e4aa5618611a2eaf659c004ce Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 14 Jan 2019 19:05:16 +0000 Subject: [PATCH 17/22] Move nodeinfo into router --- src/yggdrasil/admin.go | 8 ++++---- src/yggdrasil/core.go | 8 ++------ src/yggdrasil/nodeinfo.go | 2 +- src/yggdrasil/router.go | 8 ++++++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index a44044a..db85629 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -352,7 +352,7 @@ func (a *admin) init(c *Core) { if in["box_pub_key"] == nil && in["coords"] == nil { var nodeinfo []byte a.core.router.doAdmin(func() { - nodeinfo = []byte(a.core.nodeinfo.getNodeInfo()) + nodeinfo = []byte(a.core.router.nodeinfo.getNodeInfo()) }) var jsoninfo interface{} if err := json.Unmarshal(nodeinfo, &jsoninfo); err != nil { @@ -864,7 +864,7 @@ func (a *admin) admin_getNodeInfo(keyString, coordString string, nocache bool) ( copy(key[:], keyBytes) } if !nocache { - if response, err := a.core.nodeinfo.getCachedNodeInfo(key); err == nil { + if response, err := a.core.router.nodeinfo.getCachedNodeInfo(key); err == nil { return response, nil } } @@ -882,14 +882,14 @@ func (a *admin) admin_getNodeInfo(keyString, coordString string, nocache bool) ( } response := make(chan *nodeinfoPayload, 1) sendNodeInfoRequest := func() { - a.core.nodeinfo.addCallback(key, func(nodeinfo *nodeinfoPayload) { + a.core.router.nodeinfo.addCallback(key, func(nodeinfo *nodeinfoPayload) { defer func() { recover() }() select { case response <- nodeinfo: default: } }) - a.core.nodeinfo.sendNodeInfo(key, coords, false) + a.core.router.nodeinfo.sendNodeInfo(key, coords, false) } a.core.router.doAdmin(sendNodeInfoRequest) go func() { diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index c6d6f4a..dc1b8f0 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -44,7 +44,6 @@ type Core struct { admin admin searches searches multicast multicast - nodeinfo nodeinfo tcp tcpInterface awdl awdl log *log.Logger @@ -83,7 +82,6 @@ func (c *Core) init() error { copy(c.sigPriv[:], sigPrivHex) c.admin.init(c) - c.nodeinfo.init(c) c.searches.init(c) c.dht.init(c) c.sessions.init(c) @@ -197,8 +195,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { c.init() - c.nodeinfo.setNodeInfo(nc.NodeInfo, nc.NodeInfoPrivacy) - if err := c.tcp.init(c); err != nil { c.log.Println("Failed to start TCP interface") return err @@ -297,12 +293,12 @@ func (c *Core) GetSubnet() *net.IPNet { // Gets the nodeinfo. func (c *Core) GetNodeInfo() nodeinfoPayload { - return c.nodeinfo.getNodeInfo() + return c.router.nodeinfo.getNodeInfo() } // Sets the nodeinfo. func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) { - c.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy) + c.router.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy) } // Sets the output logger of the Yggdrasil node after startup. This may be diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go index b907632..963a2fc 100644 --- a/src/yggdrasil/nodeinfo.go +++ b/src/yggdrasil/nodeinfo.go @@ -170,7 +170,7 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse nodeinfo := nodeinfoReqRes{ SendCoords: table.self.getCoords(), IsResponse: isResponse, - NodeInfo: m.core.nodeinfo.getNodeInfo(), + NodeInfo: m.getNodeInfo(), } bs := nodeinfo.encode() shared := m.core.sessions.getSharedKey(&m.core.boxPriv, &key) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index b4e16a3..11509d4 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -51,6 +51,7 @@ type router struct { reset chan struct{} // signal that coords changed (re-init sessions/dht) admin chan func() // pass a lambda for the admin socket to query stuff cryptokey cryptokey + nodeinfo nodeinfo } // Packet and session info, used to check that the packet matches a valid IP range or CKR prefix before sending to the tun. @@ -85,6 +86,9 @@ func (r *router) init(core *Core) { r.send = send r.reset = make(chan struct{}, 1) r.admin = make(chan func(), 32) + r.core.configMutex.RLock() + r.nodeinfo.setNodeInfo(r.core.config.NodeInfo, r.core.config.NodeInfoPrivacy) + r.core.configMutex.RUnlock() r.cryptokey.init(r.core) r.tun.init(r.core, send, recv) } @@ -128,7 +132,7 @@ func (r *router) mainLoop() { f() case e := <-r.reconfigure: r.core.configMutex.RLock() - e <- r.core.nodeinfo.setNodeInfo(r.core.config.NodeInfo, r.core.config.NodeInfoPrivacy) + e <- r.nodeinfo.setNodeInfo(r.core.config.NodeInfo, r.core.config.NodeInfoPrivacy) r.core.configMutex.RUnlock() } } @@ -469,7 +473,7 @@ func (r *router) handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) { return } req.SendPermPub = *fromKey - r.core.nodeinfo.handleNodeInfo(&req) + r.nodeinfo.handleNodeInfo(&req) } // Passed a function to call. From f6b663c2578ae5855e5c915233b55cb4342baa7f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 14 Jan 2019 19:27:13 +0000 Subject: [PATCH 18/22] Make multicasting use config instead of ifceExpr in Core --- cmd/yggdrasil/main.go | 10 ---------- src/yggdrasil/core.go | 9 --------- src/yggdrasil/multicast.go | 21 ++++++++++++++------- 3 files changed, 14 insertions(+), 26 deletions(-) diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index cc54d3d..c3add0c 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -9,7 +9,6 @@ import ( "log" "os" "os/signal" - "regexp" "strings" "syscall" @@ -221,15 +220,6 @@ func main() { // Setup the Yggdrasil node itself. The node{} type includes a Core, so we // don't need to create this manually. n := node{} - // Check to see if any multicast interface expressions were provided in the - // config. If they were then set them now. - for _, ll := range cfg.MulticastInterfaces { - ifceExpr, err := regexp.Compile(ll) - if err != nil { - panic(err) - } - n.core.AddMulticastInterfaceExpr(ifceExpr) - } // Now that we have a working configuration, we can now actually start // Yggdrasil. This will start the router, switch, DHT node, TCP and UDP // sockets, TUN/TAP adapter and multicast discovery port. diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index dc1b8f0..3382562 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -5,7 +5,6 @@ import ( "io/ioutil" "log" "net" - "regexp" "sync" "time" @@ -47,7 +46,6 @@ type Core struct { tcp tcpInterface awdl awdl log *log.Logger - ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this } func (c *Core) init() error { @@ -313,13 +311,6 @@ func (c *Core) AddPeer(addr string, sintf string) error { return c.admin.addPeer(addr, sintf) } -// Adds an expression to select multicast interfaces for peer discovery. This -// should be done before calling Start. This function can be called multiple -// times to add multiple search expressions. -func (c *Core) AddMulticastInterfaceExpr(expr *regexp.Regexp) { - c.ifceExpr = append(c.ifceExpr, expr) -} - // Adds an allowed public key. This allow peerings to be restricted only to // keys that you have selected. func (c *Core) AddAllowedEncryptionPublicKey(boxStr string) error { diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index cf84fed..4087895 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "regexp" "sync" "time" @@ -35,15 +36,13 @@ func (m *multicast) init(core *Core) { }() m.groupAddr = "[ff02::114]:9001" // Check if we've been given any expressions - if len(m.core.ifceExpr) == 0 { - return + if count := len(m.interfaces()); count != 0 { + m.core.log.Println("Found", count, "multicast interface(s)") } - // Ask the system for network interfaces - m.core.log.Println("Found", len(m.interfaces()), "multicast interface(s)") } func (m *multicast) start() error { - if len(m.core.ifceExpr) == 0 { + if len(m.interfaces()) == 0 { m.core.log.Println("Multicast discovery is disabled") } else { m.core.log.Println("Multicast discovery is enabled") @@ -71,6 +70,10 @@ func (m *multicast) start() error { } func (m *multicast) interfaces() []net.Interface { + // Get interface expressions from config + m.core.configMutex.RLock() + exprs := m.core.config.MulticastInterfaces + m.core.configMutex.RUnlock() // Ask the system for network interfaces var interfaces []net.Interface allifaces, err := net.Interfaces() @@ -91,8 +94,12 @@ func (m *multicast) interfaces() []net.Interface { // Ignore point-to-point interfaces continue } - for _, expr := range m.core.ifceExpr { - if expr.MatchString(iface.Name) { + for _, expr := range exprs { + e, err := regexp.Compile(expr) + if err != nil { + panic(err) + } + if e.MatchString(iface.Name) { interfaces = append(interfaces, iface) } } From d9ddf30faf2998c1ad814253a1dadf927527eeee Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 14 Jan 2019 19:29:22 +0000 Subject: [PATCH 19/22] Fix debug builds --- src/yggdrasil/debug.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 6bd5430..7c6c757 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -517,7 +517,7 @@ func (c *Core) DEBUG_setLogger(log *log.Logger) { } func (c *Core) DEBUG_setIfceExpr(expr *regexp.Regexp) { - c.ifceExpr = append(c.ifceExpr, expr) + c.log.Println("DEBUG_setIfceExpr no longer implemented") } func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) { From 39567bed83308cc1431943eb91ea321fb5e33d14 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 15 Jan 2019 08:44:33 +0000 Subject: [PATCH 20/22] Address some comments --- src/yggdrasil/ckr.go | 14 ++++++-------- src/yggdrasil/tun.go | 1 + 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/yggdrasil/ckr.go b/src/yggdrasil/ckr.go index 14464f6..a5ed455 100644 --- a/src/yggdrasil/ckr.go +++ b/src/yggdrasil/ckr.go @@ -38,14 +38,12 @@ func (c *cryptokey) init(core *Core) { c.reconfigure = make(chan chan error, 1) go func() { for { - select { - case e := <-c.reconfigure: - var err error - c.core.router.doAdmin(func() { - err = c.core.router.cryptokey.configure() - }) - e <- err - } + e := <-c.reconfigure + var err error + c.core.router.doAdmin(func() { + err = c.core.router.cryptokey.configure() + }) + e <- err } }() diff --git a/src/yggdrasil/tun.go b/src/yggdrasil/tun.go index 0bda312..46fabbc 100644 --- a/src/yggdrasil/tun.go +++ b/src/yggdrasil/tun.go @@ -54,6 +54,7 @@ func (tun *tunAdapter) init(core *Core, send chan<- []byte, recv <-chan []byte) tun.core.config.IfMTU != tun.core.configOld.IfMTU tun.core.configMutex.RUnlock() if updated { + tun.core.log.Println("Reconfiguring TUN/TAP is not supported yet") e <- nil } else { e <- nil From 2cd373fc1e41a115299dc8f56bd2d4e6b0b40ab6 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 15 Jan 2019 08:51:19 +0000 Subject: [PATCH 21/22] Remove unnecessary selects --- src/yggdrasil/admin.go | 18 ++++++++---------- src/yggdrasil/dht.go | 6 ++---- src/yggdrasil/multicast.go | 12 +++++------- src/yggdrasil/peer.go | 6 ++---- src/yggdrasil/search.go | 6 ++---- src/yggdrasil/session.go | 26 ++++++++++++-------------- src/yggdrasil/tcp.go | 22 ++++++++++------------ src/yggdrasil/tun.go | 24 +++++++++++------------- 8 files changed, 52 insertions(+), 68 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index db85629..5524fe2 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -57,17 +57,15 @@ func (a *admin) init(c *Core) { a.reconfigure = make(chan chan error, 1) go func() { for { - select { - case e := <-a.reconfigure: - a.core.configMutex.RLock() - if a.core.config.AdminListen != a.core.configOld.AdminListen { - a.listenaddr = a.core.config.AdminListen - a.close() - a.start() - } - a.core.configMutex.RUnlock() - e <- nil + e := <-a.reconfigure + a.core.configMutex.RLock() + if a.core.config.AdminListen != a.core.configOld.AdminListen { + a.listenaddr = a.core.config.AdminListen + a.close() + a.start() } + a.core.configMutex.RUnlock() + e <- nil } }() a.core.configMutex.RLock() diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index bba6dfc..5427aca 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -82,10 +82,8 @@ func (t *dht) init(c *Core) { t.reconfigure = make(chan chan error, 1) go func() { for { - select { - case e := <-t.reconfigure: - e <- nil - } + e := <-t.reconfigure + e <- nil } }() t.nodeID = *t.core.GetNodeID() diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index 4087895..08f0954 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -25,13 +25,11 @@ func (m *multicast) init(core *Core) { m.reconfigure = make(chan chan error, 1) go func() { for { - select { - case e := <-m.reconfigure: - m.myAddrMutex.Lock() - m.myAddr = m.core.tcp.getAddr() - m.myAddrMutex.Unlock() - e <- nil - } + e := <-m.reconfigure + m.myAddrMutex.Lock() + m.myAddr = m.core.tcp.getAddr() + m.myAddrMutex.Unlock() + e <- nil } }() m.groupAddr = "[ff02::114]:9001" diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 98cbe02..c83504f 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -35,10 +35,8 @@ func (ps *peers) init(c *Core) { ps.reconfigure = make(chan chan error, 1) go func() { for { - select { - case e := <-ps.reconfigure: - e <- nil - } + e := <-ps.reconfigure + e <- nil } }() ps.allowedEncryptionPublicKeys = make(map[crypto.BoxPubKey]struct{}) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 8106fb7..c391dda 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -53,10 +53,8 @@ func (s *searches) init(core *Core) { s.reconfigure = make(chan chan error, 1) go func() { for { - select { - case e := <-s.reconfigure: - e <- nil - } + e := <-s.reconfigure + e <- nil } }() s.searches = make(map[crypto.NodeID]*searchInfo) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 3cd1cf7..e29cd4f 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -131,21 +131,19 @@ func (ss *sessions) init(core *Core) { ss.reconfigure = make(chan chan error, 1) go func() { for { - select { - case e := <-ss.reconfigure: - responses := make(map[crypto.Handle]chan error) - for index, session := range ss.sinfos { - responses[index] = make(chan error) - session.reconfigure <- responses[index] - } - for _, response := range responses { - if err := <-response; err != nil { - e <- err - continue - } - } - e <- nil + e := <-ss.reconfigure + responses := make(map[crypto.Handle]chan error) + for index, session := range ss.sinfos { + responses[index] = make(chan error) + session.reconfigure <- responses[index] } + for _, response := range responses { + if err := <-response; err != nil { + e <- err + continue + } + } + e <- nil } }() ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index b47b553..9cab9ea 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -90,18 +90,16 @@ func (iface *tcpInterface) init(core *Core) (err error) { iface.reconfigure = make(chan chan error, 1) go func() { for { - select { - case e := <-iface.reconfigure: - iface.core.configMutex.RLock() - updated := iface.core.config.Listen != iface.core.configOld.Listen - iface.core.configMutex.RUnlock() - if updated { - iface.serv_stop <- true - iface.serv.Close() - e <- iface.listen() - } else { - e <- nil - } + e := <-iface.reconfigure + iface.core.configMutex.RLock() + updated := iface.core.config.Listen != iface.core.configOld.Listen + iface.core.configMutex.RUnlock() + if updated { + iface.serv_stop <- true + iface.serv.Close() + e <- iface.listen() + } else { + e <- nil } } }() diff --git a/src/yggdrasil/tun.go b/src/yggdrasil/tun.go index 46fabbc..c0a7139 100644 --- a/src/yggdrasil/tun.go +++ b/src/yggdrasil/tun.go @@ -46,19 +46,17 @@ func (tun *tunAdapter) init(core *Core, send chan<- []byte, recv <-chan []byte) tun.icmpv6.init(tun) go func() { for { - select { - case e := <-tun.reconfigure: - tun.core.configMutex.RLock() - updated := tun.core.config.IfName != tun.core.configOld.IfName || - tun.core.config.IfTAPMode != tun.core.configOld.IfTAPMode || - tun.core.config.IfMTU != tun.core.configOld.IfMTU - tun.core.configMutex.RUnlock() - if updated { - tun.core.log.Println("Reconfiguring TUN/TAP is not supported yet") - e <- nil - } else { - e <- nil - } + e := <-tun.reconfigure + tun.core.configMutex.RLock() + updated := tun.core.config.IfName != tun.core.configOld.IfName || + tun.core.config.IfTAPMode != tun.core.configOld.IfTAPMode || + tun.core.config.IfMTU != tun.core.configOld.IfMTU + tun.core.configMutex.RUnlock() + if updated { + tun.core.log.Println("Reconfiguring TUN/TAP is not supported yet") + e <- nil + } else { + e <- nil } } }() From 53be1b02f3a8685bf56c1d7371fbf0822af954b7 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 15 Jan 2019 08:53:57 +0000 Subject: [PATCH 22/22] Check if accepting socket produced an error --- src/yggdrasil/tcp.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 9cab9ea..c90c3ff 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -140,6 +140,10 @@ func (iface *tcpInterface) listener() { iface.core.log.Println("Listening for TCP on:", iface.serv.Addr().String()) for { sock, err := iface.serv.Accept() + if err != nil { + iface.core.log.Println("Failed to accept connection:", err) + return + } select { case <-iface.serv_stop: iface.core.log.Println("Stopping listener")