The purpose of this namespace is to provide a collection of asynchronous primitives and functions to operate on those primitives.

### Asynchronous values

The core primitive is AsyncVal. This is basically just a future (except...), but the name future is already used in clojure.core, so we use a different name to avoid conflicts.

A producer is responsible for dealing with raw asynchronous operations and exposing them as AsyncVals. A consumer interacts with the producer, and interacts with the asynchronous operations through AsyncVals.

An AsyncVal represents a computation that might still be in progress, and which will eventually succeed or fail. A producer realizes the AsyncVal when the raw asynchronous operations it manages have information the producer wants to pass along to the consumer.

For example, an HTTP request function would return an AsyncVal representing the response before it receives the response. Once the HTTP client receives the response, it will realize the AsyncVal, which will invoke any registered realization callbacks. If something goes wrong (for example, the connection was closed prematurely), then the producer would realize the AsyncVal with the exception.

If you have an AsyncVal, you will normally receive its realized value by registering a callback on it. You can also dereference the AsyncVal immediately, which will block the current thread until the producer realizes the AsyncVal. You should never dereference an AsyncVal inside an event loop, since doing so will freeze the entire system.

### doasync

You register callbacks on an AsyncVal using the doasync macro. A full example might look something like:

    (doasync (http/GET "http://www.google.com")
      ;; Function invoked when the asynchronous value returned by
      ;; http/GET is realized.
      (fn [[status hdrs body]]
        (println "GOT: " [status hdrs body])))

The doasync macro itself returns an AsyncVal representing the return value from the callback, making doasync composable.
Should the producer (the HTTP client in the above example) encounter a failure, the AsyncVal can be aborted with an exception representing the failure. doasync allows these asynchronous exceptions to be handled in a fashion similar to Clojure’s try / catch. For example:

    (doasync (http/GET "http://www.some-invalid-host.com/")
      (fn [resp]) ;; Will not get invoked
      (catch Exception e
        (println "Encountered an exception: " e)))

If an exception is successfully caught, the AsyncVal representing the doasync will be realized with the catch clause’s return value. Exceptions that are not handled will cause the AsyncVal to be aborted with that exception. This semantic allows exceptions to bubble up asynchronously.

Additionally, doasync accepts any ordinary Clojure value or Java object in place of an AsyncVal, in which case the callback is invoked immediately.

### Joining asynchronous values

The join function takes an arbitrary number of both asynchronous values and regular types / objects and returns an AsyncVal that becomes realized when all of the arguments become realized. The realized arguments are then applied to the callback function. For example:

    (doasync (join (http/GET "http://www.google.com/")
                   (http/GET "http://www.bing.com/"))
      (fn [google-response bing-response]
        ;; Do something with the responses
        ))

In the event that one of the arguments becomes aborted, the combined AsyncVal will also become aborted with the same exception.

### AsyncSeq

AsyncSeq is an AsyncVal that is always realized with a Clojure sequence or nil. It also implements the Clojure sequence protocol; however, calling first, more, or next on it will throw an exception if it has not been realized yet.

AsyncSeqs can be used with doasync just the same as AsyncVals can. For example:

    (doasync my-async-seq
      (fn [[val & more :as realized]]
        ;; more is another async-seq
        (when realized
          (println "GOT: " val)
          (recur* more))))

The recur* function allows asynchronous recursion. It must be used in a tail position.
The recur* function takes N arguments and joins them as explained above. Once the join becomes realized, the last invoked function is called again with the realized values.

Just like with AsyncVals, an AsyncSeq might also face a failure scenario and become aborted with an exception. These exceptions may be handled in the same way as in the previous catch example.

### Composability

The above primitives are enough to build up some powerful asynchronous abstractions. This namespace contains a number of these abstractions. For example, map* returns an asynchronous sequence consisting of applying a function to the elements of another asynchronous sequence. The first* function returns an asynchronous value representing the head of a given sequence once it becomes realized.

In all of these cases, exception handling behaves as expected.
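To make the realize / abort and callback-chaining semantics from the overview concrete, here is a minimal model in plain Clojure. It is a sketch, not this namespace's implementation: the names `make-val`, `realize!`, `abort!`, `on-done`, and `then` are hypothetical and exist only for this example. It also assumes that a handler only sees failures of the upstream value, which may differ from how doasync's catch clause behaves.

```clojure
;; Hypothetical model of an asynchronous value; NOT the library's code.
;; State is {:status :pending|:ok|:err, :value x, :callbacks [f ...]}.
(defn make-val []
  (atom {:status :pending :value nil :callbacks []}))

(defn finish!
  "Moves av out of :pending at most once, then notifies queued callbacks."
  [av status value]
  (let [[old _] (swap-vals! av (fn [s]
                                 (if (= :pending (:status s))
                                   {:status status :value value :callbacks []}
                                   s)))]
    (when (= :pending (:status old))
      (doseq [cb (:callbacks old)]
        (cb status value))
      true)))

(defn realize! [av v] (finish! av :ok v))
(defn abort!   [av e] (finish! av :err e))

(defn on-done
  "Calls (cb status value) now if av is finished, otherwise queues cb."
  [av cb]
  (let [[old _] (swap-vals! av (fn [s]
                                 (if (= :pending (:status s))
                                   (update s :callbacks conj cb)
                                   s)))]
    (when (not= :pending (:status old))
      (cb (:status old) (:value old)))
    av))

(defn then
  "Returns a new value realized with (f v), or with (handler e) when av is
   aborted. Without a handler, the failure passes through unchanged."
  ([av f] (then av f nil))
  ([av f handler]
   (let [out (make-val)]
     (on-done av
              (fn [status v]
                (try
                  (cond
                    (= :ok status) (realize! out (f v))
                    handler        (realize! out (handler v))
                    :else          (abort! out v))
                  (catch Exception e
                    (abort! out e)))))
     out)))

;; Chaining: each step yields a new value, so steps compose.
(def response (make-val))
(def log (atom []))
(def result
  (-> response
      (then (fn [body] (count body)))
      (then (fn [n] (swap! log conj [:length n]) n))))
(realize! response "hello")

;; Failure: one consumer recovers, the other lets the exception bubble up.
(def failed (make-val))
(def recovered (then failed identity (fn [e] (ex-message e))))
(def bubbled (then failed inc))
(abort! failed (Exception. "connection closed"))
```

In this model `result` ends up realized with `5`, `recovered` with the exception's message, and `bubbled` is aborted with the original exception, mirroring the bubbling behavior described for unhandled exceptions.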
(->Channel transfer head paused? depth f capacity)
Positional factory function for class momentum.core.async.Channel.
(aborted? v1 v2 & args)
Returns true if the asynchronous value has been aborted.
(async-seq & body)
Takes a body of expressions that returns an asynchronous value, ISeq, or nil, and yields a Seqable asynchronous value that invokes the body only the first time a callback is registered or it is dereferenced. The result is cached and returned on subsequent calls to seq.
Returns a new unrealized asynchronous value. Dereferencing will cause the current thread to block until the asynchronous value is realized.
(batch n coll)
Alpha - subject to change. Returns an async value that is realized with the given collection when all (or n if supplied) elements of the collection have been realized.
(blocking coll ms)
(blocking coll ms default-val)
Returns a lazy sequence consisting of the items in the passed collection. If the sequence is an async sequence, then the current thread will wait at most ms milliseconds (or indefinitely if no timeout value is passed) for the async sequence to realize.
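The wait-with-timeout behavior is similar in spirit to `clojure.core/deref` called with a timeout on a promise. The sketch below uses only clojure.core and does not call `blocking`. How `blocking` uses `default-val` is not stated above, so the fallback behavior shown here belongs to `deref`, not necessarily to `blocking`.

```clojure
;; Plain clojure.core analogy; `blocking` itself is not used here.
(def never-delivered (promise))
(def delivered (doto (promise) (deliver [1 2 3])))

;; Waits at most 50 ms, then gives up and returns the fallback value.
(def timed-out (deref never-delivered 50 :timeout))

;; Already delivered, so this returns the value without waiting.
(def ready (deref delivered 50 :timeout))
```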
(channel f capacity)
Returns a new channel. Calling seq with a channel returns an asynchronous sequence of the values that are put into the channel.
Close a channel. Closing a channel causes any associated asynchronous sequences to terminate.
(concat* x y)
Returns an asynchronous sequence representing the concatenation of the elements in the supplied colls.
(doseq* seq-exprs & body)
Repeatedly executes body (presumably for side-effects) with bindings as they are realized and filtering as provided by "for". Does not retain the head of the sequence. Returns an asynchronous value that will be realized with nil once all of the items have been handled.
Returns an async value representing the first item in the collection once it becomes realized.
(future* & body)
Takes a body of expressions and invokes it in another thread. Returns an asynchronous value that will be realized with the result once the computation completes.
(interrupt async-val str)
Interrupts an asynchronous type with an optionally supplied string. Returns true if successful. Returns false otherwise.
(join & args)
Returns an asynchronous value representing the realization of the supplied arguments. The returned asynchronous value will be realized with the realized values of the supplied arguments in the same order or, if any of the supplied arguments become aborted, it will be aborted with the same exception.
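The ordering and abort rules can be illustrated with a small model in plain Clojure. This is a sketch under stated assumptions, not the implementation of `join`: an "async value" is modeled as a function taking a success callback and an error callback, and `succeed-with`, `fail-with`, and `join-model` are hypothetical names. Unlike `join`, the model does not accept regular values, and it assumes each input invokes exactly one of its callbacks, once.

```clojure
;; Hypothetical model, NOT the library's code: an "async value" here is a
;; function that takes a success callback and an error callback.
(defn succeed-with [v] (fn [on-ok _on-err] (on-ok v)))
(defn fail-with    [e] (fn [_on-ok on-err] (on-err e)))

(defn join-model
  "Combines inputs into one async value that succeeds with a vector of their
   values in argument order, or fails with the first failure seen."
  [& inputs]
  (fn [on-ok on-err]
    (let [n     (count inputs)
          state (atom {:vals (vec (repeat n nil)) :pending n :aborted? false})]
      (if (zero? n)
        (on-ok [])
        (doseq [[i input] (map-indexed vector inputs)]
          (input
           ;; Success: store the value in its slot; fire when all have arrived.
           (fn [v]
             (let [s (swap! state #(-> %
                                       (assoc-in [:vals i] v)
                                       (update :pending dec)))]
               (when (and (zero? (:pending s)) (not (:aborted? s)))
                 (on-ok (:vals s)))))
           ;; Failure: only the first failure is reported.
           (fn [e]
             (let [[old _] (swap-vals! state assoc :aborted? true)]
               (when-not (:aborted? old)
                 (on-err e))))))))))

(def outcome (atom nil))
((join-model (succeed-with :google) (succeed-with :bing))
 (fn [vs] (reset! outcome [:ok vs]))
 (fn [e] (reset! outcome [:err (ex-message e)])))

(def failure (atom nil))
((join-model (succeed-with 1) (fail-with (Exception. "boom")))
 (fn [vs] (reset! failure [:ok vs]))
 (fn [e] (reset! failure [:err (ex-message e)])))
```

Here the model passes the joined values to the callback as a single vector, whereas the doasync example in the overview receives them as separate arguments.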
(map* f coll)
(map* f c1 & colls)
Returns an asynchronous sequence consisting of the result of recursively applying f to the set of first items of each coll once they become realized. Function f should accept as many arguments as there are colls.
Protocol for realizing async types.
Protocol for receiving realized async values.
(receive async-type success-fn error-fn)
Registers a success callback and an error callback on an asynchronous value. The success callback will be invoked with the value that the asynchronous value is realized with. The error callback will be invoked with the exception that the asynchronous value is aborted with. Only one of the two callbacks will be invoked.
(recur* v1 v2)
(recur* v1 v2 v3)
(recur* v1 v2 v3 & args)
Accepts an arbitrary number of arguments, passing them to join. Once the joined asynchronous value is realized, the current callback function will be reinvoked with the joined realized values from the supplied arguments. Must be called from the tail position of a doasync callback.
(sink dn coll)
Writes the contents of the collection to a downstream function. Returns a function that accepts :pause, :resume, and :abort events.
(splice first & more)
Returns an async seq that consists of map entries of the values of all of the seqs passed in as they materialize and the key referencing the seq. If multiple maps are passed, the returned seq will assign priority in the order of the arguments.
(success? v1 v2 & args)
Returns true if the asynchronous value has been realized successfully.