diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 4ce374b..824afd3 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -387,7 +387,8 @@ func (intf *linkInterface) handler() error { for { msg, err := intf.msgIO.readMsg() if len(msg) > 0 { - intf.peer.handlePacket(msg) + // TODO rewrite this if the link becomes an actor + <-intf.peer.SyncExec(func() { intf.peer._handlePacket(msg) }) } if err != nil { if err != io.EOF { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 851a376..22db88d 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -219,9 +219,15 @@ func (p *peer) linkLoop() { } } +func (p *peer) handlePacketFrom(from phony.IActor, packet []byte) { + p.EnqueueFrom(from, func() { + p._handlePacket(packet) + }) +} + // Called to handle incoming packets. // Passes the packet to a handler for that packet type. -func (p *peer) handlePacket(packet []byte) { +func (p *peer) _handlePacket(packet []byte) { // FIXME this is off by stream padding and msg length overhead, should be done in tcp.go atomic.AddUint64(&p.bytesRecvd, uint64(len(packet))) pType, pTypeLen := wire_decode_uint64(packet) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 464a477..adf1b1d 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -64,11 +64,8 @@ func (r *router) init(core *Core) { }, } p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) - p.out = func(packets [][]byte) { - // TODO make peers and/or the switch into actors, have them pass themselves as the from field - r.handlePackets(nil, packets) - } - r.out = p.handlePacket // TODO if the peer becomes its own actor, then send a message here + p.out = func(packets [][]byte) { r.handlePackets(p, packets) } + r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) r.core.config.Mutex.RLock() r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy)