Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
require 'event_source/version'
require 'event_source/ruby_versions'
require 'event_source/error'
require 'event_source/threaded'
require 'event_source/inflector'
require 'event_source/logging'
require 'event_source/uris/uri'
Expand Down Expand Up @@ -68,6 +69,7 @@ def configure
end

def initialize!
boot_threading!
load_protocols
create_connections
load_async_api_resources
Expand All @@ -89,6 +91,14 @@ def build_async_api_resource(resource)
.call(resource)
.success
end

def boot_threading!
@threaded = EventSource::Threaded.new
end

def threaded
@threaded
end
end

class EventSourceLogger
Expand Down
3 changes: 2 additions & 1 deletion lib/event_source/protocols/amqp/bunny_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ def on_receive_message(
payload,
&executable
)

EventSource.threaded.amqp_consumer_lock.mon_enter
subscription_handler.run
rescue Bunny::Exception => e
logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}"
ensure
EventSource.threaded.amqp_consumer_lock.mon_exit
subscriber = nil
end

Expand Down
14 changes: 14 additions & 0 deletions lib/event_source/threaded.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

module EventSource
# Manages concurrent resource access in a threaded environment.
class Threaded

attr_reader :amqp_consumer_lock, :worker_lock

def initialize
@amqp_consumer_lock = ::Monitor.new
@worker_lock = ::Monitor.new
end
end
end