path: root/src/miners/twitter/tracker-miner-twitter.vala
diff options
Diffstat (limited to 'src/miners/twitter/tracker-miner-twitter.vala')
1 files changed, 441 insertions, 0 deletions
diff --git a/src/miners/twitter/tracker-miner-twitter.vala b/src/miners/twitter/tracker-miner-twitter.vala
new file mode 100644
index 000000000..b919af83b
--- /dev/null
+++ b/src/miners/twitter/tracker-miner-twitter.vala
@@ -0,0 +1,441 @@
+ * Copyright (C) 2010, Adrien Bustany <>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+namespace Tracker {
+public class MinerTwitter : Tracker.MinerWeb {
+ public Twitter.Provider provider { get; construct; }
+ private string friendly_name; /* Human readable name of the provider */
+ private string channel_urn = null;
+ private Twitter.Client service;
+ private static const string DATASOURCE_URN = "urn:103e7d6e-2334-4cd2-b0a5-f1b0c8bb10ef";
+ private static const string SERVICE_DESCRIPTION = "Tracker miner for %s";
+ private static const uint PULL_INTERVAL = 60; /* seconds */
+ private uint pull_timeout_handle;
+ private QueryQueue query_queue;
+ /* used to store state information like last pulled tweet*/
+ private static const string STATE_FILE_NAME = "tracker-miner-%s.state";
+ private static const string STATE_FILE_GROUP = "General";
+ private KeyFile state_file;
+ private uint last_status_timestamp;
+ private static MainLoop main_loop;
+ construct {
+ if (provider == Twitter.Provider.DEFAULT_PROVIDER) {
+ set ("name", "Twitter");
+ friendly_name = "Twitter";
+ } else if (provider == Twitter.Provider.IDENTI_CA) {
+ set ("name", "Identica");
+ friendly_name = "";
+ } else {
+ assert_not_reached ();
+ }
+ set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+ service = new Twitter.Client.full (provider, null, null, null);
+ service.status_received.connect (status_received_cb);
+ service.timeline_complete.connect (timeline_complete_cb);
+ state_file = new KeyFile ();
+ load_state_file ();
+ pull_timeout_handle = 0;
+ this.notify["association-status"].connect (association_status_changed);
+ query_queue = new QueryQueue (this);
+ init_feed_channel ();
+ }
+ public void shutdown () {
+ set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+ save_state_file ();
+ }
+ private async void init_feed_channel () {
+ string sparql;
+ unowned PtrArray results;
+ sparql = "select ?c where { ?c a mfo:FeedChannel ; mfo:type ?type ."
+ + " ?type mfo:name ?typeName . "
+ + " FILTER (?typeName = \"%s\") }".printf (friendly_name);
+ try {
+ results = yield execute_sparql (sparql);
+ if (results.len == 0) {
+ /* No optimal, but we're waiting for blank support in TrackerMiner */
+ sparql = "insert { _:channel a mfo:FeedChannel ; rdfs:label \"%s home timeline\";".printf (friendly_name)
+ + " mfo:type [ a mfo:FeedType ; mfo:name \"%s\" ] }".printf (friendly_name);
+ yield execute_update (sparql);
+ sparql = "select ?c where { ?c a mfo:FeedChannel ; mfo:type ?type ."
+ + " ?type mfo:name ?typeName . "
+ + " FILTER (?typeName = \"%s\") }".printf (friendly_name);
+ results = yield execute_sparql (sparql);
+ }
+ assert (results.len == 1);
+ channel_urn = ((string[][])results.pdata)[0][0];
+ } catch (Error tracker_error) {
+ critical ("Couldn't initialize feed channel: %s", tracker_error.message);
+ }
+ }
+ private void status_received_cb (ulong handle, Twitter.Status? status, Error error) {
+ SparqlBuilder builder;
+ string name;
+ TimeVal tv = TimeVal ();
+ get ("name", out name);
+ if (error != null) {
+ if (!(error is Twitter.Error.NOT_MODIFIED)) {
+ warning ("An error occurred while pulling statuses: %s", error.message);
+ }
+ return;
+ }
+ builder = new SparqlBuilder.update ();
+ builder.insert_open (status.url);
+ builder.subject ("_:author");
+ builder.predicate ("a");
+ builder.object ("nco:PersonContact");
+ builder.predicate ("nco:fullname");
+ builder.object_string (;
+ builder.predicate ("nco:photo");
+ builder.object_blank_open ();
+ builder.predicate ("a");
+ builder.object ("nfo:RemoteDataObject");
+ builder.predicate ("nie:url");
+ builder.object_string (status.user.profile_image_url);
+ builder.object_blank_close (); /* nco:photo */
+ builder.subject ("_:message");
+ builder.predicate ("a");
+ builder.object ("mfo:FeedMessage");
+ builder.predicate ("a");
+ builder.object ("nfo:RemoteDataObject");
+ builder.predicate ("nie:url");
+ builder.object_string (status.url);
+ builder.predicate ("nmo:communicationChannel");
+ builder.object_iri (channel_urn);
+ builder.predicate ("nmo:messageId");
+ builder.object_string (rdf_message_id (;
+ builder.predicate ("nmo:from");
+ builder.object ("_:author");
+ builder.predicate ("nie:plainTextContent");
+ builder.object_string (status.text);
+ if (status.reply_to_status != 0) {
+ builder.predicate ("nmo:inReplyTo");
+ builder.object_blank_open ();
+ builder.predicate ("a");
+ builder.object ("mfo:FeedMessage");
+ builder.predicate ("nmo:communicationChannel");
+ builder.object_iri (channel_urn);
+ builder.predicate ("nmo:messageId");
+ builder.object_string (rdf_message_id (status.reply_to_status));
+ builder.object_blank_close ();
+ }
+ if (Twitter.date_to_time_val (status.created_at, out tv)) {
+ builder.predicate ("nmo:receivedDate");
+ builder.object_string (tv.to_iso8601 ());
+ /* We receive the status in chronological order */
+ last_status_timestamp = (int)tv.tv_sec;
+ }
+ tv.get_current_time ();
+ builder.predicate ("mfo:downloadedTime");
+ builder.object_string (tv.to_iso8601 ());
+ builder.insert_close ();
+ query_queue.append (builder.get_result ());
+ }
+ private void timeline_complete_cb () {
+ message ("Timeline downloaded");
+ query_queue.flush ();
+ state_file.set_integer (STATE_FILE_GROUP, "since", (int)last_status_timestamp);
+ }
+ private string rdf_message_id (uint status_id)
+ {
+ string name;
+ get ("name", out name);
+ return "feed:%s:%u".printf (name, status_id);
+ }
+ private void association_status_changed (Object source, ParamSpec pspec) {
+ MinerWebAssociationStatus status;
+ get ("association-status", out status);
+ switch (status) {
+ case MinerWebAssociationStatus.ASSOCIATED:
+ if (pull_timeout_handle != 0)
+ return;
+ message ("Miner is now associated. Initiating periodic pull.");
+ pull_timeout_handle = Timeout.add_seconds (PULL_INTERVAL, pull_timeout_cb);
+ Idle.add ( () => { pull_timeout_cb (); return false; });
+ break;
+ case MinerWebAssociationStatus.UNASSOCIATED:
+ if (pull_timeout_handle == 0)
+ return;
+ Source.remove (pull_timeout_handle);
+ break;
+ }
+ }
+ private bool pull_timeout_cb () {
+ int since;
+ if (channel_urn == null) {
+ message ("Feed channel not initialized yet, skipping this cycle");
+ return true;
+ }
+ message ("Pulling new data");
+ try {
+ since = state_file.get_integer (STATE_FILE_GROUP, "since");
+ } catch (Error error) {
+ critical ("Cannot load config variable: %s", error.message);
+ return true;
+ }
+ service.get_friends_timeline ("", since);
+ return true;
+ }
+ private void load_state_file () {
+ string name;
+ get ("name", out name);
+ try {
+ state_file.load_from_file (Path.build_filename (Environment.get_user_cache_dir (),
+ "tracker",
+ STATE_FILE_NAME.printf (name)),
+ KeyFileFlags.NONE);
+ } catch (Error error) {
+ message ("Couldn't load the state file");
+ }
+ try {
+ state_file.get_integer (STATE_FILE_GROUP, "since");
+ } catch (Error error) {
+ state_file.set_integer (STATE_FILE_GROUP, "since", 0);
+ }
+ }
+ private void save_state_file () {
+ string name;
+ string file_path;
+ get ("name", out name);
+ file_path = Path.build_filename (Environment.get_user_cache_dir (),
+ "tracker",
+ STATE_FILE_NAME.printf (name));
+ try {
+ FileUtils.set_contents (file_path, state_file.to_data ());
+ } catch (Error error) {
+ warning ("Couldn't save state file: %s", error.message);
+ }
+ }
+ // If we don't protect the function with a mutex, it could be called by the
+ // inner loop, starting at inner-inner one, and so on...
+ private Mutex authenticate_mutex = new Mutex ();
+ public override void authenticate () throws MinerWebError {
+ PasswordProvider password_provider;
+ string name;
+ string username;
+ string password;
+ bool verified = false;
+ Error twitter_error = null;
+ if(!authenticate_mutex.trylock ()) {
+ warning ("authenticate called while it was still running");
+ return;
+ }
+ password_provider = PasswordProvider.get ();
+ get ("name", out name);
+ set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+ try {
+ password = password_provider.get_password (name, out username);
+ } catch (Error e) {
+ authenticate_mutex.unlock ();
+ if (e is PasswordProviderError.SERVICE) {
+ throw new MinerWebError.KEYRING (e.message);
+ }
+ if (e is PasswordProviderError.NOTFOUND) {
+ throw new MinerWebError.NO_CREDENTIALS ("Miner is not associated");
+ }
+ critical ("Internal error: %s", e.message);
+ return;
+ }
+ message ("Verifying username and password");
+ service.set_user (username, password);
+ service.verify_user ();
+ var wait_loop = new MainLoop (null, false);
+ service.user_verified.connect ( (h, v, e) => {
+ verified = v;
+ twitter_error = e;
+ wait_loop.quit (); });
+ ();
+ authenticate_mutex.unlock ();
+ if (twitter_error != null) {
+ throw new MinerWebError.SERVICE (twitter_error.message);
+ }
+ if (!verified) {
+ throw new MinerWebError.WRONG_CREDENTIALS ("Wrong username and/or password");
+ } else {
+ message ("Authentication sucessful");
+ set ("association-status", MinerWebAssociationStatus.ASSOCIATED);
+ }
+ return;
+ }
+ public override void dissociate () throws MinerWebError {
+ var password_provider = PasswordProvider.get ();
+ string name;
+ get ("name", out name);
+ try {
+ password_provider.forget_password (name);
+ } catch (Error e) {
+ if (e is PasswordProviderError.SERVICE) {
+ throw new MinerWebError.KEYRING (e.message);
+ }
+ critical ("Internal error: %s", e.message);
+ return;
+ }
+ set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+ }
+ public override void associate (HashTable<string, string> association_data) throws Tracker.MinerWebError {
+ assert (association_data.lookup ("username") != null && association_data.lookup ("password") != null);
+ var password_provider = PasswordProvider.get ();
+ string name;
+ get ("name", out name);
+ try {
+ password_provider.store_password (name,
+ SERVICE_DESCRIPTION.printf (name),
+ association_data.lookup ("username"),
+ association_data.lookup ("password"));
+ } catch (Error e) {
+ if (e is PasswordProviderError.SERVICE) {
+ throw new MinerWebError.KEYRING (e.message);
+ }
+ critical ("Internal error: %s", e.message);
+ return;
+ }
+ }
+ public GLib.HashTable get_association_data () throws Tracker.MinerWebError {
+ return new HashTable<string, string>(str_hash, str_equal);
+ }
+ private static bool in_loop = false;
+ private static void signal_handler (int signo) {
+ if (in_loop) {
+ Posix.exit (Posix.EXIT_FAILURE);
+ }
+ switch (signo) {
+ case Posix.SIGINT:
+ case Posix.SIGTERM:
+ in_loop = true;
+ main_loop.quit ();
+ break;
+ }
+ }
+ private static void init_signals () {
+#if G_OS_WIN32
+ Posix.sigaction_t act = Posix.sigaction_t ();
+ Posix.sigset_t empty_mask = Posix.sigset_t ();
+ Posix.sigemptyset (empty_mask);
+ act.sa_handler = signal_handler;
+ act.sa_mask = empty_mask;
+ act.sa_flags = 0;
+ Posix.sigaction (Posix.SIGTERM, act, null);
+ Posix.sigaction (Posix.SIGINT, act, null);
+ }
+ public static void main (string[] args) {
+ Environment.set_application_name ("Twitter/ tracker miner");
+ MinerTwitter twitter_miner;
+ twitter_miner = (typeof (MinerTwitter),
+ "provider", Twitter.Provider.DEFAULT_PROVIDER) as MinerTwitter;
+ MinerTwitter identica_miner;
+ identica_miner = (typeof (MinerTwitter),
+ "provider", Twitter.Provider.IDENTI_CA) as MinerTwitter;
+ init_signals ();
+ main_loop = new MainLoop (null, false);
+ ();
+ twitter_miner.shutdown ();
+ identica_miner.shutdown ();
+ }
+} // End namespace Tracker