Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 71 additions & 3 deletions internal/bloom_reachability_map.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
package internal

import (
"encoding/base64"
"hash/fnv"

"github.com/AutoRoute/bloom"
"github.com/AutoRoute/node/types"
)

type BloomReachabilityMap struct {
Filters []*bloom.BloomFilter
Conglomerate *bloom.BloomFilter
Filters []*bloom.BloomFilter
// Allows us to keep track of filters between nodes.
filter_hashes map[string]bool
Conglomerate *bloom.BloomFilter
}

// Generates a unique hash for a particular filter.
// Args:
// filter: The filter to hash.
// Returns:
// The FNV hash of the filter.
func hashFilter(filter *bloom.BloomFilter) string {
hasher := fnv.New64()
filter.WriteTo(hasher)
return base64.URLEncoding.EncodeToString(hasher.Sum(nil))
}

func NewBloomReachabilityMap() *BloomReachabilityMap {
Expand All @@ -16,8 +32,13 @@ func NewBloomReachabilityMap() *BloomReachabilityMap {

m := BloomReachabilityMap{
Filters: fs,
filter_hashes: make(map[string]bool),
Conglomerate: fs[0].Copy(),
}

// Hash our initial filter to begin with.
initial_hash := hashFilter(fs[0])
m.filter_hashes[initial_hash] = true
return &m
}

Expand All @@ -39,22 +60,69 @@ func (m *BloomReachabilityMap) Increment() {
m.Filters = append(newZeroth, m.Filters...)
}

func (m *BloomReachabilityMap) Merge(n *BloomReachabilityMap) {
// Merges two reachability maps.
// Args:
// n: The map to merge with this one.
// Returns:
// True if the map was modified, false if it wasn't. Practically, it will only
// return false if it is being asked to merge a map whose filters are a subset
// of this one's filters.
func (m *BloomReachabilityMap) Merge(n *BloomReachabilityMap) bool {
modified := false

if len(m.Filters) < len(n.Filters) {
modified = true
for k, v := range m.Filters {
_, found := m.filter_hashes[hashFilter(n.Filters[k])]
if (found) {
// This filter is not new.
continue
}

old_hash := hashFilter(v)
v.Merge(n.Filters[k])
new_hash := hashFilter(v)
if old_hash != new_hash {
delete(m.filter_hashes, old_hash)
m.filter_hashes[new_hash] = true
}
}
// append the remaining Filters
for _, filter := range n.Filters[len(m.Filters):] {
m.filter_hashes[hashFilter(filter)] = true
}
m.Filters = append(m.Filters, n.Filters[len(m.Filters):]...)
} else {
for k, v := range n.Filters {
// Check for an identical filter.
_, found := m.filter_hashes[hashFilter(v)]
if (found) {
// This filter is not new.
continue
}

old_hash := hashFilter(m.Filters[k])
m.Filters[k].Merge(v)
new_hash := hashFilter(m.Filters[k])
if old_hash != new_hash {
delete(m.filter_hashes, old_hash)
m.filter_hashes[new_hash] = true
modified = true
}
}
}

if !modified {
// We didn't add any new filters.
return false
}

// reconstruct the Conglomerate
for _, v := range m.Filters {
m.Conglomerate.Merge(v)
}

return true
}

func (m *BloomReachabilityMap) Copy() *BloomReachabilityMap {
Expand Down
38 changes: 31 additions & 7 deletions internal/reachability.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@ package internal

import (
"errors"
"expvar"
"log"
"sync"

"github.com/AutoRoute/node/types"
)

// Export the last number of possible nexthops, and the destination we were
// trying to reach.
var next_hops *expvar.Int
var destination *expvar.String

func init() {
next_hops = expvar.NewInt("next_hops")
destination = expvar.NewString("destination")
}

// Takes care of maintaining and relaying maps and insures that we know which
// interfaces can reach which addresses.
type reachabilityHandler struct {
Expand Down Expand Up @@ -37,8 +48,16 @@ func newReachability(me types.NodeAddress) *reachabilityHandler {
func (m *reachabilityHandler) addMap(address types.NodeAddress, new_map *BloomReachabilityMap) {
m.l.Lock()
defer m.l.Unlock()
m.maps[address].Merge(new_map)

if !m.maps[address].Merge(new_map) {
// If this returns false, then we know we have already seen this map and
// passed it along.
log.Print("Dropping duplicate map.")
return
}

m.merged_map.Merge(new_map)

for addr, conn := range m.conns {
if addr != address {
conn.SendMap(new_map.Copy())
Expand All @@ -61,7 +80,7 @@ func (m *reachabilityHandler) AddConnection(id types.NodeAddress, c MapConnectio
defer m.l.Unlock()
err := c.SendMap(initial_map)
if err != nil {
log.Fatal(err)
log.Fatalf("Sending map failed: %s\n", err)
}
}()

Expand Down Expand Up @@ -92,14 +111,17 @@ func (m *reachabilityHandler) HandleConnection(id types.NodeAddress, c MapConnec
// All the nodes that we could possibly send the packet to.
func (m *reachabilityHandler) FindPossibleDests(id types.NodeAddress,
src types.NodeAddress) ([]types.NodeAddress, error) {
human_address, err := id.MarshalText()
if err != nil {
log.Printf("Warning: Converting to human-readable address failed: %s\n", err)
}
destination.Set(string(human_address))

m.l.Lock()
defer m.l.Unlock()
_, ok := m.conns[id]
if ok {
return []types.NodeAddress{id}, nil
}

if id == m.me {
if ok || id == m.me {
next_hops.Set(1)
return []types.NodeAddress{id}, nil
}

Expand All @@ -116,8 +138,10 @@ func (m *reachabilityHandler) FindPossibleDests(id types.NodeAddress,
}

if len(dests) == 0 {
next_hops.Set(0)
return nil, errors.New("Unable to find host")
}
next_hops.Set(int64(len(dests)))
return dests, nil
}

Expand Down