[kune-commits] r1250 - branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server

Vicente J. Ruiz Jurado vjrj_ at ourproject.org
Sat Feb 12 03:19:16 CET 2011


Author: vjrj_
Date: 2011-02-12 03:19:16 +0100 (Sat, 12 Feb 2011)
New Revision: 1250

Added:
   branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/CustomServerRpcProvider.java
   branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/ServerRpcControllerImpl.java
   branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/WaveMain.java
Log:
some wave startup testing

Added: branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/CustomServerRpcProvider.java
===================================================================
--- branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/CustomServerRpcProvider.java	                        (rev 0)
+++ branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/CustomServerRpcProvider.java	2011-02-12 02:19:16 UTC (rev 1250)
@@ -0,0 +1,507 @@
+package cc.kune.wave.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.servlet.Servlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpSession;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketServlet;
+import org.ourproject.kune.rack.RackServletFilter;
+import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticate;
+import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticationResult;
+import org.waveprotocol.box.server.CoreSettings;
+import org.waveprotocol.box.server.authentication.SessionManager;
+import org.waveprotocol.box.server.rpc.BaseUrlHelper;
+import org.waveprotocol.box.server.rpc.MessageExpectingChannel;
+import org.waveprotocol.box.server.rpc.ProtoCallback;
+import org.waveprotocol.box.server.rpc.Rpc;
+import org.waveprotocol.box.server.rpc.ServerRpcController;
+import org.waveprotocol.box.server.rpc.SocketIOServerChannel;
+import org.waveprotocol.box.server.rpc.WebSocketServerChannel;
+import org.waveprotocol.box.server.util.NetUtils;
+import org.waveprotocol.wave.model.util.Pair;
+import org.waveprotocol.wave.model.wave.ParticipantId;
+import org.waveprotocol.wave.util.logging.Log;
+
+import com.glines.socketio.server.SocketIOInbound;
+import com.glines.socketio.server.SocketIOServlet;
+import com.glines.socketio.server.transport.FlashSocketTransport;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.name.Named;
+import com.google.inject.servlet.GuiceFilter;
+import com.google.inject.servlet.GuiceServletContextListener;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.Service;
+import com.google.protobuf.UnknownFieldSet;
+
+/**
+ * ServerRpcProvider can provide instances of type Service over an incoming
+ * network socket and service incoming RPCs to these services and their methods.
+ * 
+ * 
+ */
+public class CustomServerRpcProvider {
+    /**
+     * Server side of a single client connection. Incoming messages are
+     * dispatched in {@link #message}: RPC cancellation, the authentication
+     * workaround message, or an invocation of a registered service method.
+     * Subclasses provide the transport by implementing {@link #sendMessage}.
+     */
+    abstract class Connection implements ProtoCallback {
+        // RPCs started on this connection that have not finished yet, keyed by
+        // the sequence number of the request that started them.
+        private final Map<Long, ServerRpcController> activeRpcs = new ConcurrentHashMap<Long, ServerRpcController>();
+
+        // The logged in user.
+        // Note: Due to this bug:
+        // http://code.google.com/p/wave-protocol/issues/detail?id=119,
+        // the field may be null on first connect and then set later using an
+        // RPC.
+        private ParticipantId loggedInUser;
+
+        /**
+         * @param loggedInUser
+         *            The currently logged in user, or null if no user is logged
+         *            in.
+         */
+        public Connection(final ParticipantId loggedInUser) {
+            this.loggedInUser = loggedInUser;
+        }
+
+        /**
+         * Resolves a session token to the user logged in on that session.
+         *
+         * @param token
+         *            the session token sent by the client
+         * @return whatever the session manager reports for the session behind
+         *         the token; the caller treats null as "token invalid"
+         */
+        private ParticipantId authenticate(final String token) {
+            final HttpSession session = sessionManager.getSessionFromToken(token);
+            return sessionManager.getLoggedInUser(session);
+        }
+
+        /**
+         * Tells the channel which message types to expect: the request
+         * prototype of every registered service method, plus the RPC
+         * cancellation message.
+         *
+         * @param channel
+         *            the channel that will deliver messages to this connection
+         */
+        protected void expectMessages(final MessageExpectingChannel channel) {
+            synchronized (registeredServices) {
+                for (final RegisteredServiceMethod serviceMethod : registeredServices.values()) {
+                    channel.expectMessage(serviceMethod.service.getRequestPrototype(serviceMethod.method));
+                    LOG.fine("Expecting: " + serviceMethod.method.getFullName());
+                }
+            }
+            channel.expectMessage(Rpc.CancelRpc.getDefaultInstance());
+        }
+
+        /**
+         * Dispatches one decoded message received on this connection.
+         *
+         * @param sequenceNo
+         *            sequence number the client attached to the message
+         * @param message
+         *            the decoded message
+         * @throws IllegalStateException
+         *             if a cancellation names an RPC that is not active, if
+         *             the sequence number of a new RPC is already in use, or
+         *             if no service method is registered for the message type
+         */
+        @Override
+        public void message(final long sequenceNo, final Message message) {
+            if (message instanceof Rpc.CancelRpc) {
+                cancelRpc(sequenceNo);
+                return;
+            }
+            if (message instanceof ProtocolAuthenticate) {
+                handleAuthenticate(sequenceNo, (ProtocolAuthenticate) message);
+                return;
+            }
+
+            // registeredServices is a plain HashMap that registerService() and
+            // expectMessages() only touch while holding its monitor, so the
+            // lookup takes the same lock. A single get() replaces the former
+            // containsKey() + get() pair; values are never null.
+            final RegisteredServiceMethod serviceMethod;
+            synchronized (registeredServices) {
+                serviceMethod = registeredServices.get(message.getDescriptorForType());
+            }
+            if (serviceMethod == null) {
+                // Sent a message type we understand, but don't expect -
+                // erroneous case!
+                throw new IllegalStateException("Got expected but unknown message  (" + message + ") for sequence: "
+                        + sequenceNo);
+            }
+            if (activeRpcs.containsKey(sequenceNo)) {
+                throw new IllegalStateException("Can't invoke a new RPC with a sequence number already in use.");
+            }
+            startRpc(sequenceNo, message, serviceMethod);
+        }
+
+        /** Cancels the active RPC with the given sequence number. */
+        private void cancelRpc(final long sequenceNo) {
+            final ServerRpcController controller = activeRpcs.get(sequenceNo);
+            if (controller == null) {
+                throw new IllegalStateException("Trying to cancel an RPC that is not active!");
+            }
+            LOG.info("Cancelling open RPC " + sequenceNo);
+            controller.cancel();
+        }
+
+        /** Authenticates this connection from a session token and acknowledges it. */
+        private void handleAuthenticate(final long sequenceNo, final ProtocolAuthenticate authMessage) {
+            // Workaround for bug:
+            // http://codereview.waveprotocol.org/224001/
+
+            // When we get this message, either the connection will not be
+            // logged in (loggedInUser == null) or the connection will have
+            // been authenticated via cookies (in which case loggedInUser must
+            // match the authenticated user, and this message has no effect).
+            final ParticipantId authenticatedAs = authenticate(authMessage.getToken());
+
+            Preconditions.checkArgument(authenticatedAs != null, "Auth token invalid");
+            Preconditions.checkState(loggedInUser == null || loggedInUser.equals(authenticatedAs),
+                    "Session already authenticated as a different user");
+
+            loggedInUser = authenticatedAs;
+            LOG.info("Session authenticated as " + loggedInUser);
+            sendMessage(sequenceNo, ProtocolAuthenticationResult.getDefaultInstance());
+        }
+
+        /** Creates the controller for a new RPC, records it as active and schedules it. */
+        private void startRpc(final long sequenceNo, final Message request,
+                final RegisteredServiceMethod serviceMethod) {
+            // Create the internal ServerRpcController used to invoke the call.
+            final ServerRpcController controller = new ServerRpcControllerImpl(request, serviceMethod.service,
+                    serviceMethod.method, loggedInUser, new RpcCallback<Message>() {
+                        @Override
+                        synchronized public void run(final Message message) {
+                            final boolean finishedMessage = message instanceof Rpc.RpcFinished;
+                            if (finishedMessage
+                                    || !serviceMethod.method.getOptions().getExtension(Rpc.isStreamingRpc)) {
+                                // This RPC is over - remove it from the map.
+                                final boolean failed = finishedMessage && ((Rpc.RpcFinished) message).getFailed();
+                                LOG.fine("RPC " + sequenceNo + " is now finished, failed = " + failed);
+                                if (failed) {
+                                    LOG.info("error = " + ((Rpc.RpcFinished) message).getErrorText());
+                                }
+                                activeRpcs.remove(sequenceNo);
+                            }
+                            sendMessage(sequenceNo, message);
+                        }
+                    });
+
+            // Register before scheduling so that the completion callback
+            // always finds the entry it has to remove.
+            activeRpcs.put(sequenceNo, controller);
+            threadPool.execute(controller);
+        }
+
+        /**
+         * Sends a message to the client over the transport of this connection.
+         *
+         * @param sequenceNo
+         *            sequence number of the request being answered
+         * @param message
+         *            the message to send
+         */
+        protected abstract void sendMessage(long sequenceNo, Message message);
+
+        /**
+         * Rejects a message the channel reported as unknown.
+         *
+         * @throws IllegalStateException
+         *             always
+         */
+        @Override
+        public void unknown(final long sequenceNo, final String messageType, final String message) {
+            throw new IllegalStateException("Got unknown message (type: " + messageType + ", " + message
+                    + ") for sequence: " + sequenceNo);
+        }
+
+        /**
+         * Rejects a message the channel reported as unknown.
+         *
+         * @throws IllegalStateException
+         *             always
+         */
+        @Override
+        public void unknown(final long sequenceNo, final String messageType, final UnknownFieldSet message) {
+            throw new IllegalStateException("Got unknown message (type: " + messageType + ", " + message
+                    + ") for sequence: " + sequenceNo);
+        }
+    }
+
+    /**
+     * Immutable pairing of a service with one of its methods. One instance is
+     * stored per registered request type.
+     */
+    static class RegisteredServiceMethod {
+        final Service service;
+        final MethodDescriptor method;
+
+        /**
+         * @param service
+         *            the service that implements the method
+         * @param method
+         *            descriptor of the method within that service
+         */
+        RegisteredServiceMethod(final Service service, final MethodDescriptor method) {
+            this.method = method;
+            this.service = service;
+        }
+    }
+    /**
+     * {@link Connection} whose transport is a socket.io channel.
+     */
+    class SocketIOConnection extends Connection {
+        private final SocketIOServerChannel socketChannel;
+
+        /**
+         * @param loggedInUser
+         *            the user resolved from the http session, or null if
+         *            nobody is logged in yet
+         */
+        SocketIOConnection(final ParticipantId loggedInUser) {
+            super(loggedInUser);
+            socketChannel = new SocketIOServerChannel(this);
+            // The message used to say "websocket" (copied from
+            // WebSocketConnection), which made the two transports
+            // indistinguishable in the log.
+            LOG.info("New socket.io connection set up for user " + loggedInUser);
+            expectMessages(socketChannel);
+        }
+
+        /**
+         * @return the socket.io channel of this connection; the method name
+         *         mirrors the websocket variant
+         */
+        public SocketIOServerChannel getWebSocketServerChannel() {
+            return socketChannel;
+        }
+
+        @Override
+        protected void sendMessage(final long sequenceNo, final Message message) {
+            socketChannel.sendMessage(sequenceNo, message);
+        }
+    }
+    /**
+     * Servlet that accepts socket.io connections and binds each one to a new
+     * {@link SocketIOConnection}.
+     */
+    public class WaveSocketIOServlet extends SocketIOServlet {
+        @Override
+        protected SocketIOInbound doSocketIOConnect(final HttpServletRequest request, final String[] protocols) {
+            // getSession(false): an incoming socket never creates a session.
+            final HttpSession httpSession = request.getSession(false);
+            final ParticipantId user = sessionManager.getLoggedInUser(httpSession);
+            return new SocketIOConnection(user).getWebSocketServerChannel();
+        }
+    }
+    /**
+     * Servlet that accepts websocket connections and binds each one to a new
+     * {@link WebSocketConnection}.
+     */
+    public class WaveWebSocketServlet extends WebSocketServlet {
+        @Override
+        protected WebSocket doWebSocketConnect(final HttpServletRequest request, final String protocol) {
+            // getSession(false): an incoming socket never creates a session.
+            final HttpSession httpSession = request.getSession(false);
+            final ParticipantId user = sessionManager.getLoggedInUser(httpSession);
+            return new WebSocketConnection(user).getWebSocketServerChannel();
+        }
+    }
+    /**
+     * {@link Connection} whose transport is a websocket channel.
+     */
+    class WebSocketConnection extends Connection {
+        private final WebSocketServerChannel channel;
+
+        /**
+         * @param loggedInUser
+         *            the user resolved from the http session, or null if
+         *            nobody is logged in yet
+         */
+        WebSocketConnection(final ParticipantId loggedInUser) {
+            super(loggedInUser);
+            this.channel = new WebSocketServerChannel(this);
+            LOG.info("New websocket connection set up for user " + loggedInUser);
+            expectMessages(this.channel);
+        }
+
+        /** @return the websocket channel of this connection */
+        public WebSocketServerChannel getWebSocketServerChannel() {
+            return channel;
+        }
+
+        @Override
+        protected void sendMessage(final long sequenceNo, final Message message) {
+            channel.sendMessage(sequenceNo, message);
+        }
+    }
+    private static final Log LOG = Log.get(CustomServerRpcProvider.class);
+
+    private static InetSocketAddress[] parseAddressList(final List<String> addressList) {
+        if (addressList == null || addressList.size() == 0) {
+            return new InetSocketAddress[0];
+        } else {
+            final Set<InetSocketAddress> addresses = Sets.newHashSet();
+            for (final String str : addressList) {
+                if (str.length() == 0) {
+                    LOG.warning("Encountered empty address in http addresses list.");
+                } else {
+                    try {
+                        final InetSocketAddress address = NetUtils.parseHttpAddress(str);
+                        if (!addresses.contains(address)) {
+                            addresses.add(address);
+                        } else {
+                            LOG.warning("Ignoring duplicate address in http addresses list: Duplicate entry '" + str
+                                    + "' resolved to " + address.getAddress().getHostAddress());
+                        }
+                    } catch (final IOException e) {
+                        LOG.severe("Unable to process address " + str, e);
+                    }
+                }
+            }
+            return addresses.toArray(new InetSocketAddress[0]);
+        }
+    }
+    // Context path of the servlet context; stored with the last slash removed.
+    private final String baseUrl;
+
+    // Passed to the socket.io servlet as the flash policy server port.
+    private final Integer flashsocketPolicyPort;
+
+    // One Jetty connector is created per address when the server is started.
+    private final InetSocketAddress[] httpAddresses;
+
+    // Null until startWebSocketServer() has been called.
+    private Server httpServer = null;
+
+    // NOTE(review): no method of this class reads or writes this set - confirm
+    // whether it is still needed.
+    private final Set<Connection> incomingConnections = Sets.newHashSet();
+
+    // Optional; when non-null it replaces the session manager of the context.
+    private final org.eclipse.jetty.server.SessionManager jettySessionManager;
+
+    // Mapping from incoming protocol buffer type -> specific handler.
+    // registerService() and expectMessages() hold the monitor of this map
+    // while using it.
+    private final Map<Descriptors.Descriptor, RegisteredServiceMethod> registeredServices = Maps.newHashMap();
+
+    // Resource base of the servlet context.
+    private final String resourceBase;
+
+    /**
+     * Servlets registered through {@link #addServlet}, as (URL pattern,
+     * holder) pairs. They are attached to the context when the server is
+     * started.
+     */
+    List<Pair<String, ServletHolder>> servletRegistry = Lists.newArrayList();
+
+    // Resolves the logged in user of an http session or session token.
+    private final SessionManager sessionManager;
+
+    // Executes the ServerRpcController created for each incoming RPC.
+    private final ExecutorService threadPool;
+
+    /**
+     * Creates a provider that will listen on the given http addresses and run
+     * incoming RPCs on the supplied executor.
+     *
+     * @param httpAddresses
+     *            addresses to bind connectors to
+     * @param flashsocketPolicyPort
+     *            port handed to the socket.io flash policy server
+     * @param baseUrl
+     *            context path; its last slash is removed before it is stored
+     * @param resourceBase
+     *            resource base of the servlet context
+     * @param threadPool
+     *            executor that runs the RPC controllers
+     * @param sessionManager
+     *            resolves logged in users
+     * @param jettySessionManager
+     *            Jetty session manager to install on the context, or null to
+     *            keep the context's own
+     */
+    public CustomServerRpcProvider(final InetSocketAddress[] httpAddresses, final Integer flashsocketPolicyPort,
+            final String baseUrl, final String resourceBase, final ExecutorService threadPool,
+            final SessionManager sessionManager, final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+        // Network settings.
+        this.httpAddresses = httpAddresses;
+        this.flashsocketPolicyPort = flashsocketPolicyPort;
+        // Servlet context settings.
+        this.resourceBase = resourceBase;
+        this.baseUrl = BaseUrlHelper.removeLastSlash(baseUrl);
+        // Collaborators.
+        this.sessionManager = sessionManager;
+        this.jettySessionManager = jettySessionManager;
+        this.threadPool = threadPool;
+    }
+
+    /**
+     * Constructs a new ServerRpcProvider with a default ExecutorService: a
+     * cached thread pool created here. All other arguments are passed through
+     * unchanged to the constructor that takes an explicit executor.
+     */
+    public CustomServerRpcProvider(final InetSocketAddress[] httpAddresses, final Integer flashsocketPolicyPort,
+            final String baseUrl, final String resourceBase, final SessionManager sessionManager,
+            final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+        this(httpAddresses, flashsocketPolicyPort, baseUrl, resourceBase, Executors.newCachedThreadPool(),
+                sessionManager, jettySessionManager);
+    }
+
+    /**
+     * Injection entry point. The named settings supply the frontend addresses
+     * (as strings, parsed by parseAddressList), the flash policy port, the
+     * base URL and the resource base; everything is then delegated to the
+     * constructor that uses a default executor.
+     */
+    @Inject
+    public CustomServerRpcProvider(@Named(CoreSettings.HTTP_FRONTEND_ADDRESSES) final List<String> httpAddresses,
+            @Named(CoreSettings.FLASHSOCKET_POLICY_PORT) final Integer flashsocketPolicyPort,
+            @Named(CoreSettings.HTTP_BASE_URL) final String baseUrl,
+            @Named(CoreSettings.RESOURCE_BASE) final String resourceBase, final SessionManager sessionManager,
+            final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+        this(parseAddressList(httpAddresses), flashsocketPolicyPort, baseUrl, resourceBase, sessionManager,
+                jettySessionManager);
+    }
+
+    /**
+     * Registers a servlet under a URL pattern. The servlet is only recorded
+     * here; it is attached to the servlet context when the server is started.
+     *
+     * @param urlPattern
+     *            URL pattern for paths. Eg, '/foo', '/foo/*'
+     * @param servlet
+     *            The servlet object to bind to the specified paths
+     * @return the {@link ServletHolder} that wraps the servlet
+     */
+    public ServletHolder addServlet(final String urlPattern, final Servlet servlet) {
+        final ServletHolder holder = new ServletHolder(servlet);
+        final Pair<String, ServletHolder> registration = new Pair<String, ServletHolder>(urlPattern, holder);
+        servletRegistry.add(registration);
+        return holder;
+    }
+
+    /**
+     * Builds one connector per address.
+     *
+     * @param httpAddresses
+     *            the host:port pairs to bind to
+     * @return a list of {@link SelectChannelConnector}, each configured with
+     *         the host and port of one entry from the list of addresses, in
+     *         the same order
+     */
+    private List<SelectChannelConnector> getSelectChannelConnectors(final InetSocketAddress[] httpAddresses) {
+        final List<SelectChannelConnector> connectors = Lists.newArrayList();
+        for (int i = 0; i < httpAddresses.length; i++) {
+            final InetSocketAddress endpoint = httpAddresses[i];
+            final String host = endpoint.getAddress().getHostAddress();
+            final SelectChannelConnector connector = new SelectChannelConnector();
+            connector.setHost(host);
+            connector.setPort(endpoint.getPort());
+            connectors.add(connector);
+        }
+        return connectors;
+    }
+
+    /**
+     * Returns the socket the WebSocket server is listening on, taken from the
+     * first connector of the http server.
+     *
+     * @return the host and local port of the first connector, or null if the
+     *         server has not been created yet or has no connectors (
+     *         startWebSocketServer only logs an error when no valid address
+     *         was configured, so that state is reachable)
+     */
+    public SocketAddress getWebSocketAddress() {
+        if (httpServer == null) {
+            return null;
+        }
+        final Connector[] connectors = httpServer.getConnectors();
+        // Indexing [0] unconditionally failed when no connector was added.
+        if (connectors == null || connectors.length == 0) {
+            return null;
+        }
+        final Connector c = connectors[0];
+        return new InetSocketAddress(c.getHost(), c.getLocalPort());
+    }
+
+    /**
+     * Registers every method of the given service, keyed by the input
+     * (request) type of the method. A later registration for the same input
+     * type replaces the earlier one.
+     *
+     * @param service
+     *            the service whose methods become callable
+     */
+    public void registerService(final Service service) {
+        synchronized (registeredServices) {
+            final List<MethodDescriptor> methods = service.getDescriptorForType().getMethods();
+            for (int i = 0; i < methods.size(); i++) {
+                final MethodDescriptor descriptor = methods.get(i);
+                final RegisteredServiceMethod entry = new RegisteredServiceMethod(service, descriptor);
+                registeredServices.put(descriptor.getInputType(), entry);
+            }
+        }
+    }
+
+    /**
+     * Creates and starts the embedded http server.
+     *
+     * Adds one connector per configured address, builds a servlet context
+     * (sessions, resource base, context path), installs the Guice listener and
+     * filter and the Rack filter, registers the websocket and socket.io
+     * servlets, the default servlet for static content, and every servlet
+     * previously passed to addServlet, and finally starts the server.
+     *
+     * A start failure is logged and the method returns normally; it is not
+     * reported to the caller.
+     *
+     * @param injector
+     *            the injector exposed to the Guice listener and, through a
+     *            context attribute, to the Rack filter
+     */
+    public void startWebSocketServer(final Injector injector) {
+        httpServer = new Server();
+
+        final List<SelectChannelConnector> connectors = getSelectChannelConnectors(httpAddresses);
+        if (connectors.isEmpty()) {
+            // Only logged: the server is still configured and started below.
+            LOG.severe("No valid http end point address provided!");
+        }
+        for (final SelectChannelConnector connector : connectors) {
+            httpServer.addConnector(connector);
+        }
+
+        final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+        if (jettySessionManager != null) {
+            context.getSessionHandler().setSessionManager(jettySessionManager);
+        }
+        context.setResourceBase(resourceBase);
+        context.setContextPath(baseUrl);
+        context.addEventListener(new GuiceServletContextListener() {
+            @Override
+            protected Injector getInjector() {
+                return injector;
+            }
+        });
+        context.setAttribute(RackServletFilter.INJECTOR_PARENT_ATTRIBUTE, injector);
+        context.addFilter(GuiceFilter.class, "/*", 0);
+        final FilterHolder filterHolder = new FilterHolder(RackServletFilter.class);
+        filterHolder.setInitParameter("org.ourproject.kune.rack.RackModule",
+                "org.ourproject.kune.app.server.KuneRackModule");
+        // NOTE(review): "/ws*" is neither a "/prefix/*" nor a "*.ext" path
+        // spec - confirm it matches the intended URLs ("/ws/*"?).
+        context.addFilter(filterHolder, "/ws*", 0);
+
+        // Servlet where the websocket connection is served from.
+        final ServletHolder wsholder = new ServletHolder(new WaveWebSocketServlet());
+        context.addServlet(wsholder, "/socket");
+        // TODO(zamfi): fix to let messages span frames.
+        wsholder.setInitParameter("bufferSize", "" + 1024 * 1024); // 1M buffer
+
+        // Servlet where the socket.io connection is served from.
+        final ServletHolder sioholder = new ServletHolder(new WaveSocketIOServlet());
+        context.addServlet(sioholder, "/socket.io/*");
+        // TODO(zamfi): fix to let messages span frames.
+        sioholder.setInitParameter("bufferSize", "" + 1024 * 1024); // 1M buffer
+        // Set flash policy server parameters
+        String flashPolicyServerHost = "localhost";
+        final StringBuilder flashPolicyAllowedPorts = new StringBuilder();
+        /*
+         * Loop through addresses, collect a comma separated list of ports, and
+         * determine if we are to use "localhost" or the AnyHost wildcard.
+         */
+        for (final InetSocketAddress addr : httpAddresses) {
+            if (flashPolicyAllowedPorts.length() > 0) {
+                flashPolicyAllowedPorts.append(",");
+            }
+            flashPolicyAllowedPorts.append(addr.getPort());
+            if (!addr.getAddress().isLoopbackAddress()) {
+                // Until it's possible to pass a list of address, this is the
+                // only valid alternative.
+                flashPolicyServerHost = "0.0.0.0";
+            }
+        }
+        sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_SERVER_HOST_KEY, flashPolicyServerHost);
+        sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_SERVER_PORT_KEY, "" + flashsocketPolicyPort);
+        // TODO: Change to use the public http address and all other bound
+        // addresses.
+        sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_DOMAIN_KEY, "*");
+        sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_PORTS_KEY, flashPolicyAllowedPorts.toString());
+
+        // Serve the static content and GWT web client with the default servlet
+        // (acts like a standard file-based web server).
+        final ServletHolder defaultServlet = new ServletHolder(new DefaultServlet());
+        context.addServlet(defaultServlet, "/static/*");
+        context.addServlet(defaultServlet, "/webclient/*");
+
+        // Servlets registered through addServlet() before the server started.
+        for (final Pair<String, ServletHolder> servlet : servletRegistry) {
+            context.addServlet(servlet.getSecond(), servlet.getFirst());
+        }
+
+        httpServer.setHandler(context);
+
+        try {
+            httpServer.start();
+        } catch (final Exception e) { // yes, .start() throws "Exception"
+            LOG.severe("Fatal error starting http server.", e);
+            return;
+        }
+        LOG.fine("WebSocket server running. --------");
+    }
+
+    /**
+     * Stops this server. Does nothing if the server was never started. A
+     * failure while stopping is logged and not propagated.
+     *
+     * @throws IOException
+     *             declared for callers; the current implementation does not
+     *             throw it
+     */
+    public void stopServer() throws IOException {
+        if (httpServer == null) {
+            // Previously this raised a NullPointerException that the catch
+            // below reported as a fatal stop error.
+            LOG.fine("server shutdown requested, but the http server was never started.");
+            return;
+        }
+        try {
+            httpServer.stop(); // yes, .stop() throws "Exception"
+        } catch (final Exception e) {
+            LOG.warning("Fatal error stopping http server.", e);
+        }
+        LOG.fine("server shutdown.");
+    }
+}

Added: branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/ServerRpcControllerImpl.java
===================================================================
--- branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/ServerRpcControllerImpl.java	                        (rev 0)
+++ branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/ServerRpcControllerImpl.java	2011-02-12 02:19:16 UTC (rev 1250)
@@ -0,0 +1,226 @@
+/**
+ * Copyright 2009 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package cc.kune.wave.server;
+
+import org.waveprotocol.box.server.rpc.Rpc;
+import org.waveprotocol.box.server.rpc.ServerRpcController;
+import org.waveprotocol.wave.model.wave.ParticipantId;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.Service;
+
+/**
+ * Implements the server end-point of a wave server RPC connection. This is a
+ * single-use RPC controller.
+ * 
+ * 
+ */
+public class ServerRpcControllerImpl implements ServerRpcController {
+    private final Service backingService;
+    // Receives every response of this RPC, including the final RpcFinished.
+    private final RpcCallback<Message> callback;
+    // cancelCallback, cancelled and complete represent the current status of
+    // this instance and are meant to be accessed or modified only while
+    // synchronised on statusLock.
+    private RpcCallback<Object> cancelCallback = null;
+    private boolean cancelled = false;
+    private boolean complete = false;
+
+    // Read once from the method options in the constructor.
+    private final boolean isStreamingRpc;
+    private final ParticipantId loggedInUser;
+    private final Message requestMessage;
+    private final Descriptors.MethodDescriptor serviceMethod;
+    // Lock guarding the status variables declared above (cancelCallback,
+    // cancelled, complete).
+    private final Object statusLock = new Object();
+
+    /**
+     * Creates a controller for one request. Nothing is invoked until
+     * {@link #run} is called.
+     * 
+     * @param requestMessage
+     *            the request being handled
+     * @param backingService
+     *            the backing service type
+     * @param serviceMethod
+     *            the specific method within the backing service type; its
+     *            options decide whether this is a streaming RPC
+     * @param loggedInUser
+     *            the currently logged in user
+     * @param callback
+     *            the destination where responses may be passed - may be called
+     *            once (normal RPC) or 1-n times (streaming RPC), and will pass
+     *            instances of RpcFinished as required (error cases, or
+     *            streaming RPC shutdown); is also always called under the
+     *            ServerRpcController's statusLock to ensure that consecutive
+     *            calls (in the streaming case) are called in series
+     */
+    ServerRpcControllerImpl(final Message requestMessage, final Service backingService,
+            final Descriptors.MethodDescriptor serviceMethod, final ParticipantId loggedInUser,
+            final RpcCallback<Message> callback) {
+        // What to call.
+        this.backingService = backingService;
+        this.serviceMethod = serviceMethod;
+        this.isStreamingRpc = serviceMethod.getOptions().getExtension(Rpc.isStreamingRpc);
+        // What to call it with, and on whose behalf.
+        this.requestMessage = requestMessage;
+        this.loggedInUser = loggedInUser;
+        // Where the responses go.
+        this.callback = callback;
+    }
+
+    /**
+     * Marks this RPC as cancelled. If a cancellation callback is registered
+     * and the RPC has not completed, the callback is run after the status lock
+     * has been released.
+     *
+     * @throws IllegalStateException
+     *             if the RPC was already cancelled
+     */
+    @Override
+    public void cancel() {
+        final RpcCallback<Object> toNotify;
+        synchronized (statusLock) {
+            if (cancelled) {
+                throw new IllegalStateException("Can't cancel RPC, already cancelled.");
+            }
+            cancelled = true;
+            final boolean shouldNotify = cancelCallback != null && !complete;
+            toNotify = shouldNotify ? cancelCallback : null;
+        }
+        if (toNotify != null) {
+            toNotify.run(null);
+        }
+    }
+
+    /**
+     * Not supported on the server side.
+     *
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public String errorText() {
+        throw new UnsupportedOperationException("Client-side method of RpcController only.");
+    }
+
+    /**
+     * Not supported on the server side.
+     *
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public boolean failed() {
+        throw new UnsupportedOperationException("Client-side method of RpcController only.");
+    }
+
+    /**
+     * @return the user passed to the constructor of this controller
+     */
+    @Override
+    public ParticipantId getLoggedInUser() {
+        return loggedInUser;
+    }
+
+    /**
+     * Reports whether {@link #cancel} has been called on this RPC.
+     *
+     * The flag is read under statusLock, like every other access to the
+     * status fields, so a cancellation made on another thread is visible here.
+     *
+     * @return true once the RPC has been cancelled
+     */
+    @Override
+    public boolean isCanceled() {
+        synchronized (statusLock) {
+            return cancelled;
+        }
+    }
+
+    /**
+     * Registers the cancellation callback of this RPC. It is run when the
+     * client cancels the RPC or when the RPC finishes (regardless of error
+     * case); if either has already happened, it is run right away from this
+     * method.
+     * 
+     * The callback is invoked outside the normal locks on
+     * ServerRpcController state, i.e., not within a block synchronised on
+     * statusLock.
+     *
+     * @throws IllegalStateException
+     *             if a callback has already been registered
+     */
+    @Override
+    public void notifyOnCancel(final RpcCallback<Object> callback) {
+        final boolean runNow;
+        synchronized (statusLock) {
+            if (cancelCallback != null) {
+                throw new IllegalStateException("Must only be called once per request.");
+            }
+            cancelCallback = callback;
+            runNow = cancelled || complete;
+        }
+        if (runNow && callback != null) {
+            callback.run(null);
+        }
+    }
+
+    /**
+     * Not supported on the server side.
+     *
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public void reset() {
+        throw new UnsupportedOperationException("Client-side method of RpcController only.");
+    }
+
+    /**
+     * Run this ServerRpcController in the current thread by calling the
+     * backing service method. A controller is single-use and must only be run
+     * once; this method does not itself detect a second invocation.
+     *
+     * Responses from the service are forwarded to the callback given to the
+     * constructor while holding statusLock. A null response, or any response
+     * of a non-streaming RPC, completes the RPC. A RuntimeException thrown by
+     * the service is reported to the caller as a failed RPC unless the RPC
+     * had already completed.
+     */
+    @Override
+    public void run() {
+        final RpcCallback<Message> messageCallback = new RpcCallback<Message>() {
+            @Override
+            public void run(Message result) { // NOPMD by vjrj on 12/02/11 0:16
+                RpcCallback<Object> runCallback = null;
+                synchronized (statusLock) {
+                    if (complete) {
+                        throw new IllegalStateException("Can't send responses over this RPC, as it is"
+                                + " already complete: " + result);
+                    }
+                    if (!isStreamingRpc || result == null) {
+                        // This either completes the streaming RPC (by passing
+                        // an instance of RpcFinished in place of null) or
+                        // completes a normal RPC (by passing any other
+                        // message).
+                        if (result == null) {
+                            result = Rpc.RpcFinished.newBuilder().setFailed(false).build();
+                        }
+                        callback.run(result);
+
+                        // Now complete, mark as such and invoke the
+                        // cancellation callback.
+                        complete = true;
+                        if (cancelCallback != null && !cancelled) {
+                            runCallback = cancelCallback;
+                        }
+                    } else {
+                        // Streaming RPC update.
+                        callback.run(result);
+                    }
+                }
+                // Run outside the lock, as promised by notifyOnCancel().
+                if (runCallback != null) {
+                    runCallback.run(null);
+                }
+            }
+        };
+        try {
+            backingService.callMethod(serviceMethod, this, requestMessage, messageCallback);
+        } catch (final RuntimeException e) {
+            // Pass the description of any RuntimeException back to the caller.
+            e.printStackTrace();
+            // The status flag is read under statusLock, like every other
+            // access to it, so a completion made on another thread is seen.
+            final boolean alreadyComplete;
+            synchronized (statusLock) {
+                alreadyComplete = complete;
+            }
+            if (!alreadyComplete) {
+                setFailed(e.toString());
+            }
+        }
+    }
+
+    /**
+     * Completes this RPC as failed: sends an RpcFinished message carrying the
+     * reason to the response callback and then, unless the RPC was cancelled,
+     * runs the registered cancellation callback outside the status lock.
+     *
+     * @param reason
+     *            error text sent to the caller
+     * @throws IllegalStateException
+     *             if the RPC is already complete
+     */
+    @Override
+    public void setFailed(final String reason) {
+        final RpcCallback<Object> toNotify;
+        synchronized (statusLock) {
+            if (complete) {
+                throw new IllegalStateException("Can't fail this RPC, as it is already complete.");
+            }
+            complete = true;
+            final Rpc.RpcFinished failure = Rpc.RpcFinished.newBuilder().setFailed(true).setErrorText(reason).build();
+            callback.run(failure);
+            toNotify = (cancelCallback != null && !cancelled) ? cancelCallback : null;
+        }
+        if (toNotify != null) {
+            toNotify.run(null);
+        }
+    }
+
+    /**
+     * Not supported on the server side.
+     *
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public void startCancel() {
+        throw new UnsupportedOperationException("Client-side method of RpcController only.");
+    }
+}

Added: branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/WaveMain.java
===================================================================
--- branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/WaveMain.java	                        (rev 0)
+++ branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/WaveMain.java	2011-02-12 02:19:16 UTC (rev 1250)
@@ -0,0 +1,197 @@
+/**
+ * Copyright 2009 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package cc.kune.wave.server;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.ProxyServlet;
+import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolWaveClientRpc;
+import org.waveprotocol.box.server.CoreSettings;
+import org.waveprotocol.box.server.ServerModule;
+import org.waveprotocol.box.server.authentication.AccountStoreHolder;
+import org.waveprotocol.box.server.authentication.SessionManager;
+import org.waveprotocol.box.server.frontend.ClientFrontend;
+import org.waveprotocol.box.server.frontend.ClientFrontendImpl;
+import org.waveprotocol.box.server.frontend.WaveClientRpcImpl;
+import org.waveprotocol.box.server.persistence.AccountStore;
+import org.waveprotocol.box.server.persistence.PersistenceException;
+import org.waveprotocol.box.server.persistence.PersistenceModule;
+import org.waveprotocol.box.server.persistence.SignerInfoStore;
+import org.waveprotocol.box.server.robots.RobotApiModule;
+import org.waveprotocol.box.server.robots.RobotRegistrationServlet;
+import org.waveprotocol.box.server.robots.active.ActiveApiServlet;
+import org.waveprotocol.box.server.robots.dataapi.DataApiOAuthServlet;
+import org.waveprotocol.box.server.robots.dataapi.DataApiServlet;
+import org.waveprotocol.box.server.robots.passive.RobotsGateway;
+import org.waveprotocol.box.server.rpc.AttachmentServlet;
+import org.waveprotocol.box.server.rpc.AuthenticationServlet;
+import org.waveprotocol.box.server.rpc.FetchServlet;
+import org.waveprotocol.box.server.rpc.SignOutServlet;
+import org.waveprotocol.box.server.rpc.UserRegistrationServlet;
+import org.waveprotocol.box.server.rpc.WaveClientServlet;
+import org.waveprotocol.box.server.waveserver.WaveBus;
+import org.waveprotocol.box.server.waveserver.WaveServerException;
+import org.waveprotocol.box.server.waveserver.WaveletProvider;
+import org.waveprotocol.wave.crypto.CertPathStore;
+import org.waveprotocol.wave.federation.FederationSettings;
+import org.waveprotocol.wave.federation.FederationTransport;
+import org.waveprotocol.wave.federation.noop.NoOpFederationModule;
+import org.waveprotocol.wave.federation.xmpp.XmppFederationModule;
+import org.waveprotocol.wave.model.version.HashedVersionFactory;
+import org.waveprotocol.wave.util.logging.Log;
+import org.waveprotocol.wave.util.settings.SettingsBinder;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.google.inject.name.Names;
+
+/**
+ * Entry point that assembles the Guice injectors and boots the Wave server.
+ */
+public class WaveMain {
+
+    private static final Log LOG = Log.get(WaveMain.class);
+
+    /** Name of the system property used to find the server config file. */
+    private static final String PROPERTIES_FILE_KEY = "wave.server.config";
+
+    /** Returns the XMPP federation module when federation is enabled, otherwise the no-op module. */
+    private static Module buildFederationModule(final Injector settingsInjector, final boolean enableFederation)
+            throws ConfigurationException {
+        if (!enableFederation) {
+            return settingsInjector.getInstance(NoOpFederationModule.class);
+        }
+        return settingsInjector.getInstance(XmppFederationModule.class);
+    }
+
+    /** Looks up the federation transport and starts federation. */
+    private static void initializeFederation(final Injector injector) {
+        injector.getInstance(FederationTransport.class).startFederation();
+    }
+
+    /** Builds the client frontend on top of the wave bus and registers the wave client RPC service. */
+    private static void initializeFrontend(final Injector injector, final CustomServerRpcProvider server,
+            final WaveBus waveBus) throws WaveServerException {
+        final ClientFrontend clientFrontend = ClientFrontendImpl.create(
+                injector.getInstance(HashedVersionFactory.class), injector.getInstance(WaveletProvider.class), waveBus);
+        final ProtocolWaveClientRpc.Interface clientRpc = WaveClientRpcImpl.create(clientFrontend, false);
+        server.registerService(ProtocolWaveClientRpc.newReflectiveService(clientRpc));
+    }
+
+    /** Subscribes the robots gateway to the wave bus. */
+    private static void initializeRobots(final Injector injector, final WaveBus waveBus) {
+        waveBus.subscribe(injector.getInstance(RobotsGateway.class));
+    }
+
+    /** Initializes the account store, the signer info store (if the cert path store is one) and the wavelet provider. */
+    private static void initializeServer(final Injector injector) throws PersistenceException, WaveServerException {
+        final AccountStore accounts = injector.getInstance(AccountStore.class);
+        accounts.initializeAccountStore();
+        final String waveDomain = injector.getInstance(Key.get(String.class,
+                Names.named(CoreSettings.WAVE_SERVER_DOMAIN)));
+        AccountStoreHolder.init(accounts, waveDomain);
+
+        // The cert path store only needs initializing when it is also a SignerInfoStore.
+        final CertPathStore certPaths = injector.getInstance(CertPathStore.class);
+        if (certPaths instanceof SignerInfoStore) {
+            ((SignerInfoStore) certPaths).initializeSignerInfoStore();
+        }
+
+        injector.getInstance(WaveletProvider.class).initialize();
+    }
+
+    /** Registers the HTTP servlets on the server, including a transparent proxy to the gadget server. */
+    private static void initializeServlets(final Injector injector, final CustomServerRpcProvider server) {
+        server.addServlet("/attachment/*", injector.getInstance(AttachmentServlet.class));
+
+        server.addServlet(SessionManager.SIGN_IN_URL, injector.getInstance(AuthenticationServlet.class));
+        server.addServlet("/auth/signout", injector.getInstance(SignOutServlet.class));
+        server.addServlet("/auth/register", injector.getInstance(UserRegistrationServlet.class));
+
+        server.addServlet("/fetch/*", injector.getInstance(FetchServlet.class));
+
+        server.addServlet("/robot/dataapi", injector.getInstance(DataApiServlet.class));
+        final String oauthPattern = DataApiOAuthServlet.DATA_API_OAUTH_PATH + "/*";
+        server.addServlet(oauthPattern, injector.getInstance(DataApiOAuthServlet.class));
+        server.addServlet("/robot/dataapi/rpc", injector.getInstance(DataApiServlet.class));
+        server.addServlet("/robot/register/*", injector.getInstance(RobotRegistrationServlet.class));
+        server.addServlet("/robot/rpc", injector.getInstance(ActiveApiServlet.class));
+
+        // Requests under /gadgets are proxied over http to the configured gadget server.
+        final String gadgetHost = injector.getInstance(Key.get(String.class,
+                Names.named(CoreSettings.GADGET_SERVER_HOSTNAME)));
+        final int gadgetPort = injector.getInstance(Key.get(int.class, Names.named(CoreSettings.GADGET_SERVER_PORT)));
+        final ServletHolder gadgetProxy = server.addServlet("/gadgets/*",
+                new ProxyServlet.Transparent("/gadgets", "http", gadgetHost, gadgetPort, "/gadgets"));
+        gadgetProxy.setInitParameter("HostHeader", gadgetHost);
+
+        server.addServlet("/", injector.getInstance(WaveClientServlet.class));
+    }
+
+    /** Binds the core settings and runs the server, logging as severe the checked exceptions run throws. */
+    public static void main(final String... args) {
+        try {
+            run(SettingsBinder.bindSettings(PROPERTIES_FILE_KEY, CoreSettings.class));
+        } catch (final PersistenceException e) {
+            LOG.severe("PersistenceException when running server:", e);
+        } catch (final ConfigurationException e) {
+            LOG.severe("ConfigurationException when running server:", e);
+        } catch (final WaveServerException e) {
+            LOG.severe("WaveServerException when running server:", e);
+        }
+    }
+
+    /**
+     * Creates the settings and server injectors, initializes every subsystem and starts the
+     * web socket server.
+     */
+    public static void run(final Module coreSettings) throws PersistenceException, ConfigurationException,
+            WaveServerException {
+        final Injector coreInjector = Guice.createInjector(coreSettings);
+        final boolean federated = coreInjector.getInstance(Key.get(Boolean.class,
+                Names.named(CoreSettings.ENABLE_FEDERATION)));
+
+        // The federation settings MUST be bound first, or bindings will fail if federation is enabled.
+        final Injector settingsInjector;
+        if (federated) {
+            final Module federationSettings = SettingsBinder.bindSettings(PROPERTIES_FILE_KEY, FederationSettings.class);
+            settingsInjector = coreInjector.createChildInjector(federationSettings);
+        } else {
+            settingsInjector = coreInjector;
+        }
+
+        final Module federationModule = buildFederationModule(settingsInjector, federated);
+        final PersistenceModule persistenceModule = settingsInjector.getInstance(PersistenceModule.class);
+        final Injector injector = settingsInjector.createChildInjector(new ServerModule(federated),
+                new RobotApiModule(), federationModule, persistenceModule);
+
+        final CustomServerRpcProvider server = injector.getInstance(CustomServerRpcProvider.class);
+        final WaveBus waveBus = injector.getInstance(WaveBus.class);
+
+        initializeServer(injector);
+        initializeServlets(injector, server);
+        initializeRobots(injector, waveBus);
+        initializeFrontend(injector, server, waveBus);
+        initializeFederation(injector);
+
+        LOG.info("Starting server");
+        server.startWebSocketServer(injector);
+    }
+}




More information about the kune-commits mailing list