5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-22 23:41:35 +00:00

do session crypto work using the worker pool

This commit is contained in:
Arceliar 2019-08-03 23:27:52 -05:00
parent befd1b43a0
commit 00e9c3dbd9

View File

@ -442,6 +442,7 @@ func (sinfo *sessionInfo) recvWorker() {
// Since there's no reason for anywhere else in the session code to need to *read* it... // Since there's no reason for anywhere else in the session code to need to *read* it...
// Only needs to be updated from the outside if a ping resets it... // Only needs to be updated from the outside if a ping resets it...
// That would get rid of the need to take a mutex for the sessionFunc // That would get rid of the need to take a mutex for the sessionFunc
var callbacks []chan func()
doRecv := func(p *wire_trafficPacket) { doRecv := func(p *wire_trafficPacket) {
var bs []byte var bs []byte
var err error var err error
@ -459,31 +460,51 @@ func (sinfo *sessionInfo) recvWorker() {
return return
} }
var isOK bool var isOK bool
bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce) ch := make(chan func(), 1)
if !isOK { poolFunc := func() {
util.PutBytes(bs) bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
return callback := func() {
} if !isOK {
sessionFunc = func() { util.PutBytes(bs)
if k != sinfo.sharedSesKey || !sinfo.nonceIsOK(&p.Nonce) { return
// The session updated in the mean time, so return an error }
err = ConnError{errors.New("session updated during crypto operation"), false, true, false, 0} sessionFunc = func() {
return if k != sinfo.sharedSesKey || !sinfo.nonceIsOK(&p.Nonce) {
// The session updated in the mean time, so return an error
err = ConnError{errors.New("session updated during crypto operation"), false, true, false, 0}
return
}
sinfo.updateNonce(&p.Nonce)
sinfo.time = time.Now()
sinfo.bytesRecvd += uint64(len(bs))
}
sinfo.doFunc(sessionFunc)
if err != nil {
// Not sure what else to do with this packet, I guess just drop it
util.PutBytes(bs)
} else {
// Pass the packet to the buffer for Conn.Read
sinfo.recv <- bs
}
} }
sinfo.updateNonce(&p.Nonce) ch <- callback
sinfo.time = time.Now()
sinfo.bytesRecvd += uint64(len(bs))
}
sinfo.doFunc(sessionFunc)
if err != nil {
// Not sure what else to do with this packet, I guess just drop it
util.PutBytes(bs)
} else {
// Pass the packet to the buffer for Conn.Read
sinfo.recv <- bs
} }
// Send to the worker and wait for it to finish
util.WorkerGo(poolFunc)
callbacks = append(callbacks, ch)
} }
for { for {
for len(callbacks) > 0 {
select {
case f := <-callbacks[0]:
callbacks = callbacks[1:]
f()
case <-sinfo.cancel.Finished():
return
case p := <-sinfo.fromRouter:
doRecv(p)
}
}
select { select {
case <-sinfo.cancel.Finished(): case <-sinfo.cancel.Finished():
return return
@ -496,6 +517,7 @@ func (sinfo *sessionInfo) recvWorker() {
func (sinfo *sessionInfo) sendWorker() { func (sinfo *sessionInfo) sendWorker() {
// TODO move info that this worker needs here, send updates via a channel // TODO move info that this worker needs here, send updates via a channel
// Otherwise we need to take a mutex to avoid races with update() // Otherwise we need to take a mutex to avoid races with update()
var callbacks []chan func()
doSend := func(bs []byte) { doSend := func(bs []byte) {
var p wire_trafficPacket var p wire_trafficPacket
var k crypto.BoxSharedKey var k crypto.BoxSharedKey
@ -511,16 +533,34 @@ func (sinfo *sessionInfo) sendWorker() {
} }
// Get the mutex-protected info needed to encrypt the packet // Get the mutex-protected info needed to encrypt the packet
sinfo.doFunc(sessionFunc) sinfo.doFunc(sessionFunc)
// Encrypt the packet ch := make(chan func(), 1)
p.Payload, _ = crypto.BoxSeal(&k, bs, &p.Nonce) poolFunc := func() {
packet := p.encode() // Encrypt the packet
// Cleanup p.Payload, _ = crypto.BoxSeal(&k, bs, &p.Nonce)
util.PutBytes(bs) packet := p.encode()
util.PutBytes(p.Payload) // Cleanup
// Send the packet util.PutBytes(bs)
sinfo.core.router.out(packet) util.PutBytes(p.Payload)
// The callback will send the packet
callback := func() { sinfo.core.router.out(packet) }
ch <- callback
}
// Send to the worker and wait for it to finish
util.WorkerGo(poolFunc)
callbacks = append(callbacks, ch)
} }
for { for {
for len(callbacks) > 0 {
select {
case f := <-callbacks[0]:
callbacks = callbacks[1:]
f()
case <-sinfo.cancel.Finished():
return
case bs := <-sinfo.send:
doSend(bs)
}
}
select { select {
case <-sinfo.cancel.Finished(): case <-sinfo.cancel.Finished():
return return