作者:微信小助手
发布时间:2024-12-27T22:19:18
大家好,今天我要和大家分享一个有趣的话题:那些著名的开源框架是如何使用Netty的。这些年我在研究这些框架源码的过程中,发现了很多精妙的设计。让我们一起来扒一扒他们的"老底"。 先说说RocketMQ。作为阿里开源的消息中间件,它的网络层完全基于Netty构建。为什么这么说?让我给大家看点有意思的代码。 有意思的是,RocketMQ把网络层抽象得特别好。记得有次我们要基于RocketMQ做二次开发,就特别感激它这种设计: RocketMQ的这套设计教会我们: 说到Dubbo,它可是把Netty用得出神入化。虽然Dubbo支持多种通信框架,但Netty是它的默认选择,而且是最成熟的实现。 有趣的是Dubbo的编解码器设计: WebFlux是Spring 5引入的响应式Web框架,它的底层网络也是用的Netty。说实话,它对Netty的使用特别优雅。 WebFlux是怎么和Netty配合的?看这段代码: 这里面有很多值得学习的设计: ES的集群间通信也是用Netty实现的。它的实现特别有特色: ES的这套网络架构教会我们: 研究这些框架的源码,我学到了很多: 回顾这些框架对Netty的使用,能看出它们都有各自的特点: 这些设计思路都值得我们学习。说实话,看这些优秀的开源项目源码,就像在和高手过招,每次都能学到新东西。 你们在看这些框架源码时,有没有发现什么有趣的设计?欢迎在评论区分享你的发现! 至此,我们的Netty系列就告一段落了。回顾这些文章,我们从NIO聊到Netty,从原理聊到实践,从基础组件聊到开源框架,算是对Netty有了一个全面的认识。希望这些内容对大家有帮助!解密主流框架中的Netty应用
RocketMQ:消息队列的艺术
网络层设计
public class NettyRemotingServer {
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
public NettyRemotingServer() {
this.serverBootstrap = new ServerBootstrap();
// boss就配一个线程,够用了
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactoryImpl("NettyBossSelector_"));
// worker线程看配置和压力来定
this.workerGroup = new NioEventLoopGroup(
NettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("NettyServerWorkerSelector_")
);
}
}// 请求-响应模型
public class RequestProcessor {
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
// 1. 生成请求ID,保证请求唯一性
request.setOpaque(requestId.getAndIncrement());
// 2. 创建响应Future
ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(),
timeoutMillis);
// 3. 发送请求
channel.writeAndFlush(request);
// 4. 等待响应
return responseFuture.waitResponse();
}
}
Dubbo:RPC的智慧
多协议支持
public class NettyServer {
public void doOpen() {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(
Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
// 这个Pipeline设计得很赞
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new DubboCodec());
pipeline.addLast("handler", new HeaderExchangeHandler());
}
});
}
}public class DubboCodec extends ByteToMessageDecoder {
@Override
protected Object decode(Channel channel, ByteBuf input) {
byte[] header = new byte[16];
input.readBytes(header);
// Dubbo的魔数检查,这个设计我们可以学习
if (header[0] != MAGIC_HIGH || header[1] != MAGIC_LOW) {
throw new IOException("Error magic number!");
}
// 解析协议版本、请求类型等
return decodebody(...);
}
}Spring WebFlux:响应式的艺术
响应式处理
public class HttpServerRoutes {
RouterFunction
.route(GET("/hello"),
request -> ServerResponse.ok()
.body(Mono.just("Hello, Reactive!"), String.class))
.andRoute(POST("/upload"),
request -> request.bodyToFlux(DataBuffer.class)
.map(buffer -> handleBuffer(buffer))
.then(ServerResponse.ok().build()));
}public class ReactorHttpHandlerAdapter extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
NettyServerHttpRequest adaptedRequest =
new NettyServerHttpRequest(ctx, request);
// 转换成响应式流处理
handle(adaptedRequest)
.doOnSuccess(response ->
response.writeWith(Flux.just(buffer))
.subscribe())
.subscribe();
}
}
}
Elasticsearch:搜索引擎的秘密
public class Netty4Transport extends TcpTransport {
protected void doStart() {
bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(settings.get(WORKER_COUNT)))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
// 看看这个Pipeline的设计
ch.pipeline()
.addLast("ssl", new SslHandler())
.addLast("size", new MessageSizeHandler())
.addLast("compress", new MessageCompressionHandler())
.addLast("dispatcher", new MessageDispatcher());
}
});
}
}
经验总结
1. 协议设计
2. 性能优化
3. 可靠性保障
4. 代码设计
写在最后