diff --git a/display.go b/display.go index 7152be0..a6d19bf 100644 --- a/display.go +++ b/display.go @@ -1,49 +1,122 @@ package main // Packages -import "github.com/gorilla/websocket" +import "errors" +import "log" +import "net/http" import "sync" +import "time" +import "github.com/gorilla/websocket" // Display instance type Display struct { // WebSocket used to communicate with the display sock *websocket.Conn + sockSendMtx sync.Mutex // User currently connected to the display user *User // One-time-pass currently shown on the display + otpMtx sync.Mutex otp string + + // Channel to pass the answer from the display coroutine to the user couroutine + answerCh chan string +} + +// Helper function to flush channels +func chFlush(ch *chan string) { + for { + empty := false + select { + // If data is available, read it and try again + case <-*ch: + continue + + // If no data is available, stop reading + default: + empty = true + } + if empty { break } + } +} + +// Helper function to read from a channel with a timeout +func chReadTimeout(ch *chan string, timeoutMS int) (string, error) { + select { + // If data is available, return it with no error + case data := <-*ch: + return data, nil + + // If no data has been received and the timeout is reached, return an error + case <-time.After(timeoutMS * time.Millisecond): + return "", errors.New("timeout") + } } // List of all connected displays var displaysLck sync.Mutex -var displays map[string]*Display +var displays = map[string]*Display{} // Get the display back to its idle state func (this *Display) reset() { + // Acquire the sending mutex + this.sockSendMtx.Lock() + // Send a reset command this.sock.WriteMessage(websocket.TextMessage, encodeMessage(Message{ mtype: "reset", })) + + // Release the sending mutex + this.sockSendMtx.Unlock() } // Switch the display to streaming mode func (this *Display) stream() { + // Acquire the sending mutex + this.sockSendMtx.Lock() + // Send a show-pin command this.sock.WriteMessage(websocket.TextMessage, encodeMessage(Message{ mtype: "stream", })) + + // Release the sending mutex + this.sockSendMtx.Unlock() } // Send a WebRTC offer to the display and get an answer -func (this *Display) webRTCOffer(offer string, timeoutMS int) string { - // TODO - return "" +func (this *Display) webRTCOffer(offer string, timeoutMS int) (string, error) { + // Flush the answer channel + chFlush(&this.answerCh) + + // Acquire the sending mutex + this.sockSendMtx.Lock() + + // Send the offer + this.sock.WriteMessage(websocket.TextMessage, encodeMessage(Message{ + mtype: "webrtc-offer", + arguments: map[string]interface{}{ + "offer": offer, + }, + })) + + // Release the sending mutex + this.sockSendMtx.Unlock() + + // TODO: Close the connection if the display failed to respond? + + // Receive the answer + return chReadTimeout(&this.answerCh, CONF_TIMEOUT_MS) } // Send an ICE candiate to the display func (this *Display) iceCandidate(candidate string) { + // Acquire the sending mutex + this.sockSendMtx.Lock() + // Send the candidate sendMessage(this.sock, Message{ mtype: "ice-candidate", @@ -51,9 +124,95 @@ func (this *Display) iceCandidate(candidate string) { "candidate": candidate, }, }) + + // Release the sending mutex + this.sockSendMtx.Unlock() } // Connection handler for displays -func displayHandler(sock *websocket.Conn, dispID string) { +func displayHandler(sock *websocket.Conn, dispID string, otp string) { + // Create the display object + disp := Display{ sock: sock, otp: otp } + + // Acquire the sending mutex + disp.sockSendMtx.Lock() + + // Acquire the display list + displaysLck.Lock() + // Check that a display with that ID doesn't already exist + if displays[dispID] != nil { + // Release the display list + displaysLck.Unlock() + + // Send back an error + sendErrorMessage(sock, http.StatusConflict) + + // Release the sending mutex + disp.sockSendMtx.Unlock() + } + + // Add the display to the list + displays[dispID] = &disp + + // Release the display list + displaysLck.Unlock() + + // Send back the config for the display to use + sendMessage(sock, Message{ + mtype: "config", + arguments: map[string]interface{}{ + "timeout": CONF_TIMEOUT_MS, + "iceServers": CONF_ICE_SERVERS, + }, + }) + + // Release the sending mutex + disp.sockSendMtx.Unlock() + + // Log the new display + log.Println("New display: ID='" + dispID + "', OTP='" + otp + "'") + + // Message loop + for { + // Receive a message + msg, err := recvMessage(sock, 0) + + // Give up on the connection if there was an error + if (err != nil) { break } + + // Handle the message depending on its type + switch msg.mtype { + case "otp": + // Check that the message contains an OTP + otp, valid := msg.arguments["otp"].(string) + if (!valid) { break } + + // Acquire the display's OTP + disp.otpMtx.Lock() + + // Update the OTP + disp.otp = otp + + // Release the display's OTP + disp.otpMtx.Unlock() + + case "answer": + // Check that the message contains an answer + answer, valid := msg.arguments["answer"].(string) + if (!valid) { break } + + // Send the answer through the display's answer channel + disp.answerCh <- answer + + case "ice-candidate": + // TODO + + default: + // Give up + break + } + } + + // TODO: Gracefull disconnect the connected user if there is one } \ No newline at end of file diff --git a/main.go b/main.go index 50b3ffd..27a411a 100644 --- a/main.go +++ b/main.go @@ -14,9 +14,9 @@ func main() { http.Handle("/", static) // Create a handler for the signaling backend - // http.HandleFunc("/sig", wsHandler) + http.HandleFunc("/sig", wsHandler) // Run the server err := http.ListenAndServe(":3000", nil) - if( err != nil) { log.Fatal(err) } + if (err != nil) { log.Fatal(err) } } \ No newline at end of file diff --git a/message.go b/message.go index 8dcea16..38d3c33 100644 --- a/message.go +++ b/message.go @@ -1,48 +1,110 @@ package main +import "encoding/json" +import "errors" +import "time" import "github.com/gorilla/websocket" -//import "encoding/json" // Backend message object type Message struct { // Type of message - mtype string + mtype string; // Arguments of the message - arguments map[string]interface{} + arguments map[string]interface{}; } // Get the display back to its idle state func encodeMessage(msg Message) []byte { - // TODO - return nil + // Create the message map and set the message type + msgJson := map[string]interface{}{}; + msgJson["type"] = msg.mtype; + + // Add all arguments + for k, v := range msg.arguments { + // Skip the type key + if (k == "type") { continue }; + + // Add the key/value to the argument map + msgJson[k] = v; + } + + // Serialize the message + data, _ := json.Marshal(msgJson); + + // Return the data + return data; } // Get the display back to its idle state -func decodeMessage(data []byte) Message { - // TODO - return Message{} +func decodeMessage(data []byte) (Message, error) { + // Attempt to parse the message + var msgJson map[string]interface{}; + err := json.Unmarshal(data, &msgJson); + if (err != nil) { return Message{}, err; } + + // If no message type is given, return an error + if msgJson["type"] == nil { return Message{}, errors.New("Invalid message"); } + + // If the message type is not a string, return an error + mtype, valid := msgJson["type"].(string); + if !valid { return Message{}, errors.New("Invalid message"); } + + // Create the message object + msg := Message{ mtype: mtype, arguments: map[string]interface{}{} }; + + // Extract the arguments + for k, v := range msgJson { + // Skip the type key + if (k == "type") { continue; } + + // Add the key/value to the argument map + msg.arguments[k] = v; + } + + // Return the decoded message with no error + return msg, nil; } // Encode a message and send it over a WebSocket func sendMessage(sock *websocket.Conn, msg Message) { // Encode and send the message - sock.WriteMessage(websocket.TextMessage, encodeMessage(msg)) + sock.WriteMessage(websocket.TextMessage, encodeMessage(msg)); } // Receive a message from a WebSocket and decode it -func recvMessage(sock *websocket.Conn, timeoutMS int) Message { - // TODO - return Message{} +func recvMessage(sock *websocket.Conn, timeoutMS int) (Message, error) { + for { + // Configure the timeout + if timeoutMS > 0 { + // Milliseconds given + sock.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(timeoutMS))) + } else { + // No timeout given + sock.SetReadDeadline(time.Time{}); + } + + // Receive a WebSocket message + mt, msgData, err := sock.ReadMessage(); + + // If there was an error, give up and return it + if (err != nil) { return Message{}, err; } + + // If the message is not a text message, continue waiting + if (mt != websocket.TextMessage) { continue; } + + // Return the decoded message + return decodeMessage(msgData); + } } // Encode an error message and send it over a WebSocket -func sendErrorMessage(sock *websocket.Conn, err string) { +func sendErrorMessage(sock *websocket.Conn, code int) { // Send the error message sock.WriteMessage(websocket.TextMessage, encodeMessage(Message{ mtype: "error", arguments: map[string]interface{}{ - "error": err, + "code": code, }, - })) + })); } \ No newline at end of file diff --git a/user.go b/user.go index f446edf..2186097 100644 --- a/user.go +++ b/user.go @@ -1,29 +1,27 @@ package main // Packages -//import "log" -import "github.com/gorilla/websocket" -//import "encoding/json" +import "log" import "sync" +import "net/http" +import "github.com/gorilla/websocket" // General client instance type User struct { // WebSocket used to communicate with the user - sock *websocket.Conn + sock *websocket.Conn; // Display mutex - displayMtx sync.Mutex + displayMtx sync.Mutex; // Display that the user is connecting to - display *Display + display *Display; } -// TODO: Check type - // Connection handler for users func userHandler(sock *websocket.Conn) { // Initialize the user instance - user := User{ sock: sock, display: nil } + user := User{ sock: sock, display: nil }; // Send back the config for the user to use sendMessage(sock, Message{ @@ -32,110 +30,116 @@ func userHandler(sock *websocket.Conn) { "timeout": CONF_TIMEOUT_MS, "iceServers": CONF_ICE_SERVERS, }, - }) + }); // Message loop for { // Receive a message - msg := recvMessage(sock, 0) + msg, err := recvMessage(sock, 0); - // TODO: exit on error + // Give up on the connection if there was an error + if (err != nil) { break; } // Handle the message depending on its type switch msg.mtype { case "connect": // Check that a display ID was provided - if msg.arguments["dispID"] == nil { - sendErrorMessage(sock, "Missing display ID") - continue; - } + dispID, valid := msg.arguments["dispID"].(string) + if (!valid) { break; } // Check that an OTP was provided - if msg.arguments["otp"] == nil { - sendErrorMessage(sock, "Missing OTP") - continue; - } + otp, valid := msg.arguments["otp"].(string) + if (!valid) { break; } // Acquire the display ID list - displaysLck.Lock() + displaysLck.Lock(); // Check that the display ID exists - dispID := msg.arguments["dispID"].(string) - if displays[dispID] == nil { + if (displays[dispID] == nil) { // Release the display list - displaysLck.Unlock() + displaysLck.Unlock(); // Send back an error - sendErrorMessage(sock, "Unknown display") + sendErrorMessage(sock, http.StatusNotFound); continue; } + // Acquire the displays OTP + displays[dispID].otpMtx.Lock(); + // Check the OTP - otp := msg.arguments["otp"].(string) - if otp == "" || otp != displays[dispID].otp { + if (otp == "" || otp != displays[dispID].otp) { + // Release the display's OTP + displays[dispID].otpMtx.Unlock(); + // Release the display list - displaysLck.Unlock() + displaysLck.Unlock(); // Send back an error - sendErrorMessage(sock, "Invalid OTP") + sendErrorMessage(sock, http.StatusUnauthorized); continue; } - // TODO: Check types + // Release the display's OTP + displays[dispID].otpMtx.Unlock(); // Acquire the user's display pointer - user.displayMtx.Lock() + user.displayMtx.Lock(); // Register the user and display to each other - user.display = displays[dispID] - user.display.user = &user + user.display = displays[dispID]; + user.display.user = &user; // Put the display into streaming mode - user.display.stream() + user.display.stream(); // TODO: Check for error // Release the user's display pointer - user.displayMtx.Lock() + user.displayMtx.Lock(); // Release the display list - displaysLck.Unlock() + displaysLck.Unlock(); + + // Log the connection + log.Println("User successfully connected to display: ID='", dispID, "'"); // Notify the user of the successful connection sendMessage(sock, Message{ mtype: "success", - }) + }); case "webrtc-offer": // Check that the message contains an offer - if msg.arguments["offer"] == nil { - // Send back an error - sendErrorMessage(sock, "No offer given") - continue; - } - - // TODO: Check type + offer, valid := msg.arguments["offer"].(string) + if (!valid) { break; } // Acquire the user's display pointer - user.displayMtx.Lock() + user.displayMtx.Lock(); // Check that the user is connected to a display - if user.display == nil { + if (user.display == nil) { // Release the user's display pointer - user.displayMtx.Unlock() + user.displayMtx.Unlock(); // Send back an error - sendErrorMessage(sock, "Not connected") + sendErrorMessage(sock, http.StatusForbidden); continue; } // Send the offer to the display and get the response - answer := user.display.webRTCOffer(msg.arguments["offer"].(string), CONF_TIMEOUT_MS) + answer, err := user.display.webRTCOffer(offer, CONF_TIMEOUT_MS); + if (err != nil) { + // Release the user's display pointer + user.displayMtx.Unlock(); - // TODO: Check for error + // Send back an error + sendErrorMessage(sock, http.StatusBadGateway); + continue; + } // Release the user's display pointer - user.displayMtx.Unlock() + user.displayMtx.Unlock(); // Send back the response sendMessage(sock, Message{ @@ -143,42 +147,39 @@ func userHandler(sock *websocket.Conn) { arguments: map[string]interface{}{ "answer": answer, }, - }) + }); case "ice-candidate": // Check that the message contains an ice candidate - if msg.arguments["candidate"] == nil { - // Send back an error - sendErrorMessage(sock, "No offer given") - continue; - } - - // TODO: Check type + candidate, valid := msg.arguments["candidate"].(string) + if (!valid) { break; } // Acquire the user's display pointer - user.displayMtx.Lock() + user.displayMtx.Lock(); // Check that the user is connected to a display - if user.display == nil { + if (user.display == nil) { // Release the user's display pointer - user.displayMtx.Unlock() + user.displayMtx.Unlock(); // Send back an error - sendErrorMessage(sock, "Not connected") + sendErrorMessage(sock, http.StatusForbidden); continue; } // Send the ice candidtate to the display - user.display.iceCandidate(msg.arguments["candidate"].(string)) + user.display.iceCandidate(candidate); + + // TODO: Check error // Release the user's display pointer - user.displayMtx.Unlock() + user.displayMtx.Unlock(); default: - // Send back an error - sendErrorMessage(sock, "Invalid message type") + // Give up + break; } } - // If the user was connected to a display, disconnect it + // TODO: Gracefull disconnect the connected display if there is one } \ No newline at end of file diff --git a/wshandler.go b/wshandler.go index d29c7e4..1c14d12 100644 --- a/wshandler.go +++ b/wshandler.go @@ -21,7 +21,10 @@ func wsHandler(respWriter http.ResponseWriter, req *http.Request) { defer sock.Close() // Receive the init message - msg := recvMessage(sock, 5000) + msg, err := recvMessage(sock, 5000) + + // If there was an error or timeout, give up on the connection + if err != nil { return } // If it's not an init message, give up if msg.mtype != "init" { return } @@ -34,9 +37,14 @@ func wsHandler(respWriter http.ResponseWriter, req *http.Request) { case "display": // Check that the display has provided its ID - if msg.arguments["dispID"] == nil { return } + dispID, valid := msg.arguments["dispID"].(string) + if !valid { return } + + // Check that the display has provided its OTP + otp, valid := msg.arguments["otp"].(string) + if !valid { return } // Handle as a display - displayHandler(sock, msg.arguments["dispID"].(string)) + displayHandler(sock, dispID, otp) } } \ No newline at end of file diff --git a/www/display/index.html b/www/display/index.html index f5533e9..1876b6d 100644 --- a/www/display/index.html +++ b/www/display/index.html @@ -5,7 +5,7 @@ -