amqplib

AMQP 0-9-1 library and client for Node.js

Channel-oriented API reference

This is the "main" module in the library:

var amqp = require('amqplib');

The client API is based closely on the protocol model. The modus operandi is to create channels on which to issue commands. Most errors in AMQP invalidate just the channel which had problems, so this ends up being a fairly natural way to use AMQP. The downside is that it doesn't give any guidance on useful ways to use AMQP; that is, it does little beyond giving access to the various AMQP methods.

Most operations in AMQP are RPCs, synchronous at the channel layer of the protocol but asynchronous from the library's point of view; accordingly, most methods return promises yielding the server's reply (often containing useful information such as generated identifiers). RPCs are queued by the channel if it is already waiting for a reply -- synchronising on RPCs in this way is implicitly required by the protocol specification.

Failed operations will

Since the RPCs are effectively synchronised, any such channel error is very likely to have been caused by the outstanding RPC. However, it's often sufficient to fire off a number of RPCs and check only the returned promise for the last, since it'll be rejected if it or any of its predecessors fail.

The exception thrown on operations subsequent to a failure or closure also contains the stack at the point that the channel was closed, in the field stackAtStateChange. This may be useful to determine what has caused an unexpected closure.

connection.createChannel().then(function(ch) {
  ch.close();
  try {
    ch.close();
  }
  catch (alreadyClosed) {
    console.log(alreadyClosed.stackAtStateChange);
  }
});

Promises returned from methods are amenable to composition using, for example, when.js's functions:

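var when = require('when');

// handleMessage is assumed to be your own function(msg) {...}
amqp.connect().then(function(conn) {
  var ok = conn.createChannel();
  ok = ok.then(function(ch) {
    // RPCs on a channel are sent in order, so the binding is only
    // attempted after the queue and exchange have been asserted.
    return when.all([
      ch.assertQueue('foo'),
      // An exchange type is mandatory; 'direct' is just an example
      ch.assertExchange('bar', 'direct'),
      ch.bindQueue('foo', 'bar', 'baz'),
      ch.consume('foo', handleMessage)
    ]);
  });
  return ok;
}).then(null, console.warn);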

Many operations have mandatory arguments as well as optional arguments with defaults; in general, the former appear as parameters to the method while the latter are collected in a single options parameter, to be supplied as an object with the fields mentioned. Extraneous fields in options are ignored, so it is often possible to coalesce the options for a number of operations into a single object, should that be convenient. Likewise, fields from the prototype chain are accepted, so a common options value can be specialised by, e.g., using Object.create(common) and then setting some fields.

Often, AMQP commands have an arguments table that can contain arbitrary values, usually used by implementation-specific extensions like RabbitMQ's consumer priorities. This is accessible as the option arguments, an object: if an API method does not account for an extension in its options, you can fall back to using the arguments object, though bear in mind that the field name will usually be 'x-something', while the options are just 'something'. Values passed in options, if understood by the API, will override those given in arguments.

var common_options = {durable: true, noAck: true};
ch.assertQueue('foo', common_options);
// Only 'durable' counts for queues

var bar_opts = Object.create(common_options);
bar_opts.autoDelete = true;
// "Subclass" our options
ch.assertQueue('bar', bar_opts);

var foo_consume_opts = Object.create(common_options);
foo_consume_opts.arguments = {'x-priority': 10};
ch.consume('foo', console.log, foo_consume_opts);
// Use the arguments table to give a priority, even though it's
// available as an option

var bar_consume_opts = Object.create(foo_consume_opts);
bar_consume_opts.priority = 5;
ch.consume('bar', console.log, bar_consume_opts);
// The 'priority' option will override that given in the arguments
// table

connect([url], [socketOptions])

Connect to an AMQP 0-9-1 server, optionally given an AMQP URL (see AMQP URI syntax) and socket options. The protocol part (amqp: or amqps:) is mandatory; defaults for elided parts are as given in 'amqp://guest:guest@localhost:5672'. If the URI is omitted entirely, it will default to 'amqp://localhost', which given the defaults for missing parts, will connect to a RabbitMQ installation with factory settings, on localhost.

For convenience, an absent path segment (e.g., as in the URLs just given) is interpreted as the virtual host named /, which is present in RabbitMQ out of the box. Per the URI specification, just a trailing slash as in 'amqp://localhost/' would indicate the virtual host with an empty name, which does not exist unless it's been explicitly created. When specifying another virtual host, remember that its name must be escaped; so e.g., the virtual host named /foo is '%2Ffoo'; in a full URI, 'amqp://localhost/%2Ffoo'.
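The escaping rule above can be sketched as a small helper. The function name amqpUrl and its parameters are hypothetical and not part of amqplib; the only real call used is the standard encodeURIComponent.

```javascript
// Hypothetical helper (not part of amqplib): build an AMQP URI,
// escaping the virtual host name as described above.
function amqpUrl(host, vhost) {
  var base = 'amqp://' + host;
  // No vhost given: leave the path off, so the default vhost '/' is used.
  if (vhost === undefined) return base;
  // encodeURIComponent turns '/' into '%2F'.
  return base + '/' + encodeURIComponent(vhost);
}

console.log(amqpUrl('localhost'));          // amqp://localhost
console.log(amqpUrl('localhost', '/foo'));  // amqp://localhost/%2Ffoo
```

The result can be passed straight to connect(), for example connect(amqpUrl('localhost', '/foo')).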

Further AMQP tuning parameters may be given in the query part of the URI, e.g., as in 'amqp://localhost?frameMax=0x1000'. These are:

The socket options will be passed to the socket library (net or tls). They must be fields set on the object supplied; i.e., not on a prototype. This is useful for supplying certificates and so on for an SSL connection; see the SSL guide.

The socket options may also include the key noDelay, with a boolean value. If the value is true, this sets TCP_NODELAY on the underlying socket.

Returns a promise which will either be resolved with an open ChannelModel or rejected with a sympathetically-worded error (in en_US).

Supplying a malformed URI will cause connect() to throw an exception; other problems, including refused and dropped TCP connections, will result in a rejected promise.

RabbitMQ since version 3.2.0 will send a frame to notify the client of authentication failures, which results in a rejected promise; RabbitMQ before version 3.2.0, per the AMQP specification, will close the socket in the case of an authentication failure, making a dropped connection ambiguous (it will also wait a few seconds before doing so).

Heartbeating

If you supply a non-zero period in seconds as the heartbeat parameter, the connection will be monitored for liveness. If the client fails to read data from the connection for two successive intervals, the connection will emit an error and close. It will also send heartbeats to the server (in the absence of other data).

new ChannelModel(connection)

This constructor represents a connection in the channel API. It takes a connection.Connection as its argument, though it is better to use connect(), which will open the connection for you. It is exported as a potential extension point.

ChannelModel#close

Close the connection cleanly. Will immediately invalidate any unresolved operations, so it's best to make sure you've done everything you need to before calling this. Returns a promise which resolves once the connection, and underlying socket, are closed. The ChannelModel will also emit 'close' at that point.

Although it's not strictly necessary, it will avoid some warnings in the server log if you close the connection before exiting:

var open = amqp.connect();
open.then(function(conn) {
  var ok = doStuffWithConnection(conn);
  return ok.then(conn.close.bind(conn));
}).then(null, console.warn);

Note that I'm synchronising on the return value of doStuffWithConnection(), presumably a promise, so that I can be sure I'm all done.

If your program runs until interrupted, you can hook into the process signal to close the connection:

var open = amqp.connect();
open.then(function(conn) {
  process.once('SIGINT', conn.close.bind(conn));
  return doStuffWithConnection(conn);
}).then(null, console.warn);

NB it's no good using process.on('exit', ...), since close() needs to do I/O.

ChannelModel#on('close', function() {...})

Emitted once the closing handshake initiated by #close() has completed; or, if the server closed the connection, once the client has sent the closing handshake; or, if the underlying stream (e.g., socket) has closed.

ChannelModel#on('error', function (err) {...})

Emitted if the connection closes for any reason other than #close being called; such reasons include:

'close' will also be emitted, after 'error'.

ChannelModel#on('blocked', function(reason) {...})

Emitted when a RabbitMQ server (after version 3.2.0) decides to block the connection. Typically it will do this if there is some resource shortage, e.g., memory, and messages are published on the connection. See the RabbitMQ documentation for this extension for details.

ChannelModel#on('unblocked', function() {...})

Emitted at some time after 'blocked', once the resource shortage has alleviated.
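One way to use these two events is to keep a flag that a publisher can consult before sending. The following is a minimal sketch; trackBlocked is a hypothetical helper, and an EventEmitter stands in for the connection so the snippet runs without a broker.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: remember whether the connection is currently
// blocked, and why.
function trackBlocked(conn) {
  var state = {blocked: false, reason: null};
  conn.on('blocked', function(reason) {
    state.blocked = true;
    state.reason = reason;
  });
  conn.on('unblocked', function() {
    state.blocked = false;
    state.reason = null;
  });
  return state;
}

// Stand-in for a real connection; the reason string is made up.
var fakeConn = new EventEmitter();
var state = trackBlocked(fakeConn);
fakeConn.emit('blocked', 'low on memory');
console.log(state.blocked, state.reason);
fakeConn.emit('unblocked');
console.log(state.blocked);
```

With a real connection, the value from connect() would be passed in place of fakeConn.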

ChannelModel#createChannel()

Open a fresh channel. Returns a promise of an open Channel. May fail if there are no more channels available (i.e., if there are already channelMax channels open).

new Channel(connection)

This constructor represents a protocol channel. Channels are multiplexed over connections, and represent something like a session, in that most operations (and thereby most errors) are scoped to channels.

The constructor is exported from the module as an extension point. When using the client library in an application, obtain an open Channel by opening a connection (connect() above) and calling #createChannel or #createConfirmChannel.

Channel#close()

Close a channel. Returns a promise which will be resolved once the closing handshake is complete.

There's not usually any reason to close a channel rather than continuing to use it until you're ready to close the connection altogether. However, the lifetimes of consumers are scoped to channels, and thereby other things such as exclusive locks on queues, so it is occasionally worth being deliberate about opening and closing channels.

Channel#on('close', function() {...})

A channel will emit 'close' once the closing handshake (possibly initiated by #close()) has completed; or, if its connection closes.

When a channel closes, any unresolved operations on the channel will be abandoned (and the returned promises rejected).

Channel#on('error', function(err) {...})

A channel will emit 'error' if the server closes the channel for any reason. Such reasons include

A channel will not emit 'error' if its connection closes with an error.

Channel#on('return', function(msg) {...})

If a message is published with the mandatory flag (it's an option to Channel#publish in this API), it may be returned to the sending channel if it cannot be routed. Whenever this happens, the channel will emit 'return' with a message object (as described in #consume) as an argument.

Channel#on('drain', function() {...})

Like a stream.Writable, a channel will emit 'drain', if it has previously returned false from #publish or #sendToQueue, once its write buffer has been emptied (i.e., it is ready for writes again).

Channel#assertQueue([queue], [options])

Assert a queue into existence. This operation is idempotent given identical arguments; however, it will bork the channel if the queue already exists but has different properties (values supplied in the arguments field may or may not count for borking purposes; check the broker's documentation).

queue is a string; if you supply an empty string or other falsey value, the server will create a random name for you.

options is an object and may also be omitted. The relevant fields in options are:

RabbitMQ extensions can also be supplied as options. These typically require non-standard x-* keys and values sent in the arguments table; e.g., x-expires. Here, I've removed the x- prefix and made them options; they will overwrite anything you supply in arguments.

Returns a promise of the "ok" reply from the server, which includes fields for the queue name (important if you let the server name it), a recent consumer count, and a recent message count; e.g.,

{
  queue: 'foobar',
  messageCount: 0,
  consumerCount: 0
}
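As a sketch of why the queue field matters, the helper below lets the server choose the name and passes it on. declareAnonymousQueue is a hypothetical name, and the stub channel and its generated name are made up so the snippet runs without a broker.

```javascript
// Hypothetical helper: let the server pick the queue name and
// resolve with that name.
function declareAnonymousQueue(ch) {
  return ch.assertQueue('').then(function(ok) {
    return ok.queue;  // the server-generated name
  });
}

// Stand-in channel; a real one comes from #createChannel.
var fakeCh = {
  assertQueue: function(queue) {
    return Promise.resolve({
      queue: queue || 'generated-name',
      messageCount: 0,
      consumerCount: 0
    });
  }
};

declareAnonymousQueue(fakeCh).then(function(name) {
  console.log('Queue is called', name);
});
```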

Channel#checkQueue(queue)

Check whether a queue exists. This will bork the channel if the named queue doesn't exist; if it does exist, you go through to the next round! There are no options as with #assertQueue(), just the queue name. The reply from the server is the same as for #assertQueue().

Channel#deleteQueue(queue, [options])

Delete the queue named. Naming a queue that doesn't exist will result in the server closing the channel, to teach you a lesson (except in RabbitMQ version 3.2.0 and after; see "RabbitMQ and deletion" below). The options here are:

Note the obverse semantics of the options: if both are true, the queue will be deleted only if it has no consumers and no messages.

You should leave out the options altogether if you want to delete the queue unconditionally.

The server reply contains a single field, messageCount, with the number of messages deleted or dead-lettered along with the queue.

Channel#purgeQueue(queue)

Remove all undelivered messages from the queue named. Note that this won't remove messages that have been delivered but not yet acknowledged; they will remain, and may be requeued under some circumstances (e.g., if the channel to which they were delivered closes without acknowledging them).

The server reply contains a single field, messageCount, containing the number of messages purged from the queue.

Channel#bindQueue(queue, source, pattern, [args])

Assert a routing path from an exchange to a queue: the exchange named by source will relay messages to the queue named, according to the type of the exchange and the pattern given. The RabbitMQ tutorials give a good account of how routing works in AMQP.

args is an object containing extra arguments that may be required for the particular exchange type (for which, see your server's documentation). It may be omitted if not needed, which is equivalent to an empty object.

The server reply has no fields.

Channel#unbindQueue(queue, source, pattern, [args])

Remove a routing path between the queue named and the exchange named as source with the pattern and arguments given. Omitting args is equivalent to supplying an empty object (no arguments). Beware: attempting to unbind when there is no such binding may result in a punitive error. The AMQP specification says it's a connection-killing mistake; RabbitMQ before version 3.2.0 softens this to a channel error, and from version 3.2.0, doesn't treat it as an error at all (see "RabbitMQ and deletion" below). Good ol' RabbitMQ.

Channel#assertExchange(exchange, type, [options])

Assert an exchange into existence. As with queues, if the exchange exists already and has properties different to those supplied, the channel will 'splode; fields in the arguments object may or may not be 'splodey, depending on the type of exchange. Unlike queues, you must supply a name, and it can't be the empty string. You must also supply an exchange type, which determines how messages will be routed through the exchange.

NB There is just one RabbitMQ extension pertaining to exchanges in general (alternateExchange); however, specific exchange types may use the arguments table to supply parameters.

The options:

The server reply echoes the exchange name, in the field exchange.

Channel#checkExchange(exchange)

Check that an exchange exists. If it doesn't exist, the channel will be closed with an error. If it does exist, happy days.

Channel#deleteExchange(name, [options])

Delete an exchange. The only meaningful field in options is:

If the exchange does not exist, a channel error is raised (RabbitMQ version 3.2.0 and after will not raise an error; see "RabbitMQ and deletion" below).

The server reply has no fields.

Channel#bindExchange(destination, source, pattern, [args])

Bind an exchange to another exchange. The exchange named by destination will receive messages from the exchange named by source, according to the type of the source and the pattern given. For example, a direct exchange will relay messages that have a routing key equal to the pattern.

NB Exchange to exchange binding is a RabbitMQ extension.

The server reply has no fields.

Channel#unbindExchange(destination, source, pattern, [args])

Remove a binding from an exchange to another exchange. A binding with the exact source exchange, destination exchange, routing key pattern, and extension args will be removed. If no such binding exists, it's (you guessed it) a channel error, except in RabbitMQ version 3.2.0 and later, for which it succeeds trivially (see "RabbitMQ and deletion" below).

Channel#publish(exchange, routingKey, content, [options])

Publish a single message to an exchange. The mandatory parameters (these go in the publish method itself) are:

The remaining parameters are provided as fields in options, and are divided into those that have some meaning to RabbitMQ and those that will be ignored by RabbitMQ but passed on to consumers. options may be omitted altogether, in which case defaults as noted will apply.

The "meaningful" options are a mix of fields in BasicPublish (the method used to publish a message), BasicProperties (in the message header frame) and RabbitMQ extensions which are given in the headers table in BasicProperties.

Used by RabbitMQ and sent on to consumers:

Used by RabbitMQ but not sent on to consumers:

Not used by RabbitMQ and not sent to consumers:

Ignored by RabbitMQ (but may be useful for applications):

#publish mimics the stream.Writable interface in its return value; it will return false if the channel's write buffer is 'full', and true otherwise.
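The return value and the 'drain' event can be combined to publish a batch without overfilling the write buffer. This is a minimal sketch: publishAll is a hypothetical helper, and the stand-in channel (which reports a full buffer on every second publish) exists only so the snippet runs without a broker. It assumes that, as with stream.Writable, a message passed to #publish is still accepted when false is returned.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: publish an array of Buffers, pausing whenever
// #publish returns false and resuming on 'drain'.
function publishAll(ch, exchange, routingKey, contents, done) {
  var i = 0;
  function next() {
    while (i < contents.length) {
      var ok = ch.publish(exchange, routingKey, contents[i++]);
      if (!ok) {
        // Write buffer is full: wait until the channel is ready again.
        ch.once('drain', next);
        return;
      }
    }
    done();
  }
  next();
}

// Stand-in channel: pretends the buffer is full on every second publish.
var fakeCh = new EventEmitter();
fakeCh.sent = [];
fakeCh.publish = function(exchange, routingKey, content) {
  this.sent.push(content.toString());
  var full = this.sent.length % 2 === 0;
  if (full) setImmediate(this.emit.bind(this, 'drain'));
  return !full;
};

publishAll(fakeCh, 'bar', 'baz',
           [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')],
           function() { console.log('sent', fakeCh.sent); });
```

The same approach should apply to #sendToQueue, which has the same return value.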

Channel#sendToQueue(queue, content, [options])

Send a single message with the content given as a buffer to the specific queue named, bypassing routing. The options and return value are exactly the same as for #publish.

Channel#consume(queue, callback, [options])

Set up a consumer with a callback to be invoked with each message.

Options (which may be omitted altogether):

The server reply contains one field, consumerTag. It is necessary to remember this somewhere if you will later want to cancel this consume operation (i.e., to stop getting messages).

The callback supplied will be invoked with message objects of this shape:

{
  content: Buffer,
  fields: Object,
  properties: Object
}

The message content is a buffer containing the bytes published.

The fields object has a handful of bookkeeping values largely of interest only to the library code: deliveryTag, a serial number for the message; consumerTag, identifying the consumer for which the message is destined; exchange and routingKey, giving the routing information with which the message was published; and redelivered, which if true indicates that this message has been delivered before and been handed back to the server (e.g., by a nack or recovery operation).

The properties object contains message properties, which are all the things mentioned under #publish as options that are transmitted. Note that RabbitMQ extensions (just CC, presently) are sent in the headers table so will appear there in deliveries.

If the consumer is cancelled by RabbitMQ, the callback will be invoked with null.
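A consumer callback therefore has to cope with both message objects and null. The sketch below wraps a per-message function accordingly; makeHandler is a hypothetical helper, and the stand-in channel only records calls so that the snippet runs without a broker. It uses #ack and #nack as described later in this reference.

```javascript
// Hypothetical helper: wrap a per-message function so that it copes with
// consumer cancellation (null) and settles every message it is given.
function makeHandler(ch, work) {
  return function(msg) {
    if (msg === null) {
      console.warn('Consumer was cancelled by the server');
      return;
    }
    var succeeded = true;
    try {
      work(msg.content, msg.fields, msg.properties);
    } catch (err) {
      succeeded = false;
    }
    if (succeeded) ch.ack(msg);
    // requeue = false: the message is discarded or dead-lettered
    else ch.nack(msg, false, false);
  };
}

// Stand-in channel that records what was acked or nacked.
var fakeCh = {
  log: [],
  ack: function(msg) { this.log.push('ack ' + msg.content.toString()); },
  nack: function(msg, allUpTo, requeue) {
    this.log.push('nack ' + msg.content.toString());
  }
};

var handler = makeHandler(fakeCh, function(content) {
  if (content.toString() === 'bad') throw new Error('cannot process');
});
handler({content: Buffer.from('good'), fields: {}, properties: {}});
handler({content: Buffer.from('bad'), fields: {}, properties: {}});
handler(null);
console.log(fakeCh.log);
```

With a real channel this would be used as ch.consume('foo', makeHandler(ch, work)).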

Channel#cancel(consumerTag)

This instructs the server to stop sending messages to the consumer identified by consumerTag. Messages may arrive between sending this and getting its reply; once the reply has resolved, however, there will be no more messages for the consumer, i.e., the callback will not be invoked.

The consumerTag is the string given in the reply to #consume, which may have been generated by the server.
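One way to keep hold of the consumerTag is to capture it in a closure as soon as #consume replies. In this sketch startConsumer is a hypothetical helper, and the stand-in channel and its tag are made up so the snippet runs without a broker.

```javascript
// Hypothetical helper: start a consumer and resolve with a function
// that cancels it, using the consumerTag from the server's reply.
function startConsumer(ch, queue, onMessage) {
  return ch.consume(queue, onMessage).then(function(ok) {
    return function stop() {
      return ch.cancel(ok.consumerTag);
    };
  });
}

// Stand-in channel; a real one comes from #createChannel.
var fakeCh = {
  cancelled: [],
  consume: function(queue, callback) {
    return Promise.resolve({consumerTag: 'ctag-1'});
  },
  cancel: function(consumerTag) {
    this.cancelled.push(consumerTag);
    return Promise.resolve({});
  }
};

startConsumer(fakeCh, 'foo', function(msg) {}).then(function(stop) {
  return stop();
}).then(function() {
  console.log('Cancelled:', fakeCh.cancelled);
});
```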

Channel#get(queue, [options])

Ask a queue for a message, as an RPC. This returns a promise that will be resolved with either false, if there is no message to be had, or a message (in the same shape as detailed in #consume).
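Because the promise resolves with false when the queue is empty, #get can be called in a loop until that happens. The following is a sketch only: drainQueue is a hypothetical helper, and the stand-in channel serves messages from an array so the snippet runs without a broker. Each message is acknowledged, on the assumption that noAck is left at its default of false.

```javascript
// Hypothetical helper: fetch and acknowledge messages one at a time
// until the queue reports it is empty; resolves with their contents.
function drainQueue(ch, queue) {
  var contents = [];
  function loop() {
    return ch.get(queue).then(function(msg) {
      if (msg === false) return contents;  // nothing left
      contents.push(msg.content.toString());
      ch.ack(msg);
      return loop();
    });
  }
  return loop();
}

// Stand-in channel backed by an in-memory array.
var pending = ['one', 'two'];
var fakeCh = {
  get: function(queue) {
    var next = pending.shift();
    return Promise.resolve(next === undefined
      ? false
      : {content: Buffer.from(next), fields: {}, properties: {}});
  },
  ack: function(msg) {}
};

drainQueue(fakeCh, 'foo').then(function(contents) {
  console.log(contents);
});
```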

Options:

Channel#ack(message, [allUpTo])

Acknowledge the given message, or all messages up to and including the given message.

If a #consume or #get is issued with noAck: false (the default), the server will expect acknowledgements for messages before forgetting about them. If no such acknowledgement is given, those messages may be requeued once the channel is closed.

If allUpTo is true, all outstanding messages prior to and including the given message shall be considered acknowledged. If false, or omitted, only the message supplied is acknowledged.

It's an error to supply a message that either doesn't require acknowledgement, or has already been acknowledged. Doing so will errorise the channel. If you want to acknowledge all the messages and you don't have a specific message around, use #ackAll.

Channel#ackAll()

Acknowledge all outstanding messages on the channel. This is a "safe" operation, in that it won't result in an error even if there are no such messages.

Channel#nack(message, [allUpTo], [requeue])

Reject a message. This instructs the server to either requeue the message or throw it away (which may mean dead-lettering it).

If allUpTo is true, all outstanding messages prior to and including the given message are rejected; it defaults to false. As with #ack, it's a channel-ganking error to use a message that is not outstanding.

If requeue is true, the server will try to put the message or messages back on the queue or queues from which they came. Defaults to true if not given, so if you want to make sure messages are dead-lettered or discarded, supply false here.

This and #nackAll use a RabbitMQ-specific extension.

Channel#nackAll([requeue])

Reject all messages outstanding on this channel. If requeue is true, or omitted, the server will try to re-enqueue the messages.

Channel#reject(message, [requeue])

Reject a message. Equivalent to #nack(message, false, requeue), but works in older versions of RabbitMQ (< v2.3.0) where nack does not.

Channel#prefetch(count)

Set the prefetch count for this channel. The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged. A falsey value for count indicates no such limit.

NB RabbitMQ v3.3.0 changes the meaning of prefetch (basic.qos) to apply per-consumer, rather than per-channel. It will apply to consumers started after the method is called. See the RabbitMQ documentation on consumer prefetch.

Channel#recover()

Requeue unacknowledged messages on this channel. The returned promise will be resolved (with an empty object) once all messages are requeued.

ChannelModel#createConfirmChannel()

Create a channel which uses confirmations (a RabbitMQ extension). As with #createChannel, the return value is a promise that will be resolved with an open channel.

On the resulting channel, each published message is 'acked' or (in exceptional circumstances) 'nacked' by the server, thereby indicating that it's been dealt with.

A confirm channel has the same methods as a regular channel, except that #publish and #sendToQueue take a callback as an additional argument:

var open = require('amqplib').connect();
open.then(function(c) {
  return c.createConfirmChannel().then(function(ch) {
    // new Buffer() is deprecated in Node.js; use Buffer.from()
    ch.sendToQueue('foo', Buffer.from('foobar'), {},
                   function(err, ok) {
                     if (err !== null)
                       console.warn('Message nacked!');
                     else
                       console.log('Message acked');
                   });
  });
}).then(null, console.warn);

In practice this means the options argument must be supplied, at least as an empty object.

There are, broadly speaking, two uses for confirms. The first is to be able to act on the information that a message has been accepted, for example by responding to an upstream request. The second is to rate limit a publisher by limiting the number of unconfirmed messages it's allowed.
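Both uses are easier to express if the confirmation is turned into a promise. The sketch below does that; sendConfirmed is a hypothetical helper, it assumes a native Promise is available, and the stand-in channel simply acks everything so the snippet runs without a broker.

```javascript
// Hypothetical helper: wrap #sendToQueue on a confirm channel in a promise
// that resolves when the server acks and rejects when it nacks.
function sendConfirmed(ch, queue, content, options) {
  return new Promise(function(resolve, reject) {
    // options must be supplied, at least as an empty object
    ch.sendToQueue(queue, content, options || {}, function(err, ok) {
      if (err) reject(err);
      else resolve(ok);
    });
  });
}

// Stand-in confirm channel that acks every message on the next tick.
var fakeCh = {
  sendToQueue: function(queue, content, options, callback) {
    setImmediate(callback, null);
    return true;
  }
};

sendConfirmed(fakeCh, 'foo', Buffer.from('foobar')).then(function() {
  console.log('Message acked');
}, function(err) {
  console.warn('Message nacked!');
});
```

Waiting for each promise before sending the next message would limit a publisher to one unconfirmed message at a time; a larger window needs a counter of outstanding promises.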

new ConfirmChannel(connection)

This constructor is a channel that uses confirms. It is exported as an extension point. To obtain such a channel, use connect to get a connection, then call #createConfirmChannel.


RabbitMQ and deletion

RabbitMQ version 3.2.0 makes queue and exchange deletions (and unbind) effectively idempotent, by not raising an error if the exchange, queue, or binding does not exist.

This does not apply to preconditions given to the operations. For example deleting a queue with {ifEmpty: true} will still fail if there are messages in the queue.