mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-22 23:41:35 +00:00
have the conn actor receive messages from the session actor and either pass them to a callback or buffer them in a channel for Read to use if no callback was set
This commit is contained in:
parent
9948e3d659
commit
1e346aaad0
@ -62,15 +62,18 @@ type Conn struct {
|
|||||||
nodeMask *crypto.NodeID
|
nodeMask *crypto.NodeID
|
||||||
session *sessionInfo
|
session *sessionInfo
|
||||||
mtu uint16
|
mtu uint16
|
||||||
|
readCallback func([]byte)
|
||||||
|
readBuffer chan []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO func NewConn() that initializes additional fields as needed
|
// TODO func NewConn() that initializes additional fields as needed
|
||||||
func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn {
|
func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn {
|
||||||
conn := Conn{
|
conn := Conn{
|
||||||
core: core,
|
core: core,
|
||||||
nodeID: nodeID,
|
nodeID: nodeID,
|
||||||
nodeMask: nodeMask,
|
nodeMask: nodeMask,
|
||||||
session: session,
|
session: session,
|
||||||
|
readBuffer: make(chan []byte, 1024),
|
||||||
}
|
}
|
||||||
return &conn
|
return &conn
|
||||||
}
|
}
|
||||||
@ -154,6 +157,45 @@ func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetReadCallback sets a callback which will be called whenever a packet is received.
|
||||||
|
// Note that calls to Read will fail if the callback has been set to a non-nil value.
|
||||||
|
func (c *Conn) SetReadCallback(callback func([]byte)) {
|
||||||
|
c.EnqueueFrom(nil, func() {
|
||||||
|
c._setReadCallback(callback)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) _setReadCallback(callback func([]byte)) {
|
||||||
|
c.readCallback = callback
|
||||||
|
c._drainReadBuffer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) _drainReadBuffer() {
|
||||||
|
if c.readCallback == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case bs := <-c.readBuffer:
|
||||||
|
c.readCallback(bs)
|
||||||
|
c.EnqueueFrom(nil, c._drainReadBuffer) // In case there's more
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Called by the session to pass a new message to the Conn
|
||||||
|
func (c *Conn) recvMsg(from phony.IActor, msg []byte) {
|
||||||
|
c.EnqueueFrom(from, func() {
|
||||||
|
if c.readCallback != nil {
|
||||||
|
c.readCallback(msg)
|
||||||
|
} else {
|
||||||
|
select {
|
||||||
|
case c.readBuffer <- msg:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Used internally by Read, the caller is responsible for util.PutBytes when they're done.
|
// Used internally by Read, the caller is responsible for util.PutBytes when they're done.
|
||||||
func (c *Conn) ReadNoCopy() ([]byte, error) {
|
func (c *Conn) ReadNoCopy() ([]byte, error) {
|
||||||
var cancel util.Cancellation
|
var cancel util.Cancellation
|
||||||
@ -170,7 +212,7 @@ func (c *Conn) ReadNoCopy() ([]byte, error) {
|
|||||||
} else {
|
} else {
|
||||||
return nil, ConnError{errors.New("session closed"), false, false, true, 0}
|
return nil, ConnError{errors.New("session closed"), false, false, true, 0}
|
||||||
}
|
}
|
||||||
case bs := <-c.session.toConn:
|
case bs := <-c.readBuffer:
|
||||||
return bs, nil
|
return bs, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -278,6 +320,7 @@ func (c *Conn) LocalAddr() crypto.NodeID {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) RemoteAddr() crypto.NodeID {
|
func (c *Conn) RemoteAddr() crypto.NodeID {
|
||||||
|
// TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors...
|
||||||
var n crypto.NodeID
|
var n crypto.NodeID
|
||||||
<-c.SyncExec(func() { n = *c.nodeID })
|
<-c.SyncExec(func() { n = *c.nodeID })
|
||||||
return n
|
return n
|
||||||
@ -290,11 +333,13 @@ func (c *Conn) SetDeadline(t time.Time) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) SetReadDeadline(t time.Time) error {
|
func (c *Conn) SetReadDeadline(t time.Time) error {
|
||||||
|
// TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors...
|
||||||
<-c.SyncExec(func() { c.readDeadline = &t })
|
<-c.SyncExec(func() { c.readDeadline = &t })
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) SetWriteDeadline(t time.Time) error {
|
func (c *Conn) SetWriteDeadline(t time.Time) error {
|
||||||
|
// TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors...
|
||||||
<-c.SyncExec(func() { c.writeDeadline = &t })
|
<-c.SyncExec(func() { c.writeDeadline = &t })
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -70,7 +70,6 @@ type sessionInfo struct {
|
|||||||
bytesRecvd uint64 // Bytes of real traffic received in this session
|
bytesRecvd uint64 // Bytes of real traffic received in this session
|
||||||
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
|
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
|
||||||
cancel util.Cancellation // Used to terminate workers
|
cancel util.Cancellation // Used to terminate workers
|
||||||
toConn chan []byte // Decrypted packets go here, picked up by the associated Conn
|
|
||||||
conn *Conn // The associated Conn object
|
conn *Conn // The associated Conn object
|
||||||
callbacks []chan func() // Finished work from crypto workers
|
callbacks []chan func() // Finished work from crypto workers
|
||||||
}
|
}
|
||||||
@ -254,7 +253,6 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
|
|||||||
sinfo.myHandle = *crypto.NewHandle()
|
sinfo.myHandle = *crypto.NewHandle()
|
||||||
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
|
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
|
||||||
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
|
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
|
||||||
sinfo.toConn = make(chan []byte, 32)
|
|
||||||
ss.sinfos[sinfo.myHandle] = &sinfo
|
ss.sinfos[sinfo.myHandle] = &sinfo
|
||||||
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
|
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
|
||||||
go func() {
|
go func() {
|
||||||
@ -505,15 +503,10 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
|
|||||||
util.PutBytes(p.Payload)
|
util.PutBytes(p.Payload)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
switch {
|
if !sinfo._nonceIsOK(&p.Nonce) {
|
||||||
case sinfo._nonceIsOK(&p.Nonce):
|
|
||||||
case len(sinfo.toConn) < cap(sinfo.toConn):
|
|
||||||
default:
|
|
||||||
// We're either full or don't like this nonce
|
|
||||||
util.PutBytes(p.Payload)
|
util.PutBytes(p.Payload)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
k := sinfo.sharedSesKey
|
k := sinfo.sharedSesKey
|
||||||
var isOK bool
|
var isOK bool
|
||||||
var bs []byte
|
var bs []byte
|
||||||
@ -530,16 +523,7 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
|
|||||||
sinfo._updateNonce(&p.Nonce)
|
sinfo._updateNonce(&p.Nonce)
|
||||||
sinfo.time = time.Now()
|
sinfo.time = time.Now()
|
||||||
sinfo.bytesRecvd += uint64(len(bs))
|
sinfo.bytesRecvd += uint64(len(bs))
|
||||||
select {
|
sinfo.conn.recvMsg(sinfo, bs)
|
||||||
case sinfo.toConn <- bs:
|
|
||||||
case <-sinfo.cancel.Finished():
|
|
||||||
util.PutBytes(bs)
|
|
||||||
default:
|
|
||||||
// We seem to have filled up the buffer in the mean time
|
|
||||||
// Since we need to not block, but the conn isn't an actor, we need to drop this packet
|
|
||||||
// TODO find some nicer way to interact with the Conn...
|
|
||||||
util.PutBytes(bs)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
ch <- callback
|
ch <- callback
|
||||||
sinfo.checkCallbacks()
|
sinfo.checkCallbacks()
|
||||||
|
Loading…
Reference in New Issue
Block a user