Skip to content

Commit be19104

Browse files
committed
respect the state update counter if present
1 parent 33263c8 commit be19104

File tree

5 files changed

+157
-1
lines changed

5 files changed

+157
-1
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
module Travis
2+
module Hub
3+
module Service
4+
class StateUpdate < Struct.new(:data)
5+
class Counter < Struct.new(:job_id, :redis)
6+
TTL = 3600 * 12
7+
8+
def count
9+
@count ||= redis.get(key).to_i
10+
end
11+
12+
def increment
13+
count = redis.incr(key)
14+
redis.expire(key, TTL)
15+
count
16+
end
17+
18+
private
19+
20+
def key
21+
"job:state_update_count:#{job_id}"
22+
end
23+
end
24+
25+
include Helper::Context
26+
27+
MSGS = {
28+
missing: 'Received state update with no count for job id=%p, last known count: %p.',
29+
ordered: 'Received state update %p for job id=%p, last known count: %p',
30+
unordered: 'Received state update %p for job id=%p, last known count: %p. %s',
31+
skip: 'Skipping the message.'
32+
}
33+
34+
def apply?
35+
return missing unless given?
36+
apply = ordered? ? ordered : unordered
37+
return true unless ENV['UPDATE_COUNT']
38+
apply
39+
end
40+
41+
private
42+
43+
def given?
44+
!count.nil?
45+
end
46+
47+
def missing
48+
warn :missing, job_id, counter.count
49+
true
50+
end
51+
52+
def ordered
53+
info :ordered, count, job_id, counter.count
54+
true
55+
end
56+
57+
def unordered
58+
warn :unordered, count, job_id, counter.count, ENV['UPDATE_COUNT'] ? MSGS[:skip] : ''
59+
false
60+
end
61+
62+
def ordered?
63+
count >= counter.count
64+
end
65+
66+
def counter
67+
@counter ||= Counter.new(job_id, redis)
68+
end
69+
70+
def job_id
71+
data[:id]
72+
end
73+
74+
def count
75+
meta[:state_update_count]
76+
end
77+
78+
def meta
79+
data[:meta] || {}
80+
end
81+
end
82+
end
83+
end
84+
end

lib/travis/hub/service/update_job.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require 'travis/hub/helper/locking'
44
require 'travis/hub/model/job'
55
require 'travis/hub/service/error_job'
6+
require 'travis/hub/service/state_update'
67
require 'travis/hub/service/notify_workers'
78
require 'travis/hub/helper/limit'
89

@@ -16,7 +17,7 @@ class UpdateJob < Struct.new(:event, :data)
1617
EVENTS = [:receive, :reset, :start, :finish, :cancel, :restart]
1718

1819
MSGS = {
19-
skipped: 'Skipped event job:%s for <Job id=%s> trying to update state from %s to %s data=%s',
20+
skipped: 'Skipped event job:%s for <Job id=%s> trying to update state from %p to %p data=%s',
2021
}
2122

2223
def run
@@ -37,6 +38,7 @@ def job
3738
private
3839

3940
def update_job
41+
return skipped unless apply_state_update?
4042
return error_job if event == :reset && resets.limited? && !job.finished?
4143
return skipped if skip_canceled?
4244
return skipped unless job.reload.send(:"#{event}!", attrs)
@@ -63,6 +65,10 @@ def skipped
6365
warn :skipped, event, job.id, job.state, data[:state], data
6466
end
6567

68+
def apply_state_update?
69+
StateUpdate.new(context, data).apply?
70+
end
71+
6672
def resets
6773
@resets ||= Limit.new(redis, :resets, job.id, config.limit.resets)
6874
end

spec/spec_helper.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@
2626
Travis::Event.instance_variable_set(:@subscriptions, nil)
2727
# Travis::Addons.setup({ host: 'host.com', encryption: { key: 'secret' * 10 } }, logger)
2828
Time.stubs(:now).returns(NOW)
29+
context.redis.flushall
2930
end
3031
end

spec/support/context.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module Context
66

77
included do
88
let(:stdout) { StringIO.new }
9+
let(:log) { stdout.string }
910
let(:logger) { Travis::Logger.new(stdout) }
1011
let(:context) { Travis::Hub::Context.new(logger: logger) }
1112
before { Travis::Instrumentation.setup(context.logger) }

spec/travis/hub/service/update_job_spec.rb

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,4 +201,68 @@ def recieve(msg)
201201
expect(job.reload.state).to eql :passed
202202
end
203203
end
204+
205+
describe 'state update count' do
206+
let(:job) { FactoryGirl.create(:job, state: :received) }
207+
let(:event) { :start }
208+
let(:data) { { id: job.id, state: :started, meta: meta } }
209+
210+
before { ENV['UPDATE_COUNT'] = 'true' }
211+
after { ENV['UPDATE_COUNT'] = nil }
212+
213+
describe 'with no count stored' do
214+
describe 'given no meta' do
215+
let(:meta) { nil }
216+
before { subject.run }
217+
218+
it { expect(job.reload.state).to eq :started }
219+
it { expect(log).to include "W Received state update with no count for job id=#{job.id}, last known count: 0" }
220+
end
221+
222+
describe 'given no count' do
223+
let(:meta) { {} }
224+
before { subject.run }
225+
226+
it { expect(job.reload.state).to eq :started }
227+
it { expect(log).to include "W Received state update with no count for job id=#{job.id}, last known count: 0" }
228+
end
229+
230+
describe 'given a count' do
231+
let(:meta) { { state_update_count: 2 } }
232+
before { subject.run }
233+
234+
it { expect(job.reload.state).to eq :started }
235+
it { expect(log).to include "I Received state update 2 for job id=#{job.id}, last known count: 0" }
236+
end
237+
end
238+
239+
describe 'with a count stored' do
240+
before { context.redis.set("job:state_update_count:#{job.id}", 3) }
241+
242+
describe 'given no meta it skips the message' do
243+
let(:meta) { nil }
244+
before { subject.run }
245+
246+
it { expect(job.reload.state).to eq :started }
247+
it { expect(log).to include "W Received state update with no count for job id=#{job.id}, last known count: 3" }
248+
end
249+
250+
describe 'given no count it skips the message' do
251+
let(:meta) { {} }
252+
before { subject.run }
253+
254+
it { expect(job.reload.state).to eq :started }
255+
it { expect(log).to include "W Received state update with no count for job id=#{job.id}, last known count: 3" }
256+
end
257+
258+
describe 'given a count it skips the message' do
259+
let(:meta) { { state_update_count: 2 } }
260+
before { subject.run }
261+
262+
it { expect(job.reload.state).to eq :received }
263+
it { expect(log).to include "W Received state update 2 for job id=#{job.id}, last known count: 3. Skipping the message." }
264+
it { expect(log).to include "W Skipped event job:start for <Job id=#{job.id}> trying to update state from :received to :started" }
265+
end
266+
end
267+
end
204268
end

0 commit comments

Comments
 (0)