summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/plugins/gst_plugins_cache.json189
-rw-r--r--ext/gs/.clang-format39
-rw-r--r--ext/gs/README.md71
-rw-r--r--ext/gs/gstgs.cpp56
-rw-r--r--ext/gs/gstgscommon.cpp134
-rw-r--r--ext/gs/gstgscommon.h39
-rw-r--r--ext/gs/gstgssink.cpp793
-rw-r--r--ext/gs/gstgssink.h47
-rw-r--r--ext/gs/gstgssrc.cpp578
-rw-r--r--ext/gs/gstgssrc.h34
-rw-r--r--ext/gs/meson.build28
-rw-r--r--ext/meson.build1
-rw-r--r--meson_options.txt1
13 files changed, 2010 insertions, 0 deletions
diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json
index 6eae76468..02da7405d 100644
--- a/docs/plugins/gst_plugins_cache.json
+++ b/docs/plugins/gst_plugins_cache.json
@@ -25898,6 +25898,195 @@
"tracers": {},
"url": "Unknown package origin"
},
+ "gs": {
+ "description": "Read and write from and to a Google Cloud Storage",
+ "elements": {
+ "gssink": {
+ "author": "Julien Isorce <jisorce@oblong.com>",
+ "description": "Write buffers to a sequentially named set of files on Google Cloud Storage",
+ "hierarchy": [
+ "GstGsSink",
+ "GstBaseSink",
+ "GstElement",
+ "GstObject",
+ "GInitiallyUnowned",
+ "GObject"
+ ],
+ "klass": "Sink/File",
+ "long-name": "Google Cloud Storage Sink",
+ "pad-templates": {
+ "sink": {
+ "caps": "ANY",
+ "direction": "sink",
+ "presence": "always"
+ }
+ },
+ "properties": {
+ "bucket-name": {
+ "blurb": "Google Cloud Storage Bucket Name",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "NULL",
+ "mutable": "null",
+ "readable": true,
+ "type": "gchararray",
+ "writable": true
+ },
+ "index": {
+ "blurb": "Index to use with location property to create file names. The index is incremented by one for each buffer written.",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "0",
+ "max": "2147483647",
+ "min": "0",
+ "mutable": "null",
+ "readable": true,
+ "type": "gint",
+ "writable": true
+ },
+ "next-file": {
+ "blurb": "When to start a new file",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "buffer (0)",
+ "mutable": "null",
+ "readable": true,
+ "type": "GstGsSinkNext",
+ "writable": true
+ },
+ "object-name": {
+ "blurb": "Full path name of the remote file",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "%%s_%%05d",
+ "mutable": "null",
+ "readable": true,
+ "type": "gchararray",
+ "writable": true
+ },
+ "post-messages": {
+ "blurb": "Post a message for each file with information of the buffer",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "false",
+ "mutable": "null",
+ "readable": true,
+ "type": "gboolean",
+ "writable": true
+ },
+ "service-account-email": {
+ "blurb": "Service Account Email to use for credentials",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "NULL",
+ "mutable": "ready",
+ "readable": true,
+ "type": "gchararray",
+ "writable": true
+ },
+ "start-date": {
+ "blurb": "Start date in iso8601 format",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "NULL",
+ "mutable": "ready",
+ "readable": true,
+ "type": "gchararray",
+ "writable": true
+ }
+ },
+ "rank": "none"
+ },
+ "gssrc": {
+ "author": "Julien Isorce <jisorce@oblong.com>",
+ "description": "Read from arbitrary point from a file in a Google Cloud Storage",
+ "hierarchy": [
+ "GstGsSrc",
+ "GstBaseSrc",
+ "GstElement",
+ "GstObject",
+ "GInitiallyUnowned",
+ "GObject"
+ ],
+ "interfaces": [
+ "GstURIHandler"
+ ],
+ "klass": "Source/File",
+ "long-name": "Google Cloud Storage Source",
+ "pad-templates": {
+ "src": {
+ "caps": "ANY",
+ "direction": "src",
+ "presence": "always"
+ }
+ },
+ "properties": {
+ "location": {
+ "blurb": "Location of the file to read",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "NULL",
+ "mutable": "ready",
+ "readable": true,
+ "type": "gchararray",
+ "writable": true
+ },
+ "service-account-email": {
+ "blurb": "Service Account Email to use for credentials",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "NULL",
+ "mutable": "ready",
+ "readable": true,
+ "type": "gchararray",
+ "writable": true
+ }
+ },
+ "rank": "none"
+ }
+ },
+ "filename": "gstgs",
+ "license": "LGPL",
+ "other-types": {
+ "GstGsSinkNext": {
+ "kind": "enum",
+ "values": [
+ {
+ "desc": "New file for each buffer",
+ "name": "buffer",
+ "value": "0"
+ },
+ {
+ "desc": "New file for each buffer",
+ "name": "Only one file, no next file",
+ "value": "1"
+ }
+ ]
+ }
+ },
+ "package": "GStreamer Bad Plug-ins",
+ "source": "gst-plugins-bad",
+ "tracers": {},
+ "url": "Unknown package origin"
+ },
"gsm": {
"description": "GSM encoder/decoder",
"elements": {
diff --git a/ext/gs/.clang-format b/ext/gs/.clang-format
new file mode 100644
index 000000000..a27438b0c
--- /dev/null
+++ b/ext/gs/.clang-format
@@ -0,0 +1,39 @@
+# Defines the Chromium style for automatic reformatting.
+# http://clang.llvm.org/docs/ClangFormatStyleOptions.html
+BasedOnStyle: Chromium
+# This defaults to 'Auto'. Explicitly set it for a while, so that
+# 'vector<vector<int> >' in existing files gets formatted to
+# 'vector<vector<int>>'. ('Auto' means that clang-format will only use
+# 'int>>' if the file already contains at least one such instance.)
+Standard: Cpp11
+
+# Make sure code like:
+# IPC_BEGIN_MESSAGE_MAP()
+# IPC_MESSAGE_HANDLER(WidgetHostViewHost_Update, OnUpdate)
+# IPC_END_MESSAGE_MAP()
+# gets correctly indented.
+MacroBlockBegin: "^\
+BEGIN_MSG_MAP|\
+BEGIN_MSG_MAP_EX|\
+BEGIN_SAFE_MSG_MAP_EX|\
+CR_BEGIN_MSG_MAP_EX|\
+IPC_BEGIN_MESSAGE_MAP|\
+IPC_BEGIN_MESSAGE_MAP_WITH_PARAM|\
+IPC_PROTOBUF_MESSAGE_TRAITS_BEGIN|\
+IPC_STRUCT_BEGIN|\
+IPC_STRUCT_BEGIN_WITH_PARENT|\
+IPC_STRUCT_TRAITS_BEGIN|\
+POLPARAMS_BEGIN|\
+PPAPI_BEGIN_MESSAGE_MAP$"
+MacroBlockEnd: "^\
+CR_END_MSG_MAP|\
+END_MSG_MAP|\
+IPC_END_MESSAGE_MAP|\
+IPC_PROTOBUF_MESSAGE_TRAITS_END|\
+IPC_STRUCT_END|\
+IPC_STRUCT_TRAITS_END|\
+POLPARAMS_END|\
+PPAPI_END_MESSAGE_MAP$"
+
+# TODO: Remove this once clang-format r357700 is rolled in.
+JavaImportGroups: ['android', 'androidx', 'com', 'dalvik', 'junit', 'org', 'com.google.android.apps.chrome', 'org.chromium', 'java', 'javax']
diff --git a/ext/gs/README.md b/ext/gs/README.md
new file mode 100644
index 000000000..48fb88c7e
--- /dev/null
+++ b/ext/gs/README.md
@@ -0,0 +1,71 @@
+# Install the Google Cloud Storage dependencies.
+
+```
+sudo apt-get install \
+ cmake \
+ libcurl3-gnutls-dev \
+ libgrpc++-dev \
+ libprotobuf-dev \
+ protobuf-compiler-grpc
+```
+
+# Build the Google Cloud Storage library
+
+```
+git clone https://github.com/google/crc32c.git
+cd crc32c && git checkout -b 1.1.1
+mkdir build && cd build
+cmake .. \
+ -GNinja \
+ -DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \
+ -DCMAKE_INSTALL_LIBDIR:PATH=lib \
+ -DBUILD_SHARED_LIBS=YES \
+ -DCRC32C_USE_GLOG=NO \
+ -DCRC32C_BUILD_TESTS=NO \
+ -DCRC32C_BUILD_BENCHMARKS=NO
+ninja && ninja install
+
+git clone https://github.com/abseil/abseil-cpp.git
+git checkout master
+mkdir build && cd build
+cmake .. \
+ -GNinja \
+ -DBUILD_TESTING=NO \
+ -DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \
+ -DCMAKE_INSTALL_LIBDIR:PATH=lib \
+ -DBUILD_SHARED_LIBS=YES
+ninja && ninja install
+
+git clone https://github.com/googleapis/google-cloud-cpp.git
+git checkout -b v1.25.0
+mkdir build && cd build
+cmake .. \
+ -GNinja \
+ -DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \
+ -DCMAKE_INSTALL_LIBDIR:PATH=lib \
+ -DBUILD_SHARED_LIBS=YES \
+ -DBUILD_TESTING=NO \
+ -DGOOGLE_CLOUD_CPP_ENABLE=storage
+ninja && ninja install
+```
+
+# Running the gs elements locally
+
+When running from the command line or in a container running locally, simply
+set the credentials by exporting GOOGLE_APPLICATION_CREDENTIALS. If you are
+not familiar with this environment variable, check the documentation
+https://cloud.google.com/docs/authentication/getting-started
+Note that you can restrict a service account to the role Storage Admin or
+Storage Object Creator instead of the Project Owner role from the above
+documentation.
+
+# Running the gs elements in Google Cloud Run
+
+Add the Storage Object Viewer role to the service account assigned to the
+Cloud Run service where gssrc runs. For gssink add the role Storage Object
+Creator. Then just set the service-account-email property on the element.
+
+# Running the gs elements in Google Cloud Kubernetes
+
+You need to set GOOGLE_APPLICATION_CREDENTIALS in the container and ship the
+json file to which the environment variable points to.
diff --git a/ext/gs/gstgs.cpp b/ext/gs/gstgs.cpp
new file mode 100644
index 000000000..f5c9ddfaf
--- /dev/null
+++ b/ext/gs/gstgs.cpp
@@ -0,0 +1,56 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgssrc.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+/**
+ * plugin-gs:
+ *
+ * The gs plugin contains elements to interact with with Google Cloud Storage.
+ * In particular with the gs:// protocol or by specifying the storage bucket.
+ *
+ * Since: 1.20
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstgssink.h"
+#include "gstgssrc.h"
+
+GST_DEBUG_CATEGORY (gst_gs_src_debug);
+
+static gboolean
+plugin_init (GstPlugin * plugin)
+{
+ if (!gst_element_register (plugin, "gssrc", GST_RANK_NONE, GST_TYPE_GS_SRC))
+ return FALSE;
+
+ if (!gst_element_register (plugin, "gssink", GST_RANK_NONE, GST_TYPE_GS_SINK))
+ return FALSE;
+
+ return TRUE;
+}
+
+GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
+ GST_VERSION_MINOR,
+ gs,
+ "Read and write from and to a Google Cloud Storage",
+ plugin_init,
+ PACKAGE_VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
diff --git a/ext/gs/gstgscommon.cpp b/ext/gs/gstgscommon.cpp
new file mode 100644
index 000000000..b0f576016
--- /dev/null
+++ b/ext/gs/gstgscommon.cpp
@@ -0,0 +1,134 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgscommon.h:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include "gstgscommon.h"
+
+#include "google/cloud/storage/oauth2/compute_engine_credentials.h"
+
+namespace gcs = google::cloud::storage;
+
+namespace {
+
+#if !GLIB_CHECK_VERSION(2, 62, 0)
+ static inline gchar *
+ g_date_time_format_iso8601 (GDateTime * datetime)
+ {
+ GString *
+ outstr = NULL;
+ gchar *
+ main_date = NULL;
+ gint64 offset;
+
+ // Main date and time.
+ main_date = g_date_time_format (datetime, "%Y-%m-%dT%H:%M:%S");
+ outstr = g_string_new (main_date);
+ g_free (main_date);
+
+ // Timezone. Format it as `%:::z` unless the offset is zero, in which case
+ // we can simply use `Z`.
+ offset = g_date_time_get_utc_offset (datetime);
+
+ if (offset == 0) {
+ g_string_append_c (outstr, 'Z');
+ } else {
+ gchar *
+ time_zone = g_date_time_format (datetime, "%:::z");
+ g_string_append (outstr, time_zone);
+ g_free (time_zone);
+ }
+
+ return g_string_free (outstr, FALSE);
+ }
+#endif
+
+} // namespace
+
+std::unique_ptr <
+ google::cloud::storage::Client >
+gst_gs_create_client (const gchar * service_account_email, GError ** error)
+{
+ if (service_account_email) {
+ // Meant to be used from a container running in the Cloud.
+
+ google::cloud::StatusOr < std::shared_ptr <
+ gcs::oauth2::Credentials >> creds (std::make_shared <
+ gcs::oauth2::ComputeEngineCredentials <>> (service_account_email));
+ if (!creds) {
+ g_set_error (error, GST_RESOURCE_ERROR,
+ GST_RESOURCE_ERROR_NOT_AUTHORIZED,
+ "Could not retrieve credentials for the given service account %s (%s)",
+ service_account_email, creds.status ().message ().c_str ());
+ return nullptr;
+ }
+
+ gcs::ClientOptions client_options (std::move (creds.value ()));
+ return std::make_unique < gcs::Client > (client_options,
+ gcs::StrictIdempotencyPolicy ());
+ }
+ // Default account. This is meant to retrieve the credentials automatically
+ // using diffrent methods.
+ google::cloud::StatusOr < gcs::ClientOptions > client_options =
+ gcs::ClientOptions::CreateDefaultClientOptions ();
+
+ if (!client_options) {
+ g_set_error (error, GST_RESOURCE_ERROR,
+ GST_RESOURCE_ERROR_NOT_AUTHORIZED,
+ "Could not create default client options (%s)",
+ client_options.status ().message ().c_str ());
+ return nullptr;
+ }
+ return std::make_unique < gcs::Client > (client_options.value (),
+ gcs::StrictIdempotencyPolicy ());
+}
+
+gboolean
+gst_gs_get_buffer_date (GstBuffer * buffer, GDateTime * start_date,
+ gchar ** buffer_date_str_ptr)
+{
+ gchar *
+ buffer_date_str = NULL;
+ GstClockTime buffer_timestamp = GST_CLOCK_TIME_NONE;
+ GTimeSpan buffer_timespan = 0;
+
+ if (!buffer || !start_date)
+ return FALSE;
+
+ buffer_timestamp = GST_BUFFER_PTS (buffer);
+
+ // GTimeSpan is in micro seconds.
+ buffer_timespan = GST_TIME_AS_USECONDS (buffer_timestamp);
+
+ GDateTime *
+ buffer_date = g_date_time_add (start_date, buffer_timespan);
+ if (!buffer_date)
+ return FALSE;
+
+ buffer_date_str = g_date_time_format_iso8601 (buffer_date);
+ g_date_time_unref (buffer_date);
+
+ if (!buffer_date_str)
+ return FALSE;
+
+ if (buffer_date_str_ptr)
+ *buffer_date_str_ptr = buffer_date_str;
+
+ return TRUE;
+}
diff --git a/ext/gs/gstgscommon.h b/ext/gs/gstgscommon.h
new file mode 100644
index 000000000..8dd59f288
--- /dev/null
+++ b/ext/gs/gstgscommon.h
@@ -0,0 +1,39 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgscommon.h:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_GS_COMMON_H__
+#define __GST_GS_COMMON_H__
+
+#include <memory>
+
+#include <gst/gst.h>
+
+#include <google/cloud/storage/client.h>
+
+std::unique_ptr<google::cloud::storage::Client> gst_gs_create_client(
+ const gchar* service_account_email,
+ GError** error);
+
+gboolean gst_gs_get_buffer_date(GstBuffer* buffer,
+ GDateTime* start_date,
+ gchar** buffer_date_str_ptr);
+
+#endif // __GST_GS_COMMON_H__
diff --git a/ext/gs/gstgssink.cpp b/ext/gs/gstgssink.cpp
new file mode 100644
index 000000000..3fdb7b0f5
--- /dev/null
+++ b/ext/gs/gstgssink.cpp
@@ -0,0 +1,793 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgssink.cpp:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+/**
+ * SECTION:element-gssink
+ * @title: gssink
+ * @see_also: #GstGsSrc
+ *
+ * Write incoming data to a series of sequentially-named remote files on a
+ * Google Cloud Storage bucket.
+ *
+ * The object-name property should contain a string with a \%d placeholder
+ * that will be substituted with the index for each filename.
+ *
+ * If the #GstGsSink:post-messages property is %TRUE, it sends an application
+ * message named `GstGsSink` after writing each buffer.
+ *
+ * The message's structure contains these fields:
+ *
+ * * #gchararray `filename`: the filename where the buffer was written.
+ * * #gchararray `date`: the date of the current buffer, NULL if no start date
+ * is provided.
+ * * #gint `index`: index of the buffer.
+ * * #GstClockTime `timestamp`: the timestamp of the buffer.
+ * * #GstClockTime `stream-time`: the stream time of the buffer.
+ * * #GstClockTime `running-time`: the running_time of the buffer.
+ * * #GstClockTime `duration`: the duration of the buffer.
+ * * #guint64 `offset`: the offset of the buffer that triggered the message.
+ * * #guint64 `offset-end`: the offset-end of the buffer that triggered the
+ * message.
+ *
+ * ## Example launch line
+ * ```
+ * gst-launch-1.0 videotestsrc num-buffers=15 ! pngenc ! gssink
+ * object-name="mypath/myframes/frame%05d.png" bucket-name="mybucket"
+ * next-file=buffer post-messages=true
+ * ```
+ * ### Upload 15 png images into gs://mybucket/mypath/myframes/ where the file
+ * names are frame00000.png, frame00001.png, ..., frame00014.png
+ * ```
+ * gst-launch-1.0 videotestsrc num-buffers=6 ! video/x-raw, framerate=2/1 !
+ * pngenc ! gssink start-date="2020-04-16T08:55:03Z"
+ * object-name="mypath/myframes/im_%s_%03d.png" bucket-name="mybucket"
+ * next-file=buffer post-messages=true
+ * ```
+ * ### Upload png 6 images into gs://mybucket/mypath/myframes/ where the file
+ * names are im_2020-04-16T08:55:03Z_000.png, im_2020-04-16T08:55:03Z_001.png,
+ * im_2020-04-16T08:55:04Z_002.png, im_2020-04-16T08:55:04Z_003.png,
+ * im_2020-04-16T08:55:05Z_004.png, im_2020-04-16T08:55:05Z_005.png.
+ * ```
+ * gst-launch-1.0 filesrc location=some_video.mp4 ! gssink
+ * object-name="mypath/myvideos/video.mp4" bucket-name="mybucket" next-file=none
+ * ```
+ * ### Upload any stream as a single file into Google Cloud Storage. Similar as
+ * filesink in this case. The file is then accessible from:
+ * gs://mybucket/mypath/myvideos/video.mp4
+ *
+ * See also: #GstGsSrc
+ * Since: 1.20
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstgscommon.h"
+#include "gstgssink.h"
+
+#include <algorithm>
+
+static GstStaticPadTemplate sinktemplate =
+ GST_STATIC_PAD_TEMPLATE("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+GST_DEBUG_CATEGORY_STATIC(gst_gs_sink_debug);
+#define GST_CAT_DEFAULT gst_gs_sink_debug
+
+#define DEFAULT_INDEX 0
+#define DEFAULT_NEXT_FILE GST_GS_SINK_NEXT_BUFFER
+#define DEFAULT_OBJECT_NAME "%s_%05d"
+#define DEFAULT_POST_MESSAGES FALSE
+
+namespace gcs = google::cloud::storage;
+
+enum {
+ PROP_0,
+ PROP_BUCKET_NAME,
+ PROP_OBJECT_NAME,
+ PROP_INDEX,
+ PROP_POST_MESSAGES,
+ PROP_NEXT_FILE,
+ PROP_SERVICE_ACCOUNT_EMAIL,
+ PROP_START_DATE,
+};
+
+class GSWriteStream;
+
+struct _GstGsSink {
+ GstBaseSink parent;
+
+ std::unique_ptr<google::cloud::storage::Client> gcs_client;
+ std::unique_ptr<GSWriteStream> gcs_stream;
+ gchar* service_account_email;
+ gchar* bucket_name;
+ gchar* object_name;
+ gchar* start_date_str;
+ GDateTime* start_date;
+ gint index;
+ gboolean post_messages;
+ GstGsSinkNext next_file;
+ const gchar* content_type;
+ size_t nb_percent_format;
+ gboolean percent_s_is_first;
+};
+
+static void gst_gs_sink_finalize(GObject* object);
+
+static void gst_gs_sink_set_property(GObject* object,
+ guint prop_id,
+ const GValue* value,
+ GParamSpec* pspec);
+static void gst_gs_sink_get_property(GObject* object,
+ guint prop_id,
+ GValue* value,
+ GParamSpec* pspec);
+
+static gboolean gst_gs_sink_start(GstBaseSink* bsink);
+static gboolean gst_gs_sink_stop(GstBaseSink* sink);
+static GstFlowReturn gst_gs_sink_render(GstBaseSink* sink, GstBuffer* buffer);
+static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
+ GstBufferList* buffer_list);
+static gboolean gst_gs_sink_set_caps(GstBaseSink* sink, GstCaps* caps);
+static gboolean gst_gs_sink_event(GstBaseSink* sink, GstEvent* event);
+
+#define GST_TYPE_GS_SINK_NEXT (gst_gs_sink_next_get_type())
+static GType gst_gs_sink_next_get_type(void) {
+ static GType gs_sink_next_type = 0;
+ static const GEnumValue next_types[] = {
+ {GST_GS_SINK_NEXT_BUFFER, "New file for each buffer", "buffer"},
+ {GST_GS_SINK_NEXT_NONE, "Only one file, no next file", "none"},
+ {0, NULL, NULL}};
+
+ if (!gs_sink_next_type) {
+ gs_sink_next_type = g_enum_register_static("GstGsSinkNext", next_types);
+ }
+
+ return gs_sink_next_type;
+}
+
+#define gst_gs_sink_parent_class parent_class
+G_DEFINE_TYPE(GstGsSink, gst_gs_sink, GST_TYPE_BASE_SINK);
+
+class GSWriteStream {
+ public:
+ GSWriteStream(google::cloud::storage::Client& client,
+ const char* bucket_name,
+ const char* object_name,
+ const char* content_type)
+ : gcs_stream_(client.WriteObject(
+ bucket_name,
+ object_name,
+ gcs::WithObjectMetadata(
+ gcs::ObjectMetadata().set_content_type(content_type)))) {}
+ ~GSWriteStream() { gcs_stream_.Close(); }
+
+ gcs::ObjectWriteStream& stream() { return gcs_stream_; }
+
+ private:
+ gcs::ObjectWriteStream gcs_stream_;
+};
+
+static void gst_gs_sink_class_init(GstGsSinkClass* klass) {
+ GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
+ GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
+ GstBaseSinkClass* gstbasesink_class = GST_BASE_SINK_CLASS(klass);
+
+ gobject_class->set_property = gst_gs_sink_set_property;
+ gobject_class->get_property = gst_gs_sink_get_property;
+
+ /**
+ * GstGsSink:bucket-name:
+ *
+ * Name of the Google Cloud Storage bucket.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property(
+ gobject_class, PROP_BUCKET_NAME,
+ g_param_spec_string(
+ "bucket-name", "Bucket Name", "Google Cloud Storage Bucket Name",
+ NULL, (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
+ /**
+ * GstGsSink:object-name:
+ *
+ * Full path name of the remote file.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property(
+ gobject_class, PROP_OBJECT_NAME,
+ g_param_spec_string(
+ "object-name", "Object Name", "Full path name of the remote file",
+ DEFAULT_OBJECT_NAME,
+ (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
+ /**
+ * GstGsSink:index:
+ *
+ * Index to use with location property to create file names.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property(
+ gobject_class, PROP_INDEX,
+ g_param_spec_int(
+ "index", "Index",
+ "Index to use with location property to create file names. The "
+ "index is incremented by one for each buffer written.",
+ 0, G_MAXINT, DEFAULT_INDEX,
+ (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
+ /**
+ * GstGsSink:post-messages:
+ *
+ * Post a message on the GstBus for each file.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property(
+ gobject_class, PROP_POST_MESSAGES,
+ g_param_spec_boolean(
+ "post-messages", "Post Messages",
+ "Post a message for each file with information of the buffer",
+ DEFAULT_POST_MESSAGES,
+ (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+ /**
+ * GstGsSink:next-file:
+ *
+ * A #GstGsSinkNext that specifies when to start a new file.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property(
+ gobject_class, PROP_NEXT_FILE,
+ g_param_spec_enum(
+ "next-file", "Next File", "When to start a new file",
+ GST_TYPE_GS_SINK_NEXT, DEFAULT_NEXT_FILE,
+ (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
+ /**
+ * GstGsSink:service-account-email:
+ *
+ * Service Account Email to use for credentials.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property(
+ gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
+ g_param_spec_string(
+ "service-account-email", "Service Account Email",
+ "Service Account Email to use for credentials", NULL,
+ (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+ GST_PARAM_MUTABLE_READY)));
+
+ /**
+ * GstGsSink:start-date:
+ *
+ * Start date in iso8601 format.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property(
+ gobject_class, PROP_START_DATE,
+ g_param_spec_string(
+ "start-date", "Start Date", "Start date in iso8601 format", NULL,
+ (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+ GST_PARAM_MUTABLE_READY)));
+
+ gobject_class->finalize = gst_gs_sink_finalize;
+
+ gstbasesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start);
+ gstbasesink_class->stop = GST_DEBUG_FUNCPTR(gst_gs_sink_stop);
+ gstbasesink_class->render = GST_DEBUG_FUNCPTR(gst_gs_sink_render);
+ gstbasesink_class->render_list = GST_DEBUG_FUNCPTR(gst_gs_sink_render_list);
+ gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR(gst_gs_sink_set_caps);
+ gstbasesink_class->event = GST_DEBUG_FUNCPTR(gst_gs_sink_event);
+
+ GST_DEBUG_CATEGORY_INIT(gst_gs_sink_debug, "gssink", 0, "gssink element");
+
+ gst_element_class_add_static_pad_template(gstelement_class, &sinktemplate);
+ gst_element_class_set_static_metadata(
+ gstelement_class, "Google Cloud Storage Sink", "Sink/File",
+ "Write buffers to a sequentially named set of files on Google Cloud "
+ "Storage",
+ "Julien Isorce <jisorce@oblong.com>");
+}
+
+static void gst_gs_sink_init(GstGsSink* sink) {
+ sink->gcs_client = nullptr;
+ sink->gcs_stream = nullptr;
+ sink->index = DEFAULT_INDEX;
+ sink->post_messages = DEFAULT_POST_MESSAGES;
+ sink->service_account_email = NULL;
+ sink->bucket_name = NULL;
+ sink->object_name = g_strdup(DEFAULT_OBJECT_NAME);
+ sink->start_date_str = NULL;
+ sink->start_date = NULL;
+ sink->next_file = DEFAULT_NEXT_FILE;
+ sink->content_type = NULL;
+ sink->nb_percent_format = 0;
+ sink->percent_s_is_first = FALSE;
+
+ gst_base_sink_set_sync(GST_BASE_SINK(sink), FALSE);
+}
+
+static void gst_gs_sink_finalize(GObject* object) {
+ GstGsSink* sink = GST_GS_SINK(object);
+
+ sink->gcs_client = nullptr;
+ sink->gcs_stream = nullptr;
+ g_free(sink->service_account_email);
+ sink->service_account_email = NULL;
+ g_free(sink->bucket_name);
+ sink->bucket_name = NULL;
+ g_free(sink->object_name);
+ sink->object_name = NULL;
+ g_free(sink->start_date_str);
+ sink->start_date_str = NULL;
+ if (sink->start_date) {
+ g_date_time_unref(sink->start_date);
+ sink->start_date = NULL;
+ }
+ sink->content_type = NULL;
+
+ G_OBJECT_CLASS(parent_class)->finalize(object);
+}
+
+static gboolean gst_gs_sink_set_object_name(GstGsSink* sink,
+ const gchar* object_name) {
+ g_free(sink->object_name);
+ sink->object_name = NULL;
+ sink->nb_percent_format = 0;
+ sink->percent_s_is_first = FALSE;
+
+ if (!object_name) {
+ GST_ERROR_OBJECT(sink, "Object name is null");
+ return FALSE;
+ }
+
+ const std::string name(object_name);
+ sink->nb_percent_format = std::count(name.begin(), name.end(), '%');
+ if (sink->nb_percent_format > 2) {
+ GST_ERROR_OBJECT(sink, "Object name has too many formats");
+ return FALSE;
+ }
+
+ const size_t delimiter_percent_s = name.find("%s");
+ if (delimiter_percent_s == std::string::npos) {
+ if (sink->nb_percent_format == 2) {
+ GST_ERROR_OBJECT(sink, "Object name must have just one number format");
+ return FALSE;
+ }
+ sink->object_name = g_strdup(object_name);
+ return TRUE;
+ }
+
+ const size_t delimiter_percent = name.find_first_of('%');
+ if (delimiter_percent_s == delimiter_percent) {
+ sink->percent_s_is_first = TRUE;
+
+ if (name.find("%s", delimiter_percent_s + 1) != std::string::npos) {
+ GST_ERROR_OBJECT(sink, "Object name expect max one string format");
+ return FALSE;
+ }
+ }
+
+ sink->object_name = g_strdup(object_name);
+
+ return TRUE;
+}
+
+static void gst_gs_sink_set_property(GObject* object,
+ guint prop_id,
+ const GValue* value,
+ GParamSpec* pspec) {
+ GstGsSink* sink = GST_GS_SINK(object);
+
+ switch (prop_id) {
+ case PROP_BUCKET_NAME:
+ g_free(sink->bucket_name);
+ sink->bucket_name = g_strdup(g_value_get_string(value));
+ break;
+ case PROP_OBJECT_NAME:
+ gst_gs_sink_set_object_name(sink, g_value_get_string(value));
+ break;
+ case PROP_INDEX:
+ sink->index = g_value_get_int(value);
+ break;
+ case PROP_POST_MESSAGES:
+ sink->post_messages = g_value_get_boolean(value);
+ break;
+ case PROP_NEXT_FILE:
+ sink->next_file = (GstGsSinkNext)g_value_get_enum(value);
+ break;
+ case PROP_SERVICE_ACCOUNT_EMAIL:
+ g_free(sink->service_account_email);
+ sink->service_account_email = g_strdup(g_value_get_string(value));
+ break;
+ case PROP_START_DATE:
+ g_free(sink->start_date_str);
+ if (sink->start_date)
+ g_date_time_unref(sink->start_date);
+ sink->start_date_str = g_strdup(g_value_get_string(value));
+ sink->start_date =
+ g_date_time_new_from_iso8601(sink->start_date_str, NULL);
+ if (!sink->start_date) {
+ GST_ERROR_OBJECT(sink, "Failed to parse start date %s",
+ sink->start_date_str);
+ g_free(sink->start_date_str);
+ sink->start_date_str = NULL;
+ }
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ break;
+ }
+}
+
+static void gst_gs_sink_get_property(GObject* object,
+ guint prop_id,
+ GValue* value,
+ GParamSpec* pspec) {
+ GstGsSink* sink = GST_GS_SINK(object);
+
+ switch (prop_id) {
+ case PROP_BUCKET_NAME:
+ g_value_set_string(value, sink->bucket_name);
+ break;
+ case PROP_OBJECT_NAME:
+ g_value_set_string(value, sink->object_name);
+ break;
+ case PROP_INDEX:
+ g_value_set_int(value, sink->index);
+ break;
+ case PROP_POST_MESSAGES:
+ g_value_set_boolean(value, sink->post_messages);
+ break;
+ case PROP_NEXT_FILE:
+ g_value_set_enum(value, sink->next_file);
+ break;
+ case PROP_SERVICE_ACCOUNT_EMAIL:
+ g_value_set_string(value, sink->service_account_email);
+ break;
+ case PROP_START_DATE:
+ g_value_set_string(value, sink->start_date_str);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ break;
+ }
+}
+
+static gboolean gst_gs_sink_start(GstBaseSink* bsink) {
+ GstGsSink* sink = GST_GS_SINK(bsink);
+ GError* err = NULL;
+
+ if (!sink->bucket_name) {
+ GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Bucket name is required"),
+ GST_ERROR_SYSTEM);
+ return FALSE;
+ }
+
+ if (!sink->object_name) {
+ GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Object name is required"),
+ GST_ERROR_SYSTEM);
+ return FALSE;
+ }
+
+ sink->content_type = "";
+
+ sink->gcs_client = gst_gs_create_client(sink->service_account_email, &err);
+ if (err) {
+ GST_ELEMENT_ERROR(sink, RESOURCE, OPEN_READ,
+ ("Could not create client (%s)", err->message),
+ GST_ERROR_SYSTEM);
+ g_clear_error(&err);
+ return FALSE;
+ }
+
+ GST_INFO_OBJECT(sink, "Using bucket name (%s) and object name (%s)",
+ sink->bucket_name, sink->object_name);
+
+ return TRUE;
+}
+
+static gboolean gst_gs_sink_stop(GstBaseSink* bsink) {
+ GstGsSink* sink = GST_GS_SINK(bsink);
+
+ sink->gcs_client = nullptr;
+ sink->gcs_stream = nullptr;
+ sink->content_type = NULL;
+
+ return TRUE;
+}
+
+static void gst_gs_sink_post_message_full(GstGsSink* sink,
+ GstClockTime timestamp,
+ GstClockTime duration,
+ GstClockTime offset,
+ GstClockTime offset_end,
+ GstClockTime running_time,
+ GstClockTime stream_time,
+ const char* filename,
+ const gchar* date) {
+ GstStructure* s;
+
+ if (!sink->post_messages)
+ return;
+
+ s = gst_structure_new("GstGsSink", "filename", G_TYPE_STRING, filename,
+ "date", G_TYPE_STRING, date, "index", G_TYPE_INT,
+ sink->index, "timestamp", G_TYPE_UINT64, timestamp,
+ "stream-time", G_TYPE_UINT64, stream_time,
+ "running-time", G_TYPE_UINT64, running_time, "duration",
+ G_TYPE_UINT64, duration, "offset", G_TYPE_UINT64,
+ offset, "offset-end", G_TYPE_UINT64, offset_end, NULL);
+
+ gst_element_post_message(GST_ELEMENT_CAST(sink),
+ gst_message_new_element(GST_OBJECT_CAST(sink), s));
+}
+
+static void gst_gs_sink_post_message_from_time(GstGsSink* sink,
+ GstClockTime timestamp,
+ GstClockTime duration,
+ const char* filename) {
+ GstClockTime running_time, stream_time;
+ guint64 offset, offset_end;
+ GstSegment* segment;
+ GstFormat format;
+
+ if (!sink->post_messages)
+ return;
+
+ segment = &GST_BASE_SINK(sink)->segment;
+ format = segment->format;
+
+ offset = -1;
+ offset_end = -1;
+
+ running_time = gst_segment_to_running_time(segment, format, timestamp);
+ stream_time = gst_segment_to_stream_time(segment, format, timestamp);
+
+ gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
+ running_time, stream_time, filename, NULL);
+}
+
+static void gst_gs_sink_post_message(GstGsSink* sink,
+ GstBuffer* buffer,
+ const char* filename,
+ const char* date) {
+ GstClockTime duration, timestamp;
+ GstClockTime running_time, stream_time;
+ guint64 offset, offset_end;
+ GstSegment* segment;
+ GstFormat format;
+
+ if (!sink->post_messages)
+ return;
+
+ segment = &GST_BASE_SINK(sink)->segment;
+ format = segment->format;
+
+ timestamp = GST_BUFFER_PTS(buffer);
+ duration = GST_BUFFER_DURATION(buffer);
+ offset = GST_BUFFER_OFFSET(buffer);
+ offset_end = GST_BUFFER_OFFSET_END(buffer);
+
+ running_time = gst_segment_to_running_time(segment, format, timestamp);
+ stream_time = gst_segment_to_stream_time(segment, format, timestamp);
+
+ gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
+ running_time, stream_time, filename, date);
+}
+
+static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
+ GstBuffer* buffer) {
+ GstMapInfo map = {0};
+ gchar* object_name = NULL;
+ gchar* buffer_date = NULL;
+
+ if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
+ return GST_FLOW_ERROR;
+
+ switch (sink->next_file) {
+ case GST_GS_SINK_NEXT_BUFFER: {
+ // Get buffer date if needed.
+ if (sink->start_date) {
+ if (sink->nb_percent_format != 2) {
+ GST_ERROR_OBJECT(sink, "Object name expects date and index");
+ gst_buffer_unmap(buffer, &map);
+ return GST_FLOW_ERROR;
+ }
+
+ if (!gst_gs_get_buffer_date(buffer, sink->start_date, &buffer_date)) {
+ GST_ERROR_OBJECT(sink, "Could not get buffer date %s", object_name);
+ gst_buffer_unmap(buffer, &map);
+ return GST_FLOW_ERROR;
+ }
+
+ if (sink->percent_s_is_first) {
+ object_name =
+ g_strdup_printf(sink->object_name, buffer_date, sink->index);
+ } else {
+ object_name =
+ g_strdup_printf(sink->object_name, sink->index, buffer_date);
+ }
+ } else {
+ if (sink->nb_percent_format != 1) {
+ GST_ERROR_OBJECT(sink, "Object name expects only an index");
+ gst_buffer_unmap(buffer, &map);
+ return GST_FLOW_ERROR;
+ }
+
+ object_name = g_strdup_printf(sink->object_name, sink->index);
+ }
+
+ GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
+
+ gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject(
+ sink->bucket_name, object_name,
+ gcs::WithObjectMetadata(
+ gcs::ObjectMetadata().set_content_type(sink->content_type)));
+ gcs_stream.write(reinterpret_cast<const char*>(map.data), map.size);
+ if (gcs_stream.fail()) {
+ GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
+ }
+ gcs_stream.Close();
+
+ google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
+ sink->gcs_client->GetObjectMetadata(sink->bucket_name, object_name);
+ if (!object_metadata) {
+ GST_ERROR_OBJECT(
+ sink, "Could not get object metadata for object %s (%s)",
+ object_name, object_metadata.status().message().c_str());
+ gst_buffer_unmap(buffer, &map);
+ g_free(object_name);
+ g_free(buffer_date);
+ return GST_FLOW_ERROR;
+ }
+
+ GST_INFO_OBJECT(sink, "Wrote object %s of size %" G_GUINT64_FORMAT "\n",
+ object_name, object_metadata->size());
+
+ gst_gs_sink_post_message(sink, buffer, object_name, buffer_date);
+ g_free(object_name);
+ g_free(buffer_date);
+ ++sink->index;
+ break;
+ }
+ case GST_GS_SINK_NEXT_NONE: {
+ if (!sink->gcs_stream) {
+ GST_INFO_OBJECT(sink, "Opening %s", sink->object_name);
+ sink->gcs_stream = std::make_unique<GSWriteStream>(
+ *sink->gcs_client.get(), sink->bucket_name, sink->object_name,
+ sink->content_type);
+
+ if (!sink->gcs_stream->stream().IsOpen()) {
+ GST_ELEMENT_ERROR(
+ sink, RESOURCE, OPEN_READ,
+ ("Could not create write stream (%s)",
+ sink->gcs_stream->stream().last_status().message().c_str()),
+ GST_ERROR_SYSTEM);
+ gst_buffer_unmap(buffer, &map);
+ return GST_FLOW_OK;
+ }
+ }
+
+ GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
+
+ gcs::ObjectWriteStream& stream = sink->gcs_stream->stream();
+ stream.write(reinterpret_cast<const char*>(map.data), map.size);
+ if (stream.fail()) {
+ GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
+ }
+ break;
+ }
+ default:
+ g_assert_not_reached();
+ }
+
+ gst_buffer_unmap(buffer, &map);
+ return GST_FLOW_OK;
+}
+
+static GstFlowReturn gst_gs_sink_render(GstBaseSink* bsink, GstBuffer* buffer) {
+ GstGsSink* sink = GST_GS_SINK(bsink);
+ GstFlowReturn flow = GST_FLOW_OK;
+
+ flow = gst_gs_sink_write_buffer(sink, buffer);
+ return flow;
+}
+
+static gboolean buffer_list_copy_data(GstBuffer** buf,
+ guint idx,
+ gpointer data) {
+ GstBuffer* dest = GST_BUFFER_CAST(data);
+ guint num, i;
+
+ if (idx == 0)
+ gst_buffer_copy_into(dest, *buf, GST_BUFFER_COPY_METADATA, 0, -1);
+
+ num = gst_buffer_n_memory(*buf);
+ for (i = 0; i < num; ++i) {
+ GstMemory* mem;
+
+ mem = gst_buffer_get_memory(*buf, i);
+ gst_buffer_append_memory(dest, mem);
+ }
+
+ return TRUE;
+}
+
+/* Our assumption for now is that the buffers in a buffer list should always
+ * end up in the same file. If someone wants different behaviour, they'll just
+ * have to add a property for that. */
+static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
+ GstBufferList* list) {
+ GstBuffer* buf;
+ guint size;
+
+ size = gst_buffer_list_calculate_size(list);
+ GST_LOG_OBJECT(sink, "total size of buffer list %p: %u", list, size);
+
+ /* copy all buffers in the list into one single buffer, so we can use
+ * the normal render function (FIXME: optimise to avoid the memcpy) */
+ buf = gst_buffer_new();
+ gst_buffer_list_foreach(list, buffer_list_copy_data, buf);
+ g_assert(gst_buffer_get_size(buf) == size);
+
+ gst_gs_sink_render(sink, buf);
+ gst_buffer_unref(buf);
+
+ return GST_FLOW_OK;
+}
+
+static gboolean gst_gs_sink_set_caps(GstBaseSink* bsink, GstCaps* caps) {
+ GstGsSink* sink = GST_GS_SINK(bsink);
+ GstStructure* s = gst_caps_get_structure(caps, 0);
+
+ sink->content_type = gst_structure_get_name(s);
+
+ GST_INFO_OBJECT(sink, "Content type: %s", sink->content_type);
+
+ return TRUE;
+}
+
+static gboolean gst_gs_sink_event(GstBaseSink* bsink, GstEvent* event) {
+ GstGsSink* sink = GST_GS_SINK(bsink);
+
+ switch (GST_EVENT_TYPE(event)) {
+ case GST_EVENT_EOS:
+ if (sink->gcs_stream) {
+ sink->gcs_stream = nullptr;
+ gst_gs_sink_post_message_from_time(
+ sink, GST_BASE_SINK(sink)->segment.position, -1, sink->object_name);
+ }
+ break;
+ default:
+ break;
+ }
+
+ return GST_BASE_SINK_CLASS(parent_class)->event(bsink, event);
+}
diff --git a/ext/gs/gstgssink.h b/ext/gs/gstgssink.h
new file mode 100644
index 000000000..e93ac4b1f
--- /dev/null
+++ b/ext/gs/gstgssink.h
@@ -0,0 +1,47 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgssink.h:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_GS_SINK_H__
+#define __GST_GS_SINK_H__
+
+#include <gst/base/base.h>
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_GS_SINK (gst_gs_sink_get_type())
+G_DECLARE_FINAL_TYPE(GstGsSink, gst_gs_sink, GST, GS_SINK, GstBaseSink)
+
+/**
+ * GstGsSinkNext:
+ * @GST_GS_SINK_NEXT_BUFFER: New file for each buffer.
+ * @GST_GS_SINK_NEXT_NONE: Only one file like filesink.
+ *
+ * File splitting modes.
+ * Since: 1.20
+ */
+typedef enum {
+ GST_GS_SINK_NEXT_BUFFER,
+ GST_GS_SINK_NEXT_NONE,
+} GstGsSinkNext;
+
+G_END_DECLS
+#endif // __GST_GS_SINK_H__
diff --git a/ext/gs/gstgssrc.cpp b/ext/gs/gstgssrc.cpp
new file mode 100644
index 000000000..c37640bd6
--- /dev/null
+++ b/ext/gs/gstgssrc.cpp
@@ -0,0 +1,578 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgssrc.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-gssrc
+ * @title: gssrc
+ * @see_also: #GstGsSrc
+ *
+ * Read data from a file in a Google Cloud Storage.
+ *
+ * ## Example launch line
+ * ```
+ * gst-launch-1.0 gssrc location=gs://mybucket/myvideo.mkv ! decodebin !
+ * glimagesink
+ * ```
+ * ### Play a video from a Google Cloud Storage.
+ * ```
+ * gst-launch-1.0 gssrc location=gs://mybucket/myvideo.mkv ! decodebin ! navseek
+ * seek-offset=10 ! glimagesink
+ * ```
+ * ### Play a video from a Google Cloud Storage and seek using the keyboard
+ * from the terminal.
+ *
+ * See also: #GstGsSink
+ * Since: 1.20
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstgscommon.h"
+#include "gstgssrc.h"
+
+static GstStaticPadTemplate srctemplate =
+ GST_STATIC_PAD_TEMPLATE("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+GST_DEBUG_CATEGORY_STATIC(gst_gs_src_debug);
+#define GST_CAT_DEFAULT gst_gs_src_debug
+
+enum { LAST_SIGNAL };
+
+#define DEFAULT_BLOCKSIZE 4 * 1024
+
+enum { PROP_0, PROP_LOCATION, PROP_SERVICE_ACCOUNT_EMAIL };
+
+class GSReadStream;
+
+struct _GstGsSrc {
+ GstBaseSrc parent;
+
+ std::unique_ptr<google::cloud::storage::Client> gcs_client;
+ std::unique_ptr<GSReadStream> gcs_stream;
+ gchar* uri;
+ gchar* service_account_email;
+ std::string bucket_name;
+ std::string object_name;
+ guint64 read_position;
+ guint64 object_size;
+};
+
+static void gst_gs_src_finalize(GObject* object);
+
+static void gst_gs_src_set_property(GObject* object,
+ guint prop_id,
+ const GValue* value,
+ GParamSpec* pspec);
+static void gst_gs_src_get_property(GObject* object,
+ guint prop_id,
+ GValue* value,
+ GParamSpec* pspec);
+
+static gboolean gst_gs_src_start(GstBaseSrc* basesrc);
+static gboolean gst_gs_src_stop(GstBaseSrc* basesrc);
+
+static gboolean gst_gs_src_is_seekable(GstBaseSrc* src);
+static gboolean gst_gs_src_get_size(GstBaseSrc* src, guint64* size);
+static GstFlowReturn gst_gs_src_fill(GstBaseSrc* src,
+ guint64 offset,
+ guint length,
+ GstBuffer* buf);
+static gboolean gst_gs_src_query(GstBaseSrc* src, GstQuery* query);
+
+static void gst_gs_src_uri_handler_init(gpointer g_iface, gpointer iface_data);
+
+#define _do_init \
+ G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, gst_gs_src_uri_handler_init); \
+ GST_DEBUG_CATEGORY_INIT(gst_gs_src_debug, "gssrc", 0, "gssrc element");
+#define gst_gs_src_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE(GstGsSrc, gst_gs_src, GST_TYPE_BASE_SRC, _do_init);
+
+namespace gcs = google::cloud::storage;
+
+class GSReadStream {
+ public:
+ GSReadStream(GstGsSrc* src,
+ const std::int64_t start = 0,
+ const std::int64_t end = -1)
+ : gcs_stream_(src->gcs_client->ReadObject(src->bucket_name,
+ src->object_name,
+ gcs::ReadFromOffset(start))) {}
+ ~GSReadStream() { gcs_stream_.Close(); }
+
+ gcs::ObjectReadStream& stream() { return gcs_stream_; }
+
+ private:
+ gcs::ObjectReadStream gcs_stream_;
+};
+
+static void gst_gs_src_class_init(GstGsSrcClass* klass) {
+ GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
+ GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
+ GstBaseSrcClass* gstbasesrc_class = GST_BASE_SRC_CLASS(klass);
+
+ gobject_class->set_property = gst_gs_src_set_property;
+ gobject_class->get_property = gst_gs_src_get_property;
+
+ /**
+ * GstGsSink:location:
+ *
+ * Name of the Google Cloud Storage bucket.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property(
+ gobject_class, PROP_LOCATION,
+ g_param_spec_string(
+ "location", "File Location", "Location of the file to read", NULL,
+ (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+ GST_PARAM_MUTABLE_READY)));
+
+ /**
+ * GstGsSrc:service-account-email:
+ *
+ * Service Account Email to use for credentials.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property(
+ gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
+ g_param_spec_string(
+ "service-account-email", "Service Account Email",
+ "Service Account Email to use for credentials", NULL,
+ (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+ GST_PARAM_MUTABLE_READY)));
+
+ gobject_class->finalize = gst_gs_src_finalize;
+
+ gst_element_class_set_static_metadata(
+ gstelement_class, "Google Cloud Storage Source", "Source/File",
+ "Read from arbitrary point from a file in a Google Cloud Storage",
+ "Julien Isorce <jisorce@oblong.com>");
+ gst_element_class_add_static_pad_template(gstelement_class, &srctemplate);
+
+ gstbasesrc_class->start = GST_DEBUG_FUNCPTR(gst_gs_src_start);
+ gstbasesrc_class->stop = GST_DEBUG_FUNCPTR(gst_gs_src_stop);
+ gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR(gst_gs_src_is_seekable);
+ gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR(gst_gs_src_get_size);
+ gstbasesrc_class->fill = GST_DEBUG_FUNCPTR(gst_gs_src_fill);
+ gstbasesrc_class->query = GST_DEBUG_FUNCPTR(gst_gs_src_query);
+}
+
+static void gst_gs_src_init(GstGsSrc* src) {
+ src->gcs_stream = nullptr;
+ src->uri = NULL;
+ src->service_account_email = NULL;
+ src->read_position = 0;
+ src->object_size = 0;
+
+ gst_base_src_set_blocksize(GST_BASE_SRC(src), DEFAULT_BLOCKSIZE);
+ gst_base_src_set_dynamic_size(GST_BASE_SRC(src), FALSE);
+ gst_base_src_set_live(GST_BASE_SRC(src), FALSE);
+}
+
+static void gst_gs_src_finalize(GObject* object) {
+ GstGsSrc* src = GST_GS_SRC(object);
+
+ g_free(src->uri);
+ src->uri = NULL;
+ g_free(src->service_account_email);
+ src->service_account_email = NULL;
+ src->read_position = 0;
+ src->object_size = 0;
+
+ G_OBJECT_CLASS(parent_class)->finalize(object);
+}
+
+static gboolean gst_gs_src_set_location(GstGsSrc* src,
+ const gchar* location,
+ GError** err) {
+ GstState state = GST_STATE_NULL;
+ std::string filepath = location;
+ size_t delimiter = std::string::npos;
+
+ // The element must be stopped in order to do this.
+ GST_OBJECT_LOCK(src);
+ state = GST_STATE(src);
+ if (state != GST_STATE_READY && state != GST_STATE_NULL)
+ goto wrong_state;
+ GST_OBJECT_UNLOCK(src);
+
+ g_free(src->uri);
+ src->uri = NULL;
+
+ if (location) {
+ if (g_str_has_prefix(location, "gs://")) {
+ src->uri = g_strdup(location);
+ filepath = filepath.substr(5);
+ } else {
+ src->uri = g_strdup_printf("gs://%s", location);
+ filepath = location;
+ }
+
+ delimiter = filepath.find_first_of('/');
+ if (delimiter == std::string::npos)
+ goto wrong_location;
+
+ std::string bucket_name = filepath.substr(0, delimiter);
+ src->bucket_name = bucket_name;
+ src->object_name = filepath.substr(delimiter + 1);
+
+ GST_INFO_OBJECT(src, "uri is %s", src->uri);
+ GST_INFO_OBJECT(src, "bucket name is %s", src->bucket_name.c_str());
+ GST_INFO_OBJECT(src, "object name is %s", src->object_name.c_str());
+ }
+ g_object_notify(G_OBJECT(src), "location");
+
+ return TRUE;
+
+ // ERROR.
+wrong_state : {
+ g_warning(
+ "Changing the `location' property on gssrc when a file is open"
+ "is not supported.");
+ if (err)
+ g_set_error(
+ err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
+ "Changing the `location' property on gssrc when a file is open is "
+ "not supported.");
+ GST_OBJECT_UNLOCK(src);
+ return FALSE;
+}
+wrong_location : {
+ if (err)
+ g_set_error(err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
+ "Failed to find a bucket name");
+ GST_OBJECT_UNLOCK(src);
+ return FALSE;
+}
+}
+
+static gboolean gst_gs_src_set_service_account_email(
+ GstGsSrc* src,
+ const gchar* service_account_email) {
+ if (GST_STATE(src) == GST_STATE_PLAYING ||
+ GST_STATE(src) == GST_STATE_PAUSED) {
+ GST_WARNING_OBJECT(src,
+ "Setting a new service account email not supported in "
+ "PLAYING or PAUSED state");
+ return FALSE;
+ }
+
+ GST_OBJECT_LOCK(src);
+ g_free(src->service_account_email);
+ src->service_account_email = NULL;
+
+ if (service_account_email)
+ src->service_account_email = g_strdup(service_account_email);
+
+ GST_OBJECT_UNLOCK(src);
+
+ return TRUE;
+}
+
+static void gst_gs_src_set_property(GObject* object,
+ guint prop_id,
+ const GValue* value,
+ GParamSpec* pspec) {
+ GstGsSrc* src = GST_GS_SRC(object);
+
+ g_return_if_fail(GST_IS_GS_SRC(object));
+
+ switch (prop_id) {
+ case PROP_LOCATION:
+ gst_gs_src_set_location(src, g_value_get_string(value), NULL);
+ break;
+ case PROP_SERVICE_ACCOUNT_EMAIL:
+ gst_gs_src_set_service_account_email(src, g_value_get_string(value));
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ break;
+ }
+}
+
+static void gst_gs_src_get_property(GObject* object,
+ guint prop_id,
+ GValue* value,
+ GParamSpec* pspec) {
+ GstGsSrc* src = GST_GS_SRC(object);
+
+ g_return_if_fail(GST_IS_GS_SRC(object));
+
+ switch (prop_id) {
+ case PROP_LOCATION:
+ GST_OBJECT_LOCK(src);
+ g_value_set_string(value, src->uri);
+ GST_OBJECT_UNLOCK(src);
+ break;
+ case PROP_SERVICE_ACCOUNT_EMAIL:
+ GST_OBJECT_LOCK(src);
+ g_value_set_string(value, src->service_account_email);
+ GST_OBJECT_UNLOCK(src);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ break;
+ }
+}
+
+static gint gst_gs_read_stream(GstGsSrc* src,
+ guint8* data,
+ const guint64 offset,
+ const guint length) {
+ gint gcount = 0;
+ gchar* sdata = reinterpret_cast<gchar*>(data);
+
+ gcs::ObjectReadStream& stream = src->gcs_stream->stream();
+
+ while (!stream.eof()) {
+ stream.read(sdata, length);
+ if (stream.status().ok())
+ break;
+
+ GST_ERROR_OBJECT(src, "Restart after (%s)",
+ stream.status().message().c_str());
+ src->gcs_stream = std::make_unique<GSReadStream>(src, offset);
+ }
+
+ gcount = stream.gcount();
+
+ GST_INFO_OBJECT(src, "Client read %d bytes", gcount);
+
+ return gcount;
+}
+
+static GstFlowReturn gst_gs_src_fill(GstBaseSrc* basesrc,
+ guint64 offset,
+ guint length,
+ GstBuffer* buf) {
+ GstGsSrc* src = GST_GS_SRC(basesrc);
+ guint to_read = 0;
+ guint bytes_read = 0;
+ gint ret = 0;
+ GstMapInfo info = {};
+ guint8* data = NULL;
+
+ if (G_UNLIKELY(offset != (guint64)-1 && src->read_position != offset)) {
+ src->gcs_stream = std::make_unique<GSReadStream>(src, offset);
+ src->read_position = offset;
+ }
+
+ if (!gst_buffer_map(buf, &info, GST_MAP_WRITE))
+ goto buffer_write_fail;
+
+ data = info.data;
+
+ bytes_read = 0;
+ to_read = length;
+ while (to_read > 0) {
+ GST_INFO_OBJECT(src, "Reading %d bytes at offset 0x%" G_GINT64_MODIFIER "x",
+ to_read, offset + bytes_read);
+
+ ret = gst_gs_read_stream(src, data + bytes_read, offset, to_read);
+ if (G_UNLIKELY(ret < 0))
+ goto could_not_read;
+
+ if (G_UNLIKELY(ret == 0)) {
+ // Push any remaining data.
+ if (bytes_read > 0)
+ break;
+ goto eos;
+ }
+
+ to_read -= ret;
+ bytes_read += ret;
+
+ src->read_position += ret;
+ }
+
+ GST_INFO_OBJECT(
+ src, "Read %" G_GUINT32_FORMAT " bytes of %" G_GUINT32_FORMAT " length",
+ bytes_read, length);
+
+ gst_buffer_unmap(buf, &info);
+ if (bytes_read != length)
+ gst_buffer_resize(buf, 0, bytes_read);
+
+ GST_BUFFER_OFFSET(buf) = offset;
+ GST_BUFFER_OFFSET_END(buf) = offset + bytes_read;
+
+ return GST_FLOW_OK;
+
+ // ERROR.
+could_not_read : {
+ GST_ELEMENT_ERROR(src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+ gst_buffer_unmap(buf, &info);
+ gst_buffer_resize(buf, 0, 0);
+ return GST_FLOW_ERROR;
+}
+eos : {
+ GST_INFO_OBJECT(src, "EOS");
+ gst_buffer_unmap(buf, &info);
+ gst_buffer_resize(buf, 0, 0);
+ return GST_FLOW_EOS;
+}
+buffer_write_fail : {
+ GST_ELEMENT_ERROR(src, RESOURCE, WRITE, (NULL), ("Can't write to buffer"));
+ return GST_FLOW_ERROR;
+}
+}
+
+static gboolean gst_gs_src_is_seekable(GstBaseSrc* basesrc) {
+ return TRUE;
+}
+
+static gboolean gst_gs_src_get_size(GstBaseSrc* basesrc, guint64* size) {
+ GstGsSrc* src = GST_GS_SRC(basesrc);
+
+ *size = src->object_size;
+
+ return TRUE;
+}
+
+static gboolean gst_gs_src_start(GstBaseSrc* basesrc) {
+ GstGsSrc* src = GST_GS_SRC(basesrc);
+ GError* err = NULL;
+ guint blocksize = 0;
+
+ src->read_position = 0;
+ src->object_size = 0;
+
+ if (src->uri == NULL || src->uri[0] == '\0') {
+ GST_ELEMENT_ERROR(src, RESOURCE, NOT_FOUND,
+ ("No uri specified for reading."), (NULL));
+ return FALSE;
+ }
+
+ GST_INFO_OBJECT(src, "Opening file %s", src->uri);
+
+ src->gcs_client = gst_gs_create_client(src->service_account_email, &err);
+ if (err) {
+ GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ,
+ ("Could not create client (%s)", err->message),
+ GST_ERROR_SYSTEM);
+ g_clear_error(&err);
+ return FALSE;
+ }
+
+ GST_INFO_OBJECT(src, "Parsed bucket name (%s) and object name (%s)",
+ src->bucket_name.c_str(), src->object_name.c_str());
+
+ google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
+ src->gcs_client->GetObjectMetadata(src->bucket_name, src->object_name);
+ if (!object_metadata) {
+ GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ,
+ ("Could not get object metadata (%s)",
+ object_metadata.status().message().c_str()),
+ GST_ERROR_SYSTEM);
+ return FALSE;
+ }
+
+ src->object_size = object_metadata->size();
+
+ GST_INFO_OBJECT(src, "Object size %" G_GUINT64_FORMAT "\n", src->object_size);
+
+ src->gcs_stream = std::make_unique<GSReadStream>(src);
+
+ blocksize = gcs::ClientOptions(nullptr).download_buffer_size();
+
+ GST_INFO_OBJECT(src, "Set blocksize to %" G_GUINT32_FORMAT, blocksize);
+
+ gst_base_src_set_blocksize(GST_BASE_SRC(src), blocksize);
+
+ return TRUE;
+}
+
+static gboolean gst_gs_src_stop(GstBaseSrc* basesrc) {
+ GstGsSrc* src = GST_GS_SRC(basesrc);
+
+ src->gcs_stream = nullptr;
+ src->read_position = 0;
+ src->object_size = 0;
+
+ return TRUE;
+}
+
+static gboolean gst_gs_src_query(GstBaseSrc* src, GstQuery* query) {
+ gboolean ret;
+
+ switch (GST_QUERY_TYPE(query)) {
+ case GST_QUERY_SCHEDULING: {
+ // A pushsrc can by default never operate in pull mode override
+ // if you want something different.
+ gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEQUENTIAL, 1, -1, 0);
+ gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH);
+
+ ret = TRUE;
+ break;
+ }
+ default:
+ ret = GST_BASE_SRC_CLASS(parent_class)->query(src, query);
+ break;
+ }
+ return ret;
+}
+
+static GstURIType gst_gs_src_uri_get_type(GType type) {
+ return GST_URI_SRC;
+}
+
+static const gchar* const* gst_gs_src_uri_get_protocols(GType type) {
+ static const gchar* protocols[] = {"gs", NULL};
+
+ return protocols;
+}
+
+static gchar* gst_gs_src_uri_get_uri(GstURIHandler* handler) {
+ GstGsSrc* src = GST_GS_SRC(handler);
+
+ return g_strdup(src->uri);
+}
+
+static gboolean gst_gs_src_uri_set_uri(GstURIHandler* handler,
+ const gchar* uri,
+ GError** err) {
+ GstGsSrc* src = GST_GS_SRC(handler);
+
+ if (strcmp(uri, "gs://") == 0) {
+ // Special case for "gs://" as this is used by some applications
+ // to test with gst_element_make_from_uri if there's an element
+ // that supports the URI protocol.
+ gst_gs_src_set_location(src, NULL, NULL);
+ return TRUE;
+ }
+
+ return gst_gs_src_set_location(src, uri, err);
+}
+
+static void gst_gs_src_uri_handler_init(gpointer g_iface, gpointer iface_data) {
+ GstURIHandlerInterface* iface = (GstURIHandlerInterface*)g_iface;
+
+ iface->get_type = gst_gs_src_uri_get_type;
+ iface->get_protocols = gst_gs_src_uri_get_protocols;
+ iface->get_uri = gst_gs_src_uri_get_uri;
+ iface->set_uri = gst_gs_src_uri_set_uri;
+}
diff --git a/ext/gs/gstgssrc.h b/ext/gs/gstgssrc.h
new file mode 100644
index 000000000..717742f12
--- /dev/null
+++ b/ext/gs/gstgssrc.h
@@ -0,0 +1,34 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgssrc.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_GS_SRC_H__
+#define __GST_GS_SRC_H__
+
+#include <gst/base/gstbasesrc.h>
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_GS_SRC (gst_gs_src_get_type())
+G_DECLARE_FINAL_TYPE(GstGsSrc, gst_gs_src, GST, GS_SRC, GstBaseSrc)
+
+G_END_DECLS
+#endif // __GST_GS_SRC_H__
diff --git a/ext/gs/meson.build b/ext/gs/meson.build
new file mode 100644
index 000000000..50e7ff89a
--- /dev/null
+++ b/ext/gs/meson.build
@@ -0,0 +1,28 @@
+gs_sources = [
+ 'gstgscommon.cpp',
+ 'gstgssink.cpp',
+ 'gstgssrc.cpp',
+ 'gstgs.cpp',
+]
+
+gs_dep = dependency('storage_client', version : '>= 1.25.0', required : get_option('gs'))
+
+if gs_dep.found()
+ gstgs = library('gstgs',
+ gs_sources,
+ c_args : gst_plugins_bad_args,
+ cpp_args : gst_plugins_bad_args,
+ include_directories : [configinc, libsinc],
+ dependencies : [gstbase_dep, gs_dep],
+ install : true,
+ install_dir : plugins_install_dir,
+ )
+ pkgconfig.generate(gstgs, install_dir : plugins_pkgconfig_install_dir)
+ plugins += [gstgs]
+endif
+
+clang_format_p = find_program('clang-format', required: false)
+if clang_format_p.found()
+ run_command(clang_format_p, '--style=file', '-i', 'gstgssink.cpp', 'gstgssrc.cpp')
+endif
+
diff --git a/ext/meson.build b/ext/meson.build
index 63844ca82..98a005329 100644
--- a/ext/meson.build
+++ b/ext/meson.build
@@ -18,6 +18,7 @@ subdir('fdkaac')
subdir('flite')
subdir('fluidsynth')
subdir('gme')
+subdir('gs')
subdir('gsm')
subdir('hls')
subdir('iqa')
diff --git a/meson_options.txt b/meson_options.txt
index 032685120..4b8eaa361 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -107,6 +107,7 @@ option('flite', type : 'feature', value : 'auto', description : 'Flite speech sy
option('fluidsynth', type : 'feature', value : 'auto', description : 'Fluidsynth MIDI decoder plugin')
option('gl', type : 'feature', value : 'auto', description : 'GStreamer OpenGL integration support (used by various plugins)')
option('gme', type : 'feature', value : 'auto', description : 'libgme gaming console music file decoder plugin')
+option('gs', type : 'feature', value : 'auto', description : 'Google Cloud Storage source and sink plugin')
option('gsm', type : 'feature', value : 'auto', description : 'GSM encoder/decoder plugin')
option('ipcpipeline', type : 'feature', value : 'auto', description : 'Inter-process communication plugin')
option('iqa', type : 'feature', value : 'auto', description : 'Image quality assessment plugin')