Index of all namespaces
MQ-based API router
An API router is a component that maps incoming messages to Clojure functions, optionally sending the return value of the function as a reply message. The router's main documentation is below, in the IAPIRouter docstring.
Utility functions shared by multiple API router implementations.
API router (instance of IAPIRouter) that uses function metadata to determine valid API handler functions. Any Clojure function with the :api metadata key is a valid API handler. When given an API call message, the handler will execute any such function. prefix will be prepended to API callers' requests, to restrict their scope. For example, if your API handlers are all in foo.bar.api-handlers (e.g. foo.bar.api-handlers/some-function, foo.bar.api-handlers/other-function, and so on), you can supply a prefix of "foo.bar.api-handlers/", and clients then supply "some-function" and "other-function" as their calls. (Don't forget the trailing slash!)
API router (instance of IAPIRouter) that uses a predefined lookup table to map API calls to Clojure functions. * table-atom is an atom of a map. Keys are API call names (typically keywords), and values are functions. The router will accept string API call names in incoming messages and automatically keywordize them, for JSON compatibility.
Facilities to connect an IQueueEndpoint to core.async channels. The IChannelEndpoint protocol defines the capability to provide core.async endpoints for enqueueing and dequeueing messages on the underlying endpoint. It is initially intended to be built on top of an IQueueEndpoint, but other mechanisms are possible. Helper methods are provided to perform the connection. Given an IQueueEndpoint and an async channel, endpoint> takes messages from the endpoint and places them on the channel. endpoint< takes messages from the channel and places them on the endpoint.
Facilities to ensure that an IChannelEndpoint can send and receive any Clojure data structure via the IDataEndpoint protocol.
The IEndpoint protocol defines a type with a set of operations for interacting with a generic MQ-like asynchronous communications endpoint and providing normalized messages to the rest of the system. Concrete connectors are provided in the bureaucrat.endpoints package. TODO: Send messages in batches. TODO: Retrieve without auto-acknowledge. TODO: Support topics in addition to queues? TODO: Conflates the transport mechanism (e.g. HornetQ or IronMQ) with queue naming. It'd be better to have these separate.
Implementation of the IEndpoint protocol for Immutant's HornetQ message queue. * NOTE 4/2014: HornetQ is being put on hold until Immutant 2.0 comes out. Immutant 2 will give us an easy way to deal with just HornetQ, rather than the whole JBoss stack. Until then, don't expect this to work. Pull requests welcome! User Documentation ================== The name field specifies the queue's string name. Create-time options may be nil, or a map of [Immutant HornetQ options](http://immutant.org/documentation/current/apidoc/immutant.messaging.html#var-start). The options you provide will be passed through directly to immutant.messaging/start. The metadata map given to send! supports any of the options available to immutant.messaging/publish, in addition to those required by IQueueEndpoint. Internal Documentation for Library Developers ============================================= * Handler-cache is an atom used to store a handler function that will be invoked in a background thread when messages are available on the queue. It should accept one argument, which is the message being processed. Note that handler functions are stored per-instance, not globally, and that the underlying HornetQ endpoint is global (i.e. the same underlying queue in HornetQ can potentially be shared by many HornetQEndpoint instances). If you lose your HornetQEndpoint object without stopping it or removing its listeners, its listeners will still remain attached to the underlying HornetQ queue! TODO: Implement receive-batch! TODO: Configurable DLQ per queue rather than a systemwide one? TODO: State machine modelling backend state? TODO: Cache / memoize (get-backend) calls
Implementation of the IQueueEndpoint protocol for IronMQ. * IronMQ only supports strings as messages. If your message is not a string, it will be coerced into a string with pr-str. It is recommended that you use middleware to ensure that all messages are stringified as you like before they reach this endpoint. * Be aware that these functions throw exceptions if the network is down, since they cannot communicate with Iron in that case. A future version may wrap them in retry logic (or even better, a bridge from a local MQ.) * If a handler function throws an exception, the message is moved to the dead letter queue without a retry. Future versions may implement retries. * IronMQ does not have intrinsic dead letter queues. We simulate a limited DLQ here by creating a queue called "DLQ". If a message listener function throws an exception, the message that caused the exception is placed on the DLQ. * IronMQ's push mode requires you to expose an HTTP endpoint, with associated security annoyances. Future versions may support this. For now, we poll the queue every 5 seconds when a message handler function is registered. Webhook push may be implemented in the future. Internal Documentation for Developers ------------------------------------- * :iron-cache stores the threadpool that polls IronMQ in the background and executes the listener function when messages become available. TODO: I don't like that the programmer must remember to unregister the listener when it is no longer needed, since forgetting about one will leak threads with no (easy) way to find a reference to them and shut them down. That needs a better solution. TODO: use webhook rather than polling.
->IronMQ-EDN-Endpoint ->IronMQ-JSON-Endpoint ->IronMQEndpoint ichannelendpoint-implementations iqueueendpoint-implementations ironmq-edn-endpoint ironmq-endpoint ironmq-json-endpoint log-prefix map->IronMQ-EDN-Endpoint map->IronMQ-JSON-Endpoint map->IronMQEndpoint poll-sleep-time try-handler try-to-get-message
User-facing convenience code to assist with Bureaucrat.
Middleware to perform Base64 transformations on core.async messages.
Middleware to perform EDN transformations on core.async messages.
Middleware to perform AES encryption and decryption on core.async messages using a pre-shared key.
Middleware to perform JSON transformations on core.async messages. JSON doesn't support keywords or symbols, so these get translated into strings when JSONified. When decoding JSON, we have no way of knowing what strings are supposed to be keywords or symbols, so all input strings stay strings. As a special case, the parser will translate strings that are map keys into keywords for us.
Middleware to coerce messages to the standard "Bureaucrat low-level format", called ingress normalizers; and to a standard "low-level inter-service format" (egress normalizers). The standard Bureaucrat internal format is a map with at least the :bureaucrat key. The standard inter-service format is a map without a :bureaucrat key. Higher optional layers of Bureaucrat, such as the API router, define additional constraints on message format beyond these when used. Normalizers are meant to be used when messages enter the system from the outside. Upon ingress: * Messages will be converted into Clojure maps if they aren't maps already. Non-maps will be placed in the output map's :payload key. * A :bureaucrat key will be added for internal use by Bureaucrat. User code should not use this key, as it isn't part of the public API and its structure is not guaranteed to remain the same. (Let me know if you think you can do something interesting with it, though.) It contains a non-serializable reference to the endpoint where the message entered the system, so the API router can send replies on the same transport. Its value is a map with :ingress-endpoint and :ingress-time keys. Egress normalizers are meant to be used when messages in Bureaucrat format (i.e. maps) leave Bureaucrat for transmission to the outside. Currently, they strip the :bureaucrat key. These normalizers do not deal with serialization, encryption, Base64 encoding, and other meta-operations. They deal only with the formatting of the message as a map. This is because some underlying endpoints support Clojure data directly, and don't need any of that, while others do.
The IMessageTransport protocol abstracts a backend transport mechanism capable of asynchronously delivering messages to named queues.
IMessageTransport implementation on IronMQ (http://www.iron.io/). You must specify an IronMQ project ID, OAuth2 token, and server hostname to use. This can be done by creating an ~/.iron.json file, by providing arguments to (ironmq-transport), or by setting the environment variables expected by the IronMQ library before running your Clojure process. The config file and env variables are described in detail at [IronIO's website](http://dev.iron.io/worker/reference/configuration/). I find it most convenient to use ~/.iron.json for development and environment variables for production. These both avoid having credentials in your source code, and env variables map nicely into 12 factor app harnesses. The relevant env variables are: * IRON_PROJECT_ID - set to the project ID from your IronMQ account. * IRON_TOKEN - set to your IronMQ secret access token.
Utility functions for interacting with IronMQ directly.
General utility functions.
An MQ-based API Router for Clojure
Bureaucrat is a Clojure utility library to ease the construction of asynchronous message queue based APIs.
It provides an abstraction over MQ-based communications, implementations for various backend MQs, a tiny router that dispatches incoming API messages to designated Clojure functions, optionally sending replies back to the caller, and a core.async based middleware system for plumbing this all together in arbitrary ways.
Bureaucrat is alpha-stage software; API changes are still possible. Pull requests are welcome!
Why the name “bureaucrat”?
The library merely shuttles messages around between endpoints, which actually perform the useful work.
What underlying transport mechanisms are supported?
Bureaucrat currently supports IronMQ.
Previous versions supported HornetQ. Updating this to modern Bureaucrat style has been put on hold pending the release of Immutant 2.0.
Core.async and Amazon SQS are planned. Adding new endpoints is fairly easy; pull requests with tests are welcome! :)
Layers of Bureaucracy
Bureaucrat can be used on several different levels. Each level provides additional services beyond those in the lower levels, while imposing additional constraints on what sort of messages can be used on that level.
Layers (other than the bottommost Transport Layer) are implemented as core.async middleware; you use core.async channels to plumb them together in arbitrarily complex fashion, according to your needs. Many middleware components are provided, and it’s easy to write more.
- Transport layer - send and receive strings or byte arrays.
An overview is provided in this README. For additional details, consult the docstrings in the source files.
The lowest specification level is the transport layer, embodied in the `IMessageTransport` and `IQueueEndpoint` protocols. This level provides a unified abstraction over named queue-like asynchronous endpoints. The endpoints must be able to send and receive non-nil data to and from a named endpoint on the transport mechanism. No other constraints on message format exist at this layer.
`IMessageTransport` defines a system for creating and destroying uniquely named `IQueueEndpoints`. These map directly onto an asynchronous message queueing system and its queues. For example, we can build an `IMessageTransport` implementation encapsulating HornetQ, IronMQ, core.async, ZeroMQ, or similar services; the `IQueueEndpoints` are specific named queues or communications channels implemented by those services.
`IQueueEndpoints` are created by an `IMessageTransport` implementation, and represent specific named queues on that transport.
`IQueueEndpoints` are able to send and receive messages, arbitrary data structures whose format is subject to the limitations of the underlying transport mechanism. For example, an IronMQ-based `IMessageTransport` can only transmit string messages, while a core.async `IMessageTransport` can transmit any Clojure data structure as a message.
nil and empty string messages are specifically disallowed (because they confuse code that uses nil for other purposes); all messages must be non-nil and non-zero length.
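The non-nil, non-empty rule is easy to check before handing a message to an endpoint. Here is a minimal sketch; `valid-message?` is a hypothetical helper written for this README, not a function exported by Bureaucrat:

```clojure
(defn valid-message?
  "True if msg is acceptable at the transport layer: not nil and not an
  empty string. (Hypothetical helper, for illustration only.)"
  [msg]
  (and (some? msg)
       (not (and (string? msg) (empty? msg)))))

(valid-message? "Hello, world!") ;; => true
(valid-message? {:a 1})          ;; => true
(valid-message? "")              ;; => false
(valid-message? nil)             ;; => false
```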
Above the Transport Layer is the Channel Layer, defined by the `IChannelEndpoint` protocol. At this level, endpoints must be capable of enqueueing and dequeueing messages via core.async channels.
The `channel-endpoint` namespace provides the `IChannelEndpoint` protocol and functions that produce send and receive channels from any `IQueueEndpoint`. Various serialization middleware is provided.
Various middleware adapters are provided to perform transformations on core.async data flows, and it’s easy to write more.
Middleware is provided to serialize binary data into EDN, JSON, and Base64 formats, and to deserialize strings in those formats. These can be attached to any
If you need to interoperate with another system that requires JSON messages, for example, you can simply drop the JSON serialization middleware into your outbound message pipeline, then write regular Clojure data structures to the pipeline.
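To give a feel for how such transformations compose, here is a sketch that models two middleware steps (EDN and Base64) as plain transducers. This is an illustration only: it is not how Bureaucrat's own middleware namespaces are implemented, and `base64-encode`, `base64-decode`, `outbound-xf` and `inbound-xf` are names made up for the example. Since core.async's `chan` accepts a transducer as its second argument, the same composition could be attached to a channel, e.g. `(chan 10 outbound-xf)`.

```clojure
(require '[clojure.edn :as edn])
(import '[java.util Base64])

(defn base64-encode [^String s]
  (.encodeToString (Base64/getEncoder) (.getBytes s "UTF-8")))

(defn base64-decode [^String s]
  (String. (.decode (Base64/getDecoder) s) "UTF-8"))

;; Outbound: Clojure data -> EDN string -> Base64 string.
(def outbound-xf (comp (map pr-str) (map base64-encode)))

;; Inbound: Base64 string -> EDN string -> Clojure data.
(def inbound-xf (comp (map base64-decode) (map edn/read-string)))

;; Run the transducers over a vector instead of a channel for the demo.
(def wire (into [] outbound-xf [{:call :ping :payload [1 2 3]}]))

(into [] inbound-xf wire)
;; => [{:call :ping, :payload [1 2 3]}]
```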
An AES-128 shared-secret / symmetric encryption middleware component is provided to encrypt and decrypt messages.
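For readers unfamiliar with pre-shared-key encryption on the JVM, the following sketch shows the general idea using the standard `javax.crypto` classes. The cipher mode (GCM), the IV handling and the function names `encrypt` and `decrypt` are assumptions made for this example; they are not taken from Bureaucrat's middleware, which may work differently.

```clojure
(import '[javax.crypto Cipher]
        '[javax.crypto.spec SecretKeySpec GCMParameterSpec]
        '[java.security SecureRandom]
        '[java.util Arrays])

(def iv-length 12) ; bytes; the conventional nonce size for GCM (an assumption here)

(defn encrypt
  "Encrypts plaintext bytes with a 16-byte (128-bit) pre-shared key.
  Returns a fresh random IV followed by the ciphertext and auth tag."
  [^bytes key-bytes ^bytes plaintext]
  (let [iv     (byte-array iv-length)
        _      (.nextBytes (SecureRandom.) iv)
        cipher (doto (Cipher/getInstance "AES/GCM/NoPadding")
                 (.init Cipher/ENCRYPT_MODE
                        (SecretKeySpec. key-bytes "AES")
                        (GCMParameterSpec. 128 iv)))]
    (byte-array (concat iv (.doFinal cipher plaintext)))))

(defn decrypt
  "Reverses encrypt: splits off the IV, then decrypts and verifies the rest."
  [^bytes key-bytes ^bytes data]
  (let [iv     (Arrays/copyOfRange data (int 0) (int iv-length))
        body   (Arrays/copyOfRange data (int iv-length) (alength data))
        cipher (doto (Cipher/getInstance "AES/GCM/NoPadding")
                 (.init Cipher/DECRYPT_MODE
                        (SecretKeySpec. key-bytes "AES")
                        (GCMParameterSpec. 128 iv)))]
    (.doFinal cipher body)))

;; Demo key only. A real key should come from configuration, not source code.
(def demo-key (.getBytes "0123456789abcdef" "UTF-8"))

(String. ^bytes (decrypt demo-key (encrypt demo-key (.getBytes "attack at dawn" "UTF-8")))
         "UTF-8")
;; => "attack at dawn"
```

Both sides must hold the same key, which is why it has to be distributed out of band.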
At the Data Layer, defined by the `IDataEndpoint` protocol, endpoints must be capable of transmitting general Clojure data structures such as maps and strings as messages, via core.async channels.
Transport Layer implementations that are incapable of transmitting Clojure data structures as messages directly can use middleware to extend their capabilities. For example, an IronMQ transport can only transmit strings as messages, but the EDN or JSON middlewares can be used to serialize Clojure data into strings and back for transmission on IronMQ.
Still other Transport Layers may already be capable of transmitting Clojure data structures, and will only require a thin wrapper to tie them to core.async channels.
All maps at this layer must have keyword keys. JSON does not support keywords, so keyword keys are typically serialized as strings when they become JSON.
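The effect on keys can be seen without a JSON library at all, using `clojure.walk` as a stand-in for what a JSON encoder and decoder do to map keys. This is only an approximation of the round trip, not the JSON middleware itself:

```clojure
(require '[clojure.walk :as walk])

(def message {:call :ping, :payload {:user-id 42}})

;; Outbound: keyword keys become strings, as they would in JSON.
;; (A real JSON encoder would also turn the keyword value :ping into "ping".)
(walk/stringify-keys message)
;; => {"call" :ping, "payload" {"user-id" 42}}

;; Inbound: string keys are restored to keywords, but string values stay strings.
(walk/keywordize-keys {"call" "ping", "payload" {"user-id" 42}})
;; => {:call "ping", :payload {:user-id 42}}
```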
The requirement that all Channel Layer compliant endpoints expose core.async connectors allows them to easily participate in complex middleware graphs.
The Normalized Layer defines a standard message format for use by higher-order Bureaucrat services called the “Bureaucrat low-level format”: all messages must be Clojure maps, and certain keys are defined to store specific metadata as their values.
Middleware is provided to translate messages into this format and decorate them with metadata. If the incoming message was not a map, a new map is created, and the original message is added as the value of the `:payload` key.
A `:bureaucrat` key is added to the message for internal use by Bureaucrat components. User code should not rely on finding any particular value in this key, as it is not part of the public API and can change at any time. Currently, it contains a reference to the IQueueEndpoint where the message entered Bureaucrat, so that services such as the API router can send reply messages on the same transport.
An egress normalizer is also provided that strips the `:bureaucrat` key out of any messages that pass through it. This message format may be called the “low-level inter-service format”. It is identical to the “Bureaucrat low-level format” other than the absence of the `:bureaucrat` key.
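The two normalizers described above can be sketched in a few lines. The function names and the use of epoch milliseconds for `:ingress-time` are assumptions for illustration; only the `:payload`, `:bureaucrat`, `:ingress-endpoint` and `:ingress-time` keys come from the namespace docstrings.

```clojure
(defn normalize-ingress
  "Wraps non-map messages in {:payload ...} and attaches the internal
  :bureaucrat metadata map. (Sketch only; not the library's own code.)"
  [endpoint message]
  (let [m (if (map? message) message {:payload message})]
    (assoc m :bureaucrat {:ingress-endpoint endpoint
                          :ingress-time     (System/currentTimeMillis)})))

(defn normalize-egress
  "Strips the internal :bureaucrat key before a message leaves the system."
  [message]
  (dissoc message :bureaucrat))

;; A keyword stands in for a real endpoint object in this demo.
(def normalized (normalize-ingress :some-endpoint "Hello, world!"))

(:payload normalized)         ;; => "Hello, world!"
(normalize-egress normalized) ;; => {:payload "Hello, world!"}
```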
Bureaucrat API Router Layer
The Bureaucrat API message format is defined as an extension of the Bureaucrat low-level format defined above, with additional special keys.
The `:call` key is required; its value specifies the API call that is being made. The router looks up the appropriate handler function for a given call and executes it. The value of the optional but usually present `:payload` key is passed to the handler function as its argument. If the message requires a reply and the function returns a non-nil value, the API router creates a new message on the same transport, addressed to the endpoint named in the `:reply-to` field of the incoming message.
API Router Details
The API router accepts messages from a core.async middleware pipeline and routes them to designated functions for processing.
Messages fed into the API router must be in “Bureaucrat API format”. This format is an extension of the Bureaucrat low-level format. Besides being maps, messages must have at least the `:call` key. Its value specifies what API call to make.
The Bureaucrat API format defines several additional optional keys:

- `:reply-to` is a string endpoint name where replies to the call should be sent. The system will send them on the same IMessageTransport that the message came in on.
- `:reply-call` is an optional `:call` to specify in the reply message.
- `:payload` is arbitrary data to be used by the API call.
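Put together, a message in Bureaucrat API format might look like the map below. The call and queue names are invented for the example, and `api-message?` is a hypothetical check that only tests the one hard requirement stated above (a map with a `:call` key).

```clojure
(def example-call
  {:call       "create-user"          ; required: which API call to make
   :reply-to   "user-service-replies" ; optional: queue name for the reply
   :reply-call "user-created"         ; optional: :call to put in the reply
   :payload    {:name "Ada"}})        ; optional: argument for the handler

(defn api-message?
  "True if m is a map with a :call key. (Hypothetical helper.)"
  [m]
  (and (map? m) (contains? m :call)))

(api-message? example-call)       ;; => true
(api-message? {:payload "hello"}) ;; => false
```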
API handlers are ordinary Clojure functions. The router protocol receives messages and
Messages arrive on the endpoint from somewhere – exactly where is deliberately undefined, to promote loose coupling between components. Bureaucrat dispatches the message payload to the appropriate handler function as its argument. Your app performs app-specific work in the handler function. If the incoming message specified a `:reply-to` address and the handler function returns a value, Bureaucrat will send that value to the queue named in `:reply-to`.
Payloads can be any arbitrary EDN-serializable data.
If you later want to run the same API on a different transport, it’s as easy as swapping in a new IQueueEndpoint component. You can even run the same API handlers simultaneously on multiple transports!
The lowest level abstraction is `bureaucrat.endpoint/IQueueEndpoint`. It defines a minimal set of messaging functionality that all queue backends must implement, such as sending and receiving messages. At this level, messages are treated as arbitrary blobs of EDN-serializable data and no special structure is imposed on the messages.
The IAPIRouter protocol provides a generic mechanism for mapping specially formatted messages to Clojure function calls and vice versa.
To use IAPIRouter, your messages must be specially formatted rather than generic EDN blobs. All routable messages must be maps. The only required key is `:call`. Its value specifies the API call that the message wishes to invoke.
The exact way this call name is mapped to a handler function is implementation specific. Implementations are provided that allow you to supply a map of keywords to functions to be used as a routing table, and to annotate Clojure functions with the `:api` metadata to allow them to be called by incoming messages automatically if the incoming message knows the function’s name.
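The metadata-based approach can be sketched with nothing but `resolve` and `meta`. This is a simplified illustration of the idea, not the router's actual code; `resolve-api-handler`, `dispatch` and `handler-prefix` are hypothetical names. It also shows why an unmarked function stays unreachable even when the caller knows its name.

```clojure
;; Only `greet` is marked as an API handler.
(defn ^:api greet [payload] (str "Hello, " (:name payload) "!"))
(defn internal-helper [payload] :not-for-callers)

;; For the demo, the prefix is the namespace `greet` was defined in. In real
;; use it would be a literal such as "foo.bar.api-handlers/".
(def handler-prefix (str (ns-name (:ns (meta #'greet))) "/"))

(defn resolve-api-handler
  "Returns the var named by prefix + call if it exists and carries :api
  metadata, otherwise nil."
  [prefix call]
  (let [v (resolve (symbol (str prefix call)))]
    (when (and (var? v) (:api (meta v)))
      v)))

(defn dispatch
  "Invokes the handler for the message's :call with its :payload, if allowed."
  [prefix {:keys [call payload]}]
  (when-let [handler (resolve-api-handler prefix call)]
    (handler payload)))

(dispatch handler-prefix {:call "greet" :payload {:name "Ada"}})
;; => "Hello, Ada!"
(dispatch handler-prefix {:call "internal-helper" :payload {}})
;; => nil
```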
If the calling message includes the optional key `:reply-to` and the handler function returns a value, the API router will look up a queue with the given name on the same transport where the message was received (possibly the same queue), encode the returned value as the `:payload` of a map, and send it to that queue.
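The table-based lookup and the reply rule can be sketched together. To keep the example self-contained, `route` returns a description of the reply it would send instead of touching a real transport; the function name and the `{:to ... :message ...}` return shape are assumptions made for this sketch.

```clojure
;; Routing table: an atom of a map from keyword call names to handlers.
(def routes (atom {:add (fn [{:keys [a b]}] (+ a b))
                   :log (fn [_] nil)}))

(defn route
  "Looks up the handler for the message's :call (keywordizing string names),
  invokes it with :payload, and returns a description of the reply to send,
  or nil when no reply is due."
  [table-atom {:keys [call payload reply-to reply-call]}]
  (when-let [handler (get @table-atom (keyword call))]
    (let [result (handler payload)]
      (when (and reply-to (some? result))
        (cond-> {:to reply-to :message {:payload result}}
          reply-call (assoc-in [:message :call] reply-call))))))

(route routes {:call "add" :payload {:a 1 :b 2} :reply-to "replies"})
;; => {:to "replies", :message {:payload 3}}

(route routes {:call "add" :payload {:a 1 :b 2}}) ; no :reply-to
;; => nil
```

Because the table lives in an atom, handlers can be added or swapped at runtime with `swap!`.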
Example Endpoint: HornetQ Backend
To send and receive messages with HornetQ:
```clojure
user> (require '[bureaucrat.endpoint :as endpoint])
nil
user> (require '[bureaucrat.endpoints.hornetq :as hornetq])
nil
user> (def queue (hornetq/start-hornetq-endpoint! "an-example-endpoint"))
#'user/queue
user> (endpoint/send! queue "Hello, world!")
#<HornetQTextMessage HornetQMessage[ID:f81d4fae-7dec-11d0-a765-00a0c91e6bf6]:PERSISTENT>
user> (endpoint/receive! queue)
"Hello, world!"
```

`receive!` will block forever if not given a timeout. This is mainly useful for testing. Real systems tend to use the version with a timeout, which returns `nil` if no message has become available for receipt in timeout milliseconds, or better yet the asynchronous listener. For example, to use `receive!` with a timeout of one second:

```clojure
user> (endpoint/receive! queue 1000)
nil
```

No message was sent during the 1 second window, so `receive!` returned nil.
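If you don't have a HornetQ backend handy, the timeout behaviour is easy to reproduce with a plain `java.util.concurrent.LinkedBlockingQueue` as a stand-in. Note that this is not a Bureaucrat endpoint; `receive-with-timeout` is a hypothetical function that only mimics the "return nil after timeout" semantics described above.

```clojure
(import '[java.util.concurrent LinkedBlockingQueue TimeUnit])

(defn receive-with-timeout
  "Waits up to timeout-ms for a message; returns nil if none arrives."
  [^LinkedBlockingQueue q timeout-ms]
  (.poll q (long timeout-ms) TimeUnit/MILLISECONDS))

(def local-queue (LinkedBlockingQueue.))
(.put ^LinkedBlockingQueue local-queue "Hello, world!")

(receive-with-timeout local-queue 1000) ;; => "Hello, world!"
(receive-with-timeout local-queue 50)   ;; => nil (queue is empty again)
```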
Asynchronous Message Receipt
To receive messages asynchronously, we can register a handler function on the queue. When a message becomes available, the handler will be invoked in a background thread with the message as its sole argument.
```clojure
user> (endpoint/register-listener! queue (fn [x] (println "Handler got a message:" x)) 1)
#<MessageProcessorGroup org.immutant.messaging.MessageProcessorGroup@12345678>
user> (endpoint/send! queue "Hello, world!")
Handler got a message: Hello, world!
#<HornetQTextMessage HornetQMessage[ID:f81d4fae-7dec-11d0-a765-00a0c91e6bf6]:PERSISTENT>
```
When using HornetQ, your handler function demarcates an XA distributed transaction. In short, all XA operations in the transaction either fail or succeed atomically. If your handler (or any other XA participant) throws an exception, the entire XA transaction will be aborted, all XA operations participating in the transaction will be rolled back, and the message will be placed back on the queue for some other handler to handle. If all participants succeed, all of the XA operations will be applied.
Any HornetQ messages you generate in the handler function will be participants in the XA transaction, as well as database access performed through XA-compatible JDBC drivers.
The Dead Letter Queue
We want message delivery to be retried in cases where the failure is transitory and not related to the message as such. For example, if a particular processor node’s server hardware fails or if someone accidentally kills a server process while it’s attempting to process a message, we want the message to be replayed somewhere else.
If, however, a particular message is unprocessable by our code due to some bug, there is no point in retrying the message forever since it’ll just fail again. Obvious bugs in message processors are usually caught prior to production deploys (you have tests, right?), but there are sometimes subtle bugs that are only triggered by certain messages. That is, the message processor will usually work fine, but occasionally some specific messages trigger a bug in the handler code that throws an exception. Such messages are called “poison messages.”
HornetQ deals with poison message detection by retrying the delivery some fixed number of times (10 by default). If message delivery still fails after that, the message is diverted to a special “dead letter queue” instead of its intended destination, to prevent endless cycles of bug triggering. The system operators are expected to monitor the dead letter queue and fix the bug.
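The retry-then-divert policy is simple enough to sketch in plain Clojure. This is a toy model of the behaviour described above, using an atom as the dead letter queue; it is not HornetQ's implementation, and `deliver-with-retries` is a hypothetical name.

```clojure
(defn deliver-with-retries
  "Calls handler with message up to max-attempts times. If every attempt
  throws, the message is appended to the dlq atom instead."
  [handler message max-attempts dlq]
  (loop [attempt 1]
    (let [outcome (try
                    (handler message)
                    :delivered
                    (catch Exception _ :failed))]
      (cond
        (= outcome :delivered)   :delivered
        (< attempt max-attempts) (recur (inc attempt))
        :else                    (do (swap! dlq conj message)
                                     :dead-lettered)))))

(def demo-dlq (atom []))

(defn picky-handler
  "Processes everything except the :poison message, which always fails."
  [msg]
  (when (= msg :poison)
    (throw (ex-info "cannot process message" {:message msg}))))

(deliver-with-retries picky-handler :ok 10 demo-dlq)     ;; => :delivered
(deliver-with-retries picky-handler :poison 10 demo-dlq) ;; => :dead-lettered
@demo-dlq                                                ;; => [:poison]
```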
Every HornetQ system defines a default systemwide dead letter queue called “DLQ”. (Specific queues can [optionally override that](http://immutant.org/builds/LATEST/html-docs/messaging.html#sec-2-3) and define their own dead letter queue if they want.)
You can retrieve the DLQ associated with a particular queue with `bureaucrat.endpoint/dead-letter-queue`. It returns an ordinary queue component which you can read like any other.
Note that some backends such as IronMQ do not have a built-in DLQ, so we simulate one in our wrapper.
You can count the messages currently in a queue, and unconditionally purge all messages in the queue.
```clojure
user> (endpoint/count-messages queue)
1
user> (endpoint/purge! queue)
1
user> (endpoint/count-messages queue)
0
```
Provided Endpoint Implementations
You must specify an IronMQ project ID, OAuth2 token, and server hostname to use. This can be done by creating an ~/.iron.json file, by providing arguments to (ironmq-transport), or by setting the environment variables expected by the IronMQ library before running your Clojure process. The config file and env variables are described in detail at [IronIO’s website](http://dev.iron.io/worker/reference/configuration/).
I find it most convenient to use ~/.iron.json for development and environment variables for production. These both avoid having credentials in your source code, and env variables map nicely into 12 factor app harnesses.
The relevant env variables are:
- `IRON_PROJECT_ID` - set to the project ID from your IronMQ account.
- `IRON_TOKEN` - set to your IronMQ secret access token.
Security is handled at the transport layer, and not by Bureaucrat. Any message that gets into an incoming message queue will be processed by Bureaucrat, no questions asked. It is up to your application to ensure that bad actors cannot access your queues, or to preprocess your messages somehow to ensure that only authorized users can do things.
Garbage collector performance note
The default JVM garbage collector is a “stop the world” type collector. This means that when your JVM is close to running out of memory, it will pause your program’s execution to perform a collection. The length of the pause depends on how much memory you’ve used; it can range from seconds to minutes for a very large program.
This is generally highly undesirable with server software. It can cause noticeable lags in execution that cascade to other systems which timeout while waiting for an async reply. I recommend running Bureaucrat with a different GC mode that doesn’t stop the world for so long.
For example, you can add something like the following to your `project.clj`. You may want to tweak the 512MB of memory as your app requires.

```clojure
:jvm-opts ["-d64" "-Duser.timezone=GMT" "-server" "-Djava.awt.headless=true" "-Dfile.encoding=utf-8"
           "-Xmx512m" "-Xms512m" "-XX:+UseG1GC"]
```
Component / Lifecycle Architecture
Each Bureaucrat component provides a constructor that returns an uninitialized component as well as a convenience method that returns an initialized component. The uninitialized version is to be used with the Component lifecycle management library; Component will start and stop it automatically. The initialized version returned by the convenience method can be used directly in cases where Component is not in use.
For example, `bureaucrat.endpoints.hornetq/hornetq-endpoint` returns a record that has not yet been connected to the HornetQ backend. This can be incorporated into Component orchestrations directly, like any other component. If you are not using Component in your program, you should instead use the convenience function `bureaucrat.endpoints.hornetq/start-hornetq-endpoint!`, which creates the HornetQ adapter component and also starts it for you (that is, connects it to the HornetQ backend).
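The constructor / convenience-function pairing looks roughly like the sketch below. To keep it runnable without extra dependencies, it defines a local `Lifecycle` protocol as a stand-in for the one in the Component library, and `DemoEndpoint`, `demo-endpoint` and `start-demo-endpoint!` are invented names that only mirror the naming pattern used by the HornetQ functions.

```clojure
;; Local stand-in for com.stuartsierra.component/Lifecycle, so that this
;; example runs on its own. Real code would use Component's protocol.
(defprotocol Lifecycle
  (start [this])
  (stop [this]))

(defrecord DemoEndpoint [queue-name connection]
  Lifecycle
  (start [this]
    ;; A real endpoint would connect to its backend here.
    (assoc this :connection (str "connected:" queue-name)))
  (stop [this]
    (assoc this :connection nil)))

(defn demo-endpoint
  "Constructor: returns an uninitialized component, suitable for handing to
  a lifecycle manager."
  [queue-name]
  (->DemoEndpoint queue-name nil))

(defn start-demo-endpoint!
  "Convenience function: constructs the component and starts it immediately."
  [queue-name]
  (start (demo-endpoint queue-name)))

(:connection (demo-endpoint "an-example-endpoint"))
;; => nil
(:connection (start-demo-endpoint! "an-example-endpoint"))
;; => "connected:an-example-endpoint"
```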
- HTTP transport
- Amazon SQS
- Provide additional canned enterprise messaging widgets for routing and transformation beyond the API router.
- IronMQ push mode (by exposing a webhook)
Copyright © 2014 Paul Legato.
Distributed under the Eclipse Public License, the same as Clojure.