mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-26 10:10:32 +00:00
7a05a716d2
This is forward compatible to Python 3.
324 lines
8.3 KiB
Python
324 lines
8.3 KiB
Python
# -*- coding: utf-8; mode: python; -*-
|
|
#
|
|
# GStreamer Development Utilities
|
|
#
|
|
# Copyright (C) 2007 René Stadler <mail@renestadler.de>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it
|
|
# under the terms of the GNU General Public License as published by the Free
|
|
# Software Foundation; either version 3 of the License, or (at your option)
|
|
# any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
|
# more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
"""GStreamer Development Utilities Common utils module."""
|
|
|
|
import os
|
|
import logging
|
|
import subprocess as _subprocess
|
|
|
|
class SingletonMeta (type):
|
|
|
|
def __init__ (cls, name, bases, dict_):
|
|
|
|
from weakref import WeakValueDictionary
|
|
|
|
super (SingletonMeta, cls).__init__ (name, bases, dict_)
|
|
|
|
cls._singleton_instances = WeakValueDictionary ()
|
|
|
|
def __call__ (cls, *a, **kw):
|
|
|
|
kw_key = tuple (sorted (kw.iteritems ()))
|
|
|
|
try:
|
|
obj = cls._singleton_instances[a + kw_key]
|
|
except KeyError:
|
|
obj = super (SingletonMeta, cls).__call__ (*a, **kw)
|
|
cls._singleton_instances[a + kw_key] = obj
|
|
return obj
|
|
|
|
def gettext_cache ():
|
|
|
|
"""Return a callable object that operates like gettext.gettext, but is much
|
|
faster when a string is looked up more than once. This is very useful in
|
|
loops, where calling gettext.gettext can quickly become a major performance
|
|
bottleneck."""
|
|
|
|
from gettext import gettext
|
|
|
|
d = {}
|
|
|
|
def gettext_cache_access (s):
|
|
|
|
if not s in d:
|
|
d[s] = gettext (s)
|
|
return d[s]
|
|
|
|
return gettext_cache_access
|
|
|
|
class ClassProperty (property):
|
|
|
|
"Like the property class, but also invokes the getter for class access."
|
|
|
|
def __init__ (self, fget = None, fset = None, fdel = None, doc = None):
|
|
|
|
property.__init__ (self, fget, fset, fdel, doc)
|
|
|
|
self.__fget = fget
|
|
|
|
def __get__ (self, obj, obj_class = None):
|
|
|
|
ret = property.__get__ (self, obj, obj_class)
|
|
if ret == self:
|
|
return self.__fget (None)
|
|
else:
|
|
return ret
|
|
|
|
class _XDGClass (object):
|
|
|
|
"""Partial implementation of the XDG Base Directory specification v0.6.
|
|
|
|
http://standards.freedesktop.org/basedir-spec/basedir-spec-0.6.html"""
|
|
|
|
def __init__ (self):
|
|
|
|
self._add_base_dir ("DATA_HOME", "~/.local/share")
|
|
self._add_base_dir ("CONFIG_HOME", "~/.config")
|
|
self._add_base_dir ("CACHE_HOME", "~/.cache")
|
|
|
|
def _add_base_dir (self, name, default):
|
|
|
|
dir = os.environ.get ("XDG_%s" % (name,))
|
|
if not dir:
|
|
dir = os.path.expanduser (os.path.join (*default.split ("/")))
|
|
|
|
setattr (self, name, dir)
|
|
|
|
XDG = _XDGClass ()
|
|
|
|
class SaveWriteFile (object):
|
|
|
|
def __init__ (self, filename, mode = "wt"):
|
|
|
|
from tempfile import mkstemp
|
|
|
|
self.logger = logging.getLogger ("tempfile")
|
|
|
|
dir = os.path.dirname (filename)
|
|
base_name = os.path.basename (filename)
|
|
temp_prefix = "%s-tmp" % (base_name,)
|
|
|
|
if dir:
|
|
# Destination dir differs from current directory, ensure that it
|
|
# exists:
|
|
try:
|
|
os.makedirs (dir)
|
|
except OSError:
|
|
pass
|
|
|
|
self.clean_stale (dir, temp_prefix)
|
|
|
|
fd, temp_name = mkstemp (dir = dir, prefix = temp_prefix)
|
|
|
|
self.target_name = filename
|
|
self.temp_name = temp_name
|
|
self.real_file = os.fdopen (fd, mode)
|
|
|
|
def __enter__ (self):
|
|
|
|
return self
|
|
|
|
def __exit__ (self, *exc_args):
|
|
|
|
if exc_args == (None, None, None,):
|
|
self.close ()
|
|
else:
|
|
self.discard ()
|
|
|
|
def __del__ (self):
|
|
|
|
try:
|
|
self.discard ()
|
|
except AttributeError:
|
|
# If __init__ failed, self has no real_file attribute.
|
|
pass
|
|
|
|
def __close_real (self):
|
|
|
|
if self.real_file:
|
|
self.real_file.close ()
|
|
self.real_file = None
|
|
|
|
def clean_stale (self, dir, temp_prefix):
|
|
|
|
from time import time
|
|
from glob import glob
|
|
|
|
now = time ()
|
|
pattern = os.path.join (dir, "%s*" % (temp_prefix,))
|
|
|
|
for temp_filename in glob (pattern):
|
|
mtime = os.stat (temp_filename).st_mtime
|
|
if now - mtime > 3600:
|
|
self.logger.info ("deleting stale temporary file %s",
|
|
temp_filename)
|
|
try:
|
|
os.unlink (temp_filename)
|
|
except EnvironmentError as exc:
|
|
self.logger.warning ("deleting stale temporary file "
|
|
"failed: %s", exc)
|
|
|
|
def tell (self, *a, **kw):
|
|
|
|
return self.real_file.tell (*a, **kw)
|
|
|
|
def write (self, *a, **kw):
|
|
|
|
return self.real_file.write (*a, **kw)
|
|
|
|
def close (self):
|
|
|
|
self.__close_real ()
|
|
|
|
if self.temp_name:
|
|
try:
|
|
os.rename (self.temp_name, self.target_name)
|
|
except OSError as exc:
|
|
import errno
|
|
if exc.errno == errno.EEXIST:
|
|
# We are probably on windows.
|
|
os.unlink (self.target_name)
|
|
os.rename (self.temp_name, self.target_name)
|
|
self.temp_name = None
|
|
|
|
def discard (self):
|
|
|
|
self.__close_real ()
|
|
|
|
if self.temp_name:
|
|
|
|
try:
|
|
os.unlink (self.temp_name)
|
|
except EnvironmentError as exc:
|
|
self.logger.warning ("deleting temporary file failed: %s", exc)
|
|
self.temp_name = None
|
|
|
|
class TeeWriteFile (object):
|
|
|
|
# TODO Py2.5: Add context manager methods.
|
|
|
|
def __init__ (self, *file_objects):
|
|
|
|
self.files = list (file_objects)
|
|
|
|
def close (self):
|
|
|
|
for file in self.files:
|
|
file.close ()
|
|
|
|
def flush (self):
|
|
|
|
for file in self.files:
|
|
file.flush ()
|
|
|
|
def write (self, string):
|
|
|
|
for file in self.files:
|
|
file.write (string)
|
|
|
|
def writelines (self, lines):
|
|
|
|
for file in self.files:
|
|
file.writelines (lines)
|
|
|
|
class FixedPopen (_subprocess.Popen):
|
|
|
|
def __init__ (self, args, **kw):
|
|
|
|
# Unconditionally specify all descriptors as redirected, to
|
|
# work around Python bug #1358527 (which is triggered for
|
|
# console-less applications on Windows).
|
|
|
|
close = []
|
|
|
|
for name in ("stdin", "stdout", "stderr",):
|
|
target = kw.get (name)
|
|
if not target:
|
|
kw[name] = _subprocess.PIPE
|
|
close.append (name)
|
|
|
|
_subprocess.Popen.__init__ (self, args, **kw)
|
|
|
|
for name in close:
|
|
fp = getattr (self, name)
|
|
fp.close ()
|
|
setattr (self, name, None)
|
|
|
|
class DevhelpError (EnvironmentError):
|
|
|
|
pass
|
|
|
|
class DevhelpUnavailableError (DevhelpError):
|
|
|
|
pass
|
|
|
|
class DevhelpClient (object):
|
|
|
|
def available (self):
|
|
|
|
try:
|
|
self.version ()
|
|
except DevhelpUnavailableError:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def version (self):
|
|
|
|
return self._invoke ("--version")
|
|
|
|
def search (self, entry):
|
|
|
|
self._invoke_no_interact ("-s", entry)
|
|
|
|
def _check_os_error (self, exc):
|
|
|
|
import errno
|
|
if exc.errno == errno.ENOENT:
|
|
raise DevhelpUnavailableError ()
|
|
|
|
def _invoke (self, *args):
|
|
|
|
from subprocess import PIPE
|
|
|
|
try:
|
|
proc = FixedPopen (("devhelp",) + args,
|
|
stdout = PIPE)
|
|
except OSError as exc:
|
|
self._check_os_error (exc)
|
|
raise
|
|
|
|
out, err = proc.communicate ()
|
|
|
|
if proc.returncode is not None and proc.returncode != 0:
|
|
raise DevhelpError ("devhelp exited with status %i"
|
|
% (proc.returncode,))
|
|
return out
|
|
|
|
def _invoke_no_interact (self, *args):
|
|
|
|
from subprocess import PIPE
|
|
|
|
try:
|
|
proc = FixedPopen (("devhelp",) + args)
|
|
except OSError as exc:
|
|
self._check_os_error (exc)
|
|
raise
|
|
|