Added notification mechanism.

This commit is contained in:
Patrick Fairbank
2014-07-09 00:50:07 -07:00
parent 257fa715bc
commit fa69373ad1
2 changed files with 141 additions and 0 deletions

57
notifier.go Normal file
View File

@@ -0,0 +1,57 @@
// Copyright 2014 Team 254. All Rights Reserved.
// Author: pat@patfairbank.com (Patrick Fairbank)
//
// Publish-subscribe model for nonblocking notification of server events to websocket clients.
package main
import (
"log"
)
// Allow the listeners to buffer a small number of notifications to streamline delivery.
const notifyBufferSize = 3
type Notifier struct {
// The map is essentially a set; the value is ignored.
listeners map[chan interface{}]struct{}
}
func NewNotifier() *Notifier {
notifier := new(Notifier)
notifier.listeners = make(map[chan interface{}]struct{})
return notifier
}
// Registers and returns a channel that can be read from to receive notification messages. The caller is
// responsible for closing the channel, which will cause it to be reaped from the list of listeners.
func (notifier *Notifier) Listen() chan interface{} {
listener := make(chan interface{}, notifyBufferSize)
notifier.listeners[listener] = struct{}{}
return listener
}
// Sends the given message to all registered listeners, and cleans up any listeners that have closed.
func (notifier *Notifier) Notify(message interface{}) {
for listener, _ := range notifier.listeners {
notifier.notifyListener(listener, message)
}
}
func (notifier *Notifier) notifyListener(listener chan interface{}, message interface{}) {
defer func() {
// If channel is closed sending to it will cause a panic; recover and remove it from the list.
if r := recover(); r != nil {
delete(notifier.listeners, listener)
}
}()
// Do a non-blocking send. This guarantees that sending notifications won't interrupt the main event loop,
// at the risk of clients missing some messages.
select {
case listener <- message:
// The notification was sent and received successfully.
default:
log.Println("Failed to send a notification due to blocked listener.")
}
}

84
notifier_test.go Normal file
View File

@@ -0,0 +1,84 @@
// Copyright 2014 Team 254. All Rights Reserved.
// Author: pat@patfairbank.com (Patrick Fairbank)
package main
import (
"github.com/stretchr/testify/assert"
"io/ioutil"
"log"
"testing"
)
func TestNotifier(t *testing.T) {
notifier := NewNotifier()
// Should do nothing when there are no listeners.
notifier.Notify("test message")
notifier.Notify(12345)
notifier.Notify(struct{}{})
listener := notifier.Listen()
notifier.Notify("test message")
assert.Equal(t, "test message", <-listener)
notifier.Notify(12345)
assert.Equal(t, 12345, <-listener)
// Should allow multiple messages without blocking.
notifier.Notify("message1")
notifier.Notify("message2")
notifier.Notify("message3")
assert.Equal(t, "message1", <-listener)
assert.Equal(t, "message2", <-listener)
assert.Equal(t, "message3", <-listener)
// Should stop sending messages and not block once the buffer is full.
log.SetOutput(ioutil.Discard) // Silence noisy log output.
for i := 0; i < 20; i++ {
notifier.Notify(i)
}
var value interface{}
var lastValue interface{}
for lastValue == nil {
select {
case value = <-listener:
default:
lastValue = value
return
}
}
notifier.Notify("next message")
assert.True(t, lastValue.(int) < 10)
assert.Equal(t, "next message", <-listener)
}
func TestNotifyMultipleListeners(t *testing.T) {
notifier := NewNotifier()
listeners := [50]chan interface{}{}
for i := 0; i < len(listeners); i++ {
listeners[i] = notifier.Listen()
}
notifier.Notify("test message")
notifier.Notify(12345)
for listener, _ := range notifier.listeners {
assert.Equal(t, "test message", <-listener)
assert.Equal(t, 12345, <-listener)
}
// Should reap closed channels automatically.
close(listeners[4])
notifier.Notify("message1")
assert.Equal(t, 49, len(notifier.listeners))
for listener, _ := range notifier.listeners {
assert.Equal(t, "message1", <-listener)
}
close(listeners[16])
close(listeners[21])
close(listeners[49])
notifier.Notify("message2")
assert.Equal(t, 46, len(notifier.listeners))
for listener, _ := range notifier.listeners {
assert.Equal(t, "message2", <-listener)
}
}