#!/usr/bin/python3
"""Command-line client for the Pushover Open Client API.

On first run the user is asked for credentials, the device is registered
and the resulting secret / device id are stored in ``cfg_file``.  After
that the client downloads pending messages, shows them on stdout and/or
through ``kdialog``, and then listens on the Pushover websocket for
notifications about new messages.
"""
import datetime
import getpass
import http.client
import json
import os.path
import ssl
import subprocess
import sys
import time
import urllib.parse

import ws4py.client
import yaml

# Default device name offered during registration.
dev_name = 'push-client'
# Path of the YAML file holding the secret, device id and device name.
cfg_file = 'push.cfg'
# Print received messages on stdout.
show_stdout = True
# Show received messages as a kdialog passive popup.
show_kdialog = True
# Print diagnostics from debug() on stderr.  This name was read by debug()
# but never defined, so every debug() call raised NameError.
enable_debug = False


class HTTPError(Exception):
    """The API answered with an HTTP status other than 200."""


class LoginError(Exception):
    """The login request was answered with a non-success status."""


class MessageAckError(Exception):
    """Acknowledging a message was answered with a non-success status."""


class MessageDeleteError(Exception):
    """Deleting messages was answered with a non-success status."""


class MessageError(Exception):
    """Fetching messages was answered with a non-success status."""


class RegisterError(Exception):
    """Registering the device was answered with a non-success status."""


class WebSocketError(Exception):
    """The websocket server sent the error command ('E')."""


class WebSocketRestart(Exception):
    """The websocket connection should be re-established."""


def debug(msg):
    """Print a timestamped debug line to stderr when ``enable_debug`` is set."""
    if enable_debug:
        ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('%s - DEBUG: %s' % (ts, msg), file=sys.stderr)


def _call_api(conn, method, path, error_cls, params=None, headers=None):
    """Send one API request and return the decoded JSON response.

    Raises HTTPError(status) when the HTTP status is not 200 and
    ``error_cls(data)`` when the JSON ``status`` field is not 1.
    """
    if params is None:
        conn.request(method, path)
    else:
        conn.request(method, path, params, headers or {})
    resp = conn.getresponse()
    if resp.status != http.client.OK:
        # Drain the body so the connection stays usable for a caller that
        # catches the exception.
        resp.read()
        raise HTTPError(resp.status)
    data = json.loads(resp.read().decode('utf-8'))
    if data.get('status', 0) != 1:
        raise error_cls(data)
    return data


class WebSocketClient(ws4py.client.WebSocketBaseClient):
    """Websocket client that reacts to the server's one-character commands.

    The creator must assign a ``cfg`` attribute (an object with ``deviceid``
    and ``secret``) before connecting.
    """

    def handshake_ok(self):
        """Log in with the device id and secret once the handshake is done."""
        self.send('login:%s:%s' % (self.cfg.deviceid, self.cfg.secret))

    def received_message(self, m):
        """Dispatch on the command received from the server.

        Raises WebSocketRestart on 'R' and WebSocketError on 'E'.
        """
        msg = m.data.decode('utf-8')
        if msg == '!':
            # We got a message
            msg_handler = MessageHandler(self.cfg)
            msg_handler.handle_messages()
        elif msg == 'R':
            # Restart
            debug("Received restart command.")
            raise WebSocketRestart()
        elif msg == 'E':
            # Error - Requires manual reconnect
            debug("Received error command.")
            raise WebSocketError()
        elif msg == '#':
            # Keep-alive packet
            pass
        else:
            # Ignore unknown commands
            debug("Received unknown command \"%s\"." % msg)

    def closed(self, code, reason=None):
        """Request a reconnect when the socket closed with code 1001 or 1011."""
        if code in (1001, 1011):
            # This might be wrong
            raise WebSocketRestart()


class Message:
    """Attribute view of one message dictionary with defaults for missing keys."""

    def __init__(self, msg):
        self.acked = msg.get('acked', 0)
        self.aid = msg.get('aid', None)
        self.app = msg.get('app', 'Unknown')
        self.date = msg.get('date', None)
        self.html = msg.get('html', 0)
        self.icon = msg.get('icon', None)
        self.id = msg.get('id', 0)
        self.message = msg.get('message', '')
        self.priority = msg.get('priority', 0)
        self.receipt = msg.get('receipt', None)
        self.sound = msg.get('sound', None)
        self.title = msg.get('title', 'Unnamed')
        self.umid = msg.get('umid', 0)
        self.url = msg.get('url', None)
        self.url_title = msg.get('url_title', None)


class MessageHandler:
    """Fetches, displays, acknowledges and deletes pending messages."""

    headers = {'Content-Type': 'application/x-www-form-urlencoded'}

    def __init__(self, cfg):
        self.conn = http.client.HTTPSConnection('api.pushover.net', 443)
        self.cfg = cfg

    def __del__(self):
        # __init__ may have failed before ``conn`` was assigned.
        conn = getattr(self, 'conn', None)
        if conn is not None:
            conn.close()

    def fetch_messages(self):
        """Return the pending messages as a list of Message objects.

        Raises HTTPError or MessageError when the request fails.
        """
        # URL-encode the values instead of interpolating them verbatim.
        query = urllib.parse.urlencode({'secret': self.cfg.secret,
                                        'device_id': self.cfg.deviceid})
        data = _call_api(self.conn, 'GET', '/1/messages.json?%s' % query,
                         MessageError)
        return [Message(msg) for msg in data['messages']]

    def delete_messages(self, last_msg):
        """Tell the server that everything up to ``last_msg.id`` was received.

        Raises HTTPError or MessageDeleteError when the request fails.
        """
        params = urllib.parse.urlencode({'secret': self.cfg.secret,
                                         'message': last_msg.id})
        path = '/1/devices/%s/update_highest_message.json' % self.cfg.deviceid
        _call_api(self.conn, 'POST', path, MessageDeleteError,
                  params, self.headers)

    def handle_messages(self):
        """Fetch all pending messages, acknowledge/show them, then delete them."""
        last_msg = None
        for message in self.fetch_messages():
            if last_msg is None or message.id > last_msg.id:
                last_msg = message
            # Only messages with priority >= 2 that are not yet acknowledged
            # are acked; without a receipt there is nothing to acknowledge.
            if (message.priority >= 2 and message.acked != 1
                    and message.receipt is not None):
                self.ack_message(message)
            self.show_message(message)
        if last_msg is not None:
            self.delete_messages(last_msg)

    def show_message(self, message):
        """Show ``message`` on every output enabled in the module settings."""
        if show_stdout:
            self.show_message_stdout(message)
        if show_kdialog:
            self.show_message_dialog(message)

    @staticmethod
    def _url_suffix(message, separator):
        """Return the text appended for the message URL ('' without a URL)."""
        if message.url is None:
            return ''
        # The previous format strings had fewer placeholders than arguments
        # and raised TypeError for every message that carried a URL.
        if message.url_title is not None:
            return '%s[%s] %s' % (separator, message.url_title, message.url)
        return '%s%s' % (separator, message.url)

    def show_message_stdout(self, message):
        """Print ``message`` with a bold header line on stdout."""
        if message.date is None:
            # fromtimestamp(None) raises TypeError.
            ts = 'unknown date'
        else:
            ts = datetime.datetime.fromtimestamp(
                message.date).strftime('%Y-%m-%d %H:%M:%S')
        msg = '\033[1m[%s - %s] %s\033[0m\n%s' % (ts, message.app,
                                                 message.title,
                                                 message.message)
        msg += self._url_suffix(message, '\n')
        print(msg)

    def show_message_dialog(self, message):
        """Show ``message`` as a kdialog passive popup for 10 seconds."""
        msg = message.message + self._url_suffix(message, '\n\n')
        title = '[%s] %s' % (message.app, message.title)
        # Message text comes from the network: pass it as separate arguments
        # rather than through a shell, so quotes, backticks or $(...) in a
        # message cannot run commands.
        try:
            status = subprocess.call(['kdialog', '--title', title,
                                      '--passivepopup', msg, '10'])
        except OSError:
            # kdialog is missing or not executable.
            status = -1
        if status != 0:
            print('Failed to show dialog.', file=sys.stderr)

    def ack_message(self, msg):
        """Acknowledge the receipt of ``msg``.

        Raises HTTPError or MessageAckError when the request fails.
        """
        params = urllib.parse.urlencode({'secret': self.cfg.secret})
        path = '/1/receipts/%s/acknowledge.json' % msg.receipt
        _call_api(self.conn, 'POST', path, MessageAckError,
                  params, self.headers)


class Cfg:
    """Client configuration (secret, device id, device name) stored as YAML."""

    def __init__(self, file=None):
        self.file = file
        self.secret = None
        self.deviceid = None
        self.devname = dev_name

    def load(self, file=None):
        """Load the configuration, creating an empty file if none exists.

        ``file`` replaces the stored path when given.
        """
        if file is not None:
            self.file = file
        try:
            with open(self.file, 'r') as fh:
                # safe_load: the file holds plain data only, and yaml.load()
                # without a Loader can construct arbitrary objects (and is
                # rejected by current PyYAML releases).
                cfg = yaml.safe_load(fh)
        except FileNotFoundError:
            with open(self.file, 'x'):
                pass
        else:
            # An empty file yields None; anything but a mapping is ignored.
            if isinstance(cfg, dict):
                self.secret = cfg.get('secret')
                self.deviceid = cfg.get('deviceid')
                self.devname = cfg.get('devname', dev_name)

    def save(self):
        """Write the current configuration to ``self.file``."""
        with open(self.file, 'w') as fh:
            yaml.dump({'secret': self.secret,
                       'deviceid': self.deviceid,
                       'devname': self.devname}, fh)


class PushClient:
    """Registers the device if necessary and runs the receive loop."""

    headers = {'Content-Type': 'application/x-www-form-urlencoded'}

    def __init__(self, cfg):
        self.cfg = cfg
        self.conn = None

    def get_user_pass(self):
        """Prompt for credentials; return ``(username, password, devname)``."""
        print('Visit https://pushover.net/signup to create an account.')
        username = input('E-Mail: ')
        password = getpass.getpass()
        devname = input('Device name [%s]: ' % self.cfg.devname)
        return (username, password, devname)

    def setup_connection(self):
        """Open the HTTPS connection used for login and registration."""
        self.conn = http.client.HTTPSConnection('api.pushover.net', 443)

    def close_connection(self):
        """Close and forget the HTTPS connection."""
        self.conn.close()
        self.conn = None

    def setup(self):
        """Log in and/or register the device as needed, then save the config."""
        self.setup_connection()
        try:
            if self.cfg.secret is None:
                # A new secret invalidates any previously stored device id.
                self.cfg.deviceid = None
                (username, password, devname) = self.get_user_pass()
                if devname.strip():
                    self.cfg.devname = devname
                self.login(username, password)
            if self.cfg.deviceid is None:
                self.register_device()
            self.cfg.save()
        finally:
            # Also release the connection when login/registration fails.
            if self.conn is not None:
                self.close_connection()

    def login(self, username, password):
        """Log in and store the returned secret in the configuration.

        Raises HTTPError or LoginError when the request fails.
        """
        params = urllib.parse.urlencode({'email': username,
                                         'password': password})
        data = _call_api(self.conn, 'POST', '/1/users/login.json',
                         LoginError, params, self.headers)
        self.cfg.secret = data['secret']

    def register_device(self):
        """Register this device and store the returned id in the configuration.

        Raises HTTPError or RegisterError when the request fails.
        """
        params = urllib.parse.urlencode({'secret': self.cfg.secret,
                                         'name': self.cfg.devname,
                                         'os': 'O'})
        data = _call_api(self.conn, 'POST', '/1/devices.json',
                         RegisterError, params, self.headers)
        self.cfg.deviceid = data['id']

    def run(self):
        """Process pending messages, then listen on the websocket until
        interrupted, reconnecting after each WebSocketRestart."""
        if self.cfg.secret is None or self.cfg.deviceid is None:
            self.setup()
        msg_handler = MessageHandler(self.cfg)
        msg_handler.handle_messages()
        while True:
            # NOTE(review): certificate verification is disabled here although
            # the login secret is sent over this connection.  Left unchanged
            # because enabling it needs CA settings in ssl_options - confirm
            # against the ws4py version in use and switch to CERT_REQUIRED.
            ws = WebSocketClient('wss://client.pushover.net/push',
                                 ssl_options={'cert_reqs': ssl.CERT_NONE})
            ws.cfg = self.cfg
            ws.connect()
            try:
                ws.run()
            except WebSocketRestart:
                pass
            except KeyboardInterrupt:
                # Handle this gracefully
                break
            finally:
                ws.close()
            # Five second countdown before the next connection attempt.
            for i in reversed(range(1, 6)):
                print(' %s' % i, flush=True, end='')
                time.sleep(1)
            print('\n', end='')


def main():
    """Load the configuration from ``cfg_file`` and run the client."""
    cfg = Cfg()
    cfg.load(cfg_file)
    client = PushClient(cfg)
    client.run()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        pass