Commit ac1e534d authored by 刘凯's avatar 刘凯

netty 注册中心

parent 2b1537ad
package com.example.demo.netty;
import com.example.demo.netty.client.proxy.RpcClientHandler;
import com.example.demo.netty.protocol.RpcDecoder;
import com.example.demo.netty.protocol.RpcEncoder;
import com.example.demo.netty.protocol.RpcRequest;
import com.example.demo.netty.protocol.RpcResponse;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
 * Configures the channel pipeline for RPC client connections.
 *
 * <p>Outbound: {@link RpcRequest} objects are serialized by {@link RpcEncoder}.
 * Inbound: frames are split on a 4-byte length prefix, then decoded into
 * {@link RpcResponse} objects and handed to {@link RpcClientHandler}.
 *
 * @author liukai
 * @date 2019/3/19 9:57 PM
 */
public class RpcClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new RpcEncoder(RpcRequest.class))
                // Max frame 64 KiB; the length field occupies the first 4 bytes.
                .addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0))
                .addLast(new RpcDecoder(RpcResponse.class))
                .addLast(new RpcClientHandler());
    }
}
package com.example.demo.netty.client.proxy;
/** Callback invoked when an asynchronous RPC call completes. */
public interface AsyncRPCCallback {
/** Called with the RPC result when the call succeeds. */
void success(Object result);
/** Called with the failure cause when the call fails. */
void fail(Exception e);
}
package com.example.demo.netty.client.proxy;
import com.example.demo.netty.RpcClientInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Maintains the client-side connection pool to all known RPC server nodes.
 *
 * <p>Keeps one {@link RpcClientHandler} per connected server, reconciles the
 * connection set against the address list published by the registry, and hands
 * out handlers round-robin via {@link #chooseHandler()}. Threads needing a
 * connection block on a {@link Condition} that is signalled whenever a new
 * handler becomes available.
 *
 * @author liukai
 * @date 2019/3/14 8:28 PM
 */
public class ConnectManage {
    private static final Logger logger = LoggerFactory.getLogger(ConnectManage.class);
    private volatile static ConnectManage connectManage;
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(16, 16,
            600L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(65536));
    // Live handlers; the COW list gives safe snapshot iteration for round-robin.
    private CopyOnWriteArrayList<RpcClientHandler> connectedHandlers = new CopyOnWriteArrayList<>();
    // Address -> handler index, kept in step with connectedHandlers.
    private Map<InetSocketAddress, RpcClientHandler> connectedServerNodes = new ConcurrentHashMap<>();
    private ReentrantLock lock = new ReentrantLock();
    private Condition connected = lock.newCondition();
    private long connectTimeoutMills = 6000;
    private AtomicInteger roundRobin = new AtomicInteger(0);
    private volatile boolean isRunning = true;

    public ConnectManage() {
    }

    /** Lazily-initialized singleton accessor (double-checked locking on a volatile field). */
    public static ConnectManage getInstance() {
        if (connectManage == null) {
            synchronized (ConnectManage.class) {
                if (connectManage == null) {
                    connectManage = new ConnectManage();
                }
            }
        }
        return connectManage;
    }

    /**
     * Reconciles the connection set against {@code allServerAddress}
     * ("host:port" strings): connects to newly published nodes and closes
     * handlers whose node disappeared. An empty list closes every connection.
     */
    public void updateConnectedServer(List<String> allServerAddress) {
        if (allServerAddress == null) {
            return;
        }
        if (allServerAddress.size() > 0) {
            HashSet<InetSocketAddress> newAllServerNodeSet = new HashSet<>();
            for (String address : allServerAddress) {
                String[] array = address.split(":");
                if (array.length == 2) {
                    String host = array[0];
                    int port = Integer.parseInt(array[1]);
                    newAllServerNodeSet.add(new InetSocketAddress(host, port));
                }
            }
            // Open connections to nodes we do not know yet.
            newAllServerNodeSet.forEach(serverAddress -> {
                if (!connectedServerNodes.containsKey(serverAddress)) {
                    connectServerNode(serverAddress);
                }
            });
            // Close handlers whose node is gone. Iterate the COW snapshot:
            // the original index loop removed elements while incrementing i,
            // which silently skipped the element after each removal.
            for (RpcClientHandler connectedServerHandler : connectedHandlers) {
                SocketAddress remotePeer = connectedServerHandler.getRemotePeer();
                if (!newAllServerNodeSet.contains(remotePeer)) {
                    logger.info("Remove invalid server node " + remotePeer);
                    RpcClientHandler handler = connectedServerNodes.get(remotePeer);
                    if (handler != null) {
                        handler.close();
                    }
                    connectedServerNodes.remove(remotePeer);
                    connectedHandlers.remove(connectedServerHandler);
                }
            }
        } else {
            logger.error("No available server node. All server nodes are down !!!");
            for (final RpcClientHandler connectedServerHandler : connectedHandlers) {
                SocketAddress remotePeer = connectedServerHandler.getRemotePeer();
                RpcClientHandler handler = connectedServerNodes.get(remotePeer);
                if (handler != null) {
                    handler.close();
                }
                // BUGFIX: remove by the map KEY (the address); the original passed
                // the handler object, which is never a key, so entries leaked.
                connectedServerNodes.remove(remotePeer);
            }
            connectedHandlers.clear();
        }
    }

    /** Drops the stale handler (if any) and reconnects to {@code remotePeer}. */
    public void reconnect(final RpcClientHandler handler, final SocketAddress remotePeer) {
        if (handler != null) {
            connectedHandlers.remove(handler);
            connectedServerNodes.remove(handler.getRemotePeer());
        }
        connectServerNode((InetSocketAddress) remotePeer);
    }

    /** Connects asynchronously; on success the new handler is registered and waiters signalled. */
    private void connectServerNode(InetSocketAddress serverAddress) {
        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                Bootstrap b = new Bootstrap();
                b.group(eventLoopGroup).channel(NioSocketChannel.class)
                        .handler(new RpcClientInitializer());
                ChannelFuture channelFuture = b.connect(serverAddress);
                channelFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            logger.debug("Successfully connect to remote server. remote peer = " + serverAddress);
                            RpcClientHandler handler = future.channel().pipeline().get(RpcClientHandler.class);
                            addHandler(handler);
                        }
                    }
                });
            }
        });
    }

    /** Registers a freshly connected handler and wakes any thread waiting for one. */
    private void addHandler(RpcClientHandler handler) {
        connectedHandlers.add(handler);
        InetSocketAddress remoteAddress = (InetSocketAddress) handler.getChannel().remoteAddress();
        connectedServerNodes.put(remoteAddress, handler);
        signalAvailableHandler();
    }

    /** Wakes every thread blocked in {@link #waitingForHandler()}. */
    private void signalAvailableHandler() {
        lock.lock();
        try {
            connected.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Blocks up to {@code connectTimeoutMills} for a handler to become available. */
    private boolean waitingForHandler() throws InterruptedException {
        lock.lock();
        try {
            return connected.await(this.connectTimeoutMills, TimeUnit.MILLISECONDS);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a handler chosen round-robin, blocking until at least one
     * connection exists.
     *
     * @throws IllegalStateException if the manager was stopped before any
     *         handler became available (the original code divided by zero here)
     */
    public RpcClientHandler chooseHandler() throws InterruptedException {
        int size = connectedHandlers.size();
        while (isRunning && size <= 0) {
            try {
                boolean available = waitingForHandler();
                if (available) {
                    size = connectedHandlers.size();
                }
            } catch (InterruptedException e) {
                logger.error("Waiting for available node is interrupted! ", e);
                throw new RuntimeException("Can't connect any servers!", e);
            }
        }
        if (size <= 0) {
            // BUGFIX: reachable when stop() flips isRunning with no connections;
            // the old "% size" threw ArithmeticException instead.
            throw new IllegalStateException("No available server node!");
        }
        // floorMod keeps the index non-negative even after the counter overflows
        // (the original "(x + size) % size" went negative for large overflows).
        int index = Math.floorMod(roundRobin.getAndIncrement(), size);
        return connectedHandlers.get(index);
    }

    /** Closes all connections, wakes waiters, and shuts down the executor and event loop. */
    public void stop() {
        isRunning = false;
        for (RpcClientHandler connectedServerHandler : connectedHandlers) {
            connectedServerHandler.close();
        }
        signalAvailableHandler();
        threadPoolExecutor.shutdown();
        eventLoopGroup.shutdownGracefully();
    }
}
package com.example.demo.netty.client.proxy;
/** Asynchronous invocation facade: call a remote function by name and get a future. */
public interface IAsyncObjectProxy {
/**
 * Invokes {@code funcName} on the remote service asynchronously.
 *
 * @param funcName name of the remote method
 * @param args     arguments to pass to the remote method
 * @return a future completed when the response arrives
 */
RPCFuture call(String funcName, Object... args);
}
package com.example.demo.netty.client.proxy;
import com.example.demo.netty.protocol.RpcRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;
/**
 * JDK dynamic-proxy invocation handler that translates interface method calls
 * into {@link RpcRequest} objects.
 *
 * <p>Calls to the {@code Object} methods (equals/hashCode/toString) are handled
 * locally. Actual network dispatch is still TODO — {@link #invoke} currently
 * builds the request and returns {@code null}.
 *
 * @param <T> the service interface being proxied
 * @author liukai
 * @date 2019/3/14 5:24 PM
 */
public class ObjectProxy<T> implements InvocationHandler, IAsyncObjectProxy {
    private static final Logger LOGGER = LoggerFactory.getLogger(ObjectProxy.class);
    private Class<T> clazz;

    public ObjectProxy(Class<T> clazz) {
        this.clazz = clazz;
    }

    /** Asynchronous call entry point — not implemented yet. */
    @Override
    public RPCFuture call(String funcName, Object... args) {
        return null;
    }

    /**
     * Handles {@code Object} methods locally, otherwise builds an RpcRequest
     * for the invoked method.
     *
     * @return the local result for Object methods; {@code null} otherwise (TODO)
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (Object.class == method.getDeclaringClass()) {
            String name = method.getName();
            if ("equals".equals(name)) {
                // Identity semantics: two proxies are equal only if they are the same object.
                return proxy == args[0];
            } else if ("hashCode".equals(name)) {
                return System.identityHashCode(proxy);
            } else if ("toString".equals(name)) {
                return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy)) +
                        ", with InvocationHandler " + this;
            } else {
                throw new IllegalStateException(String.valueOf(method));
            }
        }
        RpcRequest request = new RpcRequest();
        request.setRequestId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParameters(args);
        // Debug trace of the outgoing call; gated so the loops cost nothing in production.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("{}.{}", method.getDeclaringClass().getName(), method.getName());
            for (Class<?> parameterType : method.getParameterTypes()) {
                LOGGER.debug(parameterType.getName());
            }
            // BUGFIX: args is null for zero-argument methods — the original loop
            // dereferenced args.length and threw NullPointerException.
            if (args != null) {
                for (Object arg : args) {
                    // String.valueOf tolerates null arguments, unlike arg.toString().
                    LOGGER.debug(String.valueOf(arg));
                }
            }
        }
        // TODO: dispatch the request over a connection and return the result.
        return null;
    }
}
package com.example.demo.netty.client.proxy;
import com.example.demo.netty.protocol.RpcRequest;
import com.example.demo.netty.protocol.RpcResponse;
import org.assertj.core.util.Lists;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Future representing the pending result of an asynchronous RPC call.
 *
 * <p>Completion is signalled by {@link #done(RpcResponse)} when the matching
 * response arrives; waiters block on an {@link AbstractQueuedSynchronizer}
 * whose state flips from pending (0) to done (1) exactly once. Callbacks
 * registered via {@link #addCallback} run immediately if the future is already
 * complete, otherwise when the response arrives.
 *
 * @author liukai
 * @date 2019/3/13 10:59 AM
 */
public class RPCFuture implements Future<Object> {
    private static final Logger logger = LoggerFactory.getLogger(RPCFuture.class);
    private Sync sync;
    private RpcRequest request;
    private RpcResponse response;
    // Timestamp used only to warn about slow responses in done().
    private long startTime;
    private long responseTimeThreshold = 5000;
    // Callbacks registered before completion; guarded by 'lock'.
    private List<AsyncRPCCallback> pendingCallbacks = Lists.newArrayList();
    private ReentrantLock lock = new ReentrantLock();

    public RPCFuture(RpcRequest rpcRequest) {
        this.sync = new Sync();
        this.request = rpcRequest;
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public boolean isDone() {
        return sync.isDone();
    }

    /** Blocks until the response arrives, then returns its result (may be null). */
    @Override
    public Object get() throws InterruptedException, ExecutionException {
        // BUGFIX: tryAcquire() only tests the state and returns immediately,
        // so get() returned null before the response arrived. acquire() parks
        // the caller until done() releases the sync.
        sync.acquire(-1);
        if (this.response != null) {
            return this.response.getResult();
        } else {
            return null;
        }
    }

    /** Blocks up to the given timeout; throws if no response arrived in time. */
    @Override
    public Object get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        boolean success = sync.tryAcquireNanos(-1, unit.toNanos(timeout));
        if (success) {
            if (this.response != null) {
                return this.response.getResult();
            } else {
                return null;
            }
        } else {
            throw new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId()
                    + ". Request class name: " + this.request.getClassName()
                    + ". Request method: " + this.request.getMethodName());
        }
    }

    /** Completes the future: stores the response, releases waiters, runs callbacks. */
    public void done(RpcResponse response) {
        this.response = response;
        sync.release(1);
        invokeCallbacks();
        // Warn when the round trip exceeded the configured threshold.
        long responseTime = System.currentTimeMillis() - startTime;
        if (responseTime > this.responseTimeThreshold) {
            logger.warn("Service response time is too slow. Request id = " + response.getRequestId() + ". Response Time = " + responseTime + "ms");
        }
    }

    private void invokeCallbacks() {
        lock.lock();
        try {
            pendingCallbacks.forEach(this::runCallback);
        } catch (Exception e) {
            // Log instead of printStackTrace so the failure reaches the configured appenders.
            logger.error("Exception while invoking RPC callbacks", e);
        } finally {
            lock.unlock();
        }
    }

    /** Registers a callback; runs it immediately if the future is already complete. */
    public RPCFuture addCallback(AsyncRPCCallback callback) {
        lock.lock();
        try {
            if (isDone()) {
                runCallback(callback);
            } else {
                this.pendingCallbacks.add(callback);
            }
        } finally {
            lock.unlock();
        }
        return this;
    }

    private void runCallback(AsyncRPCCallback callback) {
        final RpcResponse res = this.response;
        // BUGFIX: the original captured the response but never invoked the callback.
        // NOTE(review): RpcResponse's error accessor is not visible in this file;
        // a non-null response is treated as success — route error responses to
        // fail() once the error field/contract is confirmed.
        if (res != null) {
            callback.success(res.getResult());
        } else {
            callback.fail(new RuntimeException("Response is null. Request id: " + request.getRequestId()));
        }
    }

    @Override
    public boolean isCancelled() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        throw new UnsupportedOperationException();
    }

    /** AQS whose state is 0 (pending) until the response arrives, then 1 (done) forever. */
    static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;
        private final int done = 1;
        private final int pending = 0;

        @Override
        protected boolean tryRelease(int arg) {
            // BUGFIX: the original guard was inverted — it tested state==done but
            // CAS'd from pending, so release() never succeeded and every waiter
            // parked in get() forever. Flip pending -> done exactly once.
            if (getState() == pending) {
                return compareAndSetState(pending, done);
            }
            return false;
        }

        @Override
        protected boolean tryAcquire(int arg) {
            // Acquire succeeds (without consuming anything) once the future is done.
            return getState() == done;
        }

        public boolean isDone() {
            return getState() == done;
        }
    }
}
package com.example.demo.netty.client.proxy;
import com.example.demo.netty.registry.ServiceDiscovery;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.lang.reflect.Proxy;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Client-side RPC entry point, registered as the Spring bean "rpcClient".
 *
 * NOTE(review): neither constructor is a no-arg constructor and each populates
 * only one field — confirm how Spring instantiates this @Service.
 *
 * @author liukai
 * @date 2019/3/14 5:16 PM
 */
@Service("rpcClient")
public class RpcClient {
// Registry/server address injected from application properties.
@Value("${server.address}")
private String serverAddress;
@Autowired
private ServiceDiscovery serviceDiscovery;
// Shared worker pool: 16 threads, bounded 65536-slot queue.
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(16, 16,600L,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(65536));
public RpcClient(String serverAddress) {
this.serverAddress = serverAddress;
}
public RpcClient(ServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}
/**
 * Creates a JDK dynamic proxy for {@code interfaceClass}; method calls are
 * routed through {@link ObjectProxy}. The unchecked cast is safe because the
 * proxy implements exactly the requested interface.
 */
public static <T> T create(Class<T> interfaceClass){
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[]{interfaceClass},new ObjectProxy<T>(interfaceClass));
}
/** Shuts down the worker pool and closes the service-discovery client. */
public void stop(){
threadPoolExecutor.shutdown();
serviceDiscovery.stop();
}
}
package com.example.demo.netty.client.proxy;
import com.example.demo.netty.protocol.RpcRequest;
import com.example.demo.netty.protocol.RpcResponse;
import com.google.common.collect.Maps;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
/**
 * Inbound handler for RPC responses on a single client channel.
 *
 * <p>Tracks in-flight requests by request id; when a response arrives, the
 * matching {@link RPCFuture} is completed and removed. {@link #sendRequest}
 * writes a request and blocks until the write operation has completed.
 *
 * @author liukai
 * @date 2019/3/14 8:08 PM
 */
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
    private static final Logger logger = LoggerFactory.getLogger(RpcClientHandler.class);
    // In-flight requests keyed by request id; claimed and removed in channelRead0.
    private ConcurrentHashMap<String, RPCFuture> pendingRPC = new ConcurrentHashMap<>();
    private volatile Channel channel;
    private SocketAddress remotePeer;

    public Channel getChannel() {
        return channel;
    }

    public SocketAddress getRemotePeer() {
        return remotePeer;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        this.channel = ctx.channel();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        // remoteAddress() is only available once the channel is active.
        this.remotePeer = this.channel.remoteAddress();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("client caught exception", cause);
        ctx.close();
    }

    /** Flushes any pending writes, then closes the channel. */
    public void close() {
        channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
        // Atomically claim the pending future for this response (get + remove in one step).
        RPCFuture future = pendingRPC.remove(msg.getRequestId());
        if (future != null) {
            future.done(msg);
        }
    }

    /**
     * Registers the request as in-flight and writes it to the channel,
     * blocking until the write operation completes.
     *
     * @return a future completed when the matching response arrives
     */
    public RPCFuture sendRequest(RpcRequest request) {
        final CountDownLatch latch = new CountDownLatch(1);
        RPCFuture rpcFuture = new RPCFuture(request);
        pendingRPC.put(request.getRequestId(), rpcFuture);
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                latch.countDown();
            }
        });
        try {
            // BUGFIX: latch.wait() is Object.wait() without owning the monitor and
            // threw IllegalMonitorStateException on every call; await() is the
            // CountDownLatch API for blocking until countDown().
            latch.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt status so callers can observe it.
            Thread.currentThread().interrupt();
        }
        return rpcFuture;
    }
}
......@@ -111,6 +111,17 @@ public class ServiceDiscovery {
return zk;
}
/** Closes the ZooKeeper connection if one was established. */
public void stop(){
    if (zookeeper != null) {
        try {
            zookeeper.close();
        } catch (InterruptedException e) {
            // BUGFIX: restore the interrupt status instead of swallowing it,
            // so callers up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
/** Returns the registry address this discovery client was configured with. */
public String getRegistryAddress() {
return registryAddress;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment