diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 8869bd2..851a376 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -12,6 +12,8 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" + + "github.com/Arceliar/phony" ) // The peers struct represents peers with an active connection. @@ -97,6 +99,7 @@ type peer struct { bytesSent uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers // BUG: sync/atomic, 32 bit platforms need the above to be the first element + phony.Actor core *Core intf *linkInterface port switchPort @@ -206,7 +209,7 @@ func (p *peer) linkLoop() { if !ok { return } - p.sendSwitchMsg() + <-p.SyncExec(p._sendSwitchMsg) case dinfo = <-p.dinfo: case _ = <-tick.C: if dinfo != nil { @@ -227,20 +230,19 @@ func (p *peer) handlePacket(packet []byte) { } switch pType { case wire_Traffic: - p.handleTraffic(packet, pTypeLen) + p._handleTraffic(packet, pTypeLen) case wire_ProtocolTraffic: - p.handleTraffic(packet, pTypeLen) + p._handleTraffic(packet, pTypeLen) case wire_LinkProtocolTraffic: - p.handleLinkTraffic(packet) + p._handleLinkTraffic(packet) default: util.PutBytes(packet) } - return } // Called to handle traffic or protocolTraffic packets. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. -func (p *peer) handleTraffic(packet []byte, pTypeLen int) { +func (p *peer) _handleTraffic(packet []byte, pTypeLen int) { table := p.core.switchTable.getTable() if _, isIn := table.elems[p.port]; !isIn && p.port != 0 { // Drop traffic if the peer isn't in the switch @@ -249,8 +251,14 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { p.core.switchTable.packetIn <- packet } +func (p *peer) sendPacketsFrom(from phony.IActor, packets [][]byte) { + p.EnqueueFrom(from, func() { + p._sendPackets(packets) + }) +} + // This just calls p.out(packet) for now. -func (p *peer) sendPackets(packets [][]byte) { +func (p *peer) _sendPackets(packets [][]byte) { // Is there ever a case where something more complicated is needed? // What if p.out blocks? var size int @@ -263,7 +271,7 @@ func (p *peer) sendPackets(packets [][]byte) { // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // It sends it to p.linkOut, which bypasses the usual packet queues. -func (p *peer) sendLinkPacket(packet []byte) { +func (p *peer) _sendLinkPacket(packet []byte) { innerPayload, innerNonce := crypto.BoxSeal(&p.linkShared, packet, nil) innerLinkPacket := wire_linkProtoTrafficPacket{ Nonce: *innerNonce, @@ -281,7 +289,7 @@ func (p *peer) sendLinkPacket(packet []byte) { // Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic. // Identifies the link traffic type and calls the appropriate handler. -func (p *peer) handleLinkTraffic(bs []byte) { +func (p *peer) _handleLinkTraffic(bs []byte) { packet := wire_linkProtoTrafficPacket{} if !packet.decode(bs) { return @@ -304,14 +312,14 @@ func (p *peer) handleLinkTraffic(bs []byte) { } switch pType { case wire_SwitchMsg: - p.handleSwitchMsg(payload) + p._handleSwitchMsg(payload) default: util.PutBytes(bs) } } // Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them. -func (p *peer) sendSwitchMsg() { +func (p *peer) _sendSwitchMsg() { msg := p.core.switchTable.getMsg() if msg == nil { return @@ -323,12 +331,12 @@ func (p *peer) sendSwitchMsg() { Sig: *crypto.Sign(&p.core.sigPriv, bs), }) packet := msg.encode() - p.sendLinkPacket(packet) + p._sendLinkPacket(packet) } // Handles a switchMsg from the peer, checking signatures and passing good messages to the switch. // Also creates a dhtInfo struct and arranges for it to be added to the dht (this is how dht bootstrapping begins). -func (p *peer) handleSwitchMsg(packet []byte) { +func (p *peer) _handleSwitchMsg(packet []byte) { var msg switchMsg if !msg.decode(packet) { return diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 1fa75a6..d87b788 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -703,7 +703,7 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) boo if best != nil { // Send to the best idle next hop delete(idle, best.port) - best.sendPackets([][]byte{packet}) + best.sendPacketsFrom(nil, [][]byte{packet}) return true } // Didn't find anyone idle to send it to @@ -817,7 +817,8 @@ func (t *switchTable) handleIdle(port switchPort) bool { } } if len(packets) > 0 { - to.sendPackets(packets) + // TODO rewrite if/when the switch becomes an actor + to.sendPacketsFrom(nil, packets) return true } return false @@ -830,7 +831,8 @@ func (t *switchTable) doWorker() { // Keep sending packets to the router self := t.core.peers.getPorts()[0] for bs := range sendingToRouter { - self.sendPackets([][]byte{bs}) + // TODO remove this ugly mess of goroutines if/when the switch becomes an actor + <-self.SyncExec(func() { self._sendPackets([][]byte{bs}) }) } }() go func() {