nsqd

nsqd is the daemon that receives, queues, and delivers messages to clients.

It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).

It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.

Command Line Options

-auth-http-address value
    <addr>:<port> to query auth server (may be given multiple times)
-broadcast-address string
    address that will be registered with lookupd (defaults to the OS hostname) (default "yourhost.local")
-broadcast-http-port int
    HTTP port that will be registered with lookupd (defaults to the HTTP port that this nsqd is listening to)
-broadcast-tcp-port int
    TCP port that will be registered with lookupd (defaults to the TCP port that this nsqd is listening to)
-config string
    path to config file
-data-path string
    path to store disk-backed messages
-deflate
    enable deflate feature negotiation (client compression) (default true)
-e2e-processing-latency-percentile value
    message processing time percentiles (as float (0, 1.0]) to track (can be specified multiple times or comma separated '1.0,0.99,0.95', default none)
-e2e-processing-latency-window-time duration
    calculate end to end latency quantiles for this duration of time (i.e., 60s would only show quantile calculations from the past 60 seconds) (default 10m0s)
-http-address string
    <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4151")
-http-client-connect-timeout duration
    timeout for HTTP connect (default 2s)
-http-client-request-timeout duration
    timeout for HTTP request (default 5s)
-https-address string
    <addr>:<port> to listen on for HTTPS clients (default "0.0.0.0:4152")
-log-level value
    set log verbosity: debug, info, warn, error, or fatal (default INFO)
-log-prefix string
    log message prefix (default "[nsqd] ")
-lookupd-tcp-address value
    lookupd TCP address (may be given multiple times)
-max-body-size int
    maximum size of a single command body (default 5242880)
-max-bytes-per-file int
    number of bytes per diskqueue file before rolling (default 104857600)
-max-channel-consumers int
    maximum channel consumer connection count per nsqd instance (default 0, i.e., unlimited)
-max-deflate-level int
    max deflate compression level a client can negotiate (higher values mean higher nsqd CPU usage) (default 6)
-max-heartbeat-interval duration
    maximum client configurable duration of time between client heartbeats (default 1m0s)
-max-msg-size int
    maximum size of a single message in bytes (default 1048576)
-max-msg-timeout duration
    maximum duration before a message will timeout (default 15m0s)
-max-output-buffer-size int
    maximum client configurable size (in bytes) for a client output buffer (default 65536)
-max-output-buffer-timeout duration
    maximum client configurable duration of time between flushing to a client (default 30s)
-max-rdy-count int
    maximum RDY count for a client (default 2500)
-max-req-timeout duration
    maximum requeuing timeout for a message (default 1h0m0s)
-mem-queue-size int
    number of messages to keep in memory (per topic/channel) (default 10000)
-min-output-buffer-timeout duration
    minimum client configurable duration of time between flushing to a client (default 25ms)
-msg-timeout duration
    default duration to wait before auto-requeuing a message (default 1m0s)
-node-id int
    unique part for message IDs, (int) in range [0,1024) (default is hash of hostname) (default 248)
-output-buffer-timeout duration
    default duration of time between flushing data to clients (default 250ms)
-snappy
    enable snappy feature negotiation (client compression) (default true)
-statsd-address string
    UDP <addr>:<port> of a statsd daemon for pushing stats
-statsd-interval duration
    duration between pushing to statsd (default 1m0s)
-statsd-mem-stats
    toggle sending memory and GC stats to statsd (default true)
-statsd-prefix string
    prefix used for keys sent to statsd (%s for host replacement) (default "nsq.%s")
-statsd-udp-packet-size int
    the size in bytes of statsd UDP packets (default 508)
-sync-every int
    number of messages per diskqueue fsync (default 2500)
-sync-timeout duration
    duration of time per diskqueue fsync (default 2s)
-tcp-address string
    <addr>:<port> to listen on for TCP clients (default "0.0.0.0:4150")
-tls-cert string
    path to certificate file
-tls-client-auth-policy string
    client certificate auth policy ('require' or 'require-verify')
-tls-key string
    path to key file
-tls-min-version value
    minimum SSL/TLS version acceptable ('ssl3.0', 'tls1.0', 'tls1.1', or 'tls1.2') (default 769, i.e. 'tls1.0')
-tls-required
    require TLS for client connections (true, false, tcp-https)
-tls-root-ca-file string
    path to certificate authority file
-verbose
    [deprecated] has no effect, use --log-level
-version
    print version string
-worker-id
    [deprecated] use --node-id
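
As a rough illustration of how these options combine, the sketch below builds an nsqd argument list from Python. It uses only flags from the list above; the helper name nsqd_argv and any paths or addresses passed to it are hypothetical, and it assumes nsqd is on the PATH if you actually launch it.

```python
def nsqd_argv(lookupd_addresses, data_path, mem_queue_size=10000):
    """Build an nsqd argument list (hypothetical helper, not part of NSQ)."""
    argv = [
        "nsqd",
        "-data-path", data_path,
        "-mem-queue-size", str(mem_queue_size),
    ]
    for addr in lookupd_addresses:
        # -lookupd-tcp-address may be given multiple times
        argv += ["-lookupd-tcp-address", addr]
    return argv
```

The resulting list can be handed to subprocess.Popen; the same settings could instead live in the file named by -config.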

HTTP API

v1 namespace (as of nsqd v0.2.29+):

  • /topic/create - create a new topic
  • /topic/delete - delete a topic
  • /topic/empty - empty a topic
  • /topic/pause - pause message flow for a topic
  • /topic/unpause - unpause message flow for a topic
  • /channel/create - create a new channel
  • /channel/delete - delete a channel
  • /channel/empty - empty a channel
  • /channel/pause - pause message flow for a channel
  • /channel/unpause - unpause message flow for a channel

  • POST /pub

    Publish a message

    Query Params:

    topic - the topic to publish to
    defer - the time in ms to delay message delivery (optional)
    

    Body:

    raw message bytes
    

    Example:

    $ curl -d "<message>" http://127.0.0.1:4151/pub?topic=name
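
The same request can be prepared from a program. This is a minimal sketch using only the Python standard library; build_pub_request is a hypothetical helper name, and the base URL is whatever -http-address your nsqd listens on.

```python
import urllib.parse
import urllib.request


def build_pub_request(base_url, topic, message, defer_ms=None):
    """Prepare a POST /pub request; message is the raw body as bytes."""
    params = {"topic": topic}
    if defer_ms is not None:
        # optional delivery delay in milliseconds
        params["defer"] = str(defer_ms)
    url = base_url + "/pub?" + urllib.parse.urlencode(params)
    return urllib.request.Request(url, data=message, method="POST")
```

Passing the returned object to urllib.request.urlopen sends it; nothing is transmitted until then.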
    
  • POST /mpub

    Publish multiple messages in one roundtrip

    Query Params:

    topic - the topic to publish to
    binary - bool ('true' or 'false') to enable binary mode
    

    Body:

    \n separated raw message bytes
    

    NOTE: by default /mpub expects messages to be delimited by \n, use the ?binary=true query parameter to enable binary mode where the POST body is expected to be in the following format (the HTTP Content-Length header should be sent as the total size of the POST body):

    [ 4-byte num messages ]
    [ 4-byte message #1 size ][ N-byte binary data ]
          ... (repeated <num_messages> times)
    

    Example:

    $ curl -d $'<message>\n<message>\n<message>' 'http://127.0.0.1:4151/mpub?topic=name'
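
The binary layout described in the NOTE above can be sketched as follows. The byte order of the 4-byte fields is not stated here; this sketch assumes unsigned big-endian (network order) integers, so verify that against your nsqd version. encode_mpub_body is a hypothetical helper name.

```python
import struct


def encode_mpub_body(messages):
    """Encode a list of bytes objects as a binary-mode /mpub body.

    Assumption: sizes and counts are 4-byte unsigned big-endian integers.
    """
    parts = [struct.pack(">I", len(messages))]    # [ 4-byte num messages ]
    for msg in messages:
        parts.append(struct.pack(">I", len(msg))) # [ 4-byte message size ]
        parts.append(msg)                         # [ N-byte binary data ]
    return b"".join(parts)
```

POST the result to /mpub?topic=name&binary=true. Unlike the default mode, messages encoded this way may themselves contain \n bytes.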
    
  • POST /topic/create

    Create a topic

    Query Params:

    topic - the topic to create
    

    Example:

    $ curl -X POST http://127.0.0.1:4151/topic/create?topic=name
    
  • POST /topic/delete

    Delete an existing topic (and all channels)

    Query Params:

    topic - the existing topic to delete
    

    Example:

    $ curl -X POST http://127.0.0.1:4151/topic/delete?topic=name
    
  • POST /channel/create

    Create a channel for an existing topic

    Query Params:

    topic - the existing topic
    channel - the channel to create
    

    Example:

    $ curl -X POST 'http://127.0.0.1:4151/channel/create?topic=name&channel=name'
    
  • POST /channel/delete

    Delete an existing channel on an existing topic

    Query Params:

    topic - the existing topic
    channel - the existing channel to delete
    

    Example:

    $ curl -X POST 'http://127.0.0.1:4151/channel/delete?topic=name&channel=name'
    
  • POST /topic/empty

    Empty all the queued messages (in-memory and disk) for an existing topic

    Query Params:

    topic - the existing topic to empty
    

    Example:

    $ curl -X POST http://127.0.0.1:4151/topic/empty?topic=name
    
  • POST /channel/empty

    Empty all the queued messages (in-memory and disk) for an existing channel

    Query Params:

    topic - the existing topic
    channel - the existing channel to empty
    

    Example:

    $ curl -X POST 'http://127.0.0.1:4151/channel/empty?topic=name&channel=name'
    
  • POST /topic/pause

    Pause message flow to all channels on an existing topic (messages will queue at topic)

    Query Params:

    topic - the existing topic
    

    Example:

    $ curl -X POST http://127.0.0.1:4151/topic/pause?topic=name
    
  • POST /topic/unpause

    Resume message flow to channels of an existing, paused, topic

    Query Params:

    topic - the existing topic
    

    Example:

    $ curl -X POST http://127.0.0.1:4151/topic/unpause?topic=name
    
  • POST /channel/pause

    Pause message flow to consumers of an existing channel (messages will queue at channel)

    Query Params:

    topic - the existing topic
    channel - the existing channel to pause
    

    Example:

    $ curl -X POST 'http://127.0.0.1:4151/channel/pause?topic=name&channel=name'
    
  • POST /channel/unpause

    Resume message flow to consumers of an existing, paused, channel

    Query Params:

    topic - the existing topic
    channel - the existing channel to unpause
    

    Example:

    $ curl -X POST 'http://127.0.0.1:4151/channel/unpause?topic=name&channel=name'
    
  • GET /stats

    Return internal statistics

    Query Params:

    format - (optional) `text` or `json` (default = `text`)
    topic - (optional) filter to topic
    channel - (optional) filter to channel
    include_clients - (optional) include clients stats (default = `true`)
    include_mem - (optional) include memory stats (default = `true`)
    

    Example:

    $ curl http://127.0.0.1:4151/stats
    
  • GET /ping

    Monitoring endpoint. Returns 200 OK when nsqd is healthy and an HTTP 500 when it is not.

    NOTE: The only “unhealthy” state is if nsqd failed to write messages to disk when overflow occurred.

  • GET /info

    Version information

  • GET /debug/pprof

    An index page of available debugging endpoints

  • GET /debug/pprof/profile

    Starts a pprof CPU profile for 30s and returns the output via the request

    NOTE: this endpoint is not listed in the /debug/pprof index page because of its effect on runtime performance.

  • GET /debug/pprof/goroutine

    Returns a stack trace for all running goroutines

  • GET /debug/pprof/heap

    Returns a heap and memstats profile (top portion can be used as a pprof memory profile)

  • GET /debug/pprof/block

    Returns a goroutine blocking profile

  • GET /debug/pprof/threadcreate

    Returns goroutine stack traces that led to the creation of an OS thread

  • GET /config/nsqlookupd_tcp_addresses

    List of nsqlookupd TCP addresses.

    Example:

    $ curl http://127.0.0.1:4151/config/nsqlookupd_tcp_addresses
    
  • PUT /config/nsqlookupd_tcp_addresses

    Update the nsqlookupd TCP addresses.

    Body:

    JSON array of TCP addresses.
    

    Example:

    $ curl -X PUT http://127.0.0.1:4151/config/nsqlookupd_tcp_addresses \
        -d '["127.0.0.1:4160", "127.0.0.2:4160"]'
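
For callers that update the lookupd list programmatically, the PUT above might be prepared like this. It is a sketch using only the Python standard library; build_lookupd_update is a hypothetical helper name.

```python
import json
import urllib.request


def build_lookupd_update(base_url, addresses):
    """Prepare a PUT that replaces the nsqlookupd TCP address list."""
    body = json.dumps(addresses).encode("utf-8")  # JSON array of addresses
    return urllib.request.Request(
        base_url + "/config/nsqlookupd_tcp_addresses",
        data=body,
        method="PUT",
    )
```

Send it with urllib.request.urlopen; a follow-up GET on the same path shows the list nsqd now holds.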
    

Debugging and Profiling

nsqd provides a suite of profiling endpoints that integrate directly with Go’s pprof tool. If you have the go tool suite installed, simply run:

# memory profiling
$ go tool pprof http://localhost:4151/debug/pprof/heap

# cpu profiling
$ go tool pprof http://localhost:4151/debug/pprof/profile

TLS

When nsqd is configured with --tls-cert and --tls-key, clients can negotiate upgrading their connection to TLS for enhanced security.

Additionally, you can require that clients negotiate TLS with --tls-required (as of nsqd v0.2.28+).

You can configure an nsqd client certificate policy via --tls-client-auth-policy (require or require-verify):

  • require - the client must offer a certificate, otherwise the connection is rejected
  • require-verify - the client must offer a valid certificate according to the default CA or the chain specified by --tls-root-ca-file, otherwise the connection is rejected

This can be used as a form of client authentication (as of nsqd v0.2.28+).

If you want to generate a password-less self-signed certificate using openssl:

$ openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes

AUTH

NOTE: available in nsqd v0.2.29+

To configure nsqd to require authorization you need to specify the -auth-http-address=host:port flag with an Auth Server that conforms to the Auth HTTP protocol.

NOTE: It is expected when using authorization that only the nsqd TCP protocol is exposed to external clients, not the HTTP(S) endpoints. See the note below about exposing stats and lookup to clients with auth.

The Auth Server must accept an HTTP request on:

/auth?remote_ip=...&tls=...&auth_secret=...

And return a response in the following format:

{
  "ttl": 3600,
  "identity": "username",
  "identity_url": "https://....",
  "authorizations": [
    {
      "permissions": [
        "subscribe",
        "publish"
      ],
      "topic": ".*",
      "channels": [
        ".*"
      ]
    }
  ]
}

Note that topic and channel strings must be regexes for nsqd to apply permissions. nsqd will cache the response for up to the TTL duration and will re-request authorization on that interval.
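
When writing an Auth Server it can help to check which operations a given response would permit. The sketch below evaluates a response of the shape shown above. It is not nsqd's implementation: it assumes unanchored regex matching (re.search) and uses Python's regex dialect, which may differ from the one nsqd applies, so anchor patterns with ^ and $ when you want exact matches. is_allowed is a hypothetical helper name.

```python
import re


def is_allowed(auth_response, permission, topic, channel=None):
    """Return True if any authorization grants `permission` on topic/channel.

    Assumption: patterns are matched unanchored, as re.search does.
    """
    for auth in auth_response.get("authorizations", []):
        if permission not in auth.get("permissions", []):
            continue
        if not re.search(auth["topic"], topic):
            continue
        if channel is None:
            # e.g. a publish check, where no channel is involved
            return True
        if any(re.search(p, channel) for p in auth.get("channels", [])):
            return True
    return False
```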

It is expected that in most cases authorization will be used with TLS to secure secrets passed from clients to nsqd. Communication between nsqd and the Auth Server is expected to be over a trusted network (and is not encrypted). If an Auth Server is making auth choices solely based on remote IP information, clients can use a placeholder string (like .) as the AUTH command body (which the Auth Server ignores).

An example Auth Server is pynsqauthd.

A helper server to expose nsqlookupd and nsqd /stats data to clients, filtered by permissions from the Auth Server, can be found in nsqauthfilter.

When using the command line utilities, authorization can be used via the --reader-opt flag.

$ nsq_tail ... -reader-opt="tls_v1,true" -reader-opt="auth_secret,$SECRET"

End-to-End Processing Latency

You can optionally configure nsqd to collect and emit end-to-end message processing latency for configurable percentiles using the --e2e-processing-latency-percentile flag.

The values are calculated using a probabilistic percentile technique described in Effective Computation of Biased Quantiles over Data Streams. We use the perks package by bmizerany which implements this algorithm.

In order to bias the view toward more recent processing behavior we only keep quantile information for the past N minutes (configurable via --e2e-processing-latency-window-time). Internally we maintain two quantiles, per channel, that each store N/2 minutes worth of latency data. Every N/2 minutes we reset one quantile (and start inserting new data into it). Since quantiles can be merged, this results in a coarse rolling window.
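
The two-bucket rotation can be sketched as follows. This is an illustration only: it stores exact samples in sorted lists instead of the biased-quantile summaries from perks, and it takes the current time as an explicit argument so the behavior is easy to follow. RollingQuantile is a hypothetical name.

```python
import math


class RollingQuantile:
    """Two buckets, each holding up to half a window of latency samples."""

    def __init__(self, window_seconds, now=0.0):
        self.half = window_seconds / 2.0
        self.buckets = [[], []]  # exact samples stand in for quantile summaries
        self.current = 0         # index of the bucket receiving new samples
        self.last_rotate = now

    def _rotate_if_due(self, now):
        # Every half window, switch buckets and reset the one switched to.
        while now - self.last_rotate >= self.half:
            self.current = 1 - self.current
            self.buckets[self.current] = []
            self.last_rotate += self.half

    def insert(self, latency, now):
        self._rotate_if_due(now)
        self.buckets[self.current].append(latency)

    def query(self, percentile, now):
        """Percentile in (0, 1.0] over the merge of both buckets, or None."""
        self._rotate_if_due(now)
        merged = sorted(self.buckets[0] + self.buckets[1])
        if not merged:
            return None
        rank = max(0, math.ceil(percentile * len(merged)) - 1)
        return merged[rank]
```

Because only one bucket is reset at a time, a query always sees between N/2 and N worth of data, which is why the window is coarse rather than exact.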

Since we only collect data at the channel level, for a topic we aggregate and merge all channel quantiles. This technique can only be used if the data is on the same nsqd instance. However, when data is being accumulated across nsqd instances (for instance via nsqlookupd), we take the average of the quantile values for each nsqd. In order to maintain some statistical accuracy regarding the distribution of latencies across nsqd instances, we also provide the min/max values in addition to the average.
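
The cross-instance aggregation amounts to simple arithmetic over the per-nsqd values for one percentile. A minimal sketch, with a hypothetical function name and an illustrative result layout:

```python
def aggregate_across_nodes(values):
    """Combine one percentile's value from each nsqd into average/min/max."""
    if not values:
        return None
    return {
        "average": sum(values) / len(values),
        "min": min(values),
        "max": max(values),
    }
```

For example, per-nsqd 99th percentile latencies of 10, 20 and 60 (in any consistent unit) give an average of 30 with min 10 and max 60. The average alone would hide that one instance is much slower than the others.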

NOTE: if there are no consumers connected the values cannot update despite there (obviously) being a slowly increasing end-to-end processing time for the queued messages. This is because end-to-end metrics are only calculated when nsqd receives a FIN for a given message from a client. When a consumer reconnects the values will adjust appropriately.

Statsd / Graphite Integration

When using --statsd-address to specify the UDP <addr>:<port> for statsd (or a port of statsd like statsdaemon), nsqd will push metrics to statsd periodically based on the interval specified in --statsd-interval (IMPORTANT: this interval should always be less than or equal to the interval at which statsd flushes to graphite). With this enabled nsqadmin can be configured to display charts directly from graphite.

We recommend the following configuration for graphite (but these choices should be evaluated based on your available resources and requirements). Again, the important piece to remember is that statsd should flush at an interval less than or equal to the smallest time bucket in storage-schemas.conf and nsqd should be configured to flush at or below that same interval via --statsd-interval.

# storage-schemas.conf
[nsq]
pattern = ^nsq\..*
retentions = 1m:1d,5m:30d,15m:1y

# storage-aggregation.conf
[default_nsq]
pattern = ^nsq\..*
xFilesFactor = 0.2
aggregationMethod = average
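
To check the interval rule against a retentions line like the one above, a small sketch such as the following may help. It assumes every retention uses a unit suffix (s, m, h, d, w, y) and treats a year as 365 days; the function names are hypothetical.

```python
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800, "y": 31536000}


def _to_seconds(spec):
    """Convert a suffixed duration such as '5m' to seconds."""
    return int(spec[:-1]) * _UNIT_SECONDS[spec[-1]]


def smallest_bucket_seconds(retentions):
    """Smallest time bucket in a 'precision:duration,...' retentions string."""
    return min(
        _to_seconds(part.split(":")[0].strip())
        for part in retentions.split(",")
    )


def interval_ok(statsd_interval_seconds, retentions):
    """True if the interval does not exceed the smallest bucket."""
    return statsd_interval_seconds <= smallest_bucket_seconds(retentions)
```

With the retentions shown above the smallest bucket is 60 seconds, so the default --statsd-interval of 1m0s sits exactly at the limit.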

The nsqd instance will push to the following statsd paths:

nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.backend_depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.message_count
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.message_bytes
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.backend_depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.clients [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.deferred_count [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.in_flight_count [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.message_count
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.requeue_count
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.timeout_count

# if --statsd-mem-stats is enabled
nsq.<nsqd_host>_<nsqd_port>.mem.heap_objects [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.heap_idle_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.heap_in_use_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.heap_released_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_100 [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_99 [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_95 [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.next_gc_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_runs

# if --e2e-processing-latency-percentile is specified, for each percentile
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.e2e_processing_latency_<percent> [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.e2e_processing_latency_<percent> [gauge]
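
When building graphite queries it can be handy to generate these paths. The sketch below expands the %s in --statsd-prefix to <nsqd_host>_<nsqd_port> and appends a metric suffix. It performs no sanitizing of the host string, which nsqd itself may do, and statsd_key is a hypothetical helper name.

```python
def statsd_key(host, port, suffix, prefix="nsq.%s"):
    """Expand %s in the prefix to <host>_<port> and append the metric suffix."""
    return prefix.replace("%s", f"{host}_{port}", 1) + "." + suffix
```

For instance, statsd_key("host1", 4151, "topic.events.depth") yields nsq.host1_4151.topic.events.depth.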