公開されているresque-schedulerで条件に該当するジョブをまとめて削除する方法が見当たらなかったのでハックしたという話です。

resque-schedulerはバックグラウンド用gemであるResqueをさらに拡張したもので、cronのような定期的な動作をさせたり、特定の時刻に動作をさせたりするためのgemです。

resque/resque-scheduler

resque-schedulerのジョブを削除するために少し手をいれる必要があったのでそれについて書き残しておきます。

なお、この記事に書いてある内容はresque-scheduler 2.3.1時点のものであり、これ以降のバージョンでは以下に記す内容は必要なくなっている可能性があります。

Resqueのジョブ

まずResqueの概要ですが、Resqueでバックグラウンドジョブを登録するときはジョブを処理するクラスの名前と任意のパラメータを渡します。たとえばJobHandlerというジョブを処理するクラスがあって、パラメータを2つ受け取るとすると、クラス定義とジョブの登録はそれぞれ次のようになります。

 1# job handler class
 2class JobHandler
 3  def self.perform(hoge_id,piyo_id)
 4    # do something in background...
 5  end
 6end
 7
 8# enqueue
 9def hoge
10  hoge = Hoge.where( ...
11  piyo = Piyo.where( ...
12  Resque.enqueue(JobHandler, hoge.id, piyo.id)
13end

実際の動作はというと、まずResque.enqueueでは渡されたクラス名やパラメータをResque.encodeメソッドにより1つの文字列化したものをRedisに入れます。そうするとバックグラウンドで動いているプロセスがRedis内のデータを参照し、処理する必要があるジョブの文字列をResque.decodeメソッドでクラス名とパラメータに分解し直します。そして、そのクラスのperformメソッドにパラメータを渡して実行するという流れになっています。

resque-schedulerも基本的には同じですが、時間を指定できます。

1Resque.enqueue_at(1.day.from_now, JobHandler, hoge.id, piyo.id)

スケジュールされたジョブを取りやめたい場合

resque-schedulerにはResque.enqueue_atの時間指定以外の引数を与えることができるResque.remove_delayedというメソッドがあります。このメソッドを使って指定したジョブを削除できます。

1def foo
2  Resque.enqueue_at(1.day.from_now, JobHandler, 5, 10)
3end
4
5def bar
6  Rescue.remove_delayed(JobHandler, 5, 10)
7end

こんな風にして、ジョブの追加や削除が可能です。

では、条件に該当する複数のジョブを一気に削除したいときはどうすればいいのでしょう。上の例で言えば、hoge_id=5となる全てのジョブを削除したい、というような場合です。

実は2013年12月17日現在gemとしてリリースされているresque-schedulerの最新バージョン(2.3.1)にはそのためのメソッドが定義されていません。実はgithubのコードにはそのためのメソッドが新しく実装されているので、Gemfileでgithubリポジトリを参照するようにしてもよかったのですが、githubのバージョンではResqueのWeb UIに不具合があって困るので僕は使えないと判断しました。

githubの該当箇所

先ほど紹介したようにResqueのジョブは少しばかりencodeされていて非常に取り回しづらい上、Redisを直に叩くとなるとさらにコードが煩雑になりかねません。そこで、githubリポジトリで導入されている、条件に当てはまるジョブを削除するためのメソッドだけを取り込むことにしました。

ここでは、resque-schedulerがそうしているように、Resqueをextendする形で拡張します。

config/initializers/resque_init.rbあたりのresque用ファイルに次のコードを追記します。

 1module ResqueSchedulerExt
 2  # this methods is not implemented in current version (2.3.1)
 3  def remove_delayed_selection
 4    fail ArgumentError, "Please supply a block" unless block_given?
 5
 6    destroyed = 0
 7    # There is no way to search Redis list entries for a partial match, so we query for all
 8    # delayed job tasks and do our matching after decoding the payload data
 9    jobs = Resque.redis.keys("delayed:*")
10    jobs.each do |job|
11      index = Resque.redis.llen(job) - 1
12      while index >= 0
13        payload = Resque.redis.lindex(job, index)
14        decoded_payload = decode(payload)
15        if yield(decoded_payload['args'])
16          removed = redis.lrem job, 0, payload
17          destroyed += removed
18          index -= removed
19        else
20          index -= 1
21        end
22      end
23    end
24    destroyed
25  end
26end
27
28Resque.extend ResqueSchedulerExt

これでResque.remove_delayed_selectionというメソッドを使えるようになりました。

あとは、必要な箇所でこのメソッドを呼び出すだけでOKです。

1Resque.remove_delayed_selection do |args| # args is [hoge_id, piyo_id]
2  args[0] == 5
3end

これでhoge_id=5であるジョブを全て削除できました。