Class DistributedQueue<T>

java.lang.Object
org.apache.curator.framework.recipes.queue.DistributedQueue<T>
All Implemented Interfaces:
Closeable, AutoCloseable, QueueBase<T>

public class DistributedQueue<T> extends Object implements QueueBase<T>

An implementation of the Distributed Queue ZooKeeper recipe. Items put into the queue are guaranteed to be ordered (by means of ZooKeeper's PERSISTENT_SEQUENTIAL nodes).
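
To illustrate why sequential nodes give an ordering, the sketch below mimics the names ZooKeeper assigns to sequential nodes (the supplied name followed by a zero-padded 10-digit counter) and shows that a plain lexicographic sort recovers creation order. It is a pure-JDK illustration and does not call Curator; the `queue-` prefix and the helper names are assumptions made for this example, not taken from the recipe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SequentialOrderSketch {
    // Hypothetical prefix; ZooKeeper appends the counter to whatever name is supplied.
    static final String PREFIX = "queue-";

    // Mimics the name ZooKeeper would assign to a sequential node (10 digits, zero-padded).
    public static String sequentialName(int counter) {
        return PREFIX + String.format("%010d", counter);
    }

    // Returns a sorted copy. Because the counter is zero-padded,
    // lexicographic order matches creation order.
    public static List<String> inCreationOrder(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        // A child listing is not guaranteed to arrive in creation order.
        List<String> children = List.of(sequentialName(12), sequentialName(3), sequentialName(7));
        // prints [queue-0000000003, queue-0000000007, queue-0000000012]
        System.out.println(inCreationOrder(children));
    }
}
```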

Guarantees:

  • If a single consumer takes items out of the queue, they will be ordered FIFO. If ordering is important, use a LeaderSelector to nominate a single consumer.
  • Unless a QueueBuilder.lockPath(String) is used, processing of each message is guaranteed only up to the point of receipt by a given instance.
  • If an instance receives an item from the queue but dies while processing it, the item will be lost. If you need message recoverability, use a QueueBuilder.lockPath(String).
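
The difference between the last two guarantees can be pictured with a toy in-memory model. This is not Curator code, and the names (`DeliveryModelSketch`, `unprocessedAfterPass`) are hypothetical; a thrown exception stands in for an instance dying while it processes an item. Without lock safety the item is taken off the queue at receipt, so a failure loses it. With lock safety the item is removed only after processing succeeds, so it stays recoverable.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class DeliveryModelSketch {
    /**
     * Runs one pass over the items and returns those that are still recoverable afterwards.
     * lockSafe == false models removal at receipt; lockSafe == true models removal after success.
     */
    public static <T> List<T> unprocessedAfterPass(List<T> items, Consumer<T> consumer, boolean lockSafe) {
        Deque<T> queue = new ArrayDeque<>(items);
        List<T> recoverable = new ArrayList<>();
        while (!queue.isEmpty()) {
            T item = queue.peekFirst();
            if (!lockSafe) {
                queue.pollFirst();          // removed at receipt, before processing
            }
            try {
                consumer.accept(item);
                if (lockSafe) {
                    queue.pollFirst();      // removed only after processing succeeded
                }
            } catch (RuntimeException simulatedDeath) {
                if (lockSafe) {
                    // The item was never removed; set it aside so this single pass terminates.
                    recoverable.add(queue.pollFirst());
                }
                // Without lock safety the item is already gone and cannot be recovered.
            }
        }
        return recoverable;
    }

    public static void main(String[] args) {
        List<String> items = List.of("a", "b", "c");
        Consumer<String> diesOnB = s -> {
            if (s.equals("b")) {
                throw new IllegalStateException("simulated death");
            }
        };
        System.out.println(unprocessedAfterPass(items, diesOnB, false)); // prints []
        System.out.println(unprocessedAfterPass(items, diesOnB, true));  // prints [b]
    }
}
```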

Method Details

    • start

      public void start() throws Exception
      Start the queue. No other methods work until this is called.
      Specified by:
      start in interface QueueBase<T>
      Throws:
      Exception - startup errors
    • close

      public void close() throws IOException
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
      Throws:
      IOException
    • getPutListenerContainer

      public Listenable<QueuePutListener<T>> getPutListenerContainer()
      Return the manager for put listeners.
      Specified by:
      getPutListenerContainer in interface QueueBase<T>
      Returns:
      put listener container
    • setErrorMode

      public void setErrorMode(ErrorMode newErrorMode)
      Used when the queue is created with a QueueBuilder.lockPath(String). Determines the behavior when the queue consumer throws an exception.
      Specified by:
      setErrorMode in interface QueueBase<T>
      Parameters:
      newErrorMode - the new error mode (the default is ErrorMode.REQUEUE)
    • flushPuts

      public boolean flushPuts(long waitTime, TimeUnit timeUnit) throws InterruptedException
      Wait until any pending puts are committed.
      Specified by:
      flushPuts in interface QueueBase<T>
      Parameters:
      waitTime - max wait time
      timeUnit - time unit
      Returns:
      true if the flush was successful, false if it timed out first
      Throws:
      InterruptedException - if thread was interrupted
    • put

      public void put(T item) throws Exception
      Add an item into the queue. Adding is done in the background - thus, this method will return quickly.

      NOTE: if an upper bound was set via QueueBuilder.maxItems, this method will block until there is available space in the queue.
      Parameters:
      item - item to add
      Throws:
      Exception - connection issues
    • put

      public boolean put(T item, int maxWait, TimeUnit unit) throws Exception
      Same as put(Object) but allows a maximum wait time if an upper bound was set via QueueBuilder.maxItems.
      Parameters:
      item - item to add
      maxWait - maximum wait
      unit - wait unit
      Returns:
      true if the item was added, false if the wait timed out
      Throws:
      Exception
    • putMulti

      public void putMulti(MultiItem<T> items) throws Exception
      Add a set of items into the queue. Adding is done in the background - thus, this method will return quickly.

      NOTE: if an upper bound was set via QueueBuilder.maxItems, this method will block until there is available space in the queue.
      Parameters:
      items - items to add
      Throws:
      Exception - connection issues
    • putMulti

      public boolean putMulti(MultiItem<T> items, int maxWait, TimeUnit unit) throws Exception
      Same as putMulti(MultiItem) but allows a maximum wait time if an upper bound was set via QueueBuilder.maxItems.
      Parameters:
      items - items to add
      maxWait - maximum wait
      unit - wait unit
      Returns:
      true if the items were added, false if the wait timed out
      Throws:
      Exception
    • getLastMessageCount

      public int getLastMessageCount()
      Return the most recent message count from the queue. This is useful for debugging/information purposes only.
      Specified by:
      getLastMessageCount in interface QueueBase<T>
      Returns:
      count (can be 0)
    • sortChildren

      protected void sortChildren(List<String> children)
    • getChildren

      protected List<String> getChildren() throws Exception
      Throws:
      Exception
    • getDelay

      protected long getDelay(String itemNode)
    • tryRemove

      protected boolean tryRemove(String itemNode) throws Exception
      Throws:
      Exception
    • processWithLockSafety

      protected boolean processWithLockSafety(String itemNode, DistributedQueue.ProcessType type) throws Exception
      Throws:
      Exception
    • makeRequeueItemPath

      protected String makeRequeueItemPath(String itemPath)
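
As a usage note for the timed put(T, int, TimeUnit) overload above: its boolean result has to be checked, because false means the wait expired before space became available and the item was not added. The sketch below shows one possible way to handle that, retrying a bounded number of times. `TimedPut` is a hypothetical functional interface that only mirrors the shape of the timed put so the example compiles without Curator, and `putWithRetries` is likewise an assumption of this example, not part of the library.

```java
import java.util.concurrent.TimeUnit;

public class TimedPutSketch {
    /** Hypothetical interface with the same shape as put(T item, int maxWait, TimeUnit unit). */
    @FunctionalInterface
    public interface TimedPut<T> {
        boolean put(T item, int maxWait, TimeUnit unit) throws Exception;
    }

    /**
     * Calls the timed put up to maxAttempts times.
     * Returns the number of attempts used, or -1 if every attempt timed out.
     */
    public static <T> int putWithRetries(TimedPut<T> queue, T item, int maxWait,
                                         TimeUnit unit, int maxAttempts) throws Exception {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (queue.put(item, maxWait, unit)) {
                return attempt;     // the item was accepted
            }
            // false: the wait timed out and the item was not added; try again
        }
        return -1;
    }

    public static void main(String[] args) throws Exception {
        // Fake queue that reports a timeout twice before accepting the item.
        int[] calls = {0};
        TimedPut<String> fake = (item, maxWait, unit) -> ++calls[0] > 2;
        System.out.println(putWithRetries(fake, "job-1", 100, TimeUnit.MILLISECONDS, 5)); // prints 3
    }
}
```

With a real DistributedQueue<String> held in a variable named queue, the method reference queue::put should fit the TimedPut shape, since the three-argument overload has the same parameters and return type.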