diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index d39f129..ea0b1a1 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -442,6 +442,7 @@ func (sinfo *sessionInfo) recvWorker() { // Since there's no reason for anywhere else in the session code to need to *read* it... // Only needs to be updated from the outside if a ping resets it... // That would get rid of the need to take a mutex for the sessionFunc + var callbacks []chan func() doRecv := func(p *wire_trafficPacket) { var bs []byte var err error @@ -459,31 +460,51 @@ func (sinfo *sessionInfo) recvWorker() { return } var isOK bool - bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce) - if !isOK { - util.PutBytes(bs) - return - } - sessionFunc = func() { - if k != sinfo.sharedSesKey || !sinfo.nonceIsOK(&p.Nonce) { - // The session updated in the mean time, so return an error - err = ConnError{errors.New("session updated during crypto operation"), false, true, false, 0} - return + ch := make(chan func(), 1) + poolFunc := func() { + bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce) + callback := func() { + if !isOK { + util.PutBytes(bs) + return + } + sessionFunc = func() { + if k != sinfo.sharedSesKey || !sinfo.nonceIsOK(&p.Nonce) { + // The session updated in the mean time, so return an error + err = ConnError{errors.New("session updated during crypto operation"), false, true, false, 0} + return + } + sinfo.updateNonce(&p.Nonce) + sinfo.time = time.Now() + sinfo.bytesRecvd += uint64(len(bs)) + } + sinfo.doFunc(sessionFunc) + if err != nil { + // Not sure what else to do with this packet, I guess just drop it + util.PutBytes(bs) + } else { + // Pass the packet to the buffer for Conn.Read + sinfo.recv <- bs + } } - sinfo.updateNonce(&p.Nonce) - sinfo.time = time.Now() - sinfo.bytesRecvd += uint64(len(bs)) - } - sinfo.doFunc(sessionFunc) - if err != nil { - // Not sure what else to do with this packet, I guess just drop it - util.PutBytes(bs) - } else { - // Pass the packet to the buffer for Conn.Read - sinfo.recv <- bs + ch <- callback } + // Send to the worker and wait for it to finish + util.WorkerGo(poolFunc) + callbacks = append(callbacks, ch) } for { + for len(callbacks) > 0 { + select { + case f := <-callbacks[0]: + callbacks = callbacks[1:] + f() + case <-sinfo.cancel.Finished(): + return + case p := <-sinfo.fromRouter: + doRecv(p) + } + } select { case <-sinfo.cancel.Finished(): return @@ -496,6 +517,7 @@ func (sinfo *sessionInfo) recvWorker() { func (sinfo *sessionInfo) sendWorker() { // TODO move info that this worker needs here, send updates via a channel // Otherwise we need to take a mutex to avoid races with update() + var callbacks []chan func() doSend := func(bs []byte) { var p wire_trafficPacket var k crypto.BoxSharedKey @@ -511,16 +533,34 @@ func (sinfo *sessionInfo) sendWorker() { } // Get the mutex-protected info needed to encrypt the packet sinfo.doFunc(sessionFunc) - // Encrypt the packet - p.Payload, _ = crypto.BoxSeal(&k, bs, &p.Nonce) - packet := p.encode() - // Cleanup - util.PutBytes(bs) - util.PutBytes(p.Payload) - // Send the packet - sinfo.core.router.out(packet) + ch := make(chan func(), 1) + poolFunc := func() { + // Encrypt the packet + p.Payload, _ = crypto.BoxSeal(&k, bs, &p.Nonce) + packet := p.encode() + // Cleanup + util.PutBytes(bs) + util.PutBytes(p.Payload) + // The callback will send the packet + callback := func() { sinfo.core.router.out(packet) } + ch <- callback + } + // Send to the worker and wait for it to finish + util.WorkerGo(poolFunc) + callbacks = append(callbacks, ch) } for { + for len(callbacks) > 0 { + select { + case f := <-callbacks[0]: + callbacks = callbacks[1:] + f() + case <-sinfo.cancel.Finished(): + return + case bs := <-sinfo.send: + doSend(bs) + } + } select { case <-sinfo.cancel.Finished(): return