适合新手:从头开始开发即时消息服务器(基于网络,完整的源代码)

这篇文章是由“园尔文”分享的,博客:juejin.im/user/5cefab8451882510eb758606,有变化和修改。

介绍

网站管理员提示:这篇文章适合即时通讯初学者阅读,但最好有一些网络编程经验。实用代码是网络编程。如果你对网络编程和即时通讯的一些理论知识知之甚少,请先阅读:“初学者入门就够了:从头开始开发移动即时通讯”。本文整理了即时消息小白分类的详细理论数据,请根据需要补充相关知识。

支持源代码:这篇文章相对简单,但不太容易理解。建议将其与代码一起阅读。请从“11。下载完整的源代码”。

(这篇文章同时在http://www.52im.net/thread-2768-1-1.html发表)

1.内容摘要

首先,让我们谈谈即时通讯技术可以用来做什么:

1)聊天:qq、微信; 2)直播:斗鱼直播、抖音; 3)实时位置共享、游戏多人互动等等。

可以说,几乎所有高实时应用场景都需要即时消息技术。

本文将带您从头开始构建一个轻量级即时消息服务器。

虽然麻雀很小,装备也很好,但是我们建立的即时消息服务器有以下功能:

1)一对一的文本消息、文件消息通信; 2)每个消息有“已发送”/“已送达”/“已读”回执; 3)存储离线消息; 4)支持用户登录,好友关系等基本功能; 5)能够方便地水平扩展。

通过这个项目,你可以学到很多后端知识:

1)rpc通信; 2)数据库; 3)缓存; 4)消息队列; 5)分布式、高并发的架构设计; 6)docker部署。

2.相关文章

更实用的代码参考:

《开源移动端IM技术框架MobileIMSDK》(* 推荐) 《自已开发IM有那么难吗?手把手教你自撸一个Andriod版简易IM (有源码)》 《一种Android端IM智能心跳算法的设计与实现探讨(含样例代码)》 《手把手教你用Netty实现网络通信程序的心跳机制、断线重连机制》 《NIO框架入门(一):服务端基于Netty4的UDP双向通信Demo演示 [附件下载]》 《NIO框架入门(二):服务端基于MINA2的UDP双向通信Demo演示 [附件下载]》 《NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战 [附件下载]》 《NIO框架入门(四):Android与MINA2、Netty4的跨平台UDP双向通信实战 [附件下载]》 《一个WebSocket实时聊天室Demo:基于node.js+socket.io [附件下载]》

关于即时通讯架构的文章:

《浅谈IM系统的架构设计》 《简述移动端IM开发的那些坑:架构设计、通信协议和客户端》 《一套海量在线用户的移动端IM架构设计实践分享(含详细图文)》 《一套原创分布式即时通讯(IM)系统理论架构方案》 《从零到卓越:京东客服即时通讯系统的技术架构演进历程》 《蘑菇街即时通讯/IM服务器开发之架构选择》 《一套高可用、易伸缩、高并发的IM群聊、单聊架构方案设计实践》

3.消息通信

3.1短信

让我们从最简单的特性开始:发送一条普通的消息。

信息的格式如下:

message ChatMsg{ id= 1; //消息id fromId = Alice //发送者userId destId = Bob //接收者userId msgBody = hello //消息体 }

如上图所示,我们现在有两个用户:连接到服务器的爱丽丝和鲍勃。当爱丽丝向鲍勃发送消息(hello)时,服务器接收到该消息,并根据消息的目的地将其转发给鲍勃。

展开全文

3.2发送收据

我们如何发送收据?

我们定义了回执数据格式ACK,MsgType有三种类型,即发送、传递、读取。

信息的格式如下:

message AckMsg { id; //消息id fromId; //发送者id destId; //接收者id msgType; //消息类型 ackMsgId; //确认的消息id } enum MsgType { DELIVERED; READ; }

当服务器接收到来自爱丽丝的消息时:

1)向爱丽丝发送一个已发送(你好)的消息,表示该消息已发送到服务器:

message AckMsg { id= 2; fromId = Alice; destId = Bob; msgType = SENT; ackMsgId = 1; }

2)在服务器将hello转发给鲍勃后,它会立即将已发送的(hello)发送给爱丽丝,以指示消息已发送给鲍勃:

message AckMsg { id= 3; fromId = Bob; destId = Alice; msgType = DELIVERED; ackMsgId = 1; }

3)在3)鲍勃读取消息后,客户端向服务器发送read(hello)以指示消息已被读取:

message AckMsg { id= 4; fromId = Bob; destId = Alice; msgType = READ; ackMsgId = 1; }

该消息将像普通聊天消息一样由服务器处理,并最终发送给爱丽丝。

服务器中没有区分ChatMsg和AckMsg,过程是相同的:消息的destId被解析和转发。

4.增加耕地面积

随着用户数量的增加,有必要增加服务器的数量,并且用户的连接分散在不同的机器上。此时,有必要存储用户连接到的机器。

我们引入了一个新模块来管理用户的连接信息。

4.1管理用户状态

模块称为用户状态,有三个接口:

public interface UserStatusService { /** * 用户上线,存储userId与机器id的关系 * * @param userId * @param connectorId * @return 如果当前用户在线,则返回他连接的机器id,否则返回null */ String online(String userId, String connectorId); /** * 用户下线 * * @param userId */ voidoffline(String userId); /** * 通过用户id查找他当前连接的机器id * * @param userId * @return */ String getConnectorId(String userId); }

这样,我们可以管理用户的连接状态。具体实现应该考虑用户数量和服务的预期性能。

这里我们使用redis来实现,并以键值的形式存储用户标识和连接器标识之间的关系。

4.2消息转发

此外,需要一个模块在不同的机器上转发消息,其结构如下:

此时,我们的服务分为两个模块,连接器模块用于维护用户的长链接,传输用于在多个连接器之间转发消息。

现在爱丽丝和鲍勃连接到两个连接器上,那么信息将如何传递呢?

1)当爱丽丝上线并连接到机器[1]:

1.1)将爱丽丝及其连接存储在内存中。

1.2)调用用户状态在线方法在线记录爱丽丝。

2)爱丽丝给鲍勃发了一条信息:

2.1)收到消息后,机器[1]解析destId并在内存中查找鲍勃。

2.2)如果不是,这意味着鲍勃没有连接到这台机器,那么它将被转发到传输。

3)转接呼叫用户状态的GetconnectionId(Bob)方法,以找到Bob所连接的连接器,返回到机器[2],并将其转发到机器[2]。

流程图:

4.3总结

引入用户状态模块来管理用户连接,传输模块在不同的机器之间转发,从而可以水平扩展服务。为了满足实时转发,传输需要与每个连接器机器保持长链路。

5.离线消息

如果用户当前不在线,则在用户下次在线后,必须保留并再次推送该消息。mysql在这里用来存储离线消息。

为了便于横向扩展,我们使用消息队列来解耦:

1)如果在1)传送接收到消息后发现用户不在线,将被发送到消息队列进行入库;

2)当用户登录时,服务器从Curilla获取离线消息进行推送。

6、用户登录,朋友关系

用户的注册和登录、帐户管理和朋友关系链等功能更适合使用http协议,因此我们将该模块变成一个restful服务,并将http接口暴露给外部供客户端调用。

这就完成了服务器的基本架构:

7.中场休息......

本文的上述内容,本文帮助每个人构建即时消息服务器架构,但是仍然有许多细节需要我们思考。

例如:

1)如何保证消息的顺序和唯一 2)多个设备在线如何保证消息一致性 3)如何处理消息发送失败 4)消息的安全性 5)如果要存储聊天记录要怎么做 6)数据库分表分库 7)服务高可用 ……

~请继续阅读第二部分了解更多详情~

8.可靠性

什么是可靠性?对于即时消息系统来说,一个可靠的定义至少是不丢失消息,消息不重复,消息没有乱序。只有满足这三点,才能说是一次好的聊天经历。

8.1不要丢失新闻

让我们从永不丢失消息开始。

首先回顾上一节中设计的服务器架构:

让我们从一个简单的例子开始:当爱丽丝给鲍勃发信息时,她可能需要通过这样一个链接:

1)client-->connecter 2)connector-->transfer 3)transfer-->connector 4)connector-->client

整个环节的每个环节都可能出现问题。虽然tcp协议是可靠的,但它只能保证链路层的可靠性,而不能保证应用层的可靠性。

例如,在第一步中,连接器接收到来自客户端的消息,但未能转发传输,则鲍勃无法接收到该消息,爱丽丝也不会意识到该消息未能发送。

如果鲍勃离线,则消息链接为:

1)client-->connector 2)connector-->transfer 3)transfer-->mq

如果在第三步中,传输从连接器接收到消息,但是离线消息未能被存储,则消息也未能被传递。

为了确保应用层的可靠性,我们必须有一个确认机制,使发送方能够确认对方已经收到消息。

具体来说,我们模仿tcp协议,在应用层建立ack机制。

Tcp的消息以字节为单位,而我们以消息为单位。

每次发送方发送消息时,它都必须等待对方的确认响应。ack确认消息应该携带接收到的id供发送方识别。

其次,发送方需要维护一个等待确认的队列。每次发送消息时,消息和计时器都会排队。

此外,还有一个线程一直在轮询队列。如果超时且没有收到确认,它会取出消息并重新发送。

有两种方法可以处理超时未收到ack的消息:

1)像tcp一样连续发送,直到收到确认。

2)设置最大重试次数。如果没有收到超过该数量的ack,将使用故障机制进行处理,从而节省资源。例如,如果连接器长时间没有从客户端接收到ack,它可以主动断开与客户端的连接,剩余的未发送消息将作为脱机消息存储。断开连接后,客户端可以尝试重新连接到服务器。

8.2不重复,不混乱

有时,由于网络原因,ack可能接收缓慢,发送方会重复发送,因此接收方必须有重复数据消除机制。

消除重复的方法是为每封邮件添加一个唯一的id。这个唯一的id不一定是全局的,它只需要在会话中是唯一的。例如,两个人或一群人之间的对话。如果网络断开,重新连接后,将是一个新会话,id将从0重新开始。

有关消息标识生成算法的文章,请参考:

《融云技术分享:解密融云IM产品的聊天消息ID生成策略》 《微信技术分享:微信的海量IM聊天消息序列号生成实践(算法原理篇)》 《微信技术分享:微信的海量IM聊天消息序列号生成实践(容灾方案篇)》 《美团技术分享:深度解密美团的分布式ID生成算法》

接收者需要维护当前会话中接收的最后一条消息的id,称为lastId。

每次收到新消息时,都会将该id与lastId进行比较,看它是否连续。如果它不是连续的,它将被放入临时队列中以便以后处理。

例如:

1)当前会话的lastId=1,然后服务器接收消息msg(id=2),可以判断接收到的消息是连续的,处理该消息,并将lastId修改为2;

2)但是,如果服务器接收到消息msg(标识=3),则表示消息到达时出现故障,然后将消息排队,等到lastId变为2(即服务器接收到消息msg(标识=2)并进行处理),然后取出消息进行处理。

因此,判断消息是否重复只需判断msgId >即可;lastId & amp& amp!Queue.contains(msgId)。如果接收到重复的消息,可以判断ack没有被传送,并且ack被再次发送。

接收方收到消息后的完整处理流程如下:

伪代码如下:

class ProcessMsgNode{ /** * 接收到的消息 */ privateMessage message; /** * 处理消息的方法 */ privateConsumer<Message> consumer; } public CompletableFuture<Void> offer(Long id,Message message,Consumer<Message> consumer) { if(isRepeat(id)) { //消息重复 sendAck(id); return null; } if(!isConsist(id)) { //消息不连续 notConsistMsgMap.put(id, newProcessMsgNode(message, consumer)); return null; } //处理消息 returnprocess(id, message, consumer); } private CompletableFuture<Void> process(Long id, Message message, Consumer<Message> consumer) { return CompletableFuture .runAsync(() -> consumer.accept(message)) .thenAccept(v -> sendAck(id)) .thenAccept(v -> lastId.set(id)) .thenComposeAsync(v -> { Long nextId = nextId(id); if(notConsistMsgMap.containsKey(nextId)) { //队列中有下个消息 ProcessMsgNode node = notConsistMsgMap.get(nextId); returnprocess(nextId, node.getMessage(), consumer); } else{ //队列中没有下个消息 CompletableFuture<Void> future = newCompletableFuture<>(); future.complete(null); returnfuture; } }) .exceptionally(e -> { logger.error("[process received msg] has error", e); returnnull; }); }

9.安全

无论是聊天记录还是离线消息,备份肯定会存储在服务器中,因此消息的安全性和客户隐私的保护也非常重要。

因此,所有消息都必须加密。

在存储模块中,有两个维护用户信息和关系链的基本表,即im_user表和im _ relationship链表。

im_user表用于存放用户常规信息,例如用户名密码等,结构比较简单。 im_relation表用于记录好友关系。

的结构如下:

CREATE TABLE `im_relation` ( `id` bigint(20) COMMENT '关系id', `user_id1` varchar(100) COMMENT '用户1id', `user_id2` varchar(100) COMMENT '用户2id', `encrypt_key` char(33) COMMENT 'aes密钥', `gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP, `gmt_update` timestamp DEFAUL TCURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARYKEY(`id`), UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`) );

1)user_id1和user_id2是彼此友好的用户id。为了避免重复,它们根据user _ id1 & lt进行存储。User_id2按顺序存储,并添加一个联合索引。

2)encrypt_key是随机生成的密钥。当客户端登录时,它将从数据库中获取用户的所有关系,并将它们存储在内存中,以供后续加密和解密。

3)当客户端向朋友发送消息时,内存中的关系密钥被取出,加密后发送。类似地,当接收到消息时,检索相应的密钥进行解密。

完整的客户端登录过程如下:

1)客户端调用rest接口登录;

2)客户端调用rest接口获取所有关系;用户的;

3)客户端向连接器发送问候消息进行在线通知;

4)连接器提取离线消息并将其推送到客户端;;

5)连接器更新用户会话。

那么为什么连接器在更新会话之前会推送离线消息?

让我们想想如果顺序颠倒会发生什么:

1)用户爱丽丝登录服务器;

2)连接器更新会话;;

3)推送离线消息;

这时鲍勃给爱丽丝发了一条信息。

如果离线消息仍在推送过程中,鲍勃会向爱丽丝发送新消息,服务器会获取爱丽丝的会话,并立即推送。此时,新消息可能会在一堆离线消息中被推送,然后爱丽丝的消息就会出故障。

我们必须确保离线消息的顺序先于新消息。

然后,如果首先推送离线消息,则会话将被更新。在离线消息推送过程中,爱丽丝的状态是“不在线”。此时,鲍勃新发送的消息将只放入im_offline,并且“在线”将不会开始接受新消息,直到im_offline表中的数据被读出。这也避免了混乱。

10.存储设计

10.1存储离线消息

当用户不在线时,离线消息必须存储在服务器中,等待用户上线后再推送。在理解了上一节之后,存储离线消息变得非常容易。

使用以下结构添加脱机消息表im _ offline:

CREATE TABLE `im_offline` ( `id` int(11) COMMENT '主键', `msg_id` bigint(20) COMMENT '消息id', `msg_type` int(2) COMMENT '消息类型', `content` varbinary(5000) COMMENT '消息内容', `to_user_id` varchar(100) COMMENT '收件人id', `has_read` tinyint(1) COMMENT '是否阅读', `gmt_create` timestamp COMMENT '创建时间', PRIMARY KEY(`id`) );

Msg_type用于区分消息类型(聊天、确认)。加密的消息内容以字节数组的形式存储。

当用户上线时,可以根据条件to_user_id= user id来拉记录。

10.2防止离线消息的重复推送

让我们考虑一下多终端登录。爱丽丝有两个设备同时登录。在这种并发情况下,我们需要一些机制来确保离线消息只被读取一次。

此处使用化学文摘社机制:

1)首先,取出所有has_read=false的字段;

2)检查每条消息的has_read值是否为假,如果是,则更改为真。这是一个原子操作:

1 updateim _ offline set has _ read = true where id = $ { msg _ id },has_read = false

3)修改成功时推送,修改不成功时不推送。

我相信到这个时候,学生们已经可以自己构建一个完整的、可用的即时消息服务器了。

11、完整的源代码下载

从头开始开发即时消息服务器(完整的源代码)。zip (或从github下载:https://github.com/52im/im)

附录:更多即时消息开发文章

[1] 更多IM代码实践(适合新手): 《自已开发IM有那么难吗?手把手教你自撸一个Andriod版简易IM (有源码)》 《一种Android端IM智能心跳算法的设计与实现探讨(含样例代码)》 《手把手教你用Netty实现网络通信程序的心跳机制、断线重连机制》 《详解Netty的安全性:原理介绍、代码演示(上篇)》 《详解Netty的安全性:原理介绍、代码演示(下篇)》 《微信本地数据库破解版(含iOS、Android),仅供学习研究 [附件下载]》 《Java NIO基础视频教程、MINA视频教程、Netty快速入门视频 [有源码]》 《轻量级即时通讯框架MobileIMSDK的iOS源码(开源版)[附件下载]》 《开源IM工程“蘑菇街TeamTalk”2015年5月前未删减版完整代码 [附件下载]》 《微信本地数据库破解版(含iOS、Android),仅供学习研究 [附件下载]》 《NIO框架入门(一):服务端基于Netty4的UDP双向通信Demo演示 [附件下载]》 《NIO框架入门(二):服务端基于MINA2的UDP双向通信Demo演示 [附件下载]》 《NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战 [附件下载]》 《NIO框架入门(四):Android与MINA2、Netty4的跨平台UDP双向通信实战 [附件下载]》 《用于IM中图片压缩的Android工具类源码,效果可媲美微信 [附件下载]》 《高仿Android版手机QQ可拖拽未读数小气泡源码 [附件下载]》 《一个WebSocket实时聊天室Demo:基于node.js+socket.io [附件下载]》 《Android聊天界面源码:实现了聊天气泡、表情图标(可翻页) [附件下载]》 《高仿Android版手机QQ首页侧滑菜单源码 [附件下载]》 《开源libco库:单机千万连接、支撑微信8亿用户的后台框架基石 [源码下载]》 《分享java AMR音频文件合并源码,全网最全》 《微信团队原创Android资源混淆工具:AndResGuard [有源码]》 《一个基于MQTT通信协议的完整Android推送Demo [附件下载]》 《Android版高仿微信聊天界面源码 [附件下载]》 《高仿手机QQ的Android版锁屏聊天消息提醒功能 [附件下载]》 《高仿iOS版手机QQ录音及振幅动画完整实现 [源码下载]》 《Android端社交应用中的评论和回复功能实战分享[图文+源码]》 《Android端IM应用中的@人功能实现:仿微博、QQ、微信,零入侵、高可扩展[图文+源码]》 《仿微信的IM聊天时间显示格式(含iOS/Android/Web实现)[图文+源码]》 《Android版仿微信朋友圈图片拖拽返回效果 [源码下载]》 《适合新手:从零开发一个IM服务端(基于Netty,有完整源码)》 >> 更多同类文章 …… [2] IM群聊相关的技术文章: 《快速裂变:见证微信强大后台架构从0到1的演进历程(一)》 《如何保证IM实时消息的“时序性”与“一致性”?》 《IM单聊和群聊中的在线状态同步应该用“推”还是“拉”?》 《IM群聊消息如此复杂,如何保证不丢不重?》 《微信后台团队:微信后台异步消息队列的优化升级实践分享》 《移动端IM中大规模群消息的推送如何保证效率、实时性?》 《现代IM系统中聊天消息的同步和存储方案探讨》 《关于IM即时通讯群聊消息的乱序问题讨论》 《IM群聊消息的已读回执功能该怎么实现?》 《IM群聊消息究竟是存1份(即扩散读)还是存多份(即扩散写)?》 《一套高可用、易伸缩、高并发的IM群聊、单聊架构方案设计实践》 《[技术脑洞] 如果把14亿中国人拉到一个微信群里技术上能实现吗?》 《IM群聊机制,除了循环去发消息还有什么方式?如何优化?》 《网易云信技术分享:IM中的万人群聊技术方案实践总结》 >> 更多同类文章 …… [3] 有关IM架构设计的文章: 《浅谈IM系统的架构设计》 《简述移动端IM开发的那些坑:架构设计、通信协议和客户端》 《一套海量在线用户的移动端IM架构设计实践分享(含详细图文)》 《一套原创分布式即时通讯(IM)系统理论架构方案》 《从零到卓越:京东客服即时通讯系统的技术架构演进历程》 《蘑菇街即时通讯/IM服务器开发之架构选择》 《腾讯QQ1.4亿在线用户的技术挑战和架构演进之路PPT》 《微信后台基于时间序的海量数据冷热分级架构设计实践》 《微信技术总监谈架构:微信之道——大道至简(演讲全文)》 《如何解读《微信技术总监谈架构:微信之道——大道至简》》 《快速裂变:见证微信强大后台架构从0到1的演进历程(一)》 《17年的实践:腾讯海量产品的技术方法论》 《移动端IM中大规模群消息的推送如何保证效率、实时性?》 《现代IM系统中聊天消息的同步和存储方案探讨》 《IM开发基础知识补课(二):如何设计大量图片文件的服务端存储架构?》 《IM开发基础知识补课(三):快速理解服务端数据库读写分离原理及实践建议》 《IM开发基础知识补课(四):正确理解HTTP短连接中的Cookie、Session和Token》 《WhatsApp技术实践分享:32人工程团队创造的技术神话》 《微信朋友圈千亿访问量背后的技术挑战和实践总结》 《王者荣耀2亿用户量的背后:产品定位、技术架构、网络方案等》 《IM系统的MQ消息中间件选型:Kafka还是RabbitMQ?》 《腾讯资深架构师干货总结:一文读懂大型分布式系统设计的方方面面》 《以微博类应用场景为例,总结海量社交系统的架构设计步骤》 《快速理解高性能HTTP服务端的负载均衡技术原理》 《子弹短信光鲜的背后:网易云信首席架构师分享亿级IM平台的技术实践》 《知乎技术分享:从单机到2000万QPS并发的Redis高性能缓存实践之路》 《IM开发基础知识补课(五):通俗易懂,正确理解并用好MQ消息队列》 《微信技术分享:微信的海量IM聊天消息序列号生成实践(算法原理篇)》 《微信技术分享:微信的海量IM聊天消息序列号生成实践(容灾方案篇)》 《新手入门:零基础理解大型分布式架构的演进历史、技术原理、最佳实践》 《一套高可用、易伸缩、高并发的IM群聊、单聊架构方案设计实践》 《阿里技术分享:深度揭秘阿里数据库技术方案的10年变迁史》 《阿里技术分享:阿里自研金融级数据库OceanBase的艰辛成长之路》 《社交软件红包技术解密(一):全面解密QQ红包技术方案——架构、技术实现等》 《社交软件红包技术解密(二):解密微信摇一摇红包从0到1的技术演进》 《社交软件红包技术解密(三):微信摇一摇红包雨背后的技术细节》 《社交软件红包技术解密(四):微信红包系统是如何应对高并发的》 《社交软件红包技术解密(五):微信红包系统是如何实现高可用性的》 《社交软件红包技术解密(六):微信红包系统的存储层架构演进实践》 《社交软件红包技术解密(七):支付宝红包的海量高并发技术实践》 《社交软件红包技术解密(八):全面解密微博红包技术方案》 《社交软件红包技术解密(九):谈谈手Q红包的功能逻辑、容灾、运维、架构等》 《即时通讯新手入门:一文读懂什么是Nginx?它能否实现IM的负载均衡?》 《即时通讯新手入门:快速理解RPC技术——基本概念、原理和用途》 《多维度对比5款主流分布式MQ消息队列,妈妈再也不担心我的技术选型了》 《从游击队到正规军:马蜂窝旅游网的IM系统架构演进之路》 《IM开发基础知识补课(六):数据库用NoSQL还是SQL?读这篇就够了!》 >> 更多同类文章 ……

(这篇文章同时在http://www.52im.net/thread-2768-1-1.html发表)回到搜狐看更多

负责任的编辑: