diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index e337b34..a151537 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -61,11 +61,10 @@ func (c *Conn) startSearch() { func (c *Conn) Read(b []byte) (int, error) { if c.session == nil { - return 0, errors.New("session not open") + return 0, errors.New("session not ready yet") } if !c.session.init { - // To prevent blocking forever on a session that isn't initialised - return 0, errors.New("session not initialised") + return 0, errors.New("waiting for remote side to accept") } select { case p, ok := <-c.session.recv: @@ -84,6 +83,7 @@ func (c *Conn) Read(b []byte) (int, error) { util.PutBytes(bs) return errors.New("packet dropped due to decryption failure") } + // c.core.log.Println("HOW MANY BYTES?", len(bs)) b = b[:0] b = append(b, bs...) c.session.updateNonce(&p.Nonce) @@ -96,7 +96,7 @@ func (c *Conn) Read(b []byte) (int, error) { atomic.AddUint64(&c.session.bytesRecvd, uint64(len(b))) return len(b), nil case <-c.session.closed: - return len(b), errors.New("session was closed") + return len(b), errors.New("session closed") } } @@ -105,12 +105,12 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { c.core.router.doAdmin(func() { c.startSearch() }) - return 0, errors.New("session not open") + return 0, errors.New("session not ready yet") } defer util.PutBytes(b) if !c.session.init { // To prevent using empty session keys - return 0, errors.New("session not initialised") + return 0, errors.New("waiting for remote side to accept") } // code isn't multithreaded so appending to this is safe coords := c.session.coords @@ -130,13 +130,14 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { select { case c.session.send <- packet: case <-c.session.closed: - return len(b), errors.New("session was closed") + return len(b), errors.New("session closed") } c.session.core.router.out(packet) return len(b), nil } func (c *Conn) Close() error { + c.session.close() return nil } diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 22caf08..dba1c64 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -254,6 +254,20 @@ func (c *Core) Stop() { c.admin.close() } +// ListenConn returns a listener for Yggdrasil session connections. +func (c *Core) ListenConn() (*Listener, error) { + c.sessions.listenerMutex.Lock() + defer c.sessions.listenerMutex.Unlock() + if c.sessions.listener != nil { + return nil, errors.New("a listener already exists") + } + c.sessions.listener = &Listener{ + conn: make(chan *Conn), + close: make(chan interface{}), + } + return c.sessions.listener, nil +} + // Dial opens a session to the given node. The first paramter should be "nodeid" // and the second parameter should contain a hexadecimal representation of the // target node ID. diff --git a/src/yggdrasil/listener.go b/src/yggdrasil/listener.go new file mode 100644 index 0000000..268d8b7 --- /dev/null +++ b/src/yggdrasil/listener.go @@ -0,0 +1,41 @@ +package yggdrasil + +import ( + "errors" + "net" +) + +// Listener waits for incoming sessions +type Listener struct { + conn chan *Conn + close chan interface{} +} + +// Accept blocks until a new incoming session is received +func (l *Listener) Accept() (*Conn, error) { + select { + case c, ok := <-l.conn: + if !ok { + return nil, errors.New("listener closed") + } + return c, nil + case <-l.close: + return nil, errors.New("listener closed") + } +} + +// Close will stop the listener +func (l *Listener) Close() (err error) { + defer func() { + recover() + err = errors.New("already closed") + }() + close(l.close) + close(l.conn) + return nil +} + +// Addr is not implemented for this type yet +func (l *Listener) Addr() net.Addr { + return nil +} diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index c5319bc..64b5d29 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -105,16 +105,18 @@ func (s *sessionInfo) timedout() bool { // Sessions are indexed by handle. // Additionally, stores maps of address/subnet onto keys, and keys onto handles. type sessions struct { - core *Core - reconfigure chan chan error - lastCleanup time.Time - permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot - sinfos map[crypto.Handle]*sessionInfo // Maps (secret) handle onto session info - conns map[crypto.Handle]*Conn // Maps (secret) handle onto connections - byMySes map[crypto.BoxPubKey]*crypto.Handle // Maps mySesPub onto handle - byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle - addrToPerm map[address.Address]*crypto.BoxPubKey - subnetToPerm map[address.Subnet]*crypto.BoxPubKey + core *Core + listener *Listener + listenerMutex sync.Mutex + reconfigure chan chan error + lastCleanup time.Time + permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot + sinfos map[crypto.Handle]*sessionInfo // Maps (secret) handle onto session info + conns map[crypto.Handle]*Conn // Maps (secret) handle onto connections + byMySes map[crypto.BoxPubKey]*crypto.Handle // Maps mySesPub onto handle + byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle + addrToPerm map[address.Address]*crypto.BoxPubKey + subnetToPerm map[address.Subnet]*crypto.BoxPubKey } // Initializes the session struct. @@ -461,6 +463,22 @@ func (ss *sessions) handlePing(ping *sessionPing) { if !isIn { panic("This should not happen") } + ss.listenerMutex.Lock() + if ss.listener != nil { + conn := &Conn{ + core: ss.core, + session: sinfo, + nodeID: crypto.GetNodeID(&sinfo.theirPermPub), + nodeMask: &crypto.NodeID{}, + } + for i := range conn.nodeMask { + conn.nodeMask[i] = 0xFF + } + ss.listener.conn <- conn + } else { + ss.core.log.Debugln("Received new session but there is no listener, ignoring") + } + ss.listenerMutex.Unlock() } // Update the session if !sinfo.update(ping) { /*panic("Should not happen in testing")*/