Files
wiscast/display.go

258 lines
5.7 KiB
Go

package main
// Packages
import "errors"
import "log"
import "net/http"
import "sync"
import "time"
import "github.com/gorilla/websocket"
// Display holds the server-side state of one connected display.
type Display struct {
	// WebSocket used to talk to the display; sockSendMtx is taken around
	// writes to it.
	sock        *websocket.Conn
	sockSendMtx sync.Mutex

	// userMtx protects user, the user currently connected to the display
	// (nil when nobody is connected).
	userMtx sync.Mutex
	user    *User

	// otpMtx protects otp, the one-time pass currently shown on the display.
	otpMtx sync.Mutex
	otp    string

	// Hands the display's WebRTC answer from the display goroutine over to
	// the user goroutine.
	answerCh chan interface{}
}
// chFlush discards every value currently pending on *ch without blocking.
//
// It returns as soon as a receive would block. It also returns when the
// channel has been closed: a closed channel is always ready to receive, so
// without that check the loop would never reach the default case and would
// spin forever.
func chFlush(ch *chan interface{}) {
	for {
		select {
		case _, open := <-(*ch):
			// A value was pending (and has been dropped); keep draining
			// unless the receive only succeeded because the channel is closed.
			if !open {
				return
			}
		default:
			// Nothing pending: the channel is flushed.
			return
		}
	}
}
// chReadTimeout receives one value from *ch, waiting at most timeoutMS
// milliseconds.
//
// On success it returns the received value and a nil error. If nothing is
// received in time it returns the empty string and a "timeout" error.
func chReadTimeout(ch *chan interface{}, timeoutMS int) (interface{}, error) {
	// Use an explicit timer rather than time.After so it can be stopped as
	// soon as data arrives instead of lingering until the timeout elapses.
	timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
	defer timer.Stop()

	select {
	// If data is available, return it with no error
	case data := <-(*ch):
		return data, nil
	// If no data has been received and the timeout is reached, return an error
	case <-timer.C:
		// The empty-string placeholder is kept for existing callers.
		return "", errors.New("timeout")
	}
}
// Registry of all connected displays, keyed by display ID.
// displaysLck guards displays.
var (
	displaysLck sync.Mutex
	displays    = make(map[string]*Display)
)
// reset gets the display back to its idle state by sending it a "reset"
// command.
//
// The method has no error return, so a failed write is reported through the
// log instead of being silently dropped.
func (this *Display) reset() {
	// Acquire the sending mutex; the deferred release keeps it from staying
	// locked if the write panics.
	this.sockSendMtx.Lock()
	defer this.sockSendMtx.Unlock()

	// Send a reset command
	err := this.sock.WriteMessage(websocket.TextMessage, encodeMessage(Message{
		mtype: "reset",
	}))
	if err != nil {
		log.Println("display: sending reset command failed:", err)
	}
}
// stream switches the display to streaming mode by sending it a "stream"
// command.
//
// The method has no error return, so a failed write is reported through the
// log instead of being silently dropped.
func (this *Display) stream() {
	// Acquire the sending mutex; the deferred release keeps it from staying
	// locked if the write panics.
	this.sockSendMtx.Lock()
	defer this.sockSendMtx.Unlock()

	// Send a stream command
	err := this.sock.WriteMessage(websocket.TextMessage, encodeMessage(Message{
		mtype: "stream",
	}))
	if err != nil {
		log.Println("display: sending stream command failed:", err)
	}
}
// sendWebRTCOffer sends a WebRTC offer to the display and waits for its
// answer.
//
// offer is forwarded as the "offer" argument of a "webrtc-offer" message.
// timeoutMS is how long to wait for the answer, in milliseconds; a value that
// is not positive falls back to CONF_TIMEOUT_MS.
//
// It returns the answer received on the display's answer channel. An error is
// returned if the offer could not be written to the socket or if no answer
// arrived in time.
func (this *Display) sendWebRTCOffer(offer interface{}, timeoutMS int) (interface{}, error) {
	// Honor the caller's timeout, keeping the configured default as fallback
	if timeoutMS <= 0 {
		timeoutMS = CONF_TIMEOUT_MS
	}

	// Flush the answer channel so a stale answer is not mistaken for the
	// reply to this offer
	chFlush(&this.answerCh)

	// Send the offer while holding the sending mutex. The mutex is released
	// before waiting so other writers are not blocked during the wait.
	this.sockSendMtx.Lock()
	err := this.sock.WriteMessage(websocket.TextMessage, encodeMessage(Message{
		mtype: "webrtc-offer",
		arguments: map[string]interface{}{
			"offer": offer,
		},
	}))
	this.sockSendMtx.Unlock()

	// No answer can come back for an offer that was never sent
	if err != nil {
		return nil, err
	}

	// TODO: Close the connection if the display failed to respond?
	// Receive the answer
	return chReadTimeout(&this.answerCh, timeoutMS)
}
// sendIceCandidate forwards an ICE candidate to the display as the
// "candidate" argument of an "ice-candidate" message.
func (this *Display) sendIceCandidate(candidate interface{}) {
	// Build the message up front so the mutex only covers the actual send
	msg := Message{
		mtype: "ice-candidate",
		arguments: map[string]interface{}{
			"candidate": candidate,
		},
	}

	// Send it while holding the sending mutex
	this.sockSendMtx.Lock()
	sendMessage(this.sock, msg)
	this.sockSendMtx.Unlock()
}
// displayHandler serves one display connection: it registers the display
// under dispID, sends it its configuration, processes its messages until
// receiving fails, and finally removes it from the display list.
//
// sock is the display's WebSocket, dispID the ID to register it under and otp
// the one-time pass it starts with. If a display with the same ID is already
// registered, an http.StatusConflict error is sent back and the handler
// returns without registering anything.
func displayHandler(sock *websocket.Conn, dispID string, otp string) {
	// Create the display object. The answer channel has room for one value so
	// the message loop never has to wait for a reader (see "webrtc-answer").
	disp := Display{sock: sock, otp: otp}
	disp.answerCh = make(chan interface{}, 1)

	// Acquire the sending mutex before the display becomes visible in the
	// list, and keep it until the config has been sent
	disp.sockSendMtx.Lock()

	// Acquire the display list
	displaysLck.Lock()
	// Check that a display with that ID doesn't already exist
	if displays[dispID] != nil {
		// Release the display list
		displaysLck.Unlock()
		// Send back an error
		sendErrorMessage(sock, http.StatusConflict)
		// Release the sending mutex
		disp.sockSendMtx.Unlock()
		// Give up
		return
	}
	// Add the display to the list
	displays[dispID] = &disp
	// Release the display list
	displaysLck.Unlock()

	// Send back the config for the display to use
	sendMessage(sock, Message{
		mtype: "config",
		arguments: map[string]interface{}{
			"config": map[string]interface{}{
				"otpLifespan": CONF_OTP_LIFESPAN_MS,
				"timeout":     CONF_TIMEOUT_MS,
				"iceServers":  CONF_ICE_SERVERS,
			},
		},
	})
	// Release the sending mutex
	disp.sockSendMtx.Unlock()

	// Log the new display
	log.Println("New display: ID='" + dispID + "', OTP='" + otp + "'")

	// Message loop
	for {
		// Receive a message
		msg, err := recvMessage(sock, 0)
		// Give up on the connection if there was an error
		if err != nil {
			break
		}

		// Handle the message depending on its type
		switch msg.mtype {
		case "otp":
			// Ignore the message if it doesn't contain a string OTP
			newOTP, valid := msg.arguments["otp"].(string)
			if !valid {
				break
			}
			// Update the OTP while holding its mutex
			disp.otpMtx.Lock()
			disp.otp = newOTP
			log.Println("New OTP for ID='" + dispID + "': OTP='" + newOTP + "'")
			disp.otpMtx.Unlock()

		case "webrtc-answer":
			// Ignore the message if it doesn't contain an answer
			answer := msg.arguments["answer"]
			if answer == nil {
				break
			}
			// Hand the answer over without blocking: an answer that nobody
			// is waiting for must not stall this loop, otherwise OTP updates
			// and disconnects would go unnoticed until the next offer.
			select {
			case disp.answerCh <- answer:
			default:
				log.Println("Dropped unexpected WebRTC answer for ID='" + dispID + "'")
			}

		case "ice-candidate":
			// Ignore the message if it doesn't contain an ICE candidate
			candidate := msg.arguments["candidate"]
			if candidate == nil {
				break
			}
			// Acquire the display's user pointer
			disp.userMtx.Lock()
			// Check that a user is connected to the display
			if disp.user == nil {
				// Release the display's user pointer
				disp.userMtx.Unlock()
				// Send back an error. Other goroutines write to this socket
				// through the Display methods, so take the sending mutex.
				disp.sockSendMtx.Lock()
				sendErrorMessage(sock, http.StatusForbidden)
				disp.sockSendMtx.Unlock()
				continue
			}
			// Pass the ICE candidate on to the user
			disp.user.iceCandidate(candidate)
			// Release the display's user pointer
			disp.userMtx.Unlock()

		default:
			// Unknown message types are ignored.
			// NOTE(review): the previous comment here said "Give up", but a
			// break at this point only leaves the switch - confirm whether
			// the connection was meant to be dropped instead.
		}
	}

	// Acquire the display list
	displaysLck.Lock()
	// Remove the display from the list
	delete(displays, dispID)
	// Release the display list
	displaysLck.Unlock()
}