提交之GitHub

This commit is contained in:
2026-02-08 23:58:00 +08:00
commit b4f25e99b1
43 changed files with 7926 additions and 0 deletions
+64
View File
@@ -0,0 +1,64 @@
package server;
import server.data.ServerData;
import util.FileUtil;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Server {
public static void main(String[] args) {
//启动服务器
ServerMainThread serverThread = new ServerMainThread();
serverThread.start();
//启动定时任务:保存服务器数据
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, r -> {
Thread t = new Thread(r);
t.setDaemon(true); // 设置为守护线程
return t;
});
// 每隔半小时执行一次
scheduler.scheduleAtFixedRate(() -> {
System.out.println("保存数据中");
FileUtil.saveServerData();
}, 1200, 1200, TimeUnit.SECONDS);
Scanner sc = new Scanner(System.in);
while (true) {
// System.out.print("SERVER_CMD>>");
String cmd = sc.nextLine();
switch (cmd) {
case "shutdown":
serverThread.shutdown();
System.out.println("=======服务器已关闭=======");
System.exit(0);
break;
case "groupInfo":
System.out.println("=======群聊列表=======");
ServerData.getInstance().getServerGroups().values().forEach(
group -> System.out.println(group.getGroupId() + " " + group.getGroupName())
);
break;
case "chatThreadStatus":
System.out.println("=======聊天线程状态=======");
System.out.println(ServerMainThread.getChatThreadPoolStatus());
break;
case "receiveThreadStatus":
System.out.println("=======接收线程状态=======");
System.out.println(ServerMainThread.getReceiveThreadPoolStatus());
break;
case "blockingQueueStatus":
System.out.println("=======阻塞队列状态=======");
System.out.println(ServerMainThread.getBlockingQueueStatus());
break;
default:
System.out.println("无效指令");
}
}
}
}
+224
View File
@@ -0,0 +1,224 @@
package server;
import server.serveice.*;
import util.FileUtil;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import static global.global.SERVER_PORT;
public class ServerMainThread extends Thread {
// 服务器运行状态
private static volatile boolean running = true;
// 线程池:用于处理聊天消息的线程池
private static ExecutorService chatThreadPool;
// 线程池:用于接收客户端消息的线程池
private static ExecutorService receiveThreadPool;
// 用于存储每个客户端的消息队列
private static ConcurrentHashMap<Socket, ArrayBlockingQueue<Wrapper>> msgQueues;
// 核心:启动服务、监听端口、循环接收客户端连接
@Override
public void run() {
System.out.println("加载本地数据成功");
// 初始化推送信息线程池
chatThreadPool = new ThreadPoolExecutor(
10, // 核心线程数
50, // 最大线程数
60L, TimeUnit.SECONDS, // 空闲线程存活时间
new SynchronousQueue<>(), // 直接提交队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
receiveThreadPool = new ThreadPoolExecutor(
10, // 核心线程数
50, // 最大线程数
60L, TimeUnit.SECONDS, // 空闲线程存活时间
new SynchronousQueue<>(), // 直接提交队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 初始化消息队列
msgQueues = new ConcurrentHashMap<>();
System.out.println("初始化信息线程池成功");
System.out.println("服务器启动成功");
try {
ServerSocket serverSocket = new ServerSocket(SERVER_PORT);
while (running) {
// 扫描端口,接收链接请求,如果有链接请求,则尝试链接
Socket clientSocket = serverSocket.accept();
System.out.println("有新的用户端连接: " + clientSocket.getPort());
// 创建线程,处理客户端请求
ArrayBlockingQueue<Wrapper> threadQueue = new ArrayBlockingQueue<>(40);
msgQueues.put(clientSocket, threadQueue);
ClientChatThread clientChatThread = new ClientChatThread(clientSocket, threadQueue);
chatThreadPool.submit(clientChatThread);
Thread.sleep(200);
ClientReceiveThread clientReceiveThread = new ClientReceiveThread(clientSocket, threadQueue);
receiveThreadPool.submit(clientReceiveThread);
System.out.println("创建线程成功");
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
// 关闭服务器
public void shutdown() {
FileUtil.saveServerData();
// 向所有用户发送服务器关闭信息。
if (msgQueues != null) {
Wrapper exitMsg = new Wrapper(global.global.OPT_EXIT);
for (ArrayBlockingQueue<Wrapper> queue : msgQueues.values()) {
try {
queue.put(exitMsg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
running = false;
}
// 检查服务器是否运行
public static boolean isRunning() {
return running;
}
// 检查chatThreadPool状态
public static Map<String, Object> getChatThreadPoolStatus() {
Map<String, Object> status = new HashMap<>();
if (chatThreadPool == null) {
status.put("error", "线程池未初始化");
return status;
}
// 检查是否为 ThreadPoolExecutor
if (chatThreadPool instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) chatThreadPool;
status.put("poolSize", tpe.getPoolSize());
status.put("activeCount", tpe.getActiveCount());
status.put("corePoolSize", tpe.getCorePoolSize());
status.put("maximumPoolSize", tpe.getMaximumPoolSize());
status.put("largestPoolSize", tpe.getLargestPoolSize());
status.put("queueSize", tpe.getQueue().size());
status.put("completedTaskCount", tpe.getCompletedTaskCount());
status.put("taskCount", tpe.getTaskCount());
status.put("isShutdown", tpe.isShutdown());
status.put("isTerminated", tpe.isTerminated());
}
// 检查是否为 ForkJoinPool
else if (chatThreadPool instanceof ForkJoinPool) {
ForkJoinPool fjp = (ForkJoinPool) chatThreadPool;
status.put("poolSize", fjp.getPoolSize());
status.put("activeCount", fjp.getActiveThreadCount());
status.put("parallelism", fjp.getParallelism());
status.put("runningThreadCount", fjp.getRunningThreadCount());
status.put("queuedTaskCount", fjp.getQueuedTaskCount());
status.put("queuedSubmissionCount", fjp.getQueuedSubmissionCount());
status.put("stealCount", fjp.getStealCount());
}
// 其他类型的 ExecutorService
else {
// 使用反射尝试获取信息
status.put("type", chatThreadPool.getClass().getName());
status.put("info", "无法直接获取详细状态");
}
return status;
}
// 检查receiveThreadPool的状态
public static Map<String, Object> getReceiveThreadPoolStatus() {
Map<String, Object> status = new HashMap<>();
if (receiveThreadPool == null) {
status.put("error", "线程池未初始化");
return status;
}
// 检查是否为 ThreadPoolExecutor
if (receiveThreadPool instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) receiveThreadPool;
status.put("poolSize", tpe.getPoolSize());
status.put("activeCount", tpe.getActiveCount());
status.put("corePoolSize", tpe.getCorePoolSize());
status.put("maximumPoolSize", tpe.getMaximumPoolSize());
status.put("largestPoolSize", tpe.getLargestPoolSize());
status.put("queueSize", tpe.getQueue().size());
status.put("completedTaskCount", tpe.getCompletedTaskCount());
status.put("taskCount", tpe.getTaskCount());
status.put("isShutdown", tpe.isShutdown());
status.put("isTerminated", tpe.isTerminated());
}
// 检查是否为 ForkJoinPool
else if (receiveThreadPool instanceof ForkJoinPool) {
ForkJoinPool fjp = (ForkJoinPool) receiveThreadPool;
status.put("poolSize", fjp.getPoolSize());
status.put("activeCount", fjp.getActiveThreadCount());
status.put("parallelism", fjp.getParallelism());
status.put("runningThreadCount", fjp.getRunningThreadCount());
status.put("queuedTaskCount", fjp.getQueuedTaskCount());
status.put("queuedSubmissionCount", fjp.getQueuedSubmissionCount());
status.put("stealCount", fjp.getStealCount());
}
// 其他类型的 ExecutorService
else {
// 使用反射尝试获取信息
status.put("type", receiveThreadPool.getClass().getName());
status.put("info", "无法直接获取详细状态");
}
return status;
}
// 检查阻塞队列状态
public static List<String> getBlockingQueueStatus() {
List<String> queueStatus = new ArrayList<>();
if (msgQueues == null) {
queueStatus.add("阻塞队列未初始化");
return queueStatus;
}
msgQueues.forEach(
(key, value) -> queueStatus.add(key + ": " + value.size()));
return queueStatus;
}
/**
* 用于删除不需要的阻塞队列
* 这里只能ClientChatThread调用
*
* @param socket 对应的客户端的socket
*/
public static void dropMsgQueue(Socket socket) {
if (msgQueues == null) {
System.out.println("意外:主线程为初始化的情况下调用了dropMsgQueue");
return;
}
if (msgQueues.containsKey(socket)) {
msgQueues.remove(socket);
} else {
System.out.println("意外:尝试删除不存在的阻塞队列");
}
}
}
+106
View File
@@ -0,0 +1,106 @@
package server.data;
import java.io.Serializable;
import java.util.TreeSet;
/**
* 群聊信息类,用于保存聊天室的具体信息。
*/
public class GroupData implements Serializable {
private static final long serialVersionUID = 4303981922076715842L;
public class GroupMember implements Comparable<GroupMember>, Serializable {
private static final long serialVersionUID = 585007162886079570L;
public String id;
public boolean isOut;
public GroupMember(String id) {
this.id = id;
isOut = true;
}
@Override
public int compareTo(GroupMember o) {
return this.id.compareTo(o.id);
}
}
// 群聊id
private String groupId;
private String groupName;
private GroupMember groupOwner;
private TreeSet<GroupMember> members;
public GroupData(String groupId, String groupName, String groupOwner) {
this.groupName = groupName;
this.groupId = groupId;
this.groupOwner = new GroupMember(groupOwner);
members = new TreeSet<>();
}
public GroupData(String groupId) {
this.groupId = groupId;
this.groupName = "TEMP_TEST";
}
// 添加组员
public void addMember(String id) {
members.add(new GroupMember(id));
}
// 移除组员
public boolean removeMember(String id) {
GroupMember temp = new GroupMember(id);
if (members.contains(temp)) {
members.remove(new GroupMember(id));
return true;
} else {
// System.out.println("组员移除失败. groupId: " + groupId);
return false;
}
}
public int getMemberCount() {
return members.size();
}
// 获取群聊id
public String getGroupId() {
return groupId;
}
// 获取群聊名称
public String getGroupName() {
return groupName;
}
// 获取群主
public GroupMember getGroupOwner() {
return groupOwner;
}
// 获取群成员
public TreeSet<GroupMember> getMembers() {
return members;
}
// 设置群主
public void setGroupOwner(String id) {
this.groupOwner = new GroupMember(id);
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
@Override
public String toString() {
return "GroupData{" +
"groupId='" + groupId + '\'' +
", groupName='" + groupName + '\'' +
", groupOwner=" + groupOwner.id +
", members count=" + members.size() +
'}';
}
}
+294
View File
@@ -0,0 +1,294 @@
package server.data;
import util.FileUtil;
import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
// 服务器数据,单个服务器仅对应一个服务器数据集合。
// 允许存储,用于数据持久化
// 辅助进行数据核验
public class ServerData implements Serializable {
private static final long serialVersionUID = 5016807647175865383L;
private static volatile ServerData instance = null;
// 获取唯一的serverData对象(线程安全的懒加载)
public static ServerData getInstance() {
if (instance == null) {
synchronized (ServerData.class) {
if (instance == null) {
instance = new ServerData();
instance.loadData();
}
}
}
return instance;
}
// 重置实例(用于测试或重新加载)
public static void resetInstance() {
synchronized (ServerData.class) {
instance = null;
}
}
private Map<String, UserData> serverUsers;
private Map<String, GroupData> serverGroups;
private transient boolean dataLoaded = false;
// 处理服务器信息的主类
public ServerData() {
// 初始化空数据
serverUsers = new ConcurrentHashMap<>();
serverGroups = new ConcurrentHashMap<>();
}
// 显式加载数据的方法
public void loadData() {
if (!dataLoaded) {
synchronized (this) {
if (!dataLoaded) {
ServerData loadedData = FileUtil.loadServerData();
if (loadedData != null) {
// 如果加载到了数据,合并到当前实例
if (loadedData.getServerUsers() != null) {
this.serverUsers = loadedData.getServerUsers();
}
if (loadedData.getServerGroups() != null) {
this.serverGroups = loadedData.getServerGroups();
}
System.out.println("服务器数据加载成功,用户数: " +
(serverUsers != null ? serverUsers.size() : 0) +
", 群组数: " + (serverGroups != null ? serverGroups.size() : 0));
} else {
System.out.println("未找到数据文件或加载失败,使用初始化空数据");
}
dataLoaded = true;
}
}
}
}
// 验证数据是否已正确初始化
private void validateData() {
if (serverUsers == null) {
serverUsers = new ConcurrentHashMap<>();
}
if (serverGroups == null) {
serverGroups = new ConcurrentHashMap<>();
}
}
// 保存服务器数据到本地
public void saveServerData() {
validateData(); // 确保数据有效
FileUtil.saveServerData();
}
// 添加用户
public void addUser(UserData userData) {
serverUsers.put(userData.getUserId(), userData);
}
// 移除用户
public void removeUser(String userId) {
serverUsers.remove(userId);
serverGroups.values().forEach(groupData -> {
groupData.removeMember(userId);
if (groupData.getMemberCount() == 0) {
removeGroup(groupData.getGroupId());
}
});
}
// 修改用户名字
public void updateUserName(String userId, String newName) {
serverUsers.get(userId).setNikename(newName);
}
public void updateUserPwd(String userId, String newPwd) {
serverUsers.get(userId).setPassword(newPwd);
}
// 添加群聊
public void addGroup(GroupData groupData) {
serverGroups.put(groupData.getGroupId(), groupData);
// 更新关联用户信息
groupData.getMembers().forEach(member -> {
addGroupToUser(member.id, groupData.getGroupId());
});
}
public void addGroupToUser(String userId, String groupId) {
serverUsers.get(userId).addGroupId(groupId);
}
// 移除群聊
public void removeGroup(String groupId) {
GroupData groupData = serverGroups.remove(groupId);
groupData.getMembers().forEach(member -> {
serverUsers.get(member.id).removeGroup(groupData.getGroupId());
});
}
// 更新群聊信息
public void updateGroupName(String groupId, String newName) {
serverGroups.get(groupId).setGroupName(newName);
}
public void updateGroupOwner(String groupId, String newOwnerId) {
serverGroups.get(groupId).setGroupOwner(newOwnerId);
}
// 添加群聊成员
public void addUserToGroup(String groupId, String userId) {
serverGroups.get(groupId).addMember(userId);
}
// 移除群聊成员
public void removeUserFromGroup(String groupId, String userId) {
serverGroups.values().forEach(groupData -> {
if (groupData.getGroupId().equals(groupId)) {
groupData.removeMember(userId);
}
});
}
// 移除成员的群聊
public void removeGroupFromUser(String userId, String groupId) {
serverUsers.get(userId).removeGroup(groupId);
}
// 获取用户名字,不存在就回复id本身
public String getUserName(String userId) {
if (serverUsers.containsKey(userId))
return serverUsers.get(userId).getNickname();
else
return userId;
}
// 获取用户数据
public UserData getUserData(String userId) {
return serverUsers.get(userId);
}
// 获取群聊名字
public String getGroupName(String groupId) {
return serverGroups.get(groupId).getGroupName();
}
// 判断群聊是否存在
public boolean containsGroup(String groupId) {
return serverGroups.containsKey(groupId);
}
// 获取用户群聊id组
public TreeSet<String> getUserGroups(String userId) {
if (userId == null) {
return new TreeSet<>();
} else {
return serverUsers.get(userId).getGroupIds();
}
}
// 获取群聊的成员组
public TreeSet<GroupData.GroupMember> getGroupUsers(String groupId) {
return serverGroups.get(groupId).getMembers();
}
// 获取群聊的成员id组
public List<String> getGroupMembersId(String groupId) {
List<String> members = new ArrayList<>();
serverGroups.get(groupId).getMembers().forEach(groupMember -> {
members.add(groupMember.id);
});
return members;
}
/**
* 判断账号是否存在
*
* @param userId 账号
* @return true:存在 false:不存在
*/
public boolean IsAccountExist(String userId) {
return serverUsers.containsKey(userId);
}
/**
* 校验账号与密码是否匹配
*
* @param userId 待校验的用户账号
* @param password 待校验的用户密码
* @return 密码匹配返回true,不匹配返回false
*/
public boolean AccountAndPasswordIsMatch(String userId, String password) {
if (serverUsers.get(userId).getPassword().equals(password)) {
return true;
}
return false;
}
/**
* 设置服务器用户数据
*
* @param serverUsers 服务器用户数据映射表
*/
public void setServerUsers(Map<String, UserData> serverUsers) {
this.serverUsers = serverUsers;
}
/**
* 设置服务器群聊数据
*
* @param serverGroups 服务器群聊数据映射表
*/
public void setServerGroups(Map<String, GroupData> serverGroups) {
this.serverGroups = serverGroups;
}
/**
* 获取服务器用户数据映射表
*
* @return 服务器用户数据映射表
*/
public Map<String, UserData> getServerUsers() {
return serverUsers;
}
/**
* 获取服务器用户id到用户名的映射表
*
* @return 服务器用户id到用户名的映射表
*/
public Map<String, String> getIdNameMap() {
Map<String, String> idNameMap = new HashMap<>();
serverUsers.values().forEach(user -> {
idNameMap.put(user.getUserId(), user.getNickname());
});
return idNameMap;
}
/**
* 获取服务器群聊数据映射表
*
* @return 服务器群聊数据映射表
*/
public Map<String, GroupData> getServerGroups() {
return serverGroups;
}
/**
* 获取服务器群聊数据映射表
*
* @return 服务器群聊数据映射表
*/
public GroupData getGroupById(String groupId) {
return serverGroups.get(groupId);
}
}
+158
View File
@@ -0,0 +1,158 @@
package server.data;
import java.io.Serializable;
import java.util.TreeSet;
/**
* 用户信息类,用于保存用户的具体信息。
*/
public class UserData implements Serializable, Comparable<UserData> {
// 序列化版本号,用于版本控制
private static final long serialVersionUID = 2809761558436195616L;
// 用户昵称
private String nikename;
// 用户ID
private String id;
// 用户密码
private String password;
// 所属群聊ID集合
private TreeSet<String> groupIds;
// 好友ID集合
private TreeSet<String> friendIds;
// 用户邮箱
private String email;
// 用户生日
private String birthday;
// 用户地址
private String address;
// 用户签名
private String signature;
public UserData(String nikename, String id, String password) {
this.nikename = nikename;
this.id = id;
this.password = password;
this.groupIds = new TreeSet<>();
this.friendIds = new TreeSet<>();
// 初始化扩展信息为空字符串,避免 null
this.email = "";
this.birthday = "";
this.address = "";
this.signature = "";
}
// 获取安全的副本(不包含密码),用于网络传输
public UserData getSafeCopy() {
UserData copy = new UserData(this.nikename, this.id, null);
copy.setGroupIds(new TreeSet<>(this.groupIds));
copy.setFriendIds(new TreeSet<>(this.friendIds));
copy.setEmail(this.email);
copy.setBirthday(this.birthday);
copy.setAddress(this.address);
copy.setSignature(this.signature);
return copy;
}
public String getNickname() {
return nikename;
}
public void setNikename(String nikename) {
this.nikename = nikename;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUserId() {
return id;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public TreeSet<String> getGroupIds() {
return groupIds;
}
public void setGroupIds(TreeSet<String> groupIds) {
this.groupIds = groupIds;
}
public TreeSet<String> getFriendIds() {
return friendIds;
}
public void setFriendIds(TreeSet<String> friendIds) {
this.friendIds = friendIds;
}
public void addFriend(String friendId) {
this.friendIds.add(friendId);
}
public boolean addGroupId(String groupId) {
return this.groupIds.add(groupId);
}
public boolean removeGroupId(Long groupId) {
if (groupIds.contains(groupId.toString())) {
groupIds.remove(groupId.toString());
return true;
}
return false;
}
@Override
public int compareTo(UserData o) {
return this.id.compareTo(o.id);
}
public void removeGroup(String groupId) {
this.groupIds.remove(groupId);
}
// 扩展信息的 Getter 和 Setter
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public String getBirthday() {
return birthday;
}
public void setBirthday(String birthday) {
this.birthday = birthday;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getSignature() {
return signature;
}
public void setSignature(String signature) {
this.signature = signature;
}
}
+993
View File
@@ -0,0 +1,993 @@
package server.serveice;
import global.global;
import server.ServerMainThread;
import server.data.*;
import util.FileUtil;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.*;
/**
* 不可独立创建线程,这个会在内部自主创建线程。
* 每个客户端连接对应一个线程,用于处理客户端的请求。
* 线程启动后,会进入一个循环,等待服务端发送的消息。
* 收到消息后,会根据消息的操作码,调用对应的处理方法。
* 处理完成后,会将结果发送给客户端。
* 线程会在以下情况下退出:
* 1. 服务端关闭
* 2. 客户端主动关闭
* 3. 线程被中断
*/
public class ClientChatThread implements Runnable {
// 群在线用户存储:全局静态、线程安全
// key:用户id value:用户在线输出流
private static final Map<String, ObjectOutputStream> USER_ONLINE_MAP = new ConcurrentHashMap<>();
// 与客户端相连的套接字
private final Socket clientSocket;
// 是否登录
private boolean isLogin = false;
// 用户的账户
private String userId;
// 阻塞队列,用于进行线程的信息交流
private BlockingQueue<Wrapper> messageQueue;
private ObjectOutputStream oos;
private volatile boolean isRunning = true;
/**
* 创建一个数据发送线程,并且附带创建一个信息接收线程。
* 由于两个线程总是同步创建和销毁的,因此不进行单独创建。
*
* @param clientSocket 与客户端相连的套接字
*/
public ClientChatThread(Socket clientSocket, BlockingQueue<Wrapper> threadQueue)
throws IOException {
this.clientSocket = clientSocket;
this.isLogin = false;
this.userId = null;
messageQueue = threadQueue;
}
// 线程核心:聊天业务主流程
@Override
public void run() {
try {
oos = new ObjectOutputStream(clientSocket.getOutputStream());
} catch (IOException e) {
throw new RuntimeException(e);
}
while (ServerMainThread.isRunning() && this.isRunning) {
try {
Wrapper msg = messageQueue.take();
handleReceiveMsg(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/**
* 处理服务端接收的消息,按规则解析。在解析出结果后,调用下面的内容来处理数据体
*
* @param msg 服务端接收的消息
*/
private void handleReceiveMsg(Wrapper msg) {
int opt = msg.getOperation();
String senderId = msg.getSenderId();
switch (opt) {
// 登录请求
case global.OPT_REGISTER:
handleRegisterRequest(msg);
break;
case global.OPT_LOGIN:
handleLogInRequest(msg);
break;
case global.OPT_LOGOUT:
handleLogOutRequest(senderId);
break;
case global.OPT_DELETE_ACCOUNT:
handleDeleteUserRequest(senderId);
break;
case global.OPT_UPDATE_NICKNAME:
handleUpdateUserNameRequest(msg);
break;
case global.OPT_UPDATE_PASSWORD:
handleUpdateUserPwdRequest(msg);
break;
case global.OPT_GROUP_CREATE:
handleCreateGroupRequest((String) msg.getData(), msg.getGroupId());
break;
case global.OPT_GROUP_INVITE:
handleInviteRequest(msg);
break;
case global.OPT_GROUP_JOIN:
handleGroupJoinRequest(msg);
break;
case global.OPT_FRIEND_ADD:
handleFriendAddRequest(msg);
break;
case global.OPT_FRIEND_ADD_AGREE:
handleFriendAddAgree(msg);
break;
case global.OPT_FRIEND_ADD_REFUSE:
handleFriendAddRefuse(msg);
break;
case global.OPT_GROUP_INVITE_AGREE:
// 用户加入群聊
handleJoinGroupRequest(senderId, msg.getGroupId());
break;
case global.OPT_GROUP_INVITE_REFUSE:
// 拒绝加入群聊
sendToUser(Wrapper.serverResponse(global.OPT_GROUP_INVITE_REFUSE), (String) msg.getData());
break;
case global.OPT_GROUP_QUIT:
handleQuitGroupRequest(senderId, msg.getGroupId());
break;
case global.OPT_GROUP_DISBAND:
handleDeleteGroupRequest(msg.getGroupId());
break;
case global.OPT_CHAT:
sendToGroupExceptSelf(msg, msg.getGroupId());
FileUtil.addChatMessage(msg.getGroupId(), (String) msg.getData());
break;
case global.OPT_PRIVATE_CHAT:
handlePrivateChatRequest(msg);
break;
case global.OPT_GROUP_UPDATE_NAME:
handleUpdateGroupNameRequest(msg);
break;
case global.OPT_GROUP_UPDATE_OWNER:
handleUpdateGroupOwnerRequest(msg);
break;
case global.OPT_INIT_CHAT:
handleInitChatRequest();
break;
case global.OPT_INIT_GROUP:
handleInitGroupRequest();
break;
case global.OPT_INIT_USER:
// 发送用户 ID-Name 映射
sendToSelf(Wrapper.initResponse(ServerData.getInstance().getIdNameMap()));
break;
case global.OPT_INIT_USER_DETAIL:
// 发送所有用户详细信息
handleInitUserDetailRequest();
break;
case global.OPT_UPDATE_USER_DETAIL:
// 处理更新用户详细信息
handleUpdateUserDetailRequest(msg);
break;
}
}
/**
* 处理注册请求
* 1. 检查用户ID是否为空
* 2. 检查用户名是否为空
* 3. 检查密码是否为空
* 4. 检查用户ID是否已存在
* 5. 添加新用户到ServerData
* 6. 发送注册成功消息给客户端
*
* @param msg 注册请求消息
*/
private void handleRegisterRequest(Wrapper msg) {
String[] data = (String[]) msg.getData();
String userId = msg.getSenderId();
String nikname = data[0];
String password = data[1];
// 数据为空的情况
if (userId == null || nikname == null || password == null) {
sendToSelf(Wrapper.serverResponse(global.OPT_QUEST_WRONG));
return;
}
// 账户存在的情况
if (ServerData.getInstance().IsAccountExist(userId)) {
sendToSelf(Wrapper.serverResponse(global.OPT_REGISTER_FAILED_ACC));
return;
}
ServerData.getInstance().addUser(new UserData(nikname, userId, password));
this.isLogin = true;
this.userId = userId;
USER_ONLINE_MAP.values().forEach(oos -> {
Map<String, String> newRegister = new HashMap<>();
newRegister.put(userId, nikname);
try {
oos.writeObject(Wrapper.initResponse(newRegister));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
// 添加注册用户的id oos映射,初始化id,群id映射
USER_ONLINE_MAP.put(this.userId, oos);
sendToSelf(Wrapper.serverResponse(global.OPT_REGISTER_SUCCESS));
}
/**
* 处理登录请求
* 1. 检查用户ID是否为空
* 2. 检查密码是否为空
* 3. 检查用户ID是否已存在
* 4. 添加新用户到ServerData
* 5. 发送登录成功消息给客户端
*
* @param msg 登录请求消息
*/
private void handleLogInRequest(Wrapper msg) {
String userId = msg.getSenderId();
String password = (String) msg.getData();
// 检测Id是否存在
if (!ServerData.getInstance().IsAccountExist(userId)) {
sendToSelf(Wrapper.serverResponse(global.OPT_LOGIN_FAILED_ACC));
return;
}
// 检测是否重复登录
if (USER_ONLINE_MAP.containsKey(userId)) {
sendToSelf(Wrapper.serverResponse(global.OPT_LOGIN_FAILED_REPEATED));
return;
}
// 检测账户密码是否相对应
else if (!ServerData.getInstance().AccountAndPasswordIsMatch(userId, password)) {
sendToSelf(Wrapper.serverResponse(global.OPT_LOGIN_FAILED_PWD));
return;
}
// 修改数据
this.isLogin = true;
this.userId = userId;
// 更新用户在线信息
USER_ONLINE_MAP.put(this.userId, oos);
sendToSelf(Wrapper.serverResponse(global.OPT_LOGIN_SUCCESS));
USER_ONLINE_MAP.put(this.userId, oos);
}
/**
* 判断用户是否已登录
*
* @return 如果用户已登录且在在线映射中,则返回true;否则返回false
*/
private boolean YesLogin() {
return isLogin && USER_ONLINE_MAP.containsKey(userId) && this.userId != null;
}
/**
* 关闭当前用户链接,关闭这个线程(接受线程在接收关闭信息时已经结束了)
*/
private void closeClient() {
// 优先结束阻塞队列
ServerMainThread.dropMsgQueue(this.clientSocket);
if (!isRunning) {
return;
}
isRunning = false;
if (oos != null) {
try {
oos.close();
} catch (IOException e) {
System.err.println("关闭输出流异常: " + e.getMessage());
}
}
if (clientSocket != null && !clientSocket.isClosed()) {
try {
clientSocket.close();
} catch (IOException e) {
System.err.println("关闭socket异常: " + e.getMessage());
}
}
}
/**
* 处理登出请求
* 1. 检查用户是否已登录
* 2. 从在线用户映射中移除用户
* 3. 设置用户状态为未登录
* 4. 发送登出成功消息给客户端
*
* @param userId 要登出的用户ID
*/
private void handleLogOutRequest(String userId) {
if (!YesLogin()) {
// 出现错误,现在还未登录
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
USER_ONLINE_MAP.remove(this.userId);
isLogin = false;
this.userId = null;
sendToSelf(Wrapper.serverResponse(global.OPT_LOGOUT));
try {
Thread.sleep(100); // 短暂延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
closeClient();
}
/**
* 处理删除用户请求
* 1. 检查用户是否已登录
* 2. 从在线用户映射中移除用户
* 3. 从服务器数据中删除用户
* 4. 设置用户状态为未登录
* 5. 发送删除成功消息给客户端
*
* @param userId 要删除的用户ID
*/
private void handleDeleteUserRequest(String userId) {
if (!YesLogin()) {
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
USER_ONLINE_MAP.remove(this.userId);
ServerData.getInstance().removeUser(userId);
isLogin = false;
this.userId = null;
sendToSelf(Wrapper.serverResponse(global.OPT_DELETE_ACCOUNT));
try {
Thread.sleep(100); // 短暂延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
closeClient();
}
/**
* 处理初始化所有用户详细信息的请求
* 1. 检查用户是否已登录
* 2. 从服务器数据中获取所有用户的详细信息
* 3. 发送安全的用户详细信息副本给客户端
*/
private void handleInitUserDetailRequest() {
if (!YesLogin())
return;
Map<String, UserData> allUsers = ServerData.getInstance().getServerUsers();
Map<String, UserData> safeUsers = new HashMap<>();
for (Map.Entry<String, UserData> entry : allUsers.entrySet()) {
// 只发送安全的副本(无密码)
safeUsers.put(entry.getKey(), entry.getValue().getSafeCopy());
}
sendToSelf(Wrapper.initUserDetailResponse(safeUsers));
}
/**
* 处理更新用户详细信息的请求
* 1. 检查用户是否已登录
* 2. 检查更新数据是否包含有效用户ID
* 3. 从服务器数据中获取原始用户数据
* 4. 更新服务器上的用户详细信息
* 5. 广播更新给所有在线用户
*
* @param msg 包含更新用户详细信息的消息
*/
private void handleUpdateUserDetailRequest(Wrapper msg) {
if (!YesLogin())
return;
UserData updatedData = (UserData) msg.getData();
if (updatedData == null || !updatedData.getUserId().equals(userId)) {
return; // 安全校验:只能更新自己的信息
}
// 获取服务器上的原始数据
UserData serverData = ServerData.getInstance().getUserData(userId);
if (serverData != null) {
// 更新字段
serverData.setEmail(updatedData.getEmail());
serverData.setBirthday(updatedData.getBirthday());
serverData.setAddress(updatedData.getAddress());
serverData.setSignature(updatedData.getSignature());
// 注意:不更新密码和昵称(有专门的接口),也不更新关系链
// 广播更新给所有在线用户
UserData safeCopy = serverData.getSafeCopy();
Wrapper updateMsg = Wrapper.updateUserDetailResponse(safeCopy);
for (ObjectOutputStream oos : USER_ONLINE_MAP.values()) {
try {
synchronized (oos) {
oos.writeObject(updateMsg);
oos.flush();
oos.reset();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 处理申请加入群聊请求
* 1. 检查用户是否已登录
* 2. 检查群聊是否存在
* 3. 检查用户是否已加入该群聊
* 4. 加入群聊
* 5. 通知群内其他成员有新人加入
*
* @param msg 包含群聊ID的消息
*/
private void handleGroupJoinRequest(Wrapper msg) {
if (!YesLogin()) {
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
String userId = msg.getSenderId();
String groupId = msg.getGroupId();
GroupData group = ServerData.getInstance().getGroupById(groupId);
if (group == null) {
sendToSelf(Wrapper.serverResponse(global.OPT_GROUP_JOIN_FAILED));
return;
}
// 检查是否已经是成员
if (group.getMembers().contains(group.new GroupMember(userId))) {
// 已经是成员,视为成功
sendToSelf(Wrapper.initResponse(group));
return;
}
// 添加成员
group.addMember(userId);
ServerData.getInstance().getUserData(userId).addGroupId(groupId);
// 1. 通知自己加入成功 (发送群组信息)
sendToSelf(Wrapper.initResponse(group));
// 2. 通知群内其他成员有新人加入
sendToGroupExceptSelf(Wrapper.initResponse(group), groupId);
}
/**
* 处理添加好友请求
* 1. 检查用户是否已登录
* 2. 检查好友是否存在
* 3. 检查是否已添加该好友
* 4. 转发请求给目标用户(如果在线)
*
* @param msg 包含好友ID的消息
*/
private void handleFriendAddRequest(Wrapper msg) {
if (!YesLogin()) {
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
String userId = msg.getSenderId();
String friendId = (String) msg.getData(); // 目标好友ID
if (userId.equals(friendId)) {
// 不能添加自己
sendToSelf(Wrapper.serverResponse(global.OPT_FRIEND_ADD_FAILED));
return;
}
UserData friendData = ServerData.getInstance().getUserData(friendId);
if (friendData == null) {
sendToSelf(Wrapper.serverResponse(global.OPT_FRIEND_ADD_FAILED));
return;
}
UserData myData = ServerData.getInstance().getUserData(userId);
// 检查是否已经是好友
if (myData.getFriendIds().contains(friendId)) {
sendToSelf(Wrapper.serverResponse(global.SERVER_MESSAGE, "你们已经是好友了"));
return;
}
// 如果对方在线,转发请求
if (USER_ONLINE_MAP.containsKey(friendId)) {
try {
// 发送给目标用户:Sender=userId, Data=NickName
USER_ONLINE_MAP.get(friendId)
.writeObject(new Wrapper(myData.getNickname(), userId, null, global.OPT_FRIEND_ADD));
sendToSelf(Wrapper.serverResponse(global.SERVER_MESSAGE, "好友请求已发送"));
} catch (IOException e) {
e.printStackTrace();
}
} else {
sendToSelf(Wrapper.serverResponse(global.SERVER_MESSAGE, "用户不在线,无法发送请求"));
}
}
private void handleFriendAddAgree(Wrapper msg) {
String myId = msg.getSenderId();
String friendId = (String) msg.getData(); // 发起者的ID
UserData myData = ServerData.getInstance().getUserData(myId);
UserData friendData = ServerData.getInstance().getUserData(friendId);
if (friendData == null)
return;
// 双向添加
myData.addFriend(friendId);
friendData.addFriend(myId);
// 通知自己成功
Map<String, String> friendInfo = new HashMap<>();
friendInfo.put(friendId, friendData.getNickname());
sendToSelf(new Wrapper(friendInfo, global.SERVER_ACCOUNT, null, global.OPT_FRIEND_ADD_SUCCESS));
// 通知对方成功
if (USER_ONLINE_MAP.containsKey(friendId)) {
Map<String, String> myInfo = new HashMap<>();
myInfo.put(myId, myData.getNickname());
try {
USER_ONLINE_MAP.get(friendId).writeObject(
new Wrapper(myInfo, global.SERVER_ACCOUNT, null, global.OPT_FRIEND_ADD_SUCCESS));
// 还可以发个系统消息提示
USER_ONLINE_MAP.get(friendId).writeObject(
Wrapper.serverResponse(global.SERVER_MESSAGE, myData.getNickname() + " 同意了你的好友请求"));
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 处理好友添加拒绝请求
* 1. 检查用户是否已登录
* 2. 检查好友是否存在
* 3. 通知拒绝者好友请求已被拒绝
*
* @param msg 包含拒绝好友ID的消息
*/
private void handleFriendAddRefuse(Wrapper msg) {
String myId = msg.getSenderId();
String friendId = (String) msg.getData(); // 发起者的ID
UserData myData = ServerData.getInstance().getUserData(myId);
if (USER_ONLINE_MAP.containsKey(friendId)) {
try {
USER_ONLINE_MAP.get(friendId).writeObject(
new Wrapper(myData.getNickname(), myId, null, global.OPT_FRIEND_ADD_REFUSE));
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 处理私聊请求
* 1. 检查用户是否已登录
* 2. 检查接收者是否存在
* 3. 检查接收者是否在线
* 4. 转发消息给接收者
*
* @param msg 包含接收者ID和消息内容的消息
*/
private void handlePrivateChatRequest(Wrapper msg) {
String receiverId = msg.getGroupId(); // 接收者ID
if (USER_ONLINE_MAP.containsKey(receiverId)) {
try {
// 转发消息给接收者
USER_ONLINE_MAP.get(receiverId).writeObject(msg);
} catch (IOException e) {
e.printStackTrace();
}
} else {
// 对方不在线
sendToSelf(Wrapper.serverResponse(global.SERVER_MESSAGE, "对方不在线"));
}
}
/**
* 处理更新用户信息请求
* 1. 检查用户是否已登录
* 2. 更新用户昵称
* 3. 通知所有连接的客户端更新用户信息
*
* @param wrapper 包含新昵称的消息
*/
private void handleUpdateUserNameRequest(Wrapper wrapper) {
if (!YesLogin()) {
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
String newName = (String) wrapper.getData();
ServerData.getInstance().updateUserName(this.userId, newName);
Map<String, String> idNameMap = new HashMap<>();
idNameMap.put(this.userId, newName);
// OPT_INIT_USER
sentToConnectedGroups(Wrapper.initResponse(idNameMap), this.userId);
}
/**
* 处理更新用户密码请求
* 1. 检查用户是否已登录
* 2. 更新用户密码
* 3. 通知客户端密码更新成功
*
* @param wrapper 包含新密码的消息
*/
private void handleUpdateUserPwdRequest(Wrapper wrapper) {
if (!YesLogin()) {
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
String newPwd = (String) wrapper.getData();
ServerData.getInstance().updateUserPwd(this.userId, newPwd);
sendToSelf(Wrapper.serverResponse(global.OPT_UPDATE_PASSWORD));
}
/**
* 处理创建群组请求
* 1. 检查用户是否已登录
* 2. 检查群组名称是否为空
* 3. 检查群组ID是否重复
* 4. 创建新的群组
* 5. 通知客户端群组创建成功
*
* @param groupName 群组名称
* @param groupId 群组ID
*/
private void handleCreateGroupRequest(String groupName, String groupId) {
if (!YesLogin()) {
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
// 如果信息为空
if (groupName == null) {
sendToSelf(Wrapper.serverResponse(global.OPT_QUEST_WRONG));
return;
}
// 如果已经有对应的id的群聊
if (ServerData.getInstance().containsGroup(groupId)) {
sendToSelf(Wrapper.serverResponse(global.SERVER_MESSAGE, "群聊已存在,id重复"));
return;
}
// 处理数据调用serverData的addGroup
GroupData groupData = new GroupData(groupId, groupName, userId);
groupData.addMember(userId);
ServerData.getInstance().addGroup(groupData);
// 调用serverData的addUser
ServerData.getInstance().addUserToGroup(groupId, userId);
sendToSelf(Wrapper.serverResponse(global.OPT_GROUP_CREATE_SUCCESS));
sendToSelf(Wrapper.initResponse(groupData));
}
/**
* 处理邀请加入群组请求
* 1. 检查用户是否已登录
* 2. 检查邀请人是否是自己
* 3. 检查用户是否已在群聊中
* 4. 检查用户是否在线
* 5. 通知被邀请人加入群组
*
* @param inviteMsg 包含被邀请人ID和群组ID的消息
*/
private void handleInviteRequest(Wrapper inviteMsg) {
String inviteId = (String) inviteMsg.getData();
String theGroupId = inviteMsg.getGroupId();
String senderId = inviteMsg.getSenderId();
// 如果邀请人是自己
if (senderId.equals(inviteId)) {
sendToSelf(Wrapper.serverResponse(global.SERVER_MESSAGE, "不能邀请自己加入群聊"));
return;
}
// 如果邀请人已经在群聊中
if (ServerData.getInstance().getGroupMembersId(theGroupId).contains(inviteId)) {
sendToSelf(Wrapper.serverResponse(global.SERVER_MESSAGE, "该用户已在群聊中"));
return;
}
// 如果用户不在线
if (!USER_ONLINE_MAP.containsKey(inviteId)) {
sendToSelf(Wrapper.serverResponse(
global.SERVER_MESSAGE,
"用户(" + ServerData.getInstance().getUserName(inviteId) + ")不在线!"));
return;
}
// 通知被邀请人加入群组
sendToUser(Wrapper.groupInviteRequest(
ServerData.getInstance().getGroupName(theGroupId), senderId, theGroupId),
inviteId);
}
/**
* 处理加入群组请求
* 1. 检查用户是否已登录
* 2. 检查用户是否已在群聊中
* 3. 检查群组是否存在
* 4. 加入群组
* 5. 通知群组内所有人用户加入
*
* @param userId 用户ID
* @param groupId 群组ID
*/
private void handleJoinGroupRequest(String userId, String groupId) {
if (!YesLogin()) {
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
ServerData.getInstance().addUserToGroup(groupId, userId);
ServerData.getInstance().addGroupToUser(userId, groupId);
Wrapper wrapper = Wrapper.initResponse(ServerData.getInstance().getGroupById(groupId));
// 向组内所有人推送更新。(人员加入)
sendToGroup(wrapper, groupId);
}
/**
* 处理退出群组请求
* 1. 检查用户是否已登录
* 2. 检查用户是否已在群聊中
* 3. 从群组中移除用户
* 4. 从用户群组列表中移除群组
* 5. 如果群组为空,则删除群组
* 6. 通知群组内其他用户更新
*
* @param userId 用户ID
* @param groupId 群组ID
*/
private void handleQuitGroupRequest(String userId, String groupId) {
if (!YesLogin()) {
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
ServerData.getInstance().removeUserFromGroup(groupId, userId);
ServerData.getInstance().removeGroupFromUser(userId, groupId);
if (ServerData.getInstance().getGroupById(groupId).getMemberCount() == 0) {
// 没有人的群聊就删掉
ServerData.getInstance().removeGroup(groupId);
} else {
// 不为空则需要推送更新
Wrapper wrapper = Wrapper.initResponse(ServerData.getInstance().getGroupById(groupId));
// 向组内其它所有人推送更新。
sendToGroupExceptSelf(wrapper, groupId);
}
// 退出人也要进行更新
sendToSelf(new Wrapper(null, null, groupId, global.OPT_GROUP_QUIT));
}
/**
* 处理删除群组请求
* 1. 检查用户是否已登录
* 2. 检查群组是否存在
* 3. 检查用户是否是群组所有者
* 4. 删除群组
* 5. 通知所有连接的客户端群组已删除
*
* @param groupId 群组ID
*/
private void handleDeleteGroupRequest(String groupId) {
if (!YesLogin()) {
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
Wrapper wrapper = new Wrapper(null, null, groupId, global.OPT_GROUP_QUIT);
// 向所有人推送更新。
ServerData.getInstance().removeGroup(groupId);
sendToGroup(wrapper, groupId);
}
/**
* 处理更新群组名称请求
* 1. 检查用户是否已登录
* 2. 更新群组名称
* 3. 通知所有连接的客户端更新群组名称
*
* @param groupUpdateMsg 包含群组ID和新名称的消息
*/
private void handleUpdateGroupNameRequest(Wrapper groupUpdateMsg) {
if (!YesLogin()) {
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
ServerData.getInstance().updateGroupName(groupUpdateMsg.getGroupId(),
(String) groupUpdateMsg.getData());
sendToGroup(groupUpdateMsg, groupUpdateMsg.getGroupId());
}
/**
* 处理更新群组管理员请求
* 1. 检查用户是否已登录
* 2. 检查管理员是否是自己
* 3. 更新群组管理员
* 4. 通知群组内所有成员更新管理员
*
* @param groupUpdateMsg 包含新管理员ID和群组ID的消息
*/
private void handleUpdateGroupOwnerRequest(Wrapper groupUpdateMsg) {
if (!YesLogin()) {
sendToSelf(Wrapper.serverResponse(global.OPT_ERROR_NOT_LOGIN));
return;
}
ServerData.getInstance().updateGroupOwner(groupUpdateMsg.getGroupId(), (String) groupUpdateMsg.getData());
sendToGroup(groupUpdateMsg, groupUpdateMsg.getGroupId());
}
/**
* 处理初始化聊天消息请求
* 1. 检查用户是否已登录
* 2. 遍历用户所属的所有群组
* 3. 加载群组聊天记录
* 4. 向用户发送群组聊天记录初始化消息
*/
private void handleInitChatRequest() {
for (String groupId : ServerData.getInstance().getUserGroups(userId)) {
Wrapper wrapper = Wrapper.initResponse(FileUtil.loadGroupChatMsg(groupId), groupId);
sendToSelf(wrapper);
}
}
/**
* 处理初始化群组请求
* 1. 检查用户是否已登录
* 2. 遍历用户所属的所有群组
* 3. 加载群组信息
* 4. 向用户发送群组初始化消息
*/
private void handleInitGroupRequest() {
for (String groupId : ServerData.getInstance().getUserGroups(userId)) {
Wrapper wrapper = Wrapper.initResponse(ServerData.getInstance().getGroupById(groupId));
sendToSelf(wrapper);
}
}
/**
* 向指定用户广播消息
* 1. 遍历用户ID列表
* 2. 检查用户是否在线
* 3. 向在线用户发送消息
*
* @param userIds 接收消息的用户ID列表
* @param wrapper 要发送的消息包装对象
*/
public static void broadcastMsg(String[] userIds, Wrapper wrapper) {
for (String userId : userIds) {
ObjectOutputStream oos = USER_ONLINE_MAP.get(userId);
if (oos == null)
continue;
try {
synchronized (oos) {
oos.writeObject(wrapper);
oos.flush();
oos.reset();
}
} catch (IOException e) {
Thread.currentThread().interrupt();
System.err.println("send error");
}
}
}
/**
* 向用户发送消息
* 1. 检查用户是否在线
* 2. 向在线用户发送消息
*
* @param o 要发送的消息包装对象
*/
private void sendToSelf(Wrapper o) {
try {
synchronized (oos) {
oos.writeObject(o);
oos.flush();
oos.reset();
}
} catch (IOException e) {
Thread.currentThread().interrupt();
System.out.println("send error");
}
}
/**
* 向指定用户发送消息
* 1. 检查用户是否在线
* 2. 向在线用户发送消息
*
* @param wrapper 要发送的消息包装对象
* @param userId 接收消息的用户ID
*/
private void sendToUser(Wrapper wrapper, String userId) {
try {
ObjectOutputStream oos = USER_ONLINE_MAP.get(userId);
if (oos != null) {
synchronized (oos) {
oos.writeObject(wrapper);
oos.flush();
oos.reset();
}
}
} catch (IOException e) {
Thread.currentThread().interrupt();
System.err.println("send error: " + userId);
}
}
/**
* 向指定群聊发送消息
* 1. 遍历群聊成员ID列表
* 2. 检查成员是否在线
* 3. 向在线成员发送消息
*
* @param wrapper 要发送的消息包装对象
* @param groupId 接收消息的群聊ID
*/
private void sendToGroup(Wrapper wrapper, String groupId) {
List<String> members = ServerData.getInstance().getGroupMembersId(groupId);
for (String member : members) {
sendToUser(wrapper, member);
}
}
/**
* 向指定群聊发送消息(排除发送者)
* 1. 遍历群聊成员ID列表
* 2. 检查成员是否在线
* 3. 向在线成员发送消息(排除发送者)
*
* @param wrapper 要发送的消息包装对象
* @param groupId 接收消息的群聊ID
*/
private void sendToGroupExceptSelf(Wrapper wrapper, String groupId) {
List<String> members = ServerData.getInstance().getGroupMembersId(groupId);
for (String member : members) {
if (!member.equals(userId)) {
sendToUser(wrapper, member);
}
}
}
/**
* 向用户所属的所有群聊发送消息
* 1. 遍历用户所属的所有群组
* 2. 向每个群组发送消息
*
* @param wrapper 要发送的消息包装对象
* @param userId 接收消息的用户ID
*/
private void sentToConnectedGroups(Wrapper wrapper, String userId) {
TreeSet<String> groups = ServerData.getInstance().getUserGroups(userId);
groups.forEach(groupId -> sendToGroup(wrapper, groupId));
}
}
@@ -0,0 +1,80 @@
package server.serveice;
import global.global;
import server.ServerMainThread;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.BlockingQueue;
public class ClientReceiveThread implements Runnable {
private BlockingQueue<Wrapper> messageQueue;
private volatile boolean isRunning;
// 接收线程专属资源:构造器传入,仅用于接收消息
private final Socket clientSocket;
ObjectInputStream ois;
// 构造器:初始化套接字资源
public ClientReceiveThread(
Socket clientSocket,
BlockingQueue<Wrapper> messageQueue) throws IOException {
this.messageQueue = messageQueue;
this.clientSocket = clientSocket;
isRunning = true;
}
// 线程核心:循环接收服务端消息(历史/广播),打印展示
@Override
public void run() {
try {
ois = new ObjectInputStream(clientSocket.getInputStream());
} catch (IOException e) {
throw new RuntimeException(e);
}
// 当主线程和自身都在跑的时候
while (ServerMainThread.isRunning() && this.isRunning) {
// 接收服务端消息
// 接收到后将其添加到阻塞队列中
try {
Wrapper msg = (Wrapper) ois.readObject();
// 如果收到的是关闭信息,则这个循环结束后关闭自身
if (msg.getOperation() == global.OPT_LOGOUT) {
isRunning = false;
System.out.println("接受线程已结束");
}
// 添加信息到阻塞队列。
messageQueue.put(msg);
} catch (InterruptedException e) {
System.out.println("消息队列被中断");
e.printStackTrace();
} catch (IOException e) {
if (isConnectionClosed(e)) {
// 链接断开则结束链接
// isRunning =false;
System.out.println(clientSocket.getPort() + ": 连接已断开");
break;
} else {
System.out.println("发送消息时发生IO异常: " + e.getMessage());
e.printStackTrace();
}
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}
private boolean isConnectionClosed(IOException e) {
// 根据异常类型判断连接是否断开
return e instanceof SocketException ||
e.getMessage() != null && (e.getMessage().contains("Connection reset") ||
e.getMessage().contains("Broken pipe") ||
e.getMessage().contains("Connection refused") ||
e.getMessage().contains("Software caused connection abort"));
}
}
+210
View File
@@ -0,0 +1,210 @@
package server.serveice;
import global.global;
import server.data.GroupData;
import server.data.UserData;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
// 打包类,用于将数据打包传输
/**
* Wrapper类的构造方法这里设置为私有,需要时请使用/创建对应的静态构造方法来获取对应的信息载体。
* 详情请查阅源代码。
*/
public class Wrapper implements Serializable {
private static final long serialVersionUID = 7499350690768481854L;
private Object data;
private String senderId;
private String groupId;
private int operation;
public Wrapper(Object data, String senderId, String groupId, int operation) {
this.data = data;
this.senderId = senderId;
this.groupId = groupId;
this.operation = operation;
}
public Object getData() {
return data;
}
public String getSenderId() {
return senderId;
}
public String getGroupId() {
return groupId;
}
public int getOperation() {
return operation;
}
// 一下静态方法都是用于便捷创建消息体的方法。字如其名,当你需要创造相应的信息载体的时候,请使用对应的方法。
// 如果有其它需要,请在这里手动添加新的静态方法用于创建对应数据类别。
// 需要注意的时,有有一部分的类型的信息体是客户端和服务器端都可以发送的,但是他们发送具有不同的含义。
// 注册请求
public static Wrapper registerRequest(String[] nickNameAndpwsd, String senderId) {
return new Wrapper(nickNameAndpwsd, senderId, null, global.OPT_REGISTER);
}
// 注册请求
public static Wrapper registerRequest(String senderId, String password, String username) {
String[] temp = new String[] { username, password };
return new Wrapper(temp, senderId, null, global.OPT_REGISTER);
}
// 登录请求
public static Wrapper loginRequest(String senderId, String password) {
return new Wrapper(password, senderId, null, global.OPT_LOGIN);
}
// 服务器回复,用于回复一些简单的关于操作流的信息
public static Wrapper serverResponse(int opt) {
return new Wrapper(null, global.SERVER_ACCOUNT, null, opt);
}
// 服务器回复,带文本信息
public static Wrapper serverResponse(int opt, String msg) {
return new Wrapper(msg, global.SERVER_ACCOUNT, null, opt);
}
// 构造一个只包含操作码的消息(用于 OPT_EXIT 等)
public Wrapper(int operation) {
this.data = null;
this.senderId = global.SERVER_ACCOUNT;
this.groupId = null;
this.operation = operation;
}
// 登出请求
public static Wrapper logoutRequest(String senderId) {
return new Wrapper(null, senderId, null, global.OPT_LOGOUT);
}
// 初始化请求,请求服务器发送账户和当前的群聊数据
public static Wrapper initRequest(String senderId, int opt) {
return new Wrapper(null, senderId, null, opt);
}
// 初始化回复,将群聊信息回复给客户端
public static Wrapper initResponse(GroupData groupData) {
return new Wrapper(groupData, global.SERVER_ACCOUNT, null, global.OPT_INIT_GROUP);
}
// 初始化回复,将聊天记录回复给客户端
public static Wrapper initResponse(List<String> chatRecords, String groupId) {
return new Wrapper(chatRecords, global.SERVER_ACCOUNT, groupId, global.OPT_INIT_CHAT);
}
// 将用户id/名字回复给客户端。
public static Wrapper initResponse(Map<String, String> idNameMap) {
return new Wrapper(idNameMap, global.SERVER_ACCOUNT, null, global.OPT_INIT_USER);
}
// 更新用户昵称请求
public static Wrapper updateUserNameRequest(String senderId, String nickName) {
return new Wrapper(nickName, senderId, null, global.OPT_UPDATE_NICKNAME);
}
// 申请加入群聊
public static Wrapper joinGroupRequest(String senderId, String groupId) {
return new Wrapper(null, senderId, groupId, global.OPT_GROUP_JOIN);
}
// 申请添加好友
public static Wrapper addFriendRequest(String senderId, String friendId) {
// friendId 放在 data 中
return new Wrapper(friendId, senderId, null, global.OPT_FRIEND_ADD);
}
// 更新用户密码请求
public static Wrapper updateUserPwdRequest2(String senderId, String newPwd) {
return new Wrapper(newPwd, senderId, null, global.OPT_UPDATE_PASSWORD);
}
// 创建群聊请求
public static Wrapper createGroupRequest(String senderId, String groupName, String groupId) {
return new Wrapper(groupName, senderId, groupId, global.OPT_GROUP_CREATE);
}
// 邀请加入群聊请求
public static Wrapper groupInviteRequest(String invitedIdOrGroupName, String senderId, String groupId) {
return new Wrapper(invitedIdOrGroupName, senderId, groupId, global.OPT_GROUP_INVITE);
}
// 退出群聊请求
public static Wrapper groupQuitRequest(String senderId, String groupId) {
return new Wrapper(null, senderId, groupId, global.OPT_GROUP_QUIT);
}
// 退出群聊响应
public static Wrapper groupQuitResponse(String quitMemberId, String groupId) {
return new Wrapper(quitMemberId, global.SERVER_ACCOUNT, groupId, global.OPT_GROUP_QUIT);
}
// 解散群聊请求
public static Wrapper groupDisbandRequest(String senderId, String groupId) {
return new Wrapper(null, senderId, groupId, global.OPT_GROUP_DISBAND);
}
// 更新群聊名字请求
public static Wrapper groupUpdateNameRequest(String senderId, String groupId, String groupName) {
return new Wrapper(groupName, senderId, groupId, global.OPT_GROUP_UPDATE_NAME);
}
// 更新群聊群主请求
public static Wrapper groupUpdateOwnerRequest(String senderId, String groupId, String ownerId) {
return new Wrapper(ownerId, senderId, groupId, global.OPT_GROUP_UPDATE_OWNER);
}
// 聊天信息
public static Wrapper groupChat(String text, String senderId, String groupId) {
return new Wrapper(text, senderId, groupId, global.OPT_CHAT);
}
// 私聊信息
public static Wrapper privateChat(String text, String senderId, String receiverId) {
// 私聊数据存储在 data/friends/chat_data 中
// 为了复用字段,我们将 receiverId 放在 groupId 字段中作为目标ID
return new Wrapper(text, senderId, receiverId, global.OPT_PRIVATE_CHAT);
}
// 同意添加好友
public static Wrapper friendAddAgree(String senderId, String friendId) {
return new Wrapper(friendId, senderId, null, global.OPT_FRIEND_ADD_AGREE);
}
// 拒绝添加好友
public static Wrapper friendAddRefuse(String senderId, String friendId) {
return new Wrapper(friendId, senderId, null, global.OPT_FRIEND_ADD_REFUSE);
}
// 创建简单指令回复
public static Wrapper simpleRequest(String senderId, String groupId, int opt) {
return new Wrapper(null, senderId, groupId, opt);
}
// 初始化用户详细信息回复(Map<String, UserData>
public static Wrapper initUserDetailResponse(Map<String, UserData> userDetails) {
return new Wrapper(userDetails, global.SERVER_ACCOUNT, null, global.OPT_INIT_USER_DETAIL);
}
// 更新用户详细信息请求
public static Wrapper updateUserDetailRequest(String senderId, UserData userData) {
return new Wrapper(userData, senderId, null, global.OPT_UPDATE_USER_DETAIL);
}
// 更新用户详细信息响应(服务端广播)
public static Wrapper updateUserDetailResponse(UserData userData) {
return new Wrapper(userData, global.SERVER_ACCOUNT, null, global.OPT_UPDATE_USER_DETAIL);
}
}