summaryrefslogtreecommitdiffstats
path: root/hangupsthread.py
blob: 4cc4065f9b8ab7147fe7bec119f81ad78828468d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import threading
import asyncio
import hangups
import logging
from hangups.auth import RefreshTokenCache
from hangups.user import UserID

class CredentialsPrompt(object):
    def __init__(self, email, password):
        self.email = email
        self.password = password

    def get_email(self):
        return self.email

    def get_password(self):
        return self.password

class HangupsThread(threading.Thread):
    def __init__(self, xmpp, remote, gmail='', password=''):

        self.xmpp = xmpp
        self.remote = remote
        self.presence_timer = None # filled in on first incoming presence
        self.presence_timer_lock = threading.Lock()

        super().__init__(name='HangupsThread-{}'.format(remote))

        self.cookie = hangups.auth.get_auth(CredentialsPrompt(gmail, password), RefreshTokenCache('{}-refresh_token.cache'.format(remote)))

    def run(self):
        policy = asyncio.get_event_loop_policy()
        self.loop = policy.new_event_loop()
        policy.set_event_loop(self.loop)

        self.client = hangups.Client(self.cookie)
        self.client.on_connect.add_observer(self._on_connect)
        self.client.on_disconnect.add_observer(self._on_disconnect)
        self.client.on_reconnect.add_observer(self._on_reconnect)

        self.client.on_connect.fire()
        self.loop.run_until_complete(self.client.connect())
        logging.info('Hangups thread stopped for {}.'.format(self.remote))
        if self.presence_timer is not None:
            self.presence_timer.cancel()

    @asyncio.coroutine
    def _stop(self):
        yield from self.client.disconnect()
        self.loop.stop()
        self.loop = None

    def stop(self):
        if self.loop is not None:
            self.loop.call_soon_threadsafe(asyncio.async, self._stop())

    def _presence_timer_run(self):
        # if we haven't heard from google for a while, re-query all
        # the presence states just to remind it that we exist
        # without this, presence seems to get forgotten eventually
        # by the server
        logging.info('Presence Timer fired, requerying presence for {}'.format(self.remote))
        self.loop.call_soon_threadsafe(asyncio.async, self._on_connect_common())

    def presence_timer_reset(self):
        with self.presence_timer_lock:
            if self.presence_timer is not None:
                self.presence_timer.cancel()
            self.presence_timer = threading.Timer(3600, self._presence_timer_run)
            self.presence_timer.start()

    def incoming_presence(self, user, pres):
        self.presence_timer_reset()
        mood_message  = ''
        for segment in pres.mood_message.mood_content.segment:
            mood_message += segment.text
        self.xmpp.incoming_presence(self.remote, user, pres.reachable,
                                    pres.available, mood_message)

    @asyncio.coroutine
    def _on_connect_common(self):
        for user in self.user_list.get_all():
            if user.is_self:
                continue
            self.xmpp.incoming_user(self.remote, user)
            pres = yield from self.query_presence(user)
            self.incoming_presence(user, pres)

    @asyncio.coroutine
    def _on_connect(self):
        logging.info('Connected to google hangouts service for %s', self.remote)
        self.user_list, self.conv_list = (
            yield from hangups.build_user_conversation_list(self.client)
        )
        self.client.on_state_update.add_observer(self._on_state_update)
        self.conv_list.on_event.add_observer(self._on_event)
        self.conv_list.on_typing.add_observer(self._on_typing)
        yield from self._on_connect_common()

    @asyncio.coroutine
    def _on_disconnect(self):
        logging.info('Disconnect from google hangouts for %s', self.remote)
        self.xmpp.incoming_disconnect(self.remote)

    @asyncio.coroutine
    def _on_reconnect(self):
        logging.info('Reconnect to google hangouts for %s', self.remote)
        self.user_list, self.conv_list = (
            yield from hangups.build_user_conversation_list(self.client)
        )
        yield from self._on_connect_common()

    @asyncio.coroutine
    def _on_state_update(self, state_update):
        """Receive a StateUpdate"""
        notification_type = state_update.WhichOneof('state_update')
        if notification_type == 'presence_notification':
            yield from self._handle_presence_notification(state_update.presence_notification)
        elif notification_type == 'focus_notification':
            yield from self._handle_focus_notification(state_update.focus_notification)

    @asyncio.coroutine
    def _handle_presence_notification(self, presence_notification):
        for presence in presence_notification.presence:
            if not presence.HasField('presence'):
                continue
            user = self.user_list.get_user(UserID(chat_id=presence.user_id.chat_id,
                                                  gaia_id=presence.user_id.gaia_id))
            if not user:
                continue
            # the notification usually only contains the timestamp, so
            # ask for the values we want
            pres = yield from self.query_presence(user)
            self.incoming_presence(user, pres)

    @asyncio.coroutine
    def _handle_focus_notification(self, focus_notification):
        # focus is telling us something about the client, but google
        # will helpfully translate it to presence
        sid = focus_notification.sender_id
        user = self.user_list.get_user(UserID(chat_id=sid.chat_id,
                                              gaia_id=sid.gaia_id))
        pres = yield from self.query_presence(user)
        self.incoming_presence(user, pres)

    @asyncio.coroutine
    def query_presence(self, user):
        req = hangups.hangouts_pb2.QueryPresenceRequest(
            request_header = self.client.get_request_header(),
            participant_id=[
                hangups.hangouts_pb2.ParticipantId(gaia_id=user.id_.gaia_id),
            ],
            field_mask=[
                hangups.hangouts_pb2.FIELD_MASK_REACHABLE,
                hangups.hangouts_pb2.FIELD_MASK_AVAILABLE,
                hangups.hangouts_pb2.FIELD_MASK_MOOD,
                hangups.hangouts_pb2.FIELD_MASK_LAST_SEEN,
            ],
        )
        res = yield from self.client.query_presence(req)
        if not hasattr(res, 'presence_result'):
            return None
        for presence in res.presence_result:
            return presence.presence

    def get_conversation_for_user(self, gaia_id):
        for conv in self.conv_list.get_all():
            if conv._conversation.type != hangups.hangouts_pb2.CONVERSATION_TYPE_ONE_TO_ONE:
                continue
            for user in conv.users:
                if user.id_.gaia_id == gaia_id:
                    return conv

    @asyncio.coroutine
    def _send_message(self, msg):
        mto =  msg.get_to().local
            
        conv = self.get_conversation_for_user(mto)
        if conv:
            segments = hangups.ChatMessageSegment.from_str(msg['body'])
            yield from conv.send_message(segments)
        else:
            logging.error("FAILED TO FIND CONV FOR %s", mto)

    def send_message(self, msg):
        if self.loop is not None:
            self.loop.call_soon_threadsafe(asyncio.async, self._send_message(msg))

    def _on_event(self, conv_event):
        conv = self.conv_list.get(conv_event.conversation_id)
        user = conv.get_user(conv_event.user_id)
        if isinstance(conv_event, hangups.ChatMessageEvent):
            # Event is a chat message: foward it to XMPP.
            if conv._conversation.type == hangups.hangouts_pb2.CONVERSATION_TYPE_ONE_TO_ONE:
                if not user.is_self:
                    self.xmpp.incoming_message(self.remote, user, conv_event.text)

    def _on_typing(self, typ):
        user = self.user_list.get_user(UserID(chat_id=typ.user_id.chat_id,
                                              gaia_id=typ.user_id.gaia_id))
        if user is None:
            return
        typing_states = {
            hangups.hangouts_pb2.TYPING_TYPE_UNKNOWN: 'gone',
            hangups.hangouts_pb2.TYPING_TYPE_STARTED: 'composing',
            hangups.hangouts_pb2.TYPING_TYPE_PAUSED:  'paused',
            hangups.hangouts_pb2.TYPING_TYPE_STOPPED: 'inactive',
        }
        self.xmpp.incoming_typing(self.remote, user, typing_states[typ.status])
 
    def send_typing_notification(self, msg):
        self.client.set_active()
        mto = msg.get_to().local
        conv = self.get_conversation_for_user(mto)
        if self.loop is not None:
            self.loop.call_soon_threadsafe(asyncio.async, self._send_typing_notification(msg, conv))

    @asyncio.coroutine
    def _send_typing_notification(self, msg, conv):
        typ = hangups.hangouts_pb2.TYPING_TYPE_PAUSED
        if msg['chat_state'] == 'composing':
            typ = hangups.hangouts_pb2.TYPING_TYPE_STARTED
        yield from conv.set_typing(typ)