diff --git a/lib/event_source.rb b/lib/event_source.rb index 0956d8c6..c577c44d 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -15,6 +15,7 @@ require 'event_source/version' require 'event_source/ruby_versions' require 'event_source/error' +require 'event_source/threaded' require 'event_source/inflector' require 'event_source/logging' require 'event_source/uris/uri' @@ -68,6 +69,7 @@ def configure end def initialize! + boot_threading! load_protocols create_connections load_async_api_resources @@ -89,6 +91,14 @@ def build_async_api_resource(resource) .call(resource) .success end + + def boot_threading! + @threaded = EventSource::Threaded.new + end + + def threaded + @threaded + end end class EventSourceLogger diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index 89c07b58..fbfe376e 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -143,11 +143,12 @@ def on_receive_message( payload, &executable ) - + EventSource.threaded.amqp_consumer_lock.mon_enter subscription_handler.run rescue Bunny::Exception => e logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}" ensure + EventSource.threaded.amqp_consumer_lock.mon_exit subscriber = nil end diff --git a/lib/event_source/threaded.rb b/lib/event_source/threaded.rb new file mode 100644 index 00000000..43de85a6 --- /dev/null +++ b/lib/event_source/threaded.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module EventSource + # Manages concurrent resource access in a threaded environment. + class Threaded + + attr_reader :amqp_consumer_lock, :worker_lock + + def initialize + @amqp_consumer_lock = ::Monitor.new + @worker_lock = ::Monitor.new + end + end +end