diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 573b6e7..7207b22 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -45,7 +45,7 @@ func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name string) (*a inPacket := func(packet []byte) { intf.link.fromlink <- packet } - intf.stream.init(inPacket) + intf.stream.init(nil, inPacket) // FIXME nil = ReadWriteCloser go intf.handler() l.mutex.Lock() l.interfaces[name] = &intf diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 32b5ea7..423a968 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -17,6 +17,15 @@ type link struct { interfaces map[string]*linkInterface } +type linkInterfaceMsgIO interface { + readMsg() ([]byte, error) + writeMsg([]byte) (int, error) + close() error + // These are temporary workarounds to stream semantics + _sendMetaBytes([]byte) error + _recvMetaBytes() ([]byte, error) +} + type linkInterface struct { name string link *link diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index 43eff3f..ecfa245 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -3,21 +3,115 @@ package yggdrasil import ( "errors" "fmt" + "io" "github.com/yggdrasil-network/yggdrasil-go/src/util" ) +// Test that this matches the interface we expect +var _ = linkInterfaceMsgIO(&stream{}) + type stream struct { - inputBuffer []byte + rwc io.ReadWriteCloser + inputBuffer []byte // Incoming packet stream + didFirstSend bool // Used for metadata exchange + didFirstRecv bool // Used for metadata exchange + // TODO remove the rest, it shouldn't matter in the long run handlePacket func([]byte) } +func (s *stream) close() error { + return s.rwc.Close() +} + const streamMsgSize = 2048 + 65535 var streamMsg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" -func (s *stream) init(in func([]byte)) { +func (s *stream) init(rwc io.ReadWriteCloser, in func([]byte)) { + // TODO have this also do the metadata handshake and create the peer struct + s.rwc = rwc s.handlePacket = in + + // TODO call something to do the metadata exchange +} + +// writeMsg writes a message with stream padding, and is *not* thread safe. +func (s *stream) writeMsg(bs []byte) (int, error) { + buf := util.GetBytes() + defer util.PutBytes(buf) + buf = append(buf, streamMsg[:]...) + buf = append(buf, wire_encode_uint64(uint64(len(bs)))...) + padLen := len(buf) + buf = append(buf, bs...) + var bn int + for bn < len(buf) { + n, err := s.rwc.Write(buf[bn:]) + bn += n + if err != nil { + l := bn - padLen + if l < 0 { + l = 0 + } + return l, err + } + } + return len(bs), nil +} + +// readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe. +func (s *stream) readMsg() ([]byte, error) { + for { + buf := s.inputBuffer + msg, ok, err := stream_chopMsg(&buf) + switch { + case err != nil: + // Something in the stream format is corrupt + return nil, fmt.Errorf("message error: %v", err) + case ok: + // Copy the packet into bs, shift the buffer, and return + msg = append(util.GetBytes(), msg...) + s.inputBuffer = append(s.inputBuffer[:0], buf...) + return msg, nil + default: + // Wait for the underlying reader to return enough info for us to proceed + frag := make([]byte, 2*streamMsgSize) + n, err := s.rwc.Read(frag) + if n > 0 { + s.inputBuffer = append(s.inputBuffer, frag[:n]...) + } else if err != nil { + return nil, err + } + } + } +} + +// Writes metadata bytes without stream padding, meant to be temporary +func (s *stream) _sendMetaBytes(metaBytes []byte) error { + var written int + for written < len(metaBytes) { + n, err := s.rwc.Write(metaBytes) + written += n + if err != nil { + return err + } + } + return nil +} + +// Reads metadata bytes without stream padding, meant to be temporary +func (s *stream) _recvMetaBytes() ([]byte, error) { + var meta version_metadata + frag := meta.encode() + metaBytes := make([]byte, 0, len(frag)) + for len(metaBytes) < len(frag) { + n, err := s.rwc.Read(frag) + if err != nil { + return nil, err + } + metaBytes = append(metaBytes, frag[:n]...) + } + return metaBytes, nil } // This reads from the channel into a []byte buffer for incoming messages. It @@ -28,7 +122,8 @@ func (s *stream) init(in func([]byte)) { func (s *stream) handleInput(bs []byte) error { if len(bs) > 0 { s.inputBuffer = append(s.inputBuffer, bs...) - msg, ok, err2 := stream_chopMsg(&s.inputBuffer) + buf := s.inputBuffer + msg, ok, err2 := stream_chopMsg(&buf) if err2 != nil { return fmt.Errorf("message error: %v", err2) } @@ -37,7 +132,7 @@ func (s *stream) handleInput(bs []byte) error { return nil } newMsg := append(util.GetBytes(), msg...) - s.inputBuffer = append(s.inputBuffer[:0], s.inputBuffer...) + s.inputBuffer = append(s.inputBuffer[:0], buf...) s.handlePacket(newMsg) util.Yield() // Make sure we give up control to the scheduler } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 1d4ec99..975da46 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -440,7 +440,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) iface.core.log.Printf("Connected: %s, source: %s", themString, us) - iface.stream.init(p.handlePacket) + iface.stream.init(sock, p.handlePacket) bs := make([]byte, 2*streamMsgSize) var n int for {