A simple little module to capture, process, and dispatch Postgresql NOTIFY streams
- Monitor multiple channels
- Register one or more callbacks with multiple channels
- Register one or more channels with a callback
- Control callbacks on a per channel basis
- Add and remove channels at any time
- Mute and un-mute channels
- Add and remove subscribers at any time
- Mute and un-mute subscribers
- Abstract away asynchronous context for synchronous use
- Automatic str -> type conversion of all valid python types via
ast.literal_eval - Persistent, immutable internal data structures
- Minimalist API
- Tight footprint
pip install pgnotifierimport sys
from pgnotifier import Notifier
print("Ctrl-C to exit", file=sys.stderr)
conf = {
'dbname': "my_db_name",
'user': "my_db_user",
'password': "my_password",
'host': "my_host",
'port': "my_port",
}
n = Notifier(conf)
n.add_channels(['ch1', 'ch3'])
n.subscribe(42, 'ch1',
lambda id, channel, payload, pid: print(
"callback id: ", id, ", channel: ", channel,
", payload: ", payload, ", pid: ", pid))
n.subscribe('an_id', 'ch2',
lambda *_: print("I'm just going to ignore that."))
def do_complex_thing(id, channel, payload, pid):
if isinstance(payload, dict) or type(payload) == dict:
for k,v in payload.items():
print("doing something with key:",k, "-> val",v)
# do something else
# I think you get the idea...
else:
print("payload of type: ",
type(payload), "is not what I was expecting!")
# subscriber with tuple id
n.subscribe((2, 'another_id'), 'ch2', do_complex_thing)From the Postrgesql end, send TEXT or JSON string notifications like so:
select pg_notify('ch1', '"WARNING: Something really bad happened"');
select pg_notify('ch1', '{"topic": "abc", "data": "some data", "something": "else"}');
select pg_notify('ch2', '{"topic": "xyz", "notice": "update", "data": [2, "stuff"]}');
select pg_notify('ch2', '[1,2,3,4,5]');
select pg_notify('ch3', '[1,2,3,4,5]');Back in python, the payload is passed to callbacks subscribed to channels ch1, ch2, etc. The payload is cast to it's native python type via ast.literal_eval. See https://docs.python.org/3/library/ast.html and https://docs.python.org/3/library/ast.html#ast.literal_eval
Important
Postgresql notifications must be text and must be shorter than 8000 bytes. It is recommended to only send the key of a record, or a view or table name, a function reference, etc.
The methods below provide everything needed to work with pgnotifier.
- add_channels
- remove_channels
- channels
- subscribe
- unsubscribe
- subscribers
- mute_channels
- unmute_channels
- mute_subscriber
- unmute_subscriber
- start
- stop
- restart
- is_running
The functions below are not required outside the internals of pgnotifier. They are publicly exposed and included here as a matter of interest.
Documentation about the inner-workings of pgnotifier is kept separately from the README, and can be found over here: Private methods. or via the method links below.
A list of stuff to look into at a later date can be found over here: TODO
The methods below provide everything needed to work with pgnotifier.
Constructor.
Args:
dbconfdatabase configuration, asdict.
from pgnotifier import Notifier
n = Notifier(dbconf)Adds one or more channels to the set of channels to monitor. Is a no-op if channel already exists.
Args:
channelslist of channels to add, asstr(single channel),listorset.
from pgnotifier import Notifier
n = Notifier(conf)
n.add_channels(['ch1', 'ch2', 'ch3'])Removes one or more channels from the set of channels to monitor. Is a no-op if channel doesn't exist. Optionally restarts listener thread (if needed).
Args:
channelslist of channels to remove, asstr(single channel),listorset.autorunrestart listener thread with channels removed, asbool. Defaults toTrue.
Important
Active channels, when removed, will only cease being monitored after a
listener thread restart. Thread restarts happen automatically when autorun=True.
Otherwise, if autorun=False, removed channels will continue to be
monitored until a call to stop() and start(), or restart(), is made.
Inactive channels (e.g. channel is muted and/or has no subscribers and/or has all muted subscribers), when removed, do not require a restart as they will have already been removed from the listener thread.
Listener thread (re)starts are only required under certain, specific
circumstances. It's advisable to allow pgnotifier take care of listener
thread management via the default autorun=True, unless there is a
very good reason to manage it manually.
See __maybe_restart for more detail.
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.remove_channels('ch2')
c = n.channels()
print("channels:", c)Returns channel and subscriber data, as dict.
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
c = n.channels()
print("channels: ", c)Adds a callback function with id for notifications on channel. Creates channel if channel does not exist. Optionally restarts listener thread (if needed).
Args:
idsubscriber id, ashashable(i.e. any immutable type such as strings, numbers, and tuples containing immutable types).channelnotification channel to subscribe to, asstr.fncallback function, ascallable(i.e. function or method).autorunrestart listener thread (if needed), asbool. Defaults toTrue.
Important
A new channel, when added with this subscriber, or, a channel that becomes
active due to this subscriber can only be monitored after a
listener thread restart. Thread restarts happen automatically when autorun=True.
Otherwise, if autorun=False, activated channels containing this subscriber
will not be monitored until a call to stop() and start(), or restart(),
is made.
Listener thread (re)starts are only required under certain, specific
circumstances. It's advisable to allow pgnotifier take care of listener
thread management via the default autorun=True, unless there is a
very good reason to manage it manually.
See __maybe_restart for more detail.
When a notification is received on a channel, callbacks subscribed to that channel will be executed.
Args:
idthe subscriberidashashable.channelthe notification channel, asstr.payloadthe notification received, as native type as cast byast.literal_eval.pidthe notifying sessions server process PID, asint.
from pgnotifier import Notifier
n = Notifier(conf)
n.subscribe(42, 'ch4',
lambda id, channel, payload, pid: print("id: ", id, ", channel: ", channel,
", payload: ", payload, ", pid: ", pid))Removes a callback function with id from notifications on channel. Optionally restarts listener thread (if needed).
Args:
idthe subscriber id, ashashable.channelnotification channel to unsubscribe from, asstr.autorunrestart listener thread (if needed), asbool. Defaults toTrue.
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.unsubscribe(42, 'ch1')Returns subscriber and channel data, as dict.
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
s = n.subscribers()
print("subscribers:", s)Mutes channels. Removes channels from listener thread, thereby muting all subscribers associated with those channels (no matter their mute status).
Subscribers will retain their mute status associated with those channels.
Args:
channelslist of channels to mute, asstr(single channel),listorset. If no channels given, ALL channels will be muted.
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.mute_channels('ch1')
m = n.muted_channels()
print("muted channels:", m)Un-mutes channels. Adds channels to the listener thread, thereby adding all un-muted subscribers associated with those channels.
Args:
channelslist of channels to un-mute, asstr(single channel),listorset. If no channels given, ALL channels will be un-muted.
Note
Channel will remain inactive (i.e. excluded from the listener thread) if it does not contain any non-muted subscribers.
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.unmute_channels()
m = n.non_muted_channels()
print("non muted channels:", m)Mutes subscriber on channels. If a channel no longer contains any non-muted subscribers, it is said to be inactive and is removed from the listener thread.
Args:
idsubscriber id, ashashable(i.e. any immutable type such as strings, numbers, and tuples containing immutable types).channelslist of channels to mute the subscriber on, asstr(single channel),listorset. If no channels given, the subscriber will be muted on ALL channels it is subscribed to.
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.mute_subscriber('an_id', 'ch2')
m = n.muted_subscribers()
print("muted subscribers:", m)Un-mutes subscriber on channels. If subscriber is on a non-muted, inactive channel, the channel becomes active and is added to the listener thread.
Args:
idsubscriber id, ashashable(i.e. any immutable type such as strings, numbers, and tuples containing immutable types).channelslist of channels to un-mute the subscriber on, asstr(single channel),listorset. If no channels given, the subscriber will be unmuted on ALL channels it is subscribed to.
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.unmute_subscriber('an_id')
m = n.muted_subscribers()
print("muted subscribers:", m)Starts the listener thread (if not already running). Is a no-op if thread already running. This function is generally not needed in userland.
Note
Listener thread (re)starts are only required under certain, specific circumstances. See __maybe_restart for more detail.
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been added removed, etc. ...
n.start()Stops the listener thread (if running). Is a no-op if thread is not running.
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been added removed, etc. ...
n.stop()(Re)starts listener thread. This function is generally not needed in userland.
Note
Listener thread (re)starts are only required under certain, specific circumstances. See __maybe_restart for more detail.
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been changed with arg autorun=False ...
n.restart()Returns True if listener thread currently running, else False, as bool
from pgnotifier import Notifier
n = Notifier(conf)
# channels and/or subscribers, have been changed with arg autorun=False ...
b = n.is_running()
print("listener running?",b)The functions below are not required outside the internals of pgnotifier. They are publicly exposed and included here as a matter of interest.
A clojure-esque nested associative map transformer for Pyrsistent. Associates a new value v at key path pv in map m. Returns a new map with associated changes, as pyrsistent.PMap.
Args:
mmap to transform, aspyrsistent.PMap.pva path vector of keys indicating location of desired assoc, aspyrsistent.PVectororlistvnew value to assoc into path given bypv, aswhatever!
Nested associative map key->val remover for Pyrsistent. Returns a new map with dissociated changes, as pyrsistent.PMap.
Args:
mmap to transform, aspyrsistent.PMap.pva path vector of keys indicating location of desired dissoc, aspyrsistent.PVectororlist
Trivial associative map filter. Returns a new map with filtered changes, as pyrsistent.PMap.
Args:
mmap to filter, aspyrsistent.PMap.ffilter function that accepts at least a key and a value as args, ascallable.*aoptional additional args to pass to filter function, aswhatever!
Runs asynchronous and/or blocking functions in a new asyncio loop, as a task, in a thread. Designed to be called from a synchronous context. Returns concurrent.futures.Future.
Args:
ffunction to run in asyncio loop, ascallable.*aoptional args to functionf(e.g. a blocking function call that might produce a result in the future), aswhatever!