PyPubSub - An asynchronous pubsub protocol written in Python 3

Daniel Gruno e1ad3bf977 Merge pull request #4 from kianmeng/fix-typos 1 month ago
clients 37067efc46 Add Ruby client example 4 years ago
plugins 3eba0f3a41 If the SQS connection breaks, wait 10 sec and retry it from scratch 3 years ago
CHANGELOG.md af9de6f36b Fix typos 2 months ago
LICENSE 4ae6ac4b44 Initial commit 4 years ago
README.md af9de6f36b Fix typos 2 months ago
pypubsub.py 00a2813a48 add pre-flight check for required TLS config options 1 year ago
pypubsub.svg fdcd85c22b add logo 4 years ago
pypubsub.yaml 1af5851ecb Add support for TLS 1 year ago
pypubsub_acl.yaml eb5a2cf9d2 add secure topic example 3 years ago
requirements.txt 4d2eb32aa0 Fix dependency breakage by not requiring a specific version of yaml 1 year ago

README.md

PyPubSub - An asynchronous pubsub protocol written in Python 3

PyPubSub Logo

Introduction

PyPubSub is a simple publisher/subscriber service, where clients can connect and either deliver a payload (in JSON format) or listen for specific payloads as a stream of events. It is written as an asynchronous Python service, and can handle thousands of connections at any given time on a single core. It utilizes the HTTP protocol and JSON for a simplistic delivery scheme.

A working copy of this program is in service by the Apache Software Foundation, listing all development events going on at the organization (see this page for an introduction to their service).

Installing

  • Download or clone this repository: git clone https://github.com/Humbedooh/pypubsub.git
  • Install dependencies using pipenv: pipenv --three install -r requirements.txt
  • Edit pypubsub.yaml and (for ACL) pypubsub_acl.yaml to fit your needs
  • Launch the program in the foreground or as a systemd service: pipenv run python3 pypubsub.py
  • Check that your pubsub service is working: curl -I http://localhost:2069

Topics and publishing/subscribing

PyPubSub is designed around topics for both publishing and subscribing. A client can use topics to describe what an event is for when publishing, as well as what a client expects to subscribe to. Topics are generally simple words consisting of letters and numbers, but can be anything that is allowed in a URI path, apart from forward slashes (/) and commas (,) (these are reserved characters). Subscriptions are made on a "highest common denominator" basis, meaning the more topics you subscribe to, the fewer events you will receive, as the topics of an event must, at least, match all the topics a subscriber has subscribed to. Topics are set using the path segment of a URI, and are order agnostic, meaning fruits and apples is the same as apples and fruits internally.

As an example, let's imagine we wish to subscribe to all events for the topics surrounding apples, which is a sub-topic of fruits. We would then subscribe to http://localhost:2069/fruits/apples and listen for events.
If a payload with fruits/apples comes in, we would receive it. If a payload with just fruits come in, we would not receive it, because we are specifically asking for apples to be present as a topic. Neither would fruit/oranges match our subscription, while fruits/apples/macintosh would, as it contains our topics (and a bit more).

The below matrix shows how subscription paths match topics:

Topics /fruits /fruits/apples /fruits/apples/red /fruits/oranges /apples
fruits
fruits + apples
fruits + apples + red
fruits + oranges

Subscribing to multiple topic streams

As mentioned above, subscription topics are typically AND'ed together, with more topics narrowing the event stream. Thus, if you wanted to subscribe to bar OR foo, you would need two streams.

It is possible (from 0.7.0 onwards) to utilize a single stream to subscribe to multiple topic streams at once, using a comma as a delimiter between topic batches. For instance, to subscribe to both apples/red events AND oranges/ripe events, you may subscribe to:

http://localhost:2069/apples/red,oranges/ripe.

Pushing an event to PyPubSub

Event payloads requires that the IP or IP range (Ipv4 or IPv6) is listed in pypubsub.yaml under payloaders first. Once whitelisted, clients can do a POST or PUT to the pubsub service on port 2069, passing a JSON object as the request body, for instance:

curl -XPUT -d '{"text": "Apples are delicious"}' http://localhost:2069/fruits/apples

Event payloads MUST be in dictionary (hash) format, or they will be rejected.

On the subscription side, any client listening to http://localhost:2069/fruits or http://localhost:2069/fruits/apples will receive the following event in their stream:

{
  "text": "Apples are delicious",
  "pubsub_topics": ["fruits", "apples"],
  "pubsub_path": "/fruits/apples",
  "pubsub_timestamp": 1588293679.5432327,
  "pubsub_cursor": "f02b4908-755f-4455-a215-d1627f190110"
}

Pushing an event via Python

To push an event to PyPubSub via Python, you can make use of the requests library in Python:

import requests
requests.put('http://localhost:2069/fruits/apples', json = {"cultivar": "macintosh"})

Listening for events

Events are broadcast as JSON chunks in a chunked HTTP stream. Each chunk contains either a payload from a publisher, or a keep-alive ping from the PyPubSub server, which looks like this:

{"stillalive": 1588132959.6066568}

The stillalive object is a simple timestamp showing, in epoch seconds, when the ping was sent.

Listening for events via cURL

You can subscribe to topics via cURL like so: curl http://localhost:2069/topics/here where topics/here are the topics you are subscribing to, with / as a delimiter between topics. To subscribe to all events, you can omit the topics.

Listening for events via Python

For Python, you can import the asfpy package via pip and utilize its pubsub plugin:

import asfpy.pubsub

def process_event(payload):
    print("we got an event from pubsub")
    ...

def main():
    pubsub = asfpy.pubsub.Listener('http://localhost:2069')
    pubsub.attach(process_event, raw=True) # poll forever

Listening for events via node.js

Using the PyPubSub class in clients/nodejs/client.js one can listen for pubsub events and process them using node.js:

function process(payload) {
    // ping-back?
    if (payload.stillalive) {
        console.log("Got a ping-back");
    // Actual payload? process it!
    } else {
        console.log("Got a payload from PyPubSub!");
        console.log(payload);
    }
}

const pps = new PyPubSub('http://localhost:2069/');
pps.attach(process);

Listening for events via Ruby

Likewise, using Ruby is a pretty straightforward case:

require 'net/http'
require 'json'
require 'thread'

pubsub_URL = 'http://localhost:2069/'

def do_stuff_with(event)
  print("Got a pubsub event!:\n")
  print(event)
end

def listen(url)
  ps_thread = Thread.new do
    begin
      uri = URI.parse(url)
      Net::HTTP.start(uri.host, uri.port) do |http|
        request = Net::HTTP::Get.new uri.request_uri
        http.request request do |response|
          body = ''
          response.read_body do |chunk|
            event = JSON.parse(chunk)
            if event['stillalive']  # pingback
              print("ping? PONG!\n")
            else
              do_stuff_with(event)
            end
          end
        end
      end
  end
  return ps_thread
end

begin
  ps_thread = listen(pubsub_URL)
  print("Pubsub thread started, waiting for results...")
  while ps_thread.alive?
    sleep 10
  end
  print("Pubsub thread died :(\n")
end

Accessing older payloads via the backlog catalogue

If configured, via the payload_backlog_size setting in the main configuration, clients can request payloads that were pushed before they subscribed, using an X-Fetch-Since request header denoting from when (in seconds since the UNIX epoch) they wish to receive events:

curl -H 'X-Fetch-Since: 1588293679' http://localhost:2069/

If there are any events in the backlog (private or public) that match this (aka are younger than the timestamp presented by the client requesting a backlog), they will be delivered to the client, assuming they are younger than the backlog maximum age requirement.

Accessing older payloads with a sequence cursor

Payloads can also (as of 0.7.3) be replayed by using the value from the last event's pubsub_cursor value, resulting in a playback of all events pertaining to your desired topics made after the event with that cursor value in the X-Fetch-Since-Cursor request header:

curl -H 'X-Fetch-Since-Cursor: f02b4908-755f-4455-a215-d1627f190110' http://localhost:2069/

It is worth noting here, for pseudo security reasons, that if the backlog maximum is set sufficiently low (or the age requirement is omitted), this feature could be used to deduce whether or not private events have happened, as a client can request everything in the backlog and potentially gauge whether the size of the backlog differs from time to time. Clients without authorization cannot see private payloads this way, but it is theoretically possible to deduce that they happened. A sane approach is to always set the maximum age configuration to a value that is sufficiently low compared to the backlog max size. If you expect 1,000 payloads per day, you could set your max age to 48h and the backlog size to 5,000 to easily mitigate any potential deduction.

The backlog maximum age configuration expect a number with a time unit after it, for instance:

  • 45s: 45 seconds
  • 30m: 30 minutes
  • 12h: 12 hours
  • 7d: 7 days

Access-Control-List and private events

PyPubSub supports private events that only authenticated clients can receive.

Pushing a private event

To mark an event as private, simply prepend private as the first topic when you push the event:

curl -XPUT -d '{"private_text": "Squeamish Ossifrage"}' http://localhost/private/topics/here

Events broadcast with a /private prefix will only allude to its privacy via the pubsub_path element in the JSON blob. The topics list does not include 'private' (as it's technically not a topic for the broadcast). Thus the above example would output the following event to all authed subscribers with access:

{
  "private_text": "Squeamish Ossifrage",
  "pubsub_topics": ["topics", "here"],
  "pubsub_path": "/private/topics/here",
  "pubsub_timestamp": 1588293679.5432327
}

Retrieving private events

Clients ACL is defined in pypubsub_acl.yaml (and is entirely optional, you can omit the file). See the example ACL configuration for an example. Access is, as with public events, defined with "highest common denominator" in mind, meaning access to topics is granted to the specific topic group specified in the yaml and its sub-groups. Thus, if you grant access to internal and foo in one ACL segment, events pushed to private/internal/foo would be seen by that client, whereas pushes to private/internal/bar would not.

To authenticate and receive private events, use Basic authentication, such as:

curl -u 'user:pass' http://localhost:2069/internal/topics/here

LDAP-based ACL

PyPubSub supports ACL via asynchronous LDAP, either through group memberships or single users via their dn.

See pypubsub.yaml for an LDAP example.

Securing certain topics

You can secure topics, meaning only authenticated users with special credentials may post using those topics. To do so, you will need to edit the secure_topics list in the clients section of your configuration file, for instance:

clients:
  secure_topics:
    - bread
    - syrup

The above would lock publishing the topics bread and syrup for anyone not specifically allowed to use those topics in their ACL segment. Users or LDAP groups can be allowed topics via the topics directive in their ACL segment. See the pypubsub_acl.yaml file for an example.

Working with Amazon SQS

PyPubSub supports AWS SQS for weaving in payloads from their server-less Simple Queue Services. Multiple queues can be supported and items pushed to SQS will seamlessly appear in the pubsub stream. For these objects to be compatible with pypubsub, they must be JSONified strings with a pubsub_path element specifying the URI they would have otherwise been posted to via PyPubSub, for instance "pubsub_path": "/fruits/apples" for a public payload or "pubsub_path": "/private/secretstuff" for a private (auth-required) payload. If no such path is specified, PyPubSub will assume a default empty topic list (free-for-all).

For more information on how to configure SQS, please see pypubsub.yaml. SQS support assumes that the AWS CLI has been set up, and the user has AWS configured in their .aws directory before startup.

Persistent backlogs

PyPubSub supports using a filesystem-based backlog file for persistent storage of the backlog through restarts. To enable this feature, uncomment the backlog configuration in the pypubsub.yaml config file. This will store up to whatever the max backlog queue size is every 10 seconds assuming the backlog has changed. On restart, the backlog file will be read in and added to the in-memory backlog.

License

PyPubSub is licensed under the Apache License v/2.