From 052de98f126f50341ca60e41064fe485f77858d6 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 16 May 2020 17:07:47 -0500 Subject: [PATCH 01/18] work-in-progress on buffering overhaul --- src/yggdrasil/api.go | 8 ++-- src/yggdrasil/link.go | 74 +++++++++++++++++++++------- src/yggdrasil/packetqueue.go | 82 +++++++++++++++---------------- src/yggdrasil/peer.go | 56 ++++++++++++++-------- src/yggdrasil/router.go | 93 +++++++++++++++++++++++++----------- 5 files changed, 202 insertions(+), 111 deletions(-) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 31ece6b..66ee9b8 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -123,10 +123,10 @@ func (c *Core) GetPeers() []Peer { var info Peer phony.Block(p, func() { info = Peer{ - Endpoint: p.intf.name, + Endpoint: p.intf.name(), BytesSent: p.bytesSent, BytesRecvd: p.bytesRecvd, - Protocol: p.intf.info.linkType, + Protocol: p.intf.interfaceType(), Port: uint64(port), Uptime: time.Since(p.firstSeen), } @@ -163,8 +163,8 @@ func (c *Core) GetSwitchPeers() []SwitchPeer { BytesSent: peer.bytesSent, BytesRecvd: peer.bytesRecvd, Port: uint64(elem.port), - Protocol: peer.intf.info.linkType, - Endpoint: peer.intf.info.remote, + Protocol: peer.intf.interfaceType(), + Endpoint: peer.intf.remote(), } copy(info.PublicKey[:], peer.box[:]) }) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 539d048..3b3cfdb 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -47,7 +47,7 @@ type linkInterfaceMsgIO interface { } type linkInterface struct { - name string + lname string link *link peer *peer msgIO linkInterfaceMsgIO @@ -125,7 +125,7 @@ func (l *link) listen(uri string) error { func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { // Technically anything unique would work for names, but let's pick something human readable, just for debugging intf := linkInterface{ - name: name, + lname: name, link: l, msgIO: msgIO, info: linkInfo{ @@ -178,7 +178,7 @@ func (intf *linkInterface) handler() error { } base := version_getBaseMetadata() if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { - intf.link.core.log.Errorln("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + intf.link.core.log.Errorln("Failed to connect to node: " + intf.lname + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) return errors.New("failed to connect: wrong version") } // Check if we're authorized to connect to this key / IP @@ -217,23 +217,9 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - out := func(msgs [][]byte) { - // nil to prevent it from blocking if the link is somehow frozen - // this is safe because another packet won't be sent until the link notifies - // the peer that it's ready for one - intf.writer.sendFrom(nil, msgs, false) - } - linkOut := func(bs []byte) { - // nil to prevent it from blocking if the link is somehow frozen - // FIXME this is hypothetically not safe, the peer shouldn't be sending - // additional packets until this one finishes, otherwise this could leak - // memory if writing happens slower than link packets are generated... - // that seems unlikely, so it's a lesser evil than deadlocking for now - intf.writer.sendFrom(nil, [][]byte{bs}, true) - } phony.Block(&intf.link.core.peers, func() { // FIXME don't use phony.Block, it's bad practice, even if it's safe here - intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }, out, linkOut) + intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf) }) if intf.peer == nil { return errors.New("failed to create peer") @@ -275,6 +261,58 @@ func (intf *linkInterface) handler() error { //////////////////////////////////////////////////////////////////////////////// +// linkInterface needs to match the peerInterface type needed by the peers + +func (intf *linkInterface) out(bss [][]byte) { + intf.Act(nil, func() { + // nil to prevent it from blocking if the link is somehow frozen + // this is safe because another packet won't be sent until the link notifies + // the peer that it's ready for one + intf.writer.sendFrom(nil, bss, false) + }) +} + +func (intf *linkInterface) linkOut(bs []byte) { + intf.Act(nil, func() { + // nil to prevent it from blocking if the link is somehow frozen + // FIXME this is hypothetically not safe, the peer shouldn't be sending + // additional packets until this one finishes, otherwise this could leak + // memory if writing happens slower than link packets are generated... + // that seems unlikely, so it's a lesser evil than deadlocking for now + intf.writer.sendFrom(nil, [][]byte{bs}, true) + }) +} + +func (intf *linkInterface) notifyQueued(seq uint64) { + // This is the part where we want non-nil 'from' fields + intf.Act(intf.peer, func() { + if !intf.isIdle { + intf.peer.dropFromQueue(intf, seq) + } + }) +} + +func (intf *linkInterface) close() { + intf.Act(nil, func() { intf.msgIO.close() }) +} + +func (intf *linkInterface) name() string { + return intf.lname +} + +func (intf *linkInterface) local() string { + return intf.info.local +} + +func (intf *linkInterface) remote() string { + return intf.info.remote +} + +func (intf *linkInterface) interfaceType() string { + return intf.info.linkType +} + +//////////////////////////////////////////////////////////////////////////////// const ( sendTime = 1 * time.Second // How long to wait before deciding a send is blocked keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index 2000ffa..7abdaea 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -4,9 +4,6 @@ import ( "time" ) -// TODO take max size from config -const MAX_PACKET_QUEUE_SIZE = 4 * 1048576 // 4 MB - type pqStreamID string type pqPacketInfo struct { @@ -25,46 +22,50 @@ type packetQueue struct { size uint64 } -func (q *packetQueue) cleanup() { - for q.size > MAX_PACKET_QUEUE_SIZE { - // TODO? drop from a random stream - // odds proportional to size? bandwidth? - // always using the worst is exploitable -> flood 1 packet per random stream - // find the stream that's using the most bandwidth - now := time.Now() - var worst pqStreamID - for id := range q.streams { +// drop will remove a packet from the queue, returning it to the pool +// returns true if a packet was removed, false otherwise +func (q *packetQueue) drop() bool { + if q.size == 0 { + return false + } + // TODO? drop from a random stream + // odds proportional to size? bandwidth? + // always using the worst is exploitable -> flood 1 packet per random stream + // find the stream that's using the most bandwidth + now := time.Now() + var worst pqStreamID + for id := range q.streams { + worst = id + break // get a random ID to start + } + worstStream := q.streams[worst] + worstSize := float64(worstStream.size) + worstAge := now.Sub(worstStream.infos[0].time).Seconds() + for id, stream := range q.streams { + thisSize := float64(stream.size) + thisAge := now.Sub(stream.infos[0].time).Seconds() + // cross multiply to avoid division by zero issues + if worstSize*thisAge < thisSize*worstAge { + // worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth worst = id - break // get a random ID to start - } - worstStream := q.streams[worst] - worstSize := float64(worstStream.size) - worstAge := now.Sub(worstStream.infos[0].time).Seconds() - for id, stream := range q.streams { - thisSize := float64(stream.size) - thisAge := now.Sub(stream.infos[0].time).Seconds() - // cross multiply to avoid division by zero issues - if worstSize*thisAge < thisSize*worstAge { - // worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth - worst = id - worstStream = stream - worstSize = thisSize - worstAge = thisAge - } - } - // Drop the oldest packet from the worst stream - packet := worstStream.infos[0].packet - worstStream.infos = worstStream.infos[1:] - worstStream.size -= uint64(len(packet)) - q.size -= uint64(len(packet)) - pool_putBytes(packet) - // save the modified stream to queues - if len(worstStream.infos) > 0 { - q.streams[worst] = worstStream - } else { - delete(q.streams, worst) + worstStream = stream + worstSize = thisSize + worstAge = thisAge } } + // Drop the oldest packet from the worst stream + packet := worstStream.infos[0].packet + worstStream.infos = worstStream.infos[1:] + worstStream.size -= uint64(len(packet)) + q.size -= uint64(len(packet)) + pool_putBytes(packet) + // save the modified stream to queues + if len(worstStream.infos) > 0 { + q.streams[worst] = worstStream + } else { + delete(q.streams, worst) + } + return true } func (q *packetQueue) push(packet []byte) { @@ -80,7 +81,6 @@ func (q *packetQueue) push(packet []byte) { // save update to queues q.streams[id] = stream q.size += uint64(len(packet)) - q.cleanup() } func (q *packetQueue) pop() ([]byte, bool) { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 31bba66..31ea5f4 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -77,29 +77,38 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string { return ps.core.config.Current.AllowedEncryptionPublicKeys } +type peerInterface interface { + out([][]byte) + linkOut([]byte) + notifyQueued(uint64) + close() + // These next ones are only used by the API + name() string + local() string + remote() string + interfaceType() string +} + // Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic type peer struct { phony.Inbox core *Core - intf *linkInterface + intf peerInterface port switchPort box crypto.BoxPubKey sig crypto.SigPubKey shared crypto.BoxSharedKey linkShared crypto.BoxSharedKey endpoint string - firstSeen time.Time // To track uptime for getPeers - linkOut func([]byte) // used for protocol traffic (bypasses the switch) - dinfo *dhtInfo // used to keep the DHT working - out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes - done (chan struct{}) // closed to exit the linkLoop - close func() // Called when a peer is removed, to close the underlying connection, or via admin api + firstSeen time.Time // To track uptime for getPeers + dinfo *dhtInfo // used to keep the DHT working // The below aren't actually useful internally, they're just gathered for getPeers statistics bytesSent uint64 bytesRecvd uint64 ports map[switchPort]*peer table *lookupTable queue packetQueue + seq uint64 // this and idle are used to detect when to drop packets from queue idle bool } @@ -123,19 +132,15 @@ func (ps *peers) _updatePeers() { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func(), out func([][]byte), linkOut func([]byte)) *peer { +func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf peerInterface) *peer { now := time.Now() p := peer{box: *box, + core: ps.core, + intf: intf, sig: *sig, shared: *crypto.GetSharedKey(&ps.core.boxPriv, box), linkShared: *linkShared, firstSeen: now, - done: make(chan struct{}), - close: closer, - core: ps.core, - intf: intf, - out: out, - linkOut: linkOut, } oldPorts := ps.ports newPorts := make(map[switchPort]*peer) @@ -172,10 +177,7 @@ func (ps *peers) _removePeer(p *peer) { newPorts[k] = v } delete(newPorts, p.port) - if p.close != nil { - p.close() - } - close(p.done) + p.intf.close() ps.ports = newPorts ps._updatePeers() } @@ -295,12 +297,26 @@ func (p *peer) _handleIdle() { } if len(packets) > 0 { p.bytesSent += uint64(size) - p.out(packets) + p.intf.out(packets) } else { p.idle = true } } +func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { + p.Act(from, func() { + switch { + case seq != p.seq: + case p.queue.drop(): + p.intf.notifyQueued(p.seq) + } + if seq != p.seq { + return + } + + }) +} + // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // It sends it to p.linkOut, which bypasses the usual packet queues. func (p *peer) _sendLinkPacket(packet []byte) { @@ -316,7 +332,7 @@ func (p *peer) _sendLinkPacket(packet []byte) { Payload: bs, } packet = linkPacket.encode() - p.linkOut(packet) + p.intf.linkOut(packet) } // Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic. diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 1bb14c4..303ada6 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -45,6 +45,8 @@ type router struct { nodeinfo nodeinfo searches searches sessions sessions + intf routerInterface + peer *peer table *lookupTable // has a copy of our locator } @@ -53,28 +55,17 @@ func (r *router) init(core *Core) { r.core = core r.addr = *address.AddrForNodeID(&r.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.dht.nodeID) - self := linkInterface{ - name: "(self)", - info: linkInfo{ - local: "(self)", - remote: "(self)", - linkType: "self", - }, - } - var p *peer - peerOut := func(packets [][]byte) { - r.handlePackets(p, packets) - r.Act(p, func() { - // after the router handle the packets, notify the peer that it's ready for more - p.Act(r, p._handleIdle) - }) - } + r.intf.router = r phony.Block(&r.core.peers, func() { // FIXME don't block here! - p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil, peerOut, nil) + r.peer = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &r.intf) }) - p.Act(r, p._handleIdle) - r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } + r.peer.Act(r, r.peer._handleIdle) + r.out = func(bs []byte) { + r.intf.Act(r, func() { + r.peer.handlePacketFrom(&r.intf, bs) + }) + } r.nodeinfo.init(r.core) r.core.config.Mutex.RLock() r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy) @@ -123,15 +114,6 @@ func (r *router) start() error { return nil } -// In practice, the switch will call this with 1 packet -func (r *router) handlePackets(from phony.Actor, packets [][]byte) { - r.Act(from, func() { - for _, packet := range packets { - r._handlePacket(packet) - } - }) -} - // Insert a peer info into the dht, TODO? make the dht a separate actor func (r *router) insertPeer(from phony.Actor, info *dhtInfo) { r.Act(from, func() { @@ -275,3 +257,58 @@ func (r *router) _handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) { req.SendPermPub = *fromKey r.nodeinfo.handleNodeInfo(r, &req) } + +//////////////////////////////////////////////////////////////////////////////// + +// routerInterface is a helper that implements peerInterface +type routerInterface struct { + phony.Inbox + router *router + busy bool +} + +func (intf *routerInterface) out(bss [][]byte) { + intf.Act(intf.router.peer, func() { + intf.router.Act(intf, func() { + for _, bs := range bss { + intf.router._handlePacket(bs) + } + // we may block due to the above + // so we send a message to ourself, that we'd handle after unblocking + // that message tells us to tell the interface that we're finally idle again + intf.router.Act(nil, func() { + intf.Act(intf.router, intf._handleIdle) + }) + intf.Act(intf.router, intf._handleBusy) + }) + }) +} + +func (intf *routerInterface) _handleBusy() { + intf.busy = true +} + +func (intf *routerInterface) _handleIdle() { + intf.busy = false + intf.router.peer.Act(intf, intf.router.peer._handleIdle) +} + +func (intf *routerInterface) linkOut(_ []byte) {} + +func (intf *routerInterface) notifyQueued(seq uint64) { + intf.Act(intf.router.peer, func() { + if intf.busy { + intf.router.peer.dropFromQueue(intf, seq) + } + }) +} + +func (intf *routerInterface) close() {} + +func (intf *routerInterface) name() string { return "(self)" } + +func (intf *routerInterface) local() string { return "(self)" } + +func (intf *routerInterface) remote() string { return "(self)" } + +func (intf *routerInterface) interfaceType() string { return "self" } From b132560f651ad5f04b977517803a1d6016b1e9a5 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 16 May 2020 17:24:26 -0500 Subject: [PATCH 02/18] it helps to actually run the notifyQueued stuff... --- src/yggdrasil/peer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 31ea5f4..ada2921 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -281,6 +281,8 @@ func (p *peer) _sendPackets(packets [][]byte) { if p.idle { p.idle = false p._handleIdle() + } else { + p.intf.notifyQueued(p.seq) } } @@ -296,6 +298,7 @@ func (p *peer) _handleIdle() { } } if len(packets) > 0 { + p.seq++ p.bytesSent += uint64(size) p.intf.out(packets) } else { From b17a035a05214c8066eaea613f86f176cfd7d33c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 16 May 2020 17:40:11 -0500 Subject: [PATCH 03/18] workarounds to dropping being too aggressive --- src/yggdrasil/packetqueue.go | 5 +++++ src/yggdrasil/peer.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index 7abdaea..e37d5bb 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -55,6 +55,11 @@ func (q *packetQueue) drop() bool { } // Drop the oldest packet from the worst stream packet := worstStream.infos[0].packet + if q.size-uint64(len(packet)) < streamMsgSize { + // TODO something better + // We don't want to drop *all* packets, so lets save 1 batch worth... + return false + } worstStream.infos = worstStream.infos[1:] worstStream.size -= uint64(len(packet)) q.size -= uint64(len(packet)) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index ada2921..0c195c6 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -289,7 +289,7 @@ func (p *peer) _sendPackets(packets [][]byte) { func (p *peer) _handleIdle() { var packets [][]byte var size uint64 - for size < 65535 { + for size < streamMsgSize { if packet, success := p.queue.pop(); success { packets = append(packets, packet) size += uint64(len(packet)) From 62b9fab5f822f018940062ca8914852f41a8ce0b Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 16 May 2020 18:56:04 -0500 Subject: [PATCH 04/18] more work-in-progress, debugging why things are dropping so often --- src/yggdrasil/link.go | 31 +++++++++++++------------------ src/yggdrasil/packetqueue.go | 2 +- src/yggdrasil/peer.go | 5 +---- 3 files changed, 15 insertions(+), 23 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 3b3cfdb..8098950 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -63,8 +63,7 @@ type linkInterface struct { stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen closeTimer *time.Timer // Fires when the link has been idle so long we need to close it isIdle bool // True if the peer actor knows the link is idle - stalled bool // True if we haven't been receiving any response traffic - unstalled bool // False if an idle notification to the switch hasn't been sent because we stalled (or are first starting up) + blocked bool // True if we've blocked the peer in the switch } func (l *link) init(c *Core) error { @@ -235,6 +234,7 @@ func (intf *linkInterface) handler() error { strings.ToUpper(intf.info.linkType), themString, intf.info.local) // Start things go intf.peer.start() + intf.Act(nil, intf._notifyIdle) intf.reader.Act(nil, intf.reader._read) // Wait for the reader to finish // TODO find a way to do this without keeping live goroutines around @@ -344,8 +344,9 @@ func (intf *linkInterface) _cancelStallTimer() { // through other links, if alternatives exist func (intf *linkInterface) notifyBlockedSend() { intf.Act(nil, func() { - if intf.sendTimer != nil { + if intf.sendTimer != nil && !intf.blocked { //As far as we know, we're still trying to send, and the timer fired. + intf.blocked = true intf.link.core.switchTable.blockPeer(intf, intf.peer.port) } }) @@ -365,25 +366,21 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { }) } -// Notify the switch that we're ready for more traffic, assuming we're not in a stalled state +// Notify the peer that we're ready for more traffic func (intf *linkInterface) _notifyIdle() { if !intf.isIdle { - if intf.stalled { - intf.unstalled = false - } else { - intf.isIdle = true - intf.peer.Act(intf, intf.peer._handleIdle) - } + intf.isIdle = true + intf.peer.Act(intf, intf.peer._handleIdle) } } // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds func (intf *linkInterface) notifyStalled() { intf.Act(nil, func() { // Sent from a time.AfterFunc - if intf.stallTimer != nil { + if intf.stallTimer != nil && !intf.blocked { intf.stallTimer.Stop() intf.stallTimer = nil - intf.stalled = true + intf.blocked = true intf.link.core.switchTable.blockPeer(intf, intf.peer.port) } }) @@ -406,15 +403,13 @@ func (intf *linkInterface) notifyRead(size int) { intf.stallTimer.Stop() intf.stallTimer = nil } - intf.stalled = false - if !intf.unstalled { - intf._notifyIdle() - intf.unstalled = true - } if size > 0 && intf.stallTimer == nil { intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) } - intf.link.core.switchTable.unblockPeer(intf, intf.peer.port) + if intf.blocked { + intf.blocked = false + intf.link.core.switchTable.unblockPeer(intf, intf.peer.port) + } }) } diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index e37d5bb..caabe67 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -55,7 +55,7 @@ func (q *packetQueue) drop() bool { } // Drop the oldest packet from the worst stream packet := worstStream.infos[0].packet - if q.size-uint64(len(packet)) < streamMsgSize { + if false && q.size-uint64(len(packet)) < streamMsgSize { // TODO something better // We don't want to drop *all* packets, so lets save 1 batch worth... return false diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 0c195c6..f88eb8b 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -311,12 +311,9 @@ func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { switch { case seq != p.seq: case p.queue.drop(): + p.core.log.Debugln("DEBUG dropped:", p.port, p.queue.size) p.intf.notifyQueued(p.seq) } - if seq != p.seq { - return - } - }) } From 527d44391666305d818d0b83fd9c38a85d74406c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 17 May 2020 07:21:09 -0500 Subject: [PATCH 05/18] move where the queue size check before dropping would occur --- src/yggdrasil/packetqueue.go | 5 ----- src/yggdrasil/peer.go | 1 + 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index caabe67..7abdaea 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -55,11 +55,6 @@ func (q *packetQueue) drop() bool { } // Drop the oldest packet from the worst stream packet := worstStream.infos[0].packet - if false && q.size-uint64(len(packet)) < streamMsgSize { - // TODO something better - // We don't want to drop *all* packets, so lets save 1 batch worth... - return false - } worstStream.infos = worstStream.infos[1:] worstStream.size -= uint64(len(packet)) q.size -= uint64(len(packet)) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index f88eb8b..d3f7047 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -310,6 +310,7 @@ func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { p.Act(from, func() { switch { case seq != p.seq: + //case p.queue.size < streamMsgSize: case p.queue.drop(): p.core.log.Debugln("DEBUG dropped:", p.port, p.queue.size) p.intf.notifyQueued(p.seq) From 15ac2595aa96a0c1a29fe6a1927a4366dd03f3f4 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 17 May 2020 08:22:02 -0500 Subject: [PATCH 06/18] use a dedicated per-stream writer goroutine, send messages to it over a 1-buffered channel, this eliminates most of the false positive blocking that causes drops --- src/yggdrasil/link.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 8098950..d439c30 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -256,6 +256,11 @@ func (intf *linkInterface) handler() error { intf.link.core.log.Infof("Disconnected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) } + intf.writer.Act(nil, func() { + if intf.writer.worker != nil { + close(intf.writer.worker) + } + }) return err } @@ -428,7 +433,8 @@ func (intf *linkInterface) notifyDoKeepAlive() { type linkWriter struct { phony.Inbox - intf *linkInterface + intf *linkInterface + worker chan [][]byte } func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { @@ -437,8 +443,19 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool for _, bs := range bss { size += len(bs) } + if w.worker == nil { + w.worker = make(chan [][]byte, 1) + go func() { + for bss := range w.worker { + w.intf.msgIO.writeMsgs(bss) + } + }() + } w.intf.notifySending(size, isLinkTraffic) - w.intf.msgIO.writeMsgs(bss) + func() { + defer func() { recover() }() + w.worker <- bss + }() w.intf.notifySent(size, isLinkTraffic) }) } From 0dcc555eabd40cb8cce10ef5fdcf2589aec18ca5 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 17 May 2020 08:34:22 -0500 Subject: [PATCH 07/18] cleaner startup/shutdown of the link writer's worker --- src/yggdrasil/link.go | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index d439c30..5676ebe 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -136,6 +136,7 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st force: force, } intf.writer.intf = &intf + intf.writer.worker = make(chan [][]byte, 1) intf.reader.intf = &intf intf.reader.err = make(chan error) return &intf, nil @@ -151,6 +152,15 @@ func (l *link) stop() error { func (intf *linkInterface) handler() error { // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later + go func() { + for bss := range intf.writer.worker { + intf.msgIO.writeMsgs(bss) + } + }() + defer intf.writer.Act(nil, func() { + intf.writer.closed = true + close(intf.writer.worker) + }) myLinkPub, myLinkPriv := crypto.NewBoxKeys() meta := version_getBaseMetadata() meta.box = intf.link.core.boxPub @@ -256,11 +266,6 @@ func (intf *linkInterface) handler() error { intf.link.core.log.Infof("Disconnected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) } - intf.writer.Act(nil, func() { - if intf.writer.worker != nil { - close(intf.writer.worker) - } - }) return err } @@ -435,27 +440,20 @@ type linkWriter struct { phony.Inbox intf *linkInterface worker chan [][]byte + closed bool } func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { w.Act(from, func() { + if w.closed { + return + } var size int for _, bs := range bss { size += len(bs) } - if w.worker == nil { - w.worker = make(chan [][]byte, 1) - go func() { - for bss := range w.worker { - w.intf.msgIO.writeMsgs(bss) - } - }() - } w.intf.notifySending(size, isLinkTraffic) - func() { - defer func() { recover() }() - w.worker <- bss - }() + w.worker <- bss w.intf.notifySent(size, isLinkTraffic) }) } From 6e92af1cd26da323db8d83bf52ca1d850ab6df8a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 17 May 2020 08:49:40 -0500 Subject: [PATCH 08/18] re-enable a minimum queue size of ~1 big packet --- src/yggdrasil/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index d3f7047..361a0ea 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -310,7 +310,7 @@ func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { p.Act(from, func() { switch { case seq != p.seq: - //case p.queue.size < streamMsgSize: + case p.queue.size < streamMsgSize: case p.queue.drop(): p.core.log.Debugln("DEBUG dropped:", p.port, p.queue.size) p.intf.notifyQueued(p.seq) From 7720e169f26d4609de81ee5dcd83fe96df4cfe64 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 17 May 2020 12:09:40 -0500 Subject: [PATCH 09/18] when we detect we're blocked, only drop packets often enough to make sure the existing queue's size is non-increasing, and always drop the worst packet from a random flow with odds based on the total size of packets queued for that flow --- src/yggdrasil/packetqueue.go | 32 ++++++++++---------------------- src/yggdrasil/peer.go | 20 ++++++++++++-------- 2 files changed, 22 insertions(+), 30 deletions(-) diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index 7abdaea..464bc6c 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -1,6 +1,7 @@ package yggdrasil import ( + "math/rand" "time" ) @@ -28,32 +29,19 @@ func (q *packetQueue) drop() bool { if q.size == 0 { return false } - // TODO? drop from a random stream - // odds proportional to size? bandwidth? - // always using the worst is exploitable -> flood 1 packet per random stream - // find the stream that's using the most bandwidth - now := time.Now() + // select a random stream, odds based on stream size + offset := rand.Uint64() % q.size var worst pqStreamID - for id := range q.streams { - worst = id - break // get a random ID to start - } - worstStream := q.streams[worst] - worstSize := float64(worstStream.size) - worstAge := now.Sub(worstStream.infos[0].time).Seconds() + var size uint64 for id, stream := range q.streams { - thisSize := float64(stream.size) - thisAge := now.Sub(stream.infos[0].time).Seconds() - // cross multiply to avoid division by zero issues - if worstSize*thisAge < thisSize*worstAge { - // worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth - worst = id - worstStream = stream - worstSize = thisSize - worstAge = thisAge + worst = id + size += stream.size + if size >= offset { + break } } - // Drop the oldest packet from the worst stream + // drop the oldest packet from the stream + worstStream := q.streams[worst] packet := worstStream.infos[0].packet worstStream.infos = worstStream.infos[1:] worstStream.size -= uint64(len(packet)) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 361a0ea..3110307 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -110,6 +110,7 @@ type peer struct { queue packetQueue seq uint64 // this and idle are used to detect when to drop packets from queue idle bool + drop bool // set to true if we're dropping packets from the queue } func (ps *peers) updateTables(from phony.Actor, table *lookupTable) { @@ -275,13 +276,19 @@ func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { } func (p *peer) _sendPackets(packets [][]byte) { + size := p.queue.size for _, packet := range packets { p.queue.push(packet) } - if p.idle { + switch { + case p.idle: p.idle = false p._handleIdle() - } else { + case p.drop: + for p.queue.size > size { + p.queue.drop() + } + default: p.intf.notifyQueued(p.seq) } } @@ -303,17 +310,14 @@ func (p *peer) _handleIdle() { p.intf.out(packets) } else { p.idle = true + p.drop = false } } func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { p.Act(from, func() { - switch { - case seq != p.seq: - case p.queue.size < streamMsgSize: - case p.queue.drop(): - p.core.log.Debugln("DEBUG dropped:", p.port, p.queue.size) - p.intf.notifyQueued(p.seq) + if seq == p.seq { + p.drop = true } }) } From d96ae156a11f03d491d987954662bf83b1cb2482 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 17 May 2020 12:27:43 -0500 Subject: [PATCH 10/18] slight change to peer function names/args --- src/yggdrasil/peer.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 3110307..d8d14cf 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -264,22 +264,20 @@ func (p *peer) _handleTraffic(packet []byte) { coords := peer_getPacketCoords(packet) next := p.table.lookup(coords) if nPeer, isIn := p.ports[next]; isIn { - nPeer.sendPacketsFrom(p, [][]byte{packet}) + nPeer.sendPacketFrom(p, packet) } //p.core.switchTable.packetInFrom(p, packet) } -func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { +func (p *peer) sendPacketFrom(from phony.Actor, packet []byte) { p.Act(from, func() { - p._sendPackets(packets) + p._sendPacket(packet) }) } -func (p *peer) _sendPackets(packets [][]byte) { +func (p *peer) _sendPacket(packet []byte) { size := p.queue.size - for _, packet := range packets { - p.queue.push(packet) - } + p.queue.push(packet) switch { case p.idle: p.idle = false From ff3c8cb687561b2485f7d6ad8e5723a8127aaa7a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 17 May 2020 12:58:57 -0500 Subject: [PATCH 11/18] less aggresive queue size reduction --- src/yggdrasil/peer.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index d8d14cf..02e92f9 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -108,6 +108,7 @@ type peer struct { ports map[switchPort]*peer table *lookupTable queue packetQueue + max uint64 seq uint64 // this and idle are used to detect when to drop packets from queue idle bool drop bool // set to true if we're dropping packets from the queue @@ -276,14 +277,13 @@ func (p *peer) sendPacketFrom(from phony.Actor, packet []byte) { } func (p *peer) _sendPacket(packet []byte) { - size := p.queue.size p.queue.push(packet) switch { case p.idle: p.idle = false p._handleIdle() case p.drop: - for p.queue.size > size { + for p.queue.size > p.max { p.queue.drop() } default: @@ -306,6 +306,9 @@ func (p *peer) _handleIdle() { p.seq++ p.bytesSent += uint64(size) p.intf.out(packets) + if p.drop { + p.max = p.queue.size + } } else { p.idle = true p.drop = false From d43b93f60a771566280d34bc058f42b48f168ae6 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 17 May 2020 13:23:15 -0500 Subject: [PATCH 12/18] safer check for the queues if we're blocked on a send, should work even if we're blocked on a link packet send --- src/yggdrasil/link.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 5676ebe..a0ce5d8 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -63,6 +63,7 @@ type linkInterface struct { stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen closeTimer *time.Timer // Fires when the link has been idle so long we need to close it isIdle bool // True if the peer actor knows the link is idle + isSending bool // True between a notifySending and a notifySent blocked bool // True if we've blocked the peer in the switch } @@ -296,7 +297,7 @@ func (intf *linkInterface) linkOut(bs []byte) { func (intf *linkInterface) notifyQueued(seq uint64) { // This is the part where we want non-nil 'from' fields intf.Act(intf.peer, func() { - if !intf.isIdle { + if intf.isSending { intf.peer.dropFromQueue(intf, seq) } }) @@ -336,6 +337,7 @@ func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { if !isLinkTraffic { intf.isIdle = false } + intf.isSending = true intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf._cancelStallTimer() }) @@ -370,6 +372,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { if !isLinkTraffic { intf._notifyIdle() } + intf.isSending = false if size > 0 && intf.stallTimer == nil { intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) } From cf2edc99d1d7d16316c30522c4fac0be3a45da34 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 17 May 2020 13:32:58 -0500 Subject: [PATCH 13/18] correctly set peer.max --- src/yggdrasil/peer.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 02e92f9..339ea5a 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -306,9 +306,7 @@ func (p *peer) _handleIdle() { p.seq++ p.bytesSent += uint64(size) p.intf.out(packets) - if p.drop { - p.max = p.queue.size - } + p.max = p.queue.size } else { p.idle = true p.drop = false @@ -319,6 +317,7 @@ func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { p.Act(from, func() { if seq == p.seq { p.drop = true + p.max = p.queue.size } }) } From 59c5644a52cbc5316fd5c2dc596706aa1c35d635 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 May 2020 10:08:23 -0500 Subject: [PATCH 14/18] some peer/link cleanup --- src/yggdrasil/link.go | 42 ++++++++++++++++++++---------------------- src/yggdrasil/peer.go | 2 +- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index a0ce5d8..dc61892 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -62,7 +62,6 @@ type linkInterface struct { keepAliveTimer *time.Timer // Fires to send keep-alive traffic stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen closeTimer *time.Timer // Fires when the link has been idle so long we need to close it - isIdle bool // True if the peer actor knows the link is idle isSending bool // True between a notifySending and a notifySent blocked bool // True if we've blocked the peer in the switch } @@ -279,7 +278,7 @@ func (intf *linkInterface) out(bss [][]byte) { // nil to prevent it from blocking if the link is somehow frozen // this is safe because another packet won't be sent until the link notifies // the peer that it's ready for one - intf.writer.sendFrom(nil, bss, false) + intf.writer.sendFrom(nil, bss) }) } @@ -290,7 +289,7 @@ func (intf *linkInterface) linkOut(bs []byte) { // additional packets until this one finishes, otherwise this could leak // memory if writing happens slower than link packets are generated... // that seems unlikely, so it's a lesser evil than deadlocking for now - intf.writer.sendFrom(nil, [][]byte{bs}, true) + intf.writer.sendFrom(nil, [][]byte{bs}) }) } @@ -332,11 +331,8 @@ const ( ) // notify the intf that we're currently sending -func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { +func (intf *linkInterface) notifySending(size int) { intf.Act(&intf.writer, func() { - if !isLinkTraffic { - intf.isIdle = false - } intf.isSending = true intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf._cancelStallTimer() @@ -365,13 +361,18 @@ func (intf *linkInterface) notifyBlockedSend() { } // notify the intf that we've finished sending, returning the peer to the switch -func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { +func (intf *linkInterface) notifySent(size int) { intf.Act(&intf.writer, func() { - intf.sendTimer.Stop() - intf.sendTimer = nil - if !isLinkTraffic { - intf._notifyIdle() + if intf.sendTimer != nil { + intf.sendTimer.Stop() + intf.sendTimer = nil } + if intf.keepAliveTimer != nil { + // TODO? unset this when we start sending, not when we finish... + intf.keepAliveTimer.Stop() + intf.keepAliveTimer = nil + } + intf._notifyIdle() intf.isSending = false if size > 0 && intf.stallTimer == nil { intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) @@ -381,10 +382,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { // Notify the peer that we're ready for more traffic func (intf *linkInterface) _notifyIdle() { - if !intf.isIdle { - intf.isIdle = true - intf.peer.Act(intf, intf.peer._handleIdle) - } + intf.peer.Act(intf, intf.peer._handleIdle) } // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds @@ -416,8 +414,8 @@ func (intf *linkInterface) notifyRead(size int) { intf.stallTimer.Stop() intf.stallTimer = nil } - if size > 0 && intf.stallTimer == nil { - intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) + if size > 0 && intf.keepAliveTimer == nil { + intf.keepAliveTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) } if intf.blocked { intf.blocked = false @@ -432,7 +430,7 @@ func (intf *linkInterface) notifyDoKeepAlive() { if intf.stallTimer != nil { intf.stallTimer.Stop() intf.stallTimer = nil - intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic + intf.writer.sendFrom(nil, [][]byte{nil}) // Empty keep-alive traffic } }) } @@ -446,7 +444,7 @@ type linkWriter struct { closed bool } -func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { +func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte) { w.Act(from, func() { if w.closed { return @@ -455,9 +453,9 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool for _, bs := range bss { size += len(bs) } - w.intf.notifySending(size, isLinkTraffic) + w.intf.notifySending(size) w.worker <- bss - w.intf.notifySent(size, isLinkTraffic) + w.intf.notifySent(size) }) } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 339ea5a..3cfc0b4 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -317,7 +317,7 @@ func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { p.Act(from, func() { if seq == p.seq { p.drop = true - p.max = p.queue.size + p.max = p.queue.size + streamMsgSize } }) } From ef1e506a0c05dc163b5db89b5bf4e30be4cbf761 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 May 2020 10:23:55 -0500 Subject: [PATCH 15/18] work-in-progress on more cleanup --- src/yggdrasil/api.go | 6 ++-- src/yggdrasil/core.go | 6 ++-- src/yggdrasil/link.go | 70 +++++++++++++++++++------------------- src/yggdrasil/simlink.go | 2 +- src/yggdrasil/stream.go | 2 +- src/yggdrasil/switch.go | 2 +- src/yggdrasil/tcp.go | 50 +++++++++++++-------------- src/yggdrasil/tcp_linux.go | 6 ++-- src/yggdrasil/tls.go | 4 +-- 9 files changed, 74 insertions(+), 74 deletions(-) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 66ee9b8..b5b8d36 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -257,14 +257,14 @@ func (c *Core) ConnDialer() (*Dialer, error) { // "Listen" configuration item, e.g. // tcp://a.b.c.d:e func (c *Core) ListenTCP(uri string) (*TcpListener, error) { - return c.link.tcp.listen(uri, nil) + return c.links.tcp.listen(uri, nil) } // ListenTLS starts a new TLS listener. The input URI should match that of the // "Listen" configuration item, e.g. // tls://a.b.c.d:e func (c *Core) ListenTLS(uri string) (*TcpListener, error) { - return c.link.tcp.listen(uri, c.link.tcp.tls.forListener) + return c.links.tcp.listen(uri, c.links.tcp.tls.forListener) } // NodeID gets the node ID. This is derived from your router encryption keys. @@ -463,7 +463,7 @@ func (c *Core) RemovePeer(addr string, sintf string) error { // This does not add the peer to the peer list, so if the connection drops, the // peer will not be called again automatically. func (c *Core) CallPeer(addr string, sintf string) error { - return c.link.call(addr, sintf) + return c.links.call(addr, sintf) } // DisconnectPeer disconnects a peer once. This should be specified as a port diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index f766494..4ac678d 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -29,7 +29,7 @@ type Core struct { switchTable switchTable peers peers router router - link link + links links log *log.Logger addPeerTimer *time.Timer } @@ -165,7 +165,7 @@ func (c *Core) _start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState return nil, err } - if err := c.link.init(c); err != nil { + if err := c.links.init(c); err != nil { c.log.Errorln("Failed to start link interfaces") return nil, err } @@ -197,7 +197,7 @@ func (c *Core) _stop() { if c.addPeerTimer != nil { c.addPeerTimer.Stop() } - c.link.stop() + c.links.stop() /* FIXME this deadlocks, need a waitgroup or something to coordinate shutdown for _, peer := range c.GetPeers() { c.DisconnectPeer(peer.Port) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index dc61892..9776ee5 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -20,7 +20,7 @@ import ( "github.com/Arceliar/phony" ) -type link struct { +type links struct { core *Core mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface @@ -37,7 +37,7 @@ type linkInfo struct { remote string // Remote name or address } -type linkInterfaceMsgIO interface { +type linkMsgIO interface { readMsg() ([]byte, error) writeMsgs([][]byte) (int, error) close() error @@ -48,9 +48,9 @@ type linkInterfaceMsgIO interface { type linkInterface struct { lname string - link *link + links *links peer *peer - msgIO linkInterfaceMsgIO + msgIO linkMsgIO info linkInfo incoming bool force bool @@ -66,7 +66,7 @@ type linkInterface struct { blocked bool // True if we've blocked the peer in the switch } -func (l *link) init(c *Core) error { +func (l *links) init(c *Core) error { l.core = c l.mutex.Lock() l.interfaces = make(map[linkInfo]*linkInterface) @@ -81,11 +81,11 @@ func (l *link) init(c *Core) error { return nil } -func (l *link) reconfigure() { +func (l *links) reconfigure() { l.tcp.reconfigure() } -func (l *link) call(uri string, sintf string) error { +func (l *links) call(uri string, sintf string) error { u, err := url.Parse(uri) if err != nil { return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err) @@ -104,7 +104,7 @@ func (l *link) call(uri string, sintf string) error { return nil } -func (l *link) listen(uri string) error { +func (l *links) listen(uri string) error { u, err := url.Parse(uri) if err != nil { return fmt.Errorf("listener %s is not correctly formatted (%s)", uri, err) @@ -121,11 +121,11 @@ func (l *link) listen(uri string) error { } } -func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { +func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { // Technically anything unique would work for names, but let's pick something human readable, just for debugging intf := linkInterface{ lname: name, - link: l, + links: l, msgIO: msgIO, info: linkInfo{ linkType: linkType, @@ -142,7 +142,7 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st return &intf, nil } -func (l *link) stop() error { +func (l *links) stop() error { close(l.stopped) if err := l.tcp.stop(); err != nil { return err @@ -163,8 +163,8 @@ func (intf *linkInterface) handler() error { }) myLinkPub, myLinkPriv := crypto.NewBoxKeys() meta := version_getBaseMetadata() - meta.box = intf.link.core.boxPub - meta.sig = intf.link.core.sigPub + meta.box = intf.links.core.boxPub + meta.sig = intf.links.core.sigPub meta.link = *myLinkPub metaBytes := meta.encode() // TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer) @@ -187,12 +187,12 @@ func (intf *linkInterface) handler() error { } base := version_getBaseMetadata() if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { - intf.link.core.log.Errorln("Failed to connect to node: " + intf.lname + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + intf.links.core.log.Errorln("Failed to connect to node: " + intf.lname + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) return errors.New("failed to connect: wrong version") } // Check if we're authorized to connect to this key / IP - if intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { - intf.link.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", + if intf.incoming && !intf.force && !intf.links.core.peers.isAllowedEncryptionPublicKey(&meta.box) { + intf.links.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) intf.msgIO.close() return nil @@ -200,12 +200,12 @@ func (intf *linkInterface) handler() error { // Check if we already have a link to this node intf.info.box = meta.box intf.info.sig = meta.sig - intf.link.mutex.Lock() - if oldIntf, isIn := intf.link.interfaces[intf.info]; isIn { - intf.link.mutex.Unlock() + intf.links.mutex.Lock() + if oldIntf, isIn := intf.links.interfaces[intf.info]; isIn { + intf.links.mutex.Unlock() // FIXME we should really return an error and let the caller block instead // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. - intf.link.core.log.Debugln("DEBUG: found existing interface for", intf.name) + intf.links.core.log.Debugln("DEBUG: found existing interface for", intf.name) intf.msgIO.close() if !intf.incoming { // Block outgoing connection attempts until the existing connection closes @@ -214,21 +214,21 @@ func (intf *linkInterface) handler() error { return nil } else { intf.closed = make(chan struct{}) - intf.link.interfaces[intf.info] = intf + intf.links.interfaces[intf.info] = intf defer func() { - intf.link.mutex.Lock() - delete(intf.link.interfaces, intf.info) - intf.link.mutex.Unlock() + intf.links.mutex.Lock() + delete(intf.links.interfaces, intf.info) + intf.links.mutex.Unlock() close(intf.closed) }() - intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name) + intf.links.core.log.Debugln("DEBUG: registered interface for", intf.name) } - intf.link.mutex.Unlock() + intf.links.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - phony.Block(&intf.link.core.peers, func() { + phony.Block(&intf.links.core.peers, func() { // FIXME don't use phony.Block, it's bad practice, even if it's safe here - intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf) + intf.peer = intf.links.core.peers._newPeer(&meta.box, &meta.sig, shared, intf) }) if intf.peer == nil { return errors.New("failed to create peer") @@ -240,7 +240,7 @@ func (intf *linkInterface) handler() error { themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) - intf.link.core.log.Infof("Connected %s: %s, source %s", + intf.links.core.log.Infof("Connected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) // Start things go intf.peer.start() @@ -252,7 +252,7 @@ func (intf *linkInterface) handler() error { defer close(done) go func() { select { - case <-intf.link.stopped: + case <-intf.links.stopped: intf.msgIO.close() case <-done: } @@ -260,10 +260,10 @@ func (intf *linkInterface) handler() error { err = <-intf.reader.err // TODO don't report an error if it's just a 'use of closed network connection' if err != nil { - intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s", + intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) } else { - intf.link.core.log.Infof("Disconnected %s: %s, source %s", + intf.links.core.log.Infof("Disconnected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) } return err @@ -355,7 +355,7 @@ func (intf *linkInterface) notifyBlockedSend() { if intf.sendTimer != nil && !intf.blocked { //As far as we know, we're still trying to send, and the timer fired. intf.blocked = true - intf.link.core.switchTable.blockPeer(intf, intf.peer.port) + intf.links.core.switchTable.blockPeer(intf, intf.peer.port) } }) } @@ -392,7 +392,7 @@ func (intf *linkInterface) notifyStalled() { intf.stallTimer.Stop() intf.stallTimer = nil intf.blocked = true - intf.link.core.switchTable.blockPeer(intf, intf.peer.port) + intf.links.core.switchTable.blockPeer(intf, intf.peer.port) } }) } @@ -419,7 +419,7 @@ func (intf *linkInterface) notifyRead(size int) { } if intf.blocked { intf.blocked = false - intf.link.core.switchTable.unblockPeer(intf, intf.peer.port) + intf.links.core.switchTable.unblockPeer(intf, intf.peer.port) } }) } diff --git a/src/yggdrasil/simlink.go b/src/yggdrasil/simlink.go index f830c21..6c04a8c 100644 --- a/src/yggdrasil/simlink.go +++ b/src/yggdrasil/simlink.go @@ -58,7 +58,7 @@ func (c *Core) NewSimlink() *Simlink { s := &Simlink{rch: make(chan []byte, 1)} n := "Simlink" var err error - s.link, err = c.link.create(s, n, n, n, n, false, true) + s.link, err = c.links.create(s, n, n, n, n, false, true) if err != nil { panic(err) } diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index be1398f..afa97c7 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -9,7 +9,7 @@ import ( ) // Test that this matches the interface we expect -var _ = linkInterfaceMsgIO(&stream{}) +var _ = linkMsgIO(&stream{}) type stream struct { rwc io.ReadWriteCloser diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 6ab9a02..a5c099b 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -188,7 +188,7 @@ func (t *switchTable) init(core *Core) { func (t *switchTable) reconfigure() { // This is where reconfiguration would go, if we had anything useful to do. - t.core.link.reconfigure() + t.core.links.reconfigure() t.core.peers.reconfigure() } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 9cca419..17b34e9 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -33,7 +33,7 @@ const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcp struct { - link *link + links *links waitgroup sync.WaitGroup mutex sync.Mutex // Protecting the below listeners map[string]*TcpListener @@ -86,8 +86,8 @@ func (t *tcp) getAddr() *net.TCPAddr { } // Initializes the struct. -func (t *tcp) init(l *link) error { - t.link = l +func (t *tcp) init(l *links) error { + t.links = l t.tls.init(t) t.mutex.Lock() t.calls = make(map[string]struct{}) @@ -95,9 +95,9 @@ func (t *tcp) init(l *link) error { t.listeners = make(map[string]*TcpListener) t.mutex.Unlock() - t.link.core.config.Mutex.RLock() - defer t.link.core.config.Mutex.RUnlock() - for _, listenaddr := range t.link.core.config.Current.Listen { + t.links.core.config.Mutex.RLock() + defer t.links.core.config.Mutex.RUnlock() + for _, listenaddr := range t.links.core.config.Current.Listen { switch listenaddr[:6] { case "tcp://": if _, err := t.listen(listenaddr[6:], nil); err != nil { @@ -108,7 +108,7 @@ func (t *tcp) init(l *link) error { return err } default: - t.link.core.log.Errorln("Failed to add listener: listener", listenaddr, "is not correctly formatted, ignoring") + t.links.core.log.Errorln("Failed to add listener: listener", listenaddr, "is not correctly formatted, ignoring") } } @@ -126,35 +126,35 @@ func (t *tcp) stop() error { } func (t *tcp) reconfigure() { - t.link.core.config.Mutex.RLock() - added := util.Difference(t.link.core.config.Current.Listen, t.link.core.config.Previous.Listen) - deleted := util.Difference(t.link.core.config.Previous.Listen, t.link.core.config.Current.Listen) - t.link.core.config.Mutex.RUnlock() + t.links.core.config.Mutex.RLock() + added := util.Difference(t.links.core.config.Current.Listen, t.links.core.config.Previous.Listen) + deleted := util.Difference(t.links.core.config.Previous.Listen, t.links.core.config.Current.Listen) + t.links.core.config.Mutex.RUnlock() if len(added) > 0 || len(deleted) > 0 { for _, a := range added { switch a[:6] { case "tcp://": if _, err := t.listen(a[6:], nil); err != nil { - t.link.core.log.Errorln("Error adding TCP", a[6:], "listener:", err) + t.links.core.log.Errorln("Error adding TCP", a[6:], "listener:", err) } case "tls://": if _, err := t.listen(a[6:], t.tls.forListener); err != nil { - t.link.core.log.Errorln("Error adding TLS", a[6:], "listener:", err) + t.links.core.log.Errorln("Error adding TLS", a[6:], "listener:", err) } default: - t.link.core.log.Errorln("Failed to add listener: listener", a, "is not correctly formatted, ignoring") + t.links.core.log.Errorln("Failed to add listener: listener", a, "is not correctly formatted, ignoring") } } for _, d := range deleted { if d[:6] != "tcp://" && d[:6] != "tls://" { - t.link.core.log.Errorln("Failed to delete listener: listener", d, "is not correctly formatted, ignoring") + t.links.core.log.Errorln("Failed to delete listener: listener", d, "is not correctly formatted, ignoring") continue } t.mutex.Lock() if listener, ok := t.listeners[d[6:]]; ok { t.mutex.Unlock() listener.Stop() - t.link.core.log.Infoln("Stopped TCP listener:", d[6:]) + t.links.core.log.Infoln("Stopped TCP listener:", d[6:]) } else { t.mutex.Unlock() } @@ -202,13 +202,13 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { } // And here we go! defer func() { - t.link.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String()) + t.links.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String()) l.Listener.Close() t.mutex.Lock() delete(t.listeners, listenaddr) t.mutex.Unlock() }() - t.link.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String()) + t.links.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String()) go func() { <-l.stop l.Listener.Close() @@ -217,7 +217,7 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { for { sock, err := l.Listener.Accept() if err != nil { - t.link.core.log.Errorln("Failed to accept connection:", err) + t.links.core.log.Errorln("Failed to accept connection:", err) return } t.waitgroup.Add(1) @@ -344,7 +344,7 @@ func (t *tcp) call(saddr string, options interface{}, sintf string, upgrade *Tcp } conn, err = dialer.Dial("tcp", dst.String()) if err != nil { - t.link.core.log.Debugf("Failed to dial %s: %s", callproto, err) + t.links.core.log.Debugf("Failed to dial %s: %s", callproto, err) return } t.waitgroup.Add(1) @@ -361,7 +361,7 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}, upgrade if upgrade != nil { var err error if sock, err = upgrade.upgrade(sock); err != nil { - t.link.core.log.Errorln("TCP handler upgrade failed:", err) + t.links.core.log.Errorln("TCP handler upgrade failed:", err) return } else { upgraded = true @@ -387,12 +387,12 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}, upgrade remote, _, _ = net.SplitHostPort(sock.RemoteAddr().String()) } force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast() - link, err := t.link.core.link.create(&stream, name, proto, local, remote, incoming, force) + link, err := t.links.create(&stream, name, proto, local, remote, incoming, force) if err != nil { - t.link.core.log.Println(err) + t.links.core.log.Println(err) panic(err) } - t.link.core.log.Debugln("DEBUG: starting handler for", name) + t.links.core.log.Debugln("DEBUG: starting handler for", name) err = link.handler() - t.link.core.log.Debugln("DEBUG: stopped handler for", name, err) + t.links.core.log.Debugln("DEBUG: stopped handler for", name, err) } diff --git a/src/yggdrasil/tcp_linux.go b/src/yggdrasil/tcp_linux.go index 9ec3c10..e18f92b 100644 --- a/src/yggdrasil/tcp_linux.go +++ b/src/yggdrasil/tcp_linux.go @@ -20,10 +20,10 @@ func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error { // Log any errors if bbr != nil { - t.link.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, SetsockoptString error:", bbr) + t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, SetsockoptString error:", bbr) } if control != nil { - t.link.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, Control error:", control) + t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, Control error:", control) } // Return nil because errors here are not considered fatal for the connection, it just means congestion control is suboptimal @@ -38,7 +38,7 @@ func (t *tcp) getControl(sintf string) func(string, string, syscall.RawConn) err } c.Control(btd) if err != nil { - t.link.core.log.Debugln("Failed to set SO_BINDTODEVICE:", sintf) + t.links.core.log.Debugln("Failed to set SO_BINDTODEVICE:", sintf) } return t.tcpContext(network, address, c) } diff --git a/src/yggdrasil/tls.go b/src/yggdrasil/tls.go index 7212c4d..e2861ac 100644 --- a/src/yggdrasil/tls.go +++ b/src/yggdrasil/tls.go @@ -34,7 +34,7 @@ func (t *tcptls) init(tcp *tcp) { } edpriv := make(ed25519.PrivateKey, ed25519.PrivateKeySize) - copy(edpriv[:], tcp.link.core.sigPriv[:]) + copy(edpriv[:], tcp.links.core.sigPriv[:]) certBuf := &bytes.Buffer{} @@ -42,7 +42,7 @@ func (t *tcptls) init(tcp *tcp) { pubtemp := x509.Certificate{ SerialNumber: big.NewInt(1), Subject: pkix.Name{ - CommonName: hex.EncodeToString(tcp.link.core.sigPub[:]), + CommonName: hex.EncodeToString(tcp.links.core.sigPub[:]), }, NotBefore: time.Now(), NotAfter: time.Now().Add(time.Hour * 24 * 365), From 59896f17fd66566b49bc56cc8e963dade5e06c29 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 May 2020 10:28:57 -0500 Subject: [PATCH 16/18] more cleanup --- src/yggdrasil/link.go | 82 +++++++++++++++++++++++----------------- src/yggdrasil/peer.go | 16 +------- src/yggdrasil/simlink.go | 2 +- 3 files changed, 50 insertions(+), 50 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 9776ee5..067c2ec 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -21,11 +21,11 @@ import ( ) type links struct { - core *Core - mutex sync.RWMutex // protects interfaces below - interfaces map[linkInfo]*linkInterface - tcp tcp // TCP interface support - stopped chan struct{} + core *Core + mutex sync.RWMutex // protects links below + links map[linkInfo]*link + tcp tcp // TCP interface support + stopped chan struct{} // TODO timeout (to remove from switch), read from config.ReadTimeout } @@ -46,7 +46,7 @@ type linkMsgIO interface { _recvMetaBytes() ([]byte, error) } -type linkInterface struct { +type link struct { lname string links *links peer *peer @@ -55,8 +55,8 @@ type linkInterface struct { incoming bool force bool closed chan struct{} - reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch - writer linkWriter // Writes packets, notifies this linkInterface + reader linkReader // Reads packets, notifies this link, passes packets to switch + writer linkWriter // Writes packets, notifies this link phony.Inbox // Protects the below sendTimer *time.Timer // Fires to signal that sending is blocked keepAliveTimer *time.Timer // Fires to send keep-alive traffic @@ -69,7 +69,7 @@ type linkInterface struct { func (l *links) init(c *Core) error { l.core = c l.mutex.Lock() - l.interfaces = make(map[linkInfo]*linkInterface) + l.links = make(map[linkInfo]*link) l.mutex.Unlock() l.stopped = make(chan struct{}) @@ -121,9 +121,9 @@ func (l *links) listen(uri string) error { } } -func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { +func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool) (*link, error) { // Technically anything unique would work for names, but let's pick something human readable, just for debugging - intf := linkInterface{ + intf := link{ lname: name, links: l, msgIO: msgIO, @@ -150,7 +150,7 @@ func (l *links) stop() error { return nil } -func (intf *linkInterface) handler() error { +func (intf *link) handler() error { // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later go func() { for bss := range intf.writer.worker { @@ -201,7 +201,7 @@ func (intf *linkInterface) handler() error { intf.info.box = meta.box intf.info.sig = meta.sig intf.links.mutex.Lock() - if oldIntf, isIn := intf.links.interfaces[intf.info]; isIn { + if oldIntf, isIn := intf.links.links[intf.info]; isIn { intf.links.mutex.Unlock() // FIXME we should really return an error and let the caller block instead // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. @@ -214,10 +214,10 @@ func (intf *linkInterface) handler() error { return nil } else { intf.closed = make(chan struct{}) - intf.links.interfaces[intf.info] = intf + intf.links.links[intf.info] = intf defer func() { intf.links.mutex.Lock() - delete(intf.links.interfaces, intf.info) + delete(intf.links.links, intf.info) intf.links.mutex.Unlock() close(intf.closed) }() @@ -271,9 +271,21 @@ func (intf *linkInterface) handler() error { //////////////////////////////////////////////////////////////////////////////// -// linkInterface needs to match the peerInterface type needed by the peers +// link needs to match the linkInterface type needed by the peers -func (intf *linkInterface) out(bss [][]byte) { +type linkInterface interface { + out([][]byte) + linkOut([]byte) + notifyQueued(uint64) + close() + // These next ones are only used by the API + name() string + local() string + remote() string + interfaceType() string +} + +func (intf *link) out(bss [][]byte) { intf.Act(nil, func() { // nil to prevent it from blocking if the link is somehow frozen // this is safe because another packet won't be sent until the link notifies @@ -282,7 +294,7 @@ func (intf *linkInterface) out(bss [][]byte) { }) } -func (intf *linkInterface) linkOut(bs []byte) { +func (intf *link) linkOut(bs []byte) { intf.Act(nil, func() { // nil to prevent it from blocking if the link is somehow frozen // FIXME this is hypothetically not safe, the peer shouldn't be sending @@ -293,7 +305,7 @@ func (intf *linkInterface) linkOut(bs []byte) { }) } -func (intf *linkInterface) notifyQueued(seq uint64) { +func (intf *link) notifyQueued(seq uint64) { // This is the part where we want non-nil 'from' fields intf.Act(intf.peer, func() { if intf.isSending { @@ -302,23 +314,23 @@ func (intf *linkInterface) notifyQueued(seq uint64) { }) } -func (intf *linkInterface) close() { +func (intf *link) close() { intf.Act(nil, func() { intf.msgIO.close() }) } -func (intf *linkInterface) name() string { +func (intf *link) name() string { return intf.lname } -func (intf *linkInterface) local() string { +func (intf *link) local() string { return intf.info.local } -func (intf *linkInterface) remote() string { +func (intf *link) remote() string { return intf.info.remote } -func (intf *linkInterface) interfaceType() string { +func (intf *link) interfaceType() string { return intf.info.linkType } @@ -331,7 +343,7 @@ const ( ) // notify the intf that we're currently sending -func (intf *linkInterface) notifySending(size int) { +func (intf *link) notifySending(size int) { intf.Act(&intf.writer, func() { intf.isSending = true intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) @@ -340,7 +352,7 @@ func (intf *linkInterface) notifySending(size int) { } // we just sent something, so cancel any pending timer to send keep-alive traffic -func (intf *linkInterface) _cancelStallTimer() { +func (intf *link) _cancelStallTimer() { if intf.stallTimer != nil { intf.stallTimer.Stop() intf.stallTimer = nil @@ -350,7 +362,7 @@ func (intf *linkInterface) _cancelStallTimer() { // This gets called from a time.AfterFunc, and notifies the switch that we appear // to have gotten blocked on a write, so the switch should start routing traffic // through other links, if alternatives exist -func (intf *linkInterface) notifyBlockedSend() { +func (intf *link) notifyBlockedSend() { intf.Act(nil, func() { if intf.sendTimer != nil && !intf.blocked { //As far as we know, we're still trying to send, and the timer fired. @@ -361,7 +373,7 @@ func (intf *linkInterface) notifyBlockedSend() { } // notify the intf that we've finished sending, returning the peer to the switch -func (intf *linkInterface) notifySent(size int) { +func (intf *link) notifySent(size int) { intf.Act(&intf.writer, func() { if intf.sendTimer != nil { intf.sendTimer.Stop() @@ -381,12 +393,12 @@ func (intf *linkInterface) notifySent(size int) { } // Notify the peer that we're ready for more traffic -func (intf *linkInterface) _notifyIdle() { +func (intf *link) _notifyIdle() { intf.peer.Act(intf, intf.peer._handleIdle) } // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds -func (intf *linkInterface) notifyStalled() { +func (intf *link) notifyStalled() { intf.Act(nil, func() { // Sent from a time.AfterFunc if intf.stallTimer != nil && !intf.blocked { intf.stallTimer.Stop() @@ -398,7 +410,7 @@ func (intf *linkInterface) notifyStalled() { } // reset the close timer -func (intf *linkInterface) notifyReading() { +func (intf *link) notifyReading() { intf.Act(&intf.reader, func() { if intf.closeTimer != nil { intf.closeTimer.Stop() @@ -408,7 +420,7 @@ func (intf *linkInterface) notifyReading() { } // wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic -func (intf *linkInterface) notifyRead(size int) { +func (intf *link) notifyRead(size int) { intf.Act(&intf.reader, func() { if intf.stallTimer != nil { intf.stallTimer.Stop() @@ -425,7 +437,7 @@ func (intf *linkInterface) notifyRead(size int) { } // We need to send keep-alive traffic now -func (intf *linkInterface) notifyDoKeepAlive() { +func (intf *link) notifyDoKeepAlive() { intf.Act(nil, func() { // Sent from a time.AfterFunc if intf.stallTimer != nil { intf.stallTimer.Stop() @@ -439,7 +451,7 @@ func (intf *linkInterface) notifyDoKeepAlive() { type linkWriter struct { phony.Inbox - intf *linkInterface + intf *link worker chan [][]byte closed bool } @@ -463,7 +475,7 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte) { type linkReader struct { phony.Inbox - intf *linkInterface + intf *link err chan error } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 3cfc0b4..4463bc6 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -77,23 +77,11 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string { return ps.core.config.Current.AllowedEncryptionPublicKeys } -type peerInterface interface { - out([][]byte) - linkOut([]byte) - notifyQueued(uint64) - close() - // These next ones are only used by the API - name() string - local() string - remote() string - interfaceType() string -} - // Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic type peer struct { phony.Inbox core *Core - intf peerInterface + intf linkInterface port switchPort box crypto.BoxPubKey sig crypto.SigPubKey @@ -134,7 +122,7 @@ func (ps *peers) _updatePeers() { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf peerInterface) *peer { +func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf linkInterface) *peer { now := time.Now() p := peer{box: *box, core: ps.core, diff --git a/src/yggdrasil/simlink.go b/src/yggdrasil/simlink.go index 6c04a8c..6675981 100644 --- a/src/yggdrasil/simlink.go +++ b/src/yggdrasil/simlink.go @@ -9,7 +9,7 @@ type Simlink struct { phony.Inbox rch chan []byte dest *Simlink - link *linkInterface + link *link started bool } From f2b9e95895167d54d53837c5b5ab731ec327982f Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 May 2020 12:21:01 -0500 Subject: [PATCH 17/18] simplify routerInterface --- src/yggdrasil/router.go | 43 +++++++++-------------------------------- 1 file changed, 9 insertions(+), 34 deletions(-) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 303ada6..cfb75a0 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -62,9 +62,7 @@ func (r *router) init(core *Core) { }) r.peer.Act(r, r.peer._handleIdle) r.out = func(bs []byte) { - r.intf.Act(r, func() { - r.peer.handlePacketFrom(&r.intf, bs) - }) + r.peer.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) r.core.config.Mutex.RLock() @@ -262,46 +260,23 @@ func (r *router) _handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) { // routerInterface is a helper that implements peerInterface type routerInterface struct { - phony.Inbox router *router - busy bool } func (intf *routerInterface) out(bss [][]byte) { - intf.Act(intf.router.peer, func() { - intf.router.Act(intf, func() { - for _, bs := range bss { - intf.router._handlePacket(bs) - } - // we may block due to the above - // so we send a message to ourself, that we'd handle after unblocking - // that message tells us to tell the interface that we're finally idle again - intf.router.Act(nil, func() { - intf.Act(intf.router, intf._handleIdle) - }) - intf.Act(intf.router, intf._handleBusy) - }) + // Note that this is run in the peer's goroutine + intf.router.Act(intf.router.peer, func() { + for _, bs := range bss { + intf.router._handlePacket(bs) + } }) -} - -func (intf *routerInterface) _handleBusy() { - intf.busy = true -} - -func (intf *routerInterface) _handleIdle() { - intf.busy = false - intf.router.peer.Act(intf, intf.router.peer._handleIdle) + //intf.router.peer.Act(nil, intf.router.peer._handleIdle) + intf.router.peer._handleIdle() } func (intf *routerInterface) linkOut(_ []byte) {} -func (intf *routerInterface) notifyQueued(seq uint64) { - intf.Act(intf.router.peer, func() { - if intf.busy { - intf.router.peer.dropFromQueue(intf, seq) - } - }) -} +func (intf *routerInterface) notifyQueued(seq uint64) {} func (intf *routerInterface) close() {} From 77ded84ea580ae219802b96d5fb6bed389bfe998 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 May 2020 12:21:23 -0500 Subject: [PATCH 18/18] simplify routerInterface --- src/yggdrasil/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index cfb75a0..2ab3855 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -258,7 +258,7 @@ func (r *router) _handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) { //////////////////////////////////////////////////////////////////////////////// -// routerInterface is a helper that implements peerInterface +// routerInterface is a helper that implements linkInterface type routerInterface struct { router *router }