gstreamer/subprojects/gst-plugins-good/tests/examples/rpicamsrc/dynamicprops.py

69 lines
1.8 KiB
Python
Raw Normal View History

#!/usr/bin/python3
import sys

import gi

gi.require_version('Gst', '1.0')
from gi.repository import GLib, GObject, Gst  # noqa: E402
def bus_call(bus, msg, *args):
    """Handle a message posted on the pipeline's bus.

    Stops the main loop on end-of-stream or on an error message; every
    other message type is left alone.

    Args:
        bus: The bus the message arrived on (unused).
        msg: The message to inspect. Only ``msg.type`` and, for errors,
            ``msg.parse_error()`` are used.
        *args: Extra user data supplied when the watch was installed. If
            present, the first item is taken to be the main loop to stop;
            otherwise the module-level ``loop`` is used.

    Returns:
        False once the loop has been asked to quit (a falsy result tells
        the bus watch to stop calling this handler), True otherwise so the
        watch stays installed.
    """
    # Uncomment to trace every message seen on the bus:
    # print("BUSCALL", msg, msg.type, *args)

    # Prefer the loop handed over as user data; fall back to the global.
    main_loop = args[0] if args else loop

    if msg.type == Gst.MessageType.EOS:
        print("End-of-stream")
        main_loop.quit()
        return False

    if msg.type == Gst.MessageType.ERROR:
        print("GST ERROR", msg.parse_error())
        main_loop.quit()
        return False

    return True
# Saturation value that the next timer tick will apply; the sweep starts at
# -100 and is advanced by set_saturation().
saturation = -100


def set_saturation(pipeline):
    """Apply the current saturation to the camera source, then advance it.

    Intended to be used as a periodic timer callback. Each call writes the
    module-level ``saturation`` value to the ``saturation`` property of the
    module-level ``videosrc`` element, updates its ``annotation-text`` to
    match, and increases ``saturation`` by 10. Once the value has gone past
    100, an end-of-stream event is sent to ``pipeline`` instead.

    Args:
        pipeline: Object that receives the EOS event when the sweep is over.

    Returns:
        True while the sweep is still in progress (keep the timer running),
        False after EOS has been sent (the timer should not fire again).
    """
    global saturation

    if saturation > 100:
        # Sweep finished: ask the pipeline to wind down and stop the timer.
        pipeline.send_event(Gst.Event.new_eos())
        return False

    print("Setting saturation to {0}".format(saturation))
    videosrc.set_property("saturation", saturation)
    videosrc.set_property("annotation-text", "Saturation %d" % saturation)

    saturation += 10
    return True
if __name__ == "__main__":
    # initialization
    Gst.init(None)
    # `loop` and `videosrc` stay module-level names on purpose: bus_call()
    # and set_saturation() look them up as globals.
    loop = GLib.MainLoop()

    # Build the capture pipeline: camera source -> H.264 caps -> parser ->
    # MP4 muxer -> file sink. parse_launch reports a bad description by
    # raising GLib.Error, so handle that as well as a None result.
    try:
        pipeline = Gst.parse_launch(
            "rpicamsrc name=src ! video/x-h264,width=320,height=240 ! "
            "h264parse ! mp4mux ! filesink name=s"
        )
    except GLib.Error as err:
        print("Failed to create pipeline:", err)
        sys.exit(1)
    if pipeline is None:
        print("Failed to create pipeline")
        sys.exit(1)

    # watch for messages on the pipeline's bus (note that this will only
    # work like this when a GLib main loop is running)
    bus = pipeline.get_bus()
    bus.add_watch(GLib.PRIORITY_DEFAULT, bus_call, loop)

    # Configure the camera source with the starting saturation and switch
    # on annotation mode 1 so the annotation text set later is used.
    videosrc = pipeline.get_by_name("src")
    videosrc.set_property("saturation", saturation)
    videosrc.set_property("annotation-mode", 1)

    sink = pipeline.get_by_name("s")
    sink.set_property("location", "test.mp4")

    # this will call set_saturation every 1s
    GLib.timeout_add(1000, set_saturation, pipeline)

    # run
    try:
        if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
            print("Failed to set pipeline to PLAYING")
            sys.exit(1)
        loop.run()
    except Exception as e:
        print(e)
    finally:
        # cleanup -- runs on normal exit, on errors and on Ctrl-C alike
        pipeline.set_state(Gst.State.NULL)