gstreamer/libs/gst/helpers/gst_gdb.py

1278 lines
41 KiB
Python

# GStreamer
# Copyright (C) 2018 Pengutronix, Michael Olbrich <m.olbrich@pengutronix.de>
#
# gst_gdb.py: gdb extension for GStreamer
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Library General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
import gdb
import sys
import re
from glib_gobject_helper import g_type_to_name, g_type_name_from_instance, \
g_type_to_typenode, g_quark_to_string
if sys.version_info[0] >= 3:
long = int
def is_gst_type(val, klass):
def _is_gst_type(type):
if str(type) == klass:
return True
while type.code == gdb.TYPE_CODE_TYPEDEF:
type = type.target()
if type.code != gdb.TYPE_CODE_STRUCT:
return False
fields = type.fields()
if len(fields) < 1:
return False
first_field = fields[0]
return _is_gst_type(first_field.type)
type = val.type
if type.code != gdb.TYPE_CODE_PTR:
return False
type = type.target()
return _is_gst_type(type)
class GstMiniObjectPrettyPrinter:
"Prints a GstMiniObject instance pointer"
def __init__(self, val):
self.val = val
def to_string(self):
try:
inst = self.val.cast(gdb.lookup_type("GstMiniObject").pointer())
gtype = inst["type"]
name = g_type_to_name(gtype)
return "0x%x [%s]" % (long(self.val), name)
except RuntimeError:
return "0x%x" % long(self.val)
class GstObjectPrettyPrinter:
"Prints a GstObject instance"
def __init__(self, val):
self.val = val
def to_string(self):
try:
name = g_type_name_from_instance(self.val)
if not name:
name = str(self.val.type.target())
if long(self.val) != 0:
inst = self.val.cast(gdb.lookup_type("GstObject").pointer())
inst_name = inst["name"].string()
if inst_name:
name += "|" + inst_name
return ("0x%x [%s]") % (long(self.val), name)
except RuntimeError:
return "0x%x" % long(self.val)
GST_SECOND = 1000000000
GST_CLOCK_TIME_NONE = 2**64-1
GST_CLOCK_STIME_NONE = -2**63
def format_time(n, signed=False):
prefix = ""
invalid = False
if signed:
if n == GST_CLOCK_STIME_NONE:
invalid = True
prefix = "+" if n >= 0 else "-"
n = abs(n)
else:
if n == GST_CLOCK_TIME_NONE:
invalid = True
if invalid:
return "99:99:99.999999999"
return "%s%u:%02u:%02u.%09u" % (
prefix,
n / (GST_SECOND * 60 * 60),
(n / (GST_SECOND * 60)) % 60,
(n / GST_SECOND) % 60,
n % GST_SECOND)
def format_time_value(val):
return format_time(int(val), str(val.type) == "GstClockTimeDiff")
class GstClockTimePrinter:
"Prints a GstClockTime / GstClockTimeDiff"
def __init__(self, val):
self.val = val
def to_string(self):
return "%d [%s]" % (int(self.val), format_time_value(self.val))
def gst_pretty_printer_lookup(val):
if is_gst_type(val, "GstMiniObject"):
return GstMiniObjectPrettyPrinter(val)
if is_gst_type(val, "GstObject"):
return GstObjectPrettyPrinter(val)
if str(val.type) == "GstClockTime" or str(val.type) == "GstClockTimeDiff":
return GstClockTimePrinter(val)
return None
def save_memory_access(fallback):
def _save_memory_access(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except gdb.MemoryError:
return fallback
return wrapper
return _save_memory_access
def save_memory_access_print(message):
def _save_memory_access_print(func):
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except gdb.MemoryError:
_gdb_write(args[1], message)
return wrapper
return _save_memory_access_print
def _g_type_from_instance(instance):
if long(instance) != 0:
try:
inst = instance.cast(gdb.lookup_type("GTypeInstance").pointer())
klass = inst["g_class"]
gtype = klass["g_type"]
return gtype
except RuntimeError:
pass
return None
def g_inherits_type(val, typename):
if is_gst_type(val, "GstObject"):
gtype = _g_type_from_instance(val)
if gtype is None:
return False
typenode = g_type_to_typenode(gtype)
elif is_gst_type(val, "GstMiniObject"):
mini = val.cast(gdb.lookup_type("GstMiniObject").pointer())
try:
typenode = mini["type"].cast(gdb.lookup_type("TypeNode").pointer())
except gdb.MemoryError:
return False
else:
return False
for i in range(typenode["n_supers"]):
if g_type_to_name(typenode["supers"][i]) == typename:
return True
return False
def gst_is_bin(val):
return g_inherits_type(val, "GstBin")
def _g_array_iter(array, element_type):
if array == 0:
return
try:
item = array["data"].cast(element_type.pointer())
for i in range(int(array["len"])):
yield item[i]
except gdb.MemoryError:
pass
def _g_value_get_value(val):
typenode = g_type_to_typenode(val["g_type"])
if not typenode:
return None
tname = g_quark_to_string(typenode["qname"])
fname = g_type_to_name(typenode["supers"][int(typenode["n_supers"])])
if fname in ("gchar", "guchar", "gboolean", "gint", "guint", "glong",
"gulong", "gint64", "guint64", "gfloat", "gdouble",
"gpointer", "GFlags"):
try:
t = gdb.lookup_type(tname).pointer()
except RuntimeError:
t = gdb.lookup_type(fname).pointer()
elif fname == "gchararray":
t = gdb.lookup_type("char").pointer().pointer()
elif fname == "GstBitmask":
t = gdb.lookup_type("guint64").pointer()
elif fname == "GstFlagSet":
t = gdb.lookup_type("guint").pointer().pointer()
return val["data"].cast(t)
elif fname == "GstFraction":
t = gdb.lookup_type("gint").pointer().pointer()
return val["data"].cast(t)
elif fname == "GstFractionRange":
t = gdb.lookup_type("GValue").pointer().pointer()
elif fname == "GstValueList":
t = gdb.lookup_type("GArray").pointer().pointer()
elif fname in ("GBoxed", "GObject"):
try:
t = gdb.lookup_type(tname).pointer().pointer()
except RuntimeError:
t = gdb.lookup_type(tname).pointer().pointer()
else:
return val["data"]
return val["data"].cast(t).dereference()
def gst_object_from_value(value):
if value.type.code != gdb.TYPE_CODE_PTR:
value = value.address
if not is_gst_type(value, "GstObject"):
raise Exception("'%s' is not a GstObject" % args[0])
return value.cast(gdb.lookup_type("GstObject").pointer())
def gst_object_pipeline(obj):
try:
while obj["parent"] != 0:
tmp = obj["parent"]
# sanity checks to handle memory corruption
if g_inherits_type(obj, "GstElement") and \
GdbGstElement(obj) not in GdbGstElement(tmp).children():
break
if g_inherits_type(obj, "GstPad"):
pad = GdbGstPad(obj)
if g_inherits_type(tmp, "GstElement"):
if pad not in GdbGstElement(tmp).pads():
break
elif g_inherits_type(tmp, "GstProxyPad"):
t = gdb.lookup_type("GstProxyPad").pointer()
if pad != GdbGstPad(tmp.cast(t)["priv"]["internal"]):
break
obj = tmp
except gdb.MemoryError:
pass
if not g_inherits_type(obj, "GstElement"):
raise Exception("Toplevel parent is not a GstElement")
return obj.cast(gdb.lookup_type("GstElement").pointer())
def element_state_to_name(state):
names = [
"VOID_PENDING",
"NULL",
"READY",
"PAUSED",
"PLAYING"]
return names[state] if state < len(names) else "UNKNOWN"
def task_state_to_name(state):
names = [
"STARTED",
"STOPPED",
"PAUSED"]
return names[state] if state < len(names) else "UNKNOWN"
def _gdb_write(indent, text):
gdb.write("%s%s\n" % (" " * indent, text))
class GdbCapsFeatures:
def __init__(self, val):
self.val = val
def size(self):
if long(self.val) == 0:
return 0
return int(self.val["array"]["len"])
def items(self):
if long(self.val) == 0:
return
for q in _g_array_iter(self.val["array"], gdb.lookup_type("GQuark")):
yield q
def __eq__(self, other):
if self.size() != other.size():
return False
a1 = list(self.items())
a2 = list(other.items())
for item in a1:
if item not in a2:
return False
return True
def __str__(self):
if long(self.val) == 0:
return ""
count = self.size()
if int(self.val["is_any"]) == 1 and count == 0:
return "(ANY)"
if count == 0:
return ""
s = ""
for f in self.items():
ss = g_quark_to_string(f)
if ss != "memory:SystemMemory" or count > 1:
s += ", " if s else ""
s += ss
return s
class GdbGstCaps:
def __init__(self, val):
self.val = val.cast(gdb.lookup_type("GstCapsImpl").pointer())
def size(self):
return int(self.val["array"]["len"])
def items(self):
gdb_type = gdb.lookup_type("GstCapsArrayElement")
for f in _g_array_iter(self.val["array"], gdb_type):
yield(GdbCapsFeatures(f["features"]),
GdbGstStructure(f["structure"]))
def __eq__(self, other):
if self.size() != other.size():
return False
a1 = list(self.items())
a2 = list(other.items())
for i in range(self.size()):
if a1[i] != a2[i]:
return False
return True
def dot(self):
if self.size() == 0:
return "ANY"
s = ""
for (features, structure) in self.items():
s += structure.name()
tmp = str(features)
if tmp:
s += "(" + tmp + ")"
s += "\\l"
if structure.size() > 0:
s += "\\l".join(structure.value_strings(" %18s: %s")) + "\\l"
return s
@save_memory_access_print("<inaccessible memory>")
def print(self, indent, prefix=""):
items = list(self.items())
if len(items) != 1:
_gdb_write(indent, prefix)
prefix = ""
for (features, structure) in items:
s = "%s %s" % (prefix, structure.name())
tmp = str(features)
if tmp:
s += "(" + tmp + ")"
_gdb_write(indent, s)
for val in structure.value_strings("%s: %s", False):
_gdb_write(indent+1, val)
return s
class GdbGValue:
def __init__(self, val):
self.val = val
def fundamental_typename(self):
typenode = g_type_to_typenode(self.val["g_type"])
if not typenode:
return None
return g_type_to_name(typenode["supers"][int(typenode["n_supers"])])
def value(self):
return _g_value_get_value(self.val)
def __str__(self):
try:
value = self.value()
tname = self.fundamental_typename()
gvalue_type = gdb.lookup_type("GValue")
if tname == "GstFraction":
v = "%d/%d" % (value[0], value[1])
elif tname == "GstBitmask":
v = "0x%016x" % long(value)
elif tname == "gboolean":
v = "false" if int(value) == 0 else "true"
elif tname == "GstFlagSet":
v = "%x:%x" % (value[0], value[1])
elif tname == "GstIntRange":
rmin = int(value[0]["v_uint64"]) >> 32
rmax = int(value[0]["v_uint64"]) & 0xffffffff
step = int(value[1]["v_int"])
if step == 1:
v = "[ %d, %d ]" % (rmin, rmax)
else:
v = "[ %d, %d, %d ]" % (rmin*step, rmax*step, step)
elif tname == "GstFractionRange":
v = "[ %s, %s ]" % (GdbGValue(value[0]), GdbGValue(value[1]))
elif tname in ("GstValueList", "GstValueArray"):
if gvalue_type.fields()[1].type == value.type:
gdb_type = gdb.lookup_type("GArray").pointer()
value = value[0]["v_pointer"].cast(gdb_type)
v = "<"
for l in _g_array_iter(value, gvalue_type):
v += " " if v == "<" else ", "
v += str(GdbGValue(l))
v += " >"
elif tname in ("GEnum"):
v = "%s(%s)" % (
g_type_to_name(g_type_to_typenode(self.val["g_type"])),
value["v_int"])
else:
try:
v = value.string()
except RuntimeError:
# it is not a string-like type
if gvalue_type.fields()[1].type == value.type:
# don't print the raw GValue union
v = "<unknown type: %s>" % tname
else:
v = str(value)
except gdb.MemoryError:
v = "<inaccessible memory at 0x%x>" % int(self.val)
return v
def __eq__(self, other):
return self.val == other.val
class GdbGstStructure:
def __init__(self, val):
self.val = val.cast(gdb.lookup_type("GstStructureImpl").pointer())
@save_memory_access("<inaccessible memory>")
def name(self):
return g_quark_to_string(self.val["s"]["name"])
@save_memory_access(0)
def size(self):
return int(self.val["fields_len"])
def values(self):
item = self.val["fields"].cast(gdb.lookup_type("GstStructureField").pointer())
for i in range(self.size()):
f = item[i]
key = g_quark_to_string(f["name"])
value = GdbGValue(f["value"])
yield(key, value)
def value(self, key):
for (k, value) in self.values():
if k == key:
return value
raise KeyError(key)
def __eq__(self, other):
if self.size() != other.size():
return False
a1 = list(self.values())
a2 = list(other.values())
for (key, value) in a1:
if (key, value) not in a2:
return False
return True
def value_strings(self, pattern, elide=True):
s = []
for (key, value) in self.values():
v = str(value)
if elide and len(v) > 25:
if v[0] in "[(<\"":
v = v[:20] + "... " + v[-1:]
else:
v = v[:22] + "..."
s.append(pattern % (key, v))
return s
@save_memory_access_print("<inaccessible memory>")
def print(self, indent, prefix=None):
if prefix is not None:
_gdb_write(indent, "%s: %s" % (prefix, self.name()))
else:
_gdb_write(indent, "%s:" % (self.name()))
for (key, value) in self.values():
_gdb_write(indent+1, "%s: %s" % (key, str(value)))
class GdbGstSegment:
def __init__(self, val):
self.val = val
self.fmt = str(self.val["format"]).split("_")[-1].lower()
def format_value(self, n):
if self.fmt == "time":
return format_time(n, False)
else:
return str(n)
def print_optional(self, indent, key, skip=None):
value = int(self.val[key])
if skip is None or value != skip:
_gdb_write(indent, "%s:%s %s" %
(key, (8-len(key))*" ", self.format_value(value)))
def print(self, indent, seqnum=None):
s = "segment:"
if seqnum:
s += "(seqnum: %s)" % seqnum
_gdb_write(indent, s)
rate = float(self.val["rate"])
applied_rate = float(self.val["applied_rate"])
if applied_rate != 1.0:
applied = "(applied rate: %g)" % applied_rate
else:
applied = ""
_gdb_write(indent+1, "rate: %g%s" % (rate, applied))
self.print_optional(indent+1, "base", 0)
self.print_optional(indent+1, "offset", 0)
self.print_optional(indent+1, "start")
self.print_optional(indent+1, "stop", GST_CLOCK_TIME_NONE)
self.print_optional(indent+1, "time")
self.print_optional(indent+1, "position")
self.print_optional(indent+1, "duration", GST_CLOCK_TIME_NONE)
class GdbGstEvent:
def __init__(self, val):
self.val = val.cast(gdb.lookup_type("GstEventImpl").pointer())
@save_memory_access("<inaccessible memory>")
def typestr(self):
t = self.val["event"]["type"]
(event_quarks, _) = gdb.lookup_symbol("event_quarks")
event_quarks = event_quarks.value()
i = 0
while event_quarks[i]["name"] != 0:
if t == event_quarks[i]["type"]:
return event_quarks[i]["name"].string()
i += 1
return None
def structure(self):
return GdbGstStructure(self.val["structure"])
@save_memory_access_print("<inaccessible memory>")
def print(self, indent):
typestr = self.typestr()
seqnum = self.val["event"]["seqnum"]
if typestr == "caps":
caps = GdbGstCaps(self.structure().value("caps").value())
caps.print(indent, "caps (seqnum: %s):" % seqnum)
elif typestr == "stream-start":
stream_id = self.structure().value("stream-id").value()
_gdb_write(indent, "stream-start: (seqnum %s)" % seqnum)
_gdb_write(indent + 1, "stream-id: %s" % stream_id.string())
elif typestr == "segment":
segment = self.structure().value("segment").value()
GdbGstSegment(segment).print(indent, seqnum)
elif typestr == "tag":
struct = self.structure()
# skip 'GstTagList-'
name = struct.name()[11:]
t = gdb.lookup_type("GstTagListImpl").pointer()
s = struct.value("taglist").value().cast(t)["structure"]
structure = GdbGstStructure(s)
_gdb_write(indent, "tag: %s (seqnum: %s)" % (name, seqnum))
for (key, value) in structure.values():
_gdb_write(indent+1, "%s: %s" % (key, str(value)))
else:
self.structure().print(indent, "%s (seqnum: %s)" % (typestr, seqnum))
class GdbGstBuffer:
def __init__(self, val):
self.val = val.cast(gdb.lookup_type("GstBuffer").pointer())
def print_optional(self, indent, key, skip=None, format_func=str):
value = int(self.val[key])
if skip is None or value != skip:
_gdb_write(indent, "%s:%s %s" %
(key, (8-len(key))*" ", format_func(value)))
@save_memory_access_print("<inaccessible memory>")
def print(self, indent):
_gdb_write(indent, "GstBuffer: (0x%x)" % self.val)
indent += 1
self.print_optional(indent, "pool", 0)
self.print_optional(indent, "pts", GST_CLOCK_TIME_NONE, format_time)
self.print_optional(indent, "dts", GST_CLOCK_TIME_NONE, format_time)
self.print_optional(indent, "duration", GST_CLOCK_TIME_NONE, format_time)
self.print_optional(indent, "offset", GST_CLOCK_TIME_NONE)
self.print_optional(indent, "offset_end", GST_CLOCK_TIME_NONE)
impl = self.val.cast(gdb.lookup_type("GstBufferImpl").pointer())
meta_item = impl['item']
if meta_item:
_gdb_write(indent, "Metas:")
indent += 1
while meta_item:
meta = meta_item['meta']
meta_type_name = g_type_to_name(meta['info']['type'])
_gdb_write(indent, "%s:" % meta_type_name)
indent += 1
meta_info = str(meta.cast(gdb.lookup_type(meta_type_name)))
for l in meta_info.split('\n'):
_gdb_write(indent, l)
indent -= 1
meta_item = meta_item['next']
else:
_gdb_write(indent, "(No meta)")
class GdbGstQuery:
def __init__(self, val):
self.val = val.cast(gdb.lookup_type("GstQueryImpl").pointer())
@save_memory_access("<inaccessible memory>")
def typestr(self):
t = self.val["query"]["type"]
(query_quarks, _) = gdb.lookup_symbol("query_quarks")
query_quarks = query_quarks.value()
i = 0
while query_quarks[i]["name"] != 0:
if t == query_quarks[i]["type"]:
return query_quarks[i]["name"].string()
i += 1
return None
def structure(self):
return GdbGstStructure(self.val["structure"])
@save_memory_access_print("<inaccessible memory>")
def print(self, indent):
typestr = self.typestr()
self.structure().print(indent, typestr)
class GdbGstObject:
def __init__(self, klass, val):
self.val = val.cast(klass)
@save_memory_access("<inaccessible memory>")
def name(self):
obj = self.val.cast(gdb.lookup_type("GstObject").pointer())
return obj["name"].string()
def full_name(self):
parent = self.parent_element()
return "%s%s" % (parent.name() + ":" if parent else "", self.name())
def dot_name(self):
ptr = self.val.cast(gdb.lookup_type("void").pointer())
return re.sub('[^a-zA-Z0-9<>]', '_', "%s_%s" % (self.name(), str(ptr)))
def parent(self):
obj = self.val.cast(gdb.lookup_type("GstObject").pointer())
return obj["parent"]
def parent_element(self):
p = self.parent()
if p != 0 and g_inherits_type(p, "GstElement"):
element = p.cast(gdb.lookup_type("GstElement").pointer())
return GdbGstElement(element)
return None
def parent_pad(self):
p = self.parent()
if p != 0 and g_inherits_type(p, "GstPad"):
pad = p.cast(gdb.lookup_type("GstPad").pointer())
return GdbGstPad(pad)
return None
class GdbGstPad(GdbGstObject):
def __init__(self, val):
gdb_type = gdb.lookup_type("GstPad").pointer()
super(GdbGstPad, self).__init__(gdb_type, val)
def __eq__(self, other):
return self.val == other.val
def is_linked(self):
return long(self.val["peer"]) != 0
def peer(self):
return GdbGstPad(self.val["peer"])
def direction(self):
return str(self.val["direction"])
def events(self):
if long(self.val["priv"]) == 0:
return
array = self.val["priv"]["events"]
for ev in _g_array_iter(array, gdb.lookup_type("PadEvent")):
yield GdbGstEvent(ev["event"])
def caps(self):
for ev in self.events():
if ev.typestr() != "caps":
continue
return GdbGstCaps(ev.structure().value("caps").value())
return None
def template_caps(self):
tmp = self.val["padtemplate"]
return GdbGstCaps(tmp["caps"]) if int(tmp) != 0 else None
def mode(self):
m = str(self.val["mode"]).split("_")[-1].lower()
if m in ("push", "pull"):
return m
return None
def pad_type(self):
s = str(self.val["direction"]).split("_")[-1].capitalize()
if g_inherits_type(self.val, "GstGhostPad"):
s += "Ghost"
return s + "Pad"
@save_memory_access_print("Pad(<inaccessible memory>)")
def print(self, indent):
m = ", " + self.mode() if self.mode() else ""
_gdb_write(indent, "%s(%s%s) {" % (self.pad_type(), self.name(), m))
first = True
for ev in self.events():
if first:
_gdb_write(indent+1, "events:")
first = False
ev.print(indent+2)
if self.is_linked():
real = self.peer().parent_pad()
_gdb_write(indent+1, "peer: %s" %
(real.full_name() if real else self.peer().full_name()))
if g_inherits_type(self.val, "GstGhostPad"):
t = gdb.lookup_type("GstProxyPad").pointer()
internal = GdbGstPad(self.val.cast(t)["priv"]["internal"])
if internal and internal.peer():
_gdb_write(indent+1, "inner peer: %s" %
internal.peer().full_name())
task = self.val["task"]
if long(task) != 0:
_gdb_write(indent+1, "task: %s" %
task_state_to_name(int(task["state"])))
offset = long(self.val["offset"])
if offset != 0:
_gdb_write(indent+1, "offset: %d [%s]" %
(offset, format_time(offset, True)))
_gdb_write(indent, "}")
def _dot(self, color, pname, indent):
spc = " " * indent
activation_mode = "-><"
style = "filled,solid"
template = self.val["padtemplate"]
if template != 0:
presence = template["presence"]
if str(presence) == "GST_PAD_SOMETIMES":
style = "filled,dotted"
if str(presence) == "GST_PAD_REQUEST":
style = "filled,dashed"
task_mode = ""
task = self.val["task"]
if long(task) != 0:
task_state = int(task["state"])
if task_state == 0: # started
task_mode = "[T]"
if task_state == 2: # paused
task_mode = "[t]"
f = int(self.val["object"]["flags"])
flags = "B" if f & 16 else "b" # GST_PAD_FLAG_BLOCKED
flags += "F" if f & 32 else "f" # GST_PAD_FLAG_FLUSHING
flags += "B" if f & 16 else "b" # GST_PAD_FLAG_BLOCKING
s = "%s %s_%s [color=black, fillcolor=\"%s\", " \
"label=\"%s%s\\n[%c][%s]%s\", height=\"0.2\", style=\"%s\"];\n" % \
(spc, pname, self.dot_name(), color, self.name(), "",
activation_mode[int(self.val["mode"])], flags, task_mode, style)
return s
def dot(self, indent):
spc = " " * indent
direction = self.direction()
element = self.parent_element()
ename = element.dot_name() if element else ""
s = ""
if g_inherits_type(self.val, "GstGhostPad"):
if direction == "GST_PAD_SRC":
color = "#ffdddd"
elif direction == "GST_PAD_SINK":
color = "#ddddff"
else:
color = "#ffffff"
t = gdb.lookup_type("GstProxyPad").pointer()
other = GdbGstPad(self.val.cast(t)["priv"]["internal"])
if other:
s += other._dot(color, "", indent)
pname = self.dot_name()
other_element = other.parent_element()
other_ename = other_element.dot_name() if other_element else ""
other_pname = other.dot_name()
if direction == "GST_PAD_SRC":
s += "%s%s_%s -> %s_%s [style=dashed, minlen=0]\n" % \
(spc, other_ename, other_pname, ename, pname)
else:
s += "%s%s_%s -> %s_%s [style=dashed, minlen=0]\n" % \
(spc, ename, pname, other_ename, other_pname)
else:
if direction == "GST_PAD_SRC":
color = "#ffaaaa"
elif direction == "GST_PAD_SINK":
color = "#aaaaff"
else:
color = "#cccccc"
s += self._dot(color, ename, indent)
return s
def link_dot(self, indent, element):
spc = " " * indent
peer = self.peer()
peer_element = peer.parent_element()
caps = self.caps()
if not caps:
caps = self.template_caps()
peer_caps = peer.caps()
if not peer_caps:
peer_caps = peer.template_caps()
pname = self.dot_name()
ename = element.dot_name() if element else ""
peer_pname = peer.dot_name()
peer_ename = peer_element.dot_name() if peer_element else ""
if caps and peer_caps and caps == peer_caps:
s = "%s%s_%s -> %s_%s [label=\"%s\"]\n" % \
(spc, ename, pname, peer_ename, peer_pname, caps.dot())
elif caps and peer_caps and caps != peer_caps:
s = "%s%s_%s -> %s_%s [labeldistance=\"10\", labelangle=\"0\", " \
% (spc, ename, pname, peer_ename, peer_pname)
s += "label=\"" + " "*50 + "\", "
if self.direction() == "GST_PAD_SRC":
media_src = caps.dot()
media_dst = peer_caps.dot()
else:
media_src = peer_caps.dot()
media_dst = caps.dot()
s += "taillabel=\"%s\", headlabel=\"%s\"]\n" % \
(media_src, media_dst)
else:
s = "%s%s_%s -> %s_%s\n" % \
(spc, ename, pname, peer_ename, peer_pname)
return s
class GdbGstElement(GdbGstObject):
def __init__(self, val):
gdb_type = gdb.lookup_type("GstElement").pointer()
super(GdbGstElement, self).__init__(gdb_type, val)
self.is_bin = gst_is_bin(self.val)
def __eq__(self, other):
return self.val == other.val
def children(self):
if not self.is_bin:
return
b = self.val.cast(gdb.lookup_type("GstBin").pointer())
link = b["children"]
while link != 0:
yield GdbGstElement(link["data"])
link = link["next"]
def has_pads(self, pad_group="pads"):
return self.val[pad_group] != 0
def pads(self, pad_group="pads"):
link = self.val[pad_group]
while link != 0:
yield GdbGstPad(link["data"])
link = link["next"]
def _state_dot(self):
icons = "~0-=>"
current = int(self.val["current_state"])
pending = int(self.val["pending_state"])
if pending == 0:
# GST_ELEMENT_FLAG_LOCKED_STATE == 16
locked = (int(self.val["object"]["flags"]) & 16) != 0
return "\\n[%c]%s" % (icons[current], "(locked)" if locked else "")
return "\\n[%c] -> [%c]" % (icons[current], icons[pending])
@save_memory_access_print("Element(<inaccessible memory>)")
def print(self, indent):
_gdb_write(indent, "%s(%s) {" %
(g_type_name_from_instance(self.val), self.name()))
for p in self.pads():
p.print(indent+2)
first = True
for child in self.children():
if first:
_gdb_write(indent+2, "children:")
first = False
_gdb_write(indent+3, child.name())
current_state = self.val["current_state"]
s = "state: %s" % element_state_to_name(current_state)
for var in ("pending", "target"):
state = self.val[var + "_state"]
if state > 0 and state != current_state:
s += ", %s: %s" % (var, element_state_to_name(state))
_gdb_write(indent+2, s)
_gdb_write(indent+2, "base_time: %s" %
format_time_value(self.val["base_time"]))
_gdb_write(indent+2, "start_time: %s" %
format_time_value(self.val["start_time"]))
_gdb_write(indent, "}")
@save_memory_access_print("<inaccessible memory>")
def print_tree(self, indent):
_gdb_write(indent, "%s(%s)" % (self.name(), self.val))
for child in self.children():
child.print_tree(indent+1)
def _dot(self, indent=0):
spc = " " * indent
s = "%ssubgraph cluster_%s {\n" % (spc, self.dot_name())
s += "%s fontname=\"Bitstream Vera Sans\";\n" % spc
s += "%s fontsize=\"8\";\n" % spc
s += "%s style=\"filled,rounded\";\n" % spc
s += "%s color=black;\n" % spc
s += "%s label=\"%s\\n%s%s%s\";\n" % \
(spc, g_type_name_from_instance(self.val), self.name(),
self._state_dot(), "")
sink_name = None
if self.has_pads("sinkpads"):
(ss, sink_name) = self._dot_pads(indent+1, "sinkpads",
self.dot_name() + "_sink")
s += ss
src_name = None
if self.has_pads("srcpads"):
(ss, src_name) = self._dot_pads(indent+1, "srcpads",
self.dot_name() + "_src")
s += ss
if sink_name and src_name:
name = self.dot_name()
s += "%s %s_%s -> %s_%s [style=\"invis\"];\n" % \
(spc, name, sink_name, name, src_name)
if gst_is_bin(self.val):
s += "%s fillcolor=\"#ffffff\";\n" % spc
s += self.dot(indent+1)
else:
if src_name and not sink_name:
s += "%s fillcolor=\"#ffaaaa\";\n" % spc
elif not src_name and sink_name:
s += "%s fillcolor=\"#aaaaff\";\n" % spc
elif src_name and sink_name:
s += "%s fillcolor=\"#aaffaa\";\n" % spc
else:
s += "%s fillcolor=\"#ffffff\";\n" % spc
s += "%s}\n\n" % spc
for p in self.pads():
if not p.is_linked():
continue
if p.direction() == "GST_PAD_SRC":
s += p.link_dot(indent, self)
else:
pp = p.peer()
if not g_inherits_type(pp.val, "GstGhostPad") and \
g_inherits_type(pp.val, "GstProxyPad"):
s += pp.link_dot(indent, None)
return s
def _dot_pads(self, indent, pad_group, cluster_name):
spc = " " * indent
s = "%ssubgraph cluster_%s {\n" % (spc, cluster_name)
s += "%s label=\"\";\n" % spc
s += "%s style=\"invis\";\n" % spc
name = None
for p in self.pads(pad_group):
s += p.dot(indent)
if not name:
name = p.dot_name()
s += "%s}\n\n" % spc
return(s, name)
def dot(self, indent):
s = ""
for child in self.children():
try:
s += child._dot(indent)
except gdb.MemoryError:
gdb.write("warning: inaccessible memory in element 0x%x\n" %
long(child.val))
return s
def pipeline_dot(self):
t = g_type_name_from_instance(self.val)
s = "digraph pipeline {\n"
s += " rankdir=LR;\n"
s += " fontname=\"sans\";\n"
s += " fontsize=\"10\";\n"
s += " labelloc=t;\n"
s += " nodesep=.1;\n"
s += " ranksep=.2;\n"
s += " label=\"<%s>\\n%s%s%s\";\n" % (t, self.name(), "", "")
s += " node [style=\"filled,rounded\", shape=box, fontsize=\"9\", " \
"fontname=\"sans\", margin=\"0.0,0.0\"];\n"
s += " edge [labelfontsize=\"6\", fontsize=\"9\", " \
"fontname=\"monospace\"];\n"
s += " \n"
s += " legend [\n"
s += " pos=\"0,0!\",\n"
s += " margin=\"0.05,0.05\",\n"
s += " style=\"filled\",\n"
s += " label=\"Legend\\lElement-States: [~] void-pending, " \
"[0] null, [-] ready, [=] paused, [>] playing\\l" \
"Pad-Activation: [-] none, [>] push, [<] pull\\l" \
"Pad-Flags: [b]locked, [f]lushing, [b]locking, [E]OS; " \
"upper-case is set\\lPad-Task: [T] has started task, " \
"[t] has paused task\\l\",\n"
s += " ];"
s += "\n"
s += self.dot(1)
s += "}\n"
return s
class GstDot(gdb.Command):
"""\
Create a pipeline dot file as close as possible to the output of
GST_DEBUG_BIN_TO_DOT_FILE. This command will find the top-level parent
for the given gstreamer object and create the dot for that element.
Usage: gst-dot <gst-object> <file-name>"""
def __init__(self):
super(GstDot, self).__init__("gst-dot", gdb.COMMAND_DATA)
def invoke(self, arg, from_tty):
self.dont_repeat()
args = gdb.string_to_argv(arg)
if len(args) != 2:
raise Exception("Usage: gst-dot <gst-object> <file>")
value = gdb.parse_and_eval(args[0])
if not value:
raise Exception("'%s' is not a valid object" % args[0])
value = gst_object_from_value(value)
value = gst_object_pipeline(value)
dot = GdbGstElement(value).pipeline_dot()
file = open(args[1], "w")
file.write(dot)
file.close()
def complete(self, text, word):
cmd = gdb.string_to_argv(text)
if len(cmd) == 0 or(len(cmd) == 1 and len(word) > 0):
return gdb.COMPLETE_SYMBOL
return gdb.COMPLETE_FILENAME
class GstPrint(gdb.Command):
"""\
Print high-level information for GStreamer objects
Usage gst-print <gstreamer-object>"""
def __init__(self):
super(GstPrint, self).__init__("gst-print", gdb.COMMAND_DATA,
gdb.COMPLETE_SYMBOL)
def invoke(self, arg, from_tty):
value = gdb.parse_and_eval(arg)
if not value:
raise Exception("'%s' is not a valid object" % arg)
if value.type.code != gdb.TYPE_CODE_PTR:
value = value.address
if g_inherits_type(value, "GstElement"):
obj = GdbGstElement(value)
elif g_inherits_type(value, "GstPad"):
obj = GdbGstPad(value)
elif g_inherits_type(value, "GstCaps"):
obj = GdbGstCaps(value)
elif g_inherits_type(value, "GstEvent"):
obj = GdbGstEvent(value)
elif g_inherits_type(value, "GstQuery"):
obj = GdbGstQuery(value)
elif g_inherits_type(value, "GstBuffer"):
obj = GdbGstBuffer(value)
elif is_gst_type(value, "GstStructure"):
obj = GdbGstStructure(value)
else:
raise Exception("'%s' has an unknown type (%s)" % (arg, value))
obj.print(0)
class GstPipelineTree(gdb.Command):
"""\
Usage: gst-pipeline-tree <gst-object>"""
def __init__(self):
super(GstPipelineTree, self).__init__("gst-pipeline-tree",
gdb.COMPLETE_SYMBOL)
def invoke(self, arg, from_tty):
self.dont_repeat()
args = gdb.string_to_argv(arg)
if len(args) != 1:
raise Exception("Usage: gst-pipeline-tree <gst-object>")
value = gdb.parse_and_eval(args[0])
if not value:
raise Exception("'%s' is not a valid object" % args[0])
value = gst_object_from_value(value)
value = gst_object_pipeline(value)
GdbGstElement(value).print_tree(0)
GstDot()
GstPrint()
GstPipelineTree()
class GstPipeline(gdb.Function):
"""\
Find the top-level pipeline for the given element"""
def __init__(self):
super(GstPipeline, self).__init__("gst_pipeline")
def invoke(self, arg):
value = gst_object_from_value(arg)
return gst_object_pipeline(value)
class GstBinGet(gdb.Function):
"""\
Find a child element with the given name"""
def __init__(self):
super(GstBinGet, self).__init__("gst_bin_get")
def find(self, obj, name, recurse):
for child in obj.children():
if child.name() == name:
return child.val
if recurse:
result = self.find(child, name, recurse)
if result is not None:
return result
def invoke(self, element, arg):
value = gst_object_from_value(element)
if not g_inherits_type(value, "GstElement"):
raise Exception("'%s' is not a GstElement" %
str(value.address))
try:
name = arg.string()
except gdb.error:
raise Exception("Usage: $gst_bin_get(<gst-object>, \"<name>\")")
obj = GdbGstElement(value)
child = self.find(obj, name, False)
if child is None:
child = self.find(obj, name, True)
if child is None:
raise Exception("No child named '%s' found." % name)
return child
class GstElementPad(gdb.Function):
"""\
Get the pad with the given name"""
def __init__(self):
super(GstElementPad, self).__init__("gst_element_pad")
def invoke(self, element, arg):
value = gst_object_from_value(element)
if not g_inherits_type(value, "GstElement"):
raise Exception("'%s' is not a GstElement" %
str(value.address))
try:
name = arg.string()
except gdb.error:
raise Exception("Usage: $gst_element_pad(<gst-object>, \"<pad-name>\")")
obj = GdbGstElement(value)
for pad in obj.pads():
if pad.name() == name:
return pad.val
raise Exception("No pad named '%s' found." % name)
GstPipeline()
GstBinGet()
GstElementPad()
def register(obj):
if obj is None:
obj = gdb
# Make sure this is always used before the glib lookup function.
# Otherwise the gobject pretty printer is used for GstObjects
obj.pretty_printers.insert(0, gst_pretty_printer_lookup)