2025年7月6日日曜日

janusのstream動画をapiとRTCPeerConnectionで受信


要点

これらの順に処理するとjanusとのDTLS handshakeが成功します。
どれがかが欠けるとhandshakeが成功しないのですが、成功しない理由を教えてくれないので、この一連の処理が必要と分かるまでに時間を要しました。
  1. janusのapiで受け取ったjsepをsetRemoteDescriptionで登録
  2. createAnswerを実施
  3. createAnswerの結果をsetLocalDescriptionで登録
  4. janusにcreateAnswerの結果をmessageとして送信
  await peerConnection.setRemoteDescription(data.jsep)
const answer = await peerConnection.createAnswer()
console.log('createAnswer', answer)
await peerConnection.setLocalDescription(answer);
callbacks.onCreateAnswer(answer)
      onCreateAnswer: (jsep) => {
const dataToSend = {
janus: "message",
transaction: data.transaction,
session_id: data.session_id,
jsep,
handle_id: data.handle_id,
body: {
request: "start",
},
}
socket.send(JSON.stringify(dataToSend))
}

背景

janusとはWebRTCの動画や音声を中継してくれるサーバープログラムです。
nodejs + webpackで開発する場合はnodeのjanus-gatewayライブラリが使えるのですが、これはjanusのデモページ用のjanus.jsファイルをライブラリ化したものなので、設定が手間(adapterとしてwebrtc-adapterを読み込み可能にする必要がある)な上に環境によっては使えなかったりします。
(vite環境: adapterとしてwebrtc-adapterを呼ぶ設定方法が不明。react-native環境: window要素が無い)

上記のjanus.jsに頼らなくてもjanusはapi呼び出しに対応しており、それを使えば必要な情報を取得できます。
Streaming plugin documentation

しかしながら、stream配信に必要な情報取得後のWebRTC関係の処理方法の説明が見当たらず、通信確立方法の把握に時間を要しました。
apiとjsのRTCPeerConnectionを組み合わせてjanusのstreamingを受信する方法を備忘録として記事に残します。

使ったもの

janus 1.3.1
以前の記事で動かしたdocker環境を利用しました。
使い方は以前の記事を見てください。
janusのstreaming配信をdockerを利用して動かす 

react + vite 環境
janus-gatewayライブラリを使えない(webrtc-adapterをadapterとして呼び出し可能にする設定方法が分からない)vite環境を使いました。

ffmpeg 4.4.2
webカメラのstream配信に利用します。
この記事ではubuntu22.04で実行しました。

janusの設定: streamとwebsocket有効化

streaming plugin(5004ポートでVP8のstream受信)とwebsocket(8188ポートで通信)を有効化しました。
どちらもjcfg.sampleのsampleを外してそのまま使えば良いです。
janus.plugin.streaming.jcfg.sample.in
janus.transport.websockets.jcfg.sample

下記のようなコマンドでwebカメラの画像をVP8方式のrtpでjanusに送ります。
ffmpeg -f v4l2 \
-pix_fmt uyvy422 \
-video_size 640x480 \
-framerate 15 \
-i /dev/video0 \
-an -c:v libvpx -deadline realtime -f rtp \
rtp://localhost:5004

tsxのコード全体

200行ほどありますが、断片的な紹介だと混乱することがあると思うので、全てを共有してから要所を解説します。
下記のreactのコードをvite環境で表示すると、janusのstreamを表示できます。
import { useEffect, useRef } from 'react'
import './App.css'

const urlWebsocket = 'ws://localhost:8188';

function generateRandomString(length) {
const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
let result = '';
const charactersLength = characters.length;
for (let i = 0; i < length; i++) {
result += characters.charAt(Math.floor(Math.random() * charactersLength));
}
return result;
}

const requestWithWebSocketApi = (socket, onReceiveJsep) => {
const transactionId = generateRandomString(15)
let sessionId = 0
let handleId = 0

// https://janus.conf.meetecho.com/docs/streaming
socket.addEventListener("open", (event) => {
const data = JSON.stringify({ janus: "create", transaction: transactionId })
console.log(data)
socket.send(data);
});

// Listen for messages
socket.addEventListener("message", (event) => {
console.log("Message from server ")
console.log(event.data);
const dataReceived = JSON.parse(event.data)
// console.log('is preparing', dataReceived !== undefined &&
// dataReceived["plugindata"] !== undefined &&
// dataReceived["plugindata"]["data"] !== undefined &&
// dataReceived["plugindata"]["data"]["result"] !== undefined &&
// dataReceived["plugindata"]["data"]["result"]["status"] !== undefined)
if (dataReceived['janus'] == "success") {
if (dataReceived["session_id"] === undefined) {
console.log('attach streaming plugin')
sessionId = dataReceived["data"]["id"]
const dataToSend = JSON.stringify({
janus: "attach",
plugin: "janus.plugin.streaming",
transaction: transactionId,
session_id: sessionId,
})
console.log(dataToSend)
socket.send(dataToSend)
} else if (dataReceived["session_id"] !== undefined && dataReceived["plugindata"] === undefined) {
handleId = dataReceived["data"]["id"]
console.log('got handle_id', handleId)
socket.send(JSON.stringify({
janus: "message",
plugin: "janus.plugin.streaming",
transaction: transactionId,
session_id: sessionId,
handle_id: handleId,
body: {
request: "list",
},
}))
} else if (dataReceived["plugindata"]["data"]["streaming"] == "list") {
socket.send(JSON.stringify({
janus: "message",
plugin: "janus.plugin.streaming",
transaction: transactionId,
session_id: sessionId,
handle_id: handleId,
body: {
request: "info",
id: 1,
},
}))
} else if (dataReceived["plugindata"]["data"]["streaming"] == "info") {
console.log('request watch')
socket.send(JSON.stringify({
janus: "message",
plugin: "janus.plugin.streaming",
transaction: transactionId,
session_id: sessionId,
handle_id: handleId,
body: {
request: "watch",
id: 1,
},
}))
}
}
if (dataReceived["jsep"] !== undefined) {
if (
dataReceived["plugindata"]["data"]["streaming"] == "event" &&
dataReceived["plugindata"]["data"]["result"]["status"] == "preparing"
) {
onReceiveJsep({
transaction: transactionId,
session_id: sessionId,
handle_id: handleId,
jsep: dataReceived['jsep'],
})
}
}
});
}

const sendTrickleCandidate = (data, socket, candidate) => {
console.log('sendTrickleCandidate')
let request = {
"janus": "trickle",
"candidate": candidate,
transaction: data.transaction,
session_id: data.session_id,
handle_id: data.handle_id,
};
console.log(request);
socket.send(JSON.stringify(request));
}

const preparePeerConnection = async (data, socket, peerConnection, callbacks) => {
peerConnection.onconnectionstatechange = function () {
console.log('onconnectionstatechange')
};
peerConnection.oniceconnectionstatechange = function () {
console.log("oniceconnectionstatechange")
};
peerConnection.onicecandidate = function (event) {
console.log('onicecandidate', event)
if (!event.candidate || (event.candidate.candidate && event.candidate.candidate.indexOf('endOfCandidates') > 0)) {
console.log('End of candidates.');
sendTrickleCandidate(data, socket, { completed: true });
} else {
// JSON.stringify doesn't work on some WebRTC objects anymore
// See https://code.google.com/p/chromium/issues/detail?id=467366
let candidate = {
candidate: event.candidate.candidate,
sdpMid: event.candidate.sdpMid,
sdpMLineIndex: event.candidate.sdpMLineIndex
};
sendTrickleCandidate(data, socket, candidate);
}
}
peerConnection.ontrack = (event) => {
console.log('ontrack', event);
callbacks.onTrack(event)
}

await peerConnection.setRemoteDescription(data.jsep)
const answer = await peerConnection.createAnswer()
console.log('createAnswer', answer)
await peerConnection.setLocalDescription(answer);
callbacks.onCreateAnswer(answer)
}

function App() {
const refVideo = useRef(null);
// https://janus.conf.meetecho.com/docs/rest.html
const socket = new WebSocket(urlWebsocket, ["janus-protocol"])
const peerConnection = new RTCPeerConnection({
iceServers: [{ urls: "stun:stun.l.google.com:19302" }],
sdpSemantics: "unified-plan",
})

useEffect(() => {
requestWithWebSocketApi(socket, (data) => preparePeerConnection(data, socket, peerConnection, {
onTrack: (event) => {
console.log('onTrack', event)
if (event.track.kind == 'video') {
let stream = new MediaStream([event.track]);
console.log('stream', stream)
const videoTag = refVideo.current
videoTag.volume = 0
videoTag.srcObject = stream
videoTag.play().then(() => {
console.log('start playing')
}).catch(() => {
console.log('failed to play')
})
}
},
onCreateAnswer: (jsep) => {
const dataToSend = {
janus: "message",
transaction: data.transaction,
session_id: data.session_id,
jsep,
handle_id: data.handle_id,
body: {
request: "start",
},
}
socket.send(JSON.stringify(dataToSend))
}
}))
}, [])
return (
<>
<h1>Stream viewer</h1>
<div>
<video id="streamVideo" ref={refVideo} width="100%" height="100%" playsInline />
</div>
</>
)
}

export default App

上記のコードのページをブラウザで表示すると、janusのログにDTLSの通信経路確立とWebRTC mediaが有効になった旨の下記のようなログが表示されます。
[2672856040436376] The DTLS handshake has been completed
[janus.plugin.streaming-0x7f6e500099c0] WebRTC media is now available

ffmpegでカメラの画像をjanusに送れば、ブラウザで動画を確認できます。


janusのapiでstreamingのwebrtc接続に必要な情報を取得

janusの全般的なapiの使い方streaming apiの使い方janus.jsの実装を参考にしつつ、streamingに必要な情報をapiで取得します。

websocketを開きます。
urlに加えてsubprotocolを「janus-protocol」に指定します。
const urlWebsocket = 'ws://localhost:8188';
const socket = new WebSocket(urlWebsocket, ["janus-protocol"])

websocketでstreaming受信に必要な情報(jsep)を取得します。
websocketでの情報受信はevent形式なので、受け取った情報から状況を判断して次の要求を送っています。
const requestWithWebSocketApi = (socket, onReceiveJsep) => {
const transactionId = generateRandomString(15)
let sessionId = 0
let handleId = 0

// https://janus.conf.meetecho.com/docs/streaming
socket.addEventListener("open", (event) => {
const data = JSON.stringify({ janus: "create", transaction: transactionId })
console.log(data)
socket.send(data);
});

// Listen for messages
socket.addEventListener("message", (event) => {
console.log("Message from server ")
console.log(event.data);
const dataReceived = JSON.parse(event.data)
// console.log('is preparing', dataReceived !== undefined &&
// dataReceived["plugindata"] !== undefined &&
// dataReceived["plugindata"]["data"] !== undefined &&
// dataReceived["plugindata"]["data"]["result"] !== undefined &&
// dataReceived["plugindata"]["data"]["result"]["status"] !== undefined)
if (dataReceived['janus'] == "success") {
if (dataReceived["session_id"] === undefined) {
console.log('attach streaming plugin')
sessionId = dataReceived["data"]["id"]
const dataToSend = JSON.stringify({
janus: "attach",
plugin: "janus.plugin.streaming",
transaction: transactionId,
session_id: sessionId,
})
console.log(dataToSend)
socket.send(dataToSend)
} else if (dataReceived["session_id"] !== undefined && dataReceived["plugindata"] === undefined) {
handleId = dataReceived["data"]["id"]
console.log('got handle_id', handleId)
socket.send(JSON.stringify({
janus: "message",
plugin: "janus.plugin.streaming",
transaction: transactionId,
session_id: sessionId,
handle_id: handleId,
body: {
request: "list",
},
}))
} else if (dataReceived["plugindata"]["data"]["streaming"] == "list") {
socket.send(JSON.stringify({
janus: "message",
plugin: "janus.plugin.streaming",
transaction: transactionId,
session_id: sessionId,
handle_id: handleId,
body: {
request: "info",
id: 1,
},
}))
} else if (dataReceived["plugindata"]["data"]["streaming"] == "info") {
console.log('request watch')
socket.send(JSON.stringify({
janus: "message",
plugin: "janus.plugin.streaming",
transaction: transactionId,
session_id: sessionId,
handle_id: handleId,
body: {
request: "watch",
id: 1,
},
}))
}
}
if (dataReceived["jsep"] !== undefined) {
if (
dataReceived["plugindata"]["data"]["streaming"] == "event" &&
dataReceived["plugindata"]["data"]["result"]["status"] == "preparing"
) {
onReceiveJsep({
transaction: transactionId,
session_id: sessionId,
handle_id: handleId,
jsep: dataReceived['jsep'],
})
}
}
});
}

受信したstreamのjsepでRTCPeerConnectionを実行

ここがこの記事で最も伝えたいことです。
janus.jsの実装jsのRTCPeerConnection関係の解説記事を参考にしつつ、apiで受け取ったjespを利用してRTCPeerConnectionを実行します。

関連箇所を共有後に要所を解説します。
const sendTrickleCandidate = (data, socket, candidate) => {
console.log('sendTrickleCandidate')
let request = {
"janus": "trickle",
"candidate": candidate,
transaction: data.transaction,
session_id: data.session_id,
handle_id: data.handle_id,
};
console.log(request);
socket.send(JSON.stringify(request));
}

const preparePeerConnection = async (data, socket, peerConnection, callbacks) => {
peerConnection.onconnectionstatechange = function () {
console.log('onconnectionstatechange')
};
peerConnection.oniceconnectionstatechange = function () {
console.log("oniceconnectionstatechange")
};
peerConnection.onicecandidate = function (event) {
console.log('onicecandidate', event)
if (!event.candidate || (event.candidate.candidate && event.candidate.candidate.indexOf('endOfCandidates') > 0)) {
console.log('End of candidates.');
sendTrickleCandidate(data, socket, { completed: true });
} else {
// JSON.stringify doesn't work on some WebRTC objects anymore
// See https://code.google.com/p/chromium/issues/detail?id=467366
let candidate = {
candidate: event.candidate.candidate,
sdpMid: event.candidate.sdpMid,
sdpMLineIndex: event.candidate.sdpMLineIndex
};
sendTrickleCandidate(data, socket, candidate);
}
}
peerConnection.ontrack = (event) => {
console.log('ontrack', event);
callbacks.onTrack(event)
}

await peerConnection.setRemoteDescription(data.jsep)
const answer = await peerConnection.createAnswer()
console.log('createAnswer', answer)
await peerConnection.setLocalDescription(answer);
callbacks.onCreateAnswer(answer)
}
  const peerConnection = new RTCPeerConnection({
iceServers: [{ urls: "stun:stun.l.google.com:19302" }],
sdpSemantics: "unified-plan",
})

useEffect(() => {
requestWithWebSocketApi(socket, (data) => preparePeerConnection(data, socket, peerConnection, {
onTrack: (event) => {
console.log('onTrack', event)
if (event.track.kind == 'video') {
let stream = new MediaStream([event.track]);
console.log('stream', stream)
const videoTag = refVideo.current
videoTag.volume = 0
videoTag.srcObject = stream
videoTag.play().then(() => {
console.log('start playing')
}).catch(() => {
console.log('failed to play')
})
}
},
onCreateAnswer: (jsep) => {
const dataToSend = {
janus: "message",
transaction: data.transaction,
session_id: data.session_id,
jsep,
handle_id: data.handle_id,
body: {
request: "start",
},
}
socket.send(JSON.stringify(dataToSend))
}
}))
}, [])

janusが利用するstunサーバーを指定してRTCPeerConnectionのインスタンスを作ります。
  const peerConnection = new RTCPeerConnection({
iceServers: [{ urls: "stun:stun.l.google.com:19302" }],
sdpSemantics: "unified-plan",
})

peer connectionの情報を受け取るためのcallbackを設定します。
  peerConnection.onconnectionstatechange = function () {
// something to do
};
peerConnection.oniceconnectionstatechange = function () {
// something to do
};
peerConnection.onicecandidate = function (event) {
// something to do
}
peerConnection.ontrack = (event) => {
// something to do
}

これらの順に処理するとDTLS handshakeが成功します。
どれがかが欠けるとhandshakeが成功しないのですが、成功しない理由を教えてくれないので、この一連の処理が必要と分かるまでに時間を要しました。
  1. janusのapiで受け取ったjsepをsetRemoteDescriptionで登録
  2. createAnswerを実施
  3. createAnswerの結果をsetLocalDescriptionで登録
  4. janusにcreateAnswerの結果をmessageとして送信
  await peerConnection.setRemoteDescription(data.jsep)
const answer = await peerConnection.createAnswer()
console.log('createAnswer', answer)
await peerConnection.setLocalDescription(answer);
callbacks.onCreateAnswer(answer)
      onCreateAnswer: (jsep) => {
const dataToSend = {
janus: "message",
transaction: data.transaction,
session_id: data.session_id,
jsep,
handle_id: data.handle_id,
body: {
request: "start",
},
}
socket.send(JSON.stringify(dataToSend))
}

ontrackで受け取ったvideoを再生

RTCPeerConnectionのontrackとして受け取ったvideo要素をvideoタグに割り当てて再生するとstream動画を表示できます。
  peerConnection.ontrack = (event) => {
console.log('ontrack', event);
callbacks.onTrack(event)
}
      onTrack: (event) => {
console.log('onTrack', event)
if (event.track.kind == 'video') {
let stream = new MediaStream([event.track]);
console.log('stream', stream)
const videoTag = refVideo.current
videoTag.volume = 0
videoTag.srcObject = stream
videoTag.play().then(() => {
console.log('start playing')
}).catch(() => {
console.log('failed to play')
})
}
},

課題 janusのstreamingのサンプルページのように動画停止後の復帰処理が動かない

DTLSのhandshakeを成功させ動画の表示はできたものの、安定化のための記述が足りていないようで、一度動画が停止するとstreamingのサンプルページのように復帰してくれません。
安定化に必要な箇所を把握したら記事に情報を追記予定ですが、今の所は一度止まると復帰できない処理になっています。

janus.js上の安定化に必要な該当箇所をご存知のかたがいらっしゃれば、コメントなどで教えていただけると嬉しいです。

おわり

janusのapiを利用して取得したstreamingの情報をRTCPeerConnectionで処理し、その結果をjanusに送り返すことでDTLSのhandshakeが可能と分かりました。
janus.jsやjanus-gatewayライブラリに頼らず通信確立する方法が分かって良かったです。

参考

RESTful, WebSockets, RabbitMQ, MQTT, Nanomsg and UnixSockets API
Streaming plugin documentation
janus.js

0 件のコメント :