public class WrappedChannelHandler implements ChannelHandlerDelegate { protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class); protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true)); protected final ExecutorService executor; protected final ChannelHandler handler; protected final URL url; public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { componentKey = CONSUMER_SIDE; } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); } public void close() { try { if (executor != null) { executor.shutdown(); } } catch (Throwable t) { logger.warn("fail to destroy thread pool of server: " + t.getMessage(), t); } } @Override public void connected(Channel channel) throws RemotingException { handler.connected(channel); } @Override public void disconnected(Channel channel) throws RemotingException { handler.disconnected(channel); } @Override public void sent(Channel channel, Object message) throws RemotingException { handler.sent(channel, message); } @Override public void received(Channel channel, Object message) throws RemotingException { handler.received(channel, message); } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { handler.caught(channel, exception); } public ExecutorService getExecutor() { return executor; } @Override public ChannelHandler getHandler() { if (handler instanceof ChannelHandlerDelegate) { return ((ChannelHandlerDelegate) handler).getHandler(); } else { return handler; } } public URL getUrl() { return url; } public ExecutorService getExecutorService() { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } return cexecutor; } }
public class ExecutionChannelHandler extends WrappedChannelHandler { public ExecutionChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getExecutorService(); if (message instanceof Request) { try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly, // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent // this scenario from happening, but a better solution should be considered later. if (t instanceof RejectedExecutionException) { Request request = (Request) message; if (request.isTwoWay()) { String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") thread pool is exhausted, detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event.", t); } } else { handler.received(channel, message); } } }
public class ChannelEventRunnable implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class); private final ChannelHandler handler; private final Channel channel; private final ChannelState state; private final Throwable exception; private final Object message; public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) { this(channel, handler, state, null); } public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) { this(channel, handler, state, message, null); } public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) { this(channel, handler, state, null, t); } public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) { this.channel = channel; this.handler = handler; this.state = state; this.message = message; this.exception = exception; } @Override public void run() { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break; case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } } /** * ChannelState * * */ public enum ChannelState { /** * CONNECTED */ CONNECTED, /** * DISCONNECTED */ DISCONNECTED, /** * SENT */ SENT, /** * RECEIVED */ RECEIVED, /** * CAUGHT */ CAUGHT } }