5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-24 21:51:35 +00:00

tun session protocol traffic cleanup

This commit is contained in:
Arceliar 2021-05-23 11:58:52 -05:00
parent 233cf0c962
commit 29dda650b5
6 changed files with 125 additions and 115 deletions

View File

@ -34,8 +34,8 @@ func (t *TunAdapter) SetupAdminHandlers(a *admin.AdminSocket) {
} }
return res, nil return res, nil
}) })
_ = a.AddHandler("getNodeInfo", []string{"key"}, t.nodeinfo.nodeInfoAdminHandler) _ = a.AddHandler("getNodeInfo", []string{"key"}, t.proto.nodeinfo.nodeInfoAdminHandler)
_ = a.AddHandler("debugGetSelf", []string{"key"}, t.debug.getSelfHandler) _ = a.AddHandler("debugGetSelf", []string{"key"}, t.proto.getSelfHandler)
_ = a.AddHandler("debugGetPeers", []string{"key"}, t.debug.getPeersHandler) _ = a.AddHandler("debugGetPeers", []string{"key"}, t.proto.getPeersHandler)
_ = a.AddHandler("debugGetDHT", []string{"key"}, t.debug.getDHTHandler) _ = a.AddHandler("debugGetDHT", []string{"key"}, t.proto.getDHTHandler)
} }

View File

@ -77,22 +77,12 @@ func (tun *TunAdapter) write() {
if !tun.isEnabled { if !tun.isEnabled {
continue // Drop traffic if the tun is disabled continue // Drop traffic if the tun is disabled
} }
case typeSessionNodeInfoRequest: case typeSessionProto:
var key keyArray
copy(key[:], from.(iwt.Addr))
tun.nodeinfo.handleReq(nil, key)
continue
case typeSessionNodeInfoResponse:
var key keyArray
copy(key[:], from.(iwt.Addr))
res := append([]byte(nil), bs[1:n]...)
tun.nodeinfo.handleRes(nil, key, res)
continue
case typeSessionDebug:
var key keyArray var key keyArray
copy(key[:], from.(iwt.Addr)) copy(key[:], from.(iwt.Addr))
data := append([]byte(nil), bs[1:n]...) data := append([]byte(nil), bs[1:n]...)
tun.debug.handleDebug(nil, key, data) tun.proto.handleProto(nil, key, data)
continue
default: default:
continue continue
} }

View File

@ -20,7 +20,7 @@ type NodeInfoPayload []byte
type nodeinfo struct { type nodeinfo struct {
phony.Inbox phony.Inbox
tun *TunAdapter proto *protoHandler
myNodeInfo NodeInfoPayload myNodeInfo NodeInfoPayload
callbacks map[keyArray]nodeinfoCallback callbacks map[keyArray]nodeinfoCallback
} }
@ -44,14 +44,14 @@ type nodeinfoReqRes struct {
// Initialises the nodeinfo cache/callback maps, and starts a goroutine to keep // Initialises the nodeinfo cache/callback maps, and starts a goroutine to keep
// the cache/callback maps clean of stale entries // the cache/callback maps clean of stale entries
func (m *nodeinfo) init(tun *TunAdapter) { func (m *nodeinfo) init(proto *protoHandler) {
m.Act(nil, func() { m.Act(nil, func() {
m._init(tun) m._init(proto)
}) })
} }
func (m *nodeinfo) _init(tun *TunAdapter) { func (m *nodeinfo) _init(proto *protoHandler) {
m.tun = tun m.proto = proto
m.callbacks = make(map[keyArray]nodeinfoCallback) m.callbacks = make(map[keyArray]nodeinfoCallback)
m._cleanup() m._cleanup()
} }
@ -154,7 +154,7 @@ func (m *nodeinfo) _sendReq(key keyArray, callback func(nodeinfo NodeInfoPayload
if callback != nil { if callback != nil {
m._addCallback(key, callback) m._addCallback(key, callback)
} }
m.tun.core.WriteTo([]byte{typeSessionNodeInfoRequest}, iwt.Addr(key[:])) m.proto.tun.core.WriteTo([]byte{typeSessionProto, typeProtoNodeInfoRequest}, iwt.Addr(key[:]))
} }
func (m *nodeinfo) handleReq(from phony.Actor, key keyArray) { func (m *nodeinfo) handleReq(from phony.Actor, key keyArray) {
@ -170,8 +170,8 @@ func (m *nodeinfo) handleRes(from phony.Actor, key keyArray, info NodeInfoPayloa
} }
func (m *nodeinfo) _sendRes(key keyArray) { func (m *nodeinfo) _sendRes(key keyArray) {
bs := append([]byte{typeSessionNodeInfoResponse}, m._getNodeInfo()...) bs := append([]byte{typeSessionProto, typeProtoNodeInfoResponse}, m._getNodeInfo()...)
m.tun.core.WriteTo(bs, iwt.Addr(key[:])) m.proto.tun.core.WriteTo(bs, iwt.Addr(key[:]))
} }
// Admin socket stuff // Admin socket stuff

View File

@ -25,173 +25,189 @@ type reqInfo struct {
timer *time.Timer // time.AfterFunc cleanup timer *time.Timer // time.AfterFunc cleanup
} }
type debugHandler struct { type protoHandler struct {
phony.Inbox phony.Inbox
tun *TunAdapter tun *TunAdapter
sreqs map[keyArray]*reqInfo nodeinfo nodeinfo
preqs map[keyArray]*reqInfo sreqs map[keyArray]*reqInfo
dreqs map[keyArray]*reqInfo preqs map[keyArray]*reqInfo
dreqs map[keyArray]*reqInfo
} }
func (d *debugHandler) init(tun *TunAdapter) { func (p *protoHandler) init(tun *TunAdapter) {
d.tun = tun p.tun = tun
d.sreqs = make(map[keyArray]*reqInfo) p.nodeinfo.init(p)
d.preqs = make(map[keyArray]*reqInfo) p.sreqs = make(map[keyArray]*reqInfo)
d.dreqs = make(map[keyArray]*reqInfo) p.preqs = make(map[keyArray]*reqInfo)
p.dreqs = make(map[keyArray]*reqInfo)
} }
func (d *debugHandler) handleDebug(from phony.Actor, key keyArray, bs []byte) { func (p *protoHandler) handleProto(from phony.Actor, key keyArray, bs []byte) {
d.Act(from, func() { if len(bs) == 0 {
d._handleDebug(key, bs) return
}
switch bs[0] {
case typeProtoDummy:
case typeProtoNodeInfoRequest:
p.nodeinfo.handleReq(p, key)
case typeProtoNodeInfoResponse:
p.nodeinfo.handleRes(p, key, bs[1:])
case typeProtoDebug:
p._handleDebug(key, bs[1:])
}
}
func (p *protoHandler) handleDebug(from phony.Actor, key keyArray, bs []byte) {
p.Act(from, func() {
p._handleDebug(key, bs)
}) })
} }
func (d *debugHandler) _handleDebug(key keyArray, bs []byte) { func (p *protoHandler) _handleDebug(key keyArray, bs []byte) {
if len(bs) == 0 { if len(bs) == 0 {
return return
} }
switch bs[0] { switch bs[0] {
case typeDebugDummy: case typeDebugDummy:
case typeDebugGetSelfRequest: case typeDebugGetSelfRequest:
d._handleGetSelfRequest(key) p._handleGetSelfRequest(key)
case typeDebugGetSelfResponse: case typeDebugGetSelfResponse:
d._handleGetSelfResponse(key, bs[1:]) p._handleGetSelfResponse(key, bs[1:])
case typeDebugGetPeersRequest: case typeDebugGetPeersRequest:
d._handleGetPeersRequest(key) p._handleGetPeersRequest(key)
case typeDebugGetPeersResponse: case typeDebugGetPeersResponse:
d._handleGetPeersResponse(key, bs[1:]) p._handleGetPeersResponse(key, bs[1:])
case typeDebugGetDHTRequest: case typeDebugGetDHTRequest:
d._handleGetDHTRequest(key) p._handleGetDHTRequest(key)
case typeDebugGetDHTResponse: case typeDebugGetDHTResponse:
d._handleGetDHTResponse(key, bs[1:]) p._handleGetDHTResponse(key, bs[1:])
default:
} }
} }
func (d *debugHandler) sendGetSelfRequest(key keyArray, callback func([]byte)) { func (p *protoHandler) sendGetSelfRequest(key keyArray, callback func([]byte)) {
d.Act(nil, func() { p.Act(nil, func() {
if info := d.sreqs[key]; info != nil { if info := p.sreqs[key]; info != nil {
info.timer.Stop() info.timer.Stop()
delete(d.sreqs, key) delete(p.sreqs, key)
} }
info := new(reqInfo) info := new(reqInfo)
info.callback = callback info.callback = callback
info.timer = time.AfterFunc(time.Minute, func() { info.timer = time.AfterFunc(time.Minute, func() {
d.Act(nil, func() { p.Act(nil, func() {
if d.sreqs[key] == info { if p.sreqs[key] == info {
delete(d.sreqs, key) delete(p.sreqs, key)
} }
}) })
}) })
d.sreqs[key] = info p.sreqs[key] = info
d._sendDebug(key, typeDebugGetSelfRequest, nil) p._sendDebug(key, typeDebugGetSelfRequest, nil)
}) })
} }
func (d *debugHandler) _handleGetSelfRequest(key keyArray) { func (p *protoHandler) _handleGetSelfRequest(key keyArray) {
self := d.tun.core.GetSelf() self := p.tun.core.GetSelf()
bs, err := json.Marshal(self) bs, err := json.Marshal(self) // FIXME this puts keys in base64, not hex
if err != nil { if err != nil {
return return
} }
d._sendDebug(key, typeDebugGetSelfResponse, bs) p._sendDebug(key, typeDebugGetSelfResponse, bs)
} }
func (d *debugHandler) _handleGetSelfResponse(key keyArray, bs []byte) { func (p *protoHandler) _handleGetSelfResponse(key keyArray, bs []byte) {
if info := d.sreqs[key]; info != nil { if info := p.sreqs[key]; info != nil {
info.timer.Stop() info.timer.Stop()
info.callback(bs) info.callback(bs)
delete(d.sreqs, key) delete(p.sreqs, key)
} }
} }
func (d *debugHandler) sendGetPeersRequest(key keyArray, callback func([]byte)) { func (p *protoHandler) sendGetPeersRequest(key keyArray, callback func([]byte)) {
d.Act(nil, func() { p.Act(nil, func() {
if info := d.preqs[key]; info != nil { if info := p.preqs[key]; info != nil {
info.timer.Stop() info.timer.Stop()
delete(d.preqs, key) delete(p.preqs, key)
} }
info := new(reqInfo) info := new(reqInfo)
info.callback = callback info.callback = callback
info.timer = time.AfterFunc(time.Minute, func() { info.timer = time.AfterFunc(time.Minute, func() {
d.Act(nil, func() { p.Act(nil, func() {
if d.preqs[key] == info { if p.preqs[key] == info {
delete(d.preqs, key) delete(p.preqs, key)
} }
}) })
}) })
d.preqs[key] = info p.preqs[key] = info
d._sendDebug(key, typeDebugGetPeersRequest, nil) p._sendDebug(key, typeDebugGetPeersRequest, nil)
}) })
} }
func (d *debugHandler) _handleGetPeersRequest(key keyArray) { func (p *protoHandler) _handleGetPeersRequest(key keyArray) {
peers := d.tun.core.GetPeers() peers := p.tun.core.GetPeers()
var bs []byte var bs []byte
for _, p := range peers { for _, pinfo := range peers {
tmp := append(bs, p.Key[:]...) tmp := append(bs, pinfo.Key[:]...)
const responseOverhead = 1 const responseOverhead = 2 // 1 debug type, 1 getpeers type
if uint64(len(tmp))+1 > d.tun.maxSessionMTU() { if uint64(len(tmp))+responseOverhead > p.tun.maxSessionMTU() {
break break
} }
bs = tmp bs = tmp
} }
d._sendDebug(key, typeDebugGetPeersResponse, bs) p._sendDebug(key, typeDebugGetPeersResponse, bs)
} }
func (d *debugHandler) _handleGetPeersResponse(key keyArray, bs []byte) { func (p *protoHandler) _handleGetPeersResponse(key keyArray, bs []byte) {
if info := d.preqs[key]; info != nil { if info := p.preqs[key]; info != nil {
info.timer.Stop() info.timer.Stop()
info.callback(bs) info.callback(bs)
delete(d.preqs, key) delete(p.preqs, key)
} }
} }
func (d *debugHandler) sendGetDHTRequest(key keyArray, callback func([]byte)) { func (p *protoHandler) sendGetDHTRequest(key keyArray, callback func([]byte)) {
d.Act(nil, func() { p.Act(nil, func() {
if info := d.dreqs[key]; info != nil { if info := p.dreqs[key]; info != nil {
info.timer.Stop() info.timer.Stop()
delete(d.dreqs, key) delete(p.dreqs, key)
} }
info := new(reqInfo) info := new(reqInfo)
info.callback = callback info.callback = callback
info.timer = time.AfterFunc(time.Minute, func() { info.timer = time.AfterFunc(time.Minute, func() {
d.Act(nil, func() { p.Act(nil, func() {
if d.dreqs[key] == info { if p.dreqs[key] == info {
delete(d.dreqs, key) delete(p.dreqs, key)
} }
}) })
}) })
d.dreqs[key] = info p.dreqs[key] = info
d._sendDebug(key, typeDebugGetDHTRequest, nil) p._sendDebug(key, typeDebugGetDHTRequest, nil)
}) })
} }
func (d *debugHandler) _handleGetDHTRequest(key keyArray) { func (p *protoHandler) _handleGetDHTRequest(key keyArray) {
dinfos := d.tun.core.GetDHT() dinfos := p.tun.core.GetDHT()
var bs []byte var bs []byte
for _, dinfo := range dinfos { for _, dinfo := range dinfos {
tmp := append(bs, dinfo.Key[:]...) tmp := append(bs, dinfo.Key[:]...)
const responseOverhead = 1 const responseOverhead = 2 // 1 debug type, 1 getdht type
if uint64(len(tmp))+1 > d.tun.maxSessionMTU() { if uint64(len(tmp))+responseOverhead > p.tun.maxSessionMTU() {
break break
} }
bs = tmp bs = tmp
} }
d._sendDebug(key, typeDebugGetDHTResponse, bs) p._sendDebug(key, typeDebugGetDHTResponse, bs)
} }
func (d *debugHandler) _handleGetDHTResponse(key keyArray, bs []byte) { func (p *protoHandler) _handleGetDHTResponse(key keyArray, bs []byte) {
if info := d.dreqs[key]; info != nil { if info := p.dreqs[key]; info != nil {
info.timer.Stop() info.timer.Stop()
info.callback(bs) info.callback(bs)
delete(d.dreqs, key) delete(p.dreqs, key)
} }
} }
func (d *debugHandler) _sendDebug(key keyArray, dType uint8, data []byte) { func (p *protoHandler) _sendDebug(key keyArray, dType uint8, data []byte) {
bs := append([]byte{typeSessionDebug, dType}, data...) bs := append([]byte{typeSessionProto, typeProtoDebug, dType}, data...)
d.tun.core.WriteTo(bs, iwt.Addr(key[:])) p.tun.core.WriteTo(bs, iwt.Addr(key[:]))
} }
// Admin socket stuff // Admin socket stuff
@ -202,7 +218,7 @@ type DebugGetSelfRequest struct {
type DebugGetSelfResponse map[string]interface{} type DebugGetSelfResponse map[string]interface{}
func (d *debugHandler) getSelfHandler(in json.RawMessage) (interface{}, error) { func (p *protoHandler) getSelfHandler(in json.RawMessage) (interface{}, error) {
var req DebugGetSelfRequest var req DebugGetSelfRequest
if err := json.Unmarshal(in, &req); err != nil { if err := json.Unmarshal(in, &req); err != nil {
return nil, err return nil, err
@ -215,7 +231,7 @@ func (d *debugHandler) getSelfHandler(in json.RawMessage) (interface{}, error) {
} }
copy(key[:], kbs) copy(key[:], kbs)
ch := make(chan []byte, 1) ch := make(chan []byte, 1)
d.sendGetSelfRequest(key, func(info []byte) { p.sendGetSelfRequest(key, func(info []byte) {
ch <- info ch <- info
}) })
timer := time.NewTimer(6 * time.Second) timer := time.NewTimer(6 * time.Second)
@ -239,7 +255,7 @@ type DebugGetPeersRequest struct {
type DebugGetPeersResponse map[string]interface{} type DebugGetPeersResponse map[string]interface{}
func (d *debugHandler) getPeersHandler(in json.RawMessage) (interface{}, error) { func (p *protoHandler) getPeersHandler(in json.RawMessage) (interface{}, error) {
var req DebugGetPeersRequest var req DebugGetPeersRequest
if err := json.Unmarshal(in, &req); err != nil { if err := json.Unmarshal(in, &req); err != nil {
return nil, err return nil, err
@ -252,7 +268,7 @@ func (d *debugHandler) getPeersHandler(in json.RawMessage) (interface{}, error)
} }
copy(key[:], kbs) copy(key[:], kbs)
ch := make(chan []byte, 1) ch := make(chan []byte, 1)
d.sendGetPeersRequest(key, func(info []byte) { p.sendGetPeersRequest(key, func(info []byte) {
ch <- info ch <- info
}) })
timer := time.NewTimer(6 * time.Second) timer := time.NewTimer(6 * time.Second)
@ -286,7 +302,7 @@ type DebugGetDHTRequest struct {
type DebugGetDHTResponse map[string]interface{} type DebugGetDHTResponse map[string]interface{}
func (d *debugHandler) getDHTHandler(in json.RawMessage) (interface{}, error) { func (p *protoHandler) getDHTHandler(in json.RawMessage) (interface{}, error) {
var req DebugGetDHTRequest var req DebugGetDHTRequest
if err := json.Unmarshal(in, &req); err != nil { if err := json.Unmarshal(in, &req); err != nil {
return nil, err return nil, err
@ -299,7 +315,7 @@ func (d *debugHandler) getDHTHandler(in json.RawMessage) (interface{}, error) {
} }
copy(key[:], kbs) copy(key[:], kbs)
ch := make(chan []byte, 1) ch := make(chan []byte, 1)
d.sendGetDHTRequest(key, func(info []byte) { p.sendGetDHTRequest(key, func(info []byte) {
ch <- info ch <- info
}) })
timer := time.NewTimer(6 * time.Second) timer := time.NewTimer(6 * time.Second)

View File

@ -46,8 +46,7 @@ type TunAdapter struct {
isOpen bool isOpen bool
isEnabled bool // Used by the writer to drop sessionTraffic if not enabled isEnabled bool // Used by the writer to drop sessionTraffic if not enabled
gatekeeper func(pubkey ed25519.PublicKey, initiator bool) bool gatekeeper func(pubkey ed25519.PublicKey, initiator bool) bool
nodeinfo nodeinfo proto protoHandler
debug debugHandler
} }
func (tun *TunAdapter) SetSessionGatekeeper(gatekeeper func(pubkey ed25519.PublicKey, initiator bool) bool) { func (tun *TunAdapter) SetSessionGatekeeper(gatekeeper func(pubkey ed25519.PublicKey, initiator bool) bool) {
@ -109,12 +108,11 @@ func (tun *TunAdapter) Init(core *yggdrasil.Core, config *config.NodeState, log
tun.store.init(tun) tun.store.init(tun)
tun.config = config tun.config = config
tun.log = log tun.log = log
tun.nodeinfo.init(tun) tun.proto.init(tun)
tun.nodeinfo.setNodeInfo(config.Current.NodeInfo, config.Current.NodeInfoPrivacy) tun.proto.nodeinfo.setNodeInfo(config.Current.NodeInfo, config.Current.NodeInfoPrivacy)
if err := tun.core.SetOutOfBandHandler(tun.oobHandler); err != nil { if err := tun.core.SetOutOfBandHandler(tun.oobHandler); err != nil {
return fmt.Errorf("tun.core.SetOutOfBandHander: %w", err) return fmt.Errorf("tun.core.SetOutOfBandHander: %w", err)
} }
tun.debug.init(tun)
return nil return nil
} }

View File

@ -11,7 +11,13 @@ const (
const ( const (
typeSessionDummy = iota typeSessionDummy = iota
typeSessionTraffic typeSessionTraffic
typeSessionNodeInfoRequest typeSessionProto
typeSessionNodeInfoResponse )
typeSessionDebug // Debug messages, intended to be removed at some point
// Protocol packet types
const (
typeProtoDummy = iota
typeProtoNodeInfoRequest
typeProtoNodeInfoResponse
typeProtoDebug = 255
) )