mirror of
https://github.com/KevinMidboe/linguist.git
synced 2025-10-29 09:40:21 +00:00
386 lines
11 KiB
Ruby
386 lines
11 KiB
Ruby
require 'redis/namespace'
|
|
|
|
require 'resque/version'
|
|
|
|
require 'resque/errors'
|
|
|
|
require 'resque/failure'
|
|
require 'resque/failure/base'
|
|
|
|
require 'resque/helpers'
|
|
require 'resque/stat'
|
|
require 'resque/job'
|
|
require 'resque/worker'
|
|
require 'resque/plugin'
|
|
require 'resque/queue'
|
|
require 'resque/multi_queue'
|
|
require 'resque/coder'
|
|
require 'resque/multi_json_coder'
|
|
|
|
module Resque
|
|
include Helpers
|
|
extend self
|
|
|
|
# Sets the Redis connection used by Resque. `server` may be:
#   1. A 'hostname:port' String
#   2. A 'hostname:port:db' String (to select the Redis db)
#   3. A 'hostname:port/namespace' String (to set the Redis namespace)
#   4. A Redis URL String 'redis://host:port'
#   5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
#      or `Redis::Namespace`.
#
# Strings and bare connections end up wrapped in a `Redis::Namespace`
# (namespace `:resque` unless the String names another one); a
# `Redis::Namespace` is stored exactly as given. The per-name queue cache
# is rebuilt on every assignment so queues are created against the new
# connection.
def redis=(server)
  @redis =
    case server
    when String
      if server =~ /redis\:\/\//
        connection = Redis.connect(:url => server, :thread_safe => true)
      else
        # 'host:port[:db][/namespace]' -- peel the namespace off first.
        server, namespace = server.split('/', 2)
        host, port, db = server.split(':')
        connection = Redis.new(:host => host, :port => port,
                               :thread_safe => true, :db => db)
      end
      Redis::Namespace.new(namespace || :resque, :redis => connection)
    when Redis::Namespace
      server
    else
      Redis::Namespace.new(:resque, :redis => server)
    end

  # Queue objects are built lazily, one per name, on first lookup.
  @queues = Hash.new do |cache, name|
    cache[name] = Resque::Queue.new(name, @redis, coder)
  end
end
|
|
|
|
# The object that encapsulates encode/decode. Assign a different coder to
# change the serialization used across Resque. When none has been assigned,
# a MultiJsonCoder is created on first use and kept, for backwards
# compatibility.
def coder
  @coder = MultiJsonCoder.new unless @coder
  @coder
end
|
|
attr_writer :coder
|
|
|
|
# Returns the current Redis connection, creating a default one first when
# nothing has been assigned yet: `Redis.connect` if the Redis class offers
# it, otherwise the "localhost:6379" String. Either value goes through
# `redis=`.
def redis
  unless @redis
    self.redis = Redis.respond_to?(:connect) ? Redis.connect : "localhost:6379"
  end
  @redis
end
|
|
|
|
# Returns an identifier for the Redis server(s) behind the current
# connection. Three connection shapes are recognised, checked in this order:
# one responding to `server`, one responding to `nodes`, and anything else
# (asked for `client.id`).
def redis_id
  connection = redis

  # support 1.x versions of redis-rb
  return connection.server if connection.respond_to?(:server)

  # distributed: one id per node, comma separated
  return connection.nodes.map { |node| node.id }.join(', ') if connection.respond_to?(:nodes)

  connection.client.id
end
|
|
|
|
# The `before_first_fork` hook is meant to run in the **parent** process
# only once, before forking to run the first job. Be careful: any changes
# made there are permanent for the lifespan of the worker.
#
# Call with a block to set the hook (the block is returned).
# Call with no arguments to return the currently stored hook.
def before_first_fork(&block)
  return @before_first_fork unless block

  @before_first_fork = block
end
|
|
|
|
# Set a proc that will be called in the parent process before the
|
|
# worker forks for the first time.
|
|
attr_writer :before_first_fork
|
|
|
|
# The `before_fork` hook is meant to run in the **parent** process before
# every job, so be careful: any changes made there are permanent for the
# lifespan of the worker.
#
# Call with a block to set the hook (the block is returned).
# Call with no arguments to return the currently stored hook.
def before_fork(&block)
  return @before_fork unless block

  @before_fork = block
end
|
|
|
|
# Set the before_fork proc.
|
|
attr_writer :before_fork
|
|
|
|
# The `after_fork` hook is meant to run in the child process and is passed
# the current job. Changes made there therefore only live as long as the
# job being processed.
#
# Call with a block to set the hook (the block is returned).
# Call with no arguments to return the currently stored hook.
def after_fork(&block)
  return @after_fork unless block

  @after_fork = block
end
|
|
|
|
# Set the after_fork proc.
|
|
attr_writer :after_fork
|
|
|
|
def to_s
|
|
"Resque Client connected to #{redis_id}"
|
|
end
|
|
|
|
attr_accessor :inline
|
|
|
|
# If 'inline' is true Resque will call #perform method inline
|
|
# without queuing it into Redis and without any Resque callbacks.
|
|
# If 'inline' is false, Resque jobs will be put in the queue as usual.
|
|
alias :inline? :inline
|
|
|
|
#
|
|
# queue manipulation
|
|
#
|
|
|
|
# Pushes a job onto a queue. Queue name should be a string and the
# item should be any JSON-able Ruby object.
#
# Resque workers generally expect the `item` to be a hash with the
# following keys:
#
#   class - The String name of the job to run.
#    args - An Array of arguments to pass the job. Usually passed
#           via `class.to_class.perform(*args)`.
#
# Example
#
#   Resque.push('archive', :class => 'Archive', :args => [ 35, 'tar' ])
#
# The return value is whatever the queue's `<<` returns; callers should
# not rely on it.
def push(queue, item)
  target = queue(queue)
  target << item
end
|
|
|
|
# Pops a job off a queue. Queue name should be a string.
#
# The pop is non-blocking (`pop(true)`); a ThreadError from the queue is
# swallowed and reported as nil.
#
# Returns a Ruby object, or nil.
def pop(queue)
  queue(queue).pop(true)
rescue ThreadError
  nil
end
|
|
|
|
# Returns the size of a queue, as reported by the queue object itself.
# Queue name should be a string.
def size(queue)
  target = queue(queue)
  target.size
end
|
|
|
|
# Returns items currently queued, without removing them. Queue name should
# be a string.
#
# start and count should be integers and can be used for pagination:
# start is the zero-based offset of the first item, count is how many
# items to return. The result is whatever the queue's `slice` returns
# for that window.
#
# To get the 3rd page of a list paginated 30 items at a time
# (offsets 60..89) one would use:
#   Resque.peek('my_list', 60, 30)
def peek(queue, start = 0, count = 1)
  queue(queue).slice(start, count)
end
|
|
|
|
# Fetches a range of items from the Redis list stored at `key` and runs
# each one through `decode`.
#
# With count == 1 (the default) a single decoded object is returned,
# looked up via LINDEX. Any other count returns an Array of decoded
# objects covering start..start+count-1, fetched via LRANGE.
def list_range(key, start = 0, count = 1)
  return decode(redis.lindex(key, start)) if count == 1

  last = start + count - 1
  Array(redis.lrange(key, start, last)).map { |raw| decode(raw) }
end
|
|
|
|
# Returns an array of all known Resque queue names: the members of the
# `queues` set in Redis, coerced to an Array.
def queues
  names = redis.smembers(:queues)
  Array(names)
end
|
|
|
|
# Given a queue name, completely deletes the queue: the queue object is
# told to destroy itself and is then dropped from the local queue cache.
#
# Returns the cached queue object that was removed.
def remove_queue(queue)
  doomed = queue(queue)
  doomed.destroy
  @queues.delete(queue.to_s)
end
|
|
|
|
# Return the Resque::Queue object for a given name. The name is
# normalised with `to_s`, so Strings and Symbols address the same queue.
def queue(name)
  # The queue cache is only built inside `redis=`. If no connection has been
  # set up yet, asking for `redis` creates the default one (and with it the
  # cache) instead of failing with NoMethodError on nil.
  redis unless @queues
  @queues[name.to_s]
end
|
|
|
|
|
|
#
|
|
# job shortcuts
|
|
#
|
|
|
|
# Conveniently adds a job to a queue.
#
# `klass` is assumed to be a real Ruby class (not a string or reference)
# which either:
#
#   a) has a @queue ivar set
#   b) responds to `queue`
#
# The queue is derived from the class via `queue_from_class` and the job
# is then handed to `enqueue_to`.
#
# If no queue can be inferred, the job is expected to be rejected with a
# `Resque::NoQueueError` (raised further down the call chain, not here).
#
# Returns true if the job was queued, nil if the job was rejected by a
# before_enqueue hook.
#
# This method is considered part of the `stable` API.
def enqueue(klass, *args)
  target_queue = queue_from_class(klass)
  enqueue_to(target_queue, klass, *args)
end
|
|
|
|
# Just like `enqueue` but allows you to specify the queue you want to
# use. Runs hooks.
#
# `queue` should be the String name of the queue you're targeting.
#
# Returns true if the job was queued, nil if the job was rejected by a
# before_enqueue hook.
#
# This method is considered part of the `stable` API.
def enqueue_to(queue, klass, *args)
  # Every before_enqueue hook is run; a literal `false` from any of them
  # vetoes the enqueue. Other falsy values such as nil do not.
  verdicts = Plugin.before_enqueue_hooks(klass).map do |hook|
    klass.send(hook, *args)
  end
  return nil if verdicts.include?(false)

  Job.create(queue, klass, *args)

  Plugin.after_enqueue_hooks(klass).each { |hook| klass.send(hook, *args) }

  true
end
|
|
|
|
# Conveniently removes a job from a queue.
#
# `klass` is assumed to be a real Ruby class (not a string or reference)
# which either:
#
#   a) has a @queue ivar set
#   b) responds to `queue`
#
# The queue is derived from the class via `queue_from_class`.
#
# If no args are given, this method will dequeue *all* jobs matching
# the provided class. See `Resque::Job.destroy` for more information.
#
# Returns the number of jobs destroyed (the value reported by
# `Job.destroy`), or nil when a before_dequeue hook rejected the dequeue.
#
# Example:
#
#   # Removes all jobs of class `UpdateNetworkGraph`
#   Resque.dequeue(GitHub::Jobs::UpdateNetworkGraph)
#
#   # Removes all jobs of class `UpdateNetworkGraph` with matching args.
#   Resque.dequeue(GitHub::Jobs::UpdateNetworkGraph, 'repo:135325')
#
# This method is considered part of the `stable` API.
def dequeue(klass, *args)
  # Every before_dequeue hook is run; a literal `false` from any of them
  # vetoes the dequeue.
  verdicts = Plugin.before_dequeue_hooks(klass).map do |hook|
    klass.send(hook, *args)
  end
  return if verdicts.include?(false)

  # Keep the count: the after hooks below must not replace it as the
  # method's return value.
  destroyed = Job.destroy(queue_from_class(klass), klass, *args)

  Plugin.after_dequeue_hooks(klass).each do |hook|
    klass.send(hook, *args)
  end

  destroyed
end
|
|
|
|
# Given a class, try to extrapolate an appropriate queue: first the
# class-level `@queue` instance variable, then the class's `queue` method
# if it responds to one.
#
# Returns the queue, or a falsy value (false or nil) when neither source
# yields one.
def queue_from_class(klass)
  from_ivar = klass.instance_variable_get(:@queue)
  from_ivar || (klass.respond_to?(:queue) && klass.queue)
end
|
|
|
|
# Returns a `Resque::Job` object or a non-true value, depending on
# whether a job can be obtained; the work is delegated to `Job.reserve`.
# Pass the precise name of a queue: case matters.
#
# This method is considered part of the `stable` API.
def reserve(queue)
  reserved = Job.reserve(queue)
  reserved
end
|
|
|
|
# Validates that the given klass could be a valid Resque job.
#
# `queue` is optional; when omitted it is inferred from the class via
# `queue_from_class`.
#
# Raises `Resque::NoQueueError` if no queue was given and none can be
# inferred.
# Raises `Resque::NoClassError` if `klass.to_s` is empty (nil or a blank
# name).
#
# Returns nil when both checks pass.
def validate(klass, queue = nil)
  queue ||= queue_from_class(klass)

  raise NoQueueError.new("Jobs must be placed onto a queue.") unless queue
  raise NoClassError.new("Jobs must be given a class.") if klass.to_s.empty?
end
|
|
|
|
|
|
#
|
|
# worker shortcuts
|
|
#
|
|
|
|
# A shortcut to Worker.all: returns whatever `Worker.all` returns.
def workers
  everyone = Worker.all
  everyone
end
|
|
|
|
# A shortcut to Worker.working: returns whatever `Worker.working` returns.
def working
  busy = Worker.working
  busy
end
|
|
|
|
# A shortcut to unregister_worker, useful for command line tools: looks
# the worker up by id via `Resque::Worker.find` and unregisters it.
#
# NOTE(review): if `Worker.find` can return nil for an unknown id this
# raises NoMethodError -- confirm against Worker.find.
def remove_worker(worker_id)
  Resque::Worker.find(worker_id).unregister_worker
end
|
|
|
|
#
|
|
# stats
|
|
#
|
|
|
|
# Returns a hash, similar to redis-rb's #info, of interesting stats:
#
#   :pending     - sum of the sizes of all known queues
#   :processed   - Stat[:processed]
#   :queues      - number of known queues
#   :workers     - number of workers
#   :working     - number of workers currently working
#   :failed      - Stat[:failed]
#   :servers     - one-element Array holding `redis_id`
#   :environment - RAILS_ENV, else RACK_ENV, else 'development'
def info
  pending_jobs = queues.inject(0) { |total, name| total + size(name) }

  {
    :pending     => pending_jobs,
    :processed   => Stat[:processed],
    :queues      => queues.size,
    :workers     => workers.size.to_i,
    :working     => working.size,
    :failed      => Stat[:failed],
    :servers     => [redis_id],
    :environment => ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development'
  }
end
|
|
|
|
# Returns an array of all known Resque keys in Redis, with a leading
# "<namespace>:" prefix removed. Redis' KEYS operation is O(N) for the
# keyspace, so be careful - this can be slow for big databases.
def keys
  # Anchor the match at the start of the key: a plain String pattern would
  # also strip "<namespace>:" from the middle of a key without the prefix.
  prefix = /\A#{Regexp.escape("#{redis.namespace}:")}/

  redis.keys("*").map { |key| key.sub(prefix, '') }
end
|
|
end
|
|
|