Style
We have provided an Eclipse Formatter file formatter.xml
with all the formatting conventions currently used in the project. Highlights include no tabs, 4-space indentation, and 120-char width. Please respect this so as to reduce the amount of formatting-related noise produced in commits.
Static Analysis
We would like to use static analysis tools PMD and FindBugs to maintain code quality. However, we have not yet arrived at a consensus on what rules to adhere to, and what to ignore.
Netty Notes
The asynchronous network IO infrastructure that Hedwig uses is Netty. Here are some notes on Netty's concurrency architecture and its filter pipeline design.
Concurrency Architecture
After calling ServerBootstrap.bind()
, Netty starts a boss thread (NioServerSocketPipelineSink.Boss
) that just accepts new connections and registers them with one of the workers from the NioWorker
pool in round-robin fashion (pool size defaults to CPU count). Each worker runs its own select loop over just the set of keys that have been registered with it. Workers start lazily on demand and run only so long as there are interested fd's/keys. All selected events are handled in the same thread and sent up the pipeline attached to the channel (this association is established by the boss as soon as a new connection is accepted).
All workers, and the boss, run via the executor thread pool; hence, the executor must support at least two simultaneous threads.
Handler Pipeline
A pipeline implements the intercepting filter pattern. A pipeline is a sequence of handlers. Whenever a packet is read from the wire, it travels up the stream, stopping at each handler that can handle upstream events. Vice-versa for writes. Between each filter, control flows back through the centralized pipeline, and a linked list of contexts keeps track of where we are in the pipeline (one context object per handler).
Pseudocode
This summarizes the control flow through the system.
publish
Need to document
subscribe
Need to document
ReadAhead Cache
The delivery manager class is responsible for pushing published messages from the hubs to the subscribers. The most common case is that all subscribers are connected and either caught up, or close to the tail end of the topic. In this case, we don't want the delivery manager to be polling bookkeeper for any newly arrived messages on the topic; new messages should just be pushed to the delivery manager. However, there is also the uncommon case when a subscriber is behind, and messages must be pulled from Bookkeeper.
Since all publishes go through the hub, it is possible to cache the recently published messages in the hub, and then the delivery manager won't have to make the trip to bookkeeper to get the messages but instead get them from local process memory.
These ideas of push, pull, and caching are unified in the following way: - A hub has a cache of messages
- When the delivery manager wants to deliver a message, it asks the cache for it. There are 3 cases:
- The message is available in the cache, in which case it is given to the delivery manager
- The message is not present in the cache and the seq-id of the message is beyond the last message published on that topic (this happens if the subscriber is totally caught up for that topic). In this case, a stub is put in the cache in order to notify the delivery manager when that message does happen to be published.
- The message is not in the cache but has been published to the topic. In this case, a stub is put in the cache, and a read is issued to bookkeeper.
- Whenever a message is published, it is cached. If there is a stub already in the cache for that message, the delivery manager is notified.
- Whenever a message is read from bookkeeper, it is cached. There must be a stub for that message (since reads to bookkeeper are issued only after putting a stub), so the delivery manager is notified.
- The cache does readahead, i.e., if a message requested by the delivery manager is not in the cache, a stub is established not only for that message, but also for the next n messages where n is configurable (default 10). On a cache hit, we look ahead n/2 messages, and if that message is not present, we establish another n/2 stubs. In short, we always ensure that the next n stubs are always established.
- Over time, the cache will grow in size. There are 2 pruning mechanisms:
- Once all subscribers have consumed up to a particular seq-id, they notify the cache, and all messages up to that seq-id are pruned from the cache.
- If the above pruning is not working (e.g., because some subscribers are down), the cache will eventually hit its size limit which is configurable
(default, half of maximum jvm heap size). At this point, messages are just pruned in FIFO order. We use the size of the blobs in the message for estimating the cache size. The assumption is that that size will dominate over fixed, object-level size overheads.
- Stubs are not purged because according to the above simplification, they are of 0 size.
Scalability Bottlenecks Down the Road
- Currently each topic subscription is served on a different channel. The number of channels will become a bottleneck at higher channels. We should switch to an architecture, where multiple topic subscriptions between the same client, hub pair should be served on the same channel. We can have commands to start, stop subscriptions sent all the way to the server (right now these are local).
- Publishes for a topic are serialized through a hub, to get ordering guarantees. Currently, all subscriptions to that topic are served from the same hub. If we start having large number of subscribers to heavy-volume topics, the outbound bandwidth at the hub, or the CPU at that hub might become the bottleneck. In that case, we can setup other regions through which the messages are routed (this hierarchical scheme) reduces bandwidth requirements at any single node. It should be possible to do this entirely through configuration.