Type-safe publish/subscribe in C++ (part 1)

In the next few posts, I'll be describing a type-safe implementation of the publish/subscribe pattern in C++ that has proven quite useful to me. I've used this code for everything from handling input events to implementing message passing across a network. It's main advantages are:

  • No modifications to a subscriber's interface are necessary (ie: doesn't need to define a specific method or implement any specific interface)
  • Abstracts the manner in which messages are published to subscribers behind a common interface (ie: publish messages immediately, queue them up for delivery from a single thread, etc.)
  • Easily allows subscribing for specific messages or entire groups of messages (ie: subscribing for the escape key versus subscribing for all keyboard messages)
  • Type safety (subscribers will only receive messages of the C++ type they specified at subscription time)
  • Doesn't rely on singletons or any other type of global state (more concurrency-friendly and allows localized subscriptions for better performance)

This first post will concentrate on the system's primary interfaces and won't delve into implementation very much. Subsequent posts will then go under the hood as well as describe some of the non-essential, but useful pieces of code. This post assumes some familiarity with templates as well as common Boost components such as Bind, Function, and Shared Pointer.

First, lets define what a "message" is in the context of this system. Basically, a message is any C++ type that derives from a specific polymorphic base class which we will call "Message".

/// Base class for message types
class Message
{
public:
    virtual ~Message() = 0;
};

inline Message::~Message()
{
}


At first glance, this class doesn't look very useful. It doesn't even define any kind of interface. It just "is". Trust me, it's purpose will become apparent later. For now, just know that any type wishing to be published as a message must derive from this class (the is-a relationship implied by public derivation also nicely documents the purpose of the derived class).

Now, lets take a high-level look at how messages will be published and subscribed for. When you want to subscribe for a message, you will have to specify both the C++ type you wish to subscribe for and an arbitrary message identifier. When messages are published, you will only receive messages matching both the type and identifier you specified. Lets take a look at the MessageIdentifier class:

/// Hierarchical identifier of the form /foo/bar/a
class MessageIdentifier
{
public:
    MessageIdentifier();
    MessageIdentifier( const char* stringLiteral );
    MessageIdentifier( const std::string& string );

    /// Appends the given identifier string literal
    /// to this MessageIdentifier
    void Append( const char* stringLiteral );
    void Append( const std::string& string );

    /// Removes the token at the given zero-based index from
    /// this MessageIdentifier.  For example, removing token
    /// 1 from "/Buttons/TutorialButtons/Help3.btn" would
    /// result in "/Buttons/Help3.btn".
    void RemoveToken( size_t index );

    /// Returns the number of tokens in this MessageIdentifier
    size_t GetTokenCount() const;

    /// Returns the hash of the token at the given zero-based index
    Hash GetTokenHash( size_t index ) const;

    /// This method will return true if this MessageIdentifier is
    /// a valid match for the passed-in "other" MessageIdentifier.
    /// A match occurs when all tokens of this Message are a valid
    /// subset or duplicate of the tokens of the other message.
    /// Note what this means:
    /// if "Buttons/TutorialButtons/Help3.btn" is other,
    /// then the MessageIdentifier "Buttons/" would match
    /// and "Buttons/TutorialButtons/" would match 
    /// but "Buttons/TutorialButtons/Help2.btn" would not match
    bool Matches( const MessageIdentifier& other ) const;

    bool operator<( const MessageIdentifier& rhs ) const;
    bool operator==( const MessageIdentifier& rhs ) const;

private:
    ...
};


A lot of the flexibility of this system is due to this class. MessageIdentifier objects are constructed from strings of the form "/foo/bar/a" which are internally broken up into tokens using the '/' as a delimiter. When deciding if two identifiers match, they are interpreted hierarchically. For example, if someone is publishing keyboard input messages using identifiers of the form "/input/keyboard/escape", "/input/keyboard/space", etc, then you could simply subscribe for all keyboard messages by subscribing for the identifier "/input/keyboard". However, if you subscribe for "/input/keyboard/escape", you will only receive a message when the escape key is pressed. Also, the slashes are simply delimiters. This means that "/foo/bar/a", "foo/bar/a", "foo/bar/a/", and "/foo/bar/a/" are all equivalent identifiers.

Next, we will define a class to handle the publication of messages and registration of subscribers. Lets call it "Publisher". Note that it is NOT a singleton.

/// Abstract interface that facilitates publishing and subscribing for messages
class Publisher : boost::noncopyable
{
public:
    Publisher();
    virtual ~Publisher();

    /// Publishes the given message to handlers subscribed for that type of
    /// message and the given message identifier
    virtual void Publish( const MessageIdentifier& id,
                          boost::shared_ptr<Message> message ) = 0;

    /// Returns a subscription binding the given message type and id to
    /// the given callback
    template< typename MessageType, typename Callback >
    boost::shared_ptr< const Subscription >
    Subscribe( const MessageIdentifier& id, const Callback& callback )
    {
        using detail::DynamicHandler;
        using detail::SubscriptionDeleter;

        boost::shared_ptr< const Subscription > newSubscription(
            new Subscription( id, DynamicHandler<MessageType,Callback>(callback) ),
            SubscriptionDeleter(m_subscriptions) );

        AddNewSubscription( newSubscription );

        return newSubscription;
    }

protected:
    ...
};


This class is really the meat of the system, so lets dig in. As you can see, I've elected to make this class non-copyable. There's really nothing preventing it from being copyable, but copying it would result in shared state between the original and the copy which would be unintuitive and confusing. The first method is a pure virtual method named "Publish", which takes the message to be published and a MessageIdentifier object as parameters. The message is supplied via pointer both to allow the publication of arbitrary types via polymorphism (remember that empty Message class?) and to prevent any expensive copying of messages. Furthermore, this pointer is a shared_ptr in order to properly maintain the lifetime of the message object. Publish is pure virtual to allow different methods of message publication to be specified behind the common Publisher interface.

Next, we have a somewhat scary looking method named "Subscribe". Ignore the implementation for now. It really isn't that bad when you break it down. As you can see, it takes two template parameters. The first is the C++ message type being subscribed for. The second is the type of the callback that will receive the message. The callback type will be deduced by the compiler. Only the message type must be explicitely provided when calling this method. Next, you'll notice that this method returns a shared_ptr to a Subscription object. All you need to know about this object is that the subscription exists as long as this object does. The parameter "id" is the message identifier you're subscribing for, and "callback" is any callable type with a signature convertible to "void( MessageIdentifier id, boost::shared_ptr message )". That being said, passing the result of something like boost::bind as the callback parameter allows you to use pretty much anything with any signature as your callback...way cool.

We've seen enough now to show some basic example usage. This code will output "hello world" to the console.

/// A sample message class
class TestMessage : public Message
{
public:
    TestMessage( const std::string& text )
        : m_text( text )
    {
    }

    const std::string& GetText() const
    {
        return m_text;
    }

private:
    std::string m_text;
};

/// Another sample message class
class AnotherTestMessage : public Message
{

};

void MessageHandler( const MessageIdentifier& id,
                     boost::shared_ptr<TestMessage> message )
{
    if( id == "/test/message/hello" )
    {
        std::cout << message->GetText();
    }
}

void AnotherMessageHandler()
{
    std::cout << " world" << std::endl;
}

int main()
{
    // DirectPublsher is a derivative of Publisher that simply calls
    // handlers immediately when a subscription is invoked.
    DirectPublisher publisher;

    // This subscription will call the MessageHandler function when a
    // TestMessage object is published with an identifier matching
    // "/test/message/hello".  The function will receive the
    // message identifier and message object as parameters.
    boost::shared_ptr<const Subscription> subscription1 =
        publisher.Subscribe<TestMessage>( "/test/message", &MessageHandler );

    // This subscription will call the AnotherMessageHandler function
    // when a TestMessage object is published with an identifier matching
    // "/test/message/world".  The function will not be passed any parameters
    boost::shared_ptr<const Subscription> subscription2 =
        publisher.Subscribe<TestMessage>( "/test/message/world",
            boost::bind(AnotherMessageHandler) );

    // Will invoke the first subscription
    publisher.Publish( "/test/message/hello",
        boost::make_shared<TestMessage>("hello") );

    // Will invoke the second subscription
    publisher.Publish( "/test/message/world",
        boost::make_shared<TestMessage>("doesn't matter") );

    // Won't invoke the first subscription since the message type is wrong
    publisher.Publish( "/test/message/hello",
        boost::make_shared<AnotherTestMessage>() );

    // Will invoke the first subscription but nothing will be printed since
    // the handler checks for a specific message identifier internally
    publisher.Publish( "/test/message/",
        boost::make_shared<TestMessage>("doesn't matter") );

    // Won't invoke either subscription since the message identifier doesn't
    // match anything
    publisher.Publish( "/foo/bar",
        boost::make_shared<TestMessage>("doesn't matter") );

    return 0;
};


That's all for now. The next post will delve into the implementation of the code described so far. After that, I'll describe some optional constructs that make the system simpler to use. All of the source code can be downloaded here if you want to play with it or cheat ahead. You will need Boost and CMake to build the provided unit tests and demo program. Thanks for reading!