diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go
index f4cd75f..a62f9ff 100644
--- a/misc/sim/treesim.go
+++ b/misc/sim/treesim.go
@@ -6,13 +6,15 @@ import "os"
 import "strings"
 import "strconv"
 import "time"
-import "log"
 import "runtime"
 import "runtime/pprof"
 import "flag"
 
+import "github.com/gologme/log"
+
 import . "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
+import . "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/util/util.go b/src/util/util.go
index 45be3b1..df15ff2 100644
--- a/src/util/util.go
+++ b/src/util/util.go
@@ -56,3 +56,23 @@ func TimerStop(t *time.Timer) bool {
 	}
 	return true
 }
+
+// Run a blocking function with a timeout.
+// Returns true if the function returns.
+// Returns false if the timer fires.
+// The blocked function remains blocked--the caller is responsible for somehow killing it.
+func FuncTimeout(f func(), timeout time.Duration) bool {
+	success := make(chan struct{})
+	go func() {
+		defer close(success)
+		f()
+	}()
+	timer := time.NewTimer(timeout)
+	defer TimerStop(timer)
+	select {
+	case <-success:
+		return true
+	case <-timer.C:
+		return false
+	}
+}
diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go
index 2e23dd1..0443328 100644
--- a/src/yggdrasil/core.go
+++ b/src/yggdrasil/core.go
@@ -125,11 +125,15 @@ func (c *Core) addPeerLoop() {
 // UpdateConfig updates the configuration in Core and then signals the
 // various module goroutines to reconfigure themselves if needed
 func (c *Core) UpdateConfig(config *config.NodeConfig) {
+	c.log.Infoln("Reloading configuration...")
+
 	c.configMutex.Lock()
 	c.configOld = c.config
 	c.config = *config
 	c.configMutex.Unlock()
 
+	errors := 0
+
 	components := []chan chan error{
 		c.admin.reconfigure,
 		c.searches.reconfigure,
@@ -148,9 +152,16 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) {
 		response := make(chan error)
 		component <- response
 		if err := <-response; err != nil {
-			c.log.Println(err)
+			c.log.Errorln(err)
+			errors++
 		}
 	}
+
+	if errors > 0 {
+		c.log.Warnln(errors, "modules reported errors during configuration reload")
+	} else {
+		c.log.Infoln("Configuration reloaded successfully")
+	}
 }
 
 // GetBuildName gets the current build name. This is usually injected if built
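The new util.FuncTimeout above is what link.go below uses to put a bound on the blocking metadata handshake. A minimal standalone sketch of the same pattern, with a hypothetical readGreeting helper and test address standing in for the real metadata exchange; note that on timeout the wrapped call is still blocked, so the caller has to close the underlying connection to actually stop it:

package main

import (
	"fmt"
	"net"
	"time"

	"github.com/yggdrasil-network/yggdrasil-go/src/util"
)

// readGreeting is a hypothetical blocking call, used only for illustration.
func readGreeting(conn net.Conn) ([]byte, error) {
	buf := make([]byte, 64)
	n, err := conn.Read(buf) // blocks until data arrives, an error occurs, or conn is closed
	return buf[:n], err
}

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:9001") // assumed local test listener
	if err != nil {
		fmt.Println("dial:", err)
		return
	}
	defer conn.Close()
	var msg []byte
	var readErr error
	// Run the blocking read, but give up waiting after 30 seconds.
	if !util.FuncTimeout(func() { msg, readErr = readGreeting(conn) }, 30*time.Second) {
		// The goroutine running readGreeting is still blocked; closing the
		// connection (via the deferred Close) is what eventually unblocks it.
		fmt.Println("timeout on greeting")
		return
	}
	if readErr != nil {
		fmt.Println("read:", readErr)
		return
	}
	fmt.Printf("got %d bytes\n", len(msg))
}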
diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go
index ad4b1fa..6fc7687 100644
--- a/src/yggdrasil/link.go
+++ b/src/yggdrasil/link.go
@@ -4,6 +4,7 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"io"
 	"net"
 	"strings"
 	"sync"
@@ -91,11 +92,16 @@ func (intf *linkInterface) handler() error {
 	meta.link = *myLinkPub
 	metaBytes := meta.encode()
 	// TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer)
-	err := intf.msgIO._sendMetaBytes(metaBytes)
+	var err error
+	if !util.FuncTimeout(func() { err = intf.msgIO._sendMetaBytes(metaBytes) }, 30*time.Second) {
+		return errors.New("timeout on metadata send")
+	}
 	if err != nil {
 		return err
 	}
-	metaBytes, err = intf.msgIO._recvMetaBytes()
+	if !util.FuncTimeout(func() { metaBytes, err = intf.msgIO._recvMetaBytes() }, 30*time.Second) {
+		return errors.New("timeout on metadata recv")
+	}
 	if err != nil {
 		return err
 	}
@@ -109,8 +115,8 @@ func (intf *linkInterface) handler() error {
 		return errors.New("failed to connect: wrong version")
 	}
 	// Check if we're authorized to connect to this key / IP
-	if !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
-		intf.link.core.log.Debugf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
+	if !intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
+		intf.link.core.log.Warnf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
 			strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:]))
 		intf.msgIO.close()
 		return nil
@@ -162,8 +168,6 @@ func (intf *linkInterface) handler() error {
 	themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
 	intf.link.core.log.Infof("Connected %s: %s, source %s",
 		strings.ToUpper(intf.info.linkType), themString, intf.info.local)
-	defer intf.link.core.log.Infof("Disconnected %s: %s, source %s",
-		strings.ToUpper(intf.info.linkType), themString, intf.info.local)
 	// Start the link loop
 	go intf.peer.linkLoop()
 	// Start the writer
@@ -216,8 +220,8 @@ func (intf *linkInterface) handler() error {
 				case signalReady <- struct{}{}:
 				default:
 				}
-				intf.link.core.log.Debugf("Sending packet to %s: %s, source %s",
-					strings.ToUpper(intf.info.linkType), themString, intf.info.local)
+				//intf.link.core.log.Debugf("Sending packet to %s: %s, source %s",
+				//	strings.ToUpper(intf.info.linkType), themString, intf.info.local)
 			}
 		}
 	}()
@@ -225,26 +229,32 @@ func (intf *linkInterface) handler() error {
 	// Used to enable/disable activity in the switch
 	signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive
 	defer close(signalAlive)
+	ret := make(chan error, 1) // How we signal the return value when multiple goroutines are involved
 	go func() {
 		var isAlive bool
 		var isReady bool
 		var sendTimerRunning bool
 		var recvTimerRunning bool
-		recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed
+		recvTime := 6 * time.Second     // TODO set to ReadTimeout from the config, reset if it gets changed
+		closeTime := 2 * switch_timeout // TODO or maybe this makes more sense for ReadTimeout?...
 		sendTime := time.Second
 		sendTimer := time.NewTimer(sendTime)
 		defer util.TimerStop(sendTimer)
 		recvTimer := time.NewTimer(recvTime)
 		defer util.TimerStop(recvTimer)
+		closeTimer := time.NewTimer(closeTime)
+		defer util.TimerStop(closeTimer)
 		for {
-			intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t",
-				strings.ToUpper(intf.info.linkType), themString, intf.info.local,
-				isAlive, isReady, sendTimerRunning, recvTimerRunning)
+			//intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t",
+			//	strings.ToUpper(intf.info.linkType), themString, intf.info.local,
+			//	isAlive, isReady, sendTimerRunning, recvTimerRunning)
 			select {
 			case gotMsg, ok := <-signalAlive:
 				if !ok {
 					return
 				}
+				util.TimerStop(closeTimer)
+				closeTimer.Reset(closeTime)
 				util.TimerStop(recvTimer)
 				recvTimerRunning = false
 				isAlive = true
@@ -299,6 +309,14 @@ func (intf *linkInterface) handler() error {
 			case <-recvTimer.C:
 				// We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding
 				isAlive = false
+			case <-closeTimer.C:
+				// We haven't received anything in a really long time, so things have died at the switch level and then some...
+				// Just close the connection at this point...
+				select {
+				case ret <- errors.New("timeout"):
+				default:
+				}
+				intf.msgIO.close()
 			}
 		}
 	}()
@@ -309,7 +327,13 @@ func (intf *linkInterface) handler() error {
 			intf.peer.handlePacket(msg)
 		}
 		if err != nil {
-			return err
+			if err != io.EOF {
+				select {
+				case ret <- err:
+				default:
+				}
+			}
+			break
 		}
 		select {
 		case signalAlive <- len(msg) > 0:
@@ -317,5 +341,15 @@ func (intf *linkInterface) handler() error {
 		}
 	}
 	////////////////////////////////////////////////////////////////////////////////
-	return nil
+	// Remember to set `err` to something useful before returning
+	select {
+	case err = <-ret:
+		intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
+			strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
+	default:
+		err = nil
+		intf.link.core.log.Infof("Disconnected %s: %s, source %s",
+			strings.ToUpper(intf.info.linkType), themString, intf.info.local)
+	}
+	return err
 }
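The handler above threads its exit reason through ret, a buffered channel of size one: the reader loop and the close timer each try a non-blocking send, so whichever fails first wins and later reports are dropped, and the final non-blocking receive decides between the clean and the error variant of the disconnect log. A small self-contained sketch of that "first error wins" shape, with sleeps standing in for the real reader and timer:

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	// Size-1 buffer: the first reported error sticks, later ones are dropped.
	ret := make(chan error, 1)
	report := func(err error) {
		select {
		case ret <- err:
		default: // a reason was already recorded; drop this one
		}
	}

	go func() { // stand-in for the close timer firing
		time.Sleep(10 * time.Millisecond)
		report(errors.New("timeout"))
	}()
	go func() { // stand-in for the reader failing afterwards
		time.Sleep(20 * time.Millisecond)
		report(errors.New("read failed"))
	}()

	time.Sleep(50 * time.Millisecond)

	// Non-blocking drain, the same shape as the handler's final select.
	select {
	case err := <-ret:
		fmt.Println("disconnected; error:", err) // prints the "timeout" reason
	default:
		fmt.Println("disconnected cleanly")
	}
}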
diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go
index 99e6982..6acd473 100644
--- a/src/yggdrasil/router.go
+++ b/src/yggdrasil/router.go
@@ -66,19 +66,36 @@ func (r *router) init(core *Core) {
 	r.reconfigure = make(chan chan error, 1)
 	r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
 	r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
-	in := make(chan []byte, 32) // TODO something better than this...
+	in := make(chan []byte, 1) // TODO something better than this...
 	p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil)
-	p.out = func(packet []byte) {
-		// This is to make very sure it never blocks
-		select {
-		case in <- packet:
-			return
-		default:
-			util.PutBytes(packet)
-		}
-	}
+	p.out = func(packet []byte) { in <- packet }
 	r.in = in
-	r.out = func(packet []byte) { p.handlePacket(packet) } // The caller is responsible for go-ing if it needs to not block
+	out := make(chan []byte, 32)
+	go func() {
+		for packet := range out {
+			p.handlePacket(packet)
+		}
+	}()
+	out2 := make(chan []byte, 32)
+	go func() {
+		// This worker makes sure r.out never blocks
+		// It will buffer traffic long enough for the switch worker to take it
+		// If (somehow) you can send faster than the switch can receive, then this would use unbounded memory
+		// But crypto slows sends enough that the switch should always be able to take the packets...
+		var buf [][]byte
+		for {
+			buf = append(buf, <-out2)
+			for len(buf) > 0 {
+				select {
+				case bs := <-out2:
+					buf = append(buf, bs)
+				case out <- buf[0]:
+					buf = buf[1:]
+				}
+			}
+		}
+	}()
+	r.out = func(packet []byte) { out2 <- packet }
 	r.toRecv = make(chan router_recvPacket, 32)
 	recv := make(chan []byte, 32)
 	send := make(chan []byte, 32)
@@ -305,7 +322,6 @@ func (r *router) sendPacket(bs []byte) {
 			// Don't continue - drop the packet
 			return
 		}
-		sinfo.send <- bs
 	}
 }
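The goroutine feeding out above is the core of this change: instead of dropping packets when the self-peer's queue is full, r.out hands them to a worker that parks them in a slice until the consumer can take them, so the caller never blocks. A standalone sketch of that relay, under the same assumption the comment makes, namely that the consumer keeps up on average because the slice is unbounded:

package main

import "fmt"

// relay forwards packets from in to out without making senders on in wait for out.
// Anything out cannot accept yet is held in an unbounded FIFO slice.
func relay(in <-chan []byte, out chan<- []byte) {
	var buf [][]byte
	for {
		buf = append(buf, <-in) // wait until there is at least one packet
		for len(buf) > 0 {
			select {
			case bs := <-in: // keep draining the producer side
				buf = append(buf, bs)
			case out <- buf[0]: // or hand the oldest packet to the consumer
				buf = buf[1:]
			}
		}
	}
}

func main() {
	in := make(chan []byte, 32)
	out := make(chan []byte) // unbuffered, deliberately slow consumer
	go relay(in, out)
	go func() {
		for i := 0; i < 100; i++ {
			in <- []byte{byte(i)} // producer never waits on the consumer itself
		}
	}()
	for i := 0; i < 100; i++ {
		bs := <-out
		fmt.Println("delivered packet", bs[0])
	}
}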
diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go
index cdabaf2..012af57 100644
--- a/src/yggdrasil/session.go
+++ b/src/yggdrasil/session.go
@@ -525,17 +525,36 @@ func (ss *sessions) resetInits() {
 // It handles calling the relatively expensive crypto operations.
 // It's also responsible for checking nonces and dropping out-of-date/duplicate packets, or else calling the function to update nonces if the packet is OK.
 func (sinfo *sessionInfo) doWorker() {
+	send := make(chan []byte, 32)
+	defer close(send)
+	go func() {
+		for bs := range send {
+			sinfo.doSend(bs)
+		}
+	}()
+	recv := make(chan *wire_trafficPacket, 32)
+	defer close(recv)
+	go func() {
+		for p := range recv {
+			sinfo.doRecv(p)
+		}
+	}()
 	for {
 		select {
 		case p, ok := <-sinfo.recv:
 			if ok {
-				sinfo.doRecv(p)
+				select {
+				case recv <- p:
+				default:
+					// We need something to not block, and it's best to drop it before we decrypt
+					util.PutBytes(p.Payload)
+				}
 			} else {
 				return
 			}
 		case bs, ok := <-sinfo.send:
 			if ok {
-				sinfo.doSend(bs)
+				send <- bs
 			} else {
 				return
 			}
@@ -625,8 +644,5 @@ func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) {
 	sinfo.updateNonce(&p.Nonce)
 	sinfo.time = time.Now()
 	sinfo.bytesRecvd += uint64(len(bs))
-	select {
-	case sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}:
-	default: // avoid deadlocks, maybe do this somewhere else?...
-	}
+	sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}
 }
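doWorker now only shuffles packets between channels, while the expensive doSend/doRecv crypto runs in two dedicated goroutines behind bounded queues; on the receive side the handoff is non-blocking, so when decryption falls behind, packets are dropped cheaply before any crypto is spent on them. A minimal sketch of that bounded, drop-on-full handoff, with a sleep standing in for the decrypt:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Bounded queue in front of an expensive worker.
	work := make(chan []byte, 32)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for p := range work {
			time.Sleep(time.Millisecond) // stand-in for an expensive decrypt
			_ = p
		}
	}()

	dropped := 0
	for i := 0; i < 1000; i++ {
		packet := []byte{byte(i)}
		select {
		case work <- packet:
		default:
			// Queue is full: drop now, before paying for the decrypt.
			// (The real code also recycles the buffer with util.PutBytes.)
			dropped++
		}
	}
	close(work)
	<-done
	fmt.Println("dropped:", dropped)
}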
diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go
index a2877eb..bf6b919 100644
--- a/src/yggdrasil/switch.go
+++ b/src/yggdrasil/switch.go
@@ -176,6 +176,7 @@ type switchTable struct {
 	admin             chan func()    // Pass a lambda for the admin socket to query stuff
 	queues            switch_buffers // Queues - not atomic so ONLY use through admin chan
 	queueTotalMaxSize uint64         // Maximum combined size of queues
+	toRouter          chan []byte    // Packets to be sent to the router
 }
 
 // Minimum allowed total size of switch queues.
@@ -199,6 +200,7 @@ func (t *switchTable) init(core *Core) {
 	t.idleIn = make(chan switchPort, 1024)
 	t.admin = make(chan func())
 	t.queueTotalMaxSize = SwitchQueueTotalMinSize
+	t.toRouter = make(chan []byte, 1)
 }
 
 // Safely gets a copy of this node's locator.
@@ -215,7 +217,6 @@ func (t *switchTable) doMaintenance() {
 	defer t.mutex.Unlock() // Release lock when we're done
 	t.cleanRoot()
 	t.cleanDropped()
-	t.cleanPeers()
 }
@@ -272,28 +273,6 @@ func (t *switchTable) forgetPeer(port switchPort) {
 	}
 }
 
-// Clean all unresponsive peers from the table, needed in case a peer stops updating.
-// Needed in case a non-parent peer keeps the connection open but stops sending updates.
-// Also reclaims space from deleted peers by copying the map.
-func (t *switchTable) cleanPeers() {
-	now := time.Now()
-	for port, peer := range t.data.peers {
-		if now.Sub(peer.time) > switch_timeout+switch_throttle {
-			// Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding.
-			delete(t.data.peers, port)
-			go t.core.peers.removePeer(port) // TODO figure out if it's safe to do this without a goroutine, or make it safe
-		}
-	}
-	if _, isIn := t.data.peers[t.parent]; !isIn {
-		// The root timestamp would probably time out before this happens, but better safe than sorry.
-		// We removed the current parent, so find a new one.
-		t.parent = 0
-		for _, peer := range t.data.peers {
-			t.unlockedHandleMsg(&peer.msg, peer.port, true)
-		}
-	}
-}
-
 // Dropped is a list of roots that are better than the current root, but stopped sending new timestamps.
 // If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos.
 // This function is called periodically to do that cleanup.
@@ -570,23 +549,23 @@ func (t *switchTable) start() error {
 	return nil
 }
 
-// Check if a packet should go to the self node
-// This means there's no node closer to the destination than us
-// This is mainly used to identify packets addressed to us, or that hit a blackhole
-func (t *switchTable) selfIsClosest(dest []byte) bool {
+// Return a map of ports onto distance, keeping only ports closer to the destination than this node
+// If the map is empty (or nil), then no peer is closer
+func (t *switchTable) getCloser(dest []byte) map[switchPort]int {
 	table := t.getTable()
 	myDist := table.self.dist(dest)
 	if myDist == 0 {
 		// Skip the iteration step if it's impossible to be closer
-		return true
+		return nil
 	}
+	closer := make(map[switchPort]int, len(table.elems))
 	for _, info := range table.elems {
 		dist := info.locator.dist(dest)
 		if dist < myDist {
-			return false
+			closer[info.port] = dist
 		}
 	}
-	return true
+	return closer
 }
 
 // Returns true if the peer is closer to the destination than ourself
@@ -639,26 +618,48 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
 // Returns true if the packet has been handled somehow, false if it should be queued
 func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool {
 	coords := switch_getPacketCoords(packet)
-	ports := t.core.peers.getPorts()
-	if t.selfIsClosest(coords) {
+	closer := t.getCloser(coords)
+	if len(closer) == 0 { // TODO? call the router directly, and remove the whole concept of a self peer?
-		ports[0].sendPacket(packet)
+		t.toRouter <- packet
 		return true
 	}
 	table := t.getTable()
-	myDist := table.self.dist(coords)
 	var best *peer
-	bestDist := myDist
-	for port := range idle {
-		if to := ports[port]; to != nil {
-			if info, isIn := table.elems[to.port]; isIn {
-				dist := info.locator.dist(coords)
-				if !(dist < bestDist) {
-					continue
-				}
-				best = to
-				bestDist = dist
-			}
+	var bestDist int
+	var bestCoordLen int
+	ports := t.core.peers.getPorts()
+	for port, dist := range closer {
+		to := ports[port]
+		_, isIdle := idle[port]
+		coordLen := len(table.elems[port].locator.coords)
+		var update bool
+		switch {
+		case to == nil:
+			//nothing
+		case !isIdle:
+			//nothing
+		case best == nil:
+			update = true
+		case dist < bestDist:
+			update = true
+		case dist > bestDist:
+			//nothing
+		case coordLen < bestCoordLen:
+			update = true
+			/*
+				case coordLen > bestCoordLen:
+					//nothing
+				case port < best.port:
+					update = true
+			*/
+		default:
+			//nothing
+		}
+		if update {
+			best = to
+			bestDist = dist
+			bestCoordLen = coordLen
 		}
 	}
 	if best != nil {
@@ -697,7 +698,7 @@ func (b *switch_buffers) cleanup(t *switchTable) {
 		// Remove queues for which we have no next hop
 		packet := buf.packets[0]
 		coords := switch_getPacketCoords(packet.bytes)
-		if t.selfIsClosest(coords) {
+		if len(t.getCloser(coords)) == 0 {
 			for _, packet := range buf.packets {
 				util.PutBytes(packet.bytes)
 			}
@@ -776,11 +777,38 @@ func (t *switchTable) handleIdle(port switchPort) bool {
 
 // The switch worker does routing lookups and sends packets to where they need to be
 func (t *switchTable) doWorker() {
+	sendingToRouter := make(chan []byte, 1)
+	go func() {
+		// Keep sending packets to the router
+		self := t.core.peers.getPorts()[0]
+		for bs := range sendingToRouter {
+			self.sendPacket(bs)
+		}
+	}()
+	go func() {
+		// Keep taking packets from the idle worker and sending them to the above whenever it's idle, keeping anything extra in a (fifo, head-drop) buffer
+		var buf [][]byte
+		for {
+			buf = append(buf, <-t.toRouter)
+			for len(buf) > 0 {
+				select {
+				case bs := <-t.toRouter:
+					buf = append(buf, bs)
+					for len(buf) > 32 {
+						util.PutBytes(buf[0])
+						buf = buf[1:]
+					}
+				case sendingToRouter <- buf[0]:
+					buf = buf[1:]
+				}
+			}
+		}
+	}()
 	t.queues.switchTable = t
 	t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
 	idle := make(map[switchPort]struct{})          // this is to deduplicate things
 	for {
-		t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs))
+		//t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs))
 		select {
 		case bytes := <-t.packetIn:
 			// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
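The toRouter relay above differs from the router-side worker in one way: its slice is capped at 32 packets and overflow is dropped from the head, so under sustained overload the oldest queued traffic goes first and fresher packets keep moving. A tiny sketch of that head-drop policy; the helper name headDropPush is made up for illustration, and the real worker also recycles dropped buffers with util.PutBytes:

package main

import "fmt"

// headDropPush appends p to buf, evicting the oldest entries if that would
// push the buffer past limit. It returns the new buffer and the drop count.
func headDropPush(buf [][]byte, p []byte, limit int) ([][]byte, int) {
	buf = append(buf, p)
	dropped := 0
	for len(buf) > limit {
		buf = buf[1:] // drop the oldest packet first
		dropped++
	}
	return buf, dropped
}

func main() {
	var buf [][]byte
	total := 0
	for i := 0; i < 40; i++ {
		var d int
		buf, d = headDropPush(buf, []byte{byte(i)}, 32)
		total += d
	}
	fmt.Println("kept:", len(buf), "dropped:", total) // kept: 32 dropped: 8
	fmt.Println("oldest kept:", buf[0][0])            // 8: the head moved forward
}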