본문 바로가기
카테고리 없음

MediaSoup 을 사용해서 SFU방식으로 VideoChat 구현하기(2/2) - 구현 과정

by Lizzie Oh 2023. 1. 23.

다수의 사람이 화상으로 대화를 하기 위해서는  미디어 스트림은 audio trackvideo track 이 둘 다 존재해야 하고, 각 사람이 producer 이면서 동시에 consumer 여야 한다. 하지만 결국 가장 기본 흐름은 지난 포스팅에서 다룬 아래 형식에서 시작하게 된다! 

 

 

이를 기반으로, 어떻게 video chat 코드를 짤 수 있고 어떤 흐름으로 다중 화상 통화 기능을 구현할 수 있는지 정리해보려고 한다. 코드에서 서 중요한 부분만 발췌하였기 때문에 실제로 돌아가는 코드를 보고 참고하시려는 분들은 gitHub 레포지토리를 참고하시길 바란다! (frontend Repository, socket Repository)

 

아래는 간단한 흐름을 내 식대로 다시 정리한 표이다! 

 

 

 

 

그렇다면 순서대로 흐름과 간략한 코드를 확인해보자! 


📌 서버: Worker를 생성

worker가 있어야 Router를 만들 수 있고, Router가 있어야 Transport를 만들 수 있다. 가장 먼저 worker를 생성한다. 이때 옵션으로 입력하는 rtcMinPort와 rtcMaxPort 는 통신이 이루어지는 포트의 범위를 결정한다. 따라서 이 범위가 통신의 주체가 되는 사람의 수를 커버할 수 있어야 하고 (이전 글 참고) ec2에 올릴 때는 이 범위의 포트에 대해 보안그룹이 열려있어야 한다. (이거 문제 찾느라 진짜 일주일 넘게 헤맸다.. 다시한번 문제 해결하는데 큰 도움 준 쫑구 너무 고마워 ㅠ ) 

const createWorker = async () => {
worker = await mediasoup.createWorker({
    rtcMinPort: 2000,
    rtcMaxPort: 2100,
})
return worker
}

worker = createWorker()

📌 클라이언트: 유저의 미디어 장비에 접근해서 오디오, 비디오 stream을 받고 서버에 Router rtpCapabilities 요청 

    : 서버에 Router rtpCapabilities를 요청한다는 것은 클라이언트 입장에서는 어떤 채팅 '방'에 들어가겠다는 의미!

getLocalStream();
 
const getLocalStream = () => {
   navigator.mediaDevices.getUserMedia({
       audio: true,
       video: { width: { min: 640, max: 1920 },  height: { min: 400, max: 1080 }, },
       })
    .then(streamSuccess)
    .catch((error) => {
       console.log(error.message);
      }); };
const streamSuccess = (stream) => {  
    const localVideo = document.getElementById('localVideo')
    localVideo.srcObject = stream
 
    audioParams = { track: stream.getAudioTracks()[0], ...audioParams };
    videoParams = { track: stream.getVideoTracks()[0], ...videoParams };
 
     socket.emit("joinRoom", roomName , (data) => {
     rtpCapabilities = data.rtpCapabilities;
     createDevice();
     });};
 

 


📌 서버: Router 생성 및 클라이언트에 rtpCapabilities 전달

  worker 객체의 createRouter 생성자를 사용하여 router를 생성하고, 생성한 router 객체의 rtpCapabilities 메서드를 사용하여 rptCapabilities를 얻는다. 이를 callback 함수에 담아 실행하여 클라이언트에 전달한다. 

socket.on('joinRoom', async (roomName, callback) => {
    socket.join(roomName);

    const router1 = await createRoom(roomName, socket.id
    peers[socket.id] = {  socket roomName transports: [], producers: [],  consumers: []}
 
     const rtpCapabilities = router1.rtpCapabilities
     socket.name = userName
     callback({ rtpCapabilities }) 
    })
 
const createRoom = async (roomName, socketId) => {
    let router1
    let peers = []
    if (rooms[roomName]) {
        router1 = rooms[roomName].router
        peers = rooms[roomName].peers || []
    } else {
        router1 = await worker.createRouter({ mediaCodecs, })
    }
    rooms[roomName] = {
        router: router1,
        peers: [...peers, socketId],
    }
    return router1
    }
    

📌 클라이언트: 서버에서 rtpCapabilities 받은 후 Device 생성 

mediasoupClient의 Device() 생성자로 새 device 객체를 생성하고, 이 객체의 load 메서드를 실행한다. 이때 load 메서드의 인자로 서버에서 받은 rtpCapabilities 를 전달한다. 

     socket.emit("joinRoom", roomName , (data) => {
     rtpCapabilities = data.rtpCapabilities;
     createDevice();
     });};
     const createDevice = async () => {
     try {
         device = new mediasoupClient.Device();
         await device.load({ routerRtpCapabilities: rtpCapabilities });
         createSendTransport();
     } catch (error) {
         console.log(error); } };

📌 클라이언트: 서버에 서버 측 Transport 생성 요청 

소켓 서버로 server-side Transport 요청을 보낸다 (콜백 함수 내용이 길고 다른 이벤트와 이어져 있어서 아래 코드에서는 생략)

     createSendTransport();   
     const createSendTransport = () => {
        socket.emit( "createWebRtcTransport" { consumer: false },  ({ params }) => { ...생략 ... } );
    };

📌 서버: 서버측 Transport(RP)를 생성하고, 생성한 transport의 정보들을 클라이언트로 전송

router의 createWebRtcTransport 메서드를 실행하여 transport를 생성한다. 이때 ip주소와 udp, tcp 설정을 옵션으로 전달한다. 이때 로컬에서 테스트 할 때라면 아래 코드와 같이 ip는 로컬 ip를, announcedIp는 null로 설정하지만 ec2에 올려서 테스트하는 경우라면 ip에는 private ip주소를, announcedIp는 public ip 주소를 넣는다. 

socket.on('createWebRtcTransport', async ({ consumer }, callback) => {
       const roomName = peers[socket.id].roomName
       const router = rooms[roomName].router // 해당 소켓이 속한 방의 router를 찾는다
       createWebRtcTransport(router).then( transport => {
               callback( { params: { id: transport.id,
                                                   iceParameters: transport.iceParameters,
                                                   iceCandidates: transport.iceCandidates,
                                                   dtlsParameters: transport.dtlsParameters})
       addTransport(transport, roomName, consumer) // add transport to Peer's properties },
       error => { console.log(error) })
      })
const createWebRtcTransport = async (router) => {
       return new Promise(async (resolve, reject) => {
             try {
                    const webRtcTransport_options = {
                           listenIps: [ { ip: 127.0.0.1,  announcedIp: null } ],
                           enableUdp: true,
                           enableTcp: true,
                           preferUdp: true,
                           }
                   let transport = await router.createWebRtcTransport(webRtcTransport_options)
                   transport.on('dtlsstatechange', dtlsState => {
                          if (dtlsState === 'closed') { transport.close() }
                   })
                  transport.on('close', () =>console.log('transport closed') })
                  resolve(transport)
              } catch (error) {  reject(error }
             })
      }
 

📌 클라이언트: 서버에서 받은 parameter 들을 인자로 클라이언트 측 Transport (LP) SEND Transport 생성

device.createSendTransport(options) 를 호출하여 media를 전송하기 위한 WebRTC transport (SEND transport)를 생성한다. 

    const createSendTransport = () => {
        socket.emit( "createWebRtcTransport" { consumer: false },  ({ params }) => {
        if (params.error) { 
            console.log(params.error); 
            return;
        }
        producerTransport = device.createSendTransport(params);
 
        producerTransport.on( "connect",   async ({ dtlsParameters }, callback, errback) => { ...생략...} );
        producerTransport.on( "produce" async (parameters, callback, errback) => {...생략...} );
 
       connectSendTransport(); // audio producer와 video producer가 생성된다 
} );
};
 

📌 클라이언트: Transport(LP)의 produce()를 호출해서 transport가 Router로 미디어(video/audio track)를 보내게 한다. 이때 produce 메서드 호출은 transport의 connect, produce 이벤트를 발생시킨다! 

produce는 audio track, video track 별로 호출하게 되므로 영상과 음성을 모두 보내려면 음성,영상 각각 produce()를 실행해야 한다. (device와는 음성과 영상을 구분하지 않지만 producer와 consumer의 경우 음성과 영상 각각에 대해 존재한다! transport 의 경우 produce를 위한 transport는 음성과 영상을 함께 보내더라도 1개만 있으면 되지만, consumer transport는 음성과 영상 각각에 대해 존재한다.) 

 

const connectSendTransport = async () => {

     audioProducer = await producerTransport.produce(audioParams);
     videoProducer = await producerTransport.produce(videoParams);
     audioProducer.on("trackended", () => { console.log("audio track ended");});
     audioProducer.on("transportclose", () => { console.log("audio transport ended"); });
     videoProducer.on("trackended", () => { console.log("video track ended"); });
     videoProducer.on("transportclose", () => { console.log("video transport ended"); });
};

📌 클라이언트: Transport(LP) 의 produce로 발생한 connect, produce이 서버의 connect, produce 메서드를 호출 
     (audio, video 각각에 대해 connect, produce가 발생한다!)

connect 이벤트는 transport가 ICE+DTLS connection 을 맺기 위해 서버측 transport와 정보를 주고받아야 할 때 emit 된다. 이때  ㅣlocal dtls parameter 정보인 dtlsParameters를 받게 되는데, 이를 서버로 보내서 server-side transport(RP)가 connect 메서드를 호출할 때 인자로 사용할 수 있게 한다. 

 

produce 이벤트는 transport가 새 producer에 대한 정보를 서버 측 trasport에 전송해야 할 때 emit 된다. 이때 서버 측 producer를 생성하는 데 사용하는 parameter 정보인 parameters값을 받게 되는데, 이를 서버로 보내서 server-side transport(RP)의 produce 메서드를 호출할 때 인자로 사용할 수 있게 한다.

        producerTransport.on( "connect",   async ({ dtlsParameters }, callback, errback) => {
                try {  // Signal local DTLS parameters to the server side transport
                        await socket.emit("transport-connect", { dtlsParameters });
                        callback();  //! transport에 parameters들이 전송되었다는 것을 알려주는 역할!  }
                catch (error) { errback(error);  }
        } );
 
   producerTransport.on( "produce" async (parameters, callback, errback) => {
                try {   // server에 Producer를 생성하라고 요청 (콜백으로 producer id가 올 것)
                        await socket.emit( "transport-produce" {
                                kind: parameters.kind,
                                rtpParameters: parameters.rtpParameters,
                                appData: parameters.appData,
                                mysocket: socket.id  },
                                ({ id, producersExist }) => { 
                                        callback({ id });  // transport에 parameters가 전달되었음을 알리고, server side producer's id를 전달 
                                        if (producersExist) getProducers();  } );
                  } catch (error) {  errback(error);  }
        } );

📌 서버: transport(RP) 의 connect({ dtlsParameters }) 실행 (audio, video 각각 실행) 

클라이언트에서 전달받은 local dtlsParameter를 인자로 넣어 connect를 호출함으로서 서버 측 transport(RP)에 클라이언트측 transport(LP)의 dtls parameter를 제공한다.

socket.on('transport-connect', ({ dtlsParameters }) => {
     getTransport(socket.id).connect({ dtlsParameters }) })

📌 서버: transport(RP)의 produce({ kind, rtpParameters }) 실행 → 서버측 producer 생성 (audio, video 각각 실행)
이후 콜백함수에 클라이언트에 생성한 producer id와 현재 producer가 존재하는지 정보를 담아 전송하게 되며, 동시에
기존에 존재하던 다른 consumer들에게 새로운 producer가 있다는 것을 알림 (informConsumer 함수) 

socket.on('transport-produce', async ({ kind, rtpParameters, appData, mysocket }, callback) => {         
     const
 producer = await getTransport(socket.id).produce({ kind, rtpParameters })
     const { roomName } = peers[socket.id]
     addProducer(producer, roomName)
     informConsumers(roomName, socket.id, producer.id)
     producer.on('transportclose', () => { producer.close() })
     callback({  id: producer.id,  producersExist: producers.length>1 ? true : false  }) 
})

📌 클라이언트: produce 이벤트의 콜백함수를 실행함으로서, transport(LP)에 서버측 producer id를 전달한다. 이 단계까지 오면 영상을 송출할 수 있게 되고, 기존에 이미 영상을 송출하고 있던 producer가 있다면 이를 consume 하기 위해 서버에 정보를 요청

   producerTransport.on( "produce" async (parameters, callback, errback) => {
                try { await socket.emit( "transport-produce" { ... 생략 ...} , ({ id, producersExist }) => { 
                              callback({ id });  // transport에 parameters가 전달되었음을 알리고, server side producer's id를 전달 
                              if (producersExist) getProducers();  } );
                } catch (error) {  errback(error);  } } );
   const getProducers = () => {
         socket.emit("getProducers", (producerList) => {
               producerList.forEach((id) => { signalNewConsumerTransport(id[0], id[1], id[2], id[3]); });
        });
    };

📌 서버: producer 정보를 요청한 소켓과 동일한 방(라우터)에 있는 다른 소켓의 producer의 정보들을 클라이언트에 전달하며 콜백 함수를 실행. (이때 producer 정보에는 음성, 영상 producer가 모두 담겨있다!)

      socket.on('getProducers', callback => {
              const { roomName } = peers[socket.id]
              const socketName = peers[socket.id].peerDetails.name
              let producerList = []
              producers.forEach(producerData => {
                       if (producerData.socketId !== socket.id && producerData.roomName === roomName) {
                             producerList = [...producerList, [
                                              producerData.producer.id,
                                              peers[producerData.socketId].peerDetails.name,
                                              producerData.socketId,
                                              peers[producerData.socketId].peerDetails.isAdmin]]
               })
              callback(producerList)
       })
 

📌 클라이언트: 서버에 서버측 consumer transport(RC) 생성 요청. 

const signalNewConsumerTransport = async ( remoteProducerId, socketName, newSocketId, isNewSocketHost ) => {
 
     if (consumingTransports.includes(remoteProducerId)) return;
     consumingTransports.push(remoteProducerId);
 
     await socket.emit"createWebRtcTransport" { consumer: true },  ({ params }) => { ... 생략...  };
 

📌 서버: 서버측 consumer transport(RC) 를 생성하고, 생성한 transport의 정보들을 클라이언트로 전송

socket.on('createWebRtcTransport', async ({ consumer }, callback) => {
       const roomName = peers[socket.id].roomName
       const router = rooms[roomName].router // 해당 소켓이 속한 방의 router를 찾는다             
       createWebRtcTransport(router).then( transport => {
               callback( { params: { id: transport.id,
                                                   iceParameters: transport.iceParameters,
                                                   iceCandidates: transport.iceCandidates,
                                                   dtlsParameters: transport.dtlsParameters })
       addTransport(transport, roomName, consumer) // add transport to Peer's properties },
       error => { console.log(error) })
      })
 
const createWebRtcTransport = async (router) => {
       return new Promise(async (resolve, reject) => {
             try {
                    const webRtcTransport_options = {
                           listenIps: [ { ip: 127.0.0.1,  announcedIp: null } ],
                           enableUdp: true,
                           enableTcp: true,
                           preferUdp: true,
                           }
                   let transport = await router.createWebRtcTransport(webRtcTransport_options)
                   transport.on('dtlsstatechange', dtlsState => {
                          if (dtlsState === 'closed') { transport.close() }
                   })
                  transport.on('close', () => console.log('transport closed') })
                  resolve(transport)
              } catch (error) {  reject(error }
             })
      }
 

📌 클라이언트: 서버에서 받은 RC 관련 정보를 인자로 넣어 클라이언트측 consumer transport(LC) 생성

device.createRecvTransport() 는 미디어를 받을 수 있는 새 WebRTC transport를 생성하는 메서드 

 

const signalNewConsumerTransport = async ( remoteProducerId, socketName, newSocketId, isNewSocketHost ) => {
 
     if (consumingTransports.includes(remoteProducerId)) return;
     consumingTransports.push(remoteProducerId);
 
     await socket.emit "createWebRtcTransport" { consumer: true },  ({ params }) => {
          if (params.error) { 
                    console.log(params.error);
                    return;
           }
          let consumerTransport;
          try {
               consumerTransport = device.createRecvTransport(params);
          } catch (error) {
                    console.log(error);
                    return;
          }
 
          // consumerTransport의 consume 메서드가 호출될 때 connect 이벤트가 emit 됨 
          consumerTransport.on( "connect" async ({ dtlsParameters }, callback, errback) => {
              try {
                    await socket.emit("transport-recv-connect", { dtlsParameters, serverConsumerTransportId: params.id });
                   callback();  // transport(LC)에 parameters 전송이 완료되었다는 것을 알리는 역할
               } catch (error) {  errback(error)  }
          }
       );
 
          connectRecvTransport(
                 consumerTransport remoteProducerId, params.id,  socketName, newSocketId isNewSocketHost );
       } );
};

📌 클라이언트:  디바이스의 rtpCapabilities 를 포함하여 consume에 필요한 정보들을 서버로 전송 

 
const connectRecvTransport = async consumerTransport, remoteProducerId, serverConsumerTransportId,
socketName, newSocketId, isNewSocketHost ) => {
 
     remoteProducerIdPair.remoteProducerId = newSocketId
 
     await socket.emit( "consume", { rtpCapabilities: device.rtpCapabilities, remoteProducerId,
          serverConsumerTransportId, }, async ({ params }) => {
               if (params.error) {
                    console.log("Cannot Consume");
                    return;
}
 
... 생략 ...
 

📌 서버 :  클라이언트에서 받은 정보로 router의 consume 가능 여부를 체크하고 (canConsume 메서드) 가능하다면 서버측 consumer transport(RC) 의 consume 함수를 실행. 단 공식 문서에서 권고하는 방식과 같이 paused:true 를 인자로 넣어 우선 pause 한 후 추후 resume 으로 시작할 수 있도록 한다.

When creating a consumer it's recommended to set paused to true, then transmit the consumer parameters to the consuming endpoint and, once the consuming endpoint has created its local side consumer, unpause the server side consumer using the resume() method.

Reasons for create the server side consumer in paused mode:
- If the remote endpoint is a WebRTC browser or application and it receives a RTP packet of the new consumer before the remote RTCPeerConnection is ready to process it (this is, before the remote consumer is created in the remote endpoint) it may happen that the RTCPeerConnection will wrongly associate the SSRC of the received packet to an already existing SDP m= section, so the imminent creation of the new consumer and its associated m= section will fail.
- Also, when creating a video consumer, this is an optimization to make it possible for the consuming endpoint to render the video as far as possible. If the server side consumer was created with paused: false, mediasoup will immediately request a key frame to the producer and that key frame may reach the consuming endpoint even before it's ready to consume it, generating “black” video until the device requests a keyframe by itself.

 

socket.on('consume', async ({ rtpCapabilities, remoteProducerId, serverConsumerTransportId }, callback) => {
try {
const { roomName } = peers[socket.id]
const userName = peers[socket.id].peerDetails.name
const router = rooms[roomName].router
 
let consumerTransport = transports.find(transportData => (
transportData.consumer && transportData.transport.id == serverConsumerTransportId
)).transport
 
if (router.canConsume({  producerId: remoteProducerId, rtpCapabilities })) {
      const consumer = await consumerTransport.consume({
            
producerId: remoteProducerId rtpCapabilities paused: true })
consumer.on('transportclose', () => { console.log('transport close from consumer') })
 
consumer.on('producerclose', () => {
      console.log('producer of consumer closed')
      socket.emit('producer-closed', { remoteProducerId })
      consumerTransport.close([])
      transports = transports.filter(transportData => transportData.transport.id !== consumerTransport.id)
      consumer.close()
      consumers = consumers.filter(consumerData => consumerData.consumer.id !== consumer.id)
})
addConsumer(consumer, roomName)
 
      const params = { 
id: consumer.id, producerId: remoteProducerId, kind: consumer.kind, rtpParameters:                   consumer.rtpParameters, serverConsumerId: consumer.id, userName:userName}
 
      callback({ params })
}
} catch (error) {
      console.log(error.message)
      callback({ params: {error: error} })
}
})

📌 클라이언트: transport(LC) 의 consume 메서드를 실행하여 consumer 생성. 이때 consume 메서드는 LC에 connect이벤트를 발생 시켜 서버에 transport-recv-connect 이벤트를 emit 하게 되고, 이때 dtlsParameter를 담아 서버로 보낸다. consumer를 생성한 이후에는 html audio, video 태그를 만들고 srcObject로 미디어 스트림 트랙을 담아 음성과 영상이 재생될 수 있도록 한다. 

const connectRecvTransport = async  consumerTransport, remoteProducerId, serverConsumerTransportId,
socketName, newSocketId, isNewSocketHost ) => {
 
     remoteProducerIdPair.remoteProducerId = newSocketId
 
     await socket.emit( "consume", { rtpCapabilities: device.rtpCapabilities, remoteProducerId,
          serverConsumerTransportId, }, async ({ params }) => {
               if (params.error) {
                    console.log("Cannot Consume");
                    return; }
               
 
     const consumer = await consumerTransport.consume({           
          id: params.id,
          
producerId: params.producerId,
          kind: params.kind,
          rtpParameters: params.rtpParameters });
          consumerTransports =  [  ...consumerTransports,
               { consumerTransport, serverConsumerTransportId: params.id,  producerId: remoteProducerId, consumer } ]
 
     const { track } = consumer
 
     if (params.kind === "audio") {
           const wrapper = document.createElement("div");
           wrapper.setAttribute("id", `td-${remoteProducerId}`);
           const audio = document.createElement("audio")
           audio.setAttribute("autoplay", "true")
           wrapper.appendChild(audio)
           audio.srcObject = new MediaStream([track])
           videoContainer.appendChild(wrapper)
     } else {
           const existingWrapper = document.getElementsByClassName(newSocketId)[0]
           const video = document.createElement("video")
           video.setAttribute("id", remoteProducerId)
          video.setAttribute("autoplay", "true")
           existingWrapper.appendChild(video)
          video.srcObject = new MediaStream([track])
           socket.emit('consumer-resume', { serverConsumerId: params.serverConsumerId })
})}
 

📌 서버: RC의 connect()메서드를 실행함으로서 produce가 보낸 스트림을 소비할 수 있게 된다! 

서버측 transport에 엔드포인트(=클라이언트 Device) 클라이언트측 transport parameter 들을 제공하는 역할을 한다

 

socket.on('transport-recv-connect', async ({ dtlsParameters, serverConsumerTransportId }) => {
const consumerTransport = transports.find(transportData => (
transportData.consumer && transportData.transport.id == serverConsumerTransportId
)).transport
 
try {
await consumerTransport.connect({ dtlsParameters })
} catch(e) {console.log("transport-recv-connect", e)}
})

 


 

코드 쓰기만큼이나 힘들었던 블로그 정리... ! full 코드는 다시한번.. github에서 봐 주시길 부탁드립니다!!! 

반응형

댓글