[kune-commits] r1250 - branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server
Vicente J. Ruiz Jurado
vjrj_ at ourproject.org
Sat Feb 12 03:19:16 CET 2011
Author: vjrj_
Date: 2011-02-12 03:19:16 +0100 (Sat, 12 Feb 2011)
New Revision: 1250
Added:
branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/CustomServerRpcProvider.java
branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/ServerRpcControllerImpl.java
branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/WaveMain.java
Log:
some wave startup testing
Added: branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/CustomServerRpcProvider.java
===================================================================
--- branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/CustomServerRpcProvider.java (rev 0)
+++ branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/CustomServerRpcProvider.java 2011-02-12 02:19:16 UTC (rev 1250)
@@ -0,0 +1,507 @@
+package cc.kune.wave.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.servlet.Servlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpSession;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketServlet;
+import org.ourproject.kune.rack.RackServletFilter;
+import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticate;
+import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticationResult;
+import org.waveprotocol.box.server.CoreSettings;
+import org.waveprotocol.box.server.authentication.SessionManager;
+import org.waveprotocol.box.server.rpc.BaseUrlHelper;
+import org.waveprotocol.box.server.rpc.MessageExpectingChannel;
+import org.waveprotocol.box.server.rpc.ProtoCallback;
+import org.waveprotocol.box.server.rpc.Rpc;
+import org.waveprotocol.box.server.rpc.ServerRpcController;
+import org.waveprotocol.box.server.rpc.SocketIOServerChannel;
+import org.waveprotocol.box.server.rpc.WebSocketServerChannel;
+import org.waveprotocol.box.server.util.NetUtils;
+import org.waveprotocol.wave.model.util.Pair;
+import org.waveprotocol.wave.model.wave.ParticipantId;
+import org.waveprotocol.wave.util.logging.Log;
+
+import com.glines.socketio.server.SocketIOInbound;
+import com.glines.socketio.server.SocketIOServlet;
+import com.glines.socketio.server.transport.FlashSocketTransport;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.name.Named;
+import com.google.inject.servlet.GuiceFilter;
+import com.google.inject.servlet.GuiceServletContextListener;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.Service;
+import com.google.protobuf.UnknownFieldSet;
+
+/**
+ * ServerRpcProvider can provide instances of type Service over an incoming
+ * network socket and service incoming RPCs to these services and their methods.
+ *
+ *
+ */
+public class CustomServerRpcProvider {
+ abstract class Connection implements ProtoCallback {
+ private final Map<Long, ServerRpcController> activeRpcs = new ConcurrentHashMap<Long, ServerRpcController>();
+
+ // The logged in user.
+ // Note: Due to this bug:
+ // http://code.google.com/p/wave-protocol/issues/detail?id=119,
+ // the field may be null on first connect and then set later using an
+ // RPC.
+ private ParticipantId loggedInUser;
+
+ /**
+ * @param loggedInUser
+ * The currently logged in user, or null if no user is logged
+ * in.
+ */
+ public Connection(final ParticipantId loggedInUser) {
+ this.loggedInUser = loggedInUser;
+ }
+
+ private ParticipantId authenticate(final String token) {
+ final HttpSession session = sessionManager.getSessionFromToken(token);
+ final ParticipantId user = sessionManager.getLoggedInUser(session);
+ return user;
+ }
+
+ protected void expectMessages(final MessageExpectingChannel channel) {
+ synchronized (registeredServices) {
+ for (final RegisteredServiceMethod serviceMethod : registeredServices.values()) {
+ channel.expectMessage(serviceMethod.service.getRequestPrototype(serviceMethod.method));
+ LOG.fine("Expecting: " + serviceMethod.method.getFullName());
+ }
+ }
+ channel.expectMessage(Rpc.CancelRpc.getDefaultInstance());
+ }
+
+ @Override
+ public void message(final long sequenceNo, final Message message) {
+ if (message instanceof Rpc.CancelRpc) {
+ final ServerRpcController controller = activeRpcs.get(sequenceNo);
+ if (controller == null) {
+ throw new IllegalStateException("Trying to cancel an RPC that is not active!");
+ } else {
+ LOG.info("Cancelling open RPC " + sequenceNo);
+ controller.cancel();
+ }
+ } else if (message instanceof ProtocolAuthenticate) {
+ // Workaround for bug:
+ // http://codereview.waveprotocol.org/224001/
+
+ // When we get this message, either the connection will not be
+ // logged in
+ // (loggedInUser == null) or the connection will have been
+ // authenticated
+ // via cookies
+ // (in which case loggedInUser must match the authenticated
+ // user, and
+ // this message has no
+ // effect).
+
+ final ProtocolAuthenticate authMessage = (ProtocolAuthenticate) message;
+ final ParticipantId authenticatedAs = authenticate(authMessage.getToken());
+
+ Preconditions.checkArgument(authenticatedAs != null, "Auth token invalid");
+ Preconditions.checkState(loggedInUser == null || loggedInUser.equals(authenticatedAs),
+ "Session already authenticated as a different user");
+
+ loggedInUser = authenticatedAs;
+ LOG.info("Session authenticated as " + loggedInUser);
+ sendMessage(sequenceNo, ProtocolAuthenticationResult.getDefaultInstance());
+ } else if (registeredServices.containsKey(message.getDescriptorForType())) {
+ if (activeRpcs.containsKey(sequenceNo)) {
+ throw new IllegalStateException("Can't invoke a new RPC with a sequence number already in use.");
+ } else {
+ final RegisteredServiceMethod serviceMethod = registeredServices.get(message.getDescriptorForType());
+
+ // Create the internal ServerRpcController used to invoke
+ // the call.
+ final ServerRpcController controller = new ServerRpcControllerImpl(message, serviceMethod.service,
+ serviceMethod.method, loggedInUser, new RpcCallback<Message>() {
+ @Override
+ synchronized public void run(final Message message) {
+ if (message instanceof Rpc.RpcFinished
+ || !serviceMethod.method.getOptions().getExtension(Rpc.isStreamingRpc)) {
+ // This RPC is over - remove it from the
+ // map.
+ final boolean failed = message instanceof Rpc.RpcFinished ? ((Rpc.RpcFinished) message).getFailed()
+ : false;
+ LOG.fine("RPC " + sequenceNo + " is now finished, failed = " + failed);
+ if (failed) {
+ LOG.info("error = " + ((Rpc.RpcFinished) message).getErrorText());
+ }
+ activeRpcs.remove(sequenceNo);
+ }
+ sendMessage(sequenceNo, message);
+ }
+ });
+
+ // Kick off a new thread specific to this RPC.
+ activeRpcs.put(sequenceNo, controller);
+ threadPool.execute(controller);
+ }
+ } else {
+ // Sent a message type we understand, but don't expect -
+ // erronous case!
+ throw new IllegalStateException("Got expected but unknown message (" + message + ") for sequence: "
+ + sequenceNo);
+ }
+ }
+
+ protected abstract void sendMessage(long sequenceNo, Message message);
+
+ @Override
+ public void unknown(final long sequenceNo, final String messageType, final String message) {
+ throw new IllegalStateException("Got unknown message (type: " + messageType + ", " + message
+ + ") for sequence: " + sequenceNo);
+ }
+
+ @Override
+ public void unknown(final long sequenceNo, final String messageType, final UnknownFieldSet message) {
+ throw new IllegalStateException("Got unknown message (type: " + messageType + ", " + message
+ + ") for sequence: " + sequenceNo);
+ }
+ }
+
+ /**
+ * Internal, static container class for any specific registered service
+ * method.
+ */
+ static class RegisteredServiceMethod {
+ final MethodDescriptor method;
+ final Service service;
+
+ RegisteredServiceMethod(final Service service, final MethodDescriptor method) {
+ this.service = service;
+ this.method = method;
+ }
+ }
+ class SocketIOConnection extends Connection {
+ private final SocketIOServerChannel socketChannel;
+
+ SocketIOConnection(final ParticipantId loggedInUser) {
+ super(loggedInUser);
+ socketChannel = new SocketIOServerChannel(this);
+ LOG.info("New websocket connection set up for user " + loggedInUser);
+ expectMessages(socketChannel);
+ }
+
+ public SocketIOServerChannel getWebSocketServerChannel() {
+ return socketChannel;
+ }
+
+ @Override
+ protected void sendMessage(final long sequenceNo, final Message message) {
+ socketChannel.sendMessage(sequenceNo, message);
+ }
+ }
+ public class WaveSocketIOServlet extends SocketIOServlet {
+ @Override
+ protected SocketIOInbound doSocketIOConnect(final HttpServletRequest request, final String[] protocols) {
+ final ParticipantId loggedInUser = sessionManager.getLoggedInUser(request.getSession(false));
+
+ final SocketIOConnection connection = new SocketIOConnection(loggedInUser);
+ return connection.getWebSocketServerChannel();
+ }
+ }
+ public class WaveWebSocketServlet extends WebSocketServlet {
+ @Override
+ protected WebSocket doWebSocketConnect(final HttpServletRequest request, final String protocol) {
+ final ParticipantId loggedInUser = sessionManager.getLoggedInUser(request.getSession(false));
+
+ final WebSocketConnection connection = new WebSocketConnection(loggedInUser);
+ return connection.getWebSocketServerChannel();
+ }
+ }
+ class WebSocketConnection extends Connection {
+ private final WebSocketServerChannel socketChannel;
+
+ WebSocketConnection(final ParticipantId loggedInUser) {
+ super(loggedInUser);
+ socketChannel = new WebSocketServerChannel(this);
+ LOG.info("New websocket connection set up for user " + loggedInUser);
+ expectMessages(socketChannel);
+ }
+
+ public WebSocketServerChannel getWebSocketServerChannel() {
+ return socketChannel;
+ }
+
+ @Override
+ protected void sendMessage(final long sequenceNo, final Message message) {
+ socketChannel.sendMessage(sequenceNo, message);
+ }
+ }
+ private static final Log LOG = Log.get(CustomServerRpcProvider.class);
+
+ private static InetSocketAddress[] parseAddressList(final List<String> addressList) {
+ if (addressList == null || addressList.size() == 0) {
+ return new InetSocketAddress[0];
+ } else {
+ final Set<InetSocketAddress> addresses = Sets.newHashSet();
+ for (final String str : addressList) {
+ if (str.length() == 0) {
+ LOG.warning("Encountered empty address in http addresses list.");
+ } else {
+ try {
+ final InetSocketAddress address = NetUtils.parseHttpAddress(str);
+ if (!addresses.contains(address)) {
+ addresses.add(address);
+ } else {
+ LOG.warning("Ignoring duplicate address in http addresses list: Duplicate entry '" + str
+ + "' resolved to " + address.getAddress().getHostAddress());
+ }
+ } catch (final IOException e) {
+ LOG.severe("Unable to process address " + str, e);
+ }
+ }
+ }
+ return addresses.toArray(new InetSocketAddress[0]);
+ }
+ }
+ private final String baseUrl;
+
+ private final Integer flashsocketPolicyPort;
+
+ private final InetSocketAddress[] httpAddresses;
+
+ private Server httpServer = null;
+
+ private final Set<Connection> incomingConnections = Sets.newHashSet();
+
+ private final org.eclipse.jetty.server.SessionManager jettySessionManager;
+
+ // Mapping from incoming protocol buffer type -> specific handler.
+ private final Map<Descriptors.Descriptor, RegisteredServiceMethod> registeredServices = Maps.newHashMap();
+
+ private final String resourceBase;
+
+ /**
+ * Set of servlets
+ */
+ List<Pair<String, ServletHolder>> servletRegistry = Lists.newArrayList();
+
+ private final SessionManager sessionManager;
+
+ private final ExecutorService threadPool;
+
+ /**
+ * Construct a new ServerRpcProvider, hosting on the specified WebSocket
+ * addresses.
+ *
+ * Also accepts an ExecutorService for spawning managing threads.
+ */
+ public CustomServerRpcProvider(final InetSocketAddress[] httpAddresses, final Integer flashsocketPolicyPort,
+ final String baseUrl, final String resourceBase, final ExecutorService threadPool,
+ final SessionManager sessionManager, final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+ this.httpAddresses = httpAddresses;
+ this.flashsocketPolicyPort = flashsocketPolicyPort;
+ this.baseUrl = BaseUrlHelper.removeLastSlash(baseUrl);
+ this.resourceBase = resourceBase;
+ this.threadPool = threadPool;
+ this.sessionManager = sessionManager;
+ this.jettySessionManager = jettySessionManager;
+ }
+
+ /**
+ * Constructs a new ServerRpcProvider with a default ExecutorService.
+ */
+ public CustomServerRpcProvider(final InetSocketAddress[] httpAddresses, final Integer flashsocketPolicyPort,
+ final String baseUrl, final String resourceBase, final SessionManager sessionManager,
+ final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+ this(httpAddresses, flashsocketPolicyPort, baseUrl, resourceBase, Executors.newCachedThreadPool(),
+ sessionManager, jettySessionManager);
+ }
+
+ @Inject
+ public CustomServerRpcProvider(@Named(CoreSettings.HTTP_FRONTEND_ADDRESSES) final List<String> httpAddresses,
+ @Named(CoreSettings.FLASHSOCKET_POLICY_PORT) final Integer flashsocketPolicyPort,
+ @Named(CoreSettings.HTTP_BASE_URL) final String baseUrl,
+ @Named(CoreSettings.RESOURCE_BASE) final String resourceBase, final SessionManager sessionManager,
+ final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+ this(parseAddressList(httpAddresses), flashsocketPolicyPort, baseUrl, resourceBase, sessionManager,
+ jettySessionManager);
+ }
+
+ /**
+ * Add a servlet to the servlet registry. This servlet will be attached to
+ * the specified URL pattern when the server is started up.
+ *
+ * @param urlPattern
+ * URL pattern for paths. Eg, '/foo', '/foo/*'
+ * @param servlet
+ * The servlet object to bind to the specified paths
+ * @return the {@link ServletHolder} that holds the servlet.
+ */
+ public ServletHolder addServlet(final String urlPattern, final Servlet servlet) {
+ final ServletHolder servletHolder = new ServletHolder(servlet);
+ servletRegistry.add(new Pair<String, ServletHolder>(urlPattern, servletHolder));
+ return servletHolder;
+ }
+
+ /**
+ * @return a list of {@link SelectChannelConnector} each bound to a
+ * host:port pair form the list addresses.
+ */
+ private List<SelectChannelConnector> getSelectChannelConnectors(final InetSocketAddress[] httpAddresses) {
+ final List<SelectChannelConnector> list = Lists.newArrayList();
+ for (final InetSocketAddress address : httpAddresses) {
+ final SelectChannelConnector connector = new SelectChannelConnector();
+ connector.setHost(address.getAddress().getHostAddress());
+ connector.setPort(address.getPort());
+ list.add(connector);
+ }
+
+ return list;
+ }
+
+ /**
+ * Returns the socket the WebSocket server is listening on.
+ */
+ public SocketAddress getWebSocketAddress() {
+ if (httpServer == null) {
+ return null;
+ } else {
+ final Connector c = httpServer.getConnectors()[0];
+ return new InetSocketAddress(c.getHost(), c.getLocalPort());
+ }
+ }
+
+ /**
+ * Register all methods provided by the given service type.
+ */
+ public void registerService(final Service service) {
+ synchronized (registeredServices) {
+ for (final MethodDescriptor methodDescriptor : service.getDescriptorForType().getMethods()) {
+ registeredServices.put(methodDescriptor.getInputType(), new RegisteredServiceMethod(service,
+ methodDescriptor));
+ }
+ }
+ }
+
+ public void startWebSocketServer(final Injector injector) {
+ httpServer = new Server();
+
+ final List<SelectChannelConnector> connectors = getSelectChannelConnectors(httpAddresses);
+ if (connectors.isEmpty()) {
+ LOG.severe("No valid http end point address provided!");
+ }
+ for (final SelectChannelConnector connector : connectors) {
+ httpServer.addConnector(connector);
+ }
+
+ final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ if (jettySessionManager != null) {
+ context.getSessionHandler().setSessionManager(jettySessionManager);
+ }
+ context.setResourceBase(resourceBase);
+ context.setContextPath(baseUrl);
+ context.addEventListener(new GuiceServletContextListener() {
+ @Override
+ protected Injector getInjector() {
+ return injector;
+ }
+ });
+ context.setAttribute(RackServletFilter.INJECTOR_PARENT_ATTRIBUTE, injector);
+ context.addFilter(GuiceFilter.class, "/*", 0);
+ final FilterHolder filterHolder = new FilterHolder(RackServletFilter.class);
+ filterHolder.setInitParameter("org.ourproject.kune.rack.RackModule",
+ "org.ourproject.kune.app.server.KuneRackModule");
+ context.addFilter(filterHolder, "/ws*", 0);
+
+ // Servlet where the websocket connection is served from.
+ final ServletHolder wsholder = new ServletHolder(new WaveWebSocketServlet());
+ context.addServlet(wsholder, "/socket");
+ // TODO(zamfi): fix to let messages span frames.
+ wsholder.setInitParameter("bufferSize", "" + 1024 * 1024); // 1M buffer
+
+ // Servlet where the websocket connection is served from.
+ final ServletHolder sioholder = new ServletHolder(new WaveSocketIOServlet());
+ context.addServlet(sioholder, "/socket.io/*");
+ // TODO(zamfi): fix to let messages span frames.
+ sioholder.setInitParameter("bufferSize", "" + 1024 * 1024); // 1M buffer
+ // Set flash policy server parameters
+ String flashPolicyServerHost = "localhost";
+ final StringBuilder flashPolicyAllowedPorts = new StringBuilder();
+ /*
+ * Loop through addresses, collect list of ports, and determine if we
+ * are to use "localhost" of the AnyHost wildcard.
+ */
+ for (final InetSocketAddress addr : httpAddresses) {
+ if (flashPolicyAllowedPorts.length() > 0) {
+ flashPolicyAllowedPorts.append(",");
+ }
+ flashPolicyAllowedPorts.append(addr.getPort());
+ if (!addr.getAddress().isLoopbackAddress()) {
+ // Until it's possible to pass a list of address, this is the
+ // only valid alternative.
+ flashPolicyServerHost = "0.0.0.0";
+ }
+ }
+ sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_SERVER_HOST_KEY, flashPolicyServerHost);
+ sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_SERVER_PORT_KEY, "" + flashsocketPolicyPort);
+ // TODO: Change to use the public http address and all other bound
+ // addresses.
+ sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_DOMAIN_KEY, "*");
+ sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_PORTS_KEY, flashPolicyAllowedPorts.toString());
+
+ // Serve the static content and GWT web client with the default servlet
+ // (acts like a standard file-based web server).
+ final ServletHolder defaultServlet = new ServletHolder(new DefaultServlet());
+ context.addServlet(defaultServlet, "/static/*");
+ context.addServlet(defaultServlet, "/webclient/*");
+
+ for (final Pair<String, ServletHolder> servlet : servletRegistry) {
+ context.addServlet(servlet.getSecond(), servlet.getFirst());
+ }
+
+ httpServer.setHandler(context);
+
+ try {
+ httpServer.start();
+ } catch (final Exception e) { // yes, .start() throws "Exception"
+ LOG.severe("Fatal error starting http server.", e);
+ return;
+ }
+ LOG.fine("WebSocket server running. --------");
+ }
+
+ /**
+ * Stops this server.
+ */
+ public void stopServer() throws IOException {
+ try {
+ httpServer.stop(); // yes, .stop() throws "Exception"
+ } catch (final Exception e) {
+ LOG.warning("Fatal error stopping http server.", e);
+ }
+ LOG.fine("server shutdown.");
+ }
+}
Added: branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/ServerRpcControllerImpl.java
===================================================================
--- branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/ServerRpcControllerImpl.java (rev 0)
+++ branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/ServerRpcControllerImpl.java 2011-02-12 02:19:16 UTC (rev 1250)
@@ -0,0 +1,226 @@
+/**
+ * Copyright 2009 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package cc.kune.wave.server;
+
+import org.waveprotocol.box.server.rpc.Rpc;
+import org.waveprotocol.box.server.rpc.ServerRpcController;
+import org.waveprotocol.wave.model.wave.ParticipantId;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.Service;
+
+/**
+ * Implements the server end-point of a wave server RPC connection. This is a
+ * single-use RPC controller.
+ *
+ *
+ */
+public class ServerRpcControllerImpl implements ServerRpcController {
+ private final Service backingService;
+ private final RpcCallback<Message> callback;
+ private RpcCallback<Object> cancelCallback = null;
+ private boolean cancelled = false;
+ private boolean complete = false;
+
+ private final boolean isStreamingRpc;
+ private final ParticipantId loggedInUser;
+ private final Message requestMessage;
+ private final Descriptors.MethodDescriptor serviceMethod;
+ // The following variables represent the current status of this instance,
+ // and
+ // must all only be accessed or modified while synchronised on statusLock.
+ private final Object statusLock = new Object();
+
+ /**
+ * Instantiate a new ServerRpcController that may later be completely
+ * invoked by calling {#link run}.
+ *
+ * @param requestMessage
+ * the request being handled
+ * @param backingService
+ * the backing service type
+ * @param serviceMethod
+ * the specific method within the backing service type
+ * @param loggedInUser
+ * the currently logged in user
+ * @param callback
+ * the destination where responses may be passed - may be called
+ * once (normal RPC) or 1-n times (streaming RPC), and will pass
+ * instances of RpcFinished as required (error cases, or
+ * streaming RPC shutdown); is also always called under the
+ * ServerRpcController's statusLock to ensure that consecutive
+ * calls (in the streaming case) are called in series
+ */
+ ServerRpcControllerImpl(final Message requestMessage, final Service backingService,
+ final Descriptors.MethodDescriptor serviceMethod, final ParticipantId loggedInUser,
+ final RpcCallback<Message> callback) {
+ this.requestMessage = requestMessage;
+ this.backingService = backingService;
+ this.serviceMethod = serviceMethod;
+ this.loggedInUser = loggedInUser;
+ this.isStreamingRpc = serviceMethod.getOptions().getExtension(Rpc.isStreamingRpc);
+ this.callback = callback;
+ }
+
+ @Override
+ public void cancel() {
+ RpcCallback<Object> runCallback = null;
+ synchronized (statusLock) {
+ if (cancelled) {
+ throw new IllegalStateException("Can't cancel RPC, already cancelled.");
+ }
+ cancelled = true;
+ if (cancelCallback != null && !complete) {
+ runCallback = cancelCallback;
+ }
+ }
+ if (runCallback != null) {
+ runCallback.run(null);
+ }
+ }
+
+ @Override
+ public String errorText() {
+ throw new UnsupportedOperationException("Client-side method of RpcController only.");
+ }
+
+ @Override
+ public boolean failed() {
+ throw new UnsupportedOperationException("Client-side method of RpcController only.");
+ }
+
+ @Override
+ public ParticipantId getLoggedInUser() {
+ return loggedInUser;
+ }
+
+ @Override
+ public boolean isCanceled() {
+ return cancelled;
+ }
+
+ /**
+ * Registers a cancellation callback. This will always be called as part of
+ * this RPC, and always at most once; either when the client asks to cancel
+ * it, or when the RPC finishes (regardless of error case).
+ *
+ * This callback will be called outside normal locks on ServerRpcController
+ * state, i.e., not within a block synchronised on statusLock.
+ */
+ @Override
+ public void notifyOnCancel(final RpcCallback<Object> callback) {
+ RpcCallback<Object> runCallback = null;
+ synchronized (statusLock) {
+ if (cancelCallback != null) {
+ throw new IllegalStateException("Must only be called once per request.");
+ } else {
+ cancelCallback = callback;
+ if (cancelled || complete) {
+ runCallback = cancelCallback;
+ }
+ }
+ }
+ if (runCallback != null) {
+ runCallback.run(null);
+ }
+ }
+
+ @Override
+ public void reset() {
+ throw new UnsupportedOperationException("Client-side method of RpcController only.");
+ }
+
+ /**
+ * Run this ServerRpcController in the current thread. This must only be
+ * invoked ONCE, and will throw an IllegalStateException otherwise.
+ */
+ @Override
+ public void run() {
+ final RpcCallback<Message> messageCallback = new RpcCallback<Message>() {
+ @Override
+ public void run(Message result) { // NOPMD by vjrj on 12/02/11 0:16
+ RpcCallback<Object> runCallback = null;
+ synchronized (statusLock) {
+ if (complete) {
+ throw new IllegalStateException("Can't send responses over this RPC, as it is"
+ + " already complete: " + result);
+ }
+ if (!isStreamingRpc || result == null) {
+ // This either completes the streaming RPC (by passing
+ // an instance
+ // of RpcFinished in place of null) or completes a
+ // normal RPC (by
+ // passing any other message).
+ if (result == null) {
+ result = Rpc.RpcFinished.newBuilder().setFailed(false).build();
+ }
+ callback.run(result);
+
+ // Now complete, mark as such and invoke the
+ // cancellation callback.
+ complete = true;
+ if (cancelCallback != null && !cancelled) {
+ runCallback = cancelCallback;
+ }
+ } else {
+ // Streaming RPC update.
+ callback.run(result);
+ }
+ }
+ if (runCallback != null) {
+ runCallback.run(null);
+ }
+ }
+ };
+ try {
+ backingService.callMethod(serviceMethod, this, requestMessage, messageCallback);
+ } catch (final RuntimeException e) {
+ // Pass the description of any RuntimeException back to the caller.
+ e.printStackTrace();
+ if (!complete) {
+ setFailed(e.toString());
+ }
+ }
+ }
+
+ @Override
+ public void setFailed(final String reason) {
+ RpcCallback<Object> runCallback = null;
+ synchronized (statusLock) {
+ if (complete) {
+ throw new IllegalStateException("Can't fail this RPC, as it is already complete.");
+ } else {
+ complete = true;
+ callback.run(Rpc.RpcFinished.newBuilder().setFailed(true).setErrorText(reason).build());
+ if (cancelCallback != null && !cancelled) {
+ runCallback = cancelCallback;
+ }
+ }
+ }
+ if (runCallback != null) {
+ runCallback.run(null);
+ }
+ }
+
+ @Override
+ public void startCancel() {
+ throw new UnsupportedOperationException("Client-side method of RpcController only.");
+ }
+}
Added: branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/WaveMain.java
===================================================================
--- branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/WaveMain.java (rev 0)
+++ branches/wave-jetty-embed-guice/src/main/java/cc/kune/wave/server/WaveMain.java 2011-02-12 02:19:16 UTC (rev 1250)
@@ -0,0 +1,197 @@
+/**
+ * Copyright 2009 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package cc.kune.wave.server;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.ProxyServlet;
+import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolWaveClientRpc;
+import org.waveprotocol.box.server.CoreSettings;
+import org.waveprotocol.box.server.ServerModule;
+import org.waveprotocol.box.server.authentication.AccountStoreHolder;
+import org.waveprotocol.box.server.authentication.SessionManager;
+import org.waveprotocol.box.server.frontend.ClientFrontend;
+import org.waveprotocol.box.server.frontend.ClientFrontendImpl;
+import org.waveprotocol.box.server.frontend.WaveClientRpcImpl;
+import org.waveprotocol.box.server.persistence.AccountStore;
+import org.waveprotocol.box.server.persistence.PersistenceException;
+import org.waveprotocol.box.server.persistence.PersistenceModule;
+import org.waveprotocol.box.server.persistence.SignerInfoStore;
+import org.waveprotocol.box.server.robots.RobotApiModule;
+import org.waveprotocol.box.server.robots.RobotRegistrationServlet;
+import org.waveprotocol.box.server.robots.active.ActiveApiServlet;
+import org.waveprotocol.box.server.robots.dataapi.DataApiOAuthServlet;
+import org.waveprotocol.box.server.robots.dataapi.DataApiServlet;
+import org.waveprotocol.box.server.robots.passive.RobotsGateway;
+import org.waveprotocol.box.server.rpc.AttachmentServlet;
+import org.waveprotocol.box.server.rpc.AuthenticationServlet;
+import org.waveprotocol.box.server.rpc.FetchServlet;
+import org.waveprotocol.box.server.rpc.SignOutServlet;
+import org.waveprotocol.box.server.rpc.UserRegistrationServlet;
+import org.waveprotocol.box.server.rpc.WaveClientServlet;
+import org.waveprotocol.box.server.waveserver.WaveBus;
+import org.waveprotocol.box.server.waveserver.WaveServerException;
+import org.waveprotocol.box.server.waveserver.WaveletProvider;
+import org.waveprotocol.wave.crypto.CertPathStore;
+import org.waveprotocol.wave.federation.FederationSettings;
+import org.waveprotocol.wave.federation.FederationTransport;
+import org.waveprotocol.wave.federation.noop.NoOpFederationModule;
+import org.waveprotocol.wave.federation.xmpp.XmppFederationModule;
+import org.waveprotocol.wave.model.version.HashedVersionFactory;
+import org.waveprotocol.wave.util.logging.Log;
+import org.waveprotocol.wave.util.settings.SettingsBinder;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.google.inject.name.Names;
+
+/**
+ * Wave Server entrypoint.
+ */
+public class WaveMain {
+
+ private static final Log LOG = Log.get(WaveMain.class);
+
+ /**
+ * This is the name of the system property used to find the server config
+ * file.
+ */
+ private static final String PROPERTIES_FILE_KEY = "wave.server.config";
+
+ private static Module buildFederationModule(final Injector settingsInjector, final boolean enableFederation)
+ throws ConfigurationException {
+ Module federationModule;
+ if (enableFederation) {
+ federationModule = settingsInjector.getInstance(XmppFederationModule.class);
+ } else {
+ federationModule = settingsInjector.getInstance(NoOpFederationModule.class);
+ }
+ return federationModule;
+ }
+
+ private static void initializeFederation(final Injector injector) {
+ final FederationTransport federationManager = injector.getInstance(FederationTransport.class);
+ federationManager.startFederation();
+ }
+
+ private static void initializeFrontend(final Injector injector, final CustomServerRpcProvider server,
+ final WaveBus waveBus) throws WaveServerException {
+ final HashedVersionFactory hashFactory = injector.getInstance(HashedVersionFactory.class);
+ final WaveletProvider provider = injector.getInstance(WaveletProvider.class);
+ final ClientFrontend frontend = ClientFrontendImpl.create(hashFactory, provider, waveBus);
+
+ final ProtocolWaveClientRpc.Interface rpcImpl = WaveClientRpcImpl.create(frontend, false);
+ server.registerService(ProtocolWaveClientRpc.newReflectiveService(rpcImpl));
+ }
+
+ private static void initializeRobots(final Injector injector, final WaveBus waveBus) {
+ final RobotsGateway robotsGateway = injector.getInstance(RobotsGateway.class);
+ waveBus.subscribe(robotsGateway);
+ }
+
+ private static void initializeServer(final Injector injector) throws PersistenceException, WaveServerException {
+ final AccountStore accountStore = injector.getInstance(AccountStore.class);
+ accountStore.initializeAccountStore();
+ AccountStoreHolder.init(accountStore,
+ injector.getInstance(Key.get(String.class, Names.named(CoreSettings.WAVE_SERVER_DOMAIN))));
+
+ // Initialize the SignerInfoStore.
+ final CertPathStore certPathStore = injector.getInstance(CertPathStore.class);
+ if (certPathStore instanceof SignerInfoStore) {
+ ((SignerInfoStore) certPathStore).initializeSignerInfoStore();
+ }
+
+ // Initialize the server.
+ final WaveletProvider waveServer = injector.getInstance(WaveletProvider.class);
+ waveServer.initialize();
+ }
+
+ private static void initializeServlets(final Injector injector, final CustomServerRpcProvider server) {
+ server.addServlet("/attachment/*", injector.getInstance(AttachmentServlet.class));
+
+ server.addServlet(SessionManager.SIGN_IN_URL, injector.getInstance(AuthenticationServlet.class));
+ server.addServlet("/auth/signout", injector.getInstance(SignOutServlet.class));
+ server.addServlet("/auth/register", injector.getInstance(UserRegistrationServlet.class));
+
+ server.addServlet("/fetch/*", injector.getInstance(FetchServlet.class));
+
+ server.addServlet("/robot/dataapi", injector.getInstance(DataApiServlet.class));
+ server.addServlet(DataApiOAuthServlet.DATA_API_OAUTH_PATH + "/*",
+ injector.getInstance(DataApiOAuthServlet.class));
+ server.addServlet("/robot/dataapi/rpc", injector.getInstance(DataApiServlet.class));
+ server.addServlet("/robot/register/*", injector.getInstance(RobotRegistrationServlet.class));
+ server.addServlet("/robot/rpc", injector.getInstance(ActiveApiServlet.class));
+
+ final String gadgetServerHostname = injector.getInstance(Key.get(String.class,
+ Names.named(CoreSettings.GADGET_SERVER_HOSTNAME)));
+ final ProxyServlet.Transparent proxyServlet = new ProxyServlet.Transparent("/gadgets", "http",
+ gadgetServerHostname, injector.getInstance(Key.get(int.class,
+ Names.named(CoreSettings.GADGET_SERVER_PORT))), "/gadgets");
+ final ServletHolder proxyServletHolder = server.addServlet("/gadgets/*", proxyServlet);
+ proxyServletHolder.setInitParameter("HostHeader", gadgetServerHostname);
+
+ server.addServlet("/", injector.getInstance(WaveClientServlet.class));
+ }
+
+ public static void main(final String... args) {
+ try {
+ final Module coreSettings = SettingsBinder.bindSettings(PROPERTIES_FILE_KEY, CoreSettings.class);
+ run(coreSettings);
+ return;
+ } catch (final PersistenceException e) {
+ LOG.severe("PersistenceException when running server:", e);
+ } catch (final ConfigurationException e) {
+ LOG.severe("ConfigurationException when running server:", e);
+ } catch (final WaveServerException e) {
+ LOG.severe("WaveServerException when running server:", e);
+ }
+ }
+
+ public static void run(final Module coreSettings) throws PersistenceException, ConfigurationException,
+ WaveServerException {
+ Injector settingsInjector = Guice.createInjector(coreSettings);
+ final boolean enableFederation = settingsInjector.getInstance(Key.get(Boolean.class,
+ Names.named(CoreSettings.ENABLE_FEDERATION)));
+
+ if (enableFederation) {
+ final Module federationSettings = SettingsBinder.bindSettings(PROPERTIES_FILE_KEY, FederationSettings.class);
+ // This MUST happen first, or bindings will fail if federation is
+ // enabled.
+ settingsInjector = settingsInjector.createChildInjector(federationSettings);
+ }
+
+ final Module federationModule = buildFederationModule(settingsInjector, enableFederation);
+ final PersistenceModule persistenceModule = settingsInjector.getInstance(PersistenceModule.class);
+ final Injector injector = settingsInjector.createChildInjector(new ServerModule(enableFederation),
+ new RobotApiModule(), federationModule, persistenceModule);
+
+ final CustomServerRpcProvider server = injector.getInstance(CustomServerRpcProvider.class);
+ final WaveBus waveBus = injector.getInstance(WaveBus.class);
+
+ initializeServer(injector);
+ initializeServlets(injector, server);
+ initializeRobots(injector, waveBus);
+ initializeFrontend(injector, server, waveBus);
+ initializeFederation(injector);
+
+ LOG.info("Starting server");
+ server.startWebSocketServer(injector);
+ }
+}
More information about the kune-commits
mailing list