diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 1656d39..7ddf4bb 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -96,13 +96,9 @@ type peer struct { //in <-chan []byte //out chan<- []byte //in func([]byte) - out func([]byte) - core *Core - port switchPort - msgAnc *msgAnnounce - msgHops []*msgHop - myMsg *switchMessage - mySigs []sigInfo + out func([]byte) + core *Core + port switchPort // This is used to limit how often we perform expensive operations // Specifically, processing switch messages, signing, and verifying sigs // Resets at the start of each tick @@ -175,8 +171,6 @@ func (ps *peers) removePeer(port switchPort) { func (p *peer) linkLoop() { ticker := time.NewTicker(time.Second) defer ticker.Stop() - var counter uint8 - var lastRSeq uint64 for { select { case packet, ok := <-p.linkIn: @@ -193,27 +187,8 @@ func (p *peer) linkLoop() { if p.port == 0 { continue } // Don't send announces on selfInterface - p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port) - var update bool - switch { - case p.msgAnc == nil: - update = true - case lastRSeq != p.msgAnc.Seq: - update = true - case p.msgAnc.Rseq != p.myMsg.seq: - update = true - case counter%4 == 0: - update = true - } - if true || update { - // TODO change update logic, the new switchMsg works differently - if p.msgAnc != nil { - lastRSeq = p.msgAnc.Seq - } - p.sendSwitchMsg() - //p.sendSwitchAnnounce() - } - counter = (counter + 1) % 4 + // TODO change update logic, the new switchMsg works differently, we only need to send if something changes + p.sendSwitchMsg() } } } @@ -303,186 +278,10 @@ func (p *peer) handleLinkTraffic(bs []byte) { switch pType { case wire_SwitchMsg: p.handleSwitchMsg(payload) - case wire_SwitchAnnounce: - p.handleSwitchAnnounce(payload) - case wire_SwitchHopRequest: - p.handleSwitchHopRequest(payload) - case wire_SwitchHop: - p.handleSwitchHop(payload) + default: // TODO?... } } -func (p *peer) handleSwitchAnnounce(packet []byte) { - //p.core.log.Println("DEBUG: handleSwitchAnnounce") - anc := msgAnnounce{} - //err := wire_decode_struct(packet, &anc) - //if err != nil { return } - if !anc.decode(packet) { - return - } - //if p.msgAnc != nil && anc.Seq != p.msgAnc.Seq { p.msgHops = nil } - if p.msgAnc == nil || - anc.Root != p.msgAnc.Root || - anc.Tstamp != p.msgAnc.Tstamp || - anc.Seq != p.msgAnc.Seq { - p.msgHops = nil - } - p.msgAnc = &anc - p.processSwitchMessage() - p.lastAnc = time.Now() -} - -func (p *peer) requestHop(hop uint64) { - //p.core.log.Println("DEBUG requestHop") - req := msgHopReq{} - req.Root = p.msgAnc.Root - req.Tstamp = p.msgAnc.Tstamp - req.Seq = p.msgAnc.Seq - req.Hop = hop - packet := req.encode() - p.sendLinkPacket(packet) -} - -func (p *peer) handleSwitchHopRequest(packet []byte) { - //p.core.log.Println("DEBUG: handleSwitchHopRequest") - if p.throttle > peer_Throttle { - return - } - if p.myMsg == nil { - return - } - req := msgHopReq{} - if !req.decode(packet) { - return - } - if req.Root != p.myMsg.locator.root { - return - } - if req.Tstamp != p.myMsg.locator.tstamp { - return - } - if req.Seq != p.myMsg.seq { - return - } - if uint64(len(p.myMsg.locator.coords)) <= req.Hop { - return - } - res := msgHop{} - res.Root = p.myMsg.locator.root - res.Tstamp = p.myMsg.locator.tstamp - res.Seq = p.myMsg.seq - res.Hop = req.Hop - res.Port = p.myMsg.locator.coords[res.Hop] - sinfo := p.getSig(res.Hop) - //p.core.log.Println("DEBUG sig:", sinfo) - res.Next = sinfo.next - res.Sig = sinfo.sig - packet = res.encode() - p.sendLinkPacket(packet) -} - -func (p *peer) handleSwitchHop(packet []byte) { - //p.core.log.Println("DEBUG: handleSwitchHop") - if p.throttle > peer_Throttle { - return - } - if p.msgAnc == nil { - return - } - res := msgHop{} - if !res.decode(packet) { - return - } - if res.Root != p.msgAnc.Root { - return - } - if res.Tstamp != p.msgAnc.Tstamp { - return - } - if res.Seq != p.msgAnc.Seq { - return - } - if res.Hop != uint64(len(p.msgHops)) { - return - } // always process in order - loc := switchLocator{coords: make([]switchPort, 0, len(p.msgHops)+1)} - loc.root = res.Root - loc.tstamp = res.Tstamp - for _, hop := range p.msgHops { - loc.coords = append(loc.coords, hop.Port) - } - loc.coords = append(loc.coords, res.Port) - thisHopKey := &res.Root - if res.Hop != 0 { - thisHopKey = &p.msgHops[res.Hop-1].Next - } - bs := getBytesForSig(&res.Next, &loc) - if p.core.sigs.check(thisHopKey, &res.Sig, bs) { - p.msgHops = append(p.msgHops, &res) - p.processSwitchMessage() - } else { - p.throttle++ - } -} - -func (p *peer) processSwitchMessage() { - //p.core.log.Println("DEBUG: processSwitchMessage") - if p.throttle > peer_Throttle { - return - } - if p.msgAnc == nil { - return - } - if uint64(len(p.msgHops)) < p.msgAnc.Len { - p.requestHop(uint64(len(p.msgHops))) - return - } - p.throttle++ - if p.msgAnc.Len != uint64(len(p.msgHops)) { - return - } - msg := switchMessage{} - coords := make([]switchPort, 0, len(p.msgHops)) - sigs := make([]sigInfo, 0, len(p.msgHops)) - for idx, hop := range p.msgHops { - // Consistency checks, should be redundant (already checked these...) - if hop.Root != p.msgAnc.Root { - return - } - if hop.Tstamp != p.msgAnc.Tstamp { - return - } - if hop.Seq != p.msgAnc.Seq { - return - } - if hop.Hop != uint64(idx) { - return - } - coords = append(coords, hop.Port) - sigs = append(sigs, sigInfo{next: hop.Next, sig: hop.Sig}) - } - msg.from = p.sig - msg.locator.root = p.msgAnc.Root - msg.locator.tstamp = p.msgAnc.Tstamp - msg.locator.coords = coords - msg.seq = p.msgAnc.Seq - //msg.RSeq = p.msgAnc.RSeq - //msg.Degree = p.msgAnc.Deg - p.core.switchTable.handleMessage(&msg, p.port, sigs) - if len(coords) == 0 { - return - } - // Pass a mesage to the dht informing it that this peer (still) exists - l := msg.locator - l.coords = l.coords[:len(l.coords)-1] - dinfo := dhtInfo{ - key: p.box, - coords: l.getCoords(), - } - p.core.dht.peers <- &dinfo - p.core.log.Println("DEBUG: peers<-&dhtInfo", dinfo, p.box, msg) -} - func (p *peer) sendSwitchMsg() { info, sigs := p.core.switchTable.createMessage(p.port) var msg switchMsg @@ -549,34 +348,6 @@ func (p *peer) handleSwitchMsg(packet []byte) { p.core.dht.peers <- &dinfo } -func (p *peer) sendSwitchAnnounce() { - anc := msgAnnounce{} - anc.Root = p.myMsg.locator.root - anc.Tstamp = p.myMsg.locator.tstamp - anc.Seq = p.myMsg.seq - anc.Len = uint64(len(p.myMsg.locator.coords)) - //anc.Deg = p.myMsg.Degree - if p.msgAnc != nil { - anc.Rseq = p.msgAnc.Seq - } - packet := anc.encode() - p.sendLinkPacket(packet) -} - -func (p *peer) getSig(hop uint64) sigInfo { - //p.core.log.Println("DEBUG getSig:", len(p.mySigs), hop) - if hop < uint64(len(p.mySigs)) { - return p.mySigs[hop] - } - bs := getBytesForSig(&p.sig, &p.myMsg.locator) - sig := sigInfo{} - sig.next = p.sig - sig.sig = *sign(&p.core.sigPriv, bs) - p.mySigs = append(p.mySigs, sig) - //p.core.log.Println("DEBUG sig bs:", bs) - return sig -} - func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte { //bs, err := wire_encode_locator(loc) //if err != nil { panic(err) } diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index bd298de..9344d90 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -13,9 +13,6 @@ const ( wire_ProtocolTraffic // protocol traffic, pub keys for crypto wire_LinkProtocolTraffic // link proto traffic, pub keys for crypto wire_SwitchMsg // inside link protocol traffic header - wire_SwitchAnnounce // inside protocol traffic header - wire_SwitchHopRequest // inside protocol traffic header - wire_SwitchHop // inside protocol traffic header wire_SessionPing // inside protocol traffic header wire_SessionPong // inside protocol traffic header wire_DHTLookupRequest // inside protocol traffic header @@ -173,136 +170,8 @@ func (m *switchMsg) decode(bs []byte) bool { //////////////////////////////////////////////////////////////////////////////// -// Announces that we can send parts of a Message with a particular seq -type msgAnnounce struct { - Root sigPubKey - Tstamp int64 - Seq uint64 - Len uint64 - //Deg uint64 - Rseq uint64 -} - -func (m *msgAnnounce) encode() []byte { - bs := wire_encode_uint64(wire_SwitchAnnounce) - bs = append(bs, m.Root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...) - bs = append(bs, wire_encode_uint64(m.Seq)...) - bs = append(bs, wire_encode_uint64(m.Len)...) - bs = append(bs, wire_encode_uint64(m.Rseq)...) - return bs -} - -func (m *msgAnnounce) decode(bs []byte) bool { - var pType uint64 - var tstamp uint64 - switch { - case !wire_chop_uint64(&pType, &bs): - return false - case pType != wire_SwitchAnnounce: - return false - case !wire_chop_slice(m.Root[:], &bs): - return false - case !wire_chop_uint64(&tstamp, &bs): - return false - case !wire_chop_uint64(&m.Seq, &bs): - return false - case !wire_chop_uint64(&m.Len, &bs): - return false - case !wire_chop_uint64(&m.Rseq, &bs): - return false - } - m.Tstamp = wire_intFromUint(tstamp) - return true -} - -type msgHopReq struct { - Root sigPubKey - Tstamp int64 - Seq uint64 - Hop uint64 -} - -func (m *msgHopReq) encode() []byte { - bs := wire_encode_uint64(wire_SwitchHopRequest) - bs = append(bs, m.Root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...) - bs = append(bs, wire_encode_uint64(m.Seq)...) - bs = append(bs, wire_encode_uint64(m.Hop)...) - return bs -} - -func (m *msgHopReq) decode(bs []byte) bool { - var pType uint64 - var tstamp uint64 - switch { - case !wire_chop_uint64(&pType, &bs): - return false - case pType != wire_SwitchHopRequest: - return false - case !wire_chop_slice(m.Root[:], &bs): - return false - case !wire_chop_uint64(&tstamp, &bs): - return false - case !wire_chop_uint64(&m.Seq, &bs): - return false - case !wire_chop_uint64(&m.Hop, &bs): - return false - } - m.Tstamp = wire_intFromUint(tstamp) - return true -} - -type msgHop struct { - Root sigPubKey - Tstamp int64 - Seq uint64 - Hop uint64 - Port switchPort - Next sigPubKey - Sig sigBytes -} - -func (m *msgHop) encode() []byte { - bs := wire_encode_uint64(wire_SwitchHop) - bs = append(bs, m.Root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...) - bs = append(bs, wire_encode_uint64(m.Seq)...) - bs = append(bs, wire_encode_uint64(m.Hop)...) - bs = append(bs, wire_encode_uint64(uint64(m.Port))...) - bs = append(bs, m.Next[:]...) - bs = append(bs, m.Sig[:]...) - return bs -} - -func (m *msgHop) decode(bs []byte) bool { - var pType uint64 - var tstamp uint64 - switch { - case !wire_chop_uint64(&pType, &bs): - return false - case pType != wire_SwitchHop: - return false - case !wire_chop_slice(m.Root[:], &bs): - return false - case !wire_chop_uint64(&tstamp, &bs): - return false - case !wire_chop_uint64(&m.Seq, &bs): - return false - case !wire_chop_uint64(&m.Hop, &bs): - return false - case !wire_chop_uint64((*uint64)(&m.Port), &bs): - return false - case !wire_chop_slice(m.Next[:], &bs): - return false - case !wire_chop_slice(m.Sig[:], &bs): - return false - } - m.Tstamp = wire_intFromUint(tstamp) - return true -} - // Format used to check signatures only, so no need to also support decoding +// TODO something else for signatures func wire_encode_locator(loc *switchLocator) []byte { coords := wire_encode_coords(loc.getCoords()) var bs []byte