python: update examples

Update various examples for brokenness, bindings and pep8 format changes

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5187>
This commit is contained in:
Jan Schmidt 2023-08-15 14:57:09 +10:00 committed by GStreamer Marge Bot
parent f48442d62a
commit 21bc5ebd7b
3 changed files with 51 additions and 30 deletions

View file

@ -11,14 +11,15 @@ import random
import gi import gi
gi.require_version('Gst', '1.0') gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0') gi.require_version('GLib', '2.0')
gi.require_version('GObject', '2.0') from gi.repository import GLib, Gst
from gi.repository import GLib, GObject, Gst
class ProbeData: class ProbeData:
def __init__(self, pipe, src): def __init__(self, pipe, src):
self.pipe = pipe self.pipe = pipe
self.src = src self.src = src
def bus_call(bus, message, loop): def bus_call(bus, message, loop):
t = message.type t = message.type
if t == Gst.MessageType.EOS: if t == Gst.MessageType.EOS:
@ -30,9 +31,11 @@ def bus_call(bus, message, loop):
loop.quit() loop.quit()
return True return True
def dispose_src_cb(src): def dispose_src_cb(src):
src.set_state(Gst.State.NULL) src.set_state(Gst.State.NULL)
def probe_cb(pad, info, pdata): def probe_cb(pad, info, pdata):
peer = pad.get_peer() peer = pad.get_peer()
pad.unlink(peer) pad.unlink(peer)
@ -42,8 +45,9 @@ def probe_cb(pad, info, pdata):
pdata.src = Gst.ElementFactory.make('videotestsrc') pdata.src = Gst.ElementFactory.make('videotestsrc')
pdata.src.props.pattern = random.randint(0, 24) pdata.src.props.pattern = random.randint(0, 24)
pdata.src.props.is_live = True
pdata.pipe.add(pdata.src) pdata.pipe.add(pdata.src)
srcpad = pdata.src.get_static_pad ("src") srcpad = pdata.src.get_static_pad("src")
srcpad.link(peer) srcpad.link(peer)
pdata.src.sync_state_with_parent() pdata.src.sync_state_with_parent()
@ -51,13 +55,14 @@ def probe_cb(pad, info, pdata):
return Gst.PadProbeReturn.REMOVE return Gst.PadProbeReturn.REMOVE
def timeout_cb(pdata): def timeout_cb(pdata):
srcpad = pdata.src.get_static_pad('src') srcpad = pdata.src.get_static_pad('src')
srcpad.add_probe(Gst.PadProbeType.IDLE, probe_cb, pdata) srcpad.add_probe(Gst.PadProbeType.IDLE, probe_cb, pdata)
return GLib.SOURCE_REMOVE return GLib.SOURCE_REMOVE
def main(args): def main(args):
GObject.threads_init()
Gst.init(None) Gst.init(None)
pipe = Gst.Pipeline.new('dynamic') pipe = Gst.Pipeline.new('dynamic')
@ -68,23 +73,24 @@ def main(args):
pdata = ProbeData(pipe, src) pdata = ProbeData(pipe, src)
loop = GObject.MainLoop() loop = GLib.MainLoop()
GLib.timeout_add_seconds(1, timeout_cb, pdata) GLib.timeout_add_seconds(1, timeout_cb, pdata)
bus = pipe.get_bus() bus = pipe.get_bus()
bus.add_signal_watch() bus.add_signal_watch()
bus.connect ("message", bus_call, loop) bus.connect("message", bus_call, loop)
# start play back and listen to events # start play back and listen to events
pipe.set_state(Gst.State.PLAYING) pipe.set_state(Gst.State.PLAYING)
try: try:
loop.run() loop.run()
except: except e:
pass pass
# cleanup # cleanup
pipe.set_state(Gst.State.NULL) pipe.set_state(Gst.State.NULL)
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(main(sys.argv)) sys.exit(main(sys.argv))

View file

@ -4,7 +4,9 @@ import sys
import gi import gi
gi.require_version('Gst', '1.0') gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst gi.require_version('GLib', '2.0')
from gi.repository import GLib, Gst
def bus_call(bus, message, loop): def bus_call(bus, message, loop):
t = message.type t = message.type
@ -17,12 +19,12 @@ def bus_call(bus, message, loop):
loop.quit() loop.quit()
return True return True
def main(args): def main(args):
if len(args) != 2: if len(args) != 2:
sys.stderr.write("usage: %s <media file or uri>\n" % args[0]) sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
sys.exit(1) sys.exit(1)
GObject.threads_init()
Gst.init(None) Gst.init(None)
playbin = Gst.ElementFactory.make("playbin", None) playbin = Gst.ElementFactory.make("playbin", None)
@ -32,27 +34,28 @@ def main(args):
# take the commandline argument and ensure that it is a uri # take the commandline argument and ensure that it is a uri
if Gst.uri_is_valid(args[1]): if Gst.uri_is_valid(args[1]):
uri = args[1] uri = args[1]
else: else:
uri = Gst.filename_to_uri(args[1]) uri = Gst.filename_to_uri(args[1])
playbin.set_property('uri', uri) playbin.set_property('uri', uri)
# create and event loop and feed gstreamer bus mesages to it # create and event loop and feed gstreamer bus mesages to it
loop = GObject.MainLoop() loop = GLib.MainLoop()
bus = playbin.get_bus() bus = playbin.get_bus()
bus.add_signal_watch() bus.add_signal_watch()
bus.connect ("message", bus_call, loop) bus.connect("message", bus_call, loop)
# start play back and listed to events # start play back and listed to events
playbin.set_state(Gst.State.PLAYING) playbin.set_state(Gst.State.PLAYING)
try: try:
loop.run() loop.run()
except: except e:
pass pass
# cleanup # cleanup
playbin.set_state(Gst.State.NULL) playbin.set_state(Gst.State.NULL)
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(main(sys.argv)) sys.exit(main(sys.argv))

View file

@ -12,6 +12,7 @@ gi.require_version('GLib', '2.0')
gi.require_version('GObject', '2.0') gi.require_version('GObject', '2.0')
from gi.repository import GLib, GObject, Gst, GstPbutils from gi.repository import GLib, GObject, Gst, GstPbutils
def bus_call(bus, message, loop): def bus_call(bus, message, loop):
t = message.type t = message.type
Gst.debug_bin_to_dot_file_with_ts(pipeline, Gst.DebugGraphDetails.ALL, "test") Gst.debug_bin_to_dot_file_with_ts(pipeline, Gst.DebugGraphDetails.ALL, "test")
@ -24,6 +25,7 @@ def bus_call(bus, message, loop):
loop.quit() loop.quit()
return True return True
def stop(loop, pipeline): def stop(loop, pipeline):
_, position = pipeline.query_position(Gst.Format.TIME) _, position = pipeline.query_position(Gst.Format.TIME)
print("Position: %s\r" % Gst.TIME_ARGS(position)) print("Position: %s\r" % Gst.TIME_ARGS(position))
@ -35,11 +37,18 @@ def stop(loop, pipeline):
return True return True
def _link_to_filesink(element, pad, filesink):
if not element.link(filesink):
print(f"Failed to link transcodebin to output.")
sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":
Gst.init(sys.argv) Gst.init(sys.argv)
if len(sys.argv) != 2: if len(sys.argv) != 2:
print("Missing <output-location> parametter") print("Missing <output-location.ogg> parameter")
sys.exit(1) sys.exit(1)
monitor = Gst.DeviceMonitor.new() monitor = Gst.DeviceMonitor.new()
@ -58,7 +67,7 @@ if __name__ == "__main__":
if len(default) == 1: if len(default) == 1:
device = default[0] device = default[0]
else: else:
print("Avalaible microphones:") print("Available microphones:")
for i, d in enumerate(devices): for i, d in enumerate(devices):
print("%d - %s" % (i, d.get_display_name())) print("%d - %s" % (i, d.get_display_name()))
res = int(input("Select device: ")) res = int(input("Select device: "))
@ -66,14 +75,17 @@ if __name__ == "__main__":
pipeline = Gst.ElementFactory.make("pipeline", None) pipeline = Gst.ElementFactory.make("pipeline", None)
source = device.create_element() source = device.create_element()
conv = Gst.ElementFactory.make("audioconvert", None)
transcodebin = Gst.ElementFactory.make("transcodebin", None) transcodebin = Gst.ElementFactory.make("transcodebin", None)
Gst.util_set_object_arg(transcodebin, "profile", "video/ogg:audio/x-opus") Gst.util_set_object_arg(transcodebin, "profile", "video/ogg:audio/x-opus")
filesink = Gst.ElementFactory.make("filesink", None) filesink = Gst.ElementFactory.make("filesink", None)
filesink.props.location = sys.argv[1] filesink.props.location = sys.argv[1]
pipeline.add(source, transcodebin, filesink) pipeline.add(source, conv, transcodebin, filesink)
source.link(transcodebin) source.link(conv)
transcodebin.link(filesink) conv.link(transcodebin)
transcodebin.connect('pad-added', _link_to_filesink, filesink)
pipeline.set_state(Gst.State.PLAYING) pipeline.set_state(Gst.State.PLAYING)
@ -82,7 +94,7 @@ if __name__ == "__main__":
loop = GLib.MainLoop() loop = GLib.MainLoop()
GLib.timeout_add_seconds(1, stop, loop, pipeline) GLib.timeout_add_seconds(1, stop, loop, pipeline)
bus.connect ("message", bus_call, loop) bus.connect("message", bus_call, loop)
loop.run() loop.run()
pipeline.set_state(Gst.State.NULL) pipeline.set_state(Gst.State.NULL)