Package ldaptor :: Package test :: Module test_ldapsyntax
[hide private]
[frames] | no frames]

Source Code for Module ldaptor.test.test_ldapsyntax

   1  """ 
   2  Test cases for ldaptor.protocols.ldap.ldapsyntax module. 
   3  """ 
   4   
   5  from twisted.trial import unittest 
   6  from ldaptor import config, testutil, delta 
   7  from ldaptor.protocols.ldap import ldapsyntax, ldaperrors 
   8  from ldaptor.protocols import pureldap, pureber 
   9  from twisted.internet import defer 
  10  from twisted.python import failure 
  11  from ldaptor.testutil import LDAPClientTestDriver 
  12   
class LDAPSyntaxBasics(unittest.TestCase):
    """Construction of an LDAPEntry and its mapping-style read access."""

    def _makeEntry(self, client):
        """Return the three-attribute entry used by every test in this class.

        @param client: the test driver handed to the entry as its client.
        """
        return ldapsyntax.LDAPEntry(
            client=client,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a'],
                'bValue': ['b'],
            })

    def testCreation(self):
        """Creating an LDAP object should succeed."""
        client = LDAPClientTestDriver()
        o = self._makeEntry(client)
        self.failUnlessEqual(str(o.dn), 'cn=foo,dc=example,dc=com')
        self.failUnlessEqual(o['objectClass'], ['a', 'b'])
        self.failUnlessEqual(o['aValue'], ['a'])
        self.failUnlessEqual(o['bValue'], ['b'])
        # Building an entry locally must not talk to the server.
        client.assertNothingSent()

    def testKeys(self):
        """Iterating over the keys of an LDAP object gives expected results."""
        client = LDAPClientTestDriver()
        o = self._makeEntry(client)
        seen = {}
        for k in o.keys():
            # Every key must be reported exactly once.  ``in`` replaces
            # dict.has_key, which only exists on Python 2.
            self.failIf(k in seen)
            seen[k] = 1
        self.failUnlessEqual(seen, {
            'objectClass': 1,
            'aValue': 1,
            'bValue': 1,
        })

    def testItems(self):
        """Iterating over the items of an LDAP object gives expected results."""
        client = LDAPClientTestDriver()
        o = self._makeEntry(client)
        seen = {}
        for k, vs in o.items():
            # Every key must be reported exactly once.
            self.failIf(k in seen)
            seen[k] = vs
        self.failUnlessEqual(seen, {
            'objectClass': ['a', 'b'],
            'aValue': ['a'],
            'bValue': ['b'],
        })

    def testIn(self):
        """Key in object gives expected results."""
        client = LDAPClientTestDriver()
        o = self._makeEntry(client)

        # Membership on the entry itself tests attribute names.
        self.failUnless('objectClass' in o)
        self.failUnless('aValue' in o)
        self.failUnless('bValue' in o)
        self.failIf('foo' in o)
        self.failIf('' in o)
        self.failIf(None in o)

        # Membership on an attribute tests its values.
        self.failUnless('a' in o['objectClass'])
        self.failUnless('b' in o['objectClass'])
        self.failIf('foo' in o['objectClass'])
        self.failIf('' in o['objectClass'])
        self.failIf(None in o['objectClass'])

        self.failUnless('a' in o['aValue'])
        self.failIf('foo' in o['aValue'])
        self.failIf('' in o['aValue'])
        self.failIf(None in o['aValue'])
95
class LDAPSyntaxAttributes(unittest.TestCase):
    """Local attribute edits on an LDAPEntry: set, delete, add, remove, undo."""

    def testAttributeSetting(self):
        """Assigning to an attribute replaces its values; others are untouched."""
        driver = LDAPClientTestDriver()
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a'],
                'bValue': ['b'],
            })

        entry['aValue'] = ['foo', 'bar']
        self.failUnlessEqual(entry['aValue'], ['foo', 'bar'])

        entry['aValue'] = ['quux']
        self.failUnlessEqual(entry['aValue'], ['quux'])
        self.failUnlessEqual(entry['bValue'], ['b'])

        # Assigning to a name that did not exist creates the attribute.
        entry['cValue'] = ['thud']
        self.failUnlessEqual(entry['aValue'], ['quux'])
        self.failUnlessEqual(entry['bValue'], ['b'])
        self.failUnlessEqual(entry['cValue'], ['thud'])

    def testAttributeDelete(self):
        """Deleted attributes are no longer reported by has_key."""
        driver = LDAPClientTestDriver()
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a'],
                'bValue': ['b'],
            })

        entry['aValue'] = ['quux']
        del entry['aValue']
        del entry['bValue']

        self.failIf(entry.has_key('aValue'))
        self.failIf(entry.has_key('bValue'))

    def testAttributeAdd(self):
        """Adding a value to an attribute appends it to the existing values."""
        driver = LDAPClientTestDriver()
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a'],
                'bValue': ['b'],
            })

        entry['aValue'].add('foo')
        self.failUnlessEqual(entry['aValue'], ['a', 'foo'])

    def testAttributeItemDelete(self):
        """Removing one value from an attribute keeps the remaining values."""
        driver = LDAPClientTestDriver()
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a', 'b', 'c'],
                'bValue': ['b'],
            })

        entry['aValue'].remove('b')
        self.failUnlessEqual(entry['aValue'], ['a', 'c'])

    def testUndo(self):
        """undo() restores the attribute values the entry was created with."""
        driver = LDAPClientTestDriver()
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a'],
                'bValue': ['b'],
                'cValue': ['c'],
            })

        entry['aValue'] = ['foo', 'bar']
        entry['aValue'] = ['quux']
        del entry['cValue']
        entry.undo()

        self.failUnlessEqual(entry['aValue'], ['a'])
        self.failUnlessEqual(entry['bValue'], ['b'])
        self.failUnlessEqual(entry['cValue'], ['c'])

    def testUndoJournaling(self):
        """Changes made after undo() are still sent on commit()."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPModifyResponse(resultCode=0,
                                        matchedDN='',
                                        errorMessage=''),
        ])
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a'],
                'bValue': ['b'],
                'cValue': ['c'],
            })

        # These edits are discarded by the undo() below.
        entry['aValue'] = ['foo', 'bar']
        entry['aValue'] = ['quux']
        del entry['cValue']
        entry.undo()

        entry['aValue'].update(['newValue', 'anotherNewValue'])
        committed = entry.commit()

        def check(dummy):
            self.failUnlessEqual(entry['aValue'],
                                 ['a', 'newValue', 'anotherNewValue'])
            self.failUnlessEqual(entry['bValue'], ['b'])
            self.failUnlessEqual(entry['cValue'], ['c'])
            # Only the post-undo change is expected on the wire.
            expected = delta.ModifyOp('cn=foo,dc=example,dc=com', [
                delta.Add('aValue', ['newValue', 'anotherNewValue']),
            ]).asLDAP()
            driver.assertSent(expected)

        committed.addCallback(check)
        return committed

    def testUndoAfterCommit(self):
        """undo() leaves changes alone once they have been committed."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPModifyResponse(resultCode=0,
                                        matchedDN='',
                                        errorMessage=''),
        ])
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a'],
                'bValue': ['b'],
                'cValue': ['c'],
            })

        entry['aValue'] = ['foo', 'bar']
        entry['bValue'] = ['quux']
        del entry['cValue']
        committed = entry.commit()

        def check(dummy):
            entry.undo()
            self.failUnlessEqual(entry['aValue'], ['foo', 'bar'])
            self.failUnlessEqual(entry['bValue'], ['quux'])
            self.failIf(entry.has_key('cValue'))

        committed.addCallback(check)
        return committed
class LDAPSyntaxAttributesModificationOnWire(unittest.TestCase):
    """Checks the modify request sent by commit() for each kind of local edit."""

    def testAdd(self):
        """update() with several values is sent as a single Add."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPModifyResponse(resultCode=0,
                                        matchedDN='',
                                        errorMessage=''),
        ])
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a'],
            })
        entry['aValue'].update(['newValue', 'anotherNewValue'])

        committed = entry.commit()

        def check(dummy):
            expected = delta.ModifyOp('cn=foo,dc=example,dc=com', [
                delta.Add('aValue', ['newValue', 'anotherNewValue']),
            ]).asLDAP()
            driver.assertSent(expected)

        committed.addCallback(check)
        return committed

    def testAddSeparate(self):
        """Two separate add() calls are sent as two Add modifications."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPModifyResponse(resultCode=0,
                                        matchedDN='',
                                        errorMessage=''),
        ])
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a'],
            })
        entry['aValue'].add('newValue')
        entry['aValue'].add('anotherNewValue')

        committed = entry.commit()

        def check(dummy):
            expected = delta.ModifyOp('cn=foo,dc=example,dc=com', [
                delta.Add('aValue', ['newValue']),
                delta.Add('aValue', ['anotherNewValue']),
            ]).asLDAP()
            driver.assertSent(expected)

        committed.addCallback(check)
        return committed

    def testDeleteAttribute(self):
        """Removing one value is sent as a Delete naming that value."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPModifyResponse(resultCode=0,
                                        matchedDN='',
                                        errorMessage='')
        ])
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a'],
            })
        entry['aValue'].remove('a')

        committed = entry.commit()

        def check(dummy):
            expected = delta.ModifyOp('cn=foo,dc=example,dc=com', [
                delta.Delete('aValue', ['a']),
            ]).asLDAP()
            driver.assertSent(expected)

        committed.addCallback(check)
        return committed

    def testDeleteAllAttribute(self):
        """del and clear() are both sent as a Delete without values."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPModifyResponse(resultCode=0,
                                        matchedDN='',
                                        errorMessage=''),
        ])
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a1', 'a2'],
                'bValue': ['b1', 'b2'],
            })
        del entry['aValue']
        entry['bValue'].clear()

        committed = entry.commit()

        def check(dummy):
            expected = delta.ModifyOp('cn=foo,dc=example,dc=com', [
                delta.Delete('aValue'),
                delta.Delete('bValue'),
            ]).asLDAP()
            driver.assertSent(expected)

        committed.addCallback(check)
        return committed

    def testReplaceAttributes(self):
        """Assigning a new value list is sent as a Replace."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPModifyResponse(resultCode=0,
                                        matchedDN='',
                                        errorMessage='')
        ])
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a'],
            })
        entry['aValue'] = ['foo', 'bar']

        committed = entry.commit()

        def check(dummy):
            expected = delta.ModifyOp('cn=foo,dc=example,dc=com', [
                delta.Replace('aValue', ['foo', 'bar']),
            ]).asLDAP()
            driver.assertSent(expected)

        committed.addCallback(check)
        return committed
class LDAPSyntaxSearch(unittest.TestCase):
    """Checks LDAPEntry.search(): the request sent and the results delivered."""

    def _expectedRequest(self, **extra):
        """Build the search request every test here expects on the wire.

        The base object, scope, limits and '(foo=a)' equality filter are the
        same in all tests; C{extra} carries the per-test keyword arguments
        (the attribute list, when one is expected).
        """
        return pureldap.LDAPSearchRequest(
            baseObject='dc=example,dc=com',
            scope=pureldap.LDAP_SCOPE_wholeSubtree,
            derefAliases=pureldap.LDAP_DEREF_neverDerefAliases,
            sizeLimit=0,
            timeLimit=0,
            typesOnly=0,
            filter=pureldap.LDAPFilter_equalityMatch(
                attributeDesc=pureldap.LDAPAttributeDescription(value='foo'),
                assertionValue=pureldap.LDAPAssertionValue(value='a')),
            **extra)

    def testSearch(self):
        """A search with an explicit attribute list returns matching entries."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPSearchResultEntry(
                objectName='cn=foo,dc=example,dc=com',
                attributes=(('foo', ['a']),
                            ('bar', ['b', 'c']),
                            ),
            ),
            pureldap.LDAPSearchResultEntry(
                objectName='cn=bar,dc=example,dc=com',
                attributes=(('foo', ['a']),
                            ('bar', ['d', 'e']),
                            ),
            ),
            pureldap.LDAPSearchResultDone(
                resultCode=0,
                matchedDN='',
                errorMessage='')
        ])
        base = ldapsyntax.LDAPEntry(
            client=driver,
            dn='dc=example,dc=com',
            attributes={
                'objectClass': ['organizationalUnit'],
            })

        searching = base.search(filterText='(foo=a)',
                                attributes=['foo', 'bar'])

        def check(results):
            driver.assertSent(
                self._expectedRequest(attributes=['foo', 'bar']))
            self.failUnlessEqual(len(results), 2)

            first = ldapsyntax.LDAPEntry(
                client=driver,
                dn='cn=foo,dc=example,dc=com',
                attributes={
                    'foo': ['a'],
                    'bar': ['b', 'c'],
                })
            self.failUnlessEqual(results[0], first)

            second = ldapsyntax.LDAPEntry(
                client=driver,
                dn='cn=bar,dc=example,dc=com',
                attributes={
                    'foo': ['a'],
                    'bar': ['d', 'e'],
                })
            self.failUnlessEqual(results[1], second)

        searching.addCallback(check)
        return searching

    def testSearch_defaultAttributes(self):
        """Search without explicit list of attributes returns all attributes."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPSearchResultEntry(
                objectName='cn=foo,dc=example,dc=com',
                attributes=(('foo', ['a']),
                            ('bar', ['b', 'c']),
                            ),
            ),
            pureldap.LDAPSearchResultEntry(
                objectName='cn=bar,dc=example,dc=com',
                attributes=(('foo', ['a']),
                            ('bar', ['d', 'e']),
                            ),
            ),
            pureldap.LDAPSearchResultDone(
                resultCode=0,
                matchedDN='',
                errorMessage='')
        ])
        base = ldapsyntax.LDAPEntry(
            client=driver,
            dn='dc=example,dc=com',
            attributes={
                'objectClass': ['organizationalUnit'],
            })

        searching = base.search(filterText='(foo=a)')

        def check(results):
            # No attribute list given: the request carries an empty list.
            driver.assertSent(self._expectedRequest(attributes=[]))
            self.failUnlessEqual(len(results), 2)

            first = ldapsyntax.LDAPEntry(
                client=driver,
                dn='cn=foo,dc=example,dc=com',
                attributes={
                    'foo': ['a'],
                    'bar': ['b', 'c'],
                })
            self.failUnlessEqual(results[0], first)
            self.failUnless(results[0].complete)

            second = ldapsyntax.LDAPEntry(
                client=driver,
                dn='cn=bar,dc=example,dc=com',
                attributes={
                    'foo': ['a'],
                    'bar': ['d', 'e'],
                })
            self.failUnlessEqual(results[1], second)
            self.failUnless(results[1].complete)

        searching.addCallback(check)
        return searching

    def testSearch_noAttributes(self):
        """Search with attributes=None returns no attributes."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPSearchResultEntry('cn=foo,dc=example,dc=com',
                                           attributes=()),
            pureldap.LDAPSearchResultEntry('cn=bar,dc=example,dc=com',
                                           attributes=()),
            pureldap.LDAPSearchResultDone(
                resultCode=0,
                matchedDN='',
                errorMessage='')
        ])
        base = ldapsyntax.LDAPEntry(
            client=driver,
            dn='dc=example,dc=com',
            attributes={
                'objectClass': ['organizationalUnit'],
            })

        searching = base.search(filterText='(foo=a)',
                                attributes=None)

        def check(results):
            # attributes=None is expected on the wire as the list ['1.1'].
            driver.assertSent(self._expectedRequest(attributes=['1.1']))
            self.failUnlessEqual(len(results), 2)

            first = ldapsyntax.LDAPEntry(
                client=driver,
                dn='cn=foo,dc=example,dc=com')
            self.failUnlessEqual(results[0], first)
            self.failIf(results[0].complete)

            second = ldapsyntax.LDAPEntry(
                client=driver,
                dn='cn=bar,dc=example,dc=com')
            self.failUnlessEqual(results[1], second)
            self.failIf(results[1].complete)

        searching.addCallback(check)
        return searching

    def testSearch_ImmediateProcessing(self):
        """With callback=..., entries go to the callback and the result is None."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPSearchResultEntry(
                objectName='cn=foo,dc=example,dc=com',
                attributes=(('bar', ['b', 'c']),
                            ),
            ),
            pureldap.LDAPSearchResultEntry(
                objectName='cn=bar,dc=example,dc=com',
                attributes=(('bar', ['b', 'c']),
                            ),
            ),
            pureldap.LDAPSearchResultDone(
                resultCode=0,
                matchedDN='',
                errorMessage='')
        ])
        base = ldapsyntax.LDAPEntry(
            client=driver,
            dn='dc=example,dc=com',
            attributes={
                'objectClass': ['organizationalUnit'],
            })

        seen = []

        def process(found):
            seen.append(found)

        searching = base.search(filterText='(foo=a)',
                                attributes=['bar'],
                                callback=process)

        def check(result):
            self.assertEquals(result, None)

            driver.assertSent(self._expectedRequest(attributes=['bar']))

            expectedEntries = [
                ldapsyntax.LDAPEntry(
                    client=driver,
                    dn='cn=foo,dc=example,dc=com',
                    attributes={
                        'bar': ['b', 'c'],
                    }),
                ldapsyntax.LDAPEntry(
                    client=driver,
                    dn='cn=bar,dc=example,dc=com',
                    attributes={
                        'bar': ['b', 'c'],
                    }),
            ]
            self.failUnlessEqual(seen, expectedEntries)

        searching.addCallback(check)
        return searching

    def testSearch_fail(self):
        """A failing search result reaches the errback as LDAPBusy."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPSearchResultDone(
                resultCode=ldaperrors.LDAPBusy.resultCode,
                matchedDN='',
                errorMessage='Go away')
        ])
        base = ldapsyntax.LDAPEntry(client=driver, dn='dc=example,dc=com')

        searching = base.search(filterText='(foo=a)')

        def checkFailure(fail):
            fail.trap(ldaperrors.LDAPBusy)
            self.assertEquals(fail.value.message, 'Go away')

            # The expected request is built without an attributes argument.
            driver.assertSent(self._expectedRequest())

        # Success is itself a test failure here, hence testutil.mustRaise.
        searching.addCallbacks(testutil.mustRaise, checkFailure)
        return searching
class LDAPSyntaxDNs(unittest.TestCase):
    """Checks relating an entry's DN to the attributes it is created with."""

    def testDNKeyExistenceSuccess(self):
        """Creating an entry with dn cn=foo,... and attribute cn=['foo'] does not raise."""
        driver = LDAPClientTestDriver()
        # The test passes as long as construction completes.
        ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'cn': ['foo'],
            })
652
654 client = LDAPClientTestDriver() 655 self.failUnlessRaises(ldapsyntax.DNNotPresentError, 656 ldapsyntax.LDAPEntry, 657 client=client, 658 dn='cn=foo,dc=example,dc=com', 659 attributes={ 660 'foo': ['bar'], 661 })
662
class LDAPSyntaxLDIF(unittest.TestCase):
    """Checks the text produced by str() on an LDAPEntry."""

    def testLDIFConversion(self):
        """str(entry) yields the dn line, one line per value, and a blank line."""
        driver = LDAPClientTestDriver()
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a', 'b'],
                'aValue': ['a', 'b'],
                'bValue': ['c'],
            })

        expectedLines = (
            "dn: cn=foo,dc=example,dc=com",
            "objectClass: a",
            "objectClass: b",
            "aValue: a",
            "aValue: b",
            "bValue: c",
            "\n",
        )
        self.failUnlessEqual(str(entry), '\n'.join(expectedLines))
682
class LDAPSyntaxDelete(unittest.TestCase):
    """Checks LDAPEntry.delete(): invalidation, wire format, error handling."""

    def testDeleteInvalidates(self):
        """After a successful delete, search() and get() raise ObjectDeletedError."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPDelResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage=''),
        ])
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a'],
            })

        deleting = entry.delete()

        def check(dummy):
            self.failUnlessRaises(
                ldapsyntax.ObjectDeletedError,
                entry.search,
                filterText='(foo=a)')
            self.failUnlessRaises(
                ldapsyntax.ObjectDeletedError,
                entry.get,
                'objectClass')

        deleting.addCallback(check)
        return deleting

    def testDeleteOnWire(self):
        """LDAPEntry.delete should write the right data to the server."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPDelResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage=''),
        ])
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a'],
            })

        deleting = entry.delete()

        def check(dummy):
            expected = pureldap.LDAPDelRequest(
                entry='cn=foo,dc=example,dc=com',
            )
            driver.assertSent(expected)

        deleting.addCallback(check)
        return deleting

    def testErrorHandling(self):
        """LDAPEntry.delete should pass LDAP errors to its deferred."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPDelResponse(
                resultCode=ldaperrors.LDAPBusy.resultCode,
                matchedDN='',
                errorMessage='Go away'),
        ])
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a'],
            })

        deleting = entry.delete()

        def checkFailure(fail):
            fail.trap(ldaperrors.LDAPBusy)
            self.assertEquals(fail.value.message, 'Go away')

            expected = pureldap.LDAPDelRequest(
                entry='cn=foo,dc=example,dc=com',
            )
            driver.assertSent(expected)

        # Success is itself a test failure here, hence testutil.mustRaise.
        deleting.addCallbacks(testutil.mustRaise, checkFailure)
        return deleting

    def testErrorHandling_extended(self):
        """LDAPEntry.delete should pass even non-LDAPDelResponse errors to its deferred."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPExtendedResponse(
                resultCode=ldaperrors.LDAPProtocolError.resultCode,
                responseName='1.3.6.1.4.1.1466.20036',
                errorMessage='Unknown request')
        ])
        entry = ldapsyntax.LDAPEntry(
            client=driver,
            dn='cn=foo,dc=example,dc=com',
            attributes={
                'objectClass': ['a'],
            })

        deleting = entry.delete()

        def checkFailure(fail):
            fail.trap(ldaperrors.LDAPProtocolError)
            self.assertEquals(fail.value.message, 'Unknown request')

            expected = pureldap.LDAPDelRequest(
                entry='cn=foo,dc=example,dc=com',
            )
            driver.assertSent(expected)

        # Success is itself a test failure here, hence testutil.mustRaise.
        deleting.addCallbacks(testutil.mustRaise, checkFailure)
        return deleting
class LDAPSyntaxAddChild(unittest.TestCase):
    """Checks the add request produced by LDAPEntry.addChild()."""

    def testAddChildOnWire(self):
        """LDAPEntry.addChild should write the right data to the server."""
        driver = LDAPClientTestDriver([
            pureldap.LDAPAddResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage=''),
        ])
        parent = ldapsyntax.LDAPEntry(
            client=driver,
            dn='ou=things,dc=example,dc=com',
            attributes={
                'objectClass': ['organizationalUnit'],
                'ou': ['things'],
            })

        adding = parent.addChild(
            rdn='givenName=Firstname+surname=Lastname',
            attributes={
                'objectClass': ['person', 'otherStuff'],
                'givenName': ['Firstname'],
                'surname': ['Lastname'],
            })

        def check(dummy):
            # The expected entry DN is the child's RDN followed by the
            # parent's DN.
            expectedAttributes = [
                (pureldap.LDAPAttributeDescription('objectClass'),
                 pureber.BERSet([
                     pureldap.LDAPAttributeValue('person'),
                     pureldap.LDAPAttributeValue('otherStuff'),
                 ])),
                (pureldap.LDAPAttributeDescription('givenName'),
                 pureber.BERSet([
                     pureldap.LDAPAttributeValue('Firstname'),
                 ])),
                (pureldap.LDAPAttributeDescription('surname'),
                 pureber.BERSet([
                     pureldap.LDAPAttributeValue('Lastname'),
                 ])),
            ]
            expected = pureldap.LDAPAddRequest(
                entry='givenName=Firstname+surname=Lastname,ou=things,dc=example,dc=com',
                attributes=expectedAttributes,
            )
            driver.assertSent(expected)

        adding.addCallback(check)
        return adding
810
class LDAPSyntaxContainingNamingContext(unittest.TestCase):
    """Checks LDAPEntry.namingContext() against a canned namingContexts reply."""

    def testNamingContext(self):
        """LDAPEntry.namingContext returns the naming context that contains this object (via a Deferred)."""
        client = LDAPClientTestDriver([
            pureldap.LDAPSearchResultEntry(
                objectName='',
                attributes=[('namingContexts',
                             ('dc=foo,dc=example',
                              'dc=example,dc=com',
                              'dc=bar,dc=example',
                              ))]),
            pureldap.LDAPSearchResultDone(resultCode=0,
                                          matchedDN='',
                                          errorMessage='')
        ])

        o = ldapsyntax.LDAPEntry(
            client=client,
            dn='cn=foo,ou=bar,dc=example,dc=com',
            attributes={
                'objectClass': ['a'],
            })
        d = o.namingContext()

        def cb(p):
            # Trial assertion methods are used instead of bare ``assert`` so
            # the checks survive ``python -O`` and report the values involved.
            self.failUnless(isinstance(p, ldapsyntax.LDAPEntry))
            self.failUnlessEqual(p.client, o.client)
            # Of the three advertised contexts, only this one is a suffix of
            # the entry's DN.
            self.failUnlessEqual(str(p.dn), 'dc=example,dc=com')

            client.assertSent(pureldap.LDAPSearchRequest(
                baseObject='',
                scope=pureldap.LDAP_SCOPE_baseObject,
                filter=pureldap.LDAPFilter_present('objectClass'),
                attributes=['namingContexts'],
            ))

        d.addCallback(cb)
        return d
847 848
849 -class LDAPSyntaxPasswords(unittest.TestCase):
def setUp(self):
    """Set the 'use-lmhash' option of the 'samba' config section to 'no'."""
    config.loadConfig().set('samba', 'use-lmhash', 'no')
853
855 """LDAPEntry.setPassword_ExtendedOperation(newPasswd=...) changes the password.""" 856 client = LDAPClientTestDriver( 857 [pureldap.LDAPExtendedResponse(resultCode=0, 858 matchedDN='', 859 errorMessage='')], 860 ) 861 862 o=ldapsyntax.LDAPEntry(client=client, 863 dn='cn=foo,dc=example,dc=com') 864 d=o.setPassword_ExtendedOperation(newPasswd='new') 865 def cb(dummy): 866 client.assertSent(pureldap.LDAPPasswordModifyRequest( 867 userIdentity='cn=foo,dc=example,dc=com', 868 newPasswd='new'), 869 )
870 d.addCallback(cb) 871 return d
872
def testPasswordSetting_Samba_sambaAccount(self):
    """setPassword_Samba with style='sambaAccount' replaces ntPassword and lmPassword."""
    driver = LDAPClientTestDriver(
        [pureldap.LDAPModifyResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage='')],
    )
    entry = ldapsyntax.LDAPEntry(client=driver,
                                 dn='cn=foo,dc=example,dc=com')

    changing = entry.setPassword_Samba(newPasswd='new', style='sambaAccount')

    def check(dummy):
        expected = delta.ModifyOp('cn=foo,dc=example,dc=com', [
            delta.Replace('ntPassword',
                          ['89963F5042E5041A59C249282387A622']),
            delta.Replace('lmPassword',
                          ['XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX']),
        ]).asLDAP()
        driver.assertSent(expected)

    changing.addCallback(check)
    return changing
def testPasswordSetting_Samba_sambaSamAccount(self):
    """setPassword_Samba with style='sambaSamAccount' replaces sambaNTPassword and sambaLMPassword."""
    driver = LDAPClientTestDriver(
        [pureldap.LDAPModifyResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage='')],
    )
    entry = ldapsyntax.LDAPEntry(client=driver,
                                 dn='cn=foo,dc=example,dc=com')

    changing = entry.setPassword_Samba(newPasswd='new',
                                       style='sambaSamAccount')

    def check(dummy):
        expected = delta.ModifyOp('cn=foo,dc=example,dc=com', [
            delta.Replace('sambaNTPassword',
                          ['89963F5042E5041A59C249282387A622']),
            delta.Replace('sambaLMPassword',
                          ['XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX']),
        ]).asLDAP()
        driver.assertSent(expected)

    changing.addCallback(check)
    return changing
def testPasswordSetting_Samba_defaultStyle(self):
    """setPassword_Samba without a style replaces sambaNTPassword and sambaLMPassword."""
    driver = LDAPClientTestDriver(
        [pureldap.LDAPModifyResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage='')],
    )
    entry = ldapsyntax.LDAPEntry(client=driver,
                                 dn='cn=foo,dc=example,dc=com')

    changing = entry.setPassword_Samba(newPasswd='new')

    def check(dummy):
        expected = delta.ModifyOp('cn=foo,dc=example,dc=com', [
            delta.Replace('sambaNTPassword',
                          ['89963F5042E5041A59C249282387A622']),
            delta.Replace('sambaLMPassword',
                          ['XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX']),
        ]).asLDAP()
        driver.assertSent(expected)

    changing.addCallback(check)
    return changing
def testPasswordSetting_Samba_badStyle(self):
    """setPassword_Samba with style='foo' fails with RuntimeError and sends nothing."""
    driver = LDAPClientTestDriver(
        [pureldap.LDAPModifyResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage='')],
    )
    entry = ldapsyntax.LDAPEntry(client=driver,
                                 dn='cn=foo,dc=example,dc=com')

    # maybeDeferred turns a synchronous raise into a failed Deferred, so
    # the error is handled in the errback either way.
    changing = defer.maybeDeferred(entry.setPassword_Samba,
                                   newPasswd='new',
                                   style='foo')

    def checkFailure(fail):
        fail.trap(RuntimeError)
        self.assertEquals(fail.getErrorMessage(),
                          "Unknown samba password style 'foo'")
        driver.assertNothingSent()

    # Success is itself a test failure here, hence testutil.mustRaise.
    changing.addCallbacks(testutil.mustRaise, checkFailure)
    return changing
def testPasswordSettingAll_noSamba(self):
    """setPassword on a complete entry with objectClass ['foo'] sends only the password modify request."""
    driver = LDAPClientTestDriver(
        [pureldap.LDAPExtendedResponse(resultCode=0,
                                       matchedDN='',
                                       errorMessage='')],
    )
    entry = ldapsyntax.LDAPEntry(
        client=driver,
        dn='cn=foo,dc=example,dc=com',
        attributes={
            'objectClass': ['foo'],
        },
        complete=1)

    changing = entry.setPassword(newPasswd='new')

    def check(dummy):
        expected = pureldap.LDAPPasswordModifyRequest(
            userIdentity='cn=foo,dc=example,dc=com',
            newPasswd='new')
        driver.assertSent(expected)

    changing.addCallback(check)
    return changing
def testPasswordSettingAll_hasSamba(self):
    """setPassword on a complete sambaAccount entry also replaces ntPassword and lmPassword."""
    driver = LDAPClientTestDriver(
        [pureldap.LDAPExtendedResponse(resultCode=0,
                                       matchedDN='',
                                       errorMessage='')],
        [pureldap.LDAPModifyResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage='')],
    )
    entry = ldapsyntax.LDAPEntry(
        client=driver,
        dn='cn=foo,dc=example,dc=com',
        attributes={
            'objectClass': ['foo', 'sambaAccount'],
        },
        complete=1)

    changing = entry.setPassword(newPasswd='new')

    def check(dummy):
        passwordModify = pureldap.LDAPPasswordModifyRequest(
            userIdentity='cn=foo,dc=example,dc=com',
            newPasswd='new')
        sambaModify = delta.ModifyOp('cn=foo,dc=example,dc=com', [
            delta.Replace('ntPassword',
                          ['89963F5042E5041A59C249282387A622']),
            delta.Replace('lmPassword',
                          ['XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX']),
        ]).asLDAP()
        driver.assertSent(passwordModify, sambaModify)

    changing.addCallback(check)
    return changing
def testPasswordSettingAll_hasSambaSam(self):
    """setPassword on a complete sambaSamAccount entry also replaces sambaNTPassword and sambaLMPassword."""
    driver = LDAPClientTestDriver(
        [pureldap.LDAPExtendedResponse(resultCode=0,
                                       matchedDN='',
                                       errorMessage='')],
        [pureldap.LDAPModifyResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage='')],
    )
    entry = ldapsyntax.LDAPEntry(
        client=driver,
        dn='cn=foo,dc=example,dc=com',
        attributes={
            'objectClass': ['foo', 'sambaSamAccount'],
        },
        complete=1)

    changing = entry.setPassword(newPasswd='new')

    def check(dummy):
        passwordModify = pureldap.LDAPPasswordModifyRequest(
            userIdentity='cn=foo,dc=example,dc=com',
            newPasswd='new')
        sambaModify = delta.ModifyOp('cn=foo,dc=example,dc=com', [
            delta.Replace('sambaNTPassword',
                          ['89963F5042E5041A59C249282387A622']),
            delta.Replace('sambaLMPassword',
                          ['XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX']),
        ]).asLDAP()
        driver.assertSent(passwordModify, sambaModify)

    changing.addCallback(check)
    return changing
def testPasswordSettingAll_hasSamba_differentCase(self):
    """The objectClass 'saMBaAccOuNT' is handled like 'sambaAccount' by setPassword."""
    driver = LDAPClientTestDriver(
        [pureldap.LDAPExtendedResponse(resultCode=0,
                                       matchedDN='',
                                       errorMessage='')],
        [pureldap.LDAPModifyResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage='')],
    )
    entry = ldapsyntax.LDAPEntry(
        client=driver,
        dn='cn=foo,dc=example,dc=com',
        attributes={
            'objectClass': ['foo', 'saMBaAccOuNT'],
        },
        complete=1)

    changing = entry.setPassword(newPasswd='new')

    def check(dummy):
        passwordModify = pureldap.LDAPPasswordModifyRequest(
            userIdentity='cn=foo,dc=example,dc=com',
            newPasswd='new')
        sambaModify = delta.ModifyOp('cn=foo,dc=example,dc=com', [
            delta.Replace('ntPassword',
                          ['89963F5042E5041A59C249282387A622']),
            delta.Replace('lmPassword',
                          ['XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX']),
        ]).asLDAP()
        driver.assertSent(passwordModify, sambaModify)

    changing.addCallback(check)
    return changing
def testPasswordSettingAll_hasSambaSam_differentCase(self):
    """The objectClass 'sAmbASAmaccoUnt' is handled like 'sambaSamAccount' by setPassword."""
    driver = LDAPClientTestDriver(
        [pureldap.LDAPExtendedResponse(resultCode=0,
                                       matchedDN='',
                                       errorMessage='')],
        [pureldap.LDAPModifyResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage='')],
    )
    entry = ldapsyntax.LDAPEntry(
        client=driver,
        dn='cn=foo,dc=example,dc=com',
        attributes={
            'objectClass': ['foo', 'sAmbASAmaccoUnt'],
        },
        complete=1)

    changing = entry.setPassword(newPasswd='new')

    def check(dummy):
        passwordModify = pureldap.LDAPPasswordModifyRequest(
            userIdentity='cn=foo,dc=example,dc=com',
            newPasswd='new')
        sambaModify = delta.ModifyOp('cn=foo,dc=example,dc=com', [
            delta.Replace('sambaNTPassword',
                          ['89963F5042E5041A59C249282387A622']),
            delta.Replace('sambaLMPassword',
                          ['XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX']),
        ]).asLDAP()
        driver.assertSent(passwordModify, sambaModify)

    changing.addCallback(check)
    return changing
def testPasswordSettingAll_maybeSamba_WillFind(self):
    """setPassword on an entry created without attributes looks up objectClass, then updates the Samba passwords."""
    driver = LDAPClientTestDriver(
        [pureldap.LDAPExtendedResponse(resultCode=0,
                                       matchedDN='',
                                       errorMessage='')],
        [
            pureldap.LDAPSearchResultEntry(
                objectName='',
                attributes=[('objectClass',
                             ('foo',
                              'sambaAccount',
                              'bar'))]),
            pureldap.LDAPSearchResultDone(resultCode=0,
                                          matchedDN='',
                                          errorMessage=''),
        ],
        [pureldap.LDAPModifyResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage='')],
    )
    entry = ldapsyntax.LDAPEntry(client=driver,
                                 dn='cn=foo,dc=example,dc=com')

    changing = entry.setPassword(newPasswd='new')

    def check(dummy):
        passwordModify = pureldap.LDAPPasswordModifyRequest(
            userIdentity='cn=foo,dc=example,dc=com',
            newPasswd='new')
        objectClassLookup = pureldap.LDAPSearchRequest(
            baseObject='cn=foo,dc=example,dc=com',
            scope=pureldap.LDAP_SCOPE_baseObject,
            derefAliases=pureldap.LDAP_DEREF_neverDerefAliases,
            sizeLimit=0,
            timeLimit=0,
            typesOnly=0,
            filter=pureldap.LDAPFilterMatchAll,
            attributes=('objectClass',))
        sambaModify = delta.ModifyOp('cn=foo,dc=example,dc=com', [
            delta.Replace('ntPassword',
                          ['89963F5042E5041A59C249282387A622']),
            delta.Replace('lmPassword',
                          ['XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX']),
        ]).asLDAP()
        driver.assertSent(
            passwordModify,
            objectClassLookup,
            sambaModify,
        )

    changing.addCallback(check)
    return changing
def testPasswordSettingAll_maybeSamba_WillNotFind(self):
    """
    setPassword(newPasswd=...) on an entry created without attributes,
    where the scripted search reply lists only 'foo' and 'bar' as
    objectClass values.

    The requests expected to have been sent are the password modify
    request and a base-scope search asking for objectClass; no modify
    request appears in the expected list.
    """
    entry_dn = 'cn=foo,dc=example,dc=com'

    extended_reply = [
        pureldap.LDAPExtendedResponse(resultCode=0,
                                      matchedDN='',
                                      errorMessage=''),
    ]
    search_reply = [
        pureldap.LDAPSearchResultEntry(
            objectName='',
            attributes=[('objectClass', ('foo', 'bar'))]),
        pureldap.LDAPSearchResultDone(resultCode=0,
                                      matchedDN='',
                                      errorMessage=''),
    ]
    # A modify reply is scripted as well, although the expected request
    # list below does not contain a modify.
    modify_reply = [
        pureldap.LDAPModifyResponse(resultCode=0,
                                    matchedDN='',
                                    errorMessage=''),
    ]
    client = LDAPClientTestDriver(extended_reply, search_reply, modify_reply)

    entry = ldapsyntax.LDAPEntry(client=client, dn=entry_dn)
    d = entry.setPassword(newPasswd='new')

    def verifySent(_ignored):
        password_request = pureldap.LDAPPasswordModifyRequest(
            userIdentity=entry_dn,
            newPasswd='new')
        objectclass_lookup = pureldap.LDAPSearchRequest(
            baseObject=entry_dn,
            scope=pureldap.LDAP_SCOPE_baseObject,
            derefAliases=pureldap.LDAP_DEREF_neverDerefAliases,
            sizeLimit=0,
            timeLimit=0,
            typesOnly=0,
            filter=pureldap.LDAPFilterMatchAll,
            attributes=('objectClass',))
        client.assertSent(password_request, objectclass_lookup)

    d.addCallback(verifySent)
    return d

def testPasswordSettingAll_maybeSamba_WillNotFindAnything(self):
    """
    setPassword(newPasswd=...) on an entry created without attributes,
    where the scripted search reply contains no entry at all (only the
    search-done message).

    The returned Deferred is expected to fail with
    PasswordSetAggregateError whose ``errors`` holds exactly one
    ``('Samba', Failure)`` pair, the Failure wrapping
    DNNotPresentError.  Afterwards the requests expected to have been
    sent are the password modify request and the base-scope objectClass
    search.
    """
    entry_dn = 'cn=foo,dc=example,dc=com'

    client = LDAPClientTestDriver(
        [pureldap.LDAPExtendedResponse(resultCode=0,
                                       matchedDN='',
                                       errorMessage='')],
        [
            pureldap.LDAPSearchResultDone(resultCode=0,
                                          matchedDN='',
                                          errorMessage=''),
        ],
        [pureldap.LDAPModifyResponse(resultCode=0,
                                     matchedDN='',
                                     errorMessage='')],
        )

    o = ldapsyntax.LDAPEntry(client=client, dn=entry_dn)
    d = o.setPassword(newPasswd='new')

    def checkError(fail):
        fail.trap(ldapsyntax.PasswordSetAggregateError)
        errors = fail.value.errors
        # TestCase assertions instead of bare ``assert``: they are not
        # stripped under ``python -O`` and report the mismatching values.
        self.assertEquals(len(errors), 1)
        self.assertEquals(len(errors[0]), 2)
        self.assertEquals(errors[0][0], 'Samba')
        self.failUnless(isinstance(errors[0][1], failure.Failure))
        errors[0][1].trap(ldapsyntax.DNNotPresentError)
        # Sentinel checked by the next callback to prove this errback ran.
        return 'This test run should succeed'

    def chainMustErrback(dummy):
        raise RuntimeError('Should never get here')

    d.addCallbacks(callback=chainMustErrback, errback=checkError)
    d.addCallback(self.assertEquals, 'This test run should succeed')

    def cb(dummy):
        client.assertSent(
            pureldap.LDAPPasswordModifyRequest(userIdentity=entry_dn,
                                               newPasswd='new'),
            pureldap.LDAPSearchRequest(
                baseObject=entry_dn,
                scope=pureldap.LDAP_SCOPE_baseObject,
                derefAliases=pureldap.LDAP_DEREF_neverDerefAliases,
                sizeLimit=0,
                timeLimit=0,
                typesOnly=0,
                filter=pureldap.LDAPFilterMatchAll,
                attributes=('objectClass',)),
            )

    d.addCallback(cb)
    return d

def testPasswordSetting_abortsOnFirstError(self):
    """
    LDAPEntry.setPassword() aborts on first error (does not
    parallelize, as it used to).

    The only scripted reply is an extended response carrying the
    LDAPInsufficientAccessRights result code.  The returned Deferred is
    expected to fail with PasswordSetAggregateError whose ``errors``
    holds two ``(name, Failure)`` pairs: 'ExtendedOperation' wrapping
    LDAPInsufficientAccessRights, and 'Samba' wrapping
    PasswordSetAborted.  Only the password modify request is expected
    to have been sent.
    """
    entry_dn = 'cn=foo,dc=example,dc=com'

    client = LDAPClientTestDriver(
        [pureldap.LDAPExtendedResponse(
            resultCode=ldaperrors.LDAPInsufficientAccessRights.resultCode,
            matchedDN='',
            errorMessage='')],
        )

    o = ldapsyntax.LDAPEntry(client=client,
                             dn=entry_dn,
                             attributes={
                                 'objectClass': ['foo', 'sambaAccount'],
                             },
                             complete=1)
    d = o.setPassword(newPasswd='new')

    def eb(fail):
        fail.trap(ldapsyntax.PasswordSetAggregateError)
        errors = fail.value.errors
        # TestCase assertions instead of bare ``assert``: they are not
        # stripped under ``python -O`` and report the mismatching values.
        self.assertEquals(len(errors), 2)

        self.assertEquals(len(errors[0]), 2)
        self.assertEquals(errors[0][0], 'ExtendedOperation')
        self.failUnless(isinstance(errors[0][1], failure.Failure))
        errors[0][1].trap(ldaperrors.LDAPInsufficientAccessRights)

        self.assertEquals(len(errors[1]), 2)
        self.assertEquals(errors[1][0], 'Samba')
        self.failUnless(isinstance(errors[1][1], failure.Failure))
        errors[1][1].trap(ldapsyntax.PasswordSetAborted)

        client.assertSent(pureldap.LDAPPasswordModifyRequest(
            userIdentity=entry_dn,
            newPasswd='new'),
                          )

    # A successful result is itself a test failure here.
    d.addCallbacks(testutil.mustRaise, eb)
    return d


class LDAPSyntaxFetch(unittest.TestCase):
    """Tests for LDAPEntry.fetch() against a scripted LDAP client."""

    def _fooBarSearchReply(self):
        """
        Build a fresh scripted search reply: one result entry carrying
        foo=['a'] and bar=['b', 'c'], followed by a search-done message
        with result code 0.
        """
        found = pureldap.LDAPSearchResultEntry(
            objectName='cn=foo,dc=example,dc=com',
            attributes=(
                ('foo', ['a']),
                ('bar', ['b', 'c']),
            ))
        done = pureldap.LDAPSearchResultDone(resultCode=0,
                                             matchedDN='',
                                             errorMessage='')
        return [found, done]

    def testFetch_WithDirtyJournal(self):
        """Fetching after an uncommitted attribute assignment raises ObjectDirtyError."""
        client = LDAPClientTestDriver()
        entry = ldapsyntax.LDAPEntry(client=client,
                                     dn='cn=foo,dc=example,dc=com')
        # Assigning an attribute without committing is what makes the
        # subsequent fetch() call expected to fail.
        entry['x'] = ['foo']

        self.failUnlessRaises(ldapsyntax.ObjectDirtyError, entry.fetch)

    def testFetch_Empty(self):
        """Fetching into an entry created without attributes fills in the returned ones."""
        client = LDAPClientTestDriver(self._fooBarSearchReply())
        entry = ldapsyntax.LDAPEntry(client=client,
                                     dn='cn=foo,dc=example,dc=com')
        d = entry.fetch()

        def verify(_ignored):
            client.assertSent(pureldap.LDAPSearchRequest(
                baseObject='cn=foo,dc=example,dc=com',
                scope=pureldap.LDAP_SCOPE_baseObject,
                ))

            # Compare attribute names independent of ordering.
            self.assertEquals(sorted(entry.keys()), sorted(['foo', 'bar']))
            self.assertEquals(entry['foo'], ['a'])
            self.assertEquals(entry['bar'], ['b', 'c'])

        d.addCallback(verify)
        return d

    def testFetch_Prefilled(self):
        """A full fetch on a (partially) known entry replaces the old attributes."""
        client = LDAPClientTestDriver(self._fooBarSearchReply())
        entry = ldapsyntax.LDAPEntry(client=client,
                                     dn='cn=foo,dc=example,dc=com',
                                     attributes={
                                         'foo': ['x'],
                                         'quux': ['baz', 'xyzzy']
                                     })
        d = entry.fetch()

        def verify(_ignored):
            client.assertSent(pureldap.LDAPSearchRequest(
                baseObject='cn=foo,dc=example,dc=com',
                scope=pureldap.LDAP_SCOPE_baseObject,
                ))

            # 'quux' was not in the reply, so it is expected to be gone.
            self.assertEquals(sorted(entry.keys()), sorted(['foo', 'bar']))
            self.assertEquals(entry['foo'], ['a'])
            self.assertEquals(entry['bar'], ['b', 'c'])

        d.addCallback(verify)
        return d

    def testFetch_Partial(self):
        """Fetching named attributes leaves attributes of other types untouched."""
        client = LDAPClientTestDriver(self._fooBarSearchReply())
        entry = ldapsyntax.LDAPEntry(client=client,
                                     dn='cn=foo,dc=example,dc=com',
                                     attributes={
                                         'foo': ['x'],
                                         'quux': ['baz', 'xyzzy']
                                     })
        d = entry.fetch('foo', 'bar', 'thud')

        def verify(_ignored):
            client.assertSent(pureldap.LDAPSearchRequest(
                baseObject='cn=foo,dc=example,dc=com',
                scope=pureldap.LDAP_SCOPE_baseObject,
                attributes=('foo', 'bar', 'thud'),
                ))

            # 'quux' was not requested, so its old value is expected to
            # survive; 'thud' was requested but absent from the reply.
            self.assertEquals(sorted(entry.keys()),
                              sorted(['foo', 'bar', 'quux']))
            self.assertEquals(entry['foo'], ['a'])
            self.assertEquals(entry['bar'], ['b', 'c'])
            self.assertEquals(entry['quux'], ['baz', 'xyzzy'])

        d.addCallback(verify)
        return d

    def testCommitAndFetch(self):
        """A fetch chained after a commit works, and both hand back the entry itself."""
        modify_reply = [
            pureldap.LDAPModifyResponse(resultCode=0,
                                        matchedDN='',
                                        errorMessage='')
        ]
        search_reply = [
            pureldap.LDAPSearchResultEntry('cn=foo,dc=example,dc=com',
                                           [('aValue', ['foo', 'bar'])],
                                           ),
            pureldap.LDAPSearchResultDone(resultCode=0),
        ]
        client = LDAPClientTestDriver(modify_reply, search_reply)
        entry = ldapsyntax.LDAPEntry(client=client,
                                     dn='cn=foo,dc=example,dc=com',
                                     attributes={
                                         'objectClass': ['a', 'b'],
                                         'aValue': ['a'],
                                     })

        entry['aValue'] = ['foo', 'bar']
        d = entry.commit()
        d.addCallback(self.assertIdentical, entry)

        def fetchAgain(_ignored):
            return entry.fetch('aValue')

        d.addCallback(fetchAgain)
        d.addCallback(self.assertIdentical, entry)

        def verify(_ignored):
            expected_modify = delta.ModifyOp('cn=foo,dc=example,dc=com', [
                delta.Replace('aValue', ['foo', 'bar']),
            ]).asLDAP()
            expected_search = pureldap.LDAPSearchRequest(
                baseObject='cn=foo,dc=example,dc=com',
                scope=pureldap.LDAP_SCOPE_baseObject,
                attributes=['aValue'],
                )
            client.assertSent(expected_modify, expected_search)

        d.addCallback(verify)
        return d

class LDAPSyntaxRDNHandling(unittest.TestCase):
    """Tests for how LDAPEntry guards the attribute value used in its RDN."""

    def testRemovingRDNFails(self):
        """
        Removing or replacing the RDN value raises CannotRemoveRDNError.

        The entry's DN starts with cn=foo.  Removing another cn value
        and deleting an unrelated attribute are done without expecting
        an error; removing 'foo' from cn, deleting cn, and assigning cn
        a new list are each expected to raise.
        """
        entry = ldapsyntax.LDAPEntry(client=None,
                                     dn='cn=foo,dc=example,dc=com',
                                     attributes={
                                         'objectClass': ['someObjectClass'],
                                         'cn': ['foo', 'bar', 'baz'],
                                         'a': ['aValue'],
                                     })
        rdn_error = ldapsyntax.CannotRemoveRDNError

        # Neither of these touches the RDN value itself.
        entry['cn'].remove('bar')
        del entry['a']

        self.assertRaises(rdn_error, entry['cn'].remove, 'foo')

        def dropRDNAttribute():
            del entry['cn']
        self.assertRaises(rdn_error, dropRDNAttribute)

        def replaceRDNValue():
            entry['cn'] = ['thud']
        self.assertRaises(rdn_error, replaceRDNValue)

        # TODO maybe this should be ok, it preserves the RDN.
        # For now, disallow it.
        def reassignSameRDNValue():
            entry['cn'] = ['foo']
        self.assertRaises(rdn_error, reassignSameRDNValue)

class LDAPSyntaxMove(unittest.TestCase):
    """Tests for LDAPEntry.move()."""

    def test_move(self):
        """
        move() sends a modify-DN request split into the new RDN and the
        new superior, and the entry's dn afterwards equals the new DN.
        """
        old_dn = 'cn=foo,dc=example,dc=com'
        new_dn = 'cn=bar,ou=somewhere,dc=example,dc=com'

        client = LDAPClientTestDriver(
            [pureldap.LDAPModifyDNResponse(resultCode=0,
                                           matchedDN='',
                                           errorMessage=''),
             ])

        entry = ldapsyntax.LDAPEntry(client=client,
                                     dn=old_dn,
                                     attributes={
                                         'objectClass': ['a', 'b'],
                                         'cn': ['foo'],
                                         'aValue': ['a'],
                                     })
        d = entry.move(new_dn)

        def verify(_ignored):
            expected = pureldap.LDAPModifyDNRequest(
                entry=old_dn,
                newrdn='cn=bar',
                deleteoldrdn=1,
                newSuperior='ou=somewhere,dc=example,dc=com',
                )
            client.assertSent(expected)

            self.assertEquals(entry.dn, new_dn)

        d.addCallback(verify)
        return d
1486
class Bind(unittest.TestCase):
    """Tests for LDAPEntry.bind() against a scripted LDAP client."""

    def test_ok(self):
        """
        With a bind response of result code 0, bind() results in the
        entry itself and a bind request with the entry's DN and the
        given password is sent.
        """
        entry_dn = 'cn=foo,dc=example,dc=com'
        client = LDAPClientTestDriver(
            [pureldap.LDAPBindResponse(resultCode=0,
                                       matchedDN=''),
             ])

        entry = ldapsyntax.LDAPEntry(client=client, dn=entry_dn)
        d = defer.maybeDeferred(entry.bind, 's3krit')
        d.addCallback(self.assertIdentical, entry)

        def verify(_ignored):
            expected = pureldap.LDAPBindRequest(dn=entry_dn,
                                                auth='s3krit')
            client.assertSent(expected)

        d.addCallback(verify)
        return d

    def test_fail(self):
        """
        With a bind response carrying the LDAPInvalidCredentials result
        code, bind() fails with LDAPInvalidCredentials.
        """
        rejected = pureldap.LDAPBindResponse(
            resultCode=ldaperrors.LDAPInvalidCredentials.resultCode,
            matchedDN='')
        client = LDAPClientTestDriver([rejected])

        entry = ldapsyntax.LDAPEntry(client=client,
                                     dn='cn=foo,dc=example,dc=com')
        d = defer.maybeDeferred(entry.bind, 's3krit')

        def expectInvalidCredentials(fail):
            fail.trap(ldaperrors.LDAPInvalidCredentials)

        # A successful bind is itself a test failure here.
        d.addCallbacks(testutil.mustRaise, expectInvalidCredentials)
        return d
