cctools
Non-blocking message-oriented queue for network communication.
Functions
struct mq * | mq_connect (const char *addr, int port) | Connect to a remote host.
struct mq * | mq_serve (const char *addr, int port) | Prepare to accept connections.
void | mq_close (struct mq *mq) | Close a connection.
struct mq * | mq_accept (struct mq *server) | Accept a connection.
int | mq_wait (struct mq *mq, time_t stoptime) | Wait for a message or connection.
void | mq_set_tag (struct mq *mq, void *tag) | Set the tag associated with the given queue.
void * | mq_get_tag (struct mq *mq) | Get the tag associated with the given queue.
int | mq_address_local (struct mq *mq, char *addr, int *port) | Return the local address of the queue in text format.
int | mq_address_remote (struct mq *mq, char *addr, int *port) | Return the remote address of the queue in text format.
int | mq_geterror (struct mq *mq) | Check for errors on the connection.
struct mq_poll * | mq_poll_create (void) | Create a new (empty) polling group.
void | mq_poll_delete (struct mq_poll *p) | Delete a polling group.
int | mq_poll_add (struct mq_poll *p, struct mq *mq) | Add a message queue to a polling group.
int | mq_poll_rm (struct mq_poll *p, struct mq *mq) | Remove a message queue from a polling group.
int | mq_poll_wait (struct mq_poll *p, time_t stoptime) | Wait for messages or connections.
struct mq * | mq_poll_readable (struct mq_poll *p) | Find a queue with messages waiting.
struct mq * | mq_poll_acceptable (struct mq_poll *p) | Find a server queue with connections waiting.
struct mq * | mq_poll_error (struct mq_poll *p) | Find a queue in the error state or closed socket.
int | mq_send_buffer (struct mq *mq, buffer_t *buf, size_t maxlen) | Push a message onto the send queue.
int | mq_send_fd (struct mq *mq, int fd, size_t maxlen) | Stream a file descriptor across the wire.
int | mq_store_buffer (struct mq *mq, buffer_t *buf, size_t maxlen) | Store the next message in the given buffer.
int | mq_store_fd (struct mq *mq, int fd, size_t maxlen) | Write the next message to the given file descriptor.
mq_msg_t | mq_recv (struct mq *mq, size_t *length) | Pop a message from the receive queue.
Non-blocking message-oriented queue for network communication.
This module provides ordered, message-oriented semantics and queuing over the network. See link.h for lower-level socket communication.
Rather than calling send() or recv() and waiting for the other side, messages are asynchronously placed in send and receive queues. To send a message, simply append it to the send queue. Likewise received messages are put in the receive queue and can be popped at the application's convenience.
Pushing a message onto the send queue is a constant-time operation, since nothing is actually sent over the network at that point. To trigger real network communication, it is necessary to call mq_wait or mq_poll_wait regularly. These block until waiting message(s)/connection(s) become available (or until a signal handler or timeout interrupts).
The polling interface loosely approximates the Linux epoll interface. First, a set of message queues is added to a polling group with mq_poll_add. Then a call to mq_poll_wait blocks until at least one of the queues in the group has message(s)/connection(s) ready. mq_poll_wait calls mq_flush_send and mq_flush_receive internally, so the event loop does not need to. Send buffers will be flushed as much as possible while waiting for messages/connections. Helper functions (mq_poll_readable, mq_poll_acceptable, mq_poll_error) are available to efficiently find queues with messages/connections available.
The examples that follow are lazy about checking for errors. Be sure to check carefully in real code!
Example Client:
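    struct mq *mq = mq_connect("127.0.0.1", 1234);
    // msg is a heap-allocated buffer_t *; the queue takes ownership of it
    mq_send_buffer(mq, msg, 0);
    // buf is an initialized buffer_t; storage must be set before waiting
    mq_store_buffer(mq, &buf, 0);
    while (true) {
        switch (mq_wait(mq, time(NULL) + 30)) {
        case 0:
            // interrupted by timeout or signal
            break;
        case -1:
            // error or closed socket, mq should be deleted
            break;
        default:
            // got some messages!
            mq_recv(mq, NULL);
            // process the result, then set storage for the next message
            mq_store_buffer(mq, &buf, 0);
            // make sure something breaks out of the loop!
            break;
        }
    }
    mq_close(mq);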
Example Server:
    struct mq *server = mq_serve(NULL, 1234);
    while (true) {
        switch (mq_wait(server, time(NULL) + 30)) {
        case 0:
            // interrupted by timeout or signal
            break;
        case -1:
            // error or closed socket, mq should be deleted
            break;
        default:
            // got connections!
            handle_client(mq_accept(server));
            break;
        }
    }
    mq_close(server);
Example Polling Server:
    struct mq_poll *M = mq_poll_create();
    struct mq *server = mq_serve(NULL, 1234);
    mq_poll_add(M, server);
    while (true) {
        switch (mq_poll_wait(M, time(NULL) + 30)) {
        case 0:
            // interrupted by timeout or signal
            break;
        case -1:
            // error, time to abort
            break;
        default: {
            struct mq *mq;
            if ((mq = mq_poll_acceptable(M))) {
                // got a new connection
                struct mq *n = mq_accept(mq);
                mq_poll_add(M, n);
                // setup_client should call mq_store_* to set storage for n
                setup_client(n);
            }
            if ((mq = mq_poll_readable(M))) {
                // got a new message
                handle_client(mq);
            }
            break;
        }
        }
    }
    // make sure all client connections were closed
    mq_poll_delete(M);
    mq_close(server);
struct mq* mq_connect (const char *addr, int port)
Connect to a remote host.
Returns immediately, with sends buffered until the connection actually completes. If the connection fails, mq_flush_* will return -1.
addr | IP address of the server in string form. |
port | Port of the server. |
struct mq* mq_serve (const char *addr, int port)
Prepare to accept connections.
The server socket will listen on the given port. Use mq_accept to get client connections.
addr | IP address of the network interface, or NULL to accept connections on any interface. |
port | Port for the service. |
void mq_close (struct mq *mq)
Close a connection.
mq | The connection to close. |
struct mq* mq_accept (struct mq *server)
Accept a connection.
This is a non-blocking operation, so it will return immediately if no connections are available.
server | A queue returned from mq_serve*. |
int mq_wait (struct mq *mq, time_t stoptime)
Wait for a message or connection.
Blocks the current thread until a message/connection is received (or until a signal or timeout interrupts). Sends are still carried out while waiting. Note that all signals are unblocked for the duration of this call. Before waiting, the storage for the next message MUST be specified with mq_store_* functions.
mq | The queue to wait on. |
stoptime | The time at which to stop waiting. |
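Note that stoptime is an absolute point in time, not a duration, which is why the examples in this file pass the current time plus 30 seconds. A minimal sketch of that conversion follows. deadline_after is a hypothetical helper introduced here for illustration; it is not part of this module.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical helper (not part of mq.h): convert a relative timeout in
 * seconds into the absolute stoptime expected by mq_wait/mq_poll_wait. */
static time_t deadline_after(time_t now, int seconds)
{
	assert(seconds >= 0); /* a negative timeout makes no sense here */
	return now + (time_t) seconds;
}
```

A call such as mq_wait(mq, deadline_after(time(NULL), 30)) would then wait for roughly 30 seconds at most.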
void mq_set_tag (struct mq *mq, void *tag)
Set the tag associated with the given queue.
The tag is not used by this module; it is intended to allow for finding the worker/connection associated with the given queue. This may be useful when using mq_poll_readable and friends.
mq | The queue to use. |
tag | The new tag to set. |
void* mq_get_tag (struct mq *mq)
Get the tag associated with the given queue.
See mq_set_tag for details.
mq | The queue to check. |
int mq_address_local (struct mq *mq, char *addr, int *port)
Return the local address of the queue in text format.
mq | The queue to examine. |
addr | Pointer to a string of at least LINK_ADDRESS_MAX bytes, which will be filled with a text representation of the local IP address. |
port | Pointer to an integer, which will be filled with the TCP port number. |
int mq_address_remote (struct mq *mq, char *addr, int *port)
Return the remote address of the queue in text format.
mq | The queue to examine. |
addr | Pointer to a string of at least LINK_ADDRESS_MAX bytes, which will be filled with a text representation of the remote IP address. |
port | Pointer to an integer, which will be filled with the TCP port number. |
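Both address functions fill caller-supplied storage, so a common use is building a label for log messages. The sketch below covers only the formatting step so that it compiles without the cctools headers. format_endpoint is a hypothetical helper, and the "addr:port" layout is just one possible choice.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical helper (not part of mq.h): write "addr:port" into out.
 * Returns 0 on success, or -1 if out is too small for the full label. */
static int format_endpoint(char *out, size_t outlen, const char *addr, int port)
{
	assert(out != NULL && addr != NULL);
	int n = snprintf(out, outlen, "%s:%d", addr, port);
	return (n >= 0 && (size_t) n < outlen) ? 0 : -1;
}
```

With cctools available, addr would be a char array of LINK_ADDRESS_MAX bytes filled in by mq_address_remote(mq, addr, &port) before being passed to format_endpoint.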
int mq_geterror (struct mq *mq)
Check for errors on the connection.
Since actual IO happens in the background, any errors are reported here. Once a connection is in the error state, further IO operations will fail. After handling any received messages, the connection must be closed. Socket disconnection is indicated by ECONNRESET.
mq | The queue to check. |
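One way to act on the value from mq_geterror is to separate an ordinary peer disconnect from other failures. The sketch below assumes that the return value is 0 when there is no error and an errno-style code otherwise; only the ECONNRESET case is stated above, so treat the rest as an assumption. The enum and function names are hypothetical.

```c
#include <errno.h>

/* Hypothetical classification of an error code from mq_geterror. */
enum conn_status { CONN_OK, CONN_DISCONNECTED, CONN_FAILED };

static enum conn_status classify_mq_error(int err)
{
	if (err == 0)
		return CONN_OK;           /* assumption: 0 means no error */
	if (err == ECONNRESET)
		return CONN_DISCONNECTED; /* documented: socket disconnection */
	return CONN_FAILED;           /* any other error code */
}
```

In either non-OK case the connection should be closed with mq_close once any already received messages have been handled.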
struct mq_poll* mq_poll_create (void)
Create a new (empty) polling group.
This is a specialized hash table that knows about the states of the channels it contains, and can efficiently handle non-blocking IO for a large number of connections.
void mq_poll_delete (struct mq_poll *p)
Delete a polling group.
Note that the polling group does not take ownership of queues it contains, so you should be keeping a list somewhere else. This call only deletes the polling group itself, not the message queues it contains.
p | The polling group to delete. |
int mq_poll_add (struct mq_poll *p, struct mq *mq)
Add a message queue to a polling group.
Note that a queue can (currently) only be added to a single polling group.
p | The polling group to add to. |
mq | The queue to add. |
int mq_poll_rm (struct mq_poll *p, struct mq *mq)
Remove a message queue from a polling group.
Be sure to remove a message queue from any polling groups before deleting it! Removing a message queue from a polling group that does not contain it is a no-op, but will succeed.
p | The polling group to remove from. |
mq | The message queue to remove. |
int mq_poll_wait (struct mq_poll *p, time_t stoptime)
Wait for messages or connections.
Blocks the current thread until a message/connection is received on one of the message queues in the polling group (or until a signal or timeout interrupts). Sends are still carried out while waiting. Note that all signals are unblocked for the duration of this call. Before waiting, the storage for the next message MUST be specified with mq_store_* functions.
p | The polling group to wait on. |
stoptime | The time at which to stop waiting. |
struct mq* mq_poll_readable (struct mq_poll *p)
Find a queue with messages waiting.
Returns an arbitrary queue in the polling group that has messages waiting, and may return the same queue until its messages are popped. This is more efficient than looping over a list of message queues and calling mq_recv on all of them, since the polling group keeps track of queue states internally.
p | The polling group to inspect. |
struct mq* mq_poll_acceptable (struct mq_poll *p)
Find a server queue with connections waiting.
Returns an arbitrary server queue in the polling group, and may return the same queue until its connections are accepted. This is more efficient than looping over a list of queues and calling mq_accept on all of them, since the polling group keeps track of queue states internally.
p | The polling group to inspect. |
struct mq* mq_poll_error (struct mq_poll *p)
Find a queue in the error state or closed socket.
Returns an arbitrary queue in the polling group, and may return the same queue until it is removed. This is more efficient than looping over a list of message queues and checking for errors on all of them, since the polling group keeps track of queue states internally.
p | The polling group to inspect. |
int mq_send_buffer (struct mq *mq, buffer_t *buf, size_t maxlen)
Push a message onto the send queue.
This is a non-blocking operation, and will return immediately. Messages are delivered in the order they were sent. Note that the queue takes ownership of the passed-in buffer, so callers MUST NOT use/delete the buffer after pushing it onto the send queue. It will be automatically deleted when the send is finished. In particular, passing in a stack-allocated buffer_t WILL result in memory corruption. Only use heap-allocated buffers here.
mq | The message queue. |
buf | The message to send. |
maxlen | Maximum number of bytes to send, or 0 to use the entire buffer. |
int mq_send_fd (struct mq *mq, int fd, size_t maxlen)
Stream a file descriptor across the wire.
This operates in the same way as mq_send_buffer, but takes a file descriptor. Note that the queue takes ownership of fd and will close it when the message is sent, so callers MUST NOT use/close fd after passing it in. This function reads from fd until it reaches the end of the file or, if maxlen is nonzero, until maxlen bytes have been read. It does not seek fd, so reading starts at the descriptor's current offset, which makes it possible to send slices from within files.
mq | The message queue. |
fd | The file descriptor to read. |
maxlen | Maximum number of bytes to send, or 0 to read to EOF. |
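Because mq_send_fd does not seek, the caller chooses where a slice starts by positioning the descriptor beforehand. The sketch below shows only that preparation step using standard POSIX calls. open_at_offset is a hypothetical helper, not part of this module.

```c
#include <assert.h>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper (not part of mq.h): open path read-only and move
 * the file offset to the start of the slice. Returns the descriptor,
 * or -1 on failure. */
static int open_at_offset(const char *path, off_t offset)
{
	assert(path != NULL && offset >= 0);
	int fd = open(path, O_RDONLY);
	if (fd < 0)
		return -1;
	if (lseek(fd, offset, SEEK_SET) == (off_t) -1) {
		close(fd); /* do not leak the descriptor on failure */
		return -1;
	}
	return fd;
}
```

The returned descriptor could then be passed to mq_send_fd(mq, fd, length), with length as the slice size in bytes. From that point on the queue owns fd, so the caller must not use or close it.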
int mq_store_buffer (struct mq *mq, buffer_t *buf, size_t maxlen)
Store the next message in the given buffer.
This function allows the caller to provide the storage space for the next message to be received. buf must already be initialized. Any existing contents will be overwritten. Callers MUST NOT inspect/modify/free buf until a successful call to mq_recv indicates completed receipt of a message. It is undefined behavior to call this if a message has already been partially received. It is therefore only safe to call this before calling *_wait() for the first time or immediately after receiving a message from mq_recv(). Note that if a message exceeds maxlen the connection will be killed, so use with caution.
mq | The message queue. |
buf | The buffer to use to store the next message. |
maxlen | Maximum bytes to accept, or 0 for no limit. |
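The maxlen rule can be summarized as a small predicate, which may help when choosing a limit. This is only a model of the behavior described above, not code from the library, and message_fits is a hypothetical name. It assumes a message of exactly maxlen bytes is still accepted, since the text only says the connection is killed when a message exceeds maxlen.

```c
#include <stdbool.h>
#include <stddef.h>

/* Model of the documented rule (not library code): a maxlen of 0 means
 * no limit; otherwise a message longer than maxlen is rejected and the
 * connection is killed. */
static bool message_fits(size_t msglen, size_t maxlen)
{
	return maxlen == 0 || msglen <= maxlen;
}
```

Since an oversized message costs the whole connection rather than just that message, a nonzero maxlen is best chosen with some headroom above the largest message the peer is expected to send.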
int mq_store_fd (struct mq *mq, int fd, size_t maxlen)
Write the next message to the given file descriptor.
This function operates in the same way as mq_store_buffer, but takes a file descriptor. Callers MUST NOT use/close fd until a successful call to mq_recv indicates completed receipt of a message. This function will not seek fd, so it is possible to write to arbitrary positions within a file. Note that if a message exceeds maxlen the connection will be killed, so use with caution.
mq | The message queue. |
fd | The file descriptor to write to. |
maxlen | Maximum bytes to accept, or 0 for no limit. |
mq_msg_t mq_recv (struct mq *mq, size_t *length)
Pop a message from the receive queue.
This is a non-blocking operation, and will return immediately if no messages are available. Once this function indicates receipt of a message, the caller takes back ownership of the underlying storage provided via mq_store_*. After receiving a message, be sure to use mq_store_* to specify the storage for the next message before waiting again.
mq | The message queue. |
length | A pointer to store the total length in bytes of the message. Can be NULL. |