diff --git a/Cargo.toml b/Cargo.toml index a7d1565c..0f120a61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,11 +7,11 @@ members = [ "gst-plugin-http", "gst-plugin-flv", "gst-plugin-audiofx", + "gst-plugin-togglerecord", ] [profile.release] lto = true -debug-assertions = false opt-level = 3 debug = true panic = 'unwind' diff --git a/LICENSE-LGPLv2 b/LICENSE-LGPLv2 new file mode 100644 index 00000000..4362b491 --- /dev/null +++ b/LICENSE-LGPLv2 @@ -0,0 +1,502 @@ + GNU LESSER GENERAL PUBLIC LICENSE + Version 2.1, February 1999 + + Copyright (C) 1991, 1999 Free Software Foundation, Inc. + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + +[This is the first released version of the Lesser GPL. It also counts + as the successor of the GNU Library Public License, version 2, hence + the version number 2.1.] + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +Licenses are intended to guarantee your freedom to share and change +free software--to make sure the software is free for all its users. + + This license, the Lesser General Public License, applies to some +specially designated software packages--typically libraries--of the +Free Software Foundation and other authors who decide to use it. You +can use it too, but we suggest you first think carefully about whether +this license or the ordinary General Public License is the better +strategy to use in any particular case, based on the explanations below. + + When we speak of free software, we are referring to freedom of use, +not price. Our General Public Licenses are designed to make sure that +you have the freedom to distribute copies of free software (and charge +for this service if you wish); that you receive source code or can get +it if you want it; that you can change the software and use pieces of +it in new free programs; and that you are informed that you can do +these things. + + To protect your rights, we need to make restrictions that forbid +distributors to deny you these rights or to ask you to surrender these +rights. These restrictions translate to certain responsibilities for +you if you distribute copies of the library or if you modify it. + + For example, if you distribute copies of the library, whether gratis +or for a fee, you must give the recipients all the rights that we gave +you. You must make sure that they, too, receive or can get the source +code. If you link other code with the library, you must provide +complete object files to the recipients, so that they can relink them +with the library after making changes to the library and recompiling +it. And you must show them these terms so they know their rights. + + We protect your rights with a two-step method: (1) we copyright the +library, and (2) we offer you this license, which gives you legal +permission to copy, distribute and/or modify the library. + + To protect each distributor, we want to make it very clear that +there is no warranty for the free library. Also, if the library is +modified by someone else and passed on, the recipients should know +that what they have is not the original version, so that the original +author's reputation will not be affected by problems that might be +introduced by others. + + Finally, software patents pose a constant threat to the existence of +any free program. We wish to make sure that a company cannot +effectively restrict the users of a free program by obtaining a +restrictive license from a patent holder. Therefore, we insist that +any patent license obtained for a version of the library must be +consistent with the full freedom of use specified in this license. + + Most GNU software, including some libraries, is covered by the +ordinary GNU General Public License. This license, the GNU Lesser +General Public License, applies to certain designated libraries, and +is quite different from the ordinary General Public License. We use +this license for certain libraries in order to permit linking those +libraries into non-free programs. + + When a program is linked with a library, whether statically or using +a shared library, the combination of the two is legally speaking a +combined work, a derivative of the original library. The ordinary +General Public License therefore permits such linking only if the +entire combination fits its criteria of freedom. The Lesser General +Public License permits more lax criteria for linking other code with +the library. + + We call this license the "Lesser" General Public License because it +does Less to protect the user's freedom than the ordinary General +Public License. It also provides other free software developers Less +of an advantage over competing non-free programs. These disadvantages +are the reason we use the ordinary General Public License for many +libraries. However, the Lesser license provides advantages in certain +special circumstances. + + For example, on rare occasions, there may be a special need to +encourage the widest possible use of a certain library, so that it becomes +a de-facto standard. To achieve this, non-free programs must be +allowed to use the library. A more frequent case is that a free +library does the same job as widely used non-free libraries. In this +case, there is little to gain by limiting the free library to free +software only, so we use the Lesser General Public License. + + In other cases, permission to use a particular library in non-free +programs enables a greater number of people to use a large body of +free software. For example, permission to use the GNU C Library in +non-free programs enables many more people to use the whole GNU +operating system, as well as its variant, the GNU/Linux operating +system. + + Although the Lesser General Public License is Less protective of the +users' freedom, it does ensure that the user of a program that is +linked with the Library has the freedom and the wherewithal to run +that program using a modified version of the Library. + + The precise terms and conditions for copying, distribution and +modification follow. Pay close attention to the difference between a +"work based on the library" and a "work that uses the library". The +former contains code derived from the library, whereas the latter must +be combined with the library in order to run. + + GNU LESSER GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License Agreement applies to any software library or other +program which contains a notice placed by the copyright holder or +other authorized party saying it may be distributed under the terms of +this Lesser General Public License (also called "this License"). +Each licensee is addressed as "you". + + A "library" means a collection of software functions and/or data +prepared so as to be conveniently linked with application programs +(which use some of those functions and data) to form executables. + + The "Library", below, refers to any such software library or work +which has been distributed under these terms. A "work based on the +Library" means either the Library or any derivative work under +copyright law: that is to say, a work containing the Library or a +portion of it, either verbatim or with modifications and/or translated +straightforwardly into another language. (Hereinafter, translation is +included without limitation in the term "modification".) + + "Source code" for a work means the preferred form of the work for +making modifications to it. For a library, complete source code means +all the source code for all modules it contains, plus any associated +interface definition files, plus the scripts used to control compilation +and installation of the library. + + Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running a program using the Library is not restricted, and output from +such a program is covered only if its contents constitute a work based +on the Library (independent of the use of the Library in a tool for +writing it). Whether that is true depends on what the Library does +and what the program that uses the Library does. + + 1. You may copy and distribute verbatim copies of the Library's +complete source code as you receive it, in any medium, provided that +you conspicuously and appropriately publish on each copy an +appropriate copyright notice and disclaimer of warranty; keep intact +all the notices that refer to this License and to the absence of any +warranty; and distribute a copy of this License along with the +Library. + + You may charge a fee for the physical act of transferring a copy, +and you may at your option offer warranty protection in exchange for a +fee. + + 2. You may modify your copy or copies of the Library or any portion +of it, thus forming a work based on the Library, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) The modified work must itself be a software library. + + b) You must cause the files modified to carry prominent notices + stating that you changed the files and the date of any change. + + c) You must cause the whole of the work to be licensed at no + charge to all third parties under the terms of this License. + + d) If a facility in the modified Library refers to a function or a + table of data to be supplied by an application program that uses + the facility, other than as an argument passed when the facility + is invoked, then you must make a good faith effort to ensure that, + in the event an application does not supply such function or + table, the facility still operates, and performs whatever part of + its purpose remains meaningful. + + (For example, a function in a library to compute square roots has + a purpose that is entirely well-defined independent of the + application. Therefore, Subsection 2d requires that any + application-supplied function or table used by this function must + be optional: if the application does not supply it, the square + root function must still compute square roots.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Library, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Library, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote +it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Library. + +In addition, mere aggregation of another work not based on the Library +with the Library (or with a work based on the Library) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may opt to apply the terms of the ordinary GNU General Public +License instead of this License to a given copy of the Library. To do +this, you must alter all the notices that refer to this License, so +that they refer to the ordinary GNU General Public License, version 2, +instead of to this License. (If a newer version than version 2 of the +ordinary GNU General Public License has appeared, then you can specify +that version instead if you wish.) Do not make any other change in +these notices. + + Once this change is made in a given copy, it is irreversible for +that copy, so the ordinary GNU General Public License applies to all +subsequent copies and derivative works made from that copy. + + This option is useful when you wish to copy part of the code of +the Library into a program that is not a library. + + 4. You may copy and distribute the Library (or a portion or +derivative of it, under Section 2) in object code or executable form +under the terms of Sections 1 and 2 above provided that you accompany +it with the complete corresponding machine-readable source code, which +must be distributed under the terms of Sections 1 and 2 above on a +medium customarily used for software interchange. + + If distribution of object code is made by offering access to copy +from a designated place, then offering equivalent access to copy the +source code from the same place satisfies the requirement to +distribute the source code, even though third parties are not +compelled to copy the source along with the object code. + + 5. A program that contains no derivative of any portion of the +Library, but is designed to work with the Library by being compiled or +linked with it, is called a "work that uses the Library". Such a +work, in isolation, is not a derivative work of the Library, and +therefore falls outside the scope of this License. + + However, linking a "work that uses the Library" with the Library +creates an executable that is a derivative of the Library (because it +contains portions of the Library), rather than a "work that uses the +library". The executable is therefore covered by this License. +Section 6 states terms for distribution of such executables. + + When a "work that uses the Library" uses material from a header file +that is part of the Library, the object code for the work may be a +derivative work of the Library even though the source code is not. +Whether this is true is especially significant if the work can be +linked without the Library, or if the work is itself a library. The +threshold for this to be true is not precisely defined by law. + + If such an object file uses only numerical parameters, data +structure layouts and accessors, and small macros and small inline +functions (ten lines or less in length), then the use of the object +file is unrestricted, regardless of whether it is legally a derivative +work. (Executables containing this object code plus portions of the +Library will still fall under Section 6.) + + Otherwise, if the work is a derivative of the Library, you may +distribute the object code for the work under the terms of Section 6. +Any executables containing that work also fall under Section 6, +whether or not they are linked directly with the Library itself. + + 6. As an exception to the Sections above, you may also combine or +link a "work that uses the Library" with the Library to produce a +work containing portions of the Library, and distribute that work +under terms of your choice, provided that the terms permit +modification of the work for the customer's own use and reverse +engineering for debugging such modifications. + + You must give prominent notice with each copy of the work that the +Library is used in it and that the Library and its use are covered by +this License. You must supply a copy of this License. If the work +during execution displays copyright notices, you must include the +copyright notice for the Library among them, as well as a reference +directing the user to the copy of this License. Also, you must do one +of these things: + + a) Accompany the work with the complete corresponding + machine-readable source code for the Library including whatever + changes were used in the work (which must be distributed under + Sections 1 and 2 above); and, if the work is an executable linked + with the Library, with the complete machine-readable "work that + uses the Library", as object code and/or source code, so that the + user can modify the Library and then relink to produce a modified + executable containing the modified Library. (It is understood + that the user who changes the contents of definitions files in the + Library will not necessarily be able to recompile the application + to use the modified definitions.) + + b) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (1) uses at run time a + copy of the library already present on the user's computer system, + rather than copying library functions into the executable, and (2) + will operate properly with a modified version of the library, if + the user installs one, as long as the modified version is + interface-compatible with the version that the work was made with. + + c) Accompany the work with a written offer, valid for at + least three years, to give the same user the materials + specified in Subsection 6a, above, for a charge no more + than the cost of performing this distribution. + + d) If distribution of the work is made by offering access to copy + from a designated place, offer equivalent access to copy the above + specified materials from the same place. + + e) Verify that the user has already received a copy of these + materials or that you have already sent this user a copy. + + For an executable, the required form of the "work that uses the +Library" must include any data and utility programs needed for +reproducing the executable from it. However, as a special exception, +the materials to be distributed need not include anything that is +normally distributed (in either source or binary form) with the major +components (compiler, kernel, and so on) of the operating system on +which the executable runs, unless that component itself accompanies +the executable. + + It may happen that this requirement contradicts the license +restrictions of other proprietary libraries that do not normally +accompany the operating system. Such a contradiction means you cannot +use both them and the Library together in an executable that you +distribute. + + 7. You may place library facilities that are a work based on the +Library side-by-side in a single library together with other library +facilities not covered by this License, and distribute such a combined +library, provided that the separate distribution of the work based on +the Library and of the other library facilities is otherwise +permitted, and provided that you do these two things: + + a) Accompany the combined library with a copy of the same work + based on the Library, uncombined with any other library + facilities. This must be distributed under the terms of the + Sections above. + + b) Give prominent notice with the combined library of the fact + that part of it is a work based on the Library, and explaining + where to find the accompanying uncombined form of the same work. + + 8. You may not copy, modify, sublicense, link with, or distribute +the Library except as expressly provided under this License. Any +attempt otherwise to copy, modify, sublicense, link with, or +distribute the Library is void, and will automatically terminate your +rights under this License. However, parties who have received copies, +or rights, from you under this License will not have their licenses +terminated so long as such parties remain in full compliance. + + 9. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Library or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Library (or any work based on the +Library), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Library or works based on it. + + 10. Each time you redistribute the Library (or any work based on the +Library), the recipient automatically receives a license from the +original licensor to copy, distribute, link with or modify the Library +subject to these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties with +this License. + + 11. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Library at all. For example, if a patent +license would not permit royalty-free redistribution of the Library by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Library. + +If any portion of this section is held invalid or unenforceable under any +particular circumstance, the balance of the section is intended to apply, +and the section as a whole is intended to apply in other circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 12. If the distribution and/or use of the Library is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Library under this License may add +an explicit geographical distribution limitation excluding those countries, +so that distribution is permitted only in or among countries not thus +excluded. In such case, this License incorporates the limitation as if +written in the body of this License. + + 13. The Free Software Foundation may publish revised and/or new +versions of the Lesser General Public License from time to time. +Such new versions will be similar in spirit to the present version, +but may differ in detail to address new problems or concerns. + +Each version is given a distinguishing version number. If the Library +specifies a version number of this License which applies to it and +"any later version", you have the option of following the terms and +conditions either of that version or of any later version published by +the Free Software Foundation. If the Library does not specify a +license version number, you may choose any version ever published by +the Free Software Foundation. + + 14. If you wish to incorporate parts of the Library into other free +programs whose distribution conditions are incompatible with these, +write to the author to ask for permission. For software which is +copyrighted by the Free Software Foundation, write to the Free +Software Foundation; we sometimes make exceptions for this. Our +decision will be guided by the two goals of preserving the free status +of all derivatives of our free software and of promoting the sharing +and reuse of software generally. + + NO WARRANTY + + 15. BECAUSE THE LIBRARY IS LICENSED FREE OF CHARGE, THERE IS NO +WARRANTY FOR THE LIBRARY, TO THE EXTENT PERMITTED BY APPLICABLE LAW. +EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR +OTHER PARTIES PROVIDE THE LIBRARY "AS IS" WITHOUT WARRANTY OF ANY +KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE +LIBRARY IS WITH YOU. SHOULD THE LIBRARY PROVE DEFECTIVE, YOU ASSUME +THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN +WRITING WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY +AND/OR REDISTRIBUTE THE LIBRARY AS PERMITTED ABOVE, BE LIABLE TO YOU +FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR +CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE +LIBRARY (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING +RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A +FAILURE OF THE LIBRARY TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF +SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH +DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Libraries + + If you develop a new library, and you want it to be of the greatest +possible use to the public, we recommend making it free software that +everyone can redistribute and change. You can do so by permitting +redistribution under these terms (or, alternatively, under the terms of the +ordinary General Public License). + + To apply these terms, attach the following notices to the library. It is +safest to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least the +"copyright" line and a pointer to where the full notice is found. + + + Copyright (C) + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +Also add information on how to contact you by electronic and paper mail. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the library, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the + library `Frob' (a library for tweaking knobs) written by James Random Hacker. + + , 1 April 1990 + Ty Coon, President of Vice + +That's all there is to it! diff --git a/gst-plugin-togglerecord/Cargo.toml b/gst-plugin-togglerecord/Cargo.toml new file mode 100644 index 00000000..c1416891 --- /dev/null +++ b/gst-plugin-togglerecord/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "gst-plugin-togglerecord" +version = "0.1.0" +authors = ["Sebastian Dröge "] +license = "LGPL-2.1+" + +[dependencies] +glib = { git = "https://github.com/gtk-rs/glib" } +gstreamer = { git = "https://github.com/sdroege/gstreamer-rs", features = ["v1_10"] } +gst-plugin = { path = "../gst-plugin" } +gtk = { git = "https://github.com/gtk-rs/gtk", features = ["v3_6"], optional = true } +gio = { git = "https://github.com/gtk-rs/gio", optional = true } +send-cell = { version = "0.1", optional = true } +either = "1.0" + +[lib] +name = "gsttogglerecord" +crate-type = ["cdylib"] +path = "src/lib.rs" + +[[example]] +name = "gtk-recording" +path = "examples/gtk_recording.rs" +required-features = ["gtk", "gio", "send-cell"] + diff --git a/gst-plugin-togglerecord/LICENSE b/gst-plugin-togglerecord/LICENSE new file mode 100644 index 00000000..4362b491 --- /dev/null +++ b/gst-plugin-togglerecord/LICENSE @@ -0,0 +1,502 @@ + GNU LESSER GENERAL PUBLIC LICENSE + Version 2.1, February 1999 + + Copyright (C) 1991, 1999 Free Software Foundation, Inc. + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + +[This is the first released version of the Lesser GPL. It also counts + as the successor of the GNU Library Public License, version 2, hence + the version number 2.1.] + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +Licenses are intended to guarantee your freedom to share and change +free software--to make sure the software is free for all its users. + + This license, the Lesser General Public License, applies to some +specially designated software packages--typically libraries--of the +Free Software Foundation and other authors who decide to use it. You +can use it too, but we suggest you first think carefully about whether +this license or the ordinary General Public License is the better +strategy to use in any particular case, based on the explanations below. + + When we speak of free software, we are referring to freedom of use, +not price. Our General Public Licenses are designed to make sure that +you have the freedom to distribute copies of free software (and charge +for this service if you wish); that you receive source code or can get +it if you want it; that you can change the software and use pieces of +it in new free programs; and that you are informed that you can do +these things. + + To protect your rights, we need to make restrictions that forbid +distributors to deny you these rights or to ask you to surrender these +rights. These restrictions translate to certain responsibilities for +you if you distribute copies of the library or if you modify it. + + For example, if you distribute copies of the library, whether gratis +or for a fee, you must give the recipients all the rights that we gave +you. You must make sure that they, too, receive or can get the source +code. If you link other code with the library, you must provide +complete object files to the recipients, so that they can relink them +with the library after making changes to the library and recompiling +it. And you must show them these terms so they know their rights. + + We protect your rights with a two-step method: (1) we copyright the +library, and (2) we offer you this license, which gives you legal +permission to copy, distribute and/or modify the library. + + To protect each distributor, we want to make it very clear that +there is no warranty for the free library. Also, if the library is +modified by someone else and passed on, the recipients should know +that what they have is not the original version, so that the original +author's reputation will not be affected by problems that might be +introduced by others. + + Finally, software patents pose a constant threat to the existence of +any free program. We wish to make sure that a company cannot +effectively restrict the users of a free program by obtaining a +restrictive license from a patent holder. Therefore, we insist that +any patent license obtained for a version of the library must be +consistent with the full freedom of use specified in this license. + + Most GNU software, including some libraries, is covered by the +ordinary GNU General Public License. This license, the GNU Lesser +General Public License, applies to certain designated libraries, and +is quite different from the ordinary General Public License. We use +this license for certain libraries in order to permit linking those +libraries into non-free programs. + + When a program is linked with a library, whether statically or using +a shared library, the combination of the two is legally speaking a +combined work, a derivative of the original library. The ordinary +General Public License therefore permits such linking only if the +entire combination fits its criteria of freedom. The Lesser General +Public License permits more lax criteria for linking other code with +the library. + + We call this license the "Lesser" General Public License because it +does Less to protect the user's freedom than the ordinary General +Public License. It also provides other free software developers Less +of an advantage over competing non-free programs. These disadvantages +are the reason we use the ordinary General Public License for many +libraries. However, the Lesser license provides advantages in certain +special circumstances. + + For example, on rare occasions, there may be a special need to +encourage the widest possible use of a certain library, so that it becomes +a de-facto standard. To achieve this, non-free programs must be +allowed to use the library. A more frequent case is that a free +library does the same job as widely used non-free libraries. In this +case, there is little to gain by limiting the free library to free +software only, so we use the Lesser General Public License. + + In other cases, permission to use a particular library in non-free +programs enables a greater number of people to use a large body of +free software. For example, permission to use the GNU C Library in +non-free programs enables many more people to use the whole GNU +operating system, as well as its variant, the GNU/Linux operating +system. + + Although the Lesser General Public License is Less protective of the +users' freedom, it does ensure that the user of a program that is +linked with the Library has the freedom and the wherewithal to run +that program using a modified version of the Library. + + The precise terms and conditions for copying, distribution and +modification follow. Pay close attention to the difference between a +"work based on the library" and a "work that uses the library". The +former contains code derived from the library, whereas the latter must +be combined with the library in order to run. + + GNU LESSER GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License Agreement applies to any software library or other +program which contains a notice placed by the copyright holder or +other authorized party saying it may be distributed under the terms of +this Lesser General Public License (also called "this License"). +Each licensee is addressed as "you". + + A "library" means a collection of software functions and/or data +prepared so as to be conveniently linked with application programs +(which use some of those functions and data) to form executables. + + The "Library", below, refers to any such software library or work +which has been distributed under these terms. A "work based on the +Library" means either the Library or any derivative work under +copyright law: that is to say, a work containing the Library or a +portion of it, either verbatim or with modifications and/or translated +straightforwardly into another language. (Hereinafter, translation is +included without limitation in the term "modification".) + + "Source code" for a work means the preferred form of the work for +making modifications to it. For a library, complete source code means +all the source code for all modules it contains, plus any associated +interface definition files, plus the scripts used to control compilation +and installation of the library. + + Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running a program using the Library is not restricted, and output from +such a program is covered only if its contents constitute a work based +on the Library (independent of the use of the Library in a tool for +writing it). Whether that is true depends on what the Library does +and what the program that uses the Library does. + + 1. You may copy and distribute verbatim copies of the Library's +complete source code as you receive it, in any medium, provided that +you conspicuously and appropriately publish on each copy an +appropriate copyright notice and disclaimer of warranty; keep intact +all the notices that refer to this License and to the absence of any +warranty; and distribute a copy of this License along with the +Library. + + You may charge a fee for the physical act of transferring a copy, +and you may at your option offer warranty protection in exchange for a +fee. + + 2. You may modify your copy or copies of the Library or any portion +of it, thus forming a work based on the Library, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) The modified work must itself be a software library. + + b) You must cause the files modified to carry prominent notices + stating that you changed the files and the date of any change. + + c) You must cause the whole of the work to be licensed at no + charge to all third parties under the terms of this License. + + d) If a facility in the modified Library refers to a function or a + table of data to be supplied by an application program that uses + the facility, other than as an argument passed when the facility + is invoked, then you must make a good faith effort to ensure that, + in the event an application does not supply such function or + table, the facility still operates, and performs whatever part of + its purpose remains meaningful. + + (For example, a function in a library to compute square roots has + a purpose that is entirely well-defined independent of the + application. Therefore, Subsection 2d requires that any + application-supplied function or table used by this function must + be optional: if the application does not supply it, the square + root function must still compute square roots.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Library, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Library, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote +it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Library. + +In addition, mere aggregation of another work not based on the Library +with the Library (or with a work based on the Library) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may opt to apply the terms of the ordinary GNU General Public +License instead of this License to a given copy of the Library. To do +this, you must alter all the notices that refer to this License, so +that they refer to the ordinary GNU General Public License, version 2, +instead of to this License. (If a newer version than version 2 of the +ordinary GNU General Public License has appeared, then you can specify +that version instead if you wish.) Do not make any other change in +these notices. + + Once this change is made in a given copy, it is irreversible for +that copy, so the ordinary GNU General Public License applies to all +subsequent copies and derivative works made from that copy. + + This option is useful when you wish to copy part of the code of +the Library into a program that is not a library. + + 4. You may copy and distribute the Library (or a portion or +derivative of it, under Section 2) in object code or executable form +under the terms of Sections 1 and 2 above provided that you accompany +it with the complete corresponding machine-readable source code, which +must be distributed under the terms of Sections 1 and 2 above on a +medium customarily used for software interchange. + + If distribution of object code is made by offering access to copy +from a designated place, then offering equivalent access to copy the +source code from the same place satisfies the requirement to +distribute the source code, even though third parties are not +compelled to copy the source along with the object code. + + 5. A program that contains no derivative of any portion of the +Library, but is designed to work with the Library by being compiled or +linked with it, is called a "work that uses the Library". Such a +work, in isolation, is not a derivative work of the Library, and +therefore falls outside the scope of this License. + + However, linking a "work that uses the Library" with the Library +creates an executable that is a derivative of the Library (because it +contains portions of the Library), rather than a "work that uses the +library". The executable is therefore covered by this License. +Section 6 states terms for distribution of such executables. + + When a "work that uses the Library" uses material from a header file +that is part of the Library, the object code for the work may be a +derivative work of the Library even though the source code is not. +Whether this is true is especially significant if the work can be +linked without the Library, or if the work is itself a library. The +threshold for this to be true is not precisely defined by law. + + If such an object file uses only numerical parameters, data +structure layouts and accessors, and small macros and small inline +functions (ten lines or less in length), then the use of the object +file is unrestricted, regardless of whether it is legally a derivative +work. (Executables containing this object code plus portions of the +Library will still fall under Section 6.) + + Otherwise, if the work is a derivative of the Library, you may +distribute the object code for the work under the terms of Section 6. +Any executables containing that work also fall under Section 6, +whether or not they are linked directly with the Library itself. + + 6. As an exception to the Sections above, you may also combine or +link a "work that uses the Library" with the Library to produce a +work containing portions of the Library, and distribute that work +under terms of your choice, provided that the terms permit +modification of the work for the customer's own use and reverse +engineering for debugging such modifications. + + You must give prominent notice with each copy of the work that the +Library is used in it and that the Library and its use are covered by +this License. You must supply a copy of this License. If the work +during execution displays copyright notices, you must include the +copyright notice for the Library among them, as well as a reference +directing the user to the copy of this License. Also, you must do one +of these things: + + a) Accompany the work with the complete corresponding + machine-readable source code for the Library including whatever + changes were used in the work (which must be distributed under + Sections 1 and 2 above); and, if the work is an executable linked + with the Library, with the complete machine-readable "work that + uses the Library", as object code and/or source code, so that the + user can modify the Library and then relink to produce a modified + executable containing the modified Library. (It is understood + that the user who changes the contents of definitions files in the + Library will not necessarily be able to recompile the application + to use the modified definitions.) + + b) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (1) uses at run time a + copy of the library already present on the user's computer system, + rather than copying library functions into the executable, and (2) + will operate properly with a modified version of the library, if + the user installs one, as long as the modified version is + interface-compatible with the version that the work was made with. + + c) Accompany the work with a written offer, valid for at + least three years, to give the same user the materials + specified in Subsection 6a, above, for a charge no more + than the cost of performing this distribution. + + d) If distribution of the work is made by offering access to copy + from a designated place, offer equivalent access to copy the above + specified materials from the same place. + + e) Verify that the user has already received a copy of these + materials or that you have already sent this user a copy. + + For an executable, the required form of the "work that uses the +Library" must include any data and utility programs needed for +reproducing the executable from it. However, as a special exception, +the materials to be distributed need not include anything that is +normally distributed (in either source or binary form) with the major +components (compiler, kernel, and so on) of the operating system on +which the executable runs, unless that component itself accompanies +the executable. + + It may happen that this requirement contradicts the license +restrictions of other proprietary libraries that do not normally +accompany the operating system. Such a contradiction means you cannot +use both them and the Library together in an executable that you +distribute. + + 7. You may place library facilities that are a work based on the +Library side-by-side in a single library together with other library +facilities not covered by this License, and distribute such a combined +library, provided that the separate distribution of the work based on +the Library and of the other library facilities is otherwise +permitted, and provided that you do these two things: + + a) Accompany the combined library with a copy of the same work + based on the Library, uncombined with any other library + facilities. This must be distributed under the terms of the + Sections above. + + b) Give prominent notice with the combined library of the fact + that part of it is a work based on the Library, and explaining + where to find the accompanying uncombined form of the same work. + + 8. You may not copy, modify, sublicense, link with, or distribute +the Library except as expressly provided under this License. Any +attempt otherwise to copy, modify, sublicense, link with, or +distribute the Library is void, and will automatically terminate your +rights under this License. However, parties who have received copies, +or rights, from you under this License will not have their licenses +terminated so long as such parties remain in full compliance. + + 9. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Library or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Library (or any work based on the +Library), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Library or works based on it. + + 10. Each time you redistribute the Library (or any work based on the +Library), the recipient automatically receives a license from the +original licensor to copy, distribute, link with or modify the Library +subject to these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties with +this License. + + 11. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Library at all. For example, if a patent +license would not permit royalty-free redistribution of the Library by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Library. + +If any portion of this section is held invalid or unenforceable under any +particular circumstance, the balance of the section is intended to apply, +and the section as a whole is intended to apply in other circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 12. If the distribution and/or use of the Library is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Library under this License may add +an explicit geographical distribution limitation excluding those countries, +so that distribution is permitted only in or among countries not thus +excluded. In such case, this License incorporates the limitation as if +written in the body of this License. + + 13. The Free Software Foundation may publish revised and/or new +versions of the Lesser General Public License from time to time. +Such new versions will be similar in spirit to the present version, +but may differ in detail to address new problems or concerns. + +Each version is given a distinguishing version number. If the Library +specifies a version number of this License which applies to it and +"any later version", you have the option of following the terms and +conditions either of that version or of any later version published by +the Free Software Foundation. If the Library does not specify a +license version number, you may choose any version ever published by +the Free Software Foundation. + + 14. If you wish to incorporate parts of the Library into other free +programs whose distribution conditions are incompatible with these, +write to the author to ask for permission. For software which is +copyrighted by the Free Software Foundation, write to the Free +Software Foundation; we sometimes make exceptions for this. Our +decision will be guided by the two goals of preserving the free status +of all derivatives of our free software and of promoting the sharing +and reuse of software generally. + + NO WARRANTY + + 15. BECAUSE THE LIBRARY IS LICENSED FREE OF CHARGE, THERE IS NO +WARRANTY FOR THE LIBRARY, TO THE EXTENT PERMITTED BY APPLICABLE LAW. +EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR +OTHER PARTIES PROVIDE THE LIBRARY "AS IS" WITHOUT WARRANTY OF ANY +KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE +LIBRARY IS WITH YOU. SHOULD THE LIBRARY PROVE DEFECTIVE, YOU ASSUME +THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN +WRITING WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY +AND/OR REDISTRIBUTE THE LIBRARY AS PERMITTED ABOVE, BE LIABLE TO YOU +FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR +CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE +LIBRARY (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING +RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A +FAILURE OF THE LIBRARY TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF +SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH +DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Libraries + + If you develop a new library, and you want it to be of the greatest +possible use to the public, we recommend making it free software that +everyone can redistribute and change. You can do so by permitting +redistribution under these terms (or, alternatively, under the terms of the +ordinary General Public License). + + To apply these terms, attach the following notices to the library. It is +safest to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least the +"copyright" line and a pointer to where the full notice is found. + + + Copyright (C) + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +Also add information on how to contact you by electronic and paper mail. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the library, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the + library `Frob' (a library for tweaking knobs) written by James Random Hacker. + + , 1 April 1990 + Ty Coon, President of Vice + +That's all there is to it! diff --git a/gst-plugin-togglerecord/examples/gtk_recording.rs b/gst-plugin-togglerecord/examples/gtk_recording.rs new file mode 100644 index 00000000..5368f889 --- /dev/null +++ b/gst-plugin-togglerecord/examples/gtk_recording.rs @@ -0,0 +1,349 @@ +// Copyright (C) 2017 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +extern crate glib; +use glib::prelude::*; +extern crate gio; +use gio::prelude::*; + +extern crate gstreamer as gst; +use gst::prelude::*; + +extern crate gtk; +use gtk::prelude::*; + +extern crate send_cell; +use send_cell::SendCell; + +use std::env; + +fn create_pipeline() -> ( + gst::Pipeline, + gst::Pad, + gst::Pad, + gst::Element, + gst::Element, + gtk::Widget, +) { + let pipeline = gst::Pipeline::new(None); + + let video_src = gst::ElementFactory::make("videotestsrc", None).unwrap(); + video_src.set_property("is-live", &true).unwrap(); + video_src.set_property_from_str("pattern", "ball"); + + let timeoverlay = gst::ElementFactory::make("timeoverlay", None).unwrap(); + timeoverlay + .set_property("font-desc", &"Monospace 20") + .unwrap(); + + let video_tee = gst::ElementFactory::make("tee", None).unwrap(); + let video_queue1 = gst::ElementFactory::make("queue", None).unwrap(); + let video_queue2 = gst::ElementFactory::make("queue", None).unwrap(); + + let video_convert1 = gst::ElementFactory::make("videoconvert", None).unwrap(); + let video_convert2 = gst::ElementFactory::make("videoconvert", None).unwrap(); + + let (video_sink, video_widget) = + if let Some(gtkglsink) = gst::ElementFactory::make("gtkglsink", None) { + let glsinkbin = gst::ElementFactory::make("glsinkbin", None).unwrap(); + glsinkbin.set_property("sink", >kglsink).unwrap(); + + let widget = gtkglsink.get_property("widget").unwrap(); + (glsinkbin, widget.get::().unwrap()) + } else { + let sink = gst::ElementFactory::make("gtksink", None).unwrap(); + let widget = sink.get_property("widget").unwrap(); + (sink, widget.get::().unwrap()) + }; + + let video_enc = gst::ElementFactory::make("x264enc", None).unwrap(); + video_enc.set_property("rc-lookahead", &10i32).unwrap(); + video_enc.set_property("key-int-max", &30u32).unwrap(); + let video_parse = gst::ElementFactory::make("h264parse", None).unwrap(); + + let audio_src = gst::ElementFactory::make("audiotestsrc", None).unwrap(); + audio_src.set_property("is-live", &true).unwrap(); + audio_src.set_property_from_str("wave", "ticks"); + + let audio_tee = gst::ElementFactory::make("tee", None).unwrap(); + let audio_queue1 = gst::ElementFactory::make("queue", None).unwrap(); + let audio_queue2 = gst::ElementFactory::make("queue", None).unwrap(); + + let audio_convert1 = gst::ElementFactory::make("audioconvert", None).unwrap(); + let audio_convert2 = gst::ElementFactory::make("audioconvert", None).unwrap(); + + let audio_sink = gst::ElementFactory::make("autoaudiosink", None).unwrap(); + + let audio_enc = gst::ElementFactory::make("lamemp3enc", None).unwrap(); + let audio_parse = gst::ElementFactory::make("mpegaudioparse", None).unwrap(); + + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + + let mux_queue1 = gst::ElementFactory::make("queue", None).unwrap(); + let mux_queue2 = gst::ElementFactory::make("queue", None).unwrap(); + + let mux = gst::ElementFactory::make("mp4mux", None).unwrap(); + + let file_sink = gst::ElementFactory::make("filesink", None).unwrap(); + file_sink + .set_property("location", &"recording.mp4") + .unwrap(); + file_sink.set_property("async", &false).unwrap(); + file_sink.set_property("sync", &false).unwrap(); + + pipeline + .add_many(&[ + &video_src, + &timeoverlay, + &video_tee, + &video_queue1, + &video_queue2, + &video_convert1, + &video_convert2, + &video_sink, + &video_enc, + &video_parse, + &audio_src, + &audio_tee, + &audio_queue1, + &audio_queue2, + &audio_convert1, + &audio_convert2, + &audio_sink, + &audio_enc, + &audio_parse, + &togglerecord, + &mux_queue1, + &mux_queue2, + &mux, + &file_sink, + ]) + .unwrap(); + + gst::Element::link_many(&[ + &video_src, + &timeoverlay, + &video_tee, + &video_queue1, + &video_convert1, + &video_sink, + ]).unwrap(); + + gst::Element::link_many(&[ + &video_tee, + &video_queue2, + &video_convert2, + &video_enc, + &video_parse, + ]).unwrap(); + + video_parse.link_pads("src", &togglerecord, "sink").unwrap(); + togglerecord.link_pads("src", &mux_queue1, "sink").unwrap(); + mux_queue1.link_pads("src", &mux, "video_%u").unwrap(); + + gst::Element::link_many(&[ + &audio_src, + &audio_tee, + &audio_queue1, + &audio_convert1, + &audio_sink, + ]).unwrap(); + + gst::Element::link_many(&[ + &audio_tee, + &audio_queue2, + &audio_convert2, + &audio_enc, + &audio_parse, + ]).unwrap(); + + audio_parse + .link_pads("src", &togglerecord, "sink_0") + .unwrap(); + togglerecord + .link_pads("src_0", &mux_queue2, "sink") + .unwrap(); + mux_queue2.link_pads("src", &mux, "audio_%u").unwrap(); + + gst::Element::link_many(&[&mux, &file_sink]).unwrap(); + + ( + pipeline, + video_queue2.get_static_pad("sink").unwrap(), + audio_queue2.get_static_pad("sink").unwrap(), + togglerecord, + video_sink, + video_widget, + ) +} + +fn create_ui(app: >k::Application) { + let (pipeline, video_pad, audio_pad, togglerecord, video_sink, video_widget) = + create_pipeline(); + + let window = gtk::Window::new(gtk::WindowType::Toplevel); + window.set_default_size(320, 240); + let vbox = gtk::Box::new(gtk::Orientation::Vertical, 0); + vbox.pack_start(&video_widget, true, true, 0); + + let hbox = gtk::Box::new(gtk::Orientation::Horizontal, 0); + let position_label = gtk::Label::new("Position: 00:00:00"); + hbox.pack_start(&position_label, true, true, 5); + let recorded_duration_label = gtk::Label::new("Recorded: 00:00:00"); + hbox.pack_start(&recorded_duration_label, true, true, 5); + vbox.pack_start(&hbox, true, true, 5); + + let hbox = gtk::Box::new(gtk::Orientation::Horizontal, 0); + let record_button = gtk::Button::new_with_label("Record"); + hbox.pack_start(&record_button, true, true, 5); + let finish_button = gtk::Button::new_with_label("Finish"); + hbox.pack_start(&finish_button, true, true, 5); + vbox.pack_start(&hbox, true, true, 5); + + window.add(&vbox); + window.show_all(); + + app.add_window(&window); + + let video_sink_clone = video_sink.clone(); + let togglerecord_clone = togglerecord.clone(); + gtk::timeout_add(500, move || { + let video_sink = &video_sink_clone; + let togglerecord = &togglerecord_clone; + + let position = if let Some(gst::FormatValue::Time(position)) = + video_sink.query_position(gst::Format::Time) + { + position + } else { + 0.into() + }; + position_label.set_text(&format!("Position: {:.0}", position)); + + let recording_duration = if let Some(gst::FormatValue::Time(recording_duration)) = + togglerecord + .get_static_pad("src") + .unwrap() + .query_position(gst::Format::Time) + { + recording_duration + } else { + 0.into() + }; + recorded_duration_label.set_text(&format!("Recorded: {:.0}", recording_duration)); + + glib::Continue(true) + }); + + let togglerecord_clone = togglerecord.clone(); + record_button.connect_clicked(move |button| { + let togglerecord = &togglerecord_clone; + + let recording = !togglerecord + .get_property("record") + .unwrap() + .get::() + .unwrap(); + togglerecord.set_property("record", &recording).unwrap(); + + button.set_label(if recording { "Stop" } else { "Record" }); + }); + + let record_button_clone = record_button.clone(); + finish_button.connect_clicked(move |button| { + let record_button = &record_button_clone; + record_button.set_sensitive(false); + button.set_sensitive(false); + + video_pad.send_event(gst::Event::new_eos().build()); + audio_pad.send_event(gst::Event::new_eos().build()); + }); + + let app_clone = app.clone(); + window.connect_delete_event(move |_, _| { + let app = &app_clone; + app.quit(); + Inhibit(false) + }); + + let bus = pipeline.get_bus().unwrap(); + let app_clone = SendCell::new(app.clone()); + bus.add_watch(move |_, msg| { + use gst::MessageView; + + let app = app_clone.borrow(); + match msg.view() { + MessageView::Eos(..) => app.quit(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + msg.get_src().map(|s| s.get_path_string()), + err.get_error(), + err.get_debug() + ); + app.quit(); + } + _ => (), + }; + + glib::Continue(true) + }); + + let pipeline_clone = pipeline.clone(); + app.connect_shutdown(move |_| { + let pipeline = &pipeline_clone; + let ret = pipeline.set_state(gst::State::Null); + assert_ne!(ret, gst::StateChangeReturn::Failure); + }); + + let ret = pipeline.set_state(gst::State::Playing); + assert_ne!(ret, gst::StateChangeReturn::Failure); +} + +fn main() { + gst::init().unwrap(); + gtk::init().unwrap(); + + #[cfg(debug_assertions)] + { + use std::path::Path; + + let mut path = Path::new("target/debug"); + if !path.exists() { + path = Path::new("../target/debug"); + } + + gst::Registry::get().scan_path(path); + } + #[cfg(not(debug_assertions))] + { + use std::path::Path; + + let mut path = Path::new("target/release"); + if !path.exists() { + path = Path::new("../target/release"); + } + + gst::Registry::get().scan_path(path); + } + + let app = gtk::Application::new(None, gio::ApplicationFlags::FLAGS_NONE).unwrap(); + + app.connect_activate(create_ui); + let args = env::args().collect::>(); + app.run(&args); +} diff --git a/gst-plugin-togglerecord/src/lib.rs b/gst-plugin-togglerecord/src/lib.rs new file mode 100644 index 00000000..959f8418 --- /dev/null +++ b/gst-plugin-togglerecord/src/lib.rs @@ -0,0 +1,43 @@ +// Copyright (C) 2017 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +#![crate_type = "cdylib"] + +extern crate glib; +#[macro_use] +extern crate gst_plugin; +#[macro_use] +extern crate gstreamer as gst; + +mod togglerecord; + +fn plugin_init(plugin: &gst::Plugin) -> bool { + togglerecord::register(plugin); + true +} + +plugin_define!( + b"togglerecord\0", + b"Toggle Record Plugin\0", + plugin_init, + b"0.1.0\0", + b"LGPL\0", + b"togglerecord\0", + b"togglerecord\0", + b"https://github.com/sdroege/gst-plugin-rs\0", + b"2017-12-04\0" +); diff --git a/gst-plugin-togglerecord/src/togglerecord.rs b/gst-plugin-togglerecord/src/togglerecord.rs new file mode 100644 index 00000000..61d9477f --- /dev/null +++ b/gst-plugin-togglerecord/src/togglerecord.rs @@ -0,0 +1,1381 @@ +// Copyright (C) 2017 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib; +use glib::prelude::*; +use gst; +use gst::prelude::*; + +use gst_plugin::properties::*; +use gst_plugin::object::*; +use gst_plugin::element::*; + +use std::sync::{Arc, Condvar, Mutex}; +use std::collections::HashMap; +use std::iter; +use std::cmp; +use std::f64; + +const DEFAULT_RECORD: bool = false; + +#[derive(Debug, Clone, Copy)] +struct Settings { + record: bool, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + record: DEFAULT_RECORD, + } + } +} + +static PROPERTIES: [Property; 1] = [ + Property::Boolean( + "record", + "Record", + "Enable/disable recording", + DEFAULT_RECORD, + PropertyMutability::ReadWrite, + ), +]; + +#[derive(Clone)] +struct Stream { + sinkpad: gst::Pad, + srcpad: gst::Pad, + state: Arc>, +} + +impl PartialEq for Stream { + fn eq(&self, other: &Self) -> bool { + self.sinkpad == other.sinkpad && self.srcpad == other.srcpad + } +} + +impl Eq for Stream {} + +impl Stream { + fn new(sinkpad: gst::Pad, srcpad: gst::Pad) -> Self { + Self { + sinkpad: sinkpad, + srcpad: srcpad, + state: Arc::new(Mutex::new(StreamState::default())), + } + } +} + +struct StreamState { + in_segment: gst::Segment, + out_segment: gst::Segment, + segment_seqnum: u32, + current_running_time: gst::ClockTime, + eos: bool, + flushing: bool, + segment_pending: bool, + pending_events: Vec, +} + +impl Default for StreamState { + fn default() -> Self { + Self { + in_segment: gst::Segment::new(), + out_segment: gst::Segment::new(), + segment_seqnum: gst::util_seqnum_next(), + current_running_time: gst::CLOCK_TIME_NONE, + eos: false, + flushing: false, + segment_pending: false, + pending_events: Vec::new(), + } + } +} + +// Recording behaviour: +// +// Secondary streams are *always* behind main stream +// Main stream EOS stops recording (-> Stopping), makes secondary streams go EOS +// +// Recording: Passing through all data +// Stopping: Main stream remembering current last_recording_stop, waiting for all +// other streams to reach this position +// Stopped: Dropping all data +// Starting: Main stream waiting until next keyframe and setting last_recording_start, waiting +// for all other streams to reach this position +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum RecordingState { + Recording, + Stopping, + Stopped, + Starting, +} + +#[derive(Debug)] +struct State { + recording_state: RecordingState, + last_recording_start: gst::ClockTime, + last_recording_stop: gst::ClockTime, + // Accumulated duration of previous recording segments, + // updated whenever going to Stopped + recording_duration: gst::ClockTime, + // Updated whenever going to Recording + running_time_offset: gst::ClockTime, +} + +impl Default for State { + fn default() -> Self { + Self { + recording_state: RecordingState::Stopped, + last_recording_start: gst::CLOCK_TIME_NONE, + last_recording_stop: gst::CLOCK_TIME_NONE, + recording_duration: 0.into(), + running_time_offset: gst::CLOCK_TIME_NONE, + } + } +} + +#[derive(Debug, PartialEq, Eq)] +enum HandleResult { + Pass, + Drop, + Eos, + Flushing, +} + +struct ToggleRecord { + cat: gst::DebugCategory, + settings: Mutex, + state: Mutex, + main_stream: Stream, + // Always must have main_stream.state locked! + // If multiple stream states have to be locked, the + // main_stream always comes first + main_stream_cond: Condvar, + other_streams: Mutex<(Vec, u32)>, + pads: Mutex>, +} + +impl ToggleRecord { + fn new(_element: &Element, sinkpad: gst::Pad, srcpad: gst::Pad) -> Self { + let main_stream = Stream::new(sinkpad, srcpad); + + let mut pads = HashMap::new(); + pads.insert(main_stream.sinkpad.clone(), main_stream.clone()); + pads.insert(main_stream.srcpad.clone(), main_stream.clone()); + + Self { + cat: gst::DebugCategory::new( + "togglerecord", + gst::DebugColorFlags::empty(), + "Toggle Record Element", + ), + settings: Mutex::new(Settings::default()), + state: Mutex::new(State::default()), + main_stream: main_stream, + main_stream_cond: Condvar::new(), + other_streams: Mutex::new((Vec::new(), 0)), + pads: Mutex::new(pads), + } + } + + fn class_init(klass: &mut ElementClass) { + klass.set_metadata( + "Toggle Record", + "Generic", + "Valve that ensures multiple streams start/end at the same time", + "Sebastian Dröge ", + ); + + let caps = gst::Caps::new_any(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ); + klass.add_pad_template(src_pad_template); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ); + klass.add_pad_template(sink_pad_template); + + let src_pad_template = gst::PadTemplate::new( + "src_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &caps, + ); + klass.add_pad_template(src_pad_template); + + let sink_pad_template = gst::PadTemplate::new( + "sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &caps, + ); + klass.add_pad_template(sink_pad_template); + + klass.install_properties(&PROPERTIES); + } + + fn init(element: &Element) -> Box> { + let templ = element.get_pad_template("sink").unwrap(); + let sinkpad = gst::Pad::new_from_template(&templ, "sink"); + let templ = element.get_pad_template("src").unwrap(); + let srcpad = gst::Pad::new_from_template(&templ, "src"); + + ToggleRecord::set_pad_functions(&sinkpad, &srcpad); + element.add_pad(&sinkpad).unwrap(); + element.add_pad(&srcpad).unwrap(); + + let imp = Self::new(element, sinkpad, srcpad); + Box::new(imp) + } + + fn catch_panic_pad_function T, G: FnOnce() -> T>( + parent: &Option, + fallback: G, + f: F, + ) -> T { + let element = parent + .as_ref() + .cloned() + .unwrap() + .downcast::() + .unwrap(); + let togglerecord = element.get_impl().downcast_ref::().unwrap(); + element.catch_panic(fallback, |element| f(togglerecord, &element)) + } + + fn set_pad_functions(sinkpad: &gst::Pad, srcpad: &gst::Pad) { + sinkpad.set_chain_function(|pad, parent, buffer| { + ToggleRecord::catch_panic_pad_function( + parent, + || gst::FlowReturn::Error, + |togglerecord, element| togglerecord.sink_chain(pad, element, buffer), + ) + }); + sinkpad.set_event_function(|pad, parent, event| { + ToggleRecord::catch_panic_pad_function( + parent, + || false, + |togglerecord, element| togglerecord.sink_event(pad, element, event), + ) + }); + sinkpad.set_query_function(|pad, parent, query| { + ToggleRecord::catch_panic_pad_function( + parent, + || false, + |togglerecord, element| togglerecord.sink_query(pad, element, query), + ) + }); + sinkpad.set_iterate_internal_links_function(|pad, parent| { + ToggleRecord::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |togglerecord, element| togglerecord.iterate_internal_links(pad, element), + ) + }); + + srcpad.set_event_function(|pad, parent, event| { + ToggleRecord::catch_panic_pad_function( + parent, + || false, + |togglerecord, element| togglerecord.src_event(pad, element, event), + ) + }); + srcpad.set_query_function(|pad, parent, query| { + ToggleRecord::catch_panic_pad_function( + parent, + || false, + |togglerecord, element| togglerecord.src_query(pad, element, query), + ) + }); + srcpad.set_iterate_internal_links_function(|pad, parent| { + ToggleRecord::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |togglerecord, element| togglerecord.iterate_internal_links(pad, element), + ) + }); + } + + fn handle_main_stream( + &self, + pad: &gst::Pad, + stream: &Stream, + is_keyframe: bool, + mut dts_or_pts: gst::ClockTime, + duration: gst::ClockTime, + ) -> HandleResult { + let mut state = stream.state.lock().unwrap(); + + let mut dts_or_pts_end = if duration.is_some() { + dts_or_pts + duration + } else { + dts_or_pts + }; + + dts_or_pts = cmp::max(state.in_segment.get_start().into(), dts_or_pts); + dts_or_pts_end = cmp::max(state.in_segment.get_start().into(), dts_or_pts_end); + if state.in_segment.get_stop() != gst::BUFFER_OFFSET_NONE { + dts_or_pts = cmp::min(state.in_segment.get_stop().into(), dts_or_pts); + dts_or_pts_end = cmp::min(state.in_segment.get_stop().into(), dts_or_pts_end); + } + + let mut current_running_time = state + .in_segment + .to_running_time(gst::Format::Time, dts_or_pts.into()) + .into(); + current_running_time = cmp::max(current_running_time, state.current_running_time); + state.current_running_time = current_running_time; + + // Wake up everybody, we advanced a bit + // Important: They will only be able to advance once we're done with this + // function or waiting for them to catch up below, otherwise they might + // get the wrong state + self.main_stream_cond.notify_all(); + + let current_running_time_end = state + .in_segment + .to_running_time(gst::Format::Time, dts_or_pts_end.into()) + .into(); + + gst_log!( + self.cat, + obj: pad, + "Main stream current running time {}-{} (position: {}-{})", + current_running_time, + current_running_time_end, + dts_or_pts, + dts_or_pts_end + ); + + let settings = *self.settings.lock().unwrap(); + + // First check if we have to update our recording state + let mut rec_state = self.state.lock().unwrap(); + match rec_state.recording_state { + RecordingState::Recording => if !settings.record { + gst_debug!(self.cat, obj: pad, "Stopping recording"); + rec_state.recording_state = RecordingState::Stopping; + }, + RecordingState::Stopped => if settings.record { + gst_debug!(self.cat, obj: pad, "Starting recording"); + rec_state.recording_state = RecordingState::Starting; + }, + _ => (), + } + + match rec_state.recording_state { + RecordingState::Recording => { + // Remember where we stopped last, in case of EOS + rec_state.last_recording_stop = current_running_time_end; + gst_log!(self.cat, obj: pad, "Passing buffer (recording)"); + HandleResult::Pass + } + RecordingState::Stopping => { + if !is_keyframe { + // Remember where we stopped last, in case of EOS + rec_state.last_recording_stop = current_running_time_end; + gst_log!(self.cat, obj: pad, "Passing non-keyframe buffer (stopping)"); + return HandleResult::Pass; + } + + // Remember the time when we stopped: now! + rec_state.last_recording_stop = current_running_time; + gst_debug!(self.cat, obj: pad, "Stopping at {}", current_running_time); + + // Then unlock and wait for all other streams to reach + // it or go EOS instead. + drop(rec_state); + + while !self.other_streams.lock().unwrap().0.iter().all(|s| { + let s = s.state.lock().unwrap(); + s.eos + || (s.current_running_time.is_some() + && s.current_running_time >= current_running_time) + }) { + gst_log!(self.cat, obj: pad, "Waiting for other streams to stop"); + state = self.main_stream_cond.wait(state).unwrap(); + } + + if state.flushing { + gst_debug!(self.cat, obj: pad, "Flushing"); + return HandleResult::Flushing; + } + + let mut rec_state = self.state.lock().unwrap(); + rec_state.recording_state = RecordingState::Stopped; + rec_state.recording_duration += + rec_state.last_recording_stop - rec_state.last_recording_start; + rec_state.last_recording_start = gst::CLOCK_TIME_NONE; + rec_state.last_recording_stop = gst::CLOCK_TIME_NONE; + + gst_debug!( + self.cat, + obj: pad, + "Stopped at {}, recording duration {}", + current_running_time, + rec_state.recording_duration + ); + + // Then become Stopped and drop this buffer. We always stop right before + // a keyframe + gst_log!(self.cat, obj: pad, "Dropping buffer (stopped)"); + HandleResult::Drop + } + RecordingState::Stopped => { + gst_log!(self.cat, obj: pad, "Dropping buffer (stopped)"); + HandleResult::Drop + } + RecordingState::Starting => { + // If this is no keyframe, we can directly go out again here and drop the frame + if !is_keyframe { + gst_log!( + self.cat, + obj: pad, + "Dropping non-keyframe buffer (starting)" + ); + return HandleResult::Drop; + } + + // Remember the time when we started: now! + rec_state.last_recording_start = current_running_time; + rec_state.running_time_offset = current_running_time - rec_state.recording_duration; + gst_debug!(self.cat, obj: pad, "Starting at {}", current_running_time); + + state.segment_pending = true; + for other_stream in &self.other_streams.lock().unwrap().0 { + other_stream.state.lock().unwrap().segment_pending = true; + } + + // Then unlock and wait for all other streams to reach + // it or go EOS instead + drop(rec_state); + + while !self.other_streams.lock().unwrap().0.iter().all(|s| { + let s = s.state.lock().unwrap(); + s.eos + || (s.current_running_time.is_some() + && s.current_running_time >= current_running_time) + }) { + gst_log!(self.cat, obj: pad, "Waiting for other streams to start"); + state = self.main_stream_cond.wait(state).unwrap(); + } + + if state.flushing { + gst_debug!(self.cat, obj: pad, "Flushing"); + return HandleResult::Flushing; + } + + let mut rec_state = self.state.lock().unwrap(); + rec_state.recording_state = RecordingState::Recording; + gst_debug!( + self.cat, + obj: pad, + "Started at {}, recording duration {}", + current_running_time, + rec_state.recording_duration + ); + + gst_log!(self.cat, obj: pad, "Passing buffer (recording)"); + HandleResult::Pass + } + } + } + + fn handle_secondary_stream( + &self, + pad: &gst::Pad, + stream: &Stream, + mut pts: gst::ClockTime, + duration: gst::ClockTime, + ) -> HandleResult { + // Calculate end pts & current running time and make sure we stay in the segment + let mut state = stream.state.lock().unwrap(); + + let mut pts_end = if duration.is_some() { + pts + duration + } else { + pts + }; + + pts = cmp::max(state.in_segment.get_start().into(), pts); + if state.in_segment.get_stop() != gst::BUFFER_OFFSET_NONE + && pts >= state.in_segment.get_stop().into() + { + state.current_running_time = state + .in_segment + .to_running_time(gst::Format::Time, state.in_segment.get_stop()) + .into(); + state.eos = true; + gst_debug!( + self.cat, + obj: pad, + "After segment end {} >= {}, EOS", + pts, + gst::ClockTime::from(state.in_segment.get_stop()) + ); + + return HandleResult::Eos; + } + pts_end = cmp::max(state.in_segment.get_start().into(), pts_end); + if state.in_segment.get_stop() != gst::BUFFER_OFFSET_NONE { + pts_end = cmp::min(state.in_segment.get_stop().into(), pts_end); + } + + let mut current_running_time = state + .in_segment + .to_running_time(gst::Format::Time, pts.into()) + .into(); + current_running_time = cmp::max(current_running_time, state.current_running_time); + state.current_running_time = current_running_time; + + let current_running_time_end: gst::ClockTime = state + .in_segment + .to_running_time(gst::Format::Time, pts_end.into()) + .into(); + gst_log!( + self.cat, + obj: pad, + "Secondary stream current running time {}-{} (position: {}-{}", + current_running_time, + current_running_time_end, + pts, + pts_end + ); + + drop(state); + + let mut main_state = self.main_stream.state.lock().unwrap(); + + // Wake up, in case the main stream is waiting for us to progress up to here. We progressed + // above but all notifying must happen while the main_stream state is locked as per above. + self.main_stream_cond.notify_all(); + + while (main_state.current_running_time == gst::CLOCK_TIME_NONE + || main_state.current_running_time < current_running_time) + && !main_state.eos && !stream.state.lock().unwrap().flushing + { + gst_log!( + self.cat, + obj: pad, + "Waiting for reaching {} / EOS / flushing, main stream at {}", + current_running_time, + main_state.current_running_time + ); + + main_state = self.main_stream_cond.wait(main_state).unwrap(); + } + if stream.state.lock().unwrap().flushing { + gst_debug!(self.cat, obj: pad, "Flushing"); + return HandleResult::Flushing; + } + + let rec_state = self.state.lock().unwrap(); + + // If the main stream is EOS, we are also EOS unless we are + // before the final last recording stop running time + if main_state.eos { + // If we have no start or stop position (we never recorded), or are after the current + // stop position that we're EOS now + // If we're before the start position (we were starting before EOS), + // drop the buffer + if rec_state.last_recording_stop.is_none() || rec_state.last_recording_start.is_none() + || current_running_time_end > rec_state.last_recording_stop + { + gst_debug!( + self.cat, + obj: pad, + "Main stream EOS and we're EOS ({} > {})", + current_running_time_end, + rec_state.last_recording_stop + ); + return HandleResult::Eos; + } else if current_running_time < rec_state.last_recording_start { + gst_debug!( + self.cat, + obj: pad, + "Main stream EOS and we're not EOS yet (before recording start, {} <= {})", + current_running_time, + rec_state.last_recording_start + ); + return HandleResult::Drop; + } else { + gst_debug!( + self.cat, + obj: pad, + "Main stream EOS and we're not EOS yet (before recording end, {} <= {} < {})", + rec_state.last_recording_start, + current_running_time, + rec_state.last_recording_stop + ); + return HandleResult::Pass; + } + } + + match rec_state.recording_state { + RecordingState::Recording => { + // We're properly started, must have a start position and + // be actually after that start position + assert!(rec_state.last_recording_start.is_some()); + assert!(current_running_time >= rec_state.last_recording_start); + gst_log!(self.cat, obj: pad, "Passing buffer (recording)"); + HandleResult::Pass + } + RecordingState::Stopping => { + // If we have no start position yet, the main stream is waiting for a key-frame + if rec_state.last_recording_stop.is_none() { + gst_log!( + self.cat, + obj: pad, + "Passing buffer (stopping: waiting for keyframe)", + ); + HandleResult::Pass + } else if current_running_time_end <= rec_state.last_recording_stop { + gst_log!( + self.cat, + obj: pad, + "Passing buffer (stopping: {} <= {})", + current_running_time_end, + rec_state.last_recording_stop + ); + HandleResult::Pass + } else { + gst_log!( + self.cat, + obj: pad, + "Dropping buffer (stopping: {} > {})", + current_running_time_end, + rec_state.last_recording_stop + ); + HandleResult::Drop + } + } + RecordingState::Stopped => { + // We're properly stopped + gst_log!(self.cat, obj: pad, "Dropping buffer (stopped)"); + HandleResult::Drop + } + RecordingState::Starting => { + // If we have no start position yet, the main stream is waiting for a key-frame + if rec_state.last_recording_start.is_none() { + gst_log!( + self.cat, + obj: pad, + "Dropping buffer (starting: waiting for keyframe)", + ); + HandleResult::Drop + } else if current_running_time >= rec_state.last_recording_start { + gst_log!( + self.cat, + obj: pad, + "Passing buffer (starting: {} >= {})", + current_running_time, + rec_state.last_recording_start + ); + HandleResult::Pass + } else { + gst_log!( + self.cat, + obj: pad, + "Dropping buffer (starting: {} < {})", + current_running_time, + rec_state.last_recording_start + ); + HandleResult::Drop + } + } + } + } + + fn sink_chain( + &self, + pad: &gst::Pad, + element: &Element, + buffer: gst::Buffer, + ) -> gst::FlowReturn { + let stream = match self.pads.lock().unwrap().get(pad) { + None => { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + return gst::FlowReturn::Error; + } + Some(stream) => stream.clone(), + }; + + gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer); + + { + let state = stream.state.lock().unwrap(); + if state.in_segment.get_format() != gst::Format::Time { + gst_element_error!( + element, + gst::StreamError::Format, + [ + "Only Time segments supported, got {:?}", + state.in_segment.get_format() + ] + ); + return gst::FlowReturn::Error; + } + + if state.eos { + return gst::FlowReturn::Eos; + } + } + + let handle_result = if stream != self.main_stream { + let pts = buffer.get_pts(); + let dts = buffer.get_dts(); + if dts.is_some() && pts.is_some() && dts != pts { + gst_element_error!( + element, + gst::StreamError::Format, + ["DTS != PTS not supported for secondary streams"] + ); + return gst::FlowReturn::Error; + } + if !pts.is_some() { + gst_element_error!(element, gst::StreamError::Format, ["Buffer without PTS"]); + return gst::FlowReturn::Error; + } + if buffer.get_flags().contains(gst::BufferFlags::DELTA_UNIT) { + gst_element_error!( + element, + gst::StreamError::Format, + ["Delta-units not supported for secondary streams"] + ); + return gst::FlowReturn::Error; + } + + self.handle_secondary_stream(pad, &stream, pts, buffer.get_duration()) + } else { + let dts_or_pts = buffer.get_dts_or_pts(); + if !dts_or_pts.is_some() { + gst_element_error!( + element, + gst::StreamError::Format, + ["Buffer without DTS or PTS"] + ); + return gst::FlowReturn::Error; + } + + self.handle_main_stream( + pad, + &stream, + !buffer.get_flags().contains(gst::BufferFlags::DELTA_UNIT), + dts_or_pts, + buffer.get_duration(), + ) + }; + + match handle_result { + HandleResult::Drop => { + return gst::FlowReturn::Ok; + } + HandleResult::Flushing => { + return gst::FlowReturn::Flushing; + } + HandleResult::Eos => { + stream.srcpad.push_event( + gst::Event::new_eos() + .seqnum(stream.state.lock().unwrap().segment_seqnum) + .build(), + ); + return gst::FlowReturn::Eos; + } + HandleResult::Pass => { + // Pass through and actually push the buffer + } + } + + let out_running_time = { + let mut state = stream.state.lock().unwrap(); + let mut events = Vec::with_capacity(state.pending_events.len() + 1); + + if state.segment_pending { + let rec_state = self.state.lock().unwrap(); + + // Adjust so that last_recording_start has running time of + // recording_duration + + state.out_segment = state.in_segment.clone(); + let offset: u64 = rec_state.running_time_offset.into(); + let res = state + .out_segment + .offset_running_time(gst::Format::Time, -(offset as i64)); + assert!(res); + events.push( + gst::Event::new_segment(&state.out_segment) + .seqnum(state.segment_seqnum) + .build(), + ); + state.segment_pending = false; + gst_debug!( + self.cat, + obj: pad, + "Pending Segment {:?}", + &state.out_segment + ); + } + + if !state.pending_events.is_empty() { + gst_debug!(self.cat, obj: pad, "Pushing pending events"); + } + + events.append(&mut state.pending_events); + + let out_running_time = gst::ClockTime::from( + state + .out_segment + .to_running_time(gst::Format::Time, buffer.get_pts().into()), + ); + + // Unlock before pushing + drop(state); + + for e in events.drain(..) { + stream.srcpad.push_event(e); + } + + out_running_time + }; + + gst_log!( + self.cat, + obj: pad, + "Pushing buffer with running time {}: {:?}", + out_running_time, + buffer + ); + stream.srcpad.push(buffer) + } + + fn sink_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool { + use gst::EventView; + + let stream = match self.pads.lock().unwrap().get(pad) { + None => { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + return false; + } + Some(stream) => stream.clone(), + }; + + gst_log!(self.cat, obj: pad, "Handling event {:?}", event); + + let mut forward = true; + let mut send_pending = false; + + match event.view() { + EventView::FlushStart(..) => { + let _main_state = if stream != self.main_stream { + Some(self.main_stream.state.lock().unwrap()) + } else { + None + }; + let mut state = stream.state.lock().unwrap(); + + state.flushing = true; + self.main_stream_cond.notify_all(); + } + EventView::FlushStop(..) => { + let mut state = stream.state.lock().unwrap(); + + state.eos = false; + state.flushing = false; + state.segment_pending = false; + state.current_running_time = gst::CLOCK_TIME_NONE; + } + EventView::Segment(e) => { + let mut state = stream.state.lock().unwrap(); + + let segment = e.get_segment(); + if segment.get_format() != gst::Format::Time { + gst_element_error!( + element, + gst::StreamError::Format, + [ + "Only Time segments supported, got {:?}", + segment.get_format() + ] + ); + return false; + } + if (segment.get_rate() - 1.0).abs() > f64::EPSILON { + gst_element_error!( + element, + gst::StreamError::Format, + [ + "Only rate==1.0 segments supported, got {:?}", + segment.get_rate() + ] + ); + return false; + } + + state.in_segment = e.get_segment(); + state.segment_seqnum = event.get_seqnum(); + state.segment_pending = true; + state.current_running_time = gst::CLOCK_TIME_NONE; + + gst_debug!(self.cat, obj: pad, "Got new Segment {:?}", state.in_segment); + + forward = false; + } + EventView::Gap(e) => { + gst_debug!(self.cat, obj: pad, "Handling Gap event {:?}", event); + let (pts, duration) = e.get(); + let (pts, duration) = (pts.into(), duration.into()); + let handle_result = if stream == self.main_stream { + self.handle_main_stream(pad, &stream, false, pts, duration) + } else { + self.handle_secondary_stream(pad, &stream, pts, duration) + }; + + forward = handle_result == HandleResult::Pass; + } + EventView::Eos(..) => { + let _main_state = if stream != self.main_stream { + Some(self.main_stream.state.lock().unwrap()) + } else { + None + }; + let mut state = stream.state.lock().unwrap(); + + state.eos = true; + self.main_stream_cond.notify_all(); + gst_debug!( + self.cat, + obj: pad, + "Stream is EOS now, sending any pending events" + ); + + send_pending = true; + } + _ => (), + }; + + // If a serialized event and coming after Segment and a new Segment is pending, + // queue up and send at a later time (buffer/gap) after we sent the Segment + let type_ = event.get_type(); + if forward && type_ != gst::EventType::Eos && type_.is_serialized() + && type_.partial_cmp(&gst::EventType::Segment) == Some(cmp::Ordering::Greater) + { + let mut state = stream.state.lock().unwrap(); + if state.segment_pending { + gst_log!(self.cat, obj: pad, "Storing event for later pushing"); + state.pending_events.push(event); + return true; + } + } + + if send_pending { + let mut state = stream.state.lock().unwrap(); + let mut events = Vec::with_capacity(state.pending_events.len() + 1); + + // Got not a single buffer on this stream before EOS, forward + // the input segment + if state.segment_pending { + events.push( + gst::Event::new_segment(&state.in_segment) + .seqnum(state.segment_seqnum) + .build(), + ); + } + events.append(&mut state.pending_events); + drop(state); + + for e in events.drain(..) { + stream.srcpad.push_event(e); + } + } + + if forward { + gst_log!(self.cat, obj: pad, "Forwarding event {:?}", event); + stream.srcpad.push_event(event) + } else { + gst_log!(self.cat, obj: pad, "Dropping event {:?}", event); + true + } + } + + fn sink_query(&self, pad: &gst::Pad, element: &Element, query: &mut gst::QueryRef) -> bool { + let stream = match self.pads.lock().unwrap().get(pad) { + None => { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + return false; + } + Some(stream) => stream.clone(), + }; + + gst_log!(self.cat, obj: pad, "Handling query {:?}", query); + + stream.srcpad.peer_query(query) + } + + fn src_event(&self, pad: &gst::Pad, element: &Element, mut event: gst::Event) -> bool { + use gst::EventView; + + let stream = match self.pads.lock().unwrap().get(pad) { + None => { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + return false; + } + Some(stream) => stream.clone(), + }; + + gst_log!(self.cat, obj: pad, "Handling event {:?}", event); + + let mut forward = true; + match event.view() { + EventView::Seek(..) => { + forward = false; + } + _ => (), + } + + let rec_state = self.state.lock().unwrap(); + let running_time_offset = rec_state.running_time_offset.unwrap_or(0) as i64; + let offset = event.get_running_time_offset(); + event + .make_mut() + .set_running_time_offset(offset + running_time_offset); + drop(rec_state); + + if forward { + gst_log!(self.cat, obj: pad, "Forwarding event {:?}", event); + stream.sinkpad.push_event(event) + } else { + gst_log!(self.cat, obj: pad, "Dropping event {:?}", event); + false + } + } + + fn src_query(&self, pad: &gst::Pad, element: &Element, query: &mut gst::QueryRef) -> bool { + use gst::QueryView; + + let stream = match self.pads.lock().unwrap().get(pad) { + None => { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + return false; + } + Some(stream) => stream.clone(), + }; + + gst_log!(self.cat, obj: pad, "Handling query {:?}", query); + match query.view_mut() { + QueryView::Scheduling(ref mut q) => { + let mut new_query = gst::Query::new_scheduling(); + let res = stream.sinkpad.peer_query(new_query.get_mut().unwrap()); + if !res { + return res; + } + + gst_log!(self.cat, obj: pad, "Downstream returned {:?}", new_query); + + match new_query.view() { + QueryView::Scheduling(ref n) => { + let (flags, min, max, align) = n.get_result(); + q.set(flags, min, max, align); + q.add_scheduling_modes(&n.get_scheduling_modes() + .iter() + .cloned() + .filter(|m| m != &gst::PadMode::Pull) + .collect::>()); + gst_log!(self.cat, obj: pad, "Returning {:?}", q.get_mut_query()); + return true; + } + _ => unreachable!(), + } + } + QueryView::Seeking(ref mut q) => { + // Seeking is not possible here + let format = q.get_format(); + q.set( + false, + gst::FormatValue::new(format, -1), + gst::FormatValue::new(format, -1), + ); + + gst_log!(self.cat, obj: pad, "Returning {:?}", q.get_mut_query()); + return true; + } + // Position and duration is always the current recording position + QueryView::Position(ref mut q) => if q.get_format() == gst::Format::Time { + let state = stream.state.lock().unwrap(); + let rec_state = self.state.lock().unwrap(); + let mut recording_duration = rec_state.recording_duration; + if rec_state.recording_state == RecordingState::Recording + || rec_state.recording_state == RecordingState::Stopping + { + recording_duration += + state.current_running_time - rec_state.last_recording_start; + } + q.set(recording_duration); + return true; + } else { + return false; + }, + QueryView::Duration(ref mut q) => if q.get_format() == gst::Format::Time { + let state = stream.state.lock().unwrap(); + let rec_state = self.state.lock().unwrap(); + let mut recording_duration = rec_state.recording_duration; + if rec_state.recording_state == RecordingState::Recording + || rec_state.recording_state == RecordingState::Stopping + { + recording_duration += + state.current_running_time - rec_state.last_recording_start; + } + q.set(recording_duration); + return true; + } else { + return false; + }, + _ => (), + }; + + gst_log!(self.cat, obj: pad, "Forwarding query {:?}", query); + stream.sinkpad.peer_query(query) + } + + fn iterate_internal_links(&self, pad: &gst::Pad, element: &Element) -> gst::Iterator { + let stream = match self.pads.lock().unwrap().get(pad) { + None => { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + return gst::Iterator::from_vec(vec![]); + } + Some(stream) => stream.clone(), + }; + + if pad == &stream.srcpad { + gst::Iterator::from_vec(vec![stream.sinkpad.clone()]) + } else { + gst::Iterator::from_vec(vec![stream.srcpad.clone()]) + } + } +} + +impl ObjectImpl for ToggleRecord { + fn set_property(&self, obj: &glib::Object, id: u32, value: &glib::Value) { + let prop = &PROPERTIES[id as usize]; + let element = obj.clone().downcast::().unwrap(); + + match *prop { + Property::Boolean("record", ..) => { + let mut settings = self.settings.lock().unwrap(); + let record = value.get().unwrap(); + gst_debug!( + self.cat, + obj: &element, + "Setting record from {:?} to {:?}", + settings.record, + record + ); + settings.record = record; + } + _ => unimplemented!(), + } + } + + fn get_property(&self, _obj: &glib::Object, id: u32) -> Result { + let prop = &PROPERTIES[id as usize]; + + match *prop { + Property::Boolean("record", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.record.to_value()) + } + _ => unimplemented!(), + } + } +} + +impl ElementImpl for ToggleRecord { + fn change_state( + &self, + element: &Element, + transition: gst::StateChange, + ) -> gst::StateChangeReturn { + gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::ReadyToPaused => { + for s in self.other_streams + .lock() + .unwrap() + .0 + .iter() + .chain(iter::once(&self.main_stream)) + { + let mut state = s.state.lock().unwrap(); + *state = StreamState::default(); + } + + let mut rec_state = self.state.lock().unwrap(); + *rec_state = State::default(); + } + gst::StateChange::PausedToReady => { + for s in &self.other_streams.lock().unwrap().0 { + let mut state = s.state.lock().unwrap(); + state.flushing = true; + } + + let mut state = self.main_stream.state.lock().unwrap(); + state.flushing = true; + self.main_stream_cond.notify_all(); + } + _ => (), + } + + let ret = element.parent_change_state(transition); + if ret == gst::StateChangeReturn::Failure { + return ret; + } + + match transition { + gst::StateChange::PausedToReady => for s in self.other_streams + .lock() + .unwrap() + .0 + .iter() + .chain(iter::once(&self.main_stream)) + { + let mut state = s.state.lock().unwrap(); + + state.pending_events.clear(); + }, + _ => (), + } + + ret + } + + fn request_new_pad( + &self, + element: &Element, + _templ: &gst::PadTemplate, + _name: Option, + _caps: Option<&gst::CapsRef>, + ) -> Option { + let mut other_streams = self.other_streams.lock().unwrap(); + let (ref mut other_streams, ref mut pad_count) = *other_streams; + let mut pads = self.pads.lock().unwrap(); + + let id = *pad_count; + *pad_count += 1; + + let templ = element.get_pad_template("sink_%u").unwrap(); + let sinkpad = gst::Pad::new_from_template(&templ, format!("sink_{}", id).as_str()); + + let templ = element.get_pad_template("src_%u").unwrap(); + let srcpad = gst::Pad::new_from_template(&templ, format!("src_{}", id).as_str()); + + ToggleRecord::set_pad_functions(&sinkpad, &srcpad); + + sinkpad.set_active(true).unwrap(); + srcpad.set_active(true).unwrap(); + + element.add_pad(&sinkpad).unwrap(); + element.add_pad(&srcpad).unwrap(); + + let stream = Stream::new(sinkpad.clone(), srcpad); + + pads.insert(stream.sinkpad.clone(), stream.clone()); + pads.insert(stream.srcpad.clone(), stream.clone()); + + other_streams.push(stream); + + Some(sinkpad) + } + + fn release_pad(&self, element: &Element, pad: &gst::Pad) { + let mut other_streams = self.other_streams.lock().unwrap(); + let (ref mut other_streams, _) = *other_streams; + let mut pads = self.pads.lock().unwrap(); + + let stream = match pads.get(pad) { + None => return, + Some(stream) => stream.clone(), + }; + + stream.srcpad.set_active(false).unwrap(); + stream.sinkpad.set_active(false).unwrap(); + + element.remove_pad(&stream.sinkpad).unwrap(); + element.remove_pad(&stream.srcpad).unwrap(); + + pads.remove(&stream.sinkpad).unwrap(); + pads.remove(&stream.srcpad).unwrap(); + + // TODO: Replace with Vec::remove_item() once stable + let pos = other_streams.iter().position(|x| *x == stream); + pos.map(|pos| other_streams.swap_remove(pos)); + } +} + +struct ToggleRecordStatic; + +impl ImplTypeStatic for ToggleRecordStatic { + fn get_name(&self) -> &str { + "ToggleRecord" + } + + fn new(&self, element: &Element) -> Box> { + ToggleRecord::init(element) + } + + fn class_init(&self, klass: &mut ElementClass) { + ToggleRecord::class_init(klass); + } +} + +pub fn register(plugin: &gst::Plugin) { + let togglerecord_static = ToggleRecordStatic; + let type_ = register_type(togglerecord_static); + gst::Element::register(plugin, "togglerecord", 0, type_); +} diff --git a/gst-plugin-togglerecord/tests/tests.rs b/gst-plugin-togglerecord/tests/tests.rs new file mode 100644 index 00000000..3b70ac4e --- /dev/null +++ b/gst-plugin-togglerecord/tests/tests.rs @@ -0,0 +1,1170 @@ +// Copyright (C) 2017 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +extern crate glib; +use glib::prelude::*; + +extern crate gstreamer as gst; +use gst::prelude::*; + +extern crate either; +use either::*; + +use std::sync::{mpsc, Mutex}; +use std::thread; + +fn init() { + use std::sync::{Once, ONCE_INIT}; + static INIT: Once = ONCE_INIT; + + INIT.call_once(|| { + gst::init().unwrap(); + + #[cfg(debug_assertions)] + { + use std::path::Path; + + let mut path = Path::new("target/debug"); + if !path.exists() { + path = Path::new("../target/debug"); + } + + gst::Registry::get().scan_path(path); + } + #[cfg(not(debug_assertions))] + { + use std::path::Path; + + let mut path = Path::new("target/release"); + if !path.exists() { + path = Path::new("../target/release"); + } + + gst::Registry::get().scan_path(path); + } + }); +} + +enum SendData { + Buffers(usize), + BuffersDelta(usize), + Gaps(usize), + Eos, +} + +fn setup_sender_receiver( + pipeline: &gst::Pipeline, + togglerecord: &gst::Element, + pad: &str, + offset: gst::ClockTime, +) -> ( + mpsc::Sender, + mpsc::Receiver<()>, + mpsc::Receiver>, + thread::JoinHandle<()>, +) { + let fakesink = gst::ElementFactory::make("fakesink", None).unwrap(); + fakesink.set_property("async", &false).unwrap(); + pipeline.add(&fakesink).unwrap(); + + let (srcpad, sinkpad) = if pad == "src" { + ( + togglerecord.get_static_pad("src").unwrap(), + togglerecord.get_static_pad("sink").unwrap(), + ) + } else { + let sinkpad = togglerecord.get_request_pad("sink_%u").unwrap(); + let srcpad = sinkpad.iterate_internal_links().next().unwrap().unwrap(); + (srcpad, sinkpad) + }; + + let fakesink_sinkpad = fakesink.get_static_pad("sink").unwrap(); + srcpad.link(&fakesink_sinkpad).into_result().unwrap(); + + let (sender_output, receiver_output) = mpsc::channel::>(); + let sender_output = Mutex::new(sender_output); + srcpad.add_probe( + gst::PadProbeType::BUFFER | gst::PadProbeType::EVENT_DOWNSTREAM, + move |_, ref probe_info| { + match probe_info.data { + Some(gst::PadProbeData::Buffer(ref buffer)) => { + sender_output + .lock() + .unwrap() + .send(Left(buffer.clone())) + .unwrap(); + } + Some(gst::PadProbeData::Event(ref event)) => { + sender_output + .lock() + .unwrap() + .send(Right(event.clone())) + .unwrap(); + } + _ => { + unreachable!(); + } + } + + gst::PadProbeReturn::Ok + }, + ); + + let (sender_input, receiver_input) = mpsc::channel::(); + let (sender_input_done, receiver_input_done) = mpsc::channel::<()>(); + let thread = thread::spawn(move || { + let mut i = 0; + let mut first = true; + while let Ok(send_data) = receiver_input.recv() { + if first { + assert!(sinkpad.send_event(gst::Event::new_stream_start("test").build())); + let mut segment = gst::Segment::new(); + segment.init(gst::Format::Time); + assert!(sinkpad.send_event(gst::Event::new_segment(&segment).build())); + + let mut tags = gst::TagList::new(); + tags.get_mut() + .unwrap() + .add::(&"some title", gst::TagMergeMode::Append); + assert!(sinkpad.send_event(gst::Event::new_tag(tags).build())); + + first = false; + } + + match send_data { + SendData::Eos => { + break; + } + SendData::Buffers(n) => for _ in 0..n { + let mut buffer = gst::Buffer::new(); + buffer + .get_mut() + .unwrap() + .set_pts(offset + i * 20 * gst::MSECOND); + buffer.get_mut().unwrap().set_duration(20 * gst::MSECOND); + let _ = sinkpad.chain(buffer); + i += 1; + }, + SendData::BuffersDelta(n) => for _ in 0..n { + let mut buffer = gst::Buffer::new(); + buffer + .get_mut() + .unwrap() + .set_pts(offset + i * 20 * gst::MSECOND); + buffer.get_mut().unwrap().set_duration(20 * gst::MSECOND); + buffer + .get_mut() + .unwrap() + .set_flags(gst::BufferFlags::DELTA_UNIT); + let _ = sinkpad.chain(buffer); + i += 1; + }, + SendData::Gaps(n) => for _ in 0..n { + let event = gst::Event::new_gap( + (offset + i * 20 * gst::MSECOND).into(), + (20 * gst::MSECOND).into(), + ).build(); + let _ = sinkpad.send_event(event); + i += 1; + }, + } + + let _ = sender_input_done.send(()); + } + + let _ = sinkpad.send_event(gst::Event::new_eos().build()); + let _ = sender_input_done.send(()); + }); + + (sender_input, receiver_input_done, receiver_output, thread) +} + +fn recv_buffers( + receiver_output: &mpsc::Receiver>, + segment: &mut gst::Segment, + wait_buffers: usize, +) -> Vec<(gst::ClockTime, gst::ClockTime)> { + let mut res = Vec::new(); + let mut n_buffers = 0; + while let Ok(val) = receiver_output.recv() { + match val { + Left(buffer) => { + res.push(( + gst::ClockTime::from( + segment.to_running_time(gst::Format::Time, buffer.get_pts().into()), + ), + buffer.get_pts(), + )); + n_buffers += 1; + if wait_buffers > 0 && n_buffers == wait_buffers { + return res; + } + } + Right(event) => { + use gst::EventView; + + match event.view() { + EventView::Gap(ref e) => { + let (ts, _) = e.get(); + + res.push(( + gst::ClockTime::from(segment.to_running_time(gst::Format::Time, ts)), + ts.into(), + )); + n_buffers += 1; + if wait_buffers > 0 && n_buffers == wait_buffers { + return res; + } + } + EventView::Eos(..) => { + return res; + } + EventView::Segment(ref e) => { + *segment = e.get_segment(); + } + _ => (), + } + } + } + } + + res +} + +#[test] +fn test_create() { + init(); + assert!(gst::ElementFactory::make("togglerecord", None).is_some()); +} + +#[test] +fn test_create_pads() { + init(); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + + let sinkpad = togglerecord.get_request_pad("sink_%u").unwrap(); + let srcpad = sinkpad.iterate_internal_links().next().unwrap().unwrap(); + + assert_eq!(sinkpad.get_name(), "sink_0"); + assert_eq!(srcpad.get_name(), "src_0"); + + togglerecord.release_request_pad(&sinkpad); + assert!(sinkpad.get_parent().is_none()); + assert!(srcpad.get_parent().is_none()); +} + +#[test] +fn test_one_stream_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, _, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + drop(sender_input); + + let mut segment = gst::Segment::new(); + let buffers = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 10); + for (index, &(running_time, pts)) in buffers.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_one_stream_gaps_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, _, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(5)).unwrap(); + sender_input.send(SendData::Gaps(5)).unwrap(); + drop(sender_input); + + let mut segment = gst::Segment::new(); + let buffers = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 10); + for (index, &(running_time, pts)) in buffers.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_one_stream_close_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, receiver_input_done, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + sender_input.send(SendData::Buffers(10)).unwrap(); + receiver_input_done.recv().unwrap(); + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + drop(sender_input); + + let mut segment = gst::Segment::new(); + let buffers = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 10); + for (index, &(running_time, pts)) in buffers.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_one_stream_open_close() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, receiver_input_done, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + receiver_input_done.recv().unwrap(); + togglerecord.set_property("record", &false).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + drop(sender_input); + + let mut segment = gst::Segment::new(); + let buffers = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 10); + for (index, &(running_time, pts)) in buffers.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_one_stream_open_close_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, receiver_input_done, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + receiver_input_done.recv().unwrap(); + togglerecord.set_property("record", &false).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + receiver_input_done.recv().unwrap(); + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + drop(sender_input); + + let mut segment = gst::Segment::new(); + let buffers = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 20); + for (index, &(running_time, pts)) in buffers.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0 * gst::MSECOND + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_two_stream_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_1.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::Segment::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::Segment::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 10); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_two_stream_open_shift() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5 * gst::MSECOND); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_1.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::Segment::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::Segment::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, 5 * gst::MSECOND + index * 20 * gst::MSECOND); + assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 9); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_two_stream_open_shift_main() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 5 * gst::MSECOND); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_1.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // PTS 5 maps to running time 0 now + let mut segment_1 = gst::Segment::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // First and last buffer should be dropped from second stream + let mut segment_2 = gst::Segment::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, 15 * gst::MSECOND + index * 20 * gst::MSECOND); + assert_eq!(pts, 20 * gst::MSECOND + index * 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 9); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_two_stream_open_close() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + receiver_input_done_1.recv().unwrap(); + + // Stop recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", &false).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 9 buffers to sender 2, both are the same position now + sender_input_2.send(SendData::Buffers(9)).unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::Segment::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::Segment::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 10); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_two_stream_close_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &false).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + receiver_input_done_1.recv().unwrap(); + + // Start recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", &true).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 9 buffers to sender 2, both are the same position now + sender_input_2.send(SendData::Buffers(9)).unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::Segment::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::Segment::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 10); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_two_stream_open_close_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + receiver_input_done_1.recv().unwrap(); + + // Stop recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", &false).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 9 buffers to sender 2, both are the same position now + sender_input_2.send(SendData::Buffers(9)).unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another buffer to sender 2, this will block until sender 1 advances + // but must not be dropped, although we're not recording (yet) + sender_input_2.send(SendData::Buffers(1)).unwrap(); + + // Start recording again and send another set of buffers to both senders + togglerecord.set_property("record", &true).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_1.recv().unwrap(); + // The single buffer above for sender 1 should be handled now + receiver_input_done_2.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::Segment::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0 * gst::MSECOND + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 20); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::Segment::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0 * gst::MSECOND + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 20); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_two_stream_open_close_open_gaps() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(3)).unwrap(); + sender_input_1.send(SendData::Gaps(3)).unwrap(); + sender_input_1.send(SendData::Buffers(4)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + receiver_input_done_1.recv().unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_1.recv().unwrap(); + + // Stop recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", &false).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 4 gaps and 5 buffers to sender 2, both are the same position now + sender_input_2.send(SendData::Gaps(4)).unwrap(); + sender_input_2.send(SendData::Buffers(5)).unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another gap to sender 2, this will block until sender 1 advances + // but must not be dropped, although we're not recording (yet) + sender_input_2.send(SendData::Gaps(1)).unwrap(); + + // Start recording again and send another set of buffers to both senders + togglerecord.set_property("record", &true).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_1.recv().unwrap(); + // The single buffer above for sender 1 should be handled now + receiver_input_done_2.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::Segment::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0 * gst::MSECOND + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 20); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::Segment::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0 * gst::MSECOND + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 20); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_two_stream_close_open_close_delta() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &false).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + receiver_input_done_1.recv().unwrap(); + + // Start recording and push new buffers to sender 1. The first one is a delta frame, + // so will be dropped, and as such the next frame of sender 2 will also be dropped + // Sender 2 is empty now + togglerecord.set_property("record", &true).unwrap(); + sender_input_1.send(SendData::BuffersDelta(1)).unwrap(); + sender_input_1.send(SendData::Buffers(9)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 9 buffers to sender 2, both are the same position now + sender_input_2.send(SendData::Buffers(9)).unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another buffer to sender 2, this will block until sender 1 advances + // but must not be dropped, and we're still recording + sender_input_2.send(SendData::Buffers(1)).unwrap(); + + // Stop recording again and send another set of buffers to both senders + // The first one is a delta frame, so we only actually stop recording + // after recording another frame + togglerecord.set_property("record", &false).unwrap(); + sender_input_1.send(SendData::BuffersDelta(1)).unwrap(); + sender_input_1.send(SendData::Buffers(9)).unwrap(); + sender_input_2.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_1.recv().unwrap(); + // The single buffer above for sender 1 should be handled now + receiver_input_done_2.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::Segment::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, (11 + index) * 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::Segment::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, (11 + index) * 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 10); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_three_stream_open_close_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + sender_input_3.send(SendData::Buffers(10)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1/3 are finished + receiver_input_done_1.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); + + // Stop recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", &false).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 9 buffers to sender 2, 1/2 are at the same position now + sender_input_2.send(SendData::Buffers(9)).unwrap(); + + // Send the remaining 10 buffers to sender 3, all are at the same position now + sender_input_3.send(SendData::Buffers(10)).unwrap(); + + // Wait until all 20 buffers of all senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); + + // Send another buffer to sender 2, this will block until sender 1 advances + // but must not be dropped, although we're not recording (yet) + sender_input_2.send(SendData::Buffers(1)).unwrap(); + + // Start recording again and send another set of buffers to both senders + togglerecord.set_property("record", &true).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(10)).unwrap(); + sender_input_3.send(SendData::Buffers(5)).unwrap(); + receiver_input_done_1.recv().unwrap(); + // The single buffer above for sender 1 should be handled now + receiver_input_done_2.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); + + sender_input_3.send(SendData::Buffers(5)).unwrap(); + receiver_input_done_3.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + sender_input_3.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); + + let mut segment_1 = gst::Segment::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0 * gst::MSECOND + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 20); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::Segment::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0 * gst::MSECOND + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 20); + + let mut segment_3 = gst::Segment::new(); + let buffers_3 = recv_buffers(&receiver_output_3, &mut segment_3, 0); + for (index, &(running_time, pts)) in buffers_3.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0 * gst::MSECOND + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + } + assert_eq!(buffers_3.len(), 20); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + thread_3.join().unwrap(); + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +}