Package nbxmpp :: Module smacks
[hide private]
[frames] | no frames]

Source Code for Module nbxmpp.smacks

  1  from protocol import Acks 
  2  from protocol import NS_STREAM_MGMT 
  3  import logging 
  4  log = logging.getLogger('nbxmpp.smacks') 
  5   
class Smacks():
    '''
    Stream Management class. It takes care of requesting and sending acks.
    Also, it keeps track of the unhandled outgoing stanzas.

    The dispatcher has to be able to access this class to increment the
    number of handled stanzas
    '''

    def __init__(self, con):
        '''
        Initialise the Stream Management state for connection object `con`.

        Nothing is sent here; handlers are only registered once set_owner()
        is called.
        '''
        self.con = con  # Connection object
        self.out_h = 0  # Outgoing stanzas handled
        self.in_h = 0  # Incoming stanzas handled
        self.uqueue = []  # Unhandled stanzas queue
        self.session_id = None
        self.resumption = False  # If server supports resume
        # Max number of stanzas in queue before making a request
        self.max_queue = 5
        self._owner = None
        self.resuming = False
        self.enabled = False  # If SM is enabled
        self.location = None
        self.failed_resume = False  # If last resuming attempt failed
        self.supports_sm = False  # If server supports sm

    def set_owner(self, owner):
        '''
        Remember `owner` and register the Stream Management handlers on its
        dispatcher (all of them in the NS_STREAM_MGMT namespace).
        '''
        self._owner = owner
        # Register handlers
        dispatcher = owner.Dispatcher
        dispatcher.RegisterNamespace(NS_STREAM_MGMT)
        dispatcher.RegisterHandler('enabled', self._neg_response,
            xmlns=NS_STREAM_MGMT)
        dispatcher.RegisterHandler('r', self.send_ack,
            xmlns=NS_STREAM_MGMT)
        dispatcher.RegisterHandler('a', self.check_ack,
            xmlns=NS_STREAM_MGMT)
        # 'resumed' carries an 'h' counter too, so it shares check_ack
        dispatcher.RegisterHandler('resumed', self.check_ack,
            xmlns=NS_STREAM_MGMT)
        dispatcher.RegisterHandler('failed', self.error_handling,
            xmlns=NS_STREAM_MGMT)

    def _neg_response(self, disp, stanza):
        '''
        Handle the 'enabled' answer to a negotiation request.

        Stores the session id when the server accepts resumption, negotiates
        again without resumption when the server refuses it, and records the
        'location' attribute if one is given.
        '''
        resume = stanza.getAttr('resume')
        if resume in ('true', 'True', '1'):
            self.resumption = True
            self.session_id = stanza.getAttr('id')
        elif resume in ('false', 'False', '0'):
            self.negociate(False)

        location = stanza.getAttr('location')
        if location:
            self.location = location

        if self.failed_resume:
            # The previous resume attempt failed, so discovery has to be
            # run again for this connection
            self.con._discover_server_at_connection(self.con.connection)
            self.failed_resume = False

    def negociate(self, resume=True):
        '''
        Send an enable request; `resume` is passed to Acks.buildEnable().
        '''
        # Every time we attempt to negociate, we must erase all previous info
        # about any previous session
        self.uqueue = []
        self.in_h = 0
        self.out_h = 0
        self.session_id = None
        self.enabled = True

        stanza = Acks()
        stanza.buildEnable(resume)
        self._owner.Connection.send(stanza, now=True)

    def resume_request(self):
        '''
        Ask the server to resume the session identified by session_id.

        Without a session id nothing is sent: the resuming flag is cleared
        and an error is logged.
        '''
        if not self.session_id:
            self.resuming = False
            log.error('Attempted to resume without a valid session id ')
            return
        resume = Acks()
        resume.buildResume(self.in_h, self.session_id)
        self._owner.Connection.send(resume, False)

    def send_ack(self, disp, stanza):
        '''
        Handler for 'r': answer with the number of incoming stanzas handled.
        '''
        ack = Acks()
        ack.buildAnswer(self.in_h)
        self._owner.Connection.send(ack, False)

    def request_ack(self):
        '''
        Send an ack request to the server.
        '''
        request = Acks()
        request.buildRequest()
        self._owner.Connection.send(request, False)

    def check_ack(self, disp, stanza):
        '''
        Checks if the number of stanzas sent are the same as the
        number of stanzas received by the server. Pops stanzas that were
        handled by the server from the queue.

        Also handles 'resumed': the session is marked as enabled and
        resuming, and every stanza still in the queue is sent again.

        A stanza whose 'h' attribute is missing or not an integer is logged
        and ignored (nothing is popped and no resumption takes place)
        instead of raising inside the dispatcher.
        '''
        raw_h = stanza.getAttr('h')
        try:
            h = int(raw_h)
        except (TypeError, ValueError):
            # TypeError: attribute absent (None); ValueError: not a number
            log.error('Server sent %s without a valid h attribute',
                stanza.getName())
            return

        # Number of stanzas we sent that the server has not handled yet
        diff = self.out_h - h

        if len(self.uqueue) < diff or diff < 0:
            log.error('Server and client number of stanzas handled mismatch ')
        else:
            # Drop the acknowledged stanzas from the front in one step,
            # keeping the same list object
            handled = len(self.uqueue) - diff
            del self.uqueue[:handled]

        if stanza.getName() == 'resumed':
            self.enabled = True
            self.resuming = True
            self.con.set_oldst()
            # Send again whatever the server did not acknowledge
            for unhandled in self.uqueue:
                self._owner.Connection.send(unhandled, False)

    def error_handling(self, disp, stanza):
        '''
        Handler for 'failed': react according to the error condition found
        in the stanza.
        '''
        # If the server doesn't recognize previd, forget about resuming
        # Ask for service discovery, etc..
        if stanza.getTag('item-not-found'):
            self.resuming = False
            self.enabled = False
            # we need to bind a resource
            self._owner.NonBlockingBind.resuming = False
            self._owner._on_auth_bind(None)
            self.failed_resume = True
            return

        # Doesn't support resumption
        if stanza.getTag('feature-not-implemented'):
            self.negociate(False)
            return

        if stanza.getTag('unexpected-request'):
            self.enabled = False
            log.error('Gajim failed to negociate Stream Management')
            return
136