/*! @license
* Shaka Player
* Copyright 2016 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
goog.provide('shaka.transmuxer.TransmuxerProxy');
goog.require('shaka.device.DeviceFactory');
goog.require('shaka.log');
goog.require('shaka.util.BufferUtils');
goog.require('shaka.util.Error');
goog.require('shaka.util.Timer');
goog.require('shaka.util.Uint8ArrayUtils');
/**
* @summary A proxy transmuxer that delegates transmux() calls to a Web Worker.
*
* Synchronous methods (isSupported, convertCodecs, getOriginalMimeType) are
* handled on the main thread by the inner transmuxer. Only the heavy
* transmux() work is offloaded to the worker.
*
* The worker URL must be supplied by the integrating application via the
* `mediaSource.transmuxWorkerUrl` config option. The library does not attempt
* to discover it. If the URL is empty or the worker cannot be created, the
* proxy falls back to main-thread transmuxing.
*
* @implements {shaka.extern.Transmuxer}
* @export
*/
shaka.transmuxer.TransmuxerProxy = class {
/**
* @param {!shaka.extern.Transmuxer} innerTransmuxer
* The real transmuxer to use for sync methods and as fallback.
* @param {string=} workerUrl
* URL of the standalone transmuxer worker script. When empty, the proxy
* uses main-thread transmuxing.
*/
constructor(innerTransmuxer, workerUrl = '') {
/** @private {!shaka.extern.Transmuxer} */
this.innerTransmuxer_ = innerTransmuxer;
/** @private {string} */
this.workerUrl_ = workerUrl;
/** @private {boolean} */
this.workerFailed_ = false;
/** @private {number} */
this.nextReqId_ = 0;
/**
* Maps request IDs to pending promise resolvers and their timeout timers.
* @private {!Map<number, {resolve: function(*), reject: function(*),
* timer: shaka.util.Timer}>}
*/
this.pendingRequests_ = new Map();
/** @private {number} */
this.id_ = shaka.transmuxer.TransmuxerProxy.nextId_++;
/** @private {boolean} */
this.workerReady_ = false;
/** @private {boolean} */
this.attachedToWorker_ = false;
}
/**
* @override
* @export
*/
destroy() {
// Reject all pending requests.
for (const pending of this.pendingRequests_.values()) {
pending.timer.stop();
pending.reject(new shaka.util.Error(
shaka.util.Error.Severity.CRITICAL,
shaka.util.Error.Category.MEDIA,
shaka.util.Error.Code.TRANSMUXING_FAILED,
'Worker transmuxer destroyed'));
}
this.pendingRequests_.clear();
if (this.attachedToWorker_) {
const TransmuxerProxy = shaka.transmuxer.TransmuxerProxy;
if (TransmuxerProxy.sharedWorker_) {
TransmuxerProxy.sharedWorker_.postMessage(
{'cmd': 'destroy', 'id': this.id_});
}
TransmuxerProxy.activeInstances_.delete(this.id_);
this.attachedToWorker_ = false;
// Terminate the shared worker when no instances remain.
if (TransmuxerProxy.activeInstances_.size === 0 &&
TransmuxerProxy.sharedWorker_) {
TransmuxerProxy.sharedWorker_.terminate();
TransmuxerProxy.sharedWorker_ = null;
}
}
this.innerTransmuxer_.destroy();
}
/**
* @param {string} mimeType
* @param {string=} contentType
* @return {boolean}
* @override
* @export
*/
isSupported(mimeType, contentType) {
return this.innerTransmuxer_.isSupported(mimeType, contentType);
}
/**
* @param {string} contentType
* @param {string} mimeType
* @return {string}
* @override
* @export
*/
convertCodecs(contentType, mimeType) {
return this.innerTransmuxer_.convertCodecs(contentType, mimeType);
}
/**
* @return {string}
* @override
* @export
*/
getOriginalMimeType() {
return this.innerTransmuxer_.getOriginalMimeType();
}
/**
* @override
* @export
*/
async transmux(data, stream, reference, duration, contentType) {
// If worker creation previously failed, fall back to main thread.
if (this.workerFailed_) {
return this.innerTransmuxer_.transmux(
data, stream, reference, duration, contentType);
}
// Lazy-init: attach to the shared worker on first transmux call.
if (!this.attachedToWorker_) {
const TransmuxerProxy = shaka.transmuxer.TransmuxerProxy;
const worker = TransmuxerProxy.getOrCreateWorker_(this.workerUrl_);
if (!worker) {
this.workerFailed_ = true;
return this.innerTransmuxer_.transmux(
data, stream, reference, duration, contentType);
}
TransmuxerProxy.activeInstances_.set(this.id_, this);
this.attachedToWorker_ = true;
}
const worker = shaka.transmuxer.TransmuxerProxy.sharedWorker_;
if (!worker) {
this.workerFailed_ = true;
return this.innerTransmuxer_.transmux(
data, stream, reference, duration, contentType);
}
// Send init on first use so the worker creates the right transmuxer.
if (!this.workerReady_) {
const mimeType = this.innerTransmuxer_.getOriginalMimeType();
worker.postMessage({
'cmd': 'init',
'id': this.id_,
'mimeType': mimeType,
});
this.workerReady_ = true;
}
const reqId = this.nextReqId_++;
// Extract only the properties transmuxers actually read/write.
const streamProps = {
'id': stream.id,
'codecs': stream.codecs,
'channelsCount': stream.channelsCount,
'audioSamplingRate': stream.audioSamplingRate,
'height': stream.height,
'width': stream.width,
'language': stream.language,
};
const refProps = reference ? {
'discontinuitySequence': reference.discontinuitySequence,
'startTime': reference.startTime,
'endTime': reference.endTime,
'uris': reference.getUris(),
} : null;
// Copy the buffer before transferring so the original `data` stays valid.
// This is necessary because MediaSourceEngine may call transmux() twice
// with the same data (split muxed content: once for audio, once for video).
const buffer = shaka.util.BufferUtils.toArrayBuffer(
shaka.util.Uint8ArrayUtils.concat(data));
const {promise, resolve, reject} = Promise.withResolvers();
const timer = new shaka.util.Timer(() => {
if (this.pendingRequests_.has(reqId)) {
this.pendingRequests_.delete(reqId);
this.workerFailed_ = true;
reject(new shaka.util.Error(
shaka.util.Error.Severity.CRITICAL,
shaka.util.Error.Category.MEDIA,
shaka.util.Error.Code.TRANSMUXING_FAILED,
'Worker transmux timed out'));
}
});
timer.tickAfter(shaka.transmuxer.TransmuxerProxy.TIMEOUT_MS_ / 1000);
this.pendingRequests_.set(reqId, {
resolve,
reject,
timer,
});
try {
// Transfer the copied buffer to the worker for zero-copy delivery.
// The original data remains valid for any subsequent callers.
worker.postMessage({
'cmd': 'transmux',
'id': this.id_,
'reqId': reqId,
'data': buffer,
'streamProps': streamProps,
'refProps': refProps,
'duration': duration,
'contentType': contentType,
}, [buffer]);
} catch (e) {
timer.stop();
this.pendingRequests_.delete(reqId);
shaka.log.warning(
'Failed to post message to worker, falling back to main thread', e);
const transmuxerProxy = shaka.transmuxer.TransmuxerProxy;
transmuxerProxy.terminateWorker_('Worker postMessage failed');
return this.innerTransmuxer_.transmux(
data, stream, reference, duration, contentType);
}
const response = await promise;
// Apply stream mutations back to the real stream object.
const mutations = response['streamMutations'];
if (mutations && Object.keys(mutations).length > 0) {
for (const key of Object.keys(mutations)) {
stream[key] = mutations[key];
}
}
// Reconstruct the output.
const output = response['output'];
const BufferUtils = shaka.util.BufferUtils;
if (output['type'] === 'raw') {
return BufferUtils.toUint8(
/** @type {!ArrayBuffer} */(output['data']));
} else {
return {
data: BufferUtils.toUint8(
/** @type {!ArrayBuffer} */(output['data'])),
init: output['init'] ? BufferUtils.toUint8(
/** @type {!ArrayBuffer} */(output['init'])) : null,
};
}
}
/**
* Handles messages from the shared worker for this instance.
* @param {!Object} msg
* @private
*/
onWorkerMessage_(msg) {
const cmd = msg['cmd'];
if (cmd === 'transmuxed' || cmd === 'error') {
const reqId = msg['reqId'];
const pending = this.pendingRequests_.get(reqId);
if (!pending) {
return;
}
pending.timer.stop();
this.pendingRequests_.delete(reqId);
if (cmd === 'error') {
const errorObj = msg['error'];
pending.reject(new shaka.util.Error(
errorObj['severity'],
errorObj['category'],
errorObj['code'],
...errorObj['data']));
} else {
pending.resolve(msg);
}
}
}
};
/** @private {number} */
shaka.transmuxer.TransmuxerProxy.nextId_ = 0;
/**
* Timeout in milliseconds for a worker transmux response. If the worker does
* not respond within this time, the request is rejected and future calls fall
* back to the main thread.
* @private @const {number}
*/
shaka.transmuxer.TransmuxerProxy.TIMEOUT_MS_ = 30000;
/**
* Shared Worker instance used by all TransmuxerProxy instances.
* @private {?Worker}
*/
shaka.transmuxer.TransmuxerProxy.sharedWorker_ = null;
/**
* Map of active instances keyed by ID, for routing worker messages.
* @private {!Map<number, !shaka.transmuxer.TransmuxerProxy>}
*/
shaka.transmuxer.TransmuxerProxy.activeInstances_ = new Map();
/**
* Gets or creates the shared worker. Returns null if the worker cannot be
* created (unsupported device, missing script URL, or creation error).
* @param {string} workerUrlOverride
* @return {?Worker}
* @private
*/
shaka.transmuxer.TransmuxerProxy.getOrCreateWorker_ = (workerUrlOverride) => {
const TransmuxerProxy = shaka.transmuxer.TransmuxerProxy;
if (TransmuxerProxy.sharedWorker_) {
return TransmuxerProxy.sharedWorker_;
}
const device = shaka.device.DeviceFactory.getDevice();
if (!device.supportsWorkerTransmux()) {
shaka.log.info(
'Device does not support worker transmuxing; ' +
'falling back to main-thread transmuxing');
return null;
}
if (!workerUrlOverride) {
shaka.log.warning(
'Transmuxer worker URL is not configured ' +
'(mediaSource.transmuxWorkerUrl); ' +
'falling back to main-thread transmuxing');
return null;
}
try {
const worker = new Worker(workerUrlOverride);
worker.addEventListener('message', (event) => {
const msg = /** @type {!MessageEvent} */(event).data;
const cmd = msg['cmd'];
if (cmd === 'transmuxed' || cmd === 'error') {
// Route directly to the instance that owns this request.
const instance = TransmuxerProxy.activeInstances_.get(msg['id']);
if (instance) {
instance.onWorkerMessage_(msg);
}
}
});
worker.addEventListener('error', (event) => {
shaka.log.warning('Transmuxer worker error:', event);
TransmuxerProxy.terminateWorker_('Worker error');
});
TransmuxerProxy.sharedWorker_ = worker;
return worker;
} catch (e) {
shaka.log.warning(
'Failed to create transmuxer worker, falling back to main thread', e);
return null;
}
};
/**
* Marks all active instances as failed, rejects their pending requests, and
* shuts down the shared worker.
* @param {string} message Error message for rejected promises.
* @private
*/
shaka.transmuxer.TransmuxerProxy.terminateWorker_ = (message) => {
const TransmuxerProxy = shaka.transmuxer.TransmuxerProxy;
for (const instance of TransmuxerProxy.activeInstances_.values()) {
instance.workerFailed_ = true;
for (const pending of instance.pendingRequests_.values()) {
pending.timer.stop();
pending.reject(new shaka.util.Error(
shaka.util.Error.Severity.CRITICAL,
shaka.util.Error.Category.MEDIA,
shaka.util.Error.Code.TRANSMUXING_FAILED,
message));
}
instance.pendingRequests_.clear();
}
TransmuxerProxy.sharedWorker_ = null;
TransmuxerProxy.activeInstances_.clear();
};