diff options
author | Daniel P. Berrange <berrange@redhat.com> | 2010-12-01 16:35:50 +0000 |
---|---|---|
committer | Daniel P. Berrange <berrange@redhat.com> | 2011-06-24 11:48:45 +0100 |
commit | 434de30da545aea1379324b9098061201dd1529b (patch) | |
tree | e7023ad5f655ad7eb5814a7fb63b91112d62e066 /src/rpc/virnetclientstream.c | |
parent | Introduce generic RPC module for advertising via MDNS (diff) | |
download | libvirt-434de30da545aea1379324b9098061201dd1529b.tar.gz libvirt-434de30da545aea1379324b9098061201dd1529b.tar.bz2 libvirt-434de30da545aea1379324b9098061201dd1529b.zip |
Introduce generic RPC client objects
To facilitate creation of new clients using XDR RPC services,
pull alot of the remote driver code into a set of reusable
objects.
- virNetClient: Encapsulates a socket connection to a
remote RPC server. Handles all the network I/O for
reading/writing RPC messages. Delegates RPC encoding
and decoding to the registered programs
- virNetClientProgram: Handles processing and dispatch
of RPC messages for a single RPC (program,version).
A program can register to receive async events
from a client
- virNetClientStream: Handles generic I/O stream
integration to RPC layer
Each new client program now merely needs to define the list of
RPC procedures & events it wants and their handlers. It does
not need to deal with any of the network I/O functionality at
all.
Diffstat (limited to 'src/rpc/virnetclientstream.c')
-rw-r--r-- | src/rpc/virnetclientstream.c | 442 |
1 files changed, 442 insertions, 0 deletions
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c new file mode 100644 index 000000000..44c9acfe4 --- /dev/null +++ b/src/rpc/virnetclientstream.c @@ -0,0 +1,442 @@ +/* + * virnetclientstream.c: generic network RPC client stream + * + * Copyright (C) 2006-2011 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Daniel P. Berrange <berrange@redhat.com> + */ + +#include <config.h> + +#include "virnetclientstream.h" +#include "virnetclient.h" +#include "memory.h" +#include "virterror_internal.h" +#include "logging.h" +#include "event.h" + +#define VIR_FROM_THIS VIR_FROM_RPC +#define virNetError(code, ...) \ + virReportErrorHelper(VIR_FROM_THIS, code, __FILE__, \ + __FUNCTION__, __LINE__, __VA_ARGS__) + +struct _virNetClientStream { + virNetClientProgramPtr prog; + int proc; + unsigned serial; + int refs; + + virError err; + + /* XXX this buffer is unbounded if the client + * app has domain events registered, since packets + * may be read off wire, while app isn't ready to + * recv them. Figure out how to address this some + * time by stopping consuming any incoming data + * off the socket.... + */ + char *incoming; + size_t incomingOffset; + size_t incomingLength; + + + virNetClientStreamEventCallback cb; + void *cbOpaque; + virFreeCallback cbFree; + int cbEvents; + int cbTimer; + int cbDispatch; +}; + + +static void +virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) +{ + if (!st->cb) + return; + + VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); + + if ((st->incomingOffset && + (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || + (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { + VIR_DEBUG("Enabling event timer"); + virEventUpdateTimeout(st->cbTimer, 0); + } else { + VIR_DEBUG("Disabling event timer"); + virEventUpdateTimeout(st->cbTimer, -1); + } +} + + +static void +virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) +{ + virNetClientStreamPtr st = opaque; + int events = 0; + + /* XXX we need a mutex on 'st' to protect this callback */ + + if (st->cb && + (st->cbEvents & VIR_STREAM_EVENT_READABLE) && + st->incomingOffset) + events |= VIR_STREAM_EVENT_READABLE; + if (st->cb && + (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) + events |= VIR_STREAM_EVENT_WRITABLE; + + VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); + if (events) { + virNetClientStreamEventCallback cb = st->cb; + void *cbOpaque = st->cbOpaque; + virFreeCallback cbFree = st->cbFree; + + st->cbDispatch = 1; + (cb)(st, events, cbOpaque); + st->cbDispatch = 0; + + if (!st->cb && cbFree) + (cbFree)(cbOpaque); + } +} + + +static void +virNetClientStreamEventTimerFree(void *opaque) +{ + virNetClientStreamPtr st = opaque; + virNetClientStreamFree(st); +} + + +virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, + int proc, + unsigned serial) +{ + virNetClientStreamPtr st; + + if (VIR_ALLOC(st) < 0) { + virReportOOMError(); + return NULL; + } + + virNetClientProgramRef(prog); + + st->refs = 1; + st->prog = prog; + st->proc = proc; + st->serial = serial; + + return st; +} + + +void virNetClientStreamRef(virNetClientStreamPtr st) +{ + st->refs++; +} + +void virNetClientStreamFree(virNetClientStreamPtr st) +{ + st->refs--; + if (st->refs > 0) + return; + + virResetError(&st->err); + VIR_FREE(st->incoming); + virNetClientProgramFree(st->prog); + VIR_FREE(st); +} + +bool virNetClientStreamMatches(virNetClientStreamPtr st, + virNetMessagePtr msg) +{ + if (virNetClientProgramMatches(st->prog, msg) && + st->proc == msg->header.proc && + st->serial == msg->header.serial) + return 1; + return 0; +} + + +bool virNetClientStreamRaiseError(virNetClientStreamPtr st) +{ + if (st->err.code == VIR_ERR_OK) + return false; + + virRaiseErrorFull(__FILE__, __FUNCTION__, __LINE__, + st->err.domain, + st->err.code, + st->err.level, + st->err.str1, + st->err.str2, + st->err.str3, + st->err.int1, + st->err.int2, + "%s", st->err.message ? st->err.message : _("Unknown error")); + + return true; +} + + +int virNetClientStreamSetError(virNetClientStreamPtr st, + virNetMessagePtr msg) +{ + virNetMessageError err; + int ret = -1; + + if (st->err.code != VIR_ERR_OK) + VIR_DEBUG("Overwriting existing stream error %s", NULLSTR(st->err.message)); + + virResetError(&st->err); + memset(&err, 0, sizeof(err)); + + if (virNetMessageDecodePayload(msg, (xdrproc_t)xdr_virNetMessageError, &err) < 0) + goto cleanup; + + if (err.domain == VIR_FROM_REMOTE && + err.code == VIR_ERR_RPC && + err.level == VIR_ERR_ERROR && + err.message && + STRPREFIX(*err.message, "unknown procedure")) { + st->err.code = VIR_ERR_NO_SUPPORT; + } else { + st->err.code = err.code; + } + st->err.message = *err.message; + *err.message = NULL; + st->err.domain = err.domain; + st->err.level = err.level; + st->err.str1 = *err.str1; + st->err.str2 = *err.str2; + st->err.str3 = *err.str3; + st->err.int1 = err.int1; + st->err.int2 = err.int2; + + ret = 0; + +cleanup: + xdr_free((xdrproc_t)xdr_virNetMessageError, (void*)&err); + return ret; +} + + +int virNetClientStreamQueuePacket(virNetClientStreamPtr st, + virNetMessagePtr msg) +{ + size_t avail = st->incomingLength - st->incomingOffset; + size_t need = msg->bufferLength - msg->bufferOffset; + + if (need > avail) { + size_t extra = need - avail; + if (VIR_REALLOC_N(st->incoming, + st->incomingLength + extra) < 0) { + VIR_DEBUG("Out of memory handling stream data"); + return -1; + } + st->incomingLength += extra; + } + + memcpy(st->incoming + st->incomingOffset, + msg->buffer + msg->bufferOffset, + msg->bufferLength - msg->bufferOffset); + st->incomingOffset += (msg->bufferLength - msg->bufferOffset); + + VIR_DEBUG("Stream incoming data offset %zu length %zu", + st->incomingOffset, st->incomingLength); + return 0; +} + + +int virNetClientStreamSendPacket(virNetClientStreamPtr st, + virNetClientPtr client, + int status, + const char *data, + size_t nbytes) +{ + virNetMessagePtr msg; + bool wantReply; + VIR_DEBUG("st=%p status=%d data=%p nbytes=%zu", st, status, data, nbytes); + + if (!(msg = virNetMessageNew())) + return -1; + + msg->header.prog = virNetClientProgramGetProgram(st->prog); + msg->header.vers = virNetClientProgramGetVersion(st->prog); + msg->header.status = status; + msg->header.type = VIR_NET_STREAM; + msg->header.serial = st->serial; + msg->header.proc = st->proc; + + if (virNetMessageEncodeHeader(msg) < 0) + goto error; + + /* Data packets are async fire&forget, but OK/ERROR packets + * need a synchronous confirmation + */ + if (status == VIR_NET_CONTINUE) { + if (virNetMessageEncodePayloadRaw(msg, data, nbytes) < 0) + goto error; + wantReply = false; + } else { + if (virNetMessageEncodePayloadRaw(msg, NULL, 0) < 0) + goto error; + wantReply = true; + } + + if (virNetClientSend(client, msg, wantReply) < 0) + goto error; + + + return nbytes; + +error: + VIR_FREE(msg); + return -1; +} + +int virNetClientStreamRecvPacket(virNetClientStreamPtr st, + virNetClientPtr client, + char *data, + size_t nbytes, + bool nonblock) +{ + int rv = -1; + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", + st, client, data, nbytes, nonblock); + if (!st->incomingOffset) { + virNetMessagePtr msg; + int ret; + + if (nonblock) { + VIR_DEBUG("Non-blocking mode and no data available"); + rv = -2; + goto cleanup; + } + + if (!(msg = virNetMessageNew())) { + virReportOOMError(); + goto cleanup; + } + + msg->header.prog = virNetClientProgramGetProgram(st->prog); + msg->header.vers = virNetClientProgramGetVersion(st->prog); + msg->header.type = VIR_NET_STREAM; + msg->header.serial = st->serial; + msg->header.proc = st->proc; + + VIR_DEBUG("Dummy packet to wait for stream data"); + ret = virNetClientSend(client, msg, true); + + virNetMessageFree(msg); + + if (ret < 0) + goto cleanup; + } + + VIR_DEBUG("After IO %zu", st->incomingOffset); + if (st->incomingOffset) { + int want = st->incomingOffset; + if (want > nbytes) + want = nbytes; + memcpy(data, st->incoming, want); + if (want < st->incomingOffset) { + memmove(st->incoming, st->incoming + want, st->incomingOffset - want); + st->incomingOffset -= want; + } else { + VIR_FREE(st->incoming); + st->incomingOffset = st->incomingLength = 0; + } + rv = want; + } else { + rv = 0; + } + + virNetClientStreamEventTimerUpdate(st); + +cleanup: + return rv; +} + + +int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, + int events, + virNetClientStreamEventCallback cb, + void *opaque, + virFreeCallback ff) +{ + if (st->cb) { + virNetError(VIR_ERR_INTERNAL_ERROR, + "%s", _("multiple stream callbacks not supported")); + return 1; + } + + virNetClientStreamRef(st); + if ((st->cbTimer = + virEventAddTimeout(-1, + virNetClientStreamEventTimer, + st, + virNetClientStreamEventTimerFree)) < 0) { + virNetClientStreamFree(st); + return -1; + } + + st->cb = cb; + st->cbOpaque = opaque; + st->cbFree = ff; + st->cbEvents = events; + + virNetClientStreamEventTimerUpdate(st); + + return 0; +} + +int virNetClientStreamEventUpdateCallback(virNetClientStreamPtr st, + int events) +{ + if (!st->cb) { + virNetError(VIR_ERR_INTERNAL_ERROR, + "%s", _("no stream callback registered")); + return -1; + } + + st->cbEvents = events; + + virNetClientStreamEventTimerUpdate(st); + + return 0; +} + +int virNetClientStreamEventRemoveCallback(virNetClientStreamPtr st) +{ + if (!st->cb) { + virNetError(VIR_ERR_INTERNAL_ERROR, + "%s", _("no stream callback registered")); + return -1; + } + + if (!st->cbDispatch && + st->cbFree) + (st->cbFree)(st->cbOpaque); + st->cb = NULL; + st->cbOpaque = NULL; + st->cbFree = NULL; + st->cbEvents = 0; + virEventRemoveTimeout(st->cbTimer); + + return 0; +} |