cctools
Functions
mq.h File Reference

Non-blocking message-oriented queue for network communication. More...

#include <stddef.h>
#include <time.h>
#include "buffer.h"

Go to the source code of this file.

Functions

struct mq * mq_connect (const char *addr, int port)
 Connect to a remote host. More...
 
struct mq * mq_serve (const char *addr, int port)
 Prepare to accept connections. More...
 
void mq_close (struct mq *mq)
 Close a connection. More...
 
struct mq * mq_accept (struct mq *server)
 Accept a connection. More...
 
int mq_wait (struct mq *mq, time_t stoptime)
 Wait for a message or connection. More...
 
void mq_set_tag (struct mq *mq, void *tag)
 Set the tag associated with the given queue. More...
 
void * mq_get_tag (struct mq *mq)
 Get the tag associated with the given queue. More...
 
int mq_address_local (struct mq *mq, char *addr, int *port)
 Return the local address of the queue in text format. More...
 
int mq_address_remote (struct mq *mq, char *addr, int *port)
 Return the remote address of the queue in text format. More...
 
int mq_geterror (struct mq *mq)
 Check for errors on the connection. More...
 
struct mq_poll * mq_poll_create (void)
 Create a new (empty) polling group. More...
 
void mq_poll_delete (struct mq_poll *p)
 Delete a polling group. More...
 
int mq_poll_add (struct mq_poll *p, struct mq *mq)
 Add a message queue to a polling group. More...
 
int mq_poll_rm (struct mq_poll *p, struct mq *mq)
 Remove a message queue from a polling group. More...
 
int mq_poll_wait (struct mq_poll *p, time_t stoptime)
 Wait for messages or connections. More...
 
struct mq * mq_poll_readable (struct mq_poll *p)
 Find a queue with messages waiting. More...
 
struct mq * mq_poll_acceptable (struct mq_poll *p)
 Find a server queue with connections waiting. More...
 
struct mq * mq_poll_error (struct mq_poll *p)
 Find a queue in the error state or closed socket. More...
 
int mq_send_buffer (struct mq *mq, buffer_t *buf, size_t maxlen)
 Push a message onto the send queue. More...
 
int mq_send_fd (struct mq *mq, int fd, size_t maxlen)
 Stream a file descriptor across the wire. More...
 
int mq_store_buffer (struct mq *mq, buffer_t *buf, size_t maxlen)
 Store the next message in the given buffer. More...
 
int mq_store_fd (struct mq *mq, int fd, size_t maxlen)
 Write the next message to the given file descriptor. More...
 
mq_msg_t mq_recv (struct mq *mq, size_t *length)
 Pop a message from the receive queue. More...
 

Detailed Description

Non-blocking message-oriented queue for network communication.

This module provides ordered, message-oriented semantics and queuing over the network. See link.h for lower-level socket communication.

Rather than calling send() or recv() and waiting for the other side, messages are asynchronously placed in send and receive queues. To send a message, simply append it to the send queue. Likewise received messages are put in the receive queue and can be popped at the application's convenience.

Pushing a message onto the send queue is a constant-time operation, since nothing is actually sent over the network. To trigger real network communication, it is necessary regularly call mq_wait or mq_poll_wait. These block until waiting message(s)/connection(s) become available (or a signal handler or timeout interrupts).

The polling interface loosely approximates the Linux epoll interface. First, a set of message queues are added to a polling group with mq_poll_add. Then a call to mq_poll_wait blocks until at least one of the queues in the group has message(s)/connection(s) ready. mq_poll_wait calls mq_flush_send and mq_flush_receive internally, so the event loop does not need to. Send buffers will be flushed as much as possible while waiting for messages/connections. Helper functions (mq_poll_readable, mq_poll_acceptable, mq_poll_error) are available to efficiently find queues with messages/connections available.

The examples that follow are lazy about checking for errors, be sure to check carefully!

Example Client:

 struct mq *mq = mq_connect("127.0.0.1", 1234);
 mq_send(mq, msg);
 while (true) {
    mq_store_buffer(mq, &buf);
    switch (mq_wait(mq, time() + 30)) {
            case 0:
            // interrupted by timeout or signal
            case -1:
            // error or closed socket, mq should be deleted
            default:
            // got some messages!
            mq_recv(mq, NULL);
            // process the result
            // make sure something breaks out of the loop!
    }
 }
 mq_close(mq);

Example Server:

 struct mq *server = mq_serve(NULL, 1234);
 while (true) {
    switch (mq_wait(server, time() + 30)) {
            case 0:
            // interrupted by timeout or signal
            case -1:
            // error or closed socket, mq should be deleted
            default:
            // got connections!
            handle_client(mq_accept(server));
    }
 }
 mq_close(server);

Example Polling Server:

 struct mq_poll *M = mq_poll_create();
 struct mq *server = mq_serve(NULL, 1234);
 mq_poll_add(M, server);

 while (true) {
    switch (mq_poll_wait(M, time() + 30)) {
            case 0:
            // interrupted by timeout or signal
            case -1:
            // error, time to abort
            default: {
                    struct mq *mq;
                    if ((mq = mq_poll_acceptable(M))) {
                            // got a new connection
                            struct mq *n = mq_accept(mq);
                            mq_poll_add(M, n);
                            setup_client(n);
                    }
                    if ((mq = mq_poll_readable(M))) {
                            // got a new message
                            handle_client(mq);
                    }
            }
    }
 }
 // make sure all client connections were closed
 mq_poll_delete(M);
 mq_close(server);

Function Documentation

struct mq* mq_connect ( const char *  addr,
int  port 
)

Connect to a remote host.

Returns immediately, with sends buffered until the connection actually completes. If the connection fails, mq_flush_* will return -1.

Parameters
addrIP address of the server in string form.
portPort of the server.
Returns
A pointer to the new message queue.
NULL on failure, with errno set appropriately.
struct mq* mq_serve ( const char *  addr,
int  port 
)

Prepare to accept connections.

The server socket will listen on the given port. Use mq_accept to get client connections.

Parameters
addrIP address of the network interface, or NULL to accept connections on any interface.
portPort for the service.
Returns
A server queue that can be passed to mq_accept, or NULL on failure, with errno set appropriately.
void mq_close ( struct mq *  mq)

Close a connection.

Parameters
mqThe connection to close.
struct mq* mq_accept ( struct mq *  server)

Accept a connection.

This is a non-blocking operation, so it will return immediately if no connections are available.

Parameters
serverA queue returned from mq_serve*.
Returns
A connection to a client.
NULL if no connections are available.
int mq_wait ( struct mq *  mq,
time_t  stoptime 
)

Wait for a message or connection.

Blocks the current thread until a message/connection is received (or until a signal or timeout interrupts). Sends are still carried out while waiting. Note that all signals are unblocked for the duration of this call. Before waiting, the storage for the next message MUST be specified with mq_store_* functions.

Parameters
mqThe queue to wait on.
stoptimeThe time at which to stop waiting.
Returns
1 if a message/connection is available.
0 if interrupted by timeout/signal handling.
-1 on closed socket or other error, with errno set appropriately.
void mq_set_tag ( struct mq *  mq,
void *  tag 
)

Set the tag associated with the given queue.

The tag is not used by this module; it is intended to allow for finding the worker/connection associated with the given queue. This may be useful when using mq_poll_readable and friends.

Parameters
mqThe queue to use.
tagThe new tag to set.
void* mq_get_tag ( struct mq *  mq)

Get the tag associated with the given queue.

See mq_set_tag for details.

Parameters
mqThe queue to check.
Returns
The tag previously associated with mq. Defaults to NULL.
int mq_address_local ( struct mq *  mq,
char *  addr,
int *  port 
)

Return the local address of the queue in text format.

Parameters
mqThe queue to examine.
addrPointer to a string of at least LINK_ADDRESS_MAX bytes, which will be filled with a text representation of the local IP address.
portPointer to an integer, which will be filled with the TCP port number.
Returns
Positive on success, zero on failure.
int mq_address_remote ( struct mq *  mq,
char *  addr,
int *  port 
)

Return the remote address of the queue in text format.

Parameters
mqThe queue to examine.
addrPointer to a string of at least LINK_ADDRESS_MAX bytes, which will be filled with a text representation of the remote IP address.
portPointer to an integer, which will be filled with the TCP port number.
Returns
Positive on success, zero on failure.
int mq_geterror ( struct mq *  mq)

Check for errors on the connection.

Since actual IO happens in the background, any errors are reported here. Once a connection is in the error state, further IO operations will fail. After handling any received messages, the connection must be closed. Socket disconnection is indicated by ECONNRESET.

Parameters
mqThe queue to check.
Returns
0 if mq is not in an error state.
The errno that put mq into an error state.
struct mq_poll* mq_poll_create ( void  )

Create a new (empty) polling group.

This is a specialized hash table that knows about the states of the channels it contains, and can efficiently handle non-blocking IO for a large number of connections.

Returns
A new polling group.
void mq_poll_delete ( struct mq_poll *  p)

Delete a polling group.

Note that the polling group does not take ownership of queues it contains, so you should be keeping a list somewhere else. This call only deletes the polling group itself, not the message queues it contains.

Parameters
pThe polling group to delete.
int mq_poll_add ( struct mq_poll *  p,
struct mq *  mq 
)

Add a message queue to a polling group.

Note that a queue can (currently) only be added to a single polling group.

Parameters
pThe polling group to add to.
mqThe queue to add.
Returns
0 on success.
-1 on error, with errno set appropriately.
int mq_poll_rm ( struct mq_poll *  p,
struct mq *  mq 
)

Remove a message queue from a polling group.

Be sure to remove a message queue from any polling groups before deleting the it! Removing a message queue from a polling group that does not contain is is a no-op, but will succeed.

Parameters
pThe polling group to remove from.
mqThe message queue to remove.
Returns
0 on success.
-1 on error, with errno set appropriately.
int mq_poll_wait ( struct mq_poll *  p,
time_t  stoptime 
)

Wait for messages or connections.

Blocks the current thread until a message/connection is received on one of the message queues in the polling group (or until a signal or timeout interrupts). Sends are still carried out while waiting. Note that all signals are unblocked for the duration of this call. Before waiting, the storage for the next message MUST be specified with mq_store_* functions.

Parameters
pThe polling group to wait on.
stoptimeThe time at which to stop waiting.
Returns
The number of events available (or zero if interrupted by timeout/signal handling).
-1 on error, with errno set appropriately.
struct mq* mq_poll_readable ( struct mq_poll *  p)

Find a queue with messages waiting.

Returns the an arbitrary queue in the polling group, and may return the same queue until its messages are popped. This is more efficient than looping over a list of message queues and calling mq_recv on all of them, since the polling group keeps track of queue states internally.

Parameters
pThe polling group to inspect.
Returns
A queue with messages waiting.
NULL if no queues in the polling group have messages waiting.
struct mq* mq_poll_acceptable ( struct mq_poll *  p)

Find a server queue with connections waiting.

Returns an arbitrary server queue in the polling group, and may return the same queue until its connections are accepted. This is more efficient than looping over a list of queues and calling mq_accept on all of them, since the polling group keeps track of queue states internally.

Parameters
pThe polling group to inspect.
Returns
A queue with connections waiting.
NULL if no queues in the polling group have connections waiting.
struct mq* mq_poll_error ( struct mq_poll *  p)

Find a queue in the error state or closed socket.

Returns an arbitrary queue in the polling group, and may return the same queue until it is removed. This is more efficient than looping over a list of message queues and checking for errors on all of them, since the polling group keeps track of queue states internally.

Parameters
pThe polling group to inspect.
Returns
A queue in the error state or with a closed socket.
NULL if no queues in the polling group are in error.
int mq_send_buffer ( struct mq *  mq,
buffer_t buf,
size_t  maxlen 
)

Push a message onto the send queue.

This is a non-blocking operation, and will return immediately. Messages are delivered in the order they were sent. Note that the queue takes ownership of the passed-in buffer, so callers MUST NOT use/delete the buffer after pushing it onto the send queue. It will be automatically deleted when the send is finished. In particular, passing in a stack-allocated buffer_t WILL result in memory corruption. Only use heap-allocated buffers here.

Parameters
mqThe message queue.
bufThe message to send.
maxlenMaximum number of bytes to send, or 0 to use the entire buffer.
Returns
0 on success. Note that this only indicates that the message was successfully queued. It gives no indication about delivery.
-1 on failure.
int mq_send_fd ( struct mq *  mq,
int  fd,
size_t  maxlen 
)

Stream a file descriptor across the wire.

This operates in the same way as mq_send_buffer, but takes a file descriptor. Note that the queue takes ownership of fd and will close it when the message is sent, so callers MUST NOT use/close fd after passing it in. This function will read from fd until reaching the end of the file. This function will not seek fd, so it's possible to send slices from within files.

Parameters
mqThe message queue.
fdThe file descriptor to read.
maxlenMaximum number of bytes to send, or 0 to read to EOF.
Returns
0 on success. Note that this only indicates that the message was successfully queued. It gives no indication about delivery.
-1 on failure.
int mq_store_buffer ( struct mq *  mq,
buffer_t buf,
size_t  maxlen 
)

Store the next message in the given buffer.

This function allows the caller to provide the storage space for the next message to be received. buf must already be initialized. Any existing contents will be overwritten. Callers MUST NOT inspect/modify/free buf until a successful call to mq_recv indicates completed receipt of a message. It is undefined behavior to call this if a message has already been partially received. It is therefore only safe to call this before calling *_wait() for the first time or immediately after receiving a message from mq_recv(). Note that if a message exceeds maxlen the connection will be killed, so use with caution.

Parameters
mqThe message queue.
bufThe buffer to use to store the next message.
maxlenMaximum bytes to accept, or 0 for no limit.
Returns
0 on success.
-1 on failure, with errno set appropriately.
int mq_store_fd ( struct mq *  mq,
int  fd,
size_t  maxlen 
)

Write the next message to the given file descriptor.

This function operates in the same way as mq_store_buffer, but takes a file descriptor. Callers MUST NOT use/close fd until a successful call to mq_recv indicates completed receipt of a message. This function will not seek fd, so it is possible to write to arbitrary positions within a file. Note that if a message exceeds maxlen the connection will be killed, so use with caution.

Parameters
mqThe message queue.
fdThen file descriptor to write to.
maxlenMaximum bytes to accept, or 0 for no limit.
Returns
0 on success.
-1 on failure, with errno set appropriately.
mq_msg_t mq_recv ( struct mq *  mq,
size_t *  length 
)

Pop a message from the receive queue.

This is a non-blocking operation, and will return immediately if no messages are available. Once this function indicates receipt of a message, the caller takes back ownership of the underlying storage provided via mq_store_*. After receiving a message, be sure to use mq_store_* to specify the storage for the next message before waiting again.

Parameters
mqThe message queue.
lengthA pointer to store the total length in bytes of the message. Can be NULL.
Returns
MQ_MSG_NONE if no message is available.
MQ_MSG_BUFFER if a message has been written to a previously-provided buffer.
MQ_MSG_FD if a message has been written to a previously-provided fd.