"Fossies" - the Fresh Open Source Software archive 
Member "pitivi-0.15.2/pitivi/stream.py" of archive pitivi-0.15.2.tar.gz:
# PiTiVi , Non-linear video editor
#
# pitivi/stream.py
#
# Copyright (c) 2008, Edward Hervey <bilboed@bilboed.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
"""
Multimedia stream, used for definition of media streams
"""
from pitivi.log.loggable import Loggable
import pitivi.log.log as log
import gst
# Ranks used by stream_compare() to score how closely two streams match.
# The pad-name rank is added on top of at most one of the caps-related ranks,
# so a higher total means a closer match.

# Both streams carry equal caps.
STREAM_MATCH_SAME_CAPS = 60
# Both streams come from pads with the same (non-None) name.
STREAM_MATCH_SAME_PAD_NAME = 30
# The caps differ but their intersection is not empty.
STREAM_MATCH_COMPATIBLE_CAPS = 20
# The caps do not intersect but share the same major type (e.g. "video").
STREAM_MATCH_SAME_TYPE = 10
# Nothing matches.
STREAM_MATCH_NONE = 0
class MultimediaStream(Loggable):
    """
    Base description of a media stream, built on top of its caps.

    @cvar raw: The stream is a raw media format. Left to C{None} by this base
        class; subclasses fill it in from the caps.
    @type raw: C{bool}
    @cvar fixed: The stream is entirely defined (the caps are fixed).
    @type fixed: C{bool}
    @cvar caps: The caps corresponding to the stream
    @type caps: C{gst.Caps}
    @cvar pad_name: The name of the pad from which this stream originated.
    @type pad_name: C{str}
    """

    def __init__(self, caps, pad_name=None):
        Loggable.__init__(self)
        self.caps = caps
        self.pad_name = pad_name
        self.fixed = caps.is_fixed()
        self.raw = None
        # Give subclasses the chance to fill in their own properties.
        self._analyzeCaps()
        caps_description = self.caps.to_string()
        self.log("new with caps %s" % caps_description)

    def _analyzeCaps(self):
        """
        Hook called at construction time; override it to extract properties
        from the caps. The base implementation does nothing.
        """
        # NOTE: current implementations only parse the first structure. It
        # could be a bit limited but on the other hand, Streams are just a
        # thin layer on top of caps. For more complex things caps should be
        # used.

    def isCompatible(self, other):
        """
        Tells whether this stream and another one are compatible, i.e. one of
        their classes derives from the other and their caps intersect.

        @param other: another stream
        @type other: L{MultimediaStream}
        @return: C{True} if the stream is compatible.
        @rtype: C{bool}
        """
        own_class = type(self)
        other_class = type(other)
        related = (issubclass(own_class, other_class) or
                   issubclass(other_class, own_class))
        if not related:
            return False
        common_caps = self.caps.intersect(other.caps)
        return not common_caps.is_empty()

    def isCompatibleWithName(self, other):
        """
        Same as L{isCompatible}, with the additional requirement that both
        streams have the same pad name.

        @param other: another stream
        @type other: L{MultimediaStream}
        @return: C{True} if the stream is compatible.
        @rtype: C{bool}
        """
        self.log("self.pad_name:%r, other.pad_name:%r",
                 self.pad_name, other.pad_name)
        # NOTE: the names are compared unconditionally, even when one of them
        # is None.
        if not (self.pad_name == other.pad_name):
            return False
        return self.isCompatible(other)

    def __str__(self):
        fields = (self.__class__.__name__, self.pad_name, self.caps)
        return "<%s(%s) '%s'>" % fields
class VideoStream(MultimediaStream):
    """
    Video Stream

    @cvar videotype: Name of the first caps structure
        (Ex: video/x-raw-rgb), or C{None} if the caps could not be analyzed.
    @type videotype: C{str}
    @cvar width: Width of the video in pixels.
    @type width: C{int}
    @cvar height: Height of the video in pixels.
    @type height: C{int}
    @cvar framerate: Framerate of the video.
    @type framerate: C{gst.Fraction}
    @cvar format: The subtype of video type
    @cvar par: The pixel-aspect-ratio of the video stream.
    @type par: C{gst.Fraction}
    @cvar dar: The display-aspect-ratio of the video stream.
    @type dar: C{gst.Fraction}
    @cvar is_image: If the stream is an image.
    @type is_image: C{bool}
    @cvar thumbnail: The thumbnail associated with this stream
    """

    def __init__(self, caps, pad_name=None, is_image=False):
        # Defaults have to be in place before calling the parent constructor
        # because it runs _analyzeCaps(), which overrides them from the caps.
        # videotype is initialized too so that has_alpha() keeps working when
        # _analyzeCaps() bails out early on empty caps.
        self.videotype = None
        self.width = None
        self.height = None
        self.framerate = gst.Fraction(1, 1)
        self.format = None
        self.is_image = is_image
        self.thumbnail = None
        self.par = gst.Fraction(1, 1)
        self.dar = gst.Fraction(4, 3)
        MultimediaStream.__init__(self, caps, pad_name)

    def _analyzeCaps(self):
        """
        Extract the video properties from the first structure of the caps and
        compute the display aspect ratio.
        """
        if len(self.caps) == 0:
            # FIXME: rendering still images triggers this as we aren't using
            # decodebin2 and caps are still not negotiated when this happens.
            # We should fix this, but for the moment just returning makes
            # rendering work
            self.error("can't analyze %s", self.caps)
            return

        struct = self.caps[0]
        self.videotype = struct.get_name()
        self.raw = self.videotype.startswith("video/x-raw-")

        # Only the properties present in the caps are copied; 'bpp' and
        # 'depth' therefore only exist as attributes when the caps have them.
        for property_name in ('width', 'height', 'framerate', 'format',
                              'bpp', 'depth'):
            try:
                setattr(self, property_name, struct[property_name])
            except KeyError:
                # property not in caps
                pass

        try:
            self.par = struct['pixel-aspect-ratio']
        except KeyError:
            pass

        # compute display aspect ratio
        try:
            if self.width and self.height and self.par:
                self.dar = gst.Fraction(self.width * self.par.num,
                                        self.height * self.par.denom)
            elif self.width and self.height:
                self.dar = gst.Fraction(self.width, self.height)
        except Exception:
            # Values that cannot be combined into a fraction (for instance
            # when the caps are not fixed): fall back to the default ratio
            # without swallowing KeyboardInterrupt/SystemExit.
            self.dar = gst.Fraction(4, 3)

    def has_alpha(self):
        """
        Tell whether the stream format carries an alpha channel.

        @return: C{True} for 32 bpp / 32 depth raw RGB and for AYUV raw YUV,
            C{False} otherwise (including when the caps were not analyzed).
        @rtype: C{bool}
        """
        if self.videotype == "video/x-raw-rgb":
            return ((hasattr(self, 'bpp') and (self.bpp == 32)) and
                    (hasattr(self, 'depth') and (self.depth == 32)))
        elif self.videotype == "video/x-raw-yuv":
            return (hasattr(self, 'format') and
                    self.format == gst.Fourcc('AYUV'))
        return False
class AudioStream(MultimediaStream):
    """
    Audio stream

    @cvar audiotype: Type of the audio stream (Ex: audio/x-raw-int)
    @type audiotype: C{str}
    @cvar channels: The number of channels handled by this Stream
    @type channels: C{int}
    @cvar rate: The sample rate
    @type rate: C{int}
    @cvar width: The number of bits taken by an individual sample.
    @type width: C{int}
    @cvar depth: The number of useful bits used by an individual sample.
    @type depth: C{int}
    """

    def __init__(self, caps, pad_name=None):
        # initialize properties here for clarity; the parent constructor
        # calls _analyzeCaps() which overrides them from the caps.
        self.audiotype = None
        self.channels = None
        self.rate = None
        self.width = None
        self.depth = None
        MultimediaStream.__init__(self, caps, pad_name)

    def _analyzeCaps(self):
        """
        Extract the audio properties from the first structure of the caps.
        """
        struct = self.caps[0]
        self.audiotype = struct.get_name()
        self.raw = self.audiotype.startswith('audio/x-raw-')

        # Only audio fields are looked up; 'height' used to be in this list
        # but it is a video property and not an attribute of this class.
        for property_name in ('channels', 'rate', 'width', 'depth'):
            try:
                setattr(self, property_name, struct[property_name])
            except KeyError:
                # property not in the caps
                pass

        # Without an explicit depth, every bit of the sample is considered
        # useful.
        if self.width and not self.depth:
            self.depth = self.width
class TextStream(MultimediaStream):
    """
    Text media stream

    @cvar texttype: Name of the first structure of the caps.
    @type texttype: C{str}
    """

    def _analyzeCaps(self):
        """
        Record the name of the first caps structure as the text type.
        """
        first_structure = self.caps[0]
        self.texttype = first_structure.get_name()
def find_decoder(pad):
    """
    Walk upstream from the given pad looking for a decoder element.

    Sink pads are replaced by their peer and ghost pads by their target. For
    any other pad, the owning element is inspected and the walk continues
    from the pad named 'sink' of that element.

    @param pad: The pad to start from (may be C{None}).
    @type pad: C{gst.Pad}
    @return: The first element whose factory klass contains 'Decoder' or is
        exactly 'Codec/Demuxer/Audio'; C{None} if the walk ends, or reaches a
        pad without parent element or a bin, before finding one.
    @rtype: C{gst.Element}
    """
    current = pad
    while current is not None:
        if current.props.direction == gst.PAD_SINK:
            current = current.get_peer()
        elif isinstance(current, gst.GhostPad):
            current = current.get_target()
        else:
            owner = current.get_parent_element()
            if owner is None or isinstance(owner, gst.Bin):
                return None

            factory = owner.get_factory()
            if factory is not None:
                klass = factory.get_klass()
                if 'Decoder' in klass or klass == 'Codec/Demuxer/Audio':
                    return owner

            # not a decoder: keep going upstream through its sink pad
            current = owner.get_pad('sink')

    return None
def find_upstream_demuxer_and_pad(pad):
    """
    Walk upstream from the given pad until a demuxer is found.

    @param pad: The pad to start from (may be C{None}).
    @type pad: C{gst.Pad}
    @return: A C{(demuxer, pad)} tuple where pad is the source pad of the
        demuxer that was reached, or C{(None, None)} if no element whose
        factory klass contains 'Demuxer' is found upstream.
    @raise Exception: if an element other than a multiqueue with more than
        one sink pad is met on the way.
    """
    while pad:
        direction = pad.props.direction
        if direction == gst.PAD_SRC and isinstance(pad, gst.GhostPad):
            pad = pad.get_target()
            continue
        if direction == gst.PAD_SINK:
            pad = pad.get_peer()
            continue

        owner = pad.get_parent()
        if isinstance(owner, gst.Pad):
            # pad is a proxy pad
            owner = owner.get_parent()
        if owner is None:
            break

        factory = owner.get_factory()
        if factory is None:
            # python elements don't have a factory
            return None, None
        if 'Demuxer' in factory.get_klass():
            return owner, pad

        # not a demuxer: continue from one of the element's sink pads
        sinks = list(owner.sink_pads())
        if not sinks:
            break
        if len(sinks) == 1:
            pad = sinks[0]
        elif factory.get_name() == 'multiqueue':
            # follow the sink pad whose name mirrors the current source pad
            pad = owner.get_pad(pad.get_name().replace('src', 'sink'))
        else:
            raise Exception('boom!')

    return None, None
def get_type_from_decoder(decoder):
    """
    Derive a stream type from the klass of a decoder's factory.

    @param decoder: The element to inspect.
    @type decoder: C{gst.Element}
    @return: The lower-cased third '/'-separated component of the factory
        klass, or C{None} if the klass has fewer than three components.
    @rtype: C{str}
    """
    log.debug("stream", "%r" % decoder)
    klass_parts = decoder.get_factory().get_klass().split('/', 2)
    if len(klass_parts) == 3:
        return klass_parts[2].lower()
    return None
def get_pad_type(pad):
    """
    Figure out the type of media flowing through a pad.

    The type comes from the decoder found upstream of the pad when there is
    one, otherwise from the part before the '/' in the name of the first
    structure of the pad caps.

    @param pad: The pad to inspect.
    @type pad: C{gst.Pad}
    @return: The stream type. May be C{None} when it is taken from a decoder
        (see get_type_from_decoder).
    @rtype: C{str}
    """
    decoder = find_decoder(pad)
    if not decoder:
        # prefer the negotiated caps, fall back on the caps the pad can
        # handle
        pad_caps = pad.props.caps
        if pad_caps is None:
            pad_caps = pad.get_caps()
        structure_name = pad_caps[0].get_name()
        return structure_name.split('/', 1)[0]

    return get_type_from_decoder(decoder)
def get_pad_id(pad):
    """
    Build an identifier for a pad out of the demuxers found upstream of it.

    @param pad: The pad to identify (may be C{None}).
    @type pad: C{gst.Pad}
    @return: One C{[demuxer factory name, demuxer pad name]} entry per
        demuxer met while walking upstream, closest demuxer first.
    @rtype: C{list}
    """
    path = []
    current = pad
    while current:
        demuxer, current = find_upstream_demuxer_and_pad(current)
        if (demuxer, current) == (None, None):
            # no more demuxers upstream
            break

        path.append([demuxer.get_factory().get_name(), current.get_name()])

        # FIXME: we always follow back the first sink
        demuxer_sinks = list(demuxer.sink_pads())
        current = demuxer_sinks[0] if demuxer_sinks else None

    return path
def get_stream_for_caps(caps, pad=None):
    """
    Create the MultimediaStream subclass instance matching the given caps.

    The stream type is taken from the pad when one is given (see
    get_pad_type), otherwise from the part before the '/' in the name of the
    first caps structure.

    @param caps: The caps to describe.
    @type caps: C{gst.Caps}
    @param pad: The pad the caps come from, if any.
    @type pad: C{gst.Pad}
    @return: A L{VideoStream} for 'video' and 'image' types, an
        L{AudioStream} for 'audio', a L{TextStream} for 'text' and
        'subpicture', C{None} for anything else.
    @rtype: L{MultimediaStream}
    """
    log.debug("stream", "caps:%s, pad:%r" % (caps.to_string(), pad))

    if pad is None:
        pad_name = None
        stream_type = caps[0].get_name().split('/', 1)[0]
    else:
        pad_name = pad.get_name()
        stream_type = get_pad_type(pad)
    log.debug("stream", "stream_type:%s" % stream_type)

    if stream_type in ('video', 'image'):
        return VideoStream(caps, pad_name, stream_type == 'image')
    if stream_type == 'audio':
        return AudioStream(caps, pad_name)
    if stream_type in ('text', 'subpicture'):
        return TextStream(caps, pad_name)

    # FIXME : we should have an 'unknown' data stream class
    return None
def get_stream_for_pad(pad, store_pad=False):
    """
    Create the stream describing the given pad.

    The caps are the negotiated caps of the pad when available, otherwise the
    caps the pad can handle. The resulting stream gets a C{pad_id} attribute
    (see get_pad_id).

    @param pad: The pad to describe.
    @type pad: C{gst.Pad}
    @param store_pad: Whether to also keep a reference to the pad in the
        C{pad} attribute of the stream.
    @type store_pad: C{bool}
    @return: The stream for the pad.
    @rtype: L{MultimediaStream}
    """
    # The pad used to be missing from this call, which left the bare "pad:%r"
    # placeholder as the message.
    log.debug("stream", "pad:%r" % (pad,))
    caps = pad.props.caps
    if caps is None:
        caps = pad.get_caps()

    pad_id = get_pad_id(pad)
    # NOTE: get_stream_for_caps returns None for types it does not know, in
    # which case the attribute assignment below raises AttributeError.
    stream = get_stream_for_caps(caps, pad)
    stream.pad_id = pad_id
    if store_pad:
        stream.pad = pad

    return stream
def pad_compatible_stream(pad, stream):
    """
    Checks whether the given pad is compatible with the given stream.

    @param pad: The pad
    @type pad: C{gst.Pad}
    @param stream: The stream to match against. C{None} matches any pad.
    @type stream: L{MultimediaStream}
    @return: Whether the pad is compatible with the given stream
    @rtype: C{bool}
    @raise Exception: if the stream caps evaluate to false, since there is
        then nothing to compare the pad against.
    """
    log.debug("stream", "pad:%r, stream:%r" % (pad, stream))
    # Identity test: comparing with '==' would go through any __eq__ defined
    # by the stream.
    if stream is None:
        # yes, None is the magical stream that takes everything
        return True

    # compatible caps
    if stream.caps:
        return not stream.caps.intersect(pad.get_caps()).is_empty()

    raise Exception("Can't figure out compatibility since the stream doesn't have any caps")
def get_pads_for_stream(element, stream):
    """
    Collect the pads of an element that are compatible with a stream.

    When several pads are compatible and the stream has a pad name, only the
    pads carrying that name are returned.

    @param element: The element to search on.
    @type element: C{gst.Element}
    @param stream: The stream to match against.
    @type stream: L{MultimediaStream}
    @return: The compatible pads
    @rtype: List of C{gst.Pad}
    """
    log.debug("stream", "element:%r, stream:%r" % (element, stream))

    # The whole scan is restarted from scratch each time it raises TypeError
    # (presumably the pad iterator failing while pads change -- TODO confirm).
    compatible = None
    while compatible is None:
        try:
            compatible = [candidate for candidate in element.pads()
                          if pad_compatible_stream(candidate, stream)]
        except TypeError:
            pass

    # FIXME : I'm not 100% certain that checking against the stream pad_name
    # is a good idea ....
    # only filter the list if there's more than one choice
    if stream and len(compatible) > 1 and stream.pad_name:
        wanted_name = stream.pad_name
        return [candidate for candidate in compatible
                if candidate.get_name() == wanted_name]

    return compatible
def get_src_pads_for_stream(element, stream):
    """
    Restrict the result of get_pads_for_stream to source pads.

    @param element: The element to search on.
    @type element: C{gst.Element}
    @param stream: The stream to match against.
    @type stream: L{MultimediaStream}
    @return: The compatible source pads
    @rtype: List of C{gst.Pad}
    """
    compatible_pads = get_pads_for_stream(element, stream)
    return [candidate for candidate in compatible_pads
            if candidate.get_direction() == gst.PAD_SRC]
def get_sink_pads_for_stream(element, stream):
    """
    Restrict the result of get_pads_for_stream to sink pads.

    @param element: The element to search on.
    @type element: C{gst.Element}
    @param stream: The stream to match against.
    @type stream: L{MultimediaStream}
    @return: The compatible sink pads
    @rtype: List of C{gst.Pad}
    """
    compatible_pads = get_pads_for_stream(element, stream)
    return [candidate for candidate in compatible_pads
            if candidate.get_direction() == gst.PAD_SINK]
def stream_compare(stream_a, stream_b):
    """
    Rank how closely two streams match.

    @param stream_a: The reference stream.
    @type stream_a: L{MultimediaStream}
    @param stream_b: The stream compared against the reference.
    @type stream_b: L{MultimediaStream}
    @return: The sum of the STREAM_MATCH_* ranks that apply:
        STREAM_MATCH_SAME_PAD_NAME when both have the same non-None pad name,
        plus at most one of STREAM_MATCH_SAME_CAPS,
        STREAM_MATCH_COMPATIBLE_CAPS or STREAM_MATCH_SAME_TYPE.
    @rtype: C{int}
    """
    rank = STREAM_MATCH_NONE

    name_a = stream_a.pad_name
    if name_a is not None and name_a == stream_b.pad_name:
        rank += STREAM_MATCH_SAME_PAD_NAME

    caps_a = stream_a.caps
    if caps_a is None:
        # nothing more to compare
        return rank

    caps_b = stream_b.caps
    if caps_a == caps_b:
        rank += STREAM_MATCH_SAME_CAPS
    elif caps_a.intersect(caps_b):
        rank += STREAM_MATCH_COMPATIBLE_CAPS
    else:
        # last resort: same major type ("video", "audio", ...)
        type_a = caps_a[0].get_name().split("/", 1)[0]
        type_b = caps_b[0].get_name().split("/", 1)[0]
        if type_a == type_b:
            rank += STREAM_MATCH_SAME_TYPE

    return rank
def match_stream(stream, stream_list):
    """
    Pick, among stream_list, the stream that ranks highest against stream.

    @param stream: The stream to match.
    @type stream: L{MultimediaStream}
    @param stream_list: The candidate streams.
    @return: A C{(best stream, rank)} tuple; the first candidate wins on
        ties, and the result is C{(None, STREAM_MATCH_NONE)} when no
        candidate ranks above STREAM_MATCH_NONE.
    """
    winner = None
    winner_rank = STREAM_MATCH_NONE
    for candidate in stream_list:
        rank = stream_compare(stream, candidate)
        if rank <= winner_rank:
            continue
        winner, winner_rank = candidate, rank

    return winner, winner_rank
class StreamGroupWalker(object):
    """
    Utility class used to match two groups of streams.

    A walker stands for one step of a pairing: the (stream_a, stream_b) pair
    chosen at that step, if any, and the streams of each group that are still
    unpaired. Walkers are chained through their parent, so following the
    parents from a walker gives all the pairs chosen so far. See
    match_stream_groups for an example of usage.
    """

    def __init__(self, group_a, group_b,
                 stream_a=None, stream_b=None, parent=None):
        self.group_a = list(group_a)
        self.group_b = list(group_b)
        self.stream_a = stream_a
        self.stream_b = stream_b
        self.parent = parent
        if stream_a is None or stream_b is None:
            self.match = None
        else:
            rank = stream_compare(stream_a, stream_b)
            self.match = ((stream_a, stream_b), rank)

    def advance(self):
        """
        Create one child walker for every pair that can be made out of the
        remaining streams; the paired streams are removed from the groups
        handed to the child.

        @return: The child walkers, empty when one of the groups is
            exhausted.
        @rtype: C{list} of L{StreamGroupWalker}
        """
        children = []
        for stream_a in self.group_a:
            remaining_a = list(self.group_a)
            remaining_a.remove(stream_a)
            for stream_b in self.group_b:
                remaining_b = list(self.group_b)
                remaining_b.remove(stream_b)
                children.append(StreamGroupWalker(remaining_a, remaining_b,
                                                  stream_a, stream_b, self))

        return children

    def getMatches(self):
        """
        Gather the pairs chosen by this walker and all its ancestors.

        @return: A dictionary of (stream_a, stream_b) -> rank, keeping the
            highest rank when a pair shows up more than once.
        @rtype: C{dict}
        """
        found = {}
        node = self
        while node is not None:
            if node.match is not None:
                pair, rank = node.match
                if found.get(pair, STREAM_MATCH_NONE) < rank:
                    found[pair] = rank

            node = node.parent

        return found
def match_stream_groups(group_a, group_b):
    """
    Match two groups of streams.

    The function takes two sequences group_a and group_b of streams and
    returns a dictionary of (stream_a, stream_b) -> rank, where stream_a
    belongs to group_a, stream_b belongs to group_b and rank is
    stream_compare(stream_a, stream_b).

    The algorithm tries all the possible combinations of group_a and group_b
    and returns the "best" match between group_a and group_b, ie the
    dictionary having the sum of the ranks maximized. The first combination
    reaching the best sum wins; an empty dictionary is returned when no
    combination ranks above 0.

    @param group_a: The first group of streams.
    @param group_b: The second group of streams.
    @return: The best (stream_a, stream_b) -> rank mapping.
    @rtype: C{dict}
    """
    # Imported here to keep the block self-contained. A deque gives O(1)
    # pops from the left, where list.pop(0) shifts the whole list each time.
    from collections import deque

    walkers = deque([StreamGroupWalker(group_a, group_b)])
    best_rank = 0
    best_map = {}

    while walkers:
        # FIFO order, so combinations are evaluated in the order they were
        # generated
        walker = walkers.popleft()
        child_walkers = walker.advance()
        if child_walkers:
            walkers.extend(child_walkers)
            continue

        # leaf walker: one of the groups is exhausted, score the combination
        current_map = walker.getMatches()
        current_rank = sum(current_map.values())
        if current_rank > best_rank:
            best_rank = current_rank
            best_map = current_map

    return best_map
def match_stream_groups_map(group_a, group_b):
    """
    Same as match_stream_groups, without the ranks.

    @param group_a: The first group of streams.
    @param group_b: The second group of streams.
    @return: A dictionary of stream_a -> stream_b built from the pairs of the
        best match.
    @rtype: C{dict}
    """
    ranked_pairs = match_stream_groups(group_a, group_b)
    return dict((stream_a, stream_b)
                for stream_a, stream_b in ranked_pairs.keys())