5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-12-22 23:25:39 +00:00

have the core wrap and export the underlying PacketConn, move IPv6 ReadWriteCloser wrapper logic to a separate package

This commit is contained in:
Arceliar 2021-07-05 13:14:12 -05:00
parent 35e8ff7c9d
commit f990a56046
11 changed files with 170 additions and 110 deletions

View File

@ -29,6 +29,7 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/defaults"
"github.com/yggdrasil-network/yggdrasil-go/src/core"
"github.com/yggdrasil-network/yggdrasil-go/src/ipv6rwc"
"github.com/yggdrasil-network/yggdrasil-go/src/multicast"
"github.com/yggdrasil-network/yggdrasil-go/src/tuntap"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
@ -353,7 +354,8 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) {
}
n.multicast.SetupAdminHandlers(n.admin)
// Start the TUN/TAP interface
if err := n.tuntap.Init(&n.core, cfg, logger, nil); err != nil {
rwc := ipv6rwc.NewReadWriteCloser(&n.core)
if err := n.tuntap.Init(rwc, cfg, logger, nil); err != nil {
logger.Errorln("An error occurred initialising TUN/TAP:", err)
} else if err := n.tuntap.Start(); err != nil {
logger.Errorln("An error occurred starting TUN/TAP:", err)

View File

@ -48,7 +48,7 @@ type Session struct {
func (c *Core) GetSelf() Self {
var self Self
s := c.pc.PacketConn.Debug.GetSelf()
s := c.PacketConn.PacketConn.Debug.GetSelf()
self.Key = s.Key
self.Root = s.Root
self.Coords = s.Coords
@ -63,7 +63,7 @@ func (c *Core) GetPeers() []Peer {
names[info.conn] = info.lname
}
c.links.mutex.Unlock()
ps := c.pc.PacketConn.Debug.GetPeers()
ps := c.PacketConn.PacketConn.Debug.GetPeers()
for _, p := range ps {
var info Peer
info.Key = p.Key
@ -81,7 +81,7 @@ func (c *Core) GetPeers() []Peer {
func (c *Core) GetDHT() []DHTEntry {
var dhts []DHTEntry
ds := c.pc.PacketConn.Debug.GetDHT()
ds := c.PacketConn.PacketConn.Debug.GetDHT()
for _, d := range ds {
var info DHTEntry
info.Key = d.Key
@ -94,7 +94,7 @@ func (c *Core) GetDHT() []DHTEntry {
func (c *Core) GetPaths() []PathEntry {
var paths []PathEntry
ps := c.pc.PacketConn.Debug.GetPaths()
ps := c.PacketConn.PacketConn.Debug.GetPaths()
for _, p := range ps {
var info PathEntry
info.Key = p.Key
@ -106,7 +106,7 @@ func (c *Core) GetPaths() []PathEntry {
func (c *Core) GetSessions() []Session {
var sessions []Session
ss := c.pc.Debug.GetSessions()
ss := c.PacketConn.Debug.GetSessions()
for _, s := range ss {
var info Session
info.Key = s.Key
@ -239,38 +239,6 @@ func (c *Core) PublicKey() ed25519.PublicKey {
return c.public
}
func (c *Core) MaxMTU() uint64 {
return c.store.maxSessionMTU()
}
func (c *Core) SetMTU(mtu uint64) {
if mtu < 1280 {
mtu = 1280
}
c.store.mutex.Lock()
c.store.mtu = mtu
c.store.mutex.Unlock()
}
func (c *Core) MTU() uint64 {
c.store.mutex.Lock()
mtu := c.store.mtu
c.store.mutex.Unlock()
return mtu
}
// Implement io.ReadWriteCloser
func (c *Core) Read(p []byte) (n int, err error) {
n, err = c.store.readPC(p)
return
}
func (c *Core) Write(p []byte) (n int, err error) {
n, err = c.store.writePC(p)
return
}
func (c *Core) Close() error {
c.Stop()
return nil

View File

@ -7,10 +7,12 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/url"
"time"
iw "github.com/Arceliar/ironwood/encrypted"
iwe "github.com/Arceliar/ironwood/encrypted"
iwt "github.com/Arceliar/ironwood/types"
"github.com/Arceliar/phony"
"github.com/gologme/log"
@ -26,13 +28,12 @@ type Core struct {
// We're going to keep our own copy of the provided config - that way we can
// guarantee that it will be covered by the mutex
phony.Inbox
pc *iw.PacketConn
*iwe.PacketConn
config *config.NodeConfig // Config
secret ed25519.PrivateKey
public ed25519.PublicKey
links links
proto protoHandler
store keyStore
log *log.Logger
addPeerTimer *time.Timer
ctx context.Context
@ -62,9 +63,8 @@ func (c *Core) _init() error {
c.public = c.secret.Public().(ed25519.PublicKey)
// TODO check public against current.PublicKey, error if they don't match
c.pc, err = iw.NewPacketConn(c.secret)
c.PacketConn, err = iwe.NewPacketConn(c.secret)
c.ctx, c.ctxCancel = context.WithCancel(context.Background())
c.store.init(c)
c.proto.init(c)
if err := c.proto.nodeinfo.setNodeInfo(c.config.NodeInfo, c.config.NodeInfoPrivacy); err != nil {
return fmt.Errorf("setNodeInfo: %w", err)
@ -168,7 +168,7 @@ func (c *Core) Stop() {
func (c *Core) _stop() {
c.log.Infoln("Stopping...")
c.ctxCancel()
c.pc.Close()
c.PacketConn.Close() // TODO make c.Close() do the right thing (act like c.Stop())
if c.addPeerTimer != nil {
c.addPeerTimer.Stop()
c.addPeerTimer = nil
@ -181,3 +181,53 @@ func (c *Core) _stop() {
*/
c.log.Infoln("Stopped")
}
func (c *Core) MTU() uint64 {
const sessionTypeOverhead = 1
return c.PacketConn.MTU() - sessionTypeOverhead
}
func (c *Core) ReadFrom(p []byte) (n int, from net.Addr, err error) {
buf := make([]byte, c.PacketConn.MTU(), 65535)
for {
bs := buf
n, from, err = c.PacketConn.ReadFrom(bs)
if err != nil {
return 0, from, err
}
if n == 0 {
continue
}
switch bs[0] {
case typeSessionTraffic:
// This is what we want to handle here
case typeSessionProto:
var key keyArray
copy(key[:], from.(iwt.Addr))
data := append([]byte(nil), bs[1:n]...)
c.proto.handleProto(nil, key, data)
continue
default:
continue
}
bs = bs[1:n]
copy(p, bs)
if len(p) < len(bs) {
n = len(p)
} else {
n = len(bs)
}
return
}
}
func (c *Core) WriteTo(p []byte, addr net.Addr) (n int, err error) {
buf := make([]byte, 0, 65535)
buf = append(buf, typeSessionTraffic)
buf = append(buf, p...)
n, err = c.PacketConn.WriteTo(buf, addr)
if n > 0 {
n -= 1
}
return
}

View File

@ -230,7 +230,7 @@ func (intf *link) handler() (chan struct{}, error) {
intf.links.core.log.Infof("Connected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
// Run the handler
err = intf.links.core.pc.HandleConn(ed25519.PublicKey(intf.info.key[:]), intf.conn)
err = intf.links.core.HandleConn(ed25519.PublicKey(intf.info.key[:]), intf.conn)
// TODO don't report an error if it's just a 'use of closed network connection'
if err != nil {
intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s",

View File

@ -129,7 +129,7 @@ func (m *nodeinfo) _sendReq(key keyArray, callback func(nodeinfo NodeInfoPayload
if callback != nil {
m._addCallback(key, callback)
}
_, _ = m.proto.core.pc.WriteTo([]byte{typeSessionProto, typeProtoNodeInfoRequest}, iwt.Addr(key[:]))
_, _ = m.proto.core.WriteTo([]byte{typeSessionProto, typeProtoNodeInfoRequest}, iwt.Addr(key[:]))
}
func (m *nodeinfo) handleReq(from phony.Actor, key keyArray) {
@ -146,7 +146,7 @@ func (m *nodeinfo) handleRes(from phony.Actor, key keyArray, info NodeInfoPayloa
func (m *nodeinfo) _sendRes(key keyArray) {
bs := append([]byte{typeSessionProto, typeProtoNodeInfoResponse}, m._getNodeInfo()...)
_, _ = m.proto.core.pc.WriteTo(bs, iwt.Addr(key[:]))
_, _ = m.proto.core.WriteTo(bs, iwt.Addr(key[:]))
}
// Admin socket stuff

View File

@ -1,6 +1,7 @@
package core
import (
"crypto/ed25519"
"encoding/hex"
"encoding/json"
"errors"
@ -29,6 +30,8 @@ type reqInfo struct {
timer *time.Timer // time.AfterFunc cleanup
}
type keyArray [ed25519.PublicKeySize]byte
type protoHandler struct {
phony.Inbox
core *Core
@ -149,7 +152,7 @@ func (p *protoHandler) _handleGetPeersRequest(key keyArray) {
for _, pinfo := range peers {
tmp := append(bs, pinfo.Key[:]...)
const responseOverhead = 2 // 1 debug type, 1 getpeers type
if uint64(len(tmp))+responseOverhead > p.core.store.maxSessionMTU() {
if uint64(len(tmp))+responseOverhead > p.core.MTU() {
break
}
bs = tmp
@ -191,7 +194,7 @@ func (p *protoHandler) _handleGetDHTRequest(key keyArray) {
for _, dinfo := range dinfos {
tmp := append(bs, dinfo.Key[:]...)
const responseOverhead = 2 // 1 debug type, 1 getdht type
if uint64(len(tmp))+responseOverhead > p.core.store.maxSessionMTU() {
if uint64(len(tmp))+responseOverhead > p.core.MTU() {
break
}
bs = tmp
@ -209,7 +212,7 @@ func (p *protoHandler) _handleGetDHTResponse(key keyArray, bs []byte) {
func (p *protoHandler) _sendDebug(key keyArray, dType uint8, data []byte) {
bs := append([]byte{typeSessionProto, typeProtoDebug, dType}, data...)
_, _ = p.core.pc.WriteTo(bs, iwt.Addr(key[:]))
_, _ = p.core.WriteTo(bs, iwt.Addr(key[:]))
}
// Admin socket stuff

View File

@ -1,12 +1,5 @@
package core
// Out-of-band packet types
const (
typeKeyDummy = iota // nolint:deadcode,varcheck
typeKeyLookup
typeKeyResponse
)
// In-band packet types
const (
typeSessionDummy = iota // nolint:deadcode,varcheck

View File

@ -1,4 +1,4 @@
package core
package ipv6rwc
// The ICMPv6 module implements functions to easily create ICMPv6
// packets. These functions, when mixed with the built-in Go IPv6

View File

@ -1,4 +1,4 @@
package core
package ipv6rwc
import (
"crypto/ed25519"
@ -14,14 +14,22 @@ import (
iwt "github.com/Arceliar/ironwood/types"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/core"
)
const keyStoreTimeout = 2 * time.Minute
// Out-of-band packet types
const (
typeKeyDummy = iota // nolint:deadcode,varcheck
typeKeyLookup
typeKeyResponse
)
type keyArray [ed25519.PublicKeySize]byte
type keyStore struct {
core *Core
core *core.Core
address address.Address
subnet address.Subnet
mutex sync.Mutex
@ -45,11 +53,11 @@ type buffer struct {
timeout *time.Timer
}
func (k *keyStore) init(core *Core) {
k.core = core
k.address = *address.AddrForKey(k.core.public)
k.subnet = *address.SubnetForKey(k.core.public)
if err := k.core.pc.SetOutOfBandHandler(k.oobHandler); err != nil {
func (k *keyStore) init(c *core.Core) {
k.core = c
k.address = *address.AddrForKey(k.core.PublicKey())
k.subnet = *address.SubnetForKey(k.core.PublicKey())
if err := k.core.SetOutOfBandHandler(k.oobHandler); err != nil {
err = fmt.Errorf("tun.core.SetOutOfBandHander: %w", err)
panic(err)
}
@ -66,7 +74,7 @@ func (k *keyStore) sendToAddress(addr address.Address, bs []byte) {
if info := k.addrToInfo[addr]; info != nil {
k.resetTimeout(info)
k.mutex.Unlock()
_, _ = k.core.pc.WriteTo(bs, iwt.Addr(info.key[:]))
_, _ = k.core.WriteTo(bs, iwt.Addr(info.key[:]))
} else {
var buf *buffer
if buf = k.addrBuffer[addr]; buf == nil {
@ -95,7 +103,7 @@ func (k *keyStore) sendToSubnet(subnet address.Subnet, bs []byte) {
if info := k.subnetToInfo[subnet]; info != nil {
k.resetTimeout(info)
k.mutex.Unlock()
_, _ = k.core.pc.WriteTo(bs, iwt.Addr(info.key[:]))
_, _ = k.core.WriteTo(bs, iwt.Addr(info.key[:]))
} else {
var buf *buffer
if buf = k.subnetBuffer[subnet]; buf == nil {
@ -135,11 +143,11 @@ func (k *keyStore) update(key ed25519.PublicKey) *keyInfo {
k.resetTimeout(info)
k.mutex.Unlock()
if buf := k.addrBuffer[info.address]; buf != nil {
k.core.pc.WriteTo(buf.packet, iwt.Addr(info.key[:]))
k.core.WriteTo(buf.packet, iwt.Addr(info.key[:]))
delete(k.addrBuffer, info.address)
}
if buf := k.subnetBuffer[info.subnet]; buf != nil {
k.core.pc.WriteTo(buf.packet, iwt.Addr(info.key[:]))
k.core.WriteTo(buf.packet, iwt.Addr(info.key[:]))
delete(k.subnetBuffer, info.subnet)
}
} else {
@ -191,46 +199,29 @@ func (k *keyStore) oobHandler(fromKey, toKey ed25519.PublicKey, data []byte) {
}
func (k *keyStore) sendKeyLookup(partial ed25519.PublicKey) {
sig := ed25519.Sign(k.core.secret, partial[:])
sig := ed25519.Sign(k.core.PrivateKey(), partial[:])
bs := append([]byte{typeKeyLookup}, sig...)
_ = k.core.pc.SendOutOfBand(partial, bs)
_ = k.core.SendOutOfBand(partial, bs)
}
func (k *keyStore) sendKeyResponse(dest ed25519.PublicKey) {
sig := ed25519.Sign(k.core.secret, dest[:])
sig := ed25519.Sign(k.core.PrivateKey(), dest[:])
bs := append([]byte{typeKeyResponse}, sig...)
_ = k.core.pc.SendOutOfBand(dest, bs)
}
func (k *keyStore) maxSessionMTU() uint64 {
const sessionTypeOverhead = 1
return k.core.pc.MTU() - sessionTypeOverhead
_ = k.core.SendOutOfBand(dest, bs)
}
func (k *keyStore) readPC(p []byte) (int, error) {
buf := make([]byte, k.core.pc.MTU(), 65535)
buf := make([]byte, k.core.MTU(), 65535)
for {
bs := buf
n, from, err := k.core.pc.ReadFrom(bs)
n, from, err := k.core.ReadFrom(bs)
if err != nil {
return n, err
}
if n == 0 {
continue
}
switch bs[0] {
case typeSessionTraffic:
// This is what we want to handle here
case typeSessionProto:
var key keyArray
copy(key[:], from.(iwt.Addr))
data := append([]byte(nil), bs[1:n]...)
k.core.proto.handleProto(nil, key, data)
continue
default:
continue
}
bs = bs[1:n]
bs = bs[:n]
if len(bs) == 0 {
continue
}
@ -294,15 +285,69 @@ func (k *keyStore) writePC(bs []byte) (int, error) {
strErr := fmt.Sprint("incorrect source address: ", net.IP(srcAddr[:]).String())
return 0, errors.New(strErr)
}
buf := make([]byte, 1+len(bs), 65535)
buf[0] = typeSessionTraffic
copy(buf[1:], bs)
if dstAddr.IsValid() {
k.sendToAddress(dstAddr, buf)
k.sendToAddress(dstAddr, bs)
} else if dstSubnet.IsValid() {
k.sendToSubnet(dstSubnet, buf)
k.sendToSubnet(dstSubnet, bs)
} else {
return 0, errors.New("invalid destination address")
}
return len(bs), nil
}
// Exported API
func (k *keyStore) MaxMTU() uint64 {
return k.core.MTU()
}
func (k *keyStore) SetMTU(mtu uint64) {
if mtu > k.MaxMTU() {
mtu = k.MaxMTU()
}
if mtu < 1280 {
mtu = 1280
}
k.mutex.Lock()
k.mtu = mtu
k.mutex.Unlock()
}
func (k *keyStore) MTU() uint64 {
k.mutex.Lock()
mtu := k.mtu
k.mutex.Unlock()
return mtu
}
type ReadWriteCloser struct {
keyStore
}
func NewReadWriteCloser(c *core.Core) *ReadWriteCloser {
rwc := new(ReadWriteCloser)
rwc.init(c)
return rwc
}
func (rwc *ReadWriteCloser) Address() address.Address {
return rwc.address
}
func (rwc *ReadWriteCloser) Subnet() address.Subnet {
return rwc.subnet
}
func (rwc *ReadWriteCloser) Read(p []byte) (n int, err error) {
return rwc.readPC(p)
}
func (rwc *ReadWriteCloser) Write(p []byte) (n int, err error) {
return rwc.writePC(p)
}
func (rwc *ReadWriteCloser) Close() error {
err := rwc.core.Close()
rwc.core.Stop()
return err
}

View File

@ -17,7 +17,7 @@ func (tun *TunAdapter) read() {
begin := TUN_OFFSET_BYTES
end := begin + n
bs := buf[begin:end]
if _, err := tun.core.Write(bs); err != nil {
if _, err := tun.rwc.Write(bs); err != nil {
tun.log.Debugln("Unable to send packet:", err)
}
}
@ -27,7 +27,7 @@ func (tun *TunAdapter) write() {
var buf [TUN_OFFSET_BYTES + 65535]byte
for {
bs := buf[TUN_OFFSET_BYTES:]
n, err := tun.core.Read(bs)
n, err := tun.rwc.Read(bs)
if err != nil {
tun.log.Errorln("Exiting tun writer due to core read error:", err)
return

View File

@ -21,8 +21,8 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/core"
"github.com/yggdrasil-network/yggdrasil-go/src/defaults"
"github.com/yggdrasil-network/yggdrasil-go/src/ipv6rwc"
)
type MTU uint16
@ -32,7 +32,7 @@ type MTU uint16
// should pass this object to the yggdrasil.SetRouterAdapter() function before
// calling yggdrasil.Start().
type TunAdapter struct {
core *core.Core
rwc *ipv6rwc.ReadWriteCloser
config *config.NodeConfig
log *log.Logger
addr address.Address
@ -93,8 +93,8 @@ func MaximumMTU() uint64 {
// Init initialises the TUN module. You must have acquired a Listener from
// the Yggdrasil core before this point and it must not be in use elsewhere.
func (tun *TunAdapter) Init(core *core.Core, config *config.NodeConfig, log *log.Logger, options interface{}) error {
tun.core = core
func (tun *TunAdapter) Init(rwc *ipv6rwc.ReadWriteCloser, config *config.NodeConfig, log *log.Logger, options interface{}) error {
tun.rwc = rwc
tun.config = config
tun.log = log
return nil
@ -119,9 +119,8 @@ func (tun *TunAdapter) _start() error {
if tun.config == nil {
return errors.New("no configuration available to TUN")
}
pk := tun.core.PublicKey()
tun.addr = *address.AddrForKey(pk)
tun.subnet = *address.SubnetForKey(pk)
tun.addr = tun.rwc.Address()
tun.subnet = tun.rwc.Subnet()
addr := fmt.Sprintf("%s/%d", net.IP(tun.addr[:]).String(), 8*len(address.GetPrefix())-1)
if tun.config.IfName == "none" || tun.config.IfName == "dummy" {
tun.log.Debugln("Not starting TUN as ifname is none or dummy")
@ -130,8 +129,8 @@ func (tun *TunAdapter) _start() error {
return nil
}
mtu := tun.config.IfMTU
if tun.core.MaxMTU() < mtu {
mtu = tun.core.MaxMTU()
if tun.rwc.MaxMTU() < mtu {
mtu = tun.rwc.MaxMTU()
}
if err := tun.setup(tun.config.IfName, addr, mtu); err != nil {
return err
@ -139,7 +138,7 @@ func (tun *TunAdapter) _start() error {
if tun.MTU() != mtu {
tun.log.Warnf("Warning: Interface MTU %d automatically adjusted to %d (supported range is 1280-%d)", tun.config.IfMTU, tun.MTU(), MaximumMTU())
}
tun.core.SetMTU(tun.MTU())
tun.rwc.SetMTU(tun.MTU())
tun.isOpen = true
tun.isEnabled = true
go tun.read()