mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-26 11:51:37 +00:00
clean up unused session code
This commit is contained in:
parent
533da351f9
commit
e3603c0462
@ -161,12 +161,6 @@ func (r *router) _handleTraffic(packet []byte) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
sinfo.recv(r, &p)
|
sinfo.recv(r, &p)
|
||||||
return
|
|
||||||
select {
|
|
||||||
case sinfo.fromRouter <- p:
|
|
||||||
case <-sinfo.cancel.Finished():
|
|
||||||
util.PutBytes(p.Payload)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handles protocol traffic by decrypting it, checking its type, and passing it to the appropriate handler for that traffic type.
|
// Handles protocol traffic by decrypting it, checking its type, and passing it to the appropriate handler for that traffic type.
|
||||||
|
@ -7,7 +7,6 @@ package yggdrasil
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"container/heap"
|
"container/heap"
|
||||||
"errors"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -71,9 +70,7 @@ type sessionInfo struct {
|
|||||||
bytesRecvd uint64 // Bytes of real traffic received in this session
|
bytesRecvd uint64 // Bytes of real traffic received in this session
|
||||||
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
|
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
|
||||||
cancel util.Cancellation // Used to terminate workers
|
cancel util.Cancellation // Used to terminate workers
|
||||||
fromRouter chan wire_trafficPacket // Received packets go here, to be decrypted by the session
|
|
||||||
toConn chan []byte // Decrypted packets go here, picked up by the associated Conn
|
toConn chan []byte // Decrypted packets go here, picked up by the associated Conn
|
||||||
fromConn chan FlowKeyMessage // Packets with optional flow key go here, to be encrypted and sent
|
|
||||||
callbacks []chan func() // Finished work from crypto workers
|
callbacks []chan func() // Finished work from crypto workers
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -253,9 +250,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
|
|||||||
sinfo.myHandle = *crypto.NewHandle()
|
sinfo.myHandle = *crypto.NewHandle()
|
||||||
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
|
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
|
||||||
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
|
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
|
||||||
sinfo.fromRouter = make(chan wire_trafficPacket, 1)
|
|
||||||
sinfo.toConn = make(chan []byte, 32)
|
sinfo.toConn = make(chan []byte, 32)
|
||||||
sinfo.fromConn = make(chan FlowKeyMessage, 32)
|
|
||||||
ss.sinfos[sinfo.myHandle] = &sinfo
|
ss.sinfos[sinfo.myHandle] = &sinfo
|
||||||
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
|
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
|
||||||
go func() {
|
go func() {
|
||||||
@ -479,207 +474,11 @@ func (ss *sessions) reset() {
|
|||||||
//////////////////////////// Worker Functions Below ////////////////////////////
|
//////////////////////////// Worker Functions Below ////////////////////////////
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
func (sinfo *sessionInfo) startWorkers() {
|
|
||||||
go sinfo.recvWorker()
|
|
||||||
go sinfo.sendWorker()
|
|
||||||
}
|
|
||||||
|
|
||||||
type FlowKeyMessage struct {
|
type FlowKeyMessage struct {
|
||||||
FlowKey uint64
|
FlowKey uint64
|
||||||
Message []byte
|
Message []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sinfo *sessionInfo) recvWorker() {
|
|
||||||
// TODO move theirNonce etc into a struct that gets stored here, passed in over a channel
|
|
||||||
// Since there's no reason for anywhere else in the session code to need to *read* it...
|
|
||||||
// Only needs to be updated from the outside if a ping resets it...
|
|
||||||
// That would get rid of the need to take a mutex for the sessionFunc
|
|
||||||
var callbacks []chan func()
|
|
||||||
doRecv := func(p wire_trafficPacket) {
|
|
||||||
var bs []byte
|
|
||||||
var err error
|
|
||||||
var k crypto.BoxSharedKey
|
|
||||||
sessionFunc := func() {
|
|
||||||
if !sinfo._nonceIsOK(&p.Nonce) {
|
|
||||||
err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
k = sinfo.sharedSesKey
|
|
||||||
}
|
|
||||||
sinfo.doFunc(sessionFunc)
|
|
||||||
if err != nil {
|
|
||||||
util.PutBytes(p.Payload)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var isOK bool
|
|
||||||
ch := make(chan func(), 1)
|
|
||||||
poolFunc := func() {
|
|
||||||
bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
|
|
||||||
callback := func() {
|
|
||||||
util.PutBytes(p.Payload)
|
|
||||||
if !isOK {
|
|
||||||
util.PutBytes(bs)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sessionFunc = func() {
|
|
||||||
if k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) {
|
|
||||||
// The session updated in the mean time, so return an error
|
|
||||||
err = ConnError{errors.New("session updated during crypto operation"), false, true, false, 0}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sinfo._updateNonce(&p.Nonce)
|
|
||||||
sinfo.time = time.Now()
|
|
||||||
sinfo.bytesRecvd += uint64(len(bs))
|
|
||||||
}
|
|
||||||
sinfo.doFunc(sessionFunc)
|
|
||||||
if err != nil {
|
|
||||||
// Not sure what else to do with this packet, I guess just drop it
|
|
||||||
util.PutBytes(bs)
|
|
||||||
} else {
|
|
||||||
// Pass the packet to the buffer for Conn.Read
|
|
||||||
select {
|
|
||||||
case <-sinfo.cancel.Finished():
|
|
||||||
util.PutBytes(bs)
|
|
||||||
case sinfo.toConn <- bs:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ch <- callback
|
|
||||||
}
|
|
||||||
// Send to the worker and wait for it to finish
|
|
||||||
util.WorkerGo(poolFunc)
|
|
||||||
callbacks = append(callbacks, ch)
|
|
||||||
}
|
|
||||||
fromHelper := make(chan wire_trafficPacket, 1)
|
|
||||||
go func() {
|
|
||||||
var buf []wire_trafficPacket
|
|
||||||
for {
|
|
||||||
for len(buf) > 0 {
|
|
||||||
select {
|
|
||||||
case <-sinfo.cancel.Finished():
|
|
||||||
return
|
|
||||||
case p := <-sinfo.fromRouter:
|
|
||||||
buf = append(buf, p)
|
|
||||||
for len(buf) > 64 { // Based on nonce window size
|
|
||||||
util.PutBytes(buf[0].Payload)
|
|
||||||
buf = buf[1:]
|
|
||||||
}
|
|
||||||
case fromHelper <- buf[0]:
|
|
||||||
buf = buf[1:]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-sinfo.cancel.Finished():
|
|
||||||
return
|
|
||||||
case p := <-sinfo.fromRouter:
|
|
||||||
buf = append(buf, p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
select {
|
|
||||||
case <-sinfo.cancel.Finished():
|
|
||||||
return
|
|
||||||
case <-sinfo.init:
|
|
||||||
// Wait until the session has finished initializing before processing any packets
|
|
||||||
}
|
|
||||||
for {
|
|
||||||
for len(callbacks) > 0 {
|
|
||||||
select {
|
|
||||||
case f := <-callbacks[0]:
|
|
||||||
callbacks = callbacks[1:]
|
|
||||||
f()
|
|
||||||
case <-sinfo.cancel.Finished():
|
|
||||||
return
|
|
||||||
case p := <-fromHelper:
|
|
||||||
doRecv(p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-sinfo.cancel.Finished():
|
|
||||||
return
|
|
||||||
case p := <-fromHelper:
|
|
||||||
doRecv(p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sinfo *sessionInfo) sendWorker() {
|
|
||||||
// TODO move info that this worker needs here, send updates via a channel
|
|
||||||
// Otherwise we need to take a mutex to avoid races with update()
|
|
||||||
var callbacks []chan func()
|
|
||||||
doSend := func(msg FlowKeyMessage) {
|
|
||||||
var p wire_trafficPacket
|
|
||||||
var k crypto.BoxSharedKey
|
|
||||||
sessionFunc := func() {
|
|
||||||
sinfo.bytesSent += uint64(len(msg.Message))
|
|
||||||
p = wire_trafficPacket{
|
|
||||||
Coords: append([]byte(nil), sinfo.coords...),
|
|
||||||
Handle: sinfo.theirHandle,
|
|
||||||
Nonce: sinfo.myNonce,
|
|
||||||
}
|
|
||||||
if msg.FlowKey != 0 {
|
|
||||||
// Helps ensure that traffic from this flow ends up in a separate queue from other flows
|
|
||||||
// The zero padding relies on the fact that the self-peer is always on port 0
|
|
||||||
p.Coords = append(p.Coords, 0)
|
|
||||||
p.Coords = wire_put_uint64(msg.FlowKey, p.Coords)
|
|
||||||
}
|
|
||||||
sinfo.myNonce.Increment()
|
|
||||||
k = sinfo.sharedSesKey
|
|
||||||
}
|
|
||||||
// Get the mutex-protected info needed to encrypt the packet
|
|
||||||
sinfo.doFunc(sessionFunc)
|
|
||||||
ch := make(chan func(), 1)
|
|
||||||
poolFunc := func() {
|
|
||||||
// Encrypt the packet
|
|
||||||
p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce)
|
|
||||||
// The callback will send the packet
|
|
||||||
callback := func() {
|
|
||||||
// Encoding may block on a util.GetBytes(), so kept out of the worker pool
|
|
||||||
packet := p.encode()
|
|
||||||
// Cleanup
|
|
||||||
util.PutBytes(msg.Message)
|
|
||||||
util.PutBytes(p.Payload)
|
|
||||||
// Send the packet
|
|
||||||
// TODO replace this with a send to the peer struct if that becomes an actor
|
|
||||||
sinfo.sessions.router.EnqueueFrom(sinfo, func() {
|
|
||||||
sinfo.sessions.router.out(packet)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
ch <- callback
|
|
||||||
}
|
|
||||||
// Send to the worker and wait for it to finish
|
|
||||||
util.WorkerGo(poolFunc)
|
|
||||||
callbacks = append(callbacks, ch)
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-sinfo.cancel.Finished():
|
|
||||||
return
|
|
||||||
case <-sinfo.init:
|
|
||||||
// Wait until the session has finished initializing before processing any packets
|
|
||||||
}
|
|
||||||
for {
|
|
||||||
for len(callbacks) > 0 {
|
|
||||||
select {
|
|
||||||
case f := <-callbacks[0]:
|
|
||||||
callbacks = callbacks[1:]
|
|
||||||
f()
|
|
||||||
case <-sinfo.cancel.Finished():
|
|
||||||
return
|
|
||||||
case msg := <-sinfo.fromConn:
|
|
||||||
doSend(msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-sinfo.cancel.Finished():
|
|
||||||
return
|
|
||||||
case bs := <-sinfo.fromConn:
|
|
||||||
doSend(bs)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
func (sinfo *sessionInfo) recv(from phony.IActor, packet *wire_trafficPacket) {
|
func (sinfo *sessionInfo) recv(from phony.IActor, packet *wire_trafficPacket) {
|
||||||
sinfo.EnqueueFrom(from, func() {
|
sinfo.EnqueueFrom(from, func() {
|
||||||
sinfo._recvPacket(packet)
|
sinfo._recvPacket(packet)
|
||||||
|
Loading…
Reference in New Issue
Block a user