公開されているresque-schedulerで条件に該当するジョブをまとめて削除する方法が見当たらなかったのでハックしたという話です。
resque-schedulerはバックグラウンド用gemであるResqueをさらに拡張したもので、cronのような定期的な動作をさせたり、特定の時刻に動作をさせたりするためのgemです。
resque-schedulerのジョブを削除するために少し手をいれる必要があったのでそれについて書き残しておきます。
なお、この記事に書いてある内容はresque-scheduler 2.3.1時点のものであり、これ以降のバージョンでは以下に記す内容は必要なくなっている可能性があります。
Resqueのジョブ
まずResqueの概要ですが、Resqueでバックグラウンドジョブを登録するときはジョブを処理するクラスの名前と任意のパラメータを渡します。たとえばJobHandler
というジョブを処理するクラスがあって、パラメータを2つ受け取るとすると、クラス定義とジョブの登録はそれぞれ次のようになります。
# job handler class
class JobHandler
def self.perform(hoge_id,piyo_id)
# do something in background...
end
end
# enqueue
def hoge
hoge = Hoge.where( ...
piyo = Piyo.where( ...
Resque.enqueue(JobHandler, hoge.id, piyo.id)
end
実際の動作はというと、まずResque.enqueue
では渡されたクラス名やパラメータをResque.encode
メソッドにより1つの文字列化したものをRedisに入れます。そうするとバックグラウンドで動いているプロセスがRedis内のデータを参照し、処理する必要があるジョブの文字列をResque.decode
メソッドでクラス名とパラメータに分解し直します。そして、そのクラスのperform
メソッドにパラメータを渡して実行するという流れになっています。
resque-schedulerも基本的には同じですが、時間を指定できます。
Resque.enqueue_at(1.day.from_now, JobHandler, hoge.id, piyo.id)
スケジュールされたジョブを取りやめたい場合
resque-schedulerにはResque.enqueue_at
の時間指定以外の引数を与えることができるResque.remove_delayed
というメソッドがあります。このメソッドを使って指定したジョブを削除できます。
def foo
Resque.enqueue_at(1.day.from_now, JobHandler, 5, 10)
end
def bar
Rescue.remove_delayed(JobHandler, 5, 10)
end
こんな風にして、ジョブの追加や削除が可能です。
では、条件に該当する複数のジョブを一気に削除したいときはどうすればいいのでしょう。上の例で言えば、hoge_id=5
となる全てのジョブを削除したい、というような場合です。
実は2013年12月17日現在gemとしてリリースされているresque-schedulerの最新バージョン(2.3.1)にはそのためのメソッドが定義されていません。実はgithubのコードにはそのためのメソッドが新しく実装されているので、Gemfileでgithubリポジトリを参照するようにしてもよかったのですが、githubのバージョンではResqueのWeb UIに不具合があって困るので僕は使えないと判断しました。
先ほど紹介したようにResqueのジョブは少しばかりencodeされていて非常に取り回しづらい上、Redisを直に叩くとなるとさらにコードが煩雑になりかねません。そこで、githubリポジトリで導入されている、条件に当てはまるジョブを削除するためのメソッドだけを取り込むことにしました。
ここでは、resque-schedulerがそうしているように、Resqueをextendする形で拡張します。
config/initializers/resque_init.rb
あたりのresque用ファイルに次のコードを追記します。
module ResqueSchedulerExt
# this methods is not implemented in current version (2.3.1)
def remove_delayed_selection
fail ArgumentError, "Please supply a block" unless block_given?
destroyed = 0
# There is no way to search Redis list entries for a partial match, so we query for all
# delayed job tasks and do our matching after decoding the payload data
jobs = Resque.redis.keys("delayed:*")
jobs.each do |job|
index = Resque.redis.llen(job) - 1
while index >= 0
payload = Resque.redis.lindex(job, index)
decoded_payload = decode(payload)
if yield(decoded_payload['args'])
removed = redis.lrem job, 0, payload
destroyed += removed
index -= removed
else
index -= 1
end
end
end
destroyed
end
end
Resque.extend ResqueSchedulerExt
これでResque.remove_delayed_selection
というメソッドを使えるようになりました。
あとは、必要な箇所でこのメソッドを呼び出すだけでOKです。
Resque.remove_delayed_selection do |args| # args is [hoge_id, piyo_id]
args[0] == 5
end
これでhoge_id=5
であるジョブを全て削除できました。