summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJames Bottomley <James.Bottomley@HansenPartnership.com>2017-08-15 11:07:26 -0700
committerJames Bottomley <James.Bottomley@HansenPartnership.com>2017-08-17 18:18:14 -0700
commitf3cbf2467e477dfbf49fc427985497426ba3a5bd (patch)
tree9b03cde6c542e4f52e6a3305e311c9f9873820a5
downloadhangouts-xmpp-gateway-f3cbf2467e477dfbf49fc427985497426ba3a5bd.tar.gz
Initial Commit
This commit has minimal functionality. It implements a basic Hangups client as an xmpp gateway with discover and command possibilities including one on one chat and presence and typing notifications. The architecture is as an XMPP gateway with initial registration sending the google id and password to the server to acquire the authentication token which is then used for subsequent logins. When the server starts up, one hangups thread per authentication token is started. The xmpp code uses SleekXMPP and is naturally threaded. Signed-off-by: James Bottomley <James.Bottomley@HansenPartnership.com>
-rw-r--r--__main__.py17
-rw-r--r--hangupsthread.py202
-rw-r--r--rosterdb.py49
-rw-r--r--xmppthread.py237
4 files changed, 505 insertions, 0 deletions
diff --git a/__main__.py b/__main__.py
new file mode 100644
index 0000000..e19a3ce
--- /dev/null
+++ b/__main__.py
@@ -0,0 +1,17 @@
+import sys
+sys.path.insert(0, './lib/hangups')
+sys.path.insert(0, './lib/sleekxmpp')
+import os
+import logging
+import xmppthread
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.INFO,
+ format='%(levelname)-8s %(message)s')
+
+ s = xmppthread.XMPPThread('google.hangouts', '153.66.160.254')
+
+ if s.connect():
+ s.process(block=True)
+ sys.exit(0)
+
diff --git a/hangupsthread.py b/hangupsthread.py
new file mode 100644
index 0000000..6708a36
--- /dev/null
+++ b/hangupsthread.py
@@ -0,0 +1,202 @@
+import threading
+import asyncio
+import hangups
+import logging
+from hangups.auth import RefreshTokenCache
+from hangups.user import UserID
+
+class CredentialsPrompt(object):
+ def __init__(self, email, password):
+ self.email = email
+ self.password = password
+
+ def get_email(self):
+ return self.email
+
+ def get_password(self):
+ return self.password
+
+class HangupsThread(threading.Thread):
+ def __init__(self, xmpp, remote, gmail='', password=''):
+
+ self.xmpp = xmpp
+ self.remote = remote
+
+ super().__init__(name='HangupsThread-{}'.format(remote))
+
+ self.cookie = hangups.auth.get_auth(CredentialsPrompt(gmail, password), RefreshTokenCache('{}-refresh_token.cache'.format(remote)))
+
+ def run(self):
+ policy = asyncio.get_event_loop_policy()
+ self.loop = policy.new_event_loop()
+ policy.set_event_loop(self.loop)
+
+ self.client = hangups.Client(self.cookie)
+ self.client.on_connect.add_observer(self._on_connect)
+ self.client.on_disconnect.add_observer(self._on_disconnect)
+ self.client.on_reconnect.add_observer(self._on_reconnect)
+
+ self.client.on_connect.fire()
+ self.loop.run_until_complete(self.client.connect())
+ logging.info('Hangups thread stopped for {}.'.format(self.remote))
+
+ def _stop(self):
+ yield from self.client.disconnect()
+ self.loop.stop()
+ self.loop = None
+
+ def stop(self):
+ if self.loop is not None:
+ self.loop.call_soon_threadsafe(asyncio.async, self._stop())
+
+ def incoming_presence(self, user, pres):
+ mood_message = ''
+ for segment in pres.mood_message.mood_content.segment:
+ mood_message += segment.text
+ self.xmpp.incoming_presence(self.remote, user, pres.reachable,
+ pres.available, mood_message)
+
+ @asyncio.coroutine
+ def _on_connect_common(self):
+ for user in self.user_list.get_all():
+ if user.is_self:
+ continue
+ self.xmpp.incoming_user(self.remote, user)
+ pres = yield from self.query_presence(user)
+ self.incoming_presence(user, pres)
+
+ @asyncio.coroutine
+ def _on_connect(self):
+ logging.info('Connected to google hangouts service for %s', self.remote)
+ self.user_list, self.conv_list = (
+ yield from hangups.build_user_conversation_list(self.client)
+ )
+ self.client.on_state_update.add_observer(self._on_state_update)
+ self.conv_list.on_event.add_observer(self._on_event)
+ self.conv_list.on_typing.add_observer(self._on_typing)
+ yield from self._on_connect_common()
+
+ @asyncio.coroutine
+ def _on_disconnect(self):
+ logging.info('Disconnect from google hangouts for %s', self.remote)
+ self.xmpp.incoming_disconnect(self.remote)
+
+ @asyncio.coroutine
+ def _on_reconnect(self):
+ logging.info('Reconnect to google hangouts for %s', self.remote)
+ self.user_list, self.conv_list = (
+ yield from hangups.build_user_conversation_list(self.client)
+ )
+ yield from self._on_connect_common()
+
+ @asyncio.coroutine
+ def _on_state_update(self, state_update):
+ """Receive a StateUpdate"""
+ notification_type = state_update.WhichOneof('state_update')
+ if notification_type == 'presence_notification':
+ yield from self._handle_presence_notification(state_update.presence_notification)
+ elif notification_type == 'focus_notification':
+ yield from self._handle_focus_notification(state_update.focus_notification)
+
+ @asyncio.coroutine
+ def _handle_presence_notification(self, presence_notification):
+ for presence in presence_notification.presence:
+ if not presence.HasField('presence'):
+ continue
+ user = self.user_list.get_user(UserID(chat_id=presence.user_id.chat_id,
+ gaia_id=presence.user_id.gaia_id))
+ if not user:
+ continue
+ # the notification usually only contains the timestamp, so
+ # ask for the values we want
+ pres = yield from self.query_presence(user)
+ self.incoming_presence(user, pres)
+
+ @asyncio.coroutine
+ def _handle_focus_notification(self, focus_notification):
+ # focus is telling us something about the client, but google
+ # will helpfully translate it to presence
+ sid = focus_notification.sender_id
+ user = self.user_list.get_user(UserID(chat_id=sid.chat_id,
+ gaia_id=sid.gaia_id))
+ pres = yield from self.query_presence(user)
+ self.incoming_presence(user, pres)
+
+ @asyncio.coroutine
+ def query_presence(self, user):
+ req = hangups.hangouts_pb2.QueryPresenceRequest(
+ request_header = self.client.get_request_header(),
+ participant_id=[
+ hangups.hangouts_pb2.ParticipantId(gaia_id=user.id_.gaia_id),
+ ],
+ field_mask=[
+ hangups.hangouts_pb2.FIELD_MASK_REACHABLE,
+ hangups.hangouts_pb2.FIELD_MASK_AVAILABLE,
+ hangups.hangouts_pb2.FIELD_MASK_MOOD,
+ hangups.hangouts_pb2.FIELD_MASK_LAST_SEEN,
+ ],
+ )
+ res = yield from self.client.query_presence(req)
+ if not hasattr(res, 'presence_result'):
+ return None
+ for presence in res.presence_result:
+ return presence.presence
+
+ def get_conversation_for_user(self, gaia_id):
+ for conv in self.conv_list.get_all():
+ if conv._conversation.type != hangups.hangouts_pb2.CONVERSATION_TYPE_ONE_TO_ONE:
+ continue
+ for user in conv.users:
+ if user.id_.gaia_id == gaia_id:
+ return conv
+
+ @asyncio.coroutine
+ def _send_message(self, msg):
+ mto = msg.get_to().local
+
+ conv = self.get_conversation_for_user(mto)
+ if conv:
+ segments = hangups.ChatMessageSegment.from_str(msg['body'])
+ yield from conv.send_message(segments)
+ else:
+ logging.error("FAILED TO FIND CONV FOR %s", mto)
+
+ def send_message(self, msg):
+ if self.loop is not None:
+ self.loop.call_soon_threadsafe(asyncio.async, self._send_message(msg))
+
+ def _on_event(self, conv_event):
+ conv = self.conv_list.get(conv_event.conversation_id)
+ user = conv.get_user(conv_event.user_id)
+ if isinstance(conv_event, hangups.ChatMessageEvent):
+ # Event is a chat message: foward it to XMPP.
+ if conv._conversation.type == hangups.hangouts_pb2.CONVERSATION_TYPE_ONE_TO_ONE:
+ if not user.is_self:
+ self.xmpp.incoming_message(self.remote, user, conv_event.text)
+
+ def _on_typing(self, typ):
+ user = self.user_list.get_user(UserID(chat_id=typ.user_id.chat_id,
+ gaia_id=typ.user_id.gaia_id))
+ if user is None:
+ return
+ typing_states = {
+ hangups.hangouts_pb2.TYPING_TYPE_UNKNOWN: 'gone',
+ hangups.hangouts_pb2.TYPING_TYPE_STARTED: 'composing',
+ hangups.hangouts_pb2.TYPING_TYPE_PAUSED: 'paused',
+ hangups.hangouts_pb2.TYPING_TYPE_STOPPED: 'inactive',
+ }
+ self.xmpp.incoming_typing(self.remote, user, typing_states[typ.status])
+
+ def send_typing_notification(self, msg):
+ self.client.set_active()
+ mto = msg.get_to().local
+ conv = self.get_conversation_for_user(mto)
+ if self.loop is not None:
+ self.loop.call_soon_threadsafe(asyncio.async, self._send_typing_notification(msg, conv))
+
+ @asyncio.coroutine
+ def _send_typing_notification(self, msg, conv):
+ typ = hangups.hangouts_pb2.TYPING_TYPE_PAUSED
+ if msg['chat_state'] == 'composing':
+ typ = hangups.hangouts_pb2.TYPING_TYPE_STARTED
+ yield from conv.set_typing(typ)
diff --git a/rosterdb.py b/rosterdb.py
new file mode 100644
index 0000000..7275320
--- /dev/null
+++ b/rosterdb.py
@@ -0,0 +1,49 @@
+import sqlite3
+import pickle
+
+class RosterDB(object):
+ """
+ Implements a roster persistence object (expected by sleekxmpp roster)
+ with a sqlite3 backend
+ """
+ def __init__(self, name, table):
+ self.db = sqlite3.connect(name, check_same_thread=False)
+ self.table = table
+
+ try:
+ self.db.execute('create table {} (o varchar, k varchar, v blob)'.format(self.table))
+ except sqlite3.OperationalError as e:
+ pass
+
+ def entries(self, key, dblist={}):
+ if key is None:
+ r = self.db.execute('select distinct(o) from {}'.format(self.table)).fetchall()
+ r = map(lambda x : x[0], r)
+ print("DB ENTRIES AT TOP LEVEL", r)
+ return r
+
+ r = self.db.execute('select k from {} where o = ?'.format(self.table), (key,)).fetchall()
+ r = map(lambda x: x[0], r)
+ print("DB ENTRIES FOR KEY ", key, r)
+ return r
+
+ def load(self, owner, key, state):
+ print("TYPE OF OWNER", type(owner)," TYPE OF KEY ", type(key))
+ r = self.db.execute('select v from {} where o = ? and k = ?'.format(self.table), (owner, key)).fetchone()
+ print("DB LOAD ", owner, key, " = ", r);
+ if r is None:
+ return r
+ return pickle.loads(r[0])
+
+ def save(self, owner, key, item, state):
+ print("DB SAVE ", owner, key, " = ", item)
+ try:
+ self.db.execute('delete from {} where o = ? and k = ?'.format(self.table), (owner, key))
+ except sqlite3.OperationalError as e:
+ pass
+
+ p = pickle.dumps(item)
+ self.db.execute('insert into {} values (?, ?, ?)'.format(self.table),
+ (owner, key, p))
+ self.db.commit()
+
diff --git a/xmppthread.py b/xmppthread.py
new file mode 100644
index 0000000..7a608e7
--- /dev/null
+++ b/xmppthread.py
@@ -0,0 +1,237 @@
+import logging
+import sleekxmpp
+import glob
+import urllib
+
+# local imports
+import hangupsthread
+import rosterdb
+
+from sleekxmpp import ComponentXMPP
+from sleekxmpp.xmlstream.matcher import MatchXPath
+from sleekxmpp.xmlstream.handler import Callback
+from hangups.auth import GoogleAuthError
+
+class XMPPThread(ComponentXMPP):
+ def __init__(self, name, host, secret='secret', port=5347):
+ ComponentXMPP.__init__(self, name, secret, host, port,
+ ##
+ # Note Here: this seems to be required for
+ # jabberd2. If the message event isn't
+ # firing change this to False
+ ##
+ use_jc_ns=True)
+ self.name = name
+
+ #self.roster.set_backend(db=rosterdb.RosterDB('database.sqlite', 'roster'))
+
+ self.registerPlugin('xep_0030') # service discovery
+ self.registerPlugin('xep_0004') # Data Forms
+ self.registerPlugin('xep_0054') # vcard
+ self.registerPlugin('xep_0060') # PubSub
+ self.registerPlugin('xep_0077') # registration
+ self.registerPlugin('xep_0085') # chat state notification
+ self.registerPlugin('xep_0172') # user nick
+ self.registerPlugin('xep_0199') # XMPP Ping
+ #self.registerPlugin('xep_0280') # carbons
+
+ self['xep_0030'].add_identity(category='gateway', itype='hangouts', name='Google Hangouts Transport')
+
+ # god knows why the xep-0077 plugin doesn't do component registration
+ self['xep_0030'].add_feature('jabber:iq:register')
+ self.register_handler(Callback('Handle Registration',
+ MatchXPath('{%s}iq/{jabber:iq:register}query' % self.default_ns),
+ self._disco_register_handler))
+
+ # all handlers should be threaded
+ self.add_event_handler('message', self._message, threaded=True)
+ self.add_event_handler('chatstate', self._chatstate, threaded=True)
+ self.add_event_handler('presence_available', self._presence_available,
+ threaded=True)
+ self.add_event_handler('presence_unavailable',
+ self._presence_unavailable,
+ threaded=True)
+ self.add_event_handler('presence_probe', self._presence_probe,
+ threaded=True)
+ # session handler is unthreaded so we set up all the useful stuff first
+ self.add_event_handler("session_start", self._session_start, threaded=False)
+ self.add_event_handler("session_end", self._session_end, threaded=False)
+
+ self.hangups_threads = {}
+
+ def _session_start(self, event):
+ # Anyone can subscribe to the transport bot identity
+ self.roster[self.name].auto_authorize = True
+ self.roster[self.name].auto_subscribe = True
+
+ # However you see it offline until you log in successfully
+ # to the google service
+ self.roster[self.name].send_presence(ptype='unavailable')
+
+ for token in glob.glob('*-refresh_token.cache'):
+ name = token.replace('-refresh_token.cache', '')
+ logging.info('starting Hangups thread for %s', name)
+ try:
+ thread = hangupsthread.HangupsThread(self, name)
+ self.hangups_threads[name] = thread
+ thread.start()
+ self.add_subscription(self.name, name)
+ self.send_presence(pto=name, ptype='available')
+ except GoogleAuthError as e:
+ logging.error("FAILED TO RUN THREAD FOR %s", name, e)
+
+ def _session_end(self, event):
+ for name in self.hangups_threads:
+ logging.info('Stopping Hangups thread for %s', name)
+ self.hangups_threads[name].stop()
+
+
+ def _disco_register_handler(self, iq):
+ if iq['type'] == 'set':
+ ifrom = iq['from'].bare
+ reg = iq['register']
+ iq.reply()
+ try:
+ thread = hangupsthread.HangupsThread(self, ifrom,
+ gmail=reg['username'],
+ password=reg['password'])
+ self.hangups_threads[ifrom] = thread
+ thread.start()
+
+ to=iq['to']
+ # successfully registered
+ iq.send()
+ # now force the client to add the bot as a buddy
+ self.send_presence(pto=to, ptype='subscribe')
+
+ except GoogleAuthError as e:
+ iq.error()
+ iq['error']['condition'] = 'not-allowed'
+ iq['error']['text'] = 'Login to Google Failed: {}'.format(e)
+ iq.send()
+
+
+ elif iq['type'] == 'get':
+ iq.reply(clear=True)
+ iq.enable('register')
+ register = iq['register']
+ register['instructions'] = 'Please Enter your gmail login'
+ register.add_field('password')
+ register.add_field('username')
+ iq.send()
+
+ def _message(self, msg):
+ mto = msg.get_to().bare
+ mfrom = msg.get_from().bare
+ if mto == self.name:
+ self.bot_message(msg)
+ elif mto in self.roster:
+ self.hangups_threads[mfrom].send_message(msg)
+ else:
+ msg.reply('got it').send()
+
+ def bot_message(self, msg):
+ """handle messages directly to the service
+ A chatbot is far easier than a service implementation because
+ xmpp clients are very uneven in their service implementations
+ once you have the service subscribed, any client can send
+ messages to it
+
+ current commands are:
+
+ online - set the bot online
+ offline - set the bot offline"""
+
+ cmd = msg['body'].split(' ')
+
+ # strip resource so actions go to all clients
+ mfrom = msg.get_from().bare
+
+ cmd[0] = cmd[0].lower()
+
+ if cmd[0] == 'offline':
+ self.send_presence(ptype='unavailable', pto=mfrom)
+ elif cmd[0] == 'online':
+ self.send_presence(pto=mfrom)
+ elif cmd[0] == 'query':
+ if not mfrom in self.hangups_threads:
+ msg.reply("no jid found").send()
+ return
+ body=[]
+ for user in self.hangups_threads[mfrom].user_list.get_all():
+ if user.is_self:
+ continue
+ body.append(user.full_name + '(' + user.id_.chat_id +')')
+ jid=sleekxmpp.JID(local=user.id_.gaia_id, domain=self.name)
+ self.roster.add(jid)
+ self.roster[jid][mfrom]['whitelisted'] = True
+ self.roster[jid][jid]['name'] = user.full_name
+ self.roster[jid].send_presence(ptype='subscribe',pto=mfrom,pnick=user.full_name)
+ body = "\n".join(body)
+ msg.reply(body).send()
+ else:
+ msg.reply('Command ' + msg['body'] + ' not understood').send()
+
+ def add_subscription(self, jid, to):
+ self.roster[jid][to]['to'] = True
+ self.roster[jid][to]['from'] = True
+ self.roster[jid][to]['subscription'] = 'both'
+
+ def incoming_typing(self, to, user, state):
+ jid = sleekxmpp.JID(local=user.id_.gaia_id, domain=self.name)
+ msg = self.Message(sfrom=jid, sto=to, stype='chat')
+ msg['chat_state'] = state
+ msg.send()
+
+ def incoming_user(self, to, user):
+ jid=sleekxmpp.JID(local=user.id_.gaia_id, domain=self.name)
+ self.roster[jid][to]['whitelisted'] = True
+ self.add_subscription(jid, to)
+ vcard = self['xep_0054'].make_vcard()
+ vcard['FN'] = user.full_name
+ vcard['NICKNAME'] = user.full_name
+ if user.photo_url:
+ vcard['PHOTO']['TYPE'] = 'image/jpeg'
+ vcard['PHOTO']['BINVAL'] = urllib.request.urlopen('http:' + user.photo_url).read()
+
+ self['xep_0054'].publish_vcard(jid=jid, vcard=vcard)
+
+ def incoming_presence(self, to, user, reachable, available, mood_message):
+ jid=sleekxmpp.JID(local=user.id_.gaia_id, domain=self.name)
+
+ if reachable:
+ if not available:
+ ptype = 'away'
+ else:
+ ptype = 'available'
+ else:
+ ptype = 'unavailable'
+
+ self.roster[jid].send_presence(ptype=ptype, pto = to,
+ pnick = user.full_name,
+ pstatus = mood_message)
+
+ def incoming_message(self, to, user, text):
+ mfrom = sleekxmpp.JID(local=user.id_.gaia_id, domain=self.name)
+ msg = self.send_message(to, text, mfrom=mfrom)
+
+ def incoming_disconnect(self, to):
+ for jid in self.roster:
+ if jid == self.name:
+ continue
+ self.roster[jid][to].send_presence(ptype = 'unavailable')
+
+ def _chatstate(self, msg):
+ mfrom = msg.get_from().bare
+ mto = msg.get_to().bare
+ if mto != self.name:
+ self.hangups_threads[mfrom].send_typing_notification(msg)
+
+ def _presence_available(self, msg):
+ pass
+
+ def _presence_probe(self, msg):
+ self._presence_available(msg)
+
+ def _presence_unavailable(self, msg):
+ pass