diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index 575bbdfd2..1b4fa66b2 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -16,6 +16,7 @@ import ( "github.com/BurntSushi/toml" "github.com/judwhite/go-svc/svc" "github.com/mreiferson/go-options" + "github.com/nsqio/nsq/contrib" "github.com/nsqio/nsq/internal/app" "github.com/nsqio/nsq/internal/version" "github.com/nsqio/nsq/nsqd" @@ -144,6 +145,9 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)") flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)") + optModulesOptions := app.StringArray{} + flagSet.Var(&optModulesOptions, "mod-opt", "optional module options, of form: --mod-opt={{moduleName}}={{moduleOpt}}={{moduleOptValue}}") + return flagSet } @@ -232,6 +236,10 @@ func (p *program) Start() error { } nsqd.Main() + // hook into addons + addons := contrib.NewEnabledNSQDAddons(opts.ModOpt, nsqd) + addons.Start() + p.nsqd = nsqd return nil } diff --git a/contrib/README.md b/contrib/README.md new file mode 100644 index 000000000..884a9a03f --- /dev/null +++ b/contrib/README.md @@ -0,0 +1,28 @@ +## Optional/Contrib Modules + +Contrib modules are a way to add functionality to nsqd, in a decoupled way. + + +The modules currently available are: + +- Datadog + + +### Architecture + +Contrib modules are initialized by passing in `--mod-opt=` to nsqd. This may +be provided multiple times. An array of `mod-opt`s is then passed to the +contrib module initializer (during nsqd initialization). Each module is then +passed its options to see if valid options were provided, after which it is +initialized and added to the nsqd waitGroup. + + +### Datadog + +The Datadog contrib module reports nsqd statistics to a Datadog daemon. 
// DataDogClient is a minimal dogstatsd client. Each metric is rendered to a
// single string and handed to the connection with one Write call.
type DataDogClient struct {
	conn   net.Conn
	addr   string
	prefix string
}

// DataDogTag is a single key/value tag attached to a metric.
type DataDogTag struct {
	k string
	v string
}

// DataDogTags is the ordered list of tags attached to a metric.
type DataDogTags struct {
	tags []*DataDogTag
}

// String renders the tags in the dogstatsd wire form "#tag1:value1,tag2:value2".
// A nil receiver or an empty tag list renders as "#".
func (ddt *DataDogTags) String() string {
	if ddt == nil {
		return "#"
	}
	ts := make([]string, 0, len(ddt.tags))
	for _, tag := range ddt.tags {
		ts = append(ts, tag.k+":"+tag.v)
	}
	return "#" + strings.Join(ts, ",")
}

// NewDataDogClient returns an unconnected client for addr. prefix is prepended
// verbatim to every stat name. Call CreateSocket before sending.
func NewDataDogClient(addr string, prefix string) *DataDogClient {
	return &DataDogClient{
		addr:   addr,
		prefix: prefix,
	}
}

// String returns the address the client was configured with.
func (c *DataDogClient) String() string {
	return c.addr
}

// CreateSocket dials the configured address over UDP with a one second
// timeout and stores the resulting connection on the client.
func (c *DataDogClient) CreateSocket() error {
	conn, err := net.DialTimeout("udp", c.addr, time.Second)
	if err != nil {
		return err
	}
	c.conn = conn
	return nil
}

// Close closes the underlying connection. It is a no-op returning nil when
// CreateSocket was never called (or failed), instead of panicking.
func (c *DataDogClient) Close() error {
	if c.conn == nil {
		return nil
	}
	return c.conn.Close()
}

// Incr sends a counter increment of count for stat.
func (c *DataDogClient) Incr(stat string, count int64, tags *DataDogTags) error {
	return c.send(stat, "%d|c", count, tags)
}

// Decr sends a counter decrement of count for stat.
func (c *DataDogClient) Decr(stat string, count int64, tags *DataDogTags) error {
	return c.send(stat, "%d|c", -count, tags)
}

// Timing sends a timing sample of delta (tagged "ms") for stat.
func (c *DataDogClient) Timing(stat string, delta int64, tags *DataDogTags) error {
	return c.send(stat, "%d|ms", delta, tags)
}

// Gauge sends a gauge reading of value for stat.
func (c *DataDogClient) Gauge(stat string, value int64, tags *DataDogTags) error {
	return c.send(stat, "%d|g", value, tags)
}

// send writes "<prefix><stat>:<value rendered with format>|<tags>" to the
// connection. It returns an error when the client is not connected or the
// write fails.
//
// Only the value is run through fmt. The prefix, stat name and tags are
// concatenated as plain text, because topic and channel names are user
// controlled and a '%' in any of them must reach the wire unchanged rather
// than being interpreted as a formatting verb.
func (c *DataDogClient) send(stat string, format string, value int64, tags *DataDogTags) error {
	if c.conn == nil {
		return errors.New("not connected")
	}
	payload := c.prefix + stat + ":" + fmt.Sprintf(format, value) + "|" + tags.String()
	_, err := c.conn.Write([]byte(payload))
	return err
}
&opts.DogStatsdAddress, + "dogstatsd-address", + "", + "UDP : of a statsd daemon for pushing stats", + ) + flagSet.DurationVar( + &opts.DogStatsdInterval, + "dogstatsd-interval", + 10*time.Second, + "duration between pushing to dogstatsd", + ) + // flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd") + flagSet.StringVar( + &opts.DogStatsdPrefix, + "dogstatsd-prefix", + "nsq.", + "prefix used for keys sent to statsd (%s for host replacement)", + ) + return flagSet +} + +func NewNSQDDogStatsd(contribOpts []string, n INSQD) INSQDAddon { + n.Logf(nsqd.LOG_INFO, "Received options: %+v", contribOpts) + + dogStatsdOpts := &NSQDDogStatsdOptions{} + flagSet := NewNSQDDogStatsdContribFlags(dogStatsdOpts) + + flagSet.Parse(contribOpts) + n.Logf(nsqd.LOG_INFO, "Parsed Options: %+v", dogStatsdOpts) + + // pass the dogstats specific opts on + return &NSQDDogStatsd{ + opts: dogStatsdOpts, + nsqd: n, + } +} + +type NSQDDogStatsd struct { + nsqd INSQD + opts *NSQDDogStatsdOptions + singleLoop bool +} + +func (dd *NSQDDogStatsd) Enabled() bool { + dd.nsqd.Logf(nsqd.LOG_INFO, "%+v", dd.opts) + if dd.opts.DogStatsdAddress != "" { + return true + } else { + return false + } +} + +func (dd *NSQDDogStatsd) Start() { + dd.nsqd.Logf(nsqd.LOG_INFO, "Starting Datadog NSQD Monitor") + dd.nsqd.AddModuleGoroutine(dd.Loop) +} + +func (dd *NSQDDogStatsd) Loop(exitChan chan int) { + // var lastMemStats *nsqd.memStats + var lastStats []nsqd.TopicStats + var stat string + + ticker := time.NewTicker(dd.opts.DogStatsdInterval) + + dd.nsqd.Logf(nsqd.LOG_DEBUG, "Loop started") + + for { + select { + case <-exitChan: + goto exit + case <-ticker.C: + dd.nsqd.Logf(nsqd.LOG_DEBUG, "LOOPING") + + client := NewDataDogClient( + dd.opts.DogStatsdAddress, + dd.opts.DogStatsdPrefix, + ) + err := client.CreateSocket() + if err != nil { + dd.nsqd.Logf(nsqd.LOG_ERROR, "failed to create UDP socket to dogstatsd(%s)", client) + continue + } + + 
dd.nsqd.Logf(nsqd.LOG_INFO, "DOGSTATSD: pushing stats to %s", client) + + stats := dd.nsqd.GetStats() + for _, topic := range stats { + // try to find the topic in the last collection + lastTopic := nsqd.TopicStats{} + for _, checkTopic := range lastStats { + if topic.TopicName == checkTopic.TopicName { + lastTopic = checkTopic + break + } + } + diff := topic.MessageCount - lastTopic.MessageCount + + // can topics/channels have commas in their names? + client.Incr("message_count", int64(diff), &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + }, + }) + + client.Gauge("topic.depth", topic.Depth, &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + }, + }) + + client.Gauge("topic.backend_depth", topic.BackendDepth, &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + }, + }) + + for _, item := range topic.E2eProcessingLatency.Percentiles { + stat = fmt.Sprintf("topic.e2e_processing_latency_%.0f", item["quantile"]*100.0) + // We can cast the value to int64 since a value of 1 is the + // minimum resolution we will have, so there is no loss of + // accuracy + client.Gauge(stat, int64(item["value"]), &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + }, + }) + } + + for _, channel := range topic.Channels { + // try to find the channel in the last collection + lastChannel := nsqd.ChannelStats{} + for _, checkChannel := range lastTopic.Channels { + if channel.ChannelName == checkChannel.ChannelName { + lastChannel = checkChannel + break + } + } + diff := channel.MessageCount - lastChannel.MessageCount + client.Incr("channel.message_count", int64(diff), &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, + }, + }) + + client.Gauge("channel.depth", channel.Depth, &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, + }, + 
}) + + client.Gauge("channel.backend_depth", channel.BackendDepth, &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, + }, + }) + + // stat = fmt.Sprintf("topic.%s.channel.%s.in_flight_count", topic.TopicName, channel.ChannelName) + client.Gauge("channel.in_flight_count", int64(channel.InFlightCount), &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, + }, + }) + + // stat = fmt.Sprintf("topic.%s.channel.%s.deferred_count", topic.TopicName, channel.ChannelName) + client.Gauge("channel.deferred_count", int64(channel.DeferredCount), &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, + }, + }) + + diff = channel.RequeueCount - lastChannel.RequeueCount + // stat = fmt.Sprintf("topic.%s.channel.%s.requeue_count", topic.TopicName, channel.ChannelName) + client.Incr("channel.requeue_count", int64(diff), &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, + }, + }) + + diff = channel.TimeoutCount - lastChannel.TimeoutCount + // stat = fmt.Sprintf("topic.%s.channel.%s.timeout_count", topic.TopicName, channel.ChannelName) + client.Incr("channel.timeout_count", int64(diff), &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, + }, + }) + + // stat = fmt.Sprintf("topic.%s.channel.%s.clients", topic.TopicName, channel.ChannelName) + client.Gauge("channel.clients", int64(len(channel.Clients)), &DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, + }, + }) + + for _, item := range channel.E2eProcessingLatency.Percentiles { + stat = fmt.Sprintf("channel.e2e_processing_latency_%.0f", item["quantile"]*100.0) + client.Gauge(stat, int64(item["value"]), 
&DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, + }, + }) + } + } + } + lastStats = stats + client.Close() + + if dd.singleLoop { + goto exit + } + } + } + +exit: + ticker.Stop() +} diff --git a/contrib/dogstatsd_test.go b/contrib/dogstatsd_test.go new file mode 100644 index 000000000..5d3b1832c --- /dev/null +++ b/contrib/dogstatsd_test.go @@ -0,0 +1,128 @@ +package contrib + +import ( + "bytes" + "fmt" + "github.com/nsqio/nsq/internal/lg" + "github.com/nsqio/nsq/internal/quantile" + "github.com/nsqio/nsq/internal/test" + "github.com/nsqio/nsq/nsqd" + "net" + "testing" + "time" +) + +type StubNSQD struct{} + +func (n *StubNSQD) Logf(level lg.LogLevel, f string, args ...interface{}) { + fmt.Printf(f, args) +} +func (n *StubNSQD) GetStats() []nsqd.TopicStats { + return []nsqd.TopicStats{ + { + TopicName: "test", + E2eProcessingLatency: &quantile.Result{}, + Channels: []nsqd.ChannelStats{ + { + ChannelName: "test_channel", + E2eProcessingLatency: &quantile.Result{}, + }, + }, + }, + } +} +func (n *StubNSQD) AddModuleGoroutine(addonFn func(exitChan chan int)) {} + +func TestEnabledTrueWhenAddressPresent(t *testing.T) { + dd := &NSQDDogStatsd{ + opts: &NSQDDogStatsdOptions{ + DogStatsdAddress: "test.com.org", + }, + nsqd: &StubNSQD{}, + } + test.Equal(t, dd.Enabled(), true) + +} + +func TestEnabledFalseWhenAddressAbsent(t *testing.T) { + dd := &NSQDDogStatsd{ + opts: &NSQDDogStatsdOptions{}, + nsqd: &StubNSQD{}, + } + test.Equal(t, dd.Enabled(), false) +} + +func TestFlagsParsedSuccess(t *testing.T) { + opts := []string{"-dogstatsd-address", "127.0.0.1:8125"} + addon := NewNSQDDogStatsd(opts, &StubNSQD{}) + test.Equal(t, addon.(*NSQDDogStatsd).opts.DogStatsdAddress, "127.0.0.1:8125") +} + +// Tests that no opts are parsed when the - prefix is missing from the module +// opts. 
The - is required because the optional module opts list is passed directly +// back to flags.Parse() +func TestFlagsMissingDashPrefix(t *testing.T) { + opts := []string{"dogstatsd-address", "127.0.0.1:8125"} + addon := NewNSQDDogStatsd(opts, &StubNSQD{}) + test.Equal(t, addon.(*NSQDDogStatsd).opts.DogStatsdAddress, "") +} + +func TestLoopSendsCorrectMessages(t *testing.T) { + // setup the UDP Server + conn, err := net.ListenUDP("udp", &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 0, + }) + if err != nil { + panic(err) + } + defer conn.Close() + + // Start the DD loop to send status updates to the test server addr + go func(addr net.Addr) { + exitChan := make(chan int) + dd := &NSQDDogStatsd{ + opts: &NSQDDogStatsdOptions{ + DogStatsdAddress: addr.String(), + DogStatsdInterval: time.Second, + }, + nsqd: &StubNSQD{}, + singleLoop: true, + } + + dd.Loop(exitChan) + + }(conn.LocalAddr()) + + conn.SetReadDeadline(time.Now().Add(3 * time.Second)) + + // read from server conn and assert all data was sent from datadog + // as expected + // Makes X Specific reads + cases := []struct { + Name string + A string + }{ + {Name: "message_count", A: "message_count:0|c|#topic_name:test"}, + {Name: "topic_depth", A: "topic.depth:0|g|#topic_name:test"}, + {Name: "backend_depth", A: "topic.backend_depth:0|g|#topic_name:test"}, + {Name: "channel.message_count", A: "channel.message_count:0|c|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.depth", A: "channel.depth:0|g|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.backend_depth", A: "channel.backend_depth:0|g|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.in_flight_count", A: "channel.in_flight_count:0|g|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.deferred_count", A: "channel.deferred_count:0|g|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.requeue_count", A: "channel.requeue_count:0|c|#topic_name:test,channel_name:test_channel"}, + {Name: 
"channel.timeout_count", A: "channel.timeout_count:0|c|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.clients", A: "channel.clients:0|g|#topic_name:test,channel_name:test_channel"}, + } + for _, tc := range cases { + buffer := make([]byte, 128) + + _, _, err := conn.ReadFromUDP(buffer) + if err != nil { + panic(err) + } + fmt.Println(string(buffer)) + test.Equal(t, string(bytes.Trim(buffer, "\x00")), tc.A) + } +} diff --git a/contrib/nsqd.go b/contrib/nsqd.go new file mode 100644 index 000000000..b69c0760a --- /dev/null +++ b/contrib/nsqd.go @@ -0,0 +1,68 @@ +package contrib + +import ( + "github.com/nsqio/nsq/internal/lg" + "github.com/nsqio/nsq/nsqd" + "strings" +) + +type INSQD interface { + Logf(level lg.LogLevel, f string, args ...interface{}) + GetStats() []nsqd.TopicStats + AddModuleGoroutine(addonFn func(exitChan chan int)) +} + +type initializer func([]string, INSQD) INSQDAddon + +type INSQDAddon interface { + Enabled() bool + Start() +} + +type NSQDAddons struct { + addons []INSQDAddon +} + +// Starts all addons that are active +func (as *NSQDAddons) Start() { + + for _, addon := range as.addons { + addon.Start() + } +} + +func optHasPrefix(opt string, prefix string) bool { + return strings.Index(opt, prefix) == 0 +} + +// Initializes addons that have options set +func NewEnabledNSQDAddons(contribOpts []string, n INSQD) *NSQDAddons { + var activeAddons []INSQDAddon + + initializers := map[string]initializer{ + "-dogstatsd-": NewNSQDDogStatsd, + } + + n.Logf(nsqd.LOG_INFO, "Addons Initializing") + + for prefix, initializer := range initializers { + validOpts := []string{} + // check if any of the options contains this addon's expected argument + // keeps track of all options starting with the correct prefix + // and initializes with the valid options + for _, opt := range contribOpts { + if optHasPrefix(opt, prefix) { + validOpts = append(validOpts, opt) + } + } + + addon := initializer(contribOpts, n) + if addon.Enabled() { + activeAddons 
= append(activeAddons, addon) + } + } + + return &NSQDAddons{ + addons: activeAddons, + } +} diff --git a/contrib/nsqd_test.go b/contrib/nsqd_test.go new file mode 100644 index 000000000..8c4ea61bf --- /dev/null +++ b/contrib/nsqd_test.go @@ -0,0 +1,37 @@ +package contrib + +import ( + "github.com/nsqio/nsq/internal/test" + "testing" +) + +type TestAddon struct { + numStartCalls int +} + +func (ta *TestAddon) Start() { + ta.numStartCalls += 1 +} + +func (ta *TestAddon) Enabled() bool { + return true +} + +func TestStartMultipleAddons(t *testing.T) { + ta1 := &TestAddon{} + ta2 := &TestAddon{} + + as := &NSQDAddons{ + addons: []INSQDAddon{ta1, ta2}, + } + as.Start() + + test.Equal(t, ta1.numStartCalls, 1) + test.Equal(t, ta2.numStartCalls, 1) +} + +func TestNewEnabledNSQDAddonsNoAddons(t *testing.T) { + var opts []string + addons := NewEnabledNSQDAddons(opts, &StubNSQD{}) + test.Equal(t, addons.addons, []INSQDAddon(nil)) +} diff --git a/nsqd/logger.go b/nsqd/logger.go index 51ac28db7..a2b161695 100644 --- a/nsqd/logger.go +++ b/nsqd/logger.go @@ -18,3 +18,9 @@ func (n *NSQD) logf(level lg.LogLevel, f string, args ...interface{}) { opts := n.getOpts() lg.Logf(opts.Logger, opts.logLevel, level, f, args...) } + +// would like to expose logf to the contrib modules so that contrib can share the +// configuration end user specifies on CLI +func (n *NSQD) Logf(level lg.LogLevel, f string, args ...interface{}) { + n.logf(level, f, args...) 
+} diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 1649d5d25..5ebbd9e29 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -265,6 +265,10 @@ func (n *NSQD) Main() { } } +func (n *NSQD) AddModuleGoroutine(addonFn func(exitChan chan int)) { + n.waitGroup.Wrap(func() { addonFn(n.exitChan) }) +} + type meta struct { Topics []struct { Name string `json:"name"` diff --git a/nsqd/options.go b/nsqd/options.go index ee461edb9..9157d810e 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -79,6 +79,9 @@ type Options struct { DeflateEnabled bool `flag:"deflate"` MaxDeflateLevel int `flag:"max-deflate-level"` SnappyEnabled bool `flag:"snappy"` + + // Contrib/Optional Modules + ModOpt []string `flag:"mod-opt"` } func NewOptions() *Options { @@ -141,5 +144,7 @@ func NewOptions() *Options { SnappyEnabled: true, TLSMinVersion: tls.VersionTLS10, + + ModOpt: make([]string, 0), } }