golang mysql批量插入实例
业务逻辑:tcp接收消息后,先将消息保存到数据库再进行后续操作。问题:压测发现,tcp连接非常多,消息更多,每秒10000条消息时,程序会内存溢出。解决:自然就想到了,问题出在不能及时把消息处理掉的问题。所以先想能不能加快mysql的处理流程,可以定时批量插入代替及时插入。思路:1.接收tcp消息,通多channel发送到临时的切片。2.定时任务,定时将临时切片的数据批量插入到mysql。知识点:
·
业务逻辑:
tcp接收消息后,先将消息保存到数据库再进行后续操作。
问题:
压测发现,tcp连接非常多,消息更多,每秒10000条消息时,程序会内存溢出。
解决:
自然就想到了,问题出在不能及时把消息处理掉的问题。所以先想能不能加快mysql的处理流程,可以定时批量插入代替及时插入。
思路:
1.接收tcp消息,通多channel发送到临时的切片。
2.定时任务,定时将临时切片的数据批量插入到mysql。
知识点:
1.channel来接收消息
2.互斥锁,保证切片操作和mysql插入互斥
3.定时器,定时触发插入任务
package service
import (
"bytes"
"fmt"
"sync"
"time"
)
/*
问题:mysql连接过多可能导致内存溢出
思路:
1.使用定时批量更新代替即收即插
2.使用channel,将接受的信息先留在内存中,然后定时,插入数据库
3.定时器,定时将内存中的数据整理后插入
*/
var tmpMessage []message
var messageMysqlChan chan message
var mesLock sync.Mutex //使用互斥锁,保证切片添加和数据库插入互斥
func init() {
messageMysqlChan = make(chan message, 100000)
go batchMessageReceive()
go batchStartTimer()
}
/*
接收消息的逻辑
只负责接收消息
*/
func batchMessageReceive() {
for {
select {
case oneMessage := <-messageMysqlChan:
mesLock.Lock()
tmpMessage = append(tmpMessage, oneMessage)
mesLock.Unlock()
}
}
}
func batch(batchMessage []message) {
if len(batchMessage) == 0 {
fmt.Print("空消息")
return
}
var buffer bytes.Buffer
sql := "insert into `t_msg` (`id`,`msg`,`insert_time`) values"
if _, err := buffer.WriteString(sql); err != nil {
fmt.Print(err.Error())
}
for index, value := range batchMessage {
if index == len(batchMessage)-1 {
buffer.WriteString(fmt.Sprintf("('%d','%s','%s');", value.Id, value.Mn, value.Msg, value.InsertTime))
} else {
buffer.WriteString(fmt.Sprintf("('%d','%s','%s'),", value.Id, value.Mn, value.Msg, value.InsertTime))
}
}
err := MysqlDB.Exec(buffer.String()).Error
if err != nil {
fmt.Print("插入消息失败:", err.Error())
}
return
}
/*
插入的逻辑
*/
func batchStartTimer() {
tick := time.NewTicker(5 * time.Second)
for {
select {
case <-tick.C:
mesLock.Lock()
batch(tmpMessage)
tmpMessage = tmpMessage[0:0] //置空,重复利用切片
mesLock.Unlock()
}
}
}
参考博客:

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐
所有评论(0)