salt.runners.queue

General management and processing of queues.

This runner facilitates interacting with various queue backends, such as the included sqlite3 queue or the planned AWS SQS and Redis queues.

The queue functions such as insert, delete, and pop can be used for typical management of the queue.
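The semantics of these functions can be illustrated with a toy queue built on Python's standard sqlite3 module. This is only a sketch for intuition, not the runner's implementation: the table schema and the helper bodies below are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical schema: one row per queued item, oldest first by id.
conn.execute("CREATE TABLE myqueue (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")


def insert(items):
    """Append each item to the end of the queue."""
    conn.executemany("INSERT INTO myqueue (name) VALUES (?)", [(i,) for i in items])


def delete(items):
    """Remove the named items wherever they sit in the queue."""
    conn.executemany("DELETE FROM myqueue WHERE name = ?", [(i,) for i in items])


def pop(quantity=1):
    """Remove and return the oldest `quantity` items, or every item for "all"."""
    sql = "SELECT id, name FROM myqueue ORDER BY id"
    params = ()
    if quantity != "all":
        sql += " LIMIT ?"
        params = (int(quantity),)
    rows = conn.execute(sql, params).fetchall()
    conn.executemany("DELETE FROM myqueue WHERE id = ?", [(r[0],) for r in rows])
    return [r[1] for r in rows]


insert(["item1", "item2", "item3", "item4"])
delete(["item2"])
first = pop()       # oldest remaining item
rest = pop("all")   # everything still queued
print(first, rest)
```

The point of the sketch is the difference between delete, which removes items by name, and pop, which removes items by position and hands them back to the caller.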

The process_queue function pops the requested number of items from the queue and creates a Salt Event that can then be processed by a Reactor. The process_queue function can be called manually, or can be configured to run on a schedule with the Salt Scheduler or regular system cron. It is also possible to use the peer system to allow a minion to call the runner.

This runner, as well as the Queues system, is not API-stable at this time.

There are many things that could potentially be done with queues within Salt. For the time being the focus will be on queueing infrastructure actions on specific minions. The queues generally will be populated with minion IDs. When the process_queue runner function is called, events are created on the Salt Event bus that indicate the queue and a list of one or more minion IDs. The reactor is set up to match on event tags for a specific queue and then take infrastructure actions on those minion IDs. These actions might be to delete the minion's key from the master, use salt-cloud to destroy the VM, or some other custom action.
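The flow from queue to event can be modelled in a few lines of plain Python. This is a toy sketch rather than the runner's code: the in-memory QUEUES dict, the process_queue_sketch name, and the salt/queue/<queue>/process tag layout are assumptions made for illustration, so inspect the events on your own master before writing a Reactor match.

```python
from collections import deque

# Hypothetical in-memory stand-in for a queue backend: queue name -> minion IDs.
QUEUES = {"decommission": deque(["web01", "web02", "web03"])}


def process_queue_sketch(queue, quantity=1, backend="sqlite"):
    """Pop up to `quantity` items and return the (tag, data) a Reactor would match."""
    pending = QUEUES[queue]
    count = len(pending) if quantity == "all" else int(quantity)
    items = [pending.popleft() for _ in range(min(count, len(pending)))]
    # Assumed tag layout; the payload names the queue and the popped minion IDs.
    tag = "salt/queue/{}/process".format(queue)
    data = {"items": items, "queue": queue, "backend": backend}
    return tag, data


tag, data = process_queue_sketch("decommission", quantity=2)
print(tag, data["items"])
```

A Reactor keyed on the tag for one queue would then receive the list of minion IDs in the event data and act on each of them.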

Queued runners

Using the Salt Queues, references to the command-line arguments of other runners can be saved to be processed later. The queue runners require a queue backend that can store JSON data (default: pgjsonb).

Once the queue is set up, runner_queue needs to be configured.

runner_queue:
  queue: runners
  backend: pgjsonb

Note

Only the queue setting is required; the backend defaults to pgjsonb.

Once this is set, the following can be added to the scheduler on the master, and it will run the specified number of commands per time period.

schedule:
  runner queue:
    schedule:
      function: queue.process_runner
      minutes: 1
      kwargs:
        quantity: 2

The above configuration will pop 2 runner jobs off the runner queue and run them. It will do this every minute, unless jobs are still running from the last time the process_runner task was executed.
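To see how quickly such a schedule drains a backlog, the per-minute batching can be simulated in plain Python. This is a sketch under simplifying assumptions: run_schedule and the job names are hypothetical, and the model ignores the case where a run is skipped because earlier jobs are still executing.

```python
from collections import deque


def run_schedule(pending, quantity, ticks):
    """Return the batch of queued runner references handled on each tick."""
    pending = deque(pending)
    batches = []
    for _ in range(ticks):
        # Take at most `quantity` references, fewer if the queue is nearly empty.
        batch = [pending.popleft() for _ in range(min(quantity, len(pending)))]
        batches.append(batch)
    return batches


batches = run_schedule(["job1", "job2", "job3", "job4", "job5"], quantity=2, ticks=4)
for minute, batch in enumerate(batches, start=1):
    print(minute, batch)
```

With five queued references and a quantity of 2, the backlog is cleared on the third run and later runs find nothing to do.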

salt.runners.queue.delete(queue, items, backend='sqlite')

Delete an item or items from a queue

CLI Example:

salt-run queue.delete myqueue myitem
salt-run queue.delete myqueue myitem backend=sqlite
salt-run queue.delete myqueue "['item1', 'item2', 'item3']"

salt.runners.queue.insert(queue, items, backend='sqlite')

Add an item or items to a queue

CLI Example:

salt-run queue.insert myqueue myitem
salt-run queue.insert myqueue "['item1', 'item2', 'item3']"
salt-run queue.insert myqueue myitem backend=sqlite
salt-run queue.insert myqueue "['item1', 'item2', 'item3']" backend=sqlite

salt.runners.queue.insert_runner(fun, args=None, kwargs=None, queue=None, backend=None)

Insert a reference to a runner into the queue so that it can be run later.

fun
The runner function that is going to be run
args
list or comma-separated string of args to send to fun
kwargs
dictionary of keyword arguments to send to fun
queue
queue to insert the runner reference into
backend
backend to use for the queue
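Because the backend must store JSON data, a queued runner reference can be thought of as a small JSON document. The sketch below shows one plausible shape; the build_runner_reference helper and the fun/args/kwargs field names are assumptions for illustration, not a documented storage format.

```python
import json


def build_runner_reference(fun, args=None, kwargs=None):
    """Normalise the arguments and return the JSON document to be queued."""
    if args is None:
        args = []
    elif isinstance(args, str):
        # A comma-separated string becomes a list of positional arguments.
        args = args.split(",")
    if kwargs is None:
        kwargs = {}
    return json.dumps({"fun": fun, "args": list(args), "kwargs": kwargs}, sort_keys=True)


reference = build_runner_reference(
    "event.send", args="test_insert_runner", kwargs={"data": {"foo": "bar"}}
)
print(reference)
```

Storing only the function name and its arguments keeps the queue entry small and lets the runner be resolved at processing time rather than at insertion time.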

CLI Example:

salt-run queue.insert_runner test.stdout_print
salt-run queue.insert_runner event.send test_insert_runner kwargs='{"data": {"foo": "bar"}}'

salt.runners.queue.list_items(queue, backend='sqlite')

List contents of a queue

CLI Example:

salt-run queue.list_items myqueue
salt-run queue.list_items myqueue backend=sqlite

salt.runners.queue.list_length(queue, backend='sqlite')

Provide the number of items in a queue

CLI Example:

salt-run queue.list_length myqueue
salt-run queue.list_length myqueue backend=sqlite

salt.runners.queue.list_queues(backend='sqlite')

Return a list of Salt Queues on the backend

CLI Example:

salt-run queue.list_queues
salt-run queue.list_queues backend=sqlite

salt.runners.queue.pop(queue, quantity=1, backend='sqlite', is_runner=False)

Pop one or more or all items from a queue

CLI Example:

salt-run queue.pop myqueue
salt-run queue.pop myqueue 6
salt-run queue.pop myqueue all
salt-run queue.pop myqueue 6 backend=sqlite
salt-run queue.pop myqueue all backend=sqlite

salt.runners.queue.process_queue(queue, quantity=1, backend='sqlite', is_runner=False)

Pop items off a queue and create an event on the Salt event bus to be processed by a Reactor.

CLI Example:

salt-run queue.process_queue myqueue
salt-run queue.process_queue myqueue 6
salt-run queue.process_queue myqueue all backend=sqlite

salt.runners.queue.process_runner(quantity=1, queue=None, backend=None)

Process queued runners

quantity
number of runners to process
queue
queue to pop the runner references from
backend
backend to use for the queue

CLI Example:

salt-run queue.process_runner
salt-run queue.process_runner 5