aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel P. Berrange <berrange@redhat.com>2010-12-01 16:35:50 +0000
committerDaniel P. Berrange <berrange@redhat.com>2011-06-24 11:48:45 +0100
commit434de30da545aea1379324b9098061201dd1529b (patch)
treee7023ad5f655ad7eb5814a7fb63b91112d62e066 /src/rpc/virnetclientstream.c
parentIntroduce generic RPC module for advertising via MDNS (diff)
downloadlibvirt-434de30da545aea1379324b9098061201dd1529b.tar.gz
libvirt-434de30da545aea1379324b9098061201dd1529b.tar.bz2
libvirt-434de30da545aea1379324b9098061201dd1529b.zip
Introduce generic RPC client objects
To facilitate creation of new clients using XDR RPC services, pull alot of the remote driver code into a set of reusable objects. - virNetClient: Encapsulates a socket connection to a remote RPC server. Handles all the network I/O for reading/writing RPC messages. Delegates RPC encoding and decoding to the registered programs - virNetClientProgram: Handles processing and dispatch of RPC messages for a single RPC (program,version). A program can register to receive async events from a client - virNetClientStream: Handles generic I/O stream integration to RPC layer Each new client program now merely needs to define the list of RPC procedures & events it wants and their handlers. It does not need to deal with any of the network I/O functionality at all.
Diffstat (limited to 'src/rpc/virnetclientstream.c')
-rw-r--r--src/rpc/virnetclientstream.c442
1 files changed, 442 insertions, 0 deletions
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
new file mode 100644
index 000000000..44c9acfe4
--- /dev/null
+++ b/src/rpc/virnetclientstream.c
@@ -0,0 +1,442 @@
+/*
+ * virnetclientstream.c: generic network RPC client stream
+ *
+ * Copyright (C) 2006-2011 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Daniel P. Berrange <berrange@redhat.com>
+ */
+
+#include <config.h>
+
+#include "virnetclientstream.h"
+#include "virnetclient.h"
+#include "memory.h"
+#include "virterror_internal.h"
+#include "logging.h"
+#include "event.h"
+
+#define VIR_FROM_THIS VIR_FROM_RPC
+#define virNetError(code, ...) \
+ virReportErrorHelper(VIR_FROM_THIS, code, __FILE__, \
+ __FUNCTION__, __LINE__, __VA_ARGS__)
+
+struct _virNetClientStream {
+ virNetClientProgramPtr prog;
+ int proc;
+ unsigned serial;
+ int refs;
+
+ virError err;
+
+ /* XXX this buffer is unbounded if the client
+ * app has domain events registered, since packets
+ * may be read off wire, while app isn't ready to
+ * recv them. Figure out how to address this some
+ * time by stopping consuming any incoming data
+ * off the socket....
+ */
+ char *incoming;
+ size_t incomingOffset;
+ size_t incomingLength;
+
+
+ virNetClientStreamEventCallback cb;
+ void *cbOpaque;
+ virFreeCallback cbFree;
+ int cbEvents;
+ int cbTimer;
+ int cbDispatch;
+};
+
+
+static void
+virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
+{
+ if (!st->cb)
+ return;
+
+ VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
+
+ if ((st->incomingOffset &&
+ (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
+ (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
+ VIR_DEBUG("Enabling event timer");
+ virEventUpdateTimeout(st->cbTimer, 0);
+ } else {
+ VIR_DEBUG("Disabling event timer");
+ virEventUpdateTimeout(st->cbTimer, -1);
+ }
+}
+
+
+static void
+virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+ virNetClientStreamPtr st = opaque;
+ int events = 0;
+
+ /* XXX we need a mutex on 'st' to protect this callback */
+
+ if (st->cb &&
+ (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
+ st->incomingOffset)
+ events |= VIR_STREAM_EVENT_READABLE;
+ if (st->cb &&
+ (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
+ events |= VIR_STREAM_EVENT_WRITABLE;
+
+ VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
+ if (events) {
+ virNetClientStreamEventCallback cb = st->cb;
+ void *cbOpaque = st->cbOpaque;
+ virFreeCallback cbFree = st->cbFree;
+
+ st->cbDispatch = 1;
+ (cb)(st, events, cbOpaque);
+ st->cbDispatch = 0;
+
+ if (!st->cb && cbFree)
+ (cbFree)(cbOpaque);
+ }
+}
+
+
+static void
+virNetClientStreamEventTimerFree(void *opaque)
+{
+ virNetClientStreamPtr st = opaque;
+ virNetClientStreamFree(st);
+}
+
+
+virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog,
+ int proc,
+ unsigned serial)
+{
+ virNetClientStreamPtr st;
+
+ if (VIR_ALLOC(st) < 0) {
+ virReportOOMError();
+ return NULL;
+ }
+
+ virNetClientProgramRef(prog);
+
+ st->refs = 1;
+ st->prog = prog;
+ st->proc = proc;
+ st->serial = serial;
+
+ return st;
+}
+
+
+void virNetClientStreamRef(virNetClientStreamPtr st)
+{
+ st->refs++;
+}
+
+void virNetClientStreamFree(virNetClientStreamPtr st)
+{
+ st->refs--;
+ if (st->refs > 0)
+ return;
+
+ virResetError(&st->err);
+ VIR_FREE(st->incoming);
+ virNetClientProgramFree(st->prog);
+ VIR_FREE(st);
+}
+
+bool virNetClientStreamMatches(virNetClientStreamPtr st,
+ virNetMessagePtr msg)
+{
+ if (virNetClientProgramMatches(st->prog, msg) &&
+ st->proc == msg->header.proc &&
+ st->serial == msg->header.serial)
+ return 1;
+ return 0;
+}
+
+
+bool virNetClientStreamRaiseError(virNetClientStreamPtr st)
+{
+ if (st->err.code == VIR_ERR_OK)
+ return false;
+
+ virRaiseErrorFull(__FILE__, __FUNCTION__, __LINE__,
+ st->err.domain,
+ st->err.code,
+ st->err.level,
+ st->err.str1,
+ st->err.str2,
+ st->err.str3,
+ st->err.int1,
+ st->err.int2,
+ "%s", st->err.message ? st->err.message : _("Unknown error"));
+
+ return true;
+}
+
+
+int virNetClientStreamSetError(virNetClientStreamPtr st,
+ virNetMessagePtr msg)
+{
+ virNetMessageError err;
+ int ret = -1;
+
+ if (st->err.code != VIR_ERR_OK)
+ VIR_DEBUG("Overwriting existing stream error %s", NULLSTR(st->err.message));
+
+ virResetError(&st->err);
+ memset(&err, 0, sizeof(err));
+
+ if (virNetMessageDecodePayload(msg, (xdrproc_t)xdr_virNetMessageError, &err) < 0)
+ goto cleanup;
+
+ if (err.domain == VIR_FROM_REMOTE &&
+ err.code == VIR_ERR_RPC &&
+ err.level == VIR_ERR_ERROR &&
+ err.message &&
+ STRPREFIX(*err.message, "unknown procedure")) {
+ st->err.code = VIR_ERR_NO_SUPPORT;
+ } else {
+ st->err.code = err.code;
+ }
+ st->err.message = *err.message;
+ *err.message = NULL;
+ st->err.domain = err.domain;
+ st->err.level = err.level;
+ st->err.str1 = *err.str1;
+ st->err.str2 = *err.str2;
+ st->err.str3 = *err.str3;
+ st->err.int1 = err.int1;
+ st->err.int2 = err.int2;
+
+ ret = 0;
+
+cleanup:
+ xdr_free((xdrproc_t)xdr_virNetMessageError, (void*)&err);
+ return ret;
+}
+
+
+int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
+ virNetMessagePtr msg)
+{
+ size_t avail = st->incomingLength - st->incomingOffset;
+ size_t need = msg->bufferLength - msg->bufferOffset;
+
+ if (need > avail) {
+ size_t extra = need - avail;
+ if (VIR_REALLOC_N(st->incoming,
+ st->incomingLength + extra) < 0) {
+ VIR_DEBUG("Out of memory handling stream data");
+ return -1;
+ }
+ st->incomingLength += extra;
+ }
+
+ memcpy(st->incoming + st->incomingOffset,
+ msg->buffer + msg->bufferOffset,
+ msg->bufferLength - msg->bufferOffset);
+ st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
+
+ VIR_DEBUG("Stream incoming data offset %zu length %zu",
+ st->incomingOffset, st->incomingLength);
+ return 0;
+}
+
+
+int virNetClientStreamSendPacket(virNetClientStreamPtr st,
+ virNetClientPtr client,
+ int status,
+ const char *data,
+ size_t nbytes)
+{
+ virNetMessagePtr msg;
+ bool wantReply;
+ VIR_DEBUG("st=%p status=%d data=%p nbytes=%zu", st, status, data, nbytes);
+
+ if (!(msg = virNetMessageNew()))
+ return -1;
+
+ msg->header.prog = virNetClientProgramGetProgram(st->prog);
+ msg->header.vers = virNetClientProgramGetVersion(st->prog);
+ msg->header.status = status;
+ msg->header.type = VIR_NET_STREAM;
+ msg->header.serial = st->serial;
+ msg->header.proc = st->proc;
+
+ if (virNetMessageEncodeHeader(msg) < 0)
+ goto error;
+
+ /* Data packets are async fire&forget, but OK/ERROR packets
+ * need a synchronous confirmation
+ */
+ if (status == VIR_NET_CONTINUE) {
+ if (virNetMessageEncodePayloadRaw(msg, data, nbytes) < 0)
+ goto error;
+ wantReply = false;
+ } else {
+ if (virNetMessageEncodePayloadRaw(msg, NULL, 0) < 0)
+ goto error;
+ wantReply = true;
+ }
+
+ if (virNetClientSend(client, msg, wantReply) < 0)
+ goto error;
+
+
+ return nbytes;
+
+error:
+ VIR_FREE(msg);
+ return -1;
+}
+
+int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
+ virNetClientPtr client,
+ char *data,
+ size_t nbytes,
+ bool nonblock)
+{
+ int rv = -1;
+ VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
+ st, client, data, nbytes, nonblock);
+ if (!st->incomingOffset) {
+ virNetMessagePtr msg;
+ int ret;
+
+ if (nonblock) {
+ VIR_DEBUG("Non-blocking mode and no data available");
+ rv = -2;
+ goto cleanup;
+ }
+
+ if (!(msg = virNetMessageNew())) {
+ virReportOOMError();
+ goto cleanup;
+ }
+
+ msg->header.prog = virNetClientProgramGetProgram(st->prog);
+ msg->header.vers = virNetClientProgramGetVersion(st->prog);
+ msg->header.type = VIR_NET_STREAM;
+ msg->header.serial = st->serial;
+ msg->header.proc = st->proc;
+
+ VIR_DEBUG("Dummy packet to wait for stream data");
+ ret = virNetClientSend(client, msg, true);
+
+ virNetMessageFree(msg);
+
+ if (ret < 0)
+ goto cleanup;
+ }
+
+ VIR_DEBUG("After IO %zu", st->incomingOffset);
+ if (st->incomingOffset) {
+ int want = st->incomingOffset;
+ if (want > nbytes)
+ want = nbytes;
+ memcpy(data, st->incoming, want);
+ if (want < st->incomingOffset) {
+ memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
+ st->incomingOffset -= want;
+ } else {
+ VIR_FREE(st->incoming);
+ st->incomingOffset = st->incomingLength = 0;
+ }
+ rv = want;
+ } else {
+ rv = 0;
+ }
+
+ virNetClientStreamEventTimerUpdate(st);
+
+cleanup:
+ return rv;
+}
+
+
+int virNetClientStreamEventAddCallback(virNetClientStreamPtr st,
+ int events,
+ virNetClientStreamEventCallback cb,
+ void *opaque,
+ virFreeCallback ff)
+{
+ if (st->cb) {
+ virNetError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("multiple stream callbacks not supported"));
+ return 1;
+ }
+
+ virNetClientStreamRef(st);
+ if ((st->cbTimer =
+ virEventAddTimeout(-1,
+ virNetClientStreamEventTimer,
+ st,
+ virNetClientStreamEventTimerFree)) < 0) {
+ virNetClientStreamFree(st);
+ return -1;
+ }
+
+ st->cb = cb;
+ st->cbOpaque = opaque;
+ st->cbFree = ff;
+ st->cbEvents = events;
+
+ virNetClientStreamEventTimerUpdate(st);
+
+ return 0;
+}
+
+int virNetClientStreamEventUpdateCallback(virNetClientStreamPtr st,
+ int events)
+{
+ if (!st->cb) {
+ virNetError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("no stream callback registered"));
+ return -1;
+ }
+
+ st->cbEvents = events;
+
+ virNetClientStreamEventTimerUpdate(st);
+
+ return 0;
+}
+
+int virNetClientStreamEventRemoveCallback(virNetClientStreamPtr st)
+{
+ if (!st->cb) {
+ virNetError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("no stream callback registered"));
+ return -1;
+ }
+
+ if (!st->cbDispatch &&
+ st->cbFree)
+ (st->cbFree)(st->cbOpaque);
+ st->cb = NULL;
+ st->cbOpaque = NULL;
+ st->cbFree = NULL;
+ st->cbEvents = 0;
+ virEventRemoveTimeout(st->cbTimer);
+
+ return 0;
+}