5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-12-23 12:15:39 +00:00

Listen-Accept-Read-Write pattern now works, amazingly

This commit is contained in:
Neil Alexander 2019-04-19 22:57:52 +01:00
parent 27b78b925d
commit aac88adbed
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
4 changed files with 91 additions and 17 deletions

View File

@ -61,11 +61,10 @@ func (c *Conn) startSearch() {
func (c *Conn) Read(b []byte) (int, error) {
if c.session == nil {
return 0, errors.New("session not open")
return 0, errors.New("session not ready yet")
}
if !c.session.init {
// To prevent blocking forever on a session that isn't initialised
return 0, errors.New("session not initialised")
return 0, errors.New("waiting for remote side to accept")
}
select {
case p, ok := <-c.session.recv:
@ -84,6 +83,7 @@ func (c *Conn) Read(b []byte) (int, error) {
util.PutBytes(bs)
return errors.New("packet dropped due to decryption failure")
}
// c.core.log.Println("HOW MANY BYTES?", len(bs))
b = b[:0]
b = append(b, bs...)
c.session.updateNonce(&p.Nonce)
@ -96,7 +96,7 @@ func (c *Conn) Read(b []byte) (int, error) {
atomic.AddUint64(&c.session.bytesRecvd, uint64(len(b)))
return len(b), nil
case <-c.session.closed:
return len(b), errors.New("session was closed")
return len(b), errors.New("session closed")
}
}
@ -105,12 +105,12 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
c.core.router.doAdmin(func() {
c.startSearch()
})
return 0, errors.New("session not open")
return 0, errors.New("session not ready yet")
}
defer util.PutBytes(b)
if !c.session.init {
// To prevent using empty session keys
return 0, errors.New("session not initialised")
return 0, errors.New("waiting for remote side to accept")
}
// code isn't multithreaded so appending to this is safe
coords := c.session.coords
@ -130,13 +130,14 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
select {
case c.session.send <- packet:
case <-c.session.closed:
return len(b), errors.New("session was closed")
return len(b), errors.New("session closed")
}
c.session.core.router.out(packet)
return len(b), nil
}
func (c *Conn) Close() error {
c.session.close()
return nil
}

View File

@ -254,6 +254,20 @@ func (c *Core) Stop() {
c.admin.close()
}
// ListenConn returns a listener for Yggdrasil session connections.
func (c *Core) ListenConn() (*Listener, error) {
c.sessions.listenerMutex.Lock()
defer c.sessions.listenerMutex.Unlock()
if c.sessions.listener != nil {
return nil, errors.New("a listener already exists")
}
c.sessions.listener = &Listener{
conn: make(chan *Conn),
close: make(chan interface{}),
}
return c.sessions.listener, nil
}
// Dial opens a session to the given node. The first paramter should be "nodeid"
// and the second parameter should contain a hexadecimal representation of the
// target node ID.

41
src/yggdrasil/listener.go Normal file
View File

@ -0,0 +1,41 @@
package yggdrasil
import (
"errors"
"net"
)
// Listener waits for incoming sessions
type Listener struct {
conn chan *Conn
close chan interface{}
}
// Accept blocks until a new incoming session is received
func (l *Listener) Accept() (*Conn, error) {
select {
case c, ok := <-l.conn:
if !ok {
return nil, errors.New("listener closed")
}
return c, nil
case <-l.close:
return nil, errors.New("listener closed")
}
}
// Close will stop the listener
func (l *Listener) Close() (err error) {
defer func() {
recover()
err = errors.New("already closed")
}()
close(l.close)
close(l.conn)
return nil
}
// Addr is not implemented for this type yet
func (l *Listener) Addr() net.Addr {
return nil
}

View File

@ -105,16 +105,18 @@ func (s *sessionInfo) timedout() bool {
// Sessions are indexed by handle.
// Additionally, stores maps of address/subnet onto keys, and keys onto handles.
type sessions struct {
core *Core
reconfigure chan chan error
lastCleanup time.Time
permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot
sinfos map[crypto.Handle]*sessionInfo // Maps (secret) handle onto session info
conns map[crypto.Handle]*Conn // Maps (secret) handle onto connections
byMySes map[crypto.BoxPubKey]*crypto.Handle // Maps mySesPub onto handle
byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle
addrToPerm map[address.Address]*crypto.BoxPubKey
subnetToPerm map[address.Subnet]*crypto.BoxPubKey
core *Core
listener *Listener
listenerMutex sync.Mutex
reconfigure chan chan error
lastCleanup time.Time
permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot
sinfos map[crypto.Handle]*sessionInfo // Maps (secret) handle onto session info
conns map[crypto.Handle]*Conn // Maps (secret) handle onto connections
byMySes map[crypto.BoxPubKey]*crypto.Handle // Maps mySesPub onto handle
byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle
addrToPerm map[address.Address]*crypto.BoxPubKey
subnetToPerm map[address.Subnet]*crypto.BoxPubKey
}
// Initializes the session struct.
@ -461,6 +463,22 @@ func (ss *sessions) handlePing(ping *sessionPing) {
if !isIn {
panic("This should not happen")
}
ss.listenerMutex.Lock()
if ss.listener != nil {
conn := &Conn{
core: ss.core,
session: sinfo,
nodeID: crypto.GetNodeID(&sinfo.theirPermPub),
nodeMask: &crypto.NodeID{},
}
for i := range conn.nodeMask {
conn.nodeMask[i] = 0xFF
}
ss.listener.conn <- conn
} else {
ss.core.log.Debugln("Received new session but there is no listener, ignoring")
}
ss.listenerMutex.Unlock()
}
// Update the session
if !sinfo.update(ping) { /*panic("Should not happen in testing")*/