[kune-commits] r1313 - in trunk: . doc script src/main/java/cc/kune/common/client/tooltip src/main/java/cc/kune/wave/client src/main/java/cc/kune/wave/server src/main/java/cc/ourproject src/main/java/org src/main/java/org/waveprotocol src/main/java/org/waveprotocol/box src/main/java/org/waveprotocol/box/server src/main/java/org/waveprotocol/box/server/rpc
Vicente J. Ruiz Jurado
vjrj_ at ourproject.org
Sat Apr 16 12:50:40 CEST 2011
Author: vjrj_
Date: 2011-04-16 12:50:40 +0200 (Sat, 16 Apr 2011)
New Revision: 1313
Added:
trunk/src/main/java/org/waveprotocol/
trunk/src/main/java/org/waveprotocol/box/
trunk/src/main/java/org/waveprotocol/box/server/
trunk/src/main/java/org/waveprotocol/box/server/rpc/
trunk/src/main/java/org/waveprotocol/box/server/rpc/CustomServerRpcProvider.java
Removed:
trunk/src/main/java/cc/ourproject/kune/
Modified:
trunk/doc/kune-tools-and-functionality-summary.ods
trunk/pom.xml
trunk/script/mvn-eclipse.sh
trunk/script/repopubli.sh
trunk/src/main/java/cc/kune/common/client/tooltip/Tooltip.java
trunk/src/main/java/cc/kune/wave/client/WebClient.java
trunk/src/main/java/cc/kune/wave/server/CustomUserRegistrationServlet.java
trunk/src/main/java/cc/kune/wave/server/CustomWaveClientServlet.java
trunk/src/main/java/cc/kune/wave/server/WaveMain.java
Log:
WIAB Updated
Modified: trunk/doc/kune-tools-and-functionality-summary.ods
===================================================================
(Binary files differ)
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/pom.xml 2011-04-16 10:50:40 UTC (rev 1313)
@@ -545,7 +545,6 @@
<artifactId>guava-gwt</artifactId>
<version>r07</version>
</dependency>
- <!-- From here necessary as .java not included in waveinabox-serve.jar -->
<dependency>
<groupId>org.waveprotocol</groupId>
<artifactId>box-src</artifactId>
@@ -596,7 +595,6 @@
<artifactId>communication-src</artifactId>
<version>0.3.0-SNAPSHOT</version>
</dependency>
- <!-- to here -->
<dependency>
<groupId>net.sourceforge.findbugs</groupId>
<artifactId>jsr-305</artifactId>
Modified: trunk/script/mvn-eclipse.sh
===================================================================
--- trunk/script/mvn-eclipse.sh 2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/script/mvn-eclipse.sh 2011-04-16 10:50:40 UTC (rev 1313)
@@ -1,7 +1,7 @@
mvn eclipse:eclipse -Dgwt.compiler.skip=true
RESULT=$?
perl -p -i -e 's/<\/classpath>//g' .classpath
-perl -p -i -e 's/\/home\/vjrj\/nfsdev\///g' .classpath
+#perl -p -i -e 's/\/home\/vjrj\/nfsdev\///g' .classpath
#echo -en " <classpathentry kind=\"src\" path=\"target/generated-sources/apt\" including=\"**/*.java\"/>" >> .classpath
echo -en " <classpathentry kind=\"con\" path=\"com.google.gwt.eclipse.core.GWT_CONTAINER\"/>\n</classpath>" >> .classpath
which kdialog > /dev/null 2>&1
Modified: trunk/script/repopubli.sh
===================================================================
--- trunk/script/repopubli.sh 2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/script/repopubli.sh 2011-04-16 10:50:40 UTC (rev 1313)
@@ -1,24 +1,43 @@
#!/bin/bash
-PARAMS=$#
-JAR=$1
-GROUP=$2
-ARTIFACT=$3
-VER=$4
-USER=$5
+usage() {
+ echo "Use: $0 -j <jar> -g <group> -a <artifact> -v <version> [-s (for source)]"
+ echo "$0 -j target/emite-0.4.6-emiteuimodule.jar -g com.calclab.emite -a emite -v 0.4.6"
+}
-# CORRECT PARAMS ###############################################################
+while getopts “hg:a:v:j:s” OPTION
+do
+ case $OPTION in
+ h)
+ usage
+ exit 1
+ ;;
+ g)
+ GROUP=$OPTARG
+ ;;
+ a)
+ ARTIFACT=$OPTARG
+ ;;
+ v)
+ VER=$OPTARG
+ ;;
+ j)
+ JAR=$OPTARG
+ ;;
+ s)
+ SOURCE='-Dclassifier=sources'
+ ;;
+ ?)
+ usage
+ exit
+ ;;
+ esac
+done
-if [ $PARAMS -lt 4 ]
+if [[ -z $GROUP ]] || [[ -z $ARTIFACT ]] || [[ -z $VER ]] || [[ -z $JAR ]]
then
- echo "Use: $0 <jar> <group> <artifact> <version> <username>"
- echo "$0 target/emite-0.4.6-emiteuimodule.jar com.calclab.emite emite 0.4.6 [luther]"
- exit
+ usage
+ exit 1
fi
-if [ $PARAMS -gt 4 ]
-then
- EXTRA=$USER@
-fi
-
-mvn deploy:deploy-file -DgroupId=$2 -DartifactId=$3 -Dversion=$4 -Dpackaging=jar -Dfile=$1 -Durl=scpexe://kune.ourproject.org/home/groups/kune/htdocs/mavenrepo/ -DrepositoryId=kune.ourproject.org
+mvn deploy:deploy-file -DgroupId=$GROUP -DartifactId=$ARTIFACT -Dversion=$VER $SOURCE -Dpackaging=jar -Dfile=$JAR -Durl=scpexe://kune.ourproject.org/home/groups/kune/htdocs/mavenrepo/ -DrepositoryId=kune.ourproject.org
Modified: trunk/src/main/java/cc/kune/common/client/tooltip/Tooltip.java
===================================================================
--- trunk/src/main/java/cc/kune/common/client/tooltip/Tooltip.java 2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/src/main/java/cc/kune/common/client/tooltip/Tooltip.java 2011-04-16 10:50:40 UTC (rev 1313)
@@ -64,6 +64,7 @@
public Tooltip() {
super.add(uiBinder.createAndBindUi(this));
+ super.setStyleName("k-tooltip-no-chrome");
super.setAutoHideEnabled(false);
super.setAnimationEnabled(false);
final TimerWrapper overTimer = new TimerWrapper();
Modified: trunk/src/main/java/cc/kune/wave/client/WebClient.java
===================================================================
--- trunk/src/main/java/cc/kune/wave/client/WebClient.java 2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/src/main/java/cc/kune/wave/client/WebClient.java 2011-04-16 10:50:40 UTC (rev 1313)
@@ -22,6 +22,9 @@
+import java.util.Date;
+import java.util.logging.Logger;
+
import org.waveprotocol.box.webclient.client.ClientEvents;
import org.waveprotocol.box.webclient.client.ClientIdGenerator;
import org.waveprotocol.box.webclient.client.DebugMessagePanel;
@@ -94,6 +97,10 @@
private static final Binder BINDER = GWT.create(Binder.class);
static Log LOG = Log.get(WebClient.class);
+ // Use of GWT logging is only intended for sending exception reports to the
+ // server, nothing else in the client should use java.util.logging.
+ // Please also see WebClientDemo.gwt.xml.
+ private static final Logger REMOTE_LOG = Logger.getLogger("REMOTE_LOG");
private final ProfileManager profiles = new ProfileManagerImpl();
@@ -363,6 +370,7 @@
public void use(SafeHtml stack) {
NotifyUser.error("Error in wave client", true);
// error.addDetail(stack, null);
+ REMOTE_LOG.severe(stack.asString().replace("<br>", "\n"));
}
});
}
@@ -388,6 +396,8 @@
Throwable error = t;
while (error != null) {
+ String token = String.valueOf((new Date()).getTime());
+ stack.appendHtmlConstant("Token: " + token + "<br> ");
stack.appendEscaped(String.valueOf(error.getMessage())).appendHtmlConstant("<br>");
for (StackTraceElement elt : error.getStackTrace()) {
stack.appendHtmlConstant(" ")
Modified: trunk/src/main/java/cc/kune/wave/server/CustomUserRegistrationServlet.java
===================================================================
--- trunk/src/main/java/cc/kune/wave/server/CustomUserRegistrationServlet.java 2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/src/main/java/cc/kune/wave/server/CustomUserRegistrationServlet.java 2011-04-16 10:50:40 UTC (rev 1313)
@@ -39,6 +39,7 @@
import com.google.gxp.base.GxpContext;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.google.inject.name.Named;
/**
@@ -46,6 +47,7 @@
*
* @author josephg at gmail.com (Joseph Gentle)
*/
+@Singleton
public final class CustomUserRegistrationServlet extends HttpServlet {
private final AccountStore accountStore;
private final String domain;
@@ -53,20 +55,20 @@
private final Log LOG = Log.get(CustomUserRegistrationServlet.class);
@Inject
- public CustomUserRegistrationServlet(final AccountStore accountStore,
- @Named(CoreSettings.WAVE_SERVER_DOMAIN) final String domain) {
+ public CustomUserRegistrationServlet(AccountStore accountStore,
+ @Named(CoreSettings.WAVE_SERVER_DOMAIN) String domain) {
this.accountStore = accountStore;
this.domain = domain;
}
@Override
- protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) throws IOException {
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
writeRegistrationPage("", AuthenticationServlet.RESPONSE_STATUS_NONE, req.getLocale(), resp);
}
@SuppressWarnings("unchecked")
@Override
- protected void doPost(final HttpServletRequest req, final HttpServletResponse resp) throws IOException {
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
req.setCharacterEncoding("UTF-8");
String password = req.getParameter(HttpRequestBasedCallbackHandler.PASSWORD_FIELD);
if (password == null) {
@@ -100,7 +102,7 @@
try {
// First, some cleanup on the parameters.
if (username == null) {
- return "Username portion of address cannot be less than 2 characters";
+ return "Username portion of address cannot be empty";
}
username = username.trim().toLowerCase();
if (username.contains(ParticipantId.DOMAIN_PREFIX)) {
@@ -108,8 +110,8 @@
} else {
id = ParticipantId.of(username + ParticipantId.DOMAIN_PREFIX + domain);
}
- if (id.getAddress().indexOf("@") < 2) {
- return "Username portion of address cannot be less than 2 characters";
+ if (id.getAddress().indexOf("@") < 1) {
+ return "Username portion of address cannot be empty";
}
final String[] usernameSplit = id.getAddress().split("@");
if (usernameSplit.length != 2 || !usernameSplit[0].matches("[\\w\\.]+")) {
Modified: trunk/src/main/java/cc/kune/wave/server/CustomWaveClientServlet.java
===================================================================
--- trunk/src/main/java/cc/kune/wave/server/CustomWaveClientServlet.java 2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/src/main/java/cc/kune/wave/server/CustomWaveClientServlet.java 2011-04-16 10:50:40 UTC (rev 1313)
@@ -54,6 +54,7 @@
*
* @author kalman at google.com (Benjamin Kalman)
*/
+@SuppressWarnings("serial")
@Singleton
public class CustomWaveClientServlet extends HttpServlet {
Modified: trunk/src/main/java/cc/kune/wave/server/WaveMain.java
===================================================================
--- trunk/src/main/java/cc/kune/wave/server/WaveMain.java 2011-04-14 14:15:20 UTC (rev 1312)
+++ trunk/src/main/java/cc/kune/wave/server/WaveMain.java 2011-04-16 10:50:40 UTC (rev 1313)
@@ -46,6 +46,7 @@
import org.waveprotocol.box.server.rpc.SearchServlet;
import org.waveprotocol.box.server.rpc.ServerRpcProvider;
import org.waveprotocol.box.server.rpc.SignOutServlet;
+import org.waveprotocol.box.server.rpc.UserRegistrationServlet;
import org.waveprotocol.box.server.waveserver.WaveBus;
import org.waveprotocol.box.server.waveserver.WaveServerException;
import org.waveprotocol.box.server.waveserver.WaveletProvider;
@@ -55,9 +56,11 @@
import org.waveprotocol.wave.federation.noop.NoOpFederationModule;
import org.waveprotocol.wave.federation.xmpp.XmppFederationModule;
import org.waveprotocol.wave.model.version.HashedVersionFactory;
+import org.waveprotocol.wave.model.wave.ParticipantIdUtil;
import org.waveprotocol.wave.util.logging.Log;
import org.waveprotocol.wave.util.settings.SettingsBinder;
+import com.google.gwt.logging.server.RemoteLoggingServiceImpl;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
@@ -112,10 +115,16 @@
ServerRpcProvider server = injector.getInstance(ServerRpcProvider.class);
WaveBus waveBus = injector.getInstance(WaveBus.class);
- initializeServer(injector);
+ String domain =
+ injector.getInstance(Key.get(String.class, Names.named(CoreSettings.WAVE_SERVER_DOMAIN)));
+ if (!ParticipantIdUtil.isDomainAddress(ParticipantIdUtil.makeDomainAddress(domain))) {
+ throw new WaveServerException("Invalid wave domain: " + domain);
+ }
+
+ initializeServer(injector, domain);
initializeServlets(injector, server);
initializeRobots(injector, waveBus);
- initializeFrontend(injector, server, waveBus);
+ initializeFrontend(injector, server, waveBus, domain);
initializeFederation(injector);
LOG.info("Starting server");
@@ -133,12 +142,11 @@
return federationModule;
}
- private static void initializeServer(Injector injector) throws PersistenceException,
- WaveServerException {
+ private static void initializeServer(Injector injector, String waveDomain)
+ throws PersistenceException, WaveServerException {
AccountStore accountStore = injector.getInstance(AccountStore.class);
accountStore.initializeAccountStore();
- AccountStoreHolder.init(accountStore,
- injector.getInstance(Key.get(String.class, Names.named(CoreSettings.WAVE_SERVER_DOMAIN))));
+ AccountStoreHolder.init(accountStore, waveDomain);
// Initialize the SignerInfoStore.
CertPathStore certPathStore = injector.getInstance(CertPathStore.class);
@@ -157,10 +165,10 @@
server.addServlet(SessionManager.SIGN_IN_URL,
injector.getInstance(AuthenticationServlet.class));
server.addServlet("/auth/signout", injector.getInstance(SignOutServlet.class));
- server.addServlet("/auth/register", injector.getInstance(CustomUserRegistrationServlet.class));
+ server.addServlet("/auth/register", injector.getInstance(UserRegistrationServlet.class));
server.addServlet("/fetch/*", injector.getInstance(FetchServlet.class));
-
+
server.addServlet("/search/*", injector.getInstance(SearchServlet.class));
server.addServlet("/robot/dataapi", injector.getInstance(DataApiServlet.class));
@@ -172,14 +180,19 @@
String gadgetServerHostname =injector.getInstance(Key.get(String.class,
Names.named(CoreSettings.GADGET_SERVER_HOSTNAME)));
+ int gadgetServerPort =
+ injector.getInstance(Key.get(int.class, Names.named(CoreSettings.GADGET_SERVER_PORT)));
+ String gadgetServerPath =
+ injector.getInstance(Key.get(String.class, Names.named(CoreSettings.GADGET_SERVER_PATH)));
ProxyServlet.Transparent proxyServlet =
- new ProxyServlet.Transparent("/gadgets", "http", gadgetServerHostname, injector
- .getInstance(Key.get(int.class, Names.named(CoreSettings.GADGET_SERVER_PORT))),
- "/gadgets");
+ new ProxyServlet.Transparent("/gadgets", "http", gadgetServerHostname,gadgetServerPort,
+ gadgetServerPath);
ServletHolder proxyServletHolder = server.addServlet("/gadgets/*", proxyServlet);
proxyServletHolder.setInitParameter("HostHeader", gadgetServerHostname);
- // server.addServlet("/", injector.getInstance(CustomWaveClientServlet.class));
+ server.addServlet("/webclient/remote_logging",
+ injector.getInstance(RemoteLoggingServiceImpl.class));
+ // server.addServlet("/", injector.getInstance(WaveClientServlet.class));
}
private static void initializeRobots(Injector injector, WaveBus waveBus) {
@@ -188,10 +201,12 @@
}
private static void initializeFrontend(Injector injector, ServerRpcProvider server,
- WaveBus waveBus) throws WaveServerException {
+ WaveBus waveBus, String waveDomain) throws WaveServerException {
HashedVersionFactory hashFactory = injector.getInstance(HashedVersionFactory.class);
+
WaveletProvider provider = injector.getInstance(WaveletProvider.class);
- ClientFrontend frontend = ClientFrontendImpl.create(hashFactory, provider, waveBus);
+ ClientFrontend frontend =
+ ClientFrontendImpl.create(hashFactory, provider, waveBus, waveDomain);
ProtocolWaveClientRpc.Interface rpcImpl = WaveClientRpcImpl.create(frontend, false);
server.registerService(ProtocolWaveClientRpc.newReflectiveService(rpcImpl));
Added: trunk/src/main/java/org/waveprotocol/box/server/rpc/CustomServerRpcProvider.java
===================================================================
--- trunk/src/main/java/org/waveprotocol/box/server/rpc/CustomServerRpcProvider.java (rev 0)
+++ trunk/src/main/java/org/waveprotocol/box/server/rpc/CustomServerRpcProvider.java 2011-04-16 10:50:40 UTC (rev 1313)
@@ -0,0 +1,635 @@
+/**
+ * Copyright 2009 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.waveprotocol.box.server.rpc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.annotation.Nullable;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContextListener;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.resource.ResourceCollection;
+import org.eclipse.jetty.webapp.WebAppContext;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketServlet;
+import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticate;
+import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticationResult;
+import org.waveprotocol.box.server.CoreSettings;
+import org.waveprotocol.box.server.authentication.SessionManager;
+import org.waveprotocol.box.server.util.NetUtils;
+import org.waveprotocol.wave.model.util.Pair;
+import org.waveprotocol.wave.model.wave.ParticipantId;
+import org.waveprotocol.wave.util.logging.Log;
+
+import com.glines.socketio.server.SocketIOInbound;
+import com.glines.socketio.server.SocketIOServlet;
+import com.glines.socketio.server.transport.FlashSocketTransport;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import com.google.inject.servlet.GuiceServletContextListener;
+import com.google.inject.servlet.ServletModule;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.Service;
+import com.google.protobuf.UnknownFieldSet;
+
+/**
+ * CustomServerRpcProvider can provide instances of type Service over an
+ * incoming network socket and service incoming RPCs to these services and their
+ * methods.
+ *
+ *
+ */
+public class CustomServerRpcProvider {
+ static abstract class Connection implements ProtoCallback {
+ private final Map<Integer, ServerRpcController> activeRpcs = new ConcurrentHashMap<Integer, ServerRpcController>();
+
+ // The logged in user.
+ // Note: Due to this bug:
+ // http://code.google.com/p/wave-protocol/issues/detail?id=119,
+ // the field may be null on first connect and then set later using an
+ // RPC.
+ private ParticipantId loggedInUser;
+
+ private final CustomServerRpcProvider provider;
+
+ /**
+ * @param loggedInUser
+ * The currently logged in user, or null if no user is logged
+ * in.
+ * @param provider
+ */
+ public Connection(final ParticipantId loggedInUser, final CustomServerRpcProvider provider) {
+ this.loggedInUser = loggedInUser;
+ this.provider = provider;
+ }
+
+ private ParticipantId authenticate(final String token) {
+ final HttpSession session = provider.sessionManager.getSessionFromToken(token);
+ final ParticipantId user = provider.sessionManager.getLoggedInUser(session);
+ return user;
+ }
+
+ protected void expectMessages(final MessageExpectingChannel channel) {
+ synchronized (provider.registeredServices) {
+ for (final RegisteredServiceMethod serviceMethod : provider.registeredServices.values()) {
+ channel.expectMessage(serviceMethod.service.getRequestPrototype(serviceMethod.method));
+ LOG.fine("Expecting: " + serviceMethod.method.getFullName());
+ }
+ }
+ channel.expectMessage(Rpc.CancelRpc.getDefaultInstance());
+ }
+
+ @Override
+ public void message(final int sequenceNo, final Message message) {
+ if (message instanceof Rpc.CancelRpc) {
+ final ServerRpcController controller = activeRpcs.get(sequenceNo);
+ if (controller == null) {
+ throw new IllegalStateException("Trying to cancel an RPC that is not active!");
+ } else {
+ LOG.info("Cancelling open RPC " + sequenceNo);
+ controller.cancel();
+ }
+ } else if (message instanceof ProtocolAuthenticate) {
+ // Workaround for bug:
+ // http://codereview.waveprotocol.org/224001/
+
+ // When we get this message, either the connection will not be
+ // logged in
+ // (loggedInUser == null) or the connection will have been
+ // authenticated
+ // via cookies
+ // (in which case loggedInUser must match the authenticated
+ // user, and
+ // this message has no
+ // effect).
+
+ final ProtocolAuthenticate authMessage = (ProtocolAuthenticate) message;
+ final ParticipantId authenticatedAs = authenticate(authMessage.getToken());
+
+ Preconditions.checkArgument(authenticatedAs != null, "Auth token invalid");
+ Preconditions.checkState(loggedInUser == null || loggedInUser.equals(authenticatedAs),
+ "Session already authenticated as a different user");
+
+ loggedInUser = authenticatedAs;
+ LOG.info("Session authenticated as " + loggedInUser);
+ sendMessage(sequenceNo, ProtocolAuthenticationResult.getDefaultInstance());
+ } else if (provider.registeredServices.containsKey(message.getDescriptorForType())) {
+ if (activeRpcs.containsKey(sequenceNo)) {
+ throw new IllegalStateException("Can't invoke a new RPC with a sequence number already in use.");
+ } else {
+ final RegisteredServiceMethod serviceMethod = provider.registeredServices.get(message.getDescriptorForType());
+
+ // Create the internal ServerRpcController used to invoke
+ // the call.
+ final ServerRpcController controller = new ServerRpcControllerImpl(message, serviceMethod.service,
+ serviceMethod.method, loggedInUser, new RpcCallback<Message>() {
+ @Override
+ synchronized public void run(final Message message) {
+ if (message instanceof Rpc.RpcFinished
+ || !serviceMethod.method.getOptions().getExtension(Rpc.isStreamingRpc)) {
+ // This RPC is over - remove it from the
+ // map.
+ final boolean failed = message instanceof Rpc.RpcFinished ? ((Rpc.RpcFinished) message).getFailed()
+ : false;
+ LOG.fine("RPC " + sequenceNo + " is now finished, failed = " + failed);
+ if (failed) {
+ LOG.info("error = " + ((Rpc.RpcFinished) message).getErrorText());
+ }
+ activeRpcs.remove(sequenceNo);
+ }
+ sendMessage(sequenceNo, message);
+ }
+ });
+
+ // Kick off a new thread specific to this RPC.
+ activeRpcs.put(sequenceNo, controller);
+ provider.threadPool.execute(controller);
+ }
+ } else {
+ // Sent a message type we understand, but don't expect -
+ // erroneous case!
+ throw new IllegalStateException("Got expected but unknown message (" + message + ") for sequence: "
+ + sequenceNo);
+ }
+ }
+
+ protected abstract void sendMessage(int sequenceNo, Message message);
+
+ @Override
+ public void unknown(final int sequenceNo, final String messageType, final String message) {
+ throw new IllegalStateException("Got unknown message (type: " + messageType + ", " + message
+ + ") for sequence: " + sequenceNo);
+ }
+
+ @Override
+ public void unknown(final int sequenceNo, final String messageType, final UnknownFieldSet message) {
+ throw new IllegalStateException("Got unknown message (type: " + messageType + ", " + message
+ + ") for sequence: " + sequenceNo);
+ }
+ }
+
+ /**
+ * Internal, static container class for any specific registered service
+ * method.
+ */
+ static class RegisteredServiceMethod {
+ final MethodDescriptor method;
+ final Service service;
+
+ RegisteredServiceMethod(final Service service, final MethodDescriptor method) {
+ this.service = service;
+ this.method = method;
+ }
+ }
+
+ static class SocketIOConnection extends Connection {
+ private final SocketIOServerChannel socketChannel;
+
+ SocketIOConnection(final ParticipantId loggedInUser, final CustomServerRpcProvider provider) {
+ super(loggedInUser, provider);
+ socketChannel = new SocketIOServerChannel(this);
+ LOG.info("New websocket connection set up for user " + loggedInUser);
+ expectMessages(socketChannel);
+ }
+
+ public SocketIOServerChannel getWebSocketServerChannel() {
+ return socketChannel;
+ }
+
+ @Override
+ protected void sendMessage(final int sequenceNo, final Message message) {
+ socketChannel.sendMessage(sequenceNo, message);
+ }
+ }
+ @SuppressWarnings("serial")
+ @Singleton
+ public static class WaveSocketIOServlet extends HttpServlet {
+
+ CustomServerRpcProvider provider;
+
+ SocketIOServlet socketIOServlet = new SocketIOServlet() {
+ @Override
+ protected SocketIOInbound doSocketIOConnect(final HttpServletRequest request, final String[] strings) {
+ final ParticipantId loggedInUser = provider.sessionManager.getLoggedInUser(request.getSession(false));
+
+ final SocketIOConnection connection = new SocketIOConnection(loggedInUser, provider);
+ return connection.getWebSocketServerChannel();
+ }
+ };
+
+ @Inject
+ public WaveSocketIOServlet(final CustomServerRpcProvider provider) {
+ super();
+ this.provider = provider;
+ }
+
+ @Override
+ public void init(final ServletConfig config) throws ServletException {
+ socketIOServlet.init(config);
+ }
+
+ @Override
+ protected void service(final HttpServletRequest req, final HttpServletResponse resp) throws ServletException,
+ IOException {
+ socketIOServlet.service(req, resp);
+ }
+
+ @Override
+ public void service(final ServletRequest req, final ServletResponse res) throws ServletException, IOException {
+ socketIOServlet.service(req, res);
+ }
+ }
+ @SuppressWarnings("serial")
+ @Singleton
+ public static class WaveWebSocketServlet extends WebSocketServlet {
+
+ CustomServerRpcProvider provider;
+
+ @Inject
+ public WaveWebSocketServlet(final CustomServerRpcProvider provider) {
+ super();
+ this.provider = provider;
+ }
+
+ @Override
+ protected WebSocket doWebSocketConnect(final HttpServletRequest request, final String protocol) {
+ final ParticipantId loggedInUser = provider.sessionManager.getLoggedInUser(request.getSession(false));
+
+ final WebSocketConnection connection = new WebSocketConnection(loggedInUser, provider);
+ return connection.getWebSocketServerChannel();
+ }
+ }
+ static class WebSocketConnection extends Connection {
+ private final WebSocketServerChannel socketChannel;
+
+ WebSocketConnection(final ParticipantId loggedInUser, final CustomServerRpcProvider provider) {
+ super(loggedInUser, provider);
+ socketChannel = new WebSocketServerChannel(this);
+ LOG.info("New websocket connection set up for user " + loggedInUser);
+ expectMessages(socketChannel);
+ }
+
+ public WebSocketServerChannel getWebSocketServerChannel() {
+ return socketChannel;
+ }
+
+ @Override
+ protected void sendMessage(final int sequenceNo, final Message message) {
+ socketChannel.sendMessage(sequenceNo, message);
+ }
+ }
+ // We can retrieve the injector from the context attributes via this
+ // attribute name
+ public static final String INJECTOR_ATTRIBUTE = Injector.class.getName();
+ private static final Log LOG = Log.get(CustomServerRpcProvider.class);
+
+ private static InetSocketAddress[] parseAddressList(final List<String> addressList) {
+ if (addressList == null || addressList.size() == 0) {
+ return new InetSocketAddress[0];
+ } else {
+ final Set<InetSocketAddress> addresses = Sets.newHashSet();
+ for (final String str : addressList) {
+ if (str.length() == 0) {
+ LOG.warning("Encountered empty address in http addresses list.");
+ } else {
+ try {
+ final InetSocketAddress address = NetUtils.parseHttpAddress(str);
+ if (!addresses.contains(address)) {
+ addresses.add(address);
+ } else {
+ LOG.warning("Ignoring duplicate address in http addresses list: Duplicate entry '" + str
+ + "' resolved to " + address.getAddress().getHostAddress());
+ }
+ } catch (final IOException e) {
+ LOG.severe("Unable to process address " + str, e);
+ }
+ }
+ }
+ return addresses.toArray(new InetSocketAddress[0]);
+ }
+ }
+
+ private final Integer flashsocketPolicyPort;
+
+ private final InetSocketAddress[] httpAddresses;
+
+ private Server httpServer = null;
+
+ private final org.eclipse.jetty.server.SessionManager jettySessionManager;
+
+ // Mapping from incoming protocol buffer type -> specific handler.
+ private final Map<Descriptors.Descriptor, RegisteredServiceMethod> registeredServices = Maps.newHashMap();
+
+ // List of webApp source directories ("./war", etc)
+ private final String[] resourceBases;
+
+ /**
+ * Set of servlets
+ */
+ List<Pair<String, ServletHolder>> servletRegistry = Lists.newArrayList();
+
+ private final SessionManager sessionManager;
+
+ private final ExecutorService threadPool;
+
+ /**
+ * Construct a new CustomServerRpcProvider, hosting on the specified
+ * WebSocket addresses.
+ *
+ * Also accepts an ExecutorService for spawning and managing threads.
+ */
+ public CustomServerRpcProvider(final InetSocketAddress[] httpAddresses, final Integer flashsocketPolicyPort,
+ final String[] resourceBases, final ExecutorService threadPool, final SessionManager sessionManager,
+ final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+ this.httpAddresses = httpAddresses;
+ this.flashsocketPolicyPort = flashsocketPolicyPort;
+ this.resourceBases = resourceBases;
+ this.threadPool = threadPool;
+ this.sessionManager = sessionManager;
+ this.jettySessionManager = jettySessionManager;
+ }
+
+ /**
+ * Constructs a new CustomServerRpcProvider with a default ExecutorService.
+ */
+ public CustomServerRpcProvider(final InetSocketAddress[] httpAddresses, final Integer flashsocketPolicyPort,
+ final String[] resourceBases, final SessionManager sessionManager,
+ final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+ this(httpAddresses, flashsocketPolicyPort, resourceBases, Executors.newCachedThreadPool(), sessionManager,
+ jettySessionManager);
+ }
+
+ @Inject
+ public CustomServerRpcProvider(@Named(CoreSettings.HTTP_FRONTEND_ADDRESSES) final List<String> httpAddresses,
+ @Named(CoreSettings.FLASHSOCKET_POLICY_PORT) final Integer flashsocketPolicyPort,
+ @Named(CoreSettings.RESOURCE_BASES) final List<String> resourceBases, final SessionManager sessionManager,
+ final org.eclipse.jetty.server.SessionManager jettySessionManager) {
+ this(parseAddressList(httpAddresses), flashsocketPolicyPort, resourceBases.toArray(new String[0]),
+ sessionManager, jettySessionManager);
+ }
+
+ /**
+ * Add a servlet to the servlet registry. This servlet will be attached to
+ * the specified URL pattern when the server is started up.
+ *
+ * @param urlPattern
+ * the URL pattern for paths. Eg, '/foo', '/foo/*'.
+ * @param servlet
+ * the servlet class to bind to the specified paths.
+ * @return the {@link ServletHolder} that holds the servlet.
+ */
+ public ServletHolder addServlet(final String urlPattern, final Class<? extends HttpServlet> servlet) {
+ return addServlet(urlPattern, servlet, null);
+ }
+
+ /**
+  * Registers a servlet class, optionally with init parameters.
+  *
+  * A new {@link ServletHolder} is created for the class, given the init
+  * parameters when a map is supplied, and appended to the servlet registry
+  * together with its URL pattern.
+  *
+  * @param urlPattern
+  *            the URL pattern for paths. Eg, '/foo', '/foo/*'.
+  * @param servlet
+  *            the servlet class to bind to the specified paths.
+  * @param initParams
+  *            the map with init params, can be null or empty.
+  * @return the {@link ServletHolder} that holds the servlet.
+  */
+ public ServletHolder addServlet(final String urlPattern, final Class<? extends HttpServlet> servlet,
+         @Nullable final Map<String, String> initParams) {
+     final ServletHolder holder = new ServletHolder(servlet);
+     if (null != initParams) {
+         holder.setInitParameters(initParams);
+     }
+     final Pair<String, ServletHolder> registration = new Pair<String, ServletHolder>(urlPattern, holder);
+     servletRegistry.add(registration);
+     return holder;
+ }
+
+ public void addWebSocketServlets() {
+ // Servlet where the websocket connection is served from.
+ final ServletHolder wsholder = addServlet("/socket", WaveWebSocketServlet.class);
+ // TODO(zamfi): fix to let messages span frames.
+ wsholder.setInitParameter("bufferSize", "" + 1024 * 1024); // 1M buffer
+
+ // Servlet where the websocket connection is served from.
+ final ServletHolder sioholder = addServlet("/socket.io/*", WaveSocketIOServlet.class);
+ // TODO(zamfi): fix to let messages span frames.
+ sioholder.setInitParameter("bufferSize", "" + 1024 * 1024); // 1M buffer
+ // Set flash policy server parameters
+ String flashPolicyServerHost = "localhost";
+ final StringBuilder flashPolicyAllowedPorts = new StringBuilder();
+ /*
+ * Loop through addresses, collect list of ports, and determine if we
+ * are to use "localhost" of the AnyHost wildcard.
+ */
+ for (final InetSocketAddress addr : httpAddresses) {
+ if (flashPolicyAllowedPorts.length() > 0) {
+ flashPolicyAllowedPorts.append(",");
+ }
+ flashPolicyAllowedPorts.append(addr.getPort());
+ if (!addr.getAddress().isLoopbackAddress()) {
+ // Until it's possible to pass a list of address, this is the
+ // only valid alternative.
+ flashPolicyServerHost = "0.0.0.0";
+ }
+ }
+ sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_SERVER_HOST_KEY, flashPolicyServerHost);
+ sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_SERVER_PORT_KEY, "" + flashsocketPolicyPort);
+ // TODO: Change to use the public http address and all other bound
+ // addresses.
+ sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_DOMAIN_KEY, "*");
+ sioholder.setInitParameter(FlashSocketTransport.FLASHPOLICY_PORTS_KEY, flashPolicyAllowedPorts.toString());
+
+ // Serve the static content and GWT web client with the default servlet
+ // (acts like a standard file-based web server).
+ addServlet("/static/*", DefaultServlet.class);
+ addServlet("/webclient/*", DefaultServlet.class);
+ }
+
+ /**
+ * @return a list of {@link SelectChannelConnector} each bound to a
+ * host:port pair form the list addresses.
+ */
+ private List<SelectChannelConnector> getSelectChannelConnectors(final InetSocketAddress[] httpAddresses) {
+ final List<SelectChannelConnector> list = Lists.newArrayList();
+ for (final InetSocketAddress address : httpAddresses) {
+ final SelectChannelConnector connector = new SelectChannelConnector();
+ connector.setHost(address.getAddress().getHostAddress());
+ connector.setPort(address.getPort());
+ list.add(connector);
+ }
+
+ return list;
+ }
+
+ /**
+  * Creates a Guice {@link ServletModule} that serves every entry of the
+  * servlet registry at its URL pattern, with the holder's init parameters,
+  * and binds each servlet class in {@link Singleton} scope.
+  *
+  * @param injector
+  *            not used by this method's body.
+  * @return a new module reflecting the registry contents at the time the
+  *         module is configured.
+  */
+ public ServletModule getServletModule(final Injector injector) {
+     return new ServletModule() {
+         @Override
+         protected void configureServlets() {
+             // We add servlets here to override the DefaultServlet automatic
+             // registered by WebAppContext in path "/" with our
+             // WaveClientServlet. Any other way to do this?
+             // Related question (unanswered 08-Apr-2011)
+             // http://web.archiveorange.com/archive/v/d0LdlXf1kN0OXyPNyQZp
+             for (final Pair<String, ServletHolder> entry : servletRegistry) {
+                 final String urlPattern = entry.getFirst();
+                 final ServletHolder holder = entry.getSecond();
+                 @SuppressWarnings({ "unchecked" })
+                 final Class<HttpServlet> servletClass = holder.getHeldClass();
+                 @SuppressWarnings({ "unchecked" })
+                 final Map<String, String> initParams = holder.getInitParameters();
+                 serve(urlPattern).with(servletClass, initParams);
+                 bind(servletClass).in(Singleton.class);
+             }
+         }
+     };
+ }
+
+ /**
+  * Returns the socket the WebSocket server is listening on.
+  *
+  * @return the host and local port of the server's first connector, or
+  *         {@code null} when the server has not been created yet or has
+  *         no connectors.
+  */
+ public SocketAddress getWebSocketAddress() {
+     if (httpServer == null) {
+         return null;
+     }
+     // startWebSocketServer() only logs when no address is configured, so
+     // the server can exist without a single connector; report "no address"
+     // rather than failing on the [0] access.
+     final Connector[] connectors = httpServer.getConnectors();
+     if (connectors == null || connectors.length == 0) {
+         return null;
+     }
+     final Connector first = connectors[0];
+     return new InetSocketAddress(first.getHost(), first.getLocalPort());
+ }
+
+ /**
+  * Registers every method of the given service.
+  *
+  * For each method descriptor of the service a
+  * {@code RegisteredServiceMethod} is stored in {@code registeredServices}
+  * under the method's input type. The whole registration runs while
+  * holding the monitor of {@code registeredServices}.
+  *
+  * @param service
+  *            the service whose methods are registered.
+  */
+ public void registerService(final Service service) {
+     synchronized (registeredServices) {
+         final List<MethodDescriptor> methods = service.getDescriptorForType().getMethods();
+         for (final MethodDescriptor method : methods) {
+             registeredServices.put(method.getInputType(), new RegisteredServiceMethod(service, method));
+         }
+     }
+ }
+
+ /**
+ * Creates and starts the embedded HTTP server.
+ *
+ * Steps, in order: create the {@link Server}; add one connector per
+ * configured HTTP address; build a {@link WebAppContext} carrying the
+ * injector attribute, the optional Jetty session manager and the resource
+ * bases; register the built-in servlets via
+ * {@link #addWebSocketServlets()}; install a
+ * {@link GuiceServletContextListener}; start the server.
+ *
+ * Any exception thrown while wiring the listener or starting the server is
+ * logged as severe and swallowed; the method then returns normally.
+ *
+ * NOTE(review): the servlet-module child injector and the GuiceFilter
+ * registration are commented out below, so nothing in this method attaches
+ * the registered servlets to the context. Presumably the embedding
+ * application does that; confirm.
+ *
+ * @param injector
+ * the injector exposed to the web application; may be null, in
+ * which case an injector binding only this instance is created.
+ */
+ public void startWebSocketServer(final Injector injector) {
+ httpServer = new Server();
+
+ final List<SelectChannelConnector> connectors = getSelectChannelConnectors(httpAddresses);
+ // An empty address list is only reported; startup continues without
+ // connectors.
+ if (connectors.isEmpty()) {
+ LOG.severe("No valid http end point address provided!");
+ }
+ for (final SelectChannelConnector connector : connectors) {
+ httpServer.addConnector(connector);
+ }
+ final WebAppContext context = new WebAppContext();
+
+ context.setParentLoaderPriority(true);
+ // The injector as passed in (possibly null) is stored as a context
+ // attribute.
+ context.setAttribute(INJECTOR_ATTRIBUTE, injector);
+
+ // Only replace the context's session manager when one was supplied.
+ if (jettySessionManager != null) {
+ context.getSessionHandler().setSessionManager(jettySessionManager);
+ }
+ final ResourceCollection resources = new ResourceCollection(resourceBases);
+ context.setBaseResource(resources);
+
+ // Fills the servlet registry with the websocket, socket.io and static
+ // content servlets.
+ addWebSocketServlets();
+
+ try {
+ final Injector parentInjector;
+
+ // If we have a null injector at least bind the
+ // CustomServerRpcProvider for nested static classes
+ if (injector == null) {
+ parentInjector = Guice.createInjector(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(CustomServerRpcProvider.class).toInstance(CustomServerRpcProvider.this);
+ }
+ });
+ } else {
+ parentInjector = injector;
+ }
+
+ // final ServletModule servletModule =
+ // getServletModule(parentInjector);
+
+ // The listener hands out the parent injector itself; the child
+ // injector with the servlet module is disabled (see the commented-out
+ // code above and below).
+ final ServletContextListener contextListener = new GuiceServletContextListener() {
+
+ private final Injector childInjector = parentInjector; // .createChildInjector(servletModule);
+
+ @Override
+ protected Injector getInjector() {
+ return childInjector;
+ }
+ };
+
+ context.addEventListener(contextListener);
+ LOG.fine("Commented guice filter.");
+ // context.addFilter(GuiceFilter.class, "/*", 0);
+ httpServer.setHandler(context);
+
+ httpServer.start();
+
+ } catch (final Exception e) { // yes, .start() throws "Exception"
+ LOG.severe("Fatal error starting http server.", e);
+ return;
+ }
+ LOG.fine("WebSocket server running.");
+ }
+
+ /**
+  * Stops this server.
+  *
+  * Does nothing apart from logging when the server was never created. A
+  * failure while stopping is logged as a warning and not propagated.
+  *
+  * @throws IOException
+  *             declared for interface compatibility; not thrown by this
+  *             method's body.
+  */
+ public void stopServer() throws IOException {
+     // Check explicitly instead of relying on the catch block to absorb a
+     // NullPointerException when the server was never started.
+     if (httpServer != null) {
+         try {
+             httpServer.stop(); // yes, .stop() throws "Exception"
+         } catch (final Exception e) {
+             LOG.warning("Fatal error stopping http server.", e);
+         }
+     }
+     LOG.fine("server shutdown.");
+ }
+}
Property changes on: trunk/src/main/java/org/waveprotocol/box/server/rpc/CustomServerRpcProvider.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
More information about the kune-commits
mailing list