[kune-commits] r1313 - in trunk: . doc script src/main/java/cc/kune/common/client/tooltip src/main/java/cc/kune/wave/client src/main/java/cc/kune/wave/server src/main/java/cc/ourproject src/main/java/org src/main/java/org/waveprotocol src/main/java/org/waveprotocol/box src/main/java/org/waveprotocol/box/server src/main/java/org/waveprotocol/box/server/rpc

Vicente J. Ruiz Jurado vjrj_ at ourproject.org
Sat Apr 16 12:50:40 CEST 2011


Author: vjrj_
Date: 2011-04-16 12:50:40 +0200 (Sat, 16 Apr 2011)
New Revision: 1313

Added:
   trunk/src/main/java/org/waveprotocol/
   trunk/src/main/java/org/waveprotocol/box/
   trunk/src/main/java/org/waveprotocol/box/server/
   trunk/src/main/java/org/waveprotocol/box/server/rpc/
   trunk/src/main/java/org/waveprotocol/box/server/rpc/CustomServerRpcProvider.java
Removed:
   trunk/src/main/java/cc/ourproject/kune/
Modified:
   trunk/doc/kune-tools-and-functionality-summary.ods
   trunk/pom.xml
   trunk/script/mvn-eclipse.sh
   trunk/script/repopubli.sh
   trunk/src/main/java/cc/kune/common/client/tooltip/Tooltip.java
   trunk/src/main/java/cc/kune/wave/client/WebClient.java
   trunk/src/main/java/cc/kune/wave/server/CustomUserRegistrationServlet.java
   trunk/src/main/java/cc/kune/wave/server/CustomWaveClientServlet.java
   trunk/src/main/java/cc/kune/wave/server/WaveMain.java
Log:
WIAB Updated

Modified: trunk/doc/kune-tools-and-functionality-summary.ods
===================================================================
(Binary files differ)

Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml	2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/pom.xml	2011-04-16 10:50:40 UTC (rev 1313)
@@ -545,7 +545,6 @@
       <artifactId>guava-gwt</artifactId>
       <version>r07</version>
     </dependency>
-    <!-- From here necessary as .java not included in waveinabox-serve.jar -->
     <dependency>
       <groupId>org.waveprotocol</groupId>
       <artifactId>box-src</artifactId>
@@ -596,7 +595,6 @@
       <artifactId>communication-src</artifactId>
       <version>0.3.0-SNAPSHOT</version>
     </dependency>
-    <!-- to here -->
     <dependency>
       <groupId>net.sourceforge.findbugs</groupId>
       <artifactId>jsr-305</artifactId>

Modified: trunk/script/mvn-eclipse.sh
===================================================================
--- trunk/script/mvn-eclipse.sh	2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/script/mvn-eclipse.sh	2011-04-16 10:50:40 UTC (rev 1313)
@@ -1,7 +1,7 @@
 mvn eclipse:eclipse -Dgwt.compiler.skip=true
 RESULT=$?
 perl -p -i -e 's/<\/classpath>//g' .classpath
-perl -p -i -e 's/\/home\/vjrj\/nfsdev\///g' .classpath
+#perl -p -i -e 's/\/home\/vjrj\/nfsdev\///g' .classpath
 #echo -en "  <classpathentry kind=\"src\" path=\"target/generated-sources/apt\" including=\"**/*.java\"/>" >> .classpath
 echo -en "  <classpathentry kind=\"con\" path=\"com.google.gwt.eclipse.core.GWT_CONTAINER\"/>\n</classpath>" >> .classpath
 which kdialog > /dev/null 2>&1

Modified: trunk/script/repopubli.sh
===================================================================
--- trunk/script/repopubli.sh	2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/script/repopubli.sh	2011-04-16 10:50:40 UTC (rev 1313)
@@ -1,24 +1,43 @@
 #!/bin/bash
 
-PARAMS=$#
-JAR=$1
-GROUP=$2
-ARTIFACT=$3
-VER=$4
-USER=$5
+usage() {
+    echo "Use: $0 -j <jar> -g <group> -a <artifact> -v <version> [-s (for source)]"
+    echo "$0 -j target/emite-0.4.6-emiteuimodule.jar -g com.calclab.emite -a emite -v 0.4.6"
+}
 
-# CORRECT PARAMS ###############################################################
+while getopts “hg:a:v:j:s” OPTION
+do
+    case $OPTION in
+	h)
+            usage
+            exit 1
+            ;;
+	g)
+	    GROUP=$OPTARG
+	    ;;
+	a)
+	    ARTIFACT=$OPTARG
+	    ;;
+	v)
+	    VER=$OPTARG
+	    ;;
+	j)
+	    JAR=$OPTARG
+	    ;;
+	s)
+            SOURCE='-Dclassifier=sources'
+            ;;
+	?)
+            usage
+            exit
+            ;;
+    esac
+done	
 
-if [ $PARAMS -lt 4 ]
+if [[ -z $GROUP ]] || [[ -z $ARTIFACT ]] || [[ -z $VER ]] || [[ -z $JAR ]] 
 then
-  echo "Use: $0 <jar> <group> <artifact> <version> <username>"
-  echo "$0 target/emite-0.4.6-emiteuimodule.jar com.calclab.emite emite 0.4.6 [luther]"
-  exit
+    usage
+    exit 1
 fi
 
-if [ $PARAMS -gt 4 ]
-then
-  EXTRA=$USER@
-fi
-
-mvn deploy:deploy-file -DgroupId=$2 -DartifactId=$3 -Dversion=$4 -Dpackaging=jar -Dfile=$1 -Durl=scpexe://kune.ourproject.org/home/groups/kune/htdocs/mavenrepo/ -DrepositoryId=kune.ourproject.org
+mvn deploy:deploy-file -DgroupId=$GROUP -DartifactId=$ARTIFACT -Dversion=$VER $SOURCE -Dpackaging=jar -Dfile=$JAR -Durl=scpexe://kune.ourproject.org/home/groups/kune/htdocs/mavenrepo/ -DrepositoryId=kune.ourproject.org

Modified: trunk/src/main/java/cc/kune/common/client/tooltip/Tooltip.java
===================================================================
--- trunk/src/main/java/cc/kune/common/client/tooltip/Tooltip.java	2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/src/main/java/cc/kune/common/client/tooltip/Tooltip.java	2011-04-16 10:50:40 UTC (rev 1313)
@@ -64,6 +64,7 @@
 
     public Tooltip() {
         super.add(uiBinder.createAndBindUi(this));
+        super.setStyleName("k-tooltip-no-chrome");
         super.setAutoHideEnabled(false);
         super.setAnimationEnabled(false);
         final TimerWrapper overTimer = new TimerWrapper();

Modified: trunk/src/main/java/cc/kune/wave/client/WebClient.java
===================================================================
--- trunk/src/main/java/cc/kune/wave/client/WebClient.java	2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/src/main/java/cc/kune/wave/client/WebClient.java	2011-04-16 10:50:40 UTC (rev 1313)
@@ -22,6 +22,9 @@
 
 
 
+import java.util.Date;
+import java.util.logging.Logger;
+
 import org.waveprotocol.box.webclient.client.ClientEvents;
 import org.waveprotocol.box.webclient.client.ClientIdGenerator;
 import org.waveprotocol.box.webclient.client.DebugMessagePanel;
@@ -94,6 +97,10 @@
   private static final Binder BINDER = GWT.create(Binder.class);
 
   static Log LOG = Log.get(WebClient.class);
+  // Use of GWT logging is only intended for sending exception reports to the
+  // server, nothing else in the client should use java.util.logging.
+  // Please also see WebClientDemo.gwt.xml.
+  private static final Logger REMOTE_LOG = Logger.getLogger("REMOTE_LOG");
 
   private final ProfileManager profiles = new ProfileManagerImpl();
 
@@ -363,6 +370,7 @@
           public void use(SafeHtml stack) {
               NotifyUser.error("Error in wave client", true);
           //  error.addDetail(stack, null);
+            REMOTE_LOG.severe(stack.asString().replace("<br>", "\n"));
           }
         });
       }
@@ -388,6 +396,8 @@
 
           Throwable error = t;
           while (error != null) {
+            String token = String.valueOf((new Date()).getTime());
+            stack.appendHtmlConstant("Token:  " + token + "<br> ");
             stack.appendEscaped(String.valueOf(error.getMessage())).appendHtmlConstant("<br>");
             for (StackTraceElement elt : error.getStackTrace()) {
               stack.appendHtmlConstant("  ")

Modified: trunk/src/main/java/cc/kune/wave/server/CustomUserRegistrationServlet.java
===================================================================
--- trunk/src/main/java/cc/kune/wave/server/CustomUserRegistrationServlet.java	2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/src/main/java/cc/kune/wave/server/CustomUserRegistrationServlet.java	2011-04-16 10:50:40 UTC (rev 1313)
@@ -39,6 +39,7 @@
 
 import com.google.gxp.base.GxpContext;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 
 /**
@@ -46,6 +47,7 @@
  *
  * @author josephg at gmail.com (Joseph Gentle)
  */
+ at Singleton
 public final class CustomUserRegistrationServlet extends HttpServlet {
   private final AccountStore accountStore;
   private final String domain;
@@ -53,20 +55,20 @@
   private final Log LOG = Log.get(CustomUserRegistrationServlet.class);
 
   @Inject
-  public CustomUserRegistrationServlet(final AccountStore accountStore,
-      @Named(CoreSettings.WAVE_SERVER_DOMAIN) final String domain) {
+  public CustomUserRegistrationServlet(AccountStore accountStore,
+      @Named(CoreSettings.WAVE_SERVER_DOMAIN) String domain) {
     this.accountStore = accountStore;
     this.domain = domain;
   }
 
   @Override
-  protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) throws IOException {
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
     writeRegistrationPage("", AuthenticationServlet.RESPONSE_STATUS_NONE, req.getLocale(), resp);
   }
 
   @SuppressWarnings("unchecked")
   @Override
-  protected void doPost(final HttpServletRequest req, final HttpServletResponse resp) throws IOException {
+  protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
     req.setCharacterEncoding("UTF-8");
     String password = req.getParameter(HttpRequestBasedCallbackHandler.PASSWORD_FIELD);
     if (password == null) {
@@ -100,7 +102,7 @@
     try {
       // First, some cleanup on the parameters.
       if (username == null) {
-        return "Username portion of address cannot be less than 2 characters";
+        return "Username portion of address cannot be empty";
       }
       username = username.trim().toLowerCase();
       if (username.contains(ParticipantId.DOMAIN_PREFIX)) {
@@ -108,8 +110,8 @@
       } else {
         id = ParticipantId.of(username + ParticipantId.DOMAIN_PREFIX + domain);
       }
-      if (id.getAddress().indexOf("@") < 2) {
-        return "Username portion of address cannot be less than 2 characters";
+      if (id.getAddress().indexOf("@") < 1) {
+        return "Username portion of address cannot be empty";
       }
       final String[] usernameSplit = id.getAddress().split("@");
       if (usernameSplit.length != 2 || !usernameSplit[0].matches("[\\w\\.]+")) {

Modified: trunk/src/main/java/cc/kune/wave/server/CustomWaveClientServlet.java
===================================================================
--- trunk/src/main/java/cc/kune/wave/server/CustomWaveClientServlet.java	2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/src/main/java/cc/kune/wave/server/CustomWaveClientServlet.java	2011-04-16 10:50:40 UTC (rev 1313)
@@ -54,6 +54,7 @@
  *
  * @author kalman at google.com (Benjamin Kalman)
  */
+ at SuppressWarnings("serial")
 @Singleton
 public class CustomWaveClientServlet extends HttpServlet {
 

Modified: trunk/src/main/java/cc/kune/wave/server/WaveMain.java
===================================================================
--- trunk/src/main/java/cc/kune/wave/server/WaveMain.java	2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/src/main/java/cc/kune/wave/server/WaveMain.java	2011-04-16 10:50:40 UTC (rev 1313)
@@ -46,6 +46,7 @@
 import org.waveprotocol.box.server.rpc.SearchServlet;
 import org.waveprotocol.box.server.rpc.ServerRpcProvider;
 import org.waveprotocol.box.server.rpc.SignOutServlet;
+import org.waveprotocol.box.server.rpc.UserRegistrationServlet;
 import org.waveprotocol.box.server.waveserver.WaveBus;
 import org.waveprotocol.box.server.waveserver.WaveServerException;
 import org.waveprotocol.box.server.waveserver.WaveletProvider;
@@ -55,9 +56,11 @@
 import org.waveprotocol.wave.federation.noop.NoOpFederationModule;
 import org.waveprotocol.wave.federation.xmpp.XmppFederationModule;
 import org.waveprotocol.wave.model.version.HashedVersionFactory;
+import org.waveprotocol.wave.model.wave.ParticipantIdUtil;
 import org.waveprotocol.wave.util.logging.Log;
 import org.waveprotocol.wave.util.settings.SettingsBinder;
 
+import com.google.gwt.logging.server.RemoteLoggingServiceImpl;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
@@ -112,10 +115,16 @@
     ServerRpcProvider server = injector.getInstance(ServerRpcProvider.class);
     WaveBus waveBus = injector.getInstance(WaveBus.class);
 
-    initializeServer(injector);
+    String domain =
+      injector.getInstance(Key.get(String.class, Names.named(CoreSettings.WAVE_SERVER_DOMAIN)));
+    if (!ParticipantIdUtil.isDomainAddress(ParticipantIdUtil.makeDomainAddress(domain))) {
+      throw new WaveServerException("Invalid wave domain: " + domain);
+    }
+
+    initializeServer(injector, domain);
     initializeServlets(injector, server);
     initializeRobots(injector, waveBus);
-    initializeFrontend(injector, server, waveBus);
+    initializeFrontend(injector, server, waveBus, domain);
     initializeFederation(injector);
 
     LOG.info("Starting server");
@@ -133,12 +142,11 @@
     return federationModule;
   }
 
-  private static void initializeServer(Injector injector) throws PersistenceException,
-      WaveServerException {
+  private static void initializeServer(Injector injector, String waveDomain)
+      throws PersistenceException, WaveServerException {
     AccountStore accountStore = injector.getInstance(AccountStore.class);
     accountStore.initializeAccountStore();
-    AccountStoreHolder.init(accountStore,
-        injector.getInstance(Key.get(String.class, Names.named(CoreSettings.WAVE_SERVER_DOMAIN))));
+    AccountStoreHolder.init(accountStore, waveDomain);
 
     // Initialize the SignerInfoStore.
     CertPathStore certPathStore = injector.getInstance(CertPathStore.class);
@@ -157,10 +165,10 @@
     server.addServlet(SessionManager.SIGN_IN_URL,
         injector.getInstance(AuthenticationServlet.class));
     server.addServlet("/auth/signout", injector.getInstance(SignOutServlet.class));
-    server.addServlet("/auth/register", injector.getInstance(CustomUserRegistrationServlet.class));
+    server.addServlet("/auth/register", injector.getInstance(UserRegistrationServlet.class));
 
     server.addServlet("/fetch/*", injector.getInstance(FetchServlet.class));
-    
+
     server.addServlet("/search/*", injector.getInstance(SearchServlet.class));
 
     server.addServlet("/robot/dataapi", injector.getInstance(DataApiServlet.class));
@@ -172,14 +180,19 @@
 
     String gadgetServerHostname =injector.getInstance(Key.get(String.class,
         Names.named(CoreSettings.GADGET_SERVER_HOSTNAME)));
+    int gadgetServerPort =
+        injector.getInstance(Key.get(int.class, Names.named(CoreSettings.GADGET_SERVER_PORT)));
+    String gadgetServerPath =
+        injector.getInstance(Key.get(String.class, Names.named(CoreSettings.GADGET_SERVER_PATH)));
     ProxyServlet.Transparent proxyServlet =
-        new ProxyServlet.Transparent("/gadgets", "http", gadgetServerHostname, injector
-            .getInstance(Key.get(int.class, Names.named(CoreSettings.GADGET_SERVER_PORT))),
-            "/gadgets");
+        new ProxyServlet.Transparent("/gadgets", "http", gadgetServerHostname,gadgetServerPort,
+            gadgetServerPath);
     ServletHolder proxyServletHolder = server.addServlet("/gadgets/*", proxyServlet);
     proxyServletHolder.setInitParameter("HostHeader", gadgetServerHostname);
 
-    //  server.addServlet("/", injector.getInstance(CustomWaveClientServlet.class));
+    server.addServlet("/webclient/remote_logging",
+        injector.getInstance(RemoteLoggingServiceImpl.class));
+    // server.addServlet("/", injector.getInstance(WaveClientServlet.class));
   }
 
   private static void initializeRobots(Injector injector, WaveBus waveBus) {
@@ -188,10 +201,12 @@
   }
 
   private static void initializeFrontend(Injector injector, ServerRpcProvider server,
-      WaveBus waveBus) throws WaveServerException {
+      WaveBus waveBus, String waveDomain) throws WaveServerException {
     HashedVersionFactory hashFactory = injector.getInstance(HashedVersionFactory.class);
+
     WaveletProvider provider = injector.getInstance(WaveletProvider.class);
-    ClientFrontend frontend = ClientFrontendImpl.create(hashFactory, provider, waveBus);
+    ClientFrontend frontend =
+        ClientFrontendImpl.create(hashFactory, provider, waveBus, waveDomain);
 
     ProtocolWaveClientRpc.Interface rpcImpl = WaveClientRpcImpl.create(frontend, false);
     server.registerService(ProtocolWaveClientRpc.newReflectiveService(rpcImpl));

Added: trunk/src/main/java/org/waveprotocol/box/server/rpc/CustomServerRpcProvider.java
===================================================================
--- trunk/src/main/java/org/waveprotocol/box/server/rpc/CustomServerRpcProvider.java	                        (rev 0)
+++ trunk/src/main/java/org/waveprotocol/box/server/rpc/CustomServerRpcProvider.java	2011-04-16 10:50:40 UTC (rev 1313)
@@ -0,0 +1,635 @@
+/**
+ * Copyright 2009 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.waveprotocol.box.server.rpc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.annotation.Nullable;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContextListener;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.resource.ResourceCollection;
+import org.eclipse.jetty.webapp.WebAppContext;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketServlet;
+import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticate;
+import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticationResult;
+import org.waveprotocol.box.server.CoreSettings;
+import org.waveprotocol.box.server.authentication.SessionManager;
+import org.waveprotocol.box.server.util.NetUtils;
+import org.waveprotocol.wave.model.util.Pair;
+import org.waveprotocol.wave.model.wave.ParticipantId;
+import org.waveprotocol.wave.util.logging.Log;
+
+import com.glines.socketio.server.SocketIOInbound;
+import com.glines.socketio.server.SocketIOServlet;
+import com.glines.socketio.server.transport.FlashSocketTransport;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import com.google.inject.servlet.GuiceServletContextListener;
+import com.google.inject.servlet.ServletModule;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.Service;
+import com.google.protobuf.UnknownFieldSet;
+
+/**
+ * CustomServerRpcProvider can provide instances of type Service over an
+ * incoming network socket and service incoming RPCs to these services and their
+ * methods.
+ * 
+ * 
+ */
+public class CustomServerRpcProvider {
+    static abstract class Connection implements ProtoCallback {
+        private final Map<Integer, ServerRpcController> activeRpcs = new ConcurrentHashMap<Integer, ServerRpcController>();
+
+        // The logged in user.
+        // Note: Due to this bug:
+        // http://code.google.com/p/wave-protocol/issues/detail?id=119,
+        // the field may be null on first connect and then set later using an
+        // RPC.
+        private ParticipantId loggedInUser;
+
+        private final CustomServerRpcProvider provider;
+
+        /**
+         * @param loggedInUser
+         *            The currently logged in user, or null if no user is logged
+         *            in.
+         * @param provider
+         */
+        public Connection(final ParticipantId loggedInUser, final CustomServerRpcProvider provider) {
+            this.loggedInUser = loggedInUser;
+            this.provider = provider;
+        }
+
+        private ParticipantId authenticate(final String token) {
+            final HttpSession session = provider.sessionManager.getSessionFromToken(token);
+            final ParticipantId user = provider.sessionManager.getLoggedInUser(session);
+            return user;
+        }
+
+        protected void expectMessages(final MessageExpectingChannel channel) {
+            synchronized (provider.registeredServices) {
+                for (final RegisteredServiceMethod serviceMethod : provider.registeredServices.values()) {
+                    channel.expectMessage(serviceMethod.service.getRequestPrototype(serviceMethod.method));
+                    LOG.fine("Expecting: " + serviceMethod.method.getFullName());
+                }
+            }
+            channel.expectMessage(Rpc.CancelRpc.getDefaultInstance());
+        }
+
+        @Override
+        public void message(final int sequenceNo, final Message message) {
+            if (message instanceof Rpc.CancelRpc) {
+                final ServerRpcController controller = activeRpcs.get(sequenceNo);
+                if (controller == null) {
+                    throw new IllegalStateException("Trying to cancel an RPC that is not active!");
+                } else {
+                    LOG.info("Cancelling open RPC " + sequenceNo);
+                    controller.cancel();
+                }
+            } else if (message instanceof ProtocolAuthenticate) {
+                // Workaround for bug:
+                // http://codereview.waveprotocol.org/224001/
+
+                // When we get this message, either the connection will not be
+                // logged in
+                // (loggedInUser == null) or the connection will have been
+                // authenticated
+                // via cookies
+                // (in which case loggedInUser must match the authenticated
+                // user, and
+                // this message has no
+                // effect).
+
+                final ProtocolAuthenticate authMessage = (ProtocolAuthenticate) message;
+                final ParticipantId authenticatedAs = authenticate(authMessage.getToken());
+
+                Preconditions.checkArgument(authenticatedAs != null, "Auth token invalid");
+                Preconditions.checkState(loggedInUser == null || loggedInUser.equals(authenticatedAs),
+                        "Session already authenticated as a different user");
+
+                loggedInUser = authenticatedAs;
+                LOG.info("Session authenticated as " + loggedInUser);
+                sendMessage(sequenceNo, ProtocolAuthenticationResult.getDefaultInstance());
+            } else if (provider.registeredServices.containsKey(message.getDescriptorForType())) {
+                if (activeRpcs.containsKey(sequenceNo)) {
+                    throw new IllegalStateException("Can't invoke a new RPC with a sequence number already in use.");
+                } else {
+                    final RegisteredServiceMethod serviceMethod = provider.registeredServices.get(message.getDescriptorForType());
+
+                    // Create the internal ServerRpcController used to invoke
+                    // the call.
+                    final ServerRpcController controller = new ServerRpcControllerImpl(message, serviceMethod.service,
+                            serviceMethod.method, loggedInUser, new RpcCallback<Message>() {
+                                @Override
+                                synchronized public void run(final Message message) {
+                                    if (message instanceof Rpc.RpcFinished
+                                            || !serviceMethod.method.getOptions().getExtension(Rpc.isStreamingRpc)) {
+                                        // This RPC is over - remove it from the
+                                        // map.
+                                        final boolean failed = message instanceof Rpc.RpcFinished ? ((Rpc.RpcFinished) message).getFailed()
+                                                : false;
+                                        LOG.fine("RPC " + sequenceNo + " is now finished, failed = " + failed);
+                                        if (failed) {
+                                            LOG.info("error = " + ((Rpc.RpcFinished) message).getErrorText());
+                                        }
+                                        activeRpcs.remove(sequenceNo);
+                                    }
+                                    sendMessage(sequenceNo, message);
+                                }
+                            });
+
+                    // Kick off a new thread specific to this RPC.
+                    activeRpcs.put(sequenceNo, controller);
+                    provider.threadPool.execute(controller);
+                }
+            } else {
+                // Sent a message type we understand, but don't expect -
+                // erronous case!
+                throw new IllegalStateException("Got expected but unknown message  (" + message + ") for sequence: "
+                        + sequenceNo);
+            }
+        }
+
+        protected abstract void sendMessage(int sequenceNo, Message message);
+
+        @Override
+        public void unknown(final int sequenceNo, final String messageType, final String message) {
+            throw new IllegalStateException("Got unknown message (type: " + messageType + ", " + message
+                    + ") for sequence: " + sequenceNo);
+        }
+
+        @Override
+        public void unknown(final int sequenceNo, final String messageType, final UnknownFieldSet message) {
+            throw new IllegalStateException("Got unknown message (type: " + messageType + ", " + message
+                    + ") for sequence: " + sequenceNo);
+        }
+    }
+
+    /**
+     * Internal, static container class for any specific registered service
+     * method.
+     */
+    static class RegisteredServiceMethod {
+        final MethodDescriptor method;
+        final Service service;
+
+        RegisteredServiceMethod(final Service service, final MethodDescriptor method) {
+            this.service = service;
+            this.method = method;
+        }
+    }
+
+    static class SocketIOConnection extends Connection {
+        private final SocketIOServerChannel socketChannel;
+
+        SocketIOConnection(final ParticipantId loggedInUser, final CustomServerRpcProvider provider) {
+            super(loggedInUser, provider);
+            socketChannel = new SocketIOServerChannel(this);
+            LOG.info("New websocket connection set up for user " + loggedInUser);
+            expectMessages(socketChannel);
+        }
+
+        public SocketIOServerChannel getWebSocketServerChannel() {
+            return socketChannel;
+        }
+
+        @Override
+        protected void sendMessage(final int sequenceNo, final Message message) {
+            socketChannel.sendMessage(sequenceNo, message);
+        }
+    }
+    @SuppressWarnings("serial")
+    @Singleton
+    public static class WaveSocketIOServlet extends HttpServlet {
+
+        CustomServerRpcProvider provider;
+
+        SocketIOServlet socketIOServlet = new SocketIOServlet() {
+            @Override
+            protected SocketIOInbound doSocketIOConnect(final HttpServletRequest request, final String[] strings) {
+                final ParticipantId loggedInUser = provider.sessionManager.getLoggedInUser(request.getSession(false));
+
+                final SocketIOConnection connection = new SocketIOConnection(loggedInUser, provider);
+                return connection.getWebSocketServerChannel();
+            }
+        };
+
+        @Inject
+        public WaveSocketIOServlet(final CustomServerRpcProvider provider) {
+            super();
+            this.provider = provider;
+        }
+
+        @Override
+        public void init(final ServletConfig config) throws ServletException {
+            socketIOServlet.init(config);
+        }
+
+        @Override
+        protected void service(final HttpServletRequest req, final HttpServletResponse resp) throws ServletException,
+                IOException {
+            socketIOServlet.service(req, resp);
+        }
+
+        @Override
+        public void service(final ServletRequest req, final ServletResponse res) throws ServletException, IOException {
+            socketIOServlet.service(req, res);
+        }
+    }
+    @SuppressWarnings("serial")
+    @Singleton
+    public static class WaveWebSocketServlet extends WebSocketServlet {
+
+        CustomServerRpcProvider provider;
+
+        @Inject
+        public WaveWebSocketServlet(final CustomServerRpcProvider provider) {
+            super();
+            this.provider = provider;
+        }
+
+        @Override
+        protected WebSocket doWebSocketConnect(final HttpServletRequest request, final String protocol) {
+            final ParticipantId loggedInUser = provider.sessionManager.getLoggedInUser(request.getSession(false));
+
+            final WebSocketConnection connection = new WebSocketConnection(loggedInUser, provider);
+            return connection.getWebSocketServerChannel();
+        }
+    }
+    static class WebSocketConnection extends Connection {
+        private final WebSocketServerChannel socketChannel;
+
+        WebSocketConnection(final ParticipantId loggedInUser, final CustomServerRpcProvider provider) {
+            super(loggedInUser, provider);
+            socketChannel = new WebSocketServerChannel(this);
+            LOG.info("New websocket connection set up for user " + loggedInUser);
+            expectMessages(socketChannel);
+        }
+
+        public WebSocketServerChannel getWebSocketServerChannel() {
+            return socketChannel;
+        }
+
+        @Override
+        protected void sendMessage(final int sequenceNo, final Message message) {
+            socketChannel.sendMessage(sequenceNo, message);
+        }
+    }
+    // We can retrieve the injector from the context attributes via this
+    // attribute name
+    public static final String INJECTOR_ATTRIBUTE = Injector.class.getName();
+    private static final Log LOG = Log.get(CustomServerRpcProvider.class);
+
+    private static InetSocketAddress[] parseAddressList(final List<String> addressList) {
+        if (addressList == null || addressList.size() == 0) {
+            return new InetSocketAddress[0];
+        } else {
+            final Set<InetSocketAddress> addresses = Sets.newHashSet();
+            for (final String str : addressList) {
+                if (str.length() == 0) {
+                    LOG.warning("Encountered empty address in http addresses list.");
+                } else {
+                    try {
+                        final InetSocketAddress address = NetUtils.parseHttpAddress(str);
+                        if (!addresses.contains(address)) {
+                            addresses.add(address);
+                        } else {
+                            LOG.warning("Ignoring duplicate address in http addresses list: Duplicate entry '" + str
+                                    + "' resolved to " + address.getAddress().getHostAddress());
+                        }
+                    } catch (final IOException e) {
+                        LOG.severe("Unable to process address " + str, e);
+                    }
+                }
+            }
+            return addresses.toArray(new InetSocketAddress[0]);
+        }
+    }
+
+    private final Integer flashsocketPolicyPort;
+
+    private final InetSocketAddress[] httpAddresses;
+
+    private Server httpServer = null;
+
+    private final org.eclipse.jetty.server.SessionManager jettySessionManager;
+
+    // Mapping from incoming protocol buffer type -> specific handler.
+    private final Map<Descriptors.Descriptor, RegisteredServiceMethod> registeredServices = Maps.newHashMap();
+
+    // List of webApp source directories ("./war", etc)
+    private final String[] resourceBases;
+
+    /**
+     * Set of servlets
+     */
+    List<Pair<String, ServletHolder>> servletRegistry = Lists.newArrayList();
+
+    private final SessionManager sessionManager;
+
+    private final ExecutorService threadPool;
+
+    /**
+     * Construct a new CustomServerRpcProvider, hosting on the specified
+     * WebSocket addresses.
+     * 
+     * Also accepts an ExecutorService for spawning managing threads.
+     */
+    public CustomServerRpcProvider(final InetSocketAddress[] httpAddresses, final Integer flashsocketPolicyPort,
+            final String[] resourceBases, final ExecutorService threadPool, final SessionManager sessionManager,
+            final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+        this.httpAddresses = httpAddresses;
+        this.flashsocketPolicyPort = flashsocketPolicyPort;
+        this.resourceBases = resourceBases;
+        this.threadPool = threadPool;
+        this.sessionManager = sessionManager;
+        this.jettySessionManager = jettySessionManager;
+    }
+
+    /**
+     * Constructs a new CustomServerRpcProvider with a default ExecutorService.
+     */
+    public CustomServerRpcProvider(final InetSocketAddress[] httpAddresses, final Integer flashsocketPolicyPort,
+            final String[] resourceBases, final SessionManager sessionManager,
+            final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+        this(httpAddresses, flashsocketPolicyPort, resourceBases, Executors.newCachedThreadPool(), sessionManager,
+                jettySessionManager);
+    }
+
+    @Inject
+    public CustomServerRpcProvider(@Named(CoreSettings.HTTP_FRONTEND_ADDRESSES) final List<String> httpAddresses,
+            @Named(CoreSettings.FLASHSOCKET_POLICY_PORT) final Integer flashsocketPolicyPort,
+            @Named(CoreSettings.RESOURCE_BASES) final List<String> resourceBases, final SessionManager sessionManager,
+            final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+        this(parseAddressList(httpAddresses), flashsocketPolicyPort, resourceBases.toArray(new String[0]),
+                sessionManager, jettySessionManager);
+    }
+
+    /**
+     * Add a servlet to the servlet registry. This servlet will be attached to
+     * the specified URL pattern when the server is started up.
+     * 
+     * @param urlPattern
+     *            the URL pattern for paths. Eg, '/foo', '/foo/*'.
+     * @param servlet
+     *            the servlet class to bind to the specified paths.
+     * @return the {@link ServletHolder} that holds the servlet.
+     */
+    public ServletHolder addServlet(final String urlPattern, final Class<? extends HttpServlet> servlet) {
+        return addServlet(urlPattern, servlet, null);
+    }
+
+    /**
+     * Add a servlet to the servlet registry. This servlet will be attached to
+     * the specified URL pattern when the server is started up.
+     * 
+     * @param urlPattern
+     *            the URL pattern for paths. Eg, '/foo', '/foo/*'.
+     * @param servlet
+     *            the servlet class to bind to the specified paths.
+     * @param initParams
+     *            the map with init params, can be null or empty.
+     * @return the {@link ServletHolder} that holds the servlet.
+     */
+    public ServletHolder addServlet(final String urlPattern, final Class<? extends HttpServlet> servlet,
+            @Nullable final Map<String, String> initParams) {
+        final ServletHolder servletHolder = new ServletHolder(servlet);
+        if (initParams != null) {
+            servletHolder.setInitParameters(initParams);
+        }
+        servletRegistry.add(new Pair<String, ServletHolder>(urlPattern, servletHolder));
+        return servletHolder;
+    }
+
+    public void addWebSocketServlets() {
+        // Servlet where the websocket connection is served from.
+        final ServletHolder wsholder = addServlet("/socket", WaveWebSocketServlet.class);
+        // TODO(zamfi): fix to let messages span frames.
+        wsholder.setInitParameter("bufferSize", "" + 1024 * 1024); // 1M buffer
+
+        // Servlet where the websocket connection is served from.
+        final ServletHolder sioholder = addServlet("/socket.io/*", WaveSocketIOServlet.class);
+        // TODO(zamfi): fix to let messages span frames.
+        sioholder.setInitParameter("bufferSize", "" + 1024 * 1024); // 1M buffer
+        // Set flash policy server parameters
+        String flashPolicyServerHost = "localhost";
+        final StringBuilder flashPolicyAllowedPorts = new StringBuilder();
+        /*
+         * Loop through addresses, collect list of ports, and determine if we
+         * are to use "localhost" of the AnyHost wildcard.
+         */
+        for (final InetSocketAddress addr : httpAddresses) {
+            if (flashPolicyAllowedPorts.length() > 0) {
+                flashPolicyAllowedPorts.append(",");
+            }
+            flashPolicyAllowedPorts.append(addr.getPort());
+            if (!addr.getAddress().isLoopbackAddress()) {
+                // Until it's possible to pass a list of address, this is the
+                // only valid alternative.
+                flashPolicyServerHost = "0.0.0.0";
+            }
+        }
+        sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_SERVER_HOST_KEY, flashPolicyServerHost);
+        sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_SERVER_PORT_KEY, "" + flashsocketPolicyPort);
+        // TODO: Change to use the public http address and all other bound
+        // addresses.
+        sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_DOMAIN_KEY, "*");
+        sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_PORTS_KEY, flashPolicyAllowedPorts.toString());
+
+        // Serve the static content and GWT web client with the default servlet
+        // (acts like a standard file-based web server).
+        addServlet("/static/*", DefaultServlet.class);
+        addServlet("/webclient/*", DefaultServlet.class);
+    }
+
+    /**
+     * @return a list of {@link SelectChannelConnector} each bound to a
+     *         host:port pair form the list addresses.
+     */
+    private List<SelectChannelConnector> getSelectChannelConnectors(final InetSocketAddress[] httpAddresses) {
+        final List<SelectChannelConnector> list = Lists.newArrayList();
+        for (final InetSocketAddress address : httpAddresses) {
+            final SelectChannelConnector connector = new SelectChannelConnector();
+            connector.setHost(address.getAddress().getHostAddress());
+            connector.setPort(address.getPort());
+            list.add(connector);
+        }
+
+        return list;
+    }
+
+    public ServletModule getServletModule(final Injector injector) {
+
+        return new ServletModule() {
+            @Override
+            protected void configureServlets() {
+                // We add servlets here to override the DefaultServlet automatic
+                // registered by WebAppContext
+                // in path "/" with our WaveClientServlet. Any other way to do
+                // this?
+                // Related question (unanswered 08-Apr-2011)
+                // http://web.archiveorange.com/archive/v/d0LdlXf1kN0OXyPNyQZp
+                for (final Pair<String, ServletHolder> servlet : servletRegistry) {
+                    final String url = servlet.getFirst();
+                    @SuppressWarnings({ "unchecked" })
+                    final Class<HttpServlet> clazz = servlet.getSecond().getHeldClass();
+                    @SuppressWarnings({ "unchecked" })
+                    final Map<String, String> params = servlet.getSecond().getInitParameters();
+                    serve(url).with(clazz, params);
+                    bind(clazz).in(Singleton.class);
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns the socket the WebSocket server is listening on.
+     */
+    public SocketAddress getWebSocketAddress() {
+        if (httpServer == null) {
+            return null;
+        } else {
+            final Connector c = httpServer.getConnectors()[0];
+            return new InetSocketAddress(c.getHost(), c.getLocalPort());
+        }
+    }
+
+    /**
+     * Register all methods provided by the given service type.
+     */
+    public void registerService(final Service service) {
+        synchronized (registeredServices) {
+            for (final MethodDescriptor methodDescriptor : service.getDescriptorForType().getMethods()) {
+                registeredServices.put(methodDescriptor.getInputType(), new RegisteredServiceMethod(service,
+                        methodDescriptor));
+            }
+        }
+    }
+
+    public void startWebSocketServer(final Injector injector) {
+        httpServer = new Server();
+
+        final List<SelectChannelConnector> connectors = getSelectChannelConnectors(httpAddresses);
+        if (connectors.isEmpty()) {
+            LOG.severe("No valid http end point address provided!");
+        }
+        for (final SelectChannelConnector connector : connectors) {
+            httpServer.addConnector(connector);
+        }
+        final WebAppContext context = new WebAppContext();
+
+        context.setParentLoaderPriority(true);
+        context.setAttribute(INJECTOR_ATTRIBUTE, injector);
+
+        if (jettySessionManager != null) {
+            context.getSessionHandler().setSessionManager(jettySessionManager);
+        }
+        final ResourceCollection resources = new ResourceCollection(resourceBases);
+        context.setBaseResource(resources);
+
+        addWebSocketServlets();
+
+        try {
+            final Injector parentInjector;
+
+            // If we have a null injector at least bind the
+            // CustomServerRpcProvider for nested static classes
+            if (injector == null) {
+                parentInjector = Guice.createInjector(new AbstractModule() {
+                    @Override
+                    protected void configure() {
+                        bind(CustomServerRpcProvider.class).toInstance(CustomServerRpcProvider.this);
+                    }
+                });
+            } else {
+                parentInjector = injector;
+            }
+
+            // final ServletModule servletModule =
+            // getServletModule(parentInjector);
+
+            final ServletContextListener contextListener = new GuiceServletContextListener() {
+
+                private final Injector childInjector = parentInjector; // .createChildInjector(servletModule);
+
+                @Override
+                protected Injector getInjector() {
+                    return childInjector;
+                }
+            };
+
+            context.addEventListener(contextListener);
+            LOG.fine("Commented guice filter.");
+            // context.addFilter(GuiceFilter.class, "/*", 0);
+            httpServer.setHandler(context);
+
+            httpServer.start();
+
+        } catch (final Exception e) { // yes, .start() throws "Exception"
+            LOG.severe("Fatal error starting http server.", e);
+            return;
+        }
+        LOG.fine("WebSocket server running.");
+    }
+
+    /**
+     * Stops this server.
+     */
+    public void stopServer() throws IOException {
+        try {
+            httpServer.stop(); // yes, .stop() throws "Exception"
+        } catch (final Exception e) {
+            LOG.warning("Fatal error stopping http server.", e);
+        }
+        LOG.fine("server shutdown.");
+    }
+}


Property changes on: trunk/src/main/java/org/waveprotocol/box/server/rpc/CustomServerRpcProvider.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain




More information about the kune-commits mailing list