diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index f9aca08..1834cfd 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -11,17 +11,6 @@ package yggdrasil // It needs to ignore messages with a lower seq // Probably best to start setting seq to a timestamp in that case... -// FIXME (!?) if it takes too long to communicate all the msgHops, then things hit a horizon -// That could happen with a peer over a high-latency link, with many msgHops -// Possible workarounds: -// 1. Pre-emptively send all hops when one is requested, or after any change -// Maybe requires changing how the throttle works and msgHops are saved -// In case some arrive out of order or are dropped -// This is relatively easy to implement, but could be wasteful -// 2. Save your old locator, sigs, etc, so you can respond to older ancs -// And finish requesting an old anc before updating to a new one -// But that may lead to other issues if not done carefully... - import "time" import "sync" import "sync/atomic" @@ -83,36 +72,23 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) { } type peer struct { - // Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends - // use get/update methods only! (atomic accessors as float64) - queueSize int64 + queueSize int64 // used to track local backpressure bytesSent uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers // BUG: sync/atomic, 32 bit platforms need the above to be the first element - firstSeen time.Time // To track uptime for getPeers + core *Core + port switchPort box boxPubKey sig sigPubKey shared boxSharedKey - //in <-chan []byte - //out chan<- []byte - //in func([]byte) - out func([]byte) - core *Core - port switchPort - // This is used to limit how often we perform expensive operations - throttle uint8 // TODO apply this sanely - // Called when a peer is removed, to close the underlying connection, or via admin api - close func() - // To allow the peer to call close if idle for too long - lastAnc time.Time // TODO? rename and use this - // used for protocol traffic (to bypass queues) - linkOut (chan []byte) - doSend (chan struct{}) // tell the linkLoop to send a switchMsg - dinfo *dhtInfo // used to keep the DHT working + firstSeen time.Time // To track uptime for getPeers + linkOut (chan []byte) // used for protocol traffic (to bypass queues) + doSend (chan struct{}) // tell the linkLoop to send a switchMsg + dinfo *dhtInfo // used to keep the DHT working + out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes + close func() // Called when a peer is removed, to close the underlying connection, or via admin api } -const peer_Throttle = 1 - func (p *peer) getQueueSize() int64 { return atomic.LoadInt64(&p.queueSize) } @@ -126,7 +102,6 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey) *peer { p := peer{box: *box, sig: *sig, shared: *getSharedKey(&ps.core.boxPriv, box), - lastAnc: now, firstSeen: now, doSend: make(chan struct{}, 1), core: ps.core} @@ -315,9 +290,8 @@ func (p *peer) handleSwitchMsg(packet []byte) { msg.decode(packet) //p.core.log.Println("Decoded msg:", msg, "; bytes:", packet) if len(msg.Hops) < 1 { - p.throttle++ panic("FIXME testing") - return + p.core.peers.removePeer(p.port) } var loc switchLocator prevKey := msg.Root @@ -328,9 +302,8 @@ func (p *peer) handleSwitchMsg(packet []byte) { loc.coords = append(loc.coords, hop.Port) bs := getBytesForSig(&hop.Next, &sigMsg) if !p.core.sigs.check(&prevKey, &hop.Sig, bs) { - p.throttle++ panic("FIXME testing") - return + p.core.peers.removePeer(p.port) } prevKey = hop.Next } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index aa9e3d3..67d9d8f 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -9,7 +9,7 @@ package yggdrasil // TODO document/comment everything in a lot more detail // TODO? use a pre-computed lookup table (python version had this) -// A little annoying to do with constant changes from bandwidth estimates +// A little annoying to do with constant changes from backpressure import "time" import "sort"