Go完整即时通讯项目及Go的生态介绍

末蓝、 2024-03-17 09:23 147阅读 0赞

Go完整即时通讯项目

项目架构:
在这里插入图片描述

1 编写基本服务端-Server

server.go

  1. package main
  2. import (
  3. "fmt"
  4. "net"
  5. )
  6. // 定义服务端
  7. type Server struct {
  8. ip string
  9. port int
  10. }
  11. // 创建一个Server
  12. func NewServer(ip string, port int) *Server {
  13. return &Server{
  14. ip: ip,
  15. port: port,
  16. }
  17. }
  18. // 定义Server方法
  19. func (this *Server) Start() {
  20. //根据ip+port监听socket套接字 tcp表明类型【socket listen】
  21. listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", this.ip, this.port))
  22. if err != nil {
  23. fmt.Println("net.Listen err=", err)
  24. return
  25. }
  26. //关闭套接字,避免资源浪费
  27. defer listen.Close()
  28. for {
  29. conn, err := listen.Accept()
  30. if err != nil {
  31. fmt.Println("listen accept err=", err)
  32. continue
  33. }
  34. //处理连接请求【具体业务:处理客户端请求】
  35. //开启协程处理,避免占用主线程【server一直要监听ip+port】
  36. go this.Handler(conn)
  37. }
  38. }
  39. // 处理连接请求
  40. func (this *Server) Handler(conn net.Conn) {
  41. //当前连接的业务
  42. fmt.Printf("连接建立成功...")
  43. }

通过在main.go中启动一个server并配合telnet命令检测代码是否正确

  • telnet:可以模拟连接的建立
  • telnet 127.0.0.1 8082

main.go:

  1. package main
  2. func main() {
  3. //创建一个server
  4. server := NewServer("127.0.0.1", 8082)
  5. //启动server【监听】
  6. server.Start()
  7. }
  8. //打包代码为exe
  9. go build -o intime.exe .\main.go .\server.go

在这里插入图片描述

2 实现用户上线广播机制【用户上线功能】

架构图:Server端存储一个OnlineMap,用于记录在线的用户

在这里插入图片描述

  1. 编写user.go,编写User结构体并实现对user.channel的监听
  2. 修改server.go,新增OnlineMap和Message属性,在处理的客户端上线的Handler中连接建立成功之后将用户添加到OnlineMap;并新增广播消息方法
  3. 在server.go中新增监听广播消息channel的方法,同时用一个goroutine单独监听message
  1. //构建代码 生成intime.exe文件
  2. go build -o intime.exe .\main.go .\server.go .\user.go

3 用户消息广播机制

修改server.go,完善一个handle处理业务方法,启动一个专门针对当前用户的goroutine

server.go:

  1. package main
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "sync"
  7. )
  8. // 定义服务端
  9. type Server struct {
  10. ip string
  11. port int
  12. //定义一个map,用于存储在线用户
  13. OnlineMap map[string]*User
  14. //定义map锁,保证存储map时候的数据正确,避免并发读取
  15. mapLock sync.RWMutex
  16. //消息广播的channel[当该chan中有数据时,直接广播给所有在线用户]
  17. Message chan string
  18. }
  19. // 创建一个Server
  20. func NewServer(ip string, port int) *Server {
  21. return &Server{
  22. ip: ip,
  23. port: port,
  24. OnlineMap: make(map[string]*User),
  25. Message: make(chan string),
  26. }
  27. }
  28. // 定义Server方法
  29. func (this *Server) Start() {
  30. //根据ip+port监听socket套接字 tcp表明类型【socket listen】
  31. listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", this.ip, this.port))
  32. if err != nil {
  33. fmt.Println("net.Listen err=", err)
  34. return
  35. }
  36. //关闭套接字,避免资源浪费
  37. defer listen.Close()
  38. //启动监听Message的goroutine
  39. go this.ListenMessage()
  40. for {
  41. conn, err := listen.Accept()
  42. if err != nil {
  43. fmt.Println("listen accept err=", err)
  44. continue
  45. }
  46. //处理连接请求【具体业务:处理客户端请求】
  47. //开启协程处理,避免占用主线程【server一直要监听ip+port】
  48. go this.Handler(conn)
  49. }
  50. }
  51. // 处理连接请求
  52. func (this *Server) Handler(conn net.Conn) {
  53. //当前连接的业务[用户上线之后,发送广播通知]
  54. //fmt.Printf("连接建立成功...")
  55. //1. 创建user,并将user添加到OnlineMap中,此处操作map(可能涉及并发,因此加锁保证安全性)
  56. user := NewUser(conn)
  57. this.mapLock.Lock()
  58. this.OnlineMap[user.Name] = user
  59. this.mapLock.Unlock()
  60. //2. 将该用户上线消息进行广播
  61. this.Broadcast(user, "已上线")
  62. //3. 接收用户发送的消息,并广播
  63. go func() {
  64. buf := make([]byte, 4096)
  65. //接收用户信息
  66. n, err := conn.Read(buf)
  67. if n == 0 {
  68. this.Broadcast(user, "下线")
  69. return
  70. }
  71. if err != nil && err != io.EOF {
  72. fmt.Println("Conn read err:", err)
  73. return
  74. }
  75. //提取用户消息(去除"\n")
  76. msg := string(buf[:n-1])
  77. //广播消息
  78. this.Broadcast(user, msg)
  79. }()
  80. //4. 阻塞当前handler
  81. select {
  82. }
  83. }
  84. // 广播消息
  85. func (this *Server) Broadcast(user *User, message string) {
  86. sendMsg := "[" + user.Addr + "] " + user.Name + ":" + message
  87. //将消息写入广播chan中
  88. this.Message <- sendMsg
  89. }
  90. // 监听Message广播消息channel的goroutine,一旦有消息就广播发送给所有在线用户
  91. func (this *Server) ListenMessage() {
  92. for {
  93. msg := <-this.Message
  94. //遍历OnlineMap,将消息广播给所有用户[设置并发操作map,加锁]
  95. this.mapLock.Lock()
  96. for _, cli := range this.OnlineMap {
  97. //将广播消息写入用户的channel,等待用户监听读取
  98. cli.C <- msg
  99. }
  100. this.mapLock.Unlock()
  101. }
  102. }

4 用户业务层封装

修改user.go,新增对应方法:

  • user中新增一个Server属性
  • Online
  • Offline
  • DoMessage等

替换之前server.go中涉及到user的代码

user.go:

  1. package main
  2. import (
  3. "net"
  4. "unicode/utf8"
  5. )
  6. type User struct {
  7. Name string
  8. Addr string
  9. //管道用于接收服务端的消息
  10. C chan string
  11. //与服务器端的连接
  12. conn net.Conn
  13. //对应连接的Server
  14. server *Server
  15. }
  16. func NewUser(conn net.Conn, server *Server) *User {
  17. userAddr := conn.RemoteAddr().String()
  18. user := &User{
  19. Name: userAddr,
  20. Addr: userAddr,
  21. C: make(chan string),
  22. conn: conn,
  23. server: server,
  24. }
  25. //创建一个user就应该监听自己chan管道中的消息,如果有就取出
  26. go user.ListenMessage()
  27. return user
  28. }
  29. // 监听当前user 的channel,一旦有消息,就直接发送给对应的客户端
  30. func (this *User) ListenMessage() {
  31. for {
  32. msg := <-this.C
  33. //从user管道中读取消息,发送给user客户端
  34. 对中文进行处理
  35. //msgByte := []byte(msg)
  36. //bytes, err := simplifiedchinese.GB18030.NewDecoder().Bytes(msgByte)
  37. //if err != nil {
  38. // fmt.Println("simplifiedchinese decoder err=", err)
  39. //}
  40. runes := []rune(msg + "\n")
  41. bytes := make([]byte, len(runes)*4)
  42. for _, v := range runes {
  43. buf := make([]byte, 4)
  44. size := utf8.EncodeRune(buf, v)
  45. bytes = append(bytes, buf[:size]...)
  46. }
  47. //this.conn.Write([]byte(msg + "\n"))
  48. this.conn.Write(bytes)
  49. }
  50. }
  51. // Online:用户上线方法
  52. func (this *User) Online() {
  53. //用户上线,将用户添加到OnlineMap中
  54. this.server.mapLock.Lock()
  55. this.server.OnlineMap[this.Name] = this
  56. this.server.mapLock.Unlock()
  57. //广播用户上线信息
  58. this.server.Broadcast(this, "已上线")
  59. }
  60. // 用户下线业务
  61. func (this *User) Offline() {
  62. this.server.mapLock.Lock()
  63. //根据key删除对应值
  64. delete(this.server.OnlineMap, this.Name)
  65. this.server.mapLock.Unlock()
  66. this.server.Broadcast(this, "下线")
  67. }
  68. // 用户处理消息的业务
  69. func (this *User) DoMessage(msg string) {
  70. this.server.Broadcast(this, msg)
  71. }

server.go:

  1. package main
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "sync"
  7. )
  8. // 定义服务端
  9. type Server struct {
  10. ip string
  11. port int
  12. //定义一个map,用于存储在线用户
  13. OnlineMap map[string]*User
  14. //定义map锁,保证存储map时候的数据正确,避免并发读取
  15. mapLock sync.RWMutex
  16. //消息广播的channel[当该chan中有数据时,直接广播给所有在线用户]
  17. Message chan string
  18. }
  19. // 创建一个Server
  20. func NewServer(ip string, port int) *Server {
  21. return &Server{
  22. ip: ip,
  23. port: port,
  24. OnlineMap: make(map[string]*User),
  25. Message: make(chan string),
  26. }
  27. }
  28. // 定义Server方法
  29. func (this *Server) Start() {
  30. //根据ip+port监听socket套接字 tcp表明类型【socket listen】
  31. listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", this.ip, this.port))
  32. if err != nil {
  33. fmt.Println("net.Listen err=", err)
  34. return
  35. }
  36. //关闭套接字,避免资源浪费
  37. defer listen.Close()
  38. //启动监听Message的goroutine
  39. go this.ListenMessage()
  40. for {
  41. conn, err := listen.Accept()
  42. if err != nil {
  43. fmt.Println("listen accept err=", err)
  44. continue
  45. }
  46. //处理连接请求【具体业务:处理客户端请求】
  47. //开启协程处理,避免占用主线程【server一直要监听ip+port】
  48. go this.Handler(conn)
  49. }
  50. }
  51. // 处理连接请求
  52. func (this *Server) Handler(conn net.Conn) {
  53. //当前连接的业务[用户上线之后,发送广播通知]
  54. //fmt.Printf("连接建立成功...")
  55. // 创建user,并将user添加到OnlineMap中,此处操作map(可能涉及并发,因此加锁保证安全性)
  56. user := NewUser(conn, this)
  57. user.Online()
  58. // 接收用户发送的消息,并广播
  59. go func() {
  60. buf := make([]byte, 4096)
  61. //接收用户信息
  62. n, err := conn.Read(buf)
  63. if n == 0 {
  64. //用户下线
  65. user.Offline()
  66. return
  67. }
  68. if err != nil && err != io.EOF {
  69. fmt.Println("Conn read err:", err)
  70. return
  71. }
  72. //提取用户消息(去除"\n")
  73. msg := string(buf[:n-1])
  74. //用户针对msg进行消息处理
  75. user.DoMessage(msg)
  76. }()
  77. //4. 阻塞当前handler
  78. select {
  79. }
  80. }
  81. // 广播消息
  82. func (this *Server) Broadcast(user *User, message string) {
  83. sendMsg := "[" + user.Addr + "] " + user.Name + ":" + message
  84. //将消息写入广播chan中
  85. this.Message <- sendMsg
  86. }
  87. // 监听Message广播消息channel的goroutine,一旦有消息就广播发送给所有在线用户
  88. func (this *Server) ListenMessage() {
  89. for {
  90. msg := <-this.Message
  91. //遍历OnlineMap,将消息广播给所有用户[设置并发操作map,加锁]
  92. this.mapLock.Lock()
  93. for _, cli := range this.OnlineMap {
  94. //将广播消息写入用户的channel,等待用户监听读取
  95. cli.C <- msg
  96. }
  97. this.mapLock.Unlock()
  98. }
  99. }

5 用户查询功能

实现,用户在终端输入who,查看当前在线用户(修改user.go)

  • 添加SendMsg():给客户端发送消息
  • 新增判断“who”命令逻辑
  1. package main
  2. import (
  3. "net"
  4. "unicode/utf8"
  5. )
  6. type User struct {
  7. Name string
  8. Addr string
  9. //管道用于接收服务端的消息
  10. C chan string
  11. //与服务器端的连接
  12. conn net.Conn
  13. //对应连接的Server
  14. server *Server
  15. }
  16. func NewUser(conn net.Conn, server *Server) *User {
  17. userAddr := conn.RemoteAddr().String()
  18. user := &User{
  19. Name: userAddr,
  20. Addr: userAddr,
  21. C: make(chan string),
  22. conn: conn,
  23. server: server,
  24. }
  25. //创建一个user就应该监听自己chan管道中的消息,如果有就取出
  26. go user.ListenMessage()
  27. return user
  28. }
  29. // 监听当前user 的channel,一旦有消息,就直接发送给对应的客户端
  30. func (this *User) ListenMessage() {
  31. for {
  32. msg := <-this.C
  33. //从user管道中读取消息,发送给user客户端
  34. 对中文进行处理
  35. //msgByte := []byte(msg)
  36. //bytes, err := simplifiedchinese.GB18030.NewDecoder().Bytes(msgByte)
  37. //if err != nil {
  38. // fmt.Println("simplifiedchinese decoder err=", err)
  39. //}
  40. runes := []rune(msg + "\n")
  41. bytes := make([]byte, len(runes)*4)
  42. for _, v := range runes {
  43. buf := make([]byte, 4)
  44. size := utf8.EncodeRune(buf, v)
  45. bytes = append(bytes, buf[:size]...)
  46. }
  47. //this.conn.Write([]byte(msg + "\n"))
  48. this.conn.Write(bytes)
  49. }
  50. }
  51. // Online:用户上线方法
  52. func (this *User) Online() {
  53. //用户上线,将用户添加到OnlineMap中
  54. this.server.mapLock.Lock()
  55. this.server.OnlineMap[this.Name] = this
  56. this.server.mapLock.Unlock()
  57. //广播用户上线信息
  58. this.server.Broadcast(this, "已上线")
  59. }
  60. // 用户下线业务
  61. func (this *User) Offline() {
  62. this.server.mapLock.Lock()
  63. //根据key删除对应值
  64. delete(this.server.OnlineMap, this.Name)
  65. this.server.mapLock.Unlock()
  66. this.server.Broadcast(this, "下线")
  67. }
  68. // 用户处理消息的业务
  69. func (this *User) DoMessage(msg string) {
  70. //添加who命令逻辑:查询当前在线用户
  71. if msg == "who" {
  72. this.server.mapLock.Lock()
  73. for _, user := range this.server.OnlineMap {
  74. onlineMsg := "[" + user.Addr + "]" + user.Name + ":" + "online...\n"
  75. this.SendMsg(onlineMsg)
  76. }
  77. }
  78. this.server.Broadcast(this, msg)
  79. }
  80. // 给当前user的客户端发送消息
  81. func (this *User) SendMsg(msg string) {
  82. this.conn.Write([]byte(msg))
  83. }

6 修改用户名

定义命令rename|zhangsan:将当前用户名修改为张三

  • 修改user.go:在DoMessage()方法中判断命令是否为rename

user.go:

  1. package main
  2. import (
  3. "net"
  4. "strings"
  5. "unicode/utf8"
  6. )
  7. type User struct {
  8. Name string
  9. Addr string
  10. //管道用于接收服务端的消息
  11. C chan string
  12. //与服务器端的连接
  13. conn net.Conn
  14. //对应连接的Server
  15. server *Server
  16. }
  17. func NewUser(conn net.Conn, server *Server) *User {
  18. userAddr := conn.RemoteAddr().String()
  19. user := &User{
  20. Name: userAddr,
  21. Addr: userAddr,
  22. C: make(chan string),
  23. conn: conn,
  24. server: server,
  25. }
  26. //创建一个user就应该监听自己chan管道中的消息,如果有就取出
  27. go user.ListenMessage()
  28. return user
  29. }
  30. // 监听当前user 的channel,一旦有消息,就直接发送给对应的客户端
  31. func (this *User) ListenMessage() {
  32. for {
  33. msg := <-this.C
  34. //从user管道中读取消息,发送给user客户端
  35. 对中文进行处理
  36. //msgByte := []byte(msg)
  37. //bytes, err := simplifiedchinese.GB18030.NewDecoder().Bytes(msgByte)
  38. //if err != nil {
  39. // fmt.Println("simplifiedchinese decoder err=", err)
  40. //}
  41. runes := []rune(msg + "\n")
  42. bytes := make([]byte, len(runes)*4)
  43. for _, v := range runes {
  44. buf := make([]byte, 4)
  45. size := utf8.EncodeRune(buf, v)
  46. bytes = append(bytes, buf[:size]...)
  47. }
  48. //this.conn.Write([]byte(msg + "\n"))
  49. this.conn.Write(bytes)
  50. }
  51. }
  52. // Online:用户上线方法
  53. func (this *User) Online() {
  54. //用户上线,将用户添加到OnlineMap中
  55. this.server.mapLock.Lock()
  56. this.server.OnlineMap[this.Name] = this
  57. this.server.mapLock.Unlock()
  58. //广播用户上线信息
  59. this.server.Broadcast(this, "is online")
  60. }
  61. // 用户下线业务
  62. func (this *User) Offline() {
  63. this.server.mapLock.Lock()
  64. //根据key删除对应值
  65. delete(this.server.OnlineMap, this.Name)
  66. this.server.mapLock.Unlock()
  67. this.server.Broadcast(this, "is offline")
  68. }
  69. // 用户处理消息的业务
  70. func (this *User) DoMessage(msg string) {
  71. //添加who命令逻辑:查询当前在线用户
  72. if msg == "who" {
  73. this.server.mapLock.Lock()
  74. for _, user := range this.server.OnlineMap {
  75. onlineMsg := "[" + user.Addr + "]" + user.Name + ":" + "online...\n"
  76. this.SendMsg(onlineMsg)
  77. }
  78. } else if len(msg) > 7 && msg[:7] == "rename|" {
  79. newName := strings.Split(msg, "|")[1]
  80. //判断要修改的name是否已经被占用
  81. _, ok := this.server.OnlineMap[newName]
  82. if ok {
  83. this.SendMsg("the name is already exists...")
  84. } else {
  85. this.server.mapLock.Lock()
  86. delete(this.server.OnlineMap, this.Name)
  87. this.server.OnlineMap[newName] = this
  88. this.server.mapLock.Unlock()
  89. this.Name = newName //更新页面当前用户
  90. this.SendMsg("update name success:" + this.Name + "\n")
  91. }
  92. } else {
  93. this.server.Broadcast(this, msg)
  94. }
  95. }
  96. // 给当前user的客户端发送消息
  97. func (this *User) SendMsg(msg string) {
  98. this.conn.Write([]byte(msg))
  99. }

7 超时强踢功能

如果某个用户长时间不发消息,不活跃,达到一定时间则断开连接,达到强踢效果

  • 修改server.go:
    ①在用户的Hander() goroutine中,添加用户活跃channel,一旦有消息就向该channel发送数据
    ②在用户的Hander()goroutine中,添加定时器功能,超时则强踢

server.go:

  1. package main
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. )
  9. // 定义服务端
  10. type Server struct {
  11. ip string
  12. port int
  13. //定义一个map,用于存储在线用户
  14. OnlineMap map[string]*User
  15. //定义map锁,保证存储map时候的数据正确,避免并发读取
  16. mapLock sync.RWMutex
  17. //消息广播的channel[当该chan中有数据时,直接广播给所有在线用户]
  18. Message chan string
  19. }
  20. // 创建一个Server
  21. func NewServer(ip string, port int) *Server {
  22. return &Server{
  23. ip: ip,
  24. port: port,
  25. OnlineMap: make(map[string]*User),
  26. Message: make(chan string),
  27. }
  28. }
  29. // 定义Server方法
  30. func (this *Server) Start() {
  31. //根据ip+port监听socket套接字 tcp表明类型【socket listen】
  32. listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", this.ip, this.port))
  33. if err != nil {
  34. fmt.Println("net.Listen err=", err)
  35. return
  36. }
  37. //关闭套接字,避免资源浪费
  38. defer listen.Close()
  39. //启动监听Message的goroutine
  40. go this.ListenMessage()
  41. for {
  42. conn, err := listen.Accept()
  43. if err != nil {
  44. fmt.Println("listen accept err=", err)
  45. continue
  46. }
  47. //处理连接请求【具体业务:处理客户端请求】
  48. //开启协程处理,避免占用主线程【server一直要监听ip+port】
  49. go this.Handler(conn)
  50. }
  51. }
  52. // 处理连接请求
  53. func (this *Server) Handler(conn net.Conn) {
  54. //当前连接的业务[用户上线之后,发送广播通知]
  55. //fmt.Printf("连接建立成功...")
  56. // 创建user,并将user添加到OnlineMap中,此处操作map(可能涉及并发,因此加锁保证安全性)
  57. user := NewUser(conn, this)
  58. user.Online()
  59. //监听用户是否活跃的channel
  60. isLive := make(chan bool)
  61. // 接收用户发送的消息,并广播
  62. go func() {
  63. buf := make([]byte, 4096)
  64. //接收用户信息
  65. n, err := conn.Read(buf)
  66. if n == 0 {
  67. //用户下线
  68. user.Offline()
  69. return
  70. }
  71. if err != nil && err != io.EOF {
  72. fmt.Println("Conn read err:", err)
  73. return
  74. }
  75. //提取用户消息(去除"\n")
  76. msg := string(buf[:n-1])
  77. //用户针对msg进行消息处理
  78. user.DoMessage(msg)
  79. //用户的任意消息代表用户当前是一个活跃的
  80. isLive <- true
  81. }()
  82. //4. 当前handler阻塞【超时强制踢出】
  83. for {
  84. select {
  85. case <-isLive:
  86. //当前用户是活跃的,应该重置定时器
  87. //不做任何事,为了激活select,更新下面的定时器
  88. case <-time.After(time.Second * 10):
  89. //已经超时,将当前的User强制关闭
  90. user.SendMsg("you have been offline")
  91. //销毁用的资源,关闭channel
  92. close(user.C)
  93. //关闭连接
  94. conn.Close()
  95. //退出当前Handler[runtime.Goexit()]
  96. return
  97. }
  98. }
  99. }
  100. // 广播消息
  101. func (this *Server) Broadcast(user *User, message string) {
  102. sendMsg := "[" + user.Addr + "] " + user.Name + ":" + message
  103. //将消息写入广播chan中
  104. this.Message <- sendMsg
  105. }
  106. // 监听Message广播消息channel的goroutine,一旦有消息就广播发送给所有在线用户
  107. func (this *Server) ListenMessage() {
  108. for {
  109. msg := <-this.Message
  110. //遍历OnlineMap,将消息广播给所有用户[设置并发操作map,加锁]
  111. this.mapLock.Lock()
  112. for _, cli := range this.OnlineMap {
  113. //将广播消息写入用户的channel,等待用户监听读取
  114. cli.C <- msg
  115. }
  116. this.mapLock.Unlock()
  117. }
  118. }

8 私聊功能

消息格式:to|zhangsan|hello, how are you

  • 修改user.go的DoMessage()逻辑,新增私聊消息判断

user.go:

  1. package main
  2. import (
  3. "net"
  4. "strings"
  5. "unicode/utf8"
  6. )
  7. type User struct {
  8. Name string
  9. Addr string
  10. //管道用于接收服务端的消息
  11. C chan string
  12. //与服务器端的连接
  13. conn net.Conn
  14. //对应连接的Server
  15. server *Server
  16. }
  17. func NewUser(conn net.Conn, server *Server) *User {
  18. userAddr := conn.RemoteAddr().String()
  19. user := &User{
  20. Name: userAddr,
  21. Addr: userAddr,
  22. C: make(chan string),
  23. conn: conn,
  24. server: server,
  25. }
  26. //创建一个user就应该监听自己chan管道中的消息,如果有就取出
  27. go user.ListenMessage()
  28. return user
  29. }
  30. // 监听当前user 的channel,一旦有消息,就直接发送给对应的客户端
  31. func (this *User) ListenMessage() {
  32. for {
  33. msg := <-this.C
  34. //从user管道中读取消息,发送给user客户端
  35. 对中文进行处理
  36. //msgByte := []byte(msg)
  37. //bytes, err := simplifiedchinese.GB18030.NewDecoder().Bytes(msgByte)
  38. //if err != nil {
  39. // fmt.Println("simplifiedchinese decoder err=", err)
  40. //}
  41. runes := []rune(msg + "\n")
  42. bytes := make([]byte, len(runes)*4)
  43. for _, v := range runes {
  44. buf := make([]byte, 4)
  45. size := utf8.EncodeRune(buf, v)
  46. bytes = append(bytes, buf[:size]...)
  47. }
  48. //this.conn.Write([]byte(msg + "\n"))
  49. this.conn.Write(bytes)
  50. }
  51. }
  52. // Online:用户上线方法
  53. func (this *User) Online() {
  54. //用户上线,将用户添加到OnlineMap中
  55. this.server.mapLock.Lock()
  56. this.server.OnlineMap[this.Name] = this
  57. this.server.mapLock.Unlock()
  58. //广播用户上线信息
  59. this.server.Broadcast(this, "is online")
  60. }
  61. // 用户下线业务
  62. func (this *User) Offline() {
  63. this.server.mapLock.Lock()
  64. //根据key删除对应值
  65. delete(this.server.OnlineMap, this.Name)
  66. this.server.mapLock.Unlock()
  67. this.server.Broadcast(this, "is offline")
  68. }
  69. // 用户处理消息的业务
  70. func (this *User) DoMessage(msg string) {
  71. //添加who命令逻辑:查询当前在线用户
  72. if msg == "who" {
  73. this.server.mapLock.Lock()
  74. for _, user := range this.server.OnlineMap {
  75. onlineMsg := "[" + user.Addr + "]" + user.Name + ":" + "online...\n"
  76. this.SendMsg(onlineMsg)
  77. }
  78. } else if len(msg) > 7 && msg[:7] == "rename|" {
  79. newName := strings.Split(msg, "|")[1]
  80. //判断要修改的name是否已经被占用
  81. _, ok := this.server.OnlineMap[newName]
  82. if ok {
  83. this.SendMsg("the name is already exists...")
  84. } else {
  85. this.server.mapLock.Lock()
  86. delete(this.server.OnlineMap, this.Name)
  87. this.server.OnlineMap[newName] = this
  88. this.server.mapLock.Unlock()
  89. this.Name = newName //更新页面当前用户
  90. this.SendMsg("update name success:" + this.Name + "\n")
  91. }
  92. } else if len(msg) > 4 && msg[:3] == "to|" {
  93. //如果是私聊命令 消息格式: to|zhangsan|msg content
  94. //1. 获取对方用户名
  95. remoteName := strings.Split(msg, "|")[1]
  96. if remoteName == "" {
  97. this.SendMsg("the msg format is incorrect, please use the 'to|zhangsan|msg content' to send a msg\n")
  98. return
  99. }
  100. //2. 根据用户名,得到对方的user对象
  101. remoteUser, ok := this.server.OnlineMap[remoteName]
  102. if !ok {
  103. this.SendMsg("the user is not exist")
  104. return
  105. }
  106. //3. 获取消息内容,通过对方的User对象将消息内容发送过去
  107. content := strings.Split(msg, "|")[2]
  108. if content == "" {
  109. this.SendMsg("please do not send a empty msg\n")
  110. return
  111. }
  112. remoteUser.SendMsg(this.Name + "is speak to you:" + content)
  113. } else {
  114. this.server.Broadcast(this, msg)
  115. }
  116. }
  117. // 给当前user的客户端发送消息
  118. func (this *User) SendMsg(msg string) {
  119. this.conn.Write([]byte(msg))
  120. }

9 客户端实现(过程省略)

9.1 客户端类型定义与链接

9.2 解析命令行

9.3 菜单显示

9.4 更新用户名客户端实现

9.5 公聊模式

9.6 私聊模式

10 最终代码

①main.go

  1. package main
  2. func main() {
  3. server := NewServer("127.0.0.1", 8888)
  4. server.Start()
  5. }

②server.go

  1. package main
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. )
  9. type Server struct {
  10. Ip string
  11. Port int
  12. //在线用户的列表
  13. OnlineMap map[string]*User
  14. mapLock sync.RWMutex
  15. //消息广播的channel
  16. Message chan string
  17. }
  18. //创建一个server的接口
  19. func NewServer(ip string, port int) *Server {
  20. server := &Server{
  21. Ip: ip,
  22. Port: port,
  23. OnlineMap: make(map[string]*User),
  24. Message: make(chan string),
  25. }
  26. return server
  27. }
  28. //监听Message广播消息channel的goroutine,一旦有消息就发送给全部的在线User
  29. func (this *Server) ListenMessager() {
  30. for {
  31. msg := <-this.Message
  32. //将msg发送给全部的在线User
  33. this.mapLock.Lock()
  34. for _, cli := range this.OnlineMap {
  35. cli.C <- msg
  36. }
  37. this.mapLock.Unlock()
  38. }
  39. }
  40. //广播消息的方法
  41. func (this *Server) BroadCast(user *User, msg string) {
  42. sendMsg := "[" + user.Addr + "]" + user.Name + ":" + msg
  43. this.Message <- sendMsg
  44. }
  45. func (this *Server) Handler(conn net.Conn) {
  46. //...当前链接的业务
  47. //fmt.Println("链接建立成功")
  48. user := NewUser(conn, this)
  49. user.Online()
  50. //监听用户是否活跃的channel
  51. isLive := make(chan bool)
  52. //接受客户端发送的消息
  53. go func() {
  54. buf := make([]byte, 4096)
  55. for {
  56. n, err := conn.Read(buf)
  57. if n == 0 {
  58. user.Offline()
  59. return
  60. }
  61. if err != nil && err != io.EOF {
  62. fmt.Println("Conn Read err:", err)
  63. return
  64. }
  65. //提取用户的消息(去除'\n')
  66. msg := string(buf[:n-1])
  67. //用户针对msg进行消息处理
  68. user.DoMessage(msg)
  69. //用户的任意消息,代表当前用户是一个活跃的
  70. isLive <- true
  71. }
  72. }()
  73. //当前handler阻塞
  74. for {
  75. select {
  76. case <-isLive:
  77. //当前用户是活跃的,应该重置定时器
  78. //不做任何事情,为了激活select,更新下面的定时器
  79. case <-time.After(time.Second * 300):
  80. //已经超时
  81. //将当前的User强制的关闭
  82. user.SendMsg("你被踢了")
  83. //销毁用的资源
  84. close(user.C)
  85. //关闭连接
  86. conn.Close()
  87. //退出当前Handler
  88. return //runtime.Goexit()
  89. }
  90. }
  91. }
  92. //启动服务器的接口
  93. func (this *Server) Start() {
  94. //socket listen
  95. listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", this.Ip, this.Port))
  96. if err != nil {
  97. fmt.Println("net.Listen err:", err)
  98. return
  99. }
  100. //close listen socket
  101. defer listener.Close()
  102. //启动监听Message的goroutine
  103. go this.ListenMessager()
  104. for {
  105. //accept
  106. conn, err := listener.Accept()
  107. if err != nil {
  108. fmt.Println("listener accept err:", err)
  109. continue
  110. }
  111. //do handler
  112. go this.Handler(conn)
  113. }
  114. }

③user.go

  1. package main
  2. import (
  3. "net"
  4. "strings"
  5. )
  6. type User struct {
  7. Name string
  8. Addr string
  9. C chan string
  10. conn net.Conn
  11. server *Server
  12. }
  13. //创建一个用户的API
  14. func NewUser(conn net.Conn, server *Server) *User {
  15. userAddr := conn.RemoteAddr().String()
  16. user := &User{
  17. Name: userAddr,
  18. Addr: userAddr,
  19. C: make(chan string),
  20. conn: conn,
  21. server: server,
  22. }
  23. //启动监听当前user channel消息的goroutine
  24. go user.ListenMessage()
  25. return user
  26. }
  27. //用户的上线业务
  28. func (this *User) Online() {
  29. //用户上线,将用户加入到onlineMap中
  30. this.server.mapLock.Lock()
  31. this.server.OnlineMap[this.Name] = this
  32. this.server.mapLock.Unlock()
  33. //广播当前用户上线消息
  34. this.server.BroadCast(this, "已上线")
  35. }
  36. //用户的下线业务
  37. func (this *User) Offline() {
  38. //用户下线,将用户从onlineMap中删除
  39. this.server.mapLock.Lock()
  40. delete(this.server.OnlineMap, this.Name)
  41. this.server.mapLock.Unlock()
  42. //广播当前用户上线消息
  43. this.server.BroadCast(this, "下线")
  44. }
  45. //给当前User对应的客户端发送消息
  46. func (this *User) SendMsg(msg string) {
  47. this.conn.Write([]byte(msg))
  48. }
  49. //用户处理消息的业务
  50. func (this *User) DoMessage(msg string) {
  51. if msg == "who" {
  52. //查询当前在线用户都有哪些
  53. this.server.mapLock.Lock()
  54. for _, user := range this.server.OnlineMap {
  55. onlineMsg := "[" + user.Addr + "]" + user.Name + ":" + "在线...\n"
  56. this.SendMsg(onlineMsg)
  57. }
  58. this.server.mapLock.Unlock()
  59. } else if len(msg) > 7 && msg[:7] == "rename|" {
  60. //消息格式: rename|张三
  61. newName := strings.Split(msg, "|")[1]
  62. //判断name是否存在
  63. _, ok := this.server.OnlineMap[newName]
  64. if ok {
  65. this.SendMsg("当前用户名被使用\n")
  66. } else {
  67. this.server.mapLock.Lock()
  68. delete(this.server.OnlineMap, this.Name)
  69. this.server.OnlineMap[newName] = this
  70. this.server.mapLock.Unlock()
  71. this.Name = newName
  72. this.SendMsg("您已经更新用户名:" + this.Name + "\n")
  73. }
  74. } else if len(msg) > 4 && msg[:3] == "to|" {
  75. //消息格式: to|张三|消息内容
  76. //1 获取对方的用户名
  77. remoteName := strings.Split(msg, "|")[1]
  78. if remoteName == "" {
  79. this.SendMsg("消息格式不正确,请使用 \"to|张三|你好啊\"格式。\n")
  80. return
  81. }
  82. //2 根据用户名 得到对方User对象
  83. remoteUser, ok := this.server.OnlineMap[remoteName]
  84. if !ok {
  85. this.SendMsg("该用户名不不存在\n")
  86. return
  87. }
  88. //3 获取消息内容,通过对方的User对象将消息内容发送过去
  89. content := strings.Split(msg, "|")[2]
  90. if content == "" {
  91. this.SendMsg("无消息内容,请重发\n")
  92. return
  93. }
  94. remoteUser.SendMsg(this.Name + "对您说:" + content)
  95. } else {
  96. this.server.BroadCast(this, msg)
  97. }
  98. }
  99. //监听当前User channel的 方法,一旦有消息,就直接发送给对端客户端
  100. func (this *User) ListenMessage() {
  101. for {
  102. msg := <-this.C
  103. this.conn.Write([]byte(msg + "\n"))
  104. }
  105. }

④client.go

  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "io"
  6. "net"
  7. "os"
  8. )
  9. type Client struct {
  10. ServerIp string
  11. ServerPort int
  12. Name string
  13. conn net.Conn
  14. flag int //当前client的模式
  15. }
  16. func NewClient(serverIp string, serverPort int) *Client {
  17. //创建客户端对象
  18. client := &Client{
  19. ServerIp: serverIp,
  20. ServerPort: serverPort,
  21. flag: 999,
  22. }
  23. //链接server
  24. conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", serverIp, serverPort))
  25. if err != nil {
  26. fmt.Println("net.Dial error:", err)
  27. return nil
  28. }
  29. client.conn = conn
  30. //返回对象
  31. return client
  32. }
  33. //处理server回应的消息, 直接显示到标准输出即可
  34. func (client *Client) DealResponse() {
  35. //一旦client.conn有数据,就直接copy到stdout标准输出上, 永久阻塞监听
  36. io.Copy(os.Stdout, client.conn)
  37. }
  38. func (client *Client) menu() bool {
  39. var flag int
  40. fmt.Println("1.公聊模式")
  41. fmt.Println("2.私聊模式")
  42. fmt.Println("3.更新用户名")
  43. fmt.Println("0.退出")
  44. fmt.Scanln(&flag)
  45. if flag >= 0 && flag <= 3 {
  46. client.flag = flag
  47. return true
  48. } else {
  49. fmt.Println(">>>>请输入合法范围内的数字<<<<")
  50. return false
  51. }
  52. }
  53. //查询在线用户
  54. func (client *Client) SelectUsers() {
  55. sendMsg := "who\n"
  56. _, err := client.conn.Write([]byte(sendMsg))
  57. if err != nil {
  58. fmt.Println("conn Write err:", err)
  59. return
  60. }
  61. }
  62. //私聊模式
  63. func (client *Client) PrivateChat() {
  64. var remoteName string
  65. var chatMsg string
  66. client.SelectUsers()
  67. fmt.Println(">>>>请输入聊天对象[用户名], exit退出:")
  68. fmt.Scanln(&remoteName)
  69. for remoteName != "exit" {
  70. fmt.Println(">>>>请输入消息内容, exit退出:")
  71. fmt.Scanln(&chatMsg)
  72. for chatMsg != "exit" {
  73. //消息不为空则发送
  74. if len(chatMsg) != 0 {
  75. sendMsg := "to|" + remoteName + "|" + chatMsg + "\n\n"
  76. _, err := client.conn.Write([]byte(sendMsg))
  77. if err != nil {
  78. fmt.Println("conn Write err:", err)
  79. break
  80. }
  81. }
  82. chatMsg = ""
  83. fmt.Println(">>>>请输入消息内容, exit退出:")
  84. fmt.Scanln(&chatMsg)
  85. }
  86. client.SelectUsers()
  87. fmt.Println(">>>>请输入聊天对象[用户名], exit退出:")
  88. fmt.Scanln(&remoteName)
  89. }
  90. }
  91. func (client *Client) PublicChat() {
  92. //提示用户输入消息
  93. var chatMsg string
  94. fmt.Println(">>>>请输入聊天内容,exit退出.")
  95. fmt.Scanln(&chatMsg)
  96. for chatMsg != "exit" {
  97. //发给服务器
  98. //消息不为空则发送
  99. if len(chatMsg) != 0 {
  100. sendMsg := chatMsg + "\n"
  101. _, err := client.conn.Write([]byte(sendMsg))
  102. if err != nil {
  103. fmt.Println("conn Write err:", err)
  104. break
  105. }
  106. }
  107. chatMsg = ""
  108. fmt.Println(">>>>请输入聊天内容,exit退出.")
  109. fmt.Scanln(&chatMsg)
  110. }
  111. }
  112. func (client *Client) UpdateName() bool {
  113. fmt.Println(">>>>请输入用户名:")
  114. fmt.Scanln(&client.Name)
  115. sendMsg := "rename|" + client.Name + "\n"
  116. _, err := client.conn.Write([]byte(sendMsg))
  117. if err != nil {
  118. fmt.Println("conn.Write err:", err)
  119. return false
  120. }
  121. return true
  122. }
  123. func (client *Client) Run() {
  124. for client.flag != 0 {
  125. for client.menu() != true {
  126. }
  127. //根据不同的模式处理不同的业务
  128. switch client.flag {
  129. case 1:
  130. //公聊模式
  131. client.PublicChat()
  132. break
  133. case 2:
  134. //私聊模式
  135. client.PrivateChat()
  136. break
  137. case 3:
  138. //更新用户名
  139. client.UpdateName()
  140. break
  141. }
  142. }
  143. }
  144. var serverIp string
  145. var serverPort int
  146. //./client -ip 127.0.0.1 -port 8888
  147. func init() {
  148. flag.StringVar(&serverIp, "ip", "127.0.0.1", "设置服务器IP地址(默认是127.0.0.1)")
  149. flag.IntVar(&serverPort, "port", 8888, "设置服务器端口(默认是8888)")
  150. }
  151. func main() {
  152. //命令行解析
  153. flag.Parse()
  154. client := NewClient(serverIp, serverPort)
  155. if client == nil {
  156. fmt.Println(">>>>> 链接服务器失败...")
  157. return
  158. }
  159. //单独开启一个goroutine去处理server的回执消息
  160. go client.DealResponse()
  161. fmt.Println(">>>>>链接服务器成功...")
  162. //启动客户端的业务
  163. client.Run()
  164. }

11 go的全部生态

在这里插入图片描述

参考:

  • 资料地址:https://pan.baidu.com/s/1glckD7XGInHDFQQKCRE66g\#list/path=%2F

发表评论

表情:
评论列表 (有 0 条评论,147人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Go基础编程:Go语言介绍

    Go语言是什么 2009年11月10日,Go语言正式成为开源编程语言家庭的一员。 Go语言(或称Golang)是云计算时代的C语言。Go语言的诞生是为了让程序员有更高的

    相关 go目录介绍

    一:工作区 所谓工作区可以理解为工作空间,由你安装go时指定工作区目录,比如我的:E:\\go\_project 此目录就是安装go时设置的GOPATH环境变量,GOPA