fix connection locks issues

This commit is contained in:
Patrick Nagurny
2019-05-20 10:40:40 -04:00
parent 1c10277ca2
commit e64f60fd09

View File

@@ -73,7 +73,7 @@ func Handler(w rest.ResponseWriter, r *rest.Request) {
if err != nil { if err != nil {
log.Println(err.Error()) log.Println(err.Error())
writeMessage(c, websocket.CloseMessage, websocket.FormatCloseMessage(4001, err.Error())) writeMessageRaw(c, websocket.CloseMessage, websocket.FormatCloseMessage(4001, err.Error()))
break break
} }
@@ -83,7 +83,7 @@ func Handler(w rest.ResponseWriter, r *rest.Request) {
err = authenticate(message, c) err = authenticate(message, c)
if err != nil { if err != nil {
log.Println("Authentication error " + err.Error()) log.Println("Authentication error " + err.Error())
writeMessage(c, websocket.CloseMessage, websocket.FormatCloseMessage(4000, err.Error())) writeMessageRaw(c, websocket.CloseMessage, websocket.FormatCloseMessage(4000, err.Error()))
break break
} }
continue continue
@@ -146,7 +146,7 @@ func processMessage(message Message, conn *websocket.Conn) error {
return err return err
} }
err = writeMessage(conn, websocket.TextMessage, responseData) err = writeMessageRaw(conn, websocket.TextMessage, responseData)
if err != nil { if err != nil {
unsubscribeAll(conn) unsubscribeAll(conn)
@@ -173,6 +173,9 @@ func subscribe(conn *websocket.Conn, key string, clientMap map[string][]*websock
} }
func unsubscribe(conn *websocket.Conn, key string, clientMap map[string][]*websocket.Conn) { func unsubscribe(conn *websocket.Conn, key string, clientMap map[string][]*websocket.Conn) {
locks[conn].Lock()
defer locks[conn].Unlock()
newConns := clientMap[key][:0] newConns := clientMap[key][:0]
for _, c := range clientMap[key] { for _, c := range clientMap[key] {
@@ -183,7 +186,8 @@ func unsubscribe(conn *websocket.Conn, key string, clientMap map[string][]*webso
} }
func unsubscribeAll(conn *websocket.Conn) { func unsubscribeAll(conn *websocket.Conn) {
// TODO fix "concurrent map iteration and map write" error locks[conn].Lock()
for key, conns := range txSubscriptions { for key, conns := range txSubscriptions {
newConns := conns[:0] newConns := conns[:0]
for _, c := range conns { for _, c := range conns {
@@ -227,16 +231,7 @@ func PushTransaction(transaction *types.Transaction, userIds []string, action st
for _, userId := range userIds { for _, userId := range userIds {
key := getKey(userId, transaction.OrgId) key := getKey(userId, transaction.OrgId)
for _, conn := range txSubscriptions[key] { for _, conn := range txSubscriptions[key] {
sequenceNumbers[conn]++ err := writeMessage(conn, &message)
message.SequenceNumber = sequenceNumbers[conn]
messageData, err := json.Marshal(message)
if err != nil {
log.Println("PushTransaction json error:", err)
return
}
err = writeMessage(conn, websocket.TextMessage, messageData)
if err != nil { if err != nil {
log.Println("Cannot PushTransaction to client:", err) log.Println("Cannot PushTransaction to client:", err)
@@ -252,15 +247,7 @@ func PushAccount(account *types.Account, userIds []string, action string) {
for _, userId := range userIds { for _, userId := range userIds {
key := getKey(userId, account.OrgId) key := getKey(userId, account.OrgId)
for _, conn := range accountSubscriptions[key] { for _, conn := range accountSubscriptions[key] {
sequenceNumbers[conn]++ err := writeMessage(conn, &message)
message.SequenceNumber = sequenceNumbers[conn]
messageData, err := json.Marshal(message)
if err != nil {
log.Println("PushAccount error:", err)
return
}
err = writeMessage(conn, websocket.TextMessage, messageData)
if err != nil { if err != nil {
log.Println("Cannot PushAccount to client:", err) log.Println("Cannot PushAccount to client:", err)
@@ -276,16 +263,7 @@ func PushPrice(price *types.Price, userIds []string, action string) {
for _, userId := range userIds { for _, userId := range userIds {
key := getKey(userId, price.OrgId) key := getKey(userId, price.OrgId)
for _, conn := range priceSubscriptions[key] { for _, conn := range priceSubscriptions[key] {
sequenceNumbers[conn]++ err := writeMessage(conn, &message)
message.SequenceNumber = sequenceNumbers[conn]
messageData, err := json.Marshal(message)
if err != nil {
log.Println("PushPrice error:", err)
return
}
err = writeMessage(conn, websocket.TextMessage, messageData)
if err != nil { if err != nil {
log.Println("Cannot PushPrice to client:", err) log.Println("Cannot PushPrice to client:", err)
@@ -337,7 +315,24 @@ func checkVersion(clientVersion string) error {
return nil return nil
} }
func writeMessage(conn *websocket.Conn, messageType int, data []byte) error { func writeMessage(conn *websocket.Conn, message *Message) error {
locks[conn].Lock()
sequenceNumbers[conn]++
message.SequenceNumber = sequenceNumbers[conn]
locks[conn].Unlock()
messageData, err := json.Marshal(message)
if err != nil {
log.Println("json error:", err)
return err
}
return writeMessageRaw(conn, websocket.TextMessage, messageData)
}
func writeMessageRaw(conn *websocket.Conn, messageType int, data []byte) error {
locks[conn].Lock() locks[conn].Lock()
defer locks[conn].Unlock() defer locks[conn].Unlock()
return conn.WriteMessage(messageType, data) return conn.WriteMessage(messageType, data)