From b337228aa45436736640492b54cd64059f21a199 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 24 Aug 2019 14:24:42 -0500 Subject: [PATCH] minor fixes to peer stuff --- src/yggdrasil/api.go | 43 +++++++++++++++++++++++------------------- src/yggdrasil/debug.go | 2 +- src/yggdrasil/peer.go | 20 +++++++++++++------- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index f50c8ce..cbf232a 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -6,7 +6,6 @@ import ( "fmt" "net" "sort" - "sync/atomic" "time" "github.com/gologme/log" @@ -106,15 +105,18 @@ func (c *Core) GetPeers() []Peer { sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] }) for _, port := range ps { p := ports[port] - info := Peer{ - Endpoint: p.intf.name, - BytesSent: atomic.LoadUint64(&p.bytesSent), - BytesRecvd: atomic.LoadUint64(&p.bytesRecvd), - Protocol: p.intf.info.linkType, - Port: uint64(port), - Uptime: time.Since(p.firstSeen), - } - copy(info.PublicKey[:], p.box[:]) + var info Peer + <-p.SyncExec(func() { + info = Peer{ + Endpoint: p.intf.name, + BytesSent: p.bytesSent, + BytesRecvd: p.bytesRecvd, + Protocol: p.intf.info.linkType, + Port: uint64(port), + Uptime: time.Since(p.firstSeen), + } + copy(info.PublicKey[:], p.box[:]) + }) peers = append(peers, info) } return peers @@ -135,15 +137,18 @@ func (c *Core) GetSwitchPeers() []SwitchPeer { continue } coords := elem.locator.getCoords() - info := SwitchPeer{ - Coords: append([]uint64{}, wire_coordsBytestoUint64s(coords)...), - BytesSent: atomic.LoadUint64(&peer.bytesSent), - BytesRecvd: atomic.LoadUint64(&peer.bytesRecvd), - Port: uint64(elem.port), - Protocol: peer.intf.info.linkType, - Endpoint: peer.intf.info.remote, - } - copy(info.PublicKey[:], peer.box[:]) + var info SwitchPeer + <-peer.SyncExec(func() { + info = SwitchPeer{ + Coords: append([]uint64{}, wire_coordsBytestoUint64s(coords)...), + BytesSent: peer.bytesSent, + BytesRecvd: peer.bytesRecvd, + Port: uint64(elem.port), + Protocol: peer.intf.info.linkType, + Endpoint: peer.intf.info.remote, + } + copy(info.PublicKey[:], peer.box[:]) + }) switchpeers = append(switchpeers, info) } return switchpeers diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index d90bf1f..30eb874 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -576,7 +576,7 @@ func DEBUG_simLinkPeers(p, q *peer) { default: } if len(packets) > 0 { - dest.handlePacket(packets[0]) + <-dest.SyncExec(func() { dest._handlePacket(packets[0]) }) packets = packets[1:] continue } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index f3af140..4bebc17 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -96,9 +96,6 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) { // Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic type peer struct { - bytesSent uint64 // To track bandwidth usage for getPeers - bytesRecvd uint64 // To track bandwidth usage for getPeers - // BUG: sync/atomic, 32 bit platforms need the above to be the first element phony.Actor core *Core intf *linkInterface @@ -114,6 +111,9 @@ type peer struct { out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes done (chan struct{}) // closed to exit the linkLoop close func() // Called when a peer is removed, to close the underlying connection, or via admin api + // The below aren't actually useful internally, they're just gathered for getPeers statistics + bytesSent uint64 + bytesRecvd uint64 } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. @@ -217,7 +217,7 @@ func (p *peer) handlePacketFrom(from phony.IActor, packet []byte) { // Passes the packet to a handler for that packet type. func (p *peer) _handlePacket(packet []byte) { // FIXME this is off by stream padding and msg length overhead, should be done in tcp.go - atomic.AddUint64(&p.bytesRecvd, uint64(len(packet))) + p.bytesRecvd += uint64(len(packet)) pType, pTypeLen := wire_decode_uint64(packet) if pTypeLen == 0 { return @@ -259,10 +259,12 @@ func (p *peer) _sendPackets(packets [][]byte) { for _, packet := range packets { size += len(packet) } - atomic.AddUint64(&p.bytesSent, uint64(size)) + p.bytesSent += uint64(size) p.out(packets) } +var peerLinkOutHelper phony.Actor + // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // It sends it to p.linkOut, which bypasses the usual packet queues. func (p *peer) _sendLinkPacket(packet []byte) { @@ -279,8 +281,12 @@ func (p *peer) _sendLinkPacket(packet []byte) { } packet = linkPacket.encode() // TODO replace this with a message send if/when the link becomes an actor - // FIXME not 100% sure the channel send version is deadlock-free... - p.linkOut <- packet + peerLinkOutHelper.EnqueueFrom(nil, func() { + select { + case p.linkOut <- packet: + case <-p.done: + } + }) } // Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.