CrossClj

0.5.6 docs

    VARS
    ->LaminaChannelSink
    ->LaminaChannelSource
    async-promise?
    atom-sink
    cancel-callback
    channel
    channel*
    channel->lazy-seq
    channel->seq
    channel-pair
    channel-seq
    channel?
    close
    close-on-idle
    closed-channel
    closed-result
    closed?
    combine-latest
    complete
    concat*
    defer-onto-queue
    distribute-aggregate
    distributor
    drained-result
    drained?
    drop*
    drop-while*
    emit-in-order
    enqueue
    enqueue-and-close
    error
    error-result
    error-value
    expiring-result
    filter*
    force-close
    force-error
    fork
    ground
    grounded-channel
    idle-result
    join
    join->>
    last*
    lazy-channel-seq
    lazy-seq->channel
    map*
    mapcat*
    merge-channels
    merge-results
    named-channel
    on-closed
    on-drained
    on-error
    on-realized
    partition*
    partition-all*
    partition-every
    periodically
    permanent-channel
    pipeline
    read-channel
    read-channel*
    receive
    receive-all
    receive-in-order
    redirect
    reduce*
    reductions*
    remove*
    restart
    result-channel
    run-pipeline
    sample-every
    sink
    sink->>
    siphon
    siphon->>
    splice
    split
    success
    success-result
    take*
    take-while*
    tap
    timed-result
    transactional?
    transitions
    wait-for-message
    wait-for-result
    wait-stage
    watch-channel
    with-timeout
    zip
    zip-all

    Private
    (->LaminaChannelSink ch)
    Private
    (->LaminaChannelSource ch)
    (async-promise? x)
    Returns true if x is a result.
    
    (atom-sink channel)(atom-sink initial-value channel)
    Transforms a channel into an atom which is updated with the value of each new message.
    
    (cancel-callback channel callback)
    Cancels a callback registered with receive, receive-all, on-closed, on-drained, or on-error.
    
    (channel & messages)
    Returns a channel containing the given messages.
    
    macro
    (channel* & {:keys [grounded? permanent? transactional? messages description meta], :as options})
    A general-purpose channel creator.  Can be used to mix and match various properties, such as:
    
    (channel* :grounded? true, :description "my very own grounded channel")
    
    :grounded?        - ensures that messages cannot accumulate in the queue
    :permanent?       - ensures that the channel cannot be closed or be put in an error state
    :transactional?   - determines whether the channel's queue is transactional
    :messages         - sequence of zero or more messages that will be in the channel's queue
    :description      - a string that will be displayed in channel visualizations
    :meta             - initial metadata
    (channel->lazy-seq ch)(channel->lazy-seq ch timeout)
    Returns a sequence.  As elements of the sequence are realized, messages from the
    source channel are consumed.  If no messages are available to be
    consumed, execution will block until one is available.
    
    A timeout can be defined, either as a number or a no-arg function that returns a
    number.  Each time the seq must wait for a message to consume, it will only wait
    that many milliseconds before giving up and ending the sequence.
    (channel->seq ch)(channel->seq ch timeout)
    An eager variant of channel->lazy-seq.  Blocks until the channel has been drained,
    or until timeout milliseconds have elapsed.
    (channel-pair)
    Returns a pair of channels, where all messages enqueued into one channel can
    be received by the other, and vice-versa.  Closing one channel will automatically
    close the other.
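A minimal sketch of the two-way behaviour described above. It assumes the lamina library is on the classpath and that these vars are exposed by the `lamina.core` namespace (the namespace name does not appear in this listing and is an assumption). `ping-pong` is a hypothetical helper name used only for illustration.

```clojure
;; Assumption: lamina is on the classpath and these vars live in lamina.core.
(require '[lamina.core :refer [channel-pair enqueue wait-for-message]])

(defn ping-pong
  "Hypothetical helper: sends one message in each direction across a
   channel pair and returns what each side received."
  []
  (let [[a b] (channel-pair)]
    (enqueue a :ping)                 ; enqueued into a, readable from b
    (enqueue b :pong)                 ; enqueued into b, readable from a
    [(wait-for-message b 1000)        ; wait up to one second per side
     (wait-for-message a 1000)]))

(ping-pong)
```

The one-second timeouts are only a guard so the sketch cannot block indefinitely.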
    macro
    (channel-seq & args)
    (channel? x)
    Returns true if x is a channel.  This does not encompass result-channels.
    
    (close channel)
    Closes the channel. Returns true if successful, false if channel is permanent, already closed,
    or in an error state.
    (close-on-idle interval channel)(close-on-idle interval task-queue channel)
    Sets up a watcher which will close channel if it doesn't emit a message for interval milliseconds.
    
    Returns channel, for chaining convenience.
    (closed-channel & messages)
    Returns a closed channel containing the given messages.
    
    (closed-result channel)
    Returns a result-channel that will emit a result when channel is closed, or emit an error
    if channel goes into an error state.
    (closed? channel)
    Returns true if channel is closed, false otherwise. 
    
    (combine-latest f & channels)
    Given n-many channels and a function which takes n arguments, reevaluates the function for each new
    value emitted by one of the channels.  Effectively a composition of zip-all and map*.
    (complete value)
    Returns a redirect signal which causes the pipeline's execution to stop, and simply return value.  If value
    is a Throwable, then the pipeline will be realized as that error.
    (concat* ch)
    A dual to concat.
    
    (concat* (channel [1 2] [2 3])) => [1 2 2 3]
    (defer-onto-queue {:keys [timestamp task-queue auto-advance?]} ch)
    Takes an input channel, a timestamp function which takes each message and returns the associated time,
    and a task-queue.  If auto-advance? is true, then enqueueing a message will automatically
    advance task-queue to that time.
    
    The returned channel will emit each message from ch only once the designated time has arrived.
    This assumes the timestamp for each message is monotonically increasing.
    (distribute-aggregate {:keys [facet generator period task-queue], :or {task-queue (t/task-queue), period (t/period)}} ch)
    A mechanism similar to a SQL `group by` or the split-apply-combine strategy for data analysis.
    
    For each message from channel, the value returned by `(facet msg)` will be examined, and the
    message will be routed to a channel that consumes all messages with that facet value.  If no
    such channel exists, it will be generated by `(generator facet-value facet-channel)`, which takes
    the facet-value and associated channel, and returns an output channel which will be merged with
    the output of all other facet channels.
    
    The output of each facet channel is assumed to be periodic.  The :period may be specified, but
    is not required.
    
    Returns a channel which will periodically emit a map of facet-value onto the output of the generated
    facet-channel.
    
    Example:
    
      (distribute-aggregate
        {:facet     :uri
         :generator (fn [uri ch]
                          (rate ch))}
        ch)
    
    will return a channel which periodically emits a map of the form
    
           {"/abc" 2
            "/def" 3}
    (distributor {:keys [facet initializer on-clearance]})
    Returns a channel.
    
    Messages enqueued into this channel are split into multiple streams, grouped by
    (facet msg). When a new facet-value is encountered, (initializer facet-value ch)
    is called, allowing messages with that facet-value to be handled appropriately.
    
    If a facet channel is closed, it will be removed from the distributor, and
    a new channel will be generated when another message of that type is enqueued.
    This allows the use of (close-on-idle interval ch), if facet-values will change over
    time.
    
    Given messages with the form {:type :foo, :value 1}, to print the mean values of all
    types:
    
    (distributor
      {:facet       :type
       :initializer (fn [facet-value ch]
                      (siphon
                        (->> ch (map* :value) moving-average)
                        (sink #(println "average for" facet-value "is" %))))})
    (drained-result channel)
    Returns a result-channel that will emit a result when channel is drained, or emit an error
    if channel goes into an error state.
    (drained? channel)
    Returns true if channel is drained, false otherwise.
    
    (drop* n ch)
    A dual to drop.
    
    (drop* 2 (closed-channel 1 2 3 4)) => [3 4]
    (drop-while* f ch)
    A dual to drop-while.
    
    (drop-while* pos? (closed-channel 1 2 0 4)) => [0 4]
    (emit-in-order ch)
    Returns a channel that emits messages one at a time.
    
    (enqueue channel)(enqueue channel message)(enqueue channel message & messages)
    Enqueues the message or messages into the channel.
    
    (enqueue-and-close ch & messages)
    Enqueues the message or messages into the channel, and then closes the channel.
    
    (error channel err)
    Puts the channel or result-channel into an error state.
    
    (error-result error)
    Returns a result already realized with an error.
    
    (error-value x default-value)
    Returns the error-value of the channel or async-promise if it's in an error state, and 'default-value'
    otherwise.
    (expiring-result interval)(expiring-result interval task-queue)
    Returns a result-channel that will be realized as a 'lamina/timeout!' error if a value is not enqueued within
    interval milliseconds.
    (filter* f channel)
    A dual to filter.
    
    (filter* odd? (channel 1 2 3)) => [1 3]
    (force-close channel)
    Closes channel, even if it is permanent. Returns true if successful, false if channel is
    already closed or in an error state.
    (force-error channel err)
    Puts the channel or result-channel into an error state, even if it's permanent.
    
    (fork channel)
    Returns a channel which is an exact duplicate of the source channel, containing all messages
    in the source channel's queue, and emitting all messages emitted by the source channel.
    
    If the forked channel is closed, the source channel is unaffected.  However, if the source
    channel is closed all forked channels are closed.  Similar propagation rules apply to error
    states.
    (ground ch)
    Ensures that messages will not accumulate in the channel's queue.
    
    (grounded-channel)
    Returns a channel that cannot accumulate messages.
    
    (idle-result interval channel)(idle-result interval task-queue channel)
    A result which will be realized if channel doesn't emit a message for interval milliseconds.
    
    (join src dst)(join src dst & rest)
    Takes all messages from src and forwards them to dst.  If either channel closes or goes into an
    error state, the same is done for the other channel.  This is useful for channels which have a 1-to-1
    relationship.
    
    If more than two channels are specified, join becomes transitive.  `(join a b c)` is equivalent to
    `(join a b)` and `(join b c)`.
    macro
    (join->> & transforms+downstream-channel)
    A variant of sink->> where the last argument is assumed to be a channel,
    and the transform chain is joined to it.
    
    (join->> (map* inc) (map* dec) (named-channel :foo))
    
    expands to
    
    (let [ch (channel)]
      (join
        (->> ch (map* inc) (map* dec))
        (named-channel :foo))
      ch)
    (last* ch)
    A dual to last.
    
    macro
    (lazy-channel-seq & args)
    (lazy-seq->channel s)
    Returns a channel representing the elements of the sequence.
    
    (map* f channel)
    A dual to map.
    
    (map* inc (channel 1 2 3)) => [2 3 4]
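As a hedged illustration of how the starred duals compose, the sketch below threads a closed channel through `map*` and `filter*` and collects the output with `channel->seq`. It assumes lamina is on the classpath and that these vars live in `lamina.core` (an assumption, since the namespace is not named in this listing).

```clojure
;; Assumption: lamina is on the classpath and these vars live in lamina.core.
(require '[lamina.core :refer [closed-channel map* filter* channel->seq]])

;; A closed channel is used so the output can be fully drained.
(def out
  (->> (closed-channel 1 2 3 4)
       (map* inc)            ; 2 3 4 5
       (filter* even?)))     ; 2 4

;; Collect the transformed messages, waiting at most one second.
(def collected (channel->seq out 1000))
```

Because every transform takes the channel as its last argument, `->>` reads the same way it would for ordinary sequences.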
    (mapcat* f ch)
    A dual to mapcat.
    
    (mapcat* reverse (channel [1 2] [3 4])) => [2 1 4 3]
    (merge-channels & channels)
    Combines n-many streams into a single stream.  The returned channel will be closed only
    once all source channels have been closed.
    (merge-results & results)
    Given n results, returns a single async-promise which will be realized as a sequence of all the realized
    results.
    (named-channel id on-create)
    Returns a permanent channel keyed to id.  If the channel doesn't already exist and on-create is non-nil, 
    it will be invoked with the new channel as a parameter.
    (on-closed channel callback)
    Registers a callback that will be invoked with no arguments when channel is closed, or
    immediately if it has already been closed.  callback will only be invoked once, and can
    be cancelled using cancel-callback.
    (on-drained channel callback)
    Registers a callback that will be invoked with no arguments when channel is drained, or
    immediately if it has already been drained.  callback will only be invoked once, and can
    be cancelled using cancel-callback.
    (on-error channel callback)
    Registers a callback that will be called with the error when channel enters an error state,
    or immediately if it's already in an error state.  callback will only be invoked once,
    and can be cancelled using cancel-callback.
    (on-realized result-channel on-success on-error)
    Allows two callbacks to be registered on a result-channel, one in the case of a
    value, the other in case of an error.
    
    This often can and should be replaced by a pipeline.
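A small sketch of the two-callback form, using results that are already realized so the callbacks fire immediately. It assumes lamina is on the classpath and that these vars live in `lamina.core` (an assumption). `describe` is a hypothetical helper name, and using the keyword `:boom` as the error value is also just for illustration.

```clojure
;; Assumption: lamina is on the classpath and these vars live in lamina.core.
(require '[lamina.core :refer [on-realized success-result error-result]])

(defn describe
  "Hypothetical helper: reports which callback fired for an already
   realized result.  For an unrealized result this would return nil."
  [result]
  (let [outcome (atom nil)]
    (on-realized result
      (fn [value] (reset! outcome [:ok value]))
      (fn [err]   (reset! outcome [:failed err])))
    @outcome))

(def ok-case    (describe (success-result 42)))
(def error-case (describe (error-result :boom)))
```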
    (partition* n ch)(partition* n step ch)
    A dual to partition.
    
    (partition* 2 (channel 1 2 3)) => [[1 2]]
    (partition-all* n ch)(partition-all* n step ch)
    A dual to partition-all.
    
    (partition-all* 2 (closed-channel 1 2 3)) => [[1 2] [3]]
    (partition-every {:keys [period task-queue], :as options} ch)
    Takes a source channel, and returns a channel that repeatedly emits a collection
    of all messages from the source channel in the last period milliseconds.
    (periodically {:keys [task-queue immediate? period priority close-latch lazy?], :or {task-queue (t/task-queue), period (t/period), priority 0}} f)
    Returns a channel.  Every period milliseconds, f is invoked with no arguments
    and the value is emitted as a message.
    (permanent-channel & messages)
    Returns a channel which cannot be closed or put into an error state.
    
    macro
    (pipeline & opts+stages)
    A means for composing asynchronous functions.  Returns a function which will pass the value into the first
    function, the result from that function into the second, and so on.
    
    If any function returns an unrealized async-promise, the next function won't be called until that value is realized.
    The call into the pipeline itself returns an async-promise, which won't be realized until all functions have completed.
    If any function throws an exception or returns an async-promise that realizes as an error, this will short-circuit all
    calls to subsequent functions, and cause the pipeline's result to be realized as an error.
    
    Loops and other more complex flows may be created if any stage returns a redirect signal by returning the result of 
    invoking restart, redirect, or complete.  See these functions for more details.
    
    The first argument to pipeline may be a map of optional arguments:
    
      :error-handler - a function which is called when an error occurs in the pipeline.  Takes a single argument, the error,
                         and may optionally return a redirect signal to prevent the pipeline from returning the error.
    
                         If no :error-handler is specified, the error will be logged.  If pipelines are nested, this may result
                         in the same error being logged multiple times.  To hide this error you may define a no-op handler, but
                         only do this if you're sure there's an outer pipeline that will handle/log the error.
    
      :finally - a function which is called with zero arguments when the pipeline completes, either due to success or error.
    
      :result - the result into which the pipeline's result will be forwarded.  Causes the pipeline to not return any value.
    
      :timeout - the max duration of the pipeline's invocation.  If pipeline times out in the middle of a stage it won't terminate
                   computation, but it will not continue onto the next stage.
    
      :implicit? - describes whether the pipeline's execution should show up in higher-level instrumented functions calling into it.
                     Defaults to false.
    
      :unwrap? - if true, and the pipeline does not need to pause between stages, the pipeline will return an actual value
                   rather than an async-promise.
    
      :with-bindings - if true, conveys the binding context of the initial invocation of the pipeline into any deferred stages.
    (read-channel channel)
    Returns a result-channel representing the next message from the channel.  Only one
    result-channel can represent any given message; calling `(read-channel ...)` multiple times
    will always consume multiple messages.
    
    Enqueueing a value into the result-channel before it is realized will prevent the message
    from being consumed, effectively cancelling the read-channel call.
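The one-message-per-call behaviour can be sketched as follows, assuming lamina is on the classpath and that these vars live in `lamina.core` (an assumption).

```clojure
;; Assumption: lamina is on the classpath and these vars live in lamina.core.
(require '[lamina.core :refer [channel read-channel wait-for-result]])

(def ch (channel :a :b))

;; Each call claims a different message from the queue.
(def r1 (read-channel ch))
(def r2 (read-channel ch))

(def first-read  (wait-for-result r1 1000))   ; :a
(def second-read (wait-for-result r2 1000))   ; :b
```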
    macro
    (read-channel* ch & {:keys [timeout predicate result listener-result on-timeout on-error on-false task-queue], :as options})
    A variant of read-channel with more options.
    
    :timeout - the timeout, in milliseconds.  If this elapses, the next message will not be consumed.
    
    :predicate - a function that takes the message, and returns true if it should be consumed.  If the
                   predicate returns false, the returned result will realize as the value defined by :on-false.
    
    :result - the result that the read message should be enqueued into.  If the same result is used for
                read-channel calls from multiple channels, this will have the effect of being realized
                as the first message from any of the channels, and not consuming any messages from the other
                channels.
    
    :listener-result - the result that will be returned to the emitter of the message, representing the
                         outcome of the consumption.  This should only be done if there is a clear single
                         outcome for this message (i.e. we're not just accumulating the entire stream.)
    
    :on-timeout - the result that will be realized if we timed out.  If not specified, the result will be
                    realized as a :lamina/timeout error.
    
    :on-error - the result that will be realized if the channel is in an error state.  If not specified,
                  the result will be realized as the channel's error.
    
    :on-false - the result that will be realized if the :predicate returns false.  Defaults to :lamina/false.
    (receive channel callback)
    Registers a callback that will be invoked with the next message enqueued into the channel, or
    the first message already in the queue.  Only one callback can consume any given message;
    registering multiple callbacks will consume multiple messages.
    
    This can be cancelled using cancel-callback.
    (receive-all channel callback)
    Registers a callback that will consume all messages currently in the queue, and all
    subsequent messages that are enqueued into channel.
    
    This can be cancelled using cancel-callback.
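To contrast `receive` with `receive-all`, here is a minimal sketch. It assumes lamina is on the classpath, that these vars live in `lamina.core` (an assumption), and that callbacks run on the enqueueing thread for an ordinary, non-transactional channel.

```clojure
;; Assumption: lamina is on the classpath and these vars live in lamina.core.
(require '[lamina.core :refer [channel enqueue receive receive-all]])

(def ch (channel 1 2))          ; two messages already queued
(def first-msg (atom nil))
(def all-msgs  (atom []))

;; receive consumes exactly one message: the first one in the queue.
(receive ch #(reset! first-msg %))

;; receive-all consumes what is left, plus anything enqueued afterwards.
(receive-all ch #(swap! all-msgs conj %))
(enqueue ch 3)
```

After these calls `first-msg` should hold the first queued message and `all-msgs` the remaining ones in order.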
    (receive-in-order ch f)
    Consumes messages from the source channel, passing them to f one at a time.  If
    f returns a result-channel, consumption of the next message is deferred until
    it's realized.
    
    If an exception is thrown or the return result is realized as an error, the source
    channel is put into an error state.
    (redirect pipeline value)
    Returns a redirect signal which causes the flow to start at the beginning of pipeline, with an input
    value of value.  The outcome of this new pipeline will be forwarded into the result returned by the
    original pipeline.
    (reduce* f ch)(reduce* f val ch)
    A dual to reduce.  Returns a result-channel that emits the final reduced value
    when the source channel has been drained.
    
    (reduce* max (channel 1 3 2)) => 3
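Since the reduced value is only emitted once the source has been drained, a sketch that actually realizes needs a source that closes. The example below uses `closed-channel` for that reason. It assumes lamina is on the classpath and that these vars live in `lamina.core` (an assumption).

```clojure
;; Assumption: lamina is on the classpath and these vars live in lamina.core.
(require '[lamina.core :refer [closed-channel reduce* wait-for-result]])

;; The source is already closed, so it can be drained and the sum emitted.
(def total (reduce* + (closed-channel 1 2 3 4)))

;; Block for at most one second for the reduced value.
(def sum (wait-for-result total 1000))
```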
    (reductions* f ch)(reductions* f val ch)
    A dual to reductions.
    
    (reductions* max (channel 1 3 2)) => [1 3 3]
    (remove* f channel)
    A dual to remove.
    
    (remove* even? (channel 2 3 4)) => [3]
    (restart)(restart value)
    A variant of redirect which redirects flow to the top of the current pipeline.
    
    (result-channel)
    Returns a result-channel, representing an unrealized value or error.
    
    macro
    (run-pipeline value & opts+stages)
    Like pipeline, but simply invokes the pipeline with value and returns the result.
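A hedged sketch of a three-stage pipeline in which the middle stage returns an unrealized result, so the last stage should only run once that result is realized. It assumes lamina is on the classpath and that these vars live in `lamina.core` (an assumption).

```clojure
;; Assumption: lamina is on the classpath and these vars live in lamina.core.
(require '[lamina.core :refer [run-pipeline timed-result wait-for-result]])

(def result
  (run-pipeline 1
    inc                                      ; 1 -> 2
    (fn [x] (timed-result 50 (* x 10)))      ; realized as 20 after about 50 ms
    inc))                                    ; 20 -> 21, runs once 20 is available

;; The pipeline returns an async-promise; wait up to one second for it.
(def final-value (wait-for-result result 1000))
```

The delayed stage stands in for any asynchronous call; the surrounding stages stay ordinary functions.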
    
    (sample-every {:keys [period task-queue], :as options} ch)
    Takes a source channel, and returns a channel that emits the most recent message
    from the source channel every period milliseconds.
    (sink callback)
    Creates a channel which will forward all messages to callback.
    
    macro
    (sink->> & transforms+callback)
    Creates a channel, pipes it through the ->> operator, and sends the
    resulting stream into the callback. This can be useful when defining
    :probes for an instrumented function, among other places.
    
    (sink->> (map* inc) (map* dec) println)
    
    expands to
    
    (let [ch (channel)]
      (receive-all
        (->> ch (map* inc) (map* dec))
        println)
      ch)
    (siphon src dst)(siphon src dst & rest)
    Takes all messages from src and forwards them to dst.  If dst closes, src is closed, but
    not vice versa.  Error states are similarly propagated.  This is useful for many transient channels
    feeding into one channel.
    
    If more than two channels are specified, siphon becomes transitive.  `(siphon a b c)` is equivalent to
    `(siphon a b)` and `(siphon b c)`.
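The many-into-one use described above can be sketched like this, assuming lamina is on the classpath and that these vars live in `lamina.core` (an assumption).

```clojure
;; Assumption: lamina is on the classpath and these vars live in lamina.core.
(require '[lamina.core :refer [channel siphon enqueue receive-all close closed?]])

(def a   (channel))
(def b   (channel))
(def dst (channel))
(def seen (atom []))

;; Two transient sources feed a single destination.
(siphon a dst)
(siphon b dst)
(receive-all dst #(swap! seen conj %))

(enqueue a :from-a)
(enqueue b :from-b)

;; Closing a source is not propagated to the destination.
(close a)
(def dst-closed-after-src? (closed? dst))
```

With `join` in place of `siphon`, closing `a` would, per the description of `join`, also close `dst`.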
    macro
    (siphon->> & transforms+downstream-channel)
    A variant of sink->> where the last argument is assumed to be a channel,
    and the contents of the transform chain are siphoned into it.
    
    (siphon->> (map* inc) (map* dec) (named-channel :foo))
    
    expands to
    
    (let [ch (channel)]
      (siphon
        (->> ch (map* inc) (map* dec))
        (named-channel :foo))
      ch)
    (splice emitter receiver)
    Returns a channel where all messages are enqueued into receiver, and
    consumed from emitter.
    macro
    (split & downstream-channels)
    Returns a channel which will forward each message to all downstream-channels.
    This can be used with sink->>, siphon->>, and join->> to define complex
    message flows:
    
    (join->> (map* inc)
      (split
        (sink->> (filter* even?) log-even)
        (sink->> (filter* odd?) log-odd)))
    (success _ val)
    (success-result value)
    Returns a result already realized with a value.
    
    (take* n ch)
    A dual to take.
    
    (take* 2 (channel 1 2 3)) => [1 2]
    (take-while* f ch)
    A dual to take-while.
    
    (take-while* pos? (channel 1 2 0 4)) => [1 2]
    (tap channel)
    Behaves like fork, except that the source channel will not remain open if only the tap
    exists downstream.
    
    If the tap channel is closed, the source channel is unaffected.  However, if the source
    channel is closed all tap channels are closed.  Similar propagation rules apply to error
    states.
    (timed-result interval)(timed-result interval value)(timed-result interval value task-queue)
    Returns a result-channel that will be realized as value (defaulting to nil) in interval milliseconds.
    
    (transactional? channel)
    Returns true if channel has a transactional queue, false otherwise.
    
    (transitions ch)
    Emits messages only when they differ from the preceding message.
    
    (wait-for-message ch)(wait-for-message ch timeout)
    Blocks for the next message from the channel. If the timeout elapses without a message,
    throws a java.util.concurrent.TimeoutException.
    (wait-for-result result-channel)(wait-for-result result-channel timeout)
    Waits for the result to be realized. If the timeout elapses without a value, throws a
    java.util.concurrent.TimeoutException.
    macro
    (wait-stage interval)
    Creates a pipeline stage which simply waits for interval milliseconds before continuing onto the next stage.
    
    (watch-channel reference)
    Transforms a watchable reference into a channel of values.
    
    (with-timeout interval result)
    Returns a new result that will mimic the original result, unless interval milliseconds elapse, in which
    case it will realize as a 'lamina/timeout!' error.
    (zip channels)(zip most-frequent? channels)
    Emits a tuple containing the most recent message from all channels once a single message has been received
    from each channel.
    (zip-all channels)
    For each message from one of the streams in channels, emits a tuple containing the most recent message
    from all streams.  In order for any tuple to be emitted, at least one message must have been emitted by
    all channels.