Package pilot :: Package impl :: Module pilotdata_manager

Source Code for Module pilot.impl.pilotdata_manager

  1  """ B{PilotData Module}: Implementation of L{PilotData}, L{PilotDataService} and L{DataUnit} 
  2  """  
  3  import sys 
  4  import os 
  5  import logging 
  6  import uuid 
  7  import random 
  8  import threading 
  9  import time 
 10  import pdb 
 11  import Queue 
 12  from pilot.api.api import PilotError 
 13   
 14  sys.path.append(os.path.join(os.path.dirname(__file__), "../..")) 
 15  from bigjob import logger 
 16  from pilot.api import PilotData, DataUnit, PilotDataService, State 
 17   
 18   
 19  """ Load file management adaptors """ 
 20  from pilot.filemanagement.ssh_adaptor import SSHFileAdaptor  
 21  try: 
 22      from pilot.filemanagement.webhdfs_adaptor import WebHDFSFileAdaptor 
 23  except: 
 24      logger.warn("WebHDFS package not found.")  
 25  try: 
 26      from pilot.filemanagement.globusonline_adaptor import GlobusOnlineFileAdaptor 
 27  except: 
 28      logger.warn("Globus Online package not found.")  
 29       
 30  try: 
 31      from pilot.filemanagement.gs_adaptor import GSFileAdaptor 
 32  except: 
 33      logger.warn("Goggle Storage package not found.")  
 34   
 35   
 36  try: 
 37      from pilot.filemanagement.s3_adaptor import S3FileAdaptor 
 38  except: 
 39      logger.warn("Amazon S3 package not found.")  
 40   
 41  try: 
 42      from pilot.filemanagement.irods_adaptor import iRodsFileAdaptor 
 43  except: 
 44      logger.warn("iRods Storage package not found.")  
 45   
 46   
 47   
 48  #from pilot.coordination.advert import AdvertCoordinationAdaptor as CoordinationAdaptor 
 49  #from pilot.coordination.nocoord import NoCoordinationAdaptor as CoordinationAdaptor 
 50  from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor 
 51  from bliss.saga import Url as SAGAUrl 
 52   
 53   
 54  # generate global application id for this instance 
 55  application_id = "bigdata" 
 56   
 57  """ 
 58      Implementation considerations: 
 59       
 60      PilotDataService can have n PilotData  
 61      PilotData can have n DataUnits (only DU URLs is held to avoid circular dependencies) 
 62      A DataUnit can be part of n PilotData  
 63  """ 
class PilotData(PilotData):
    """ B{PilotData (PD).}

    This is the object that is returned by the PilotDataService when a
    new PilotData is created based on a PilotDataDescription. A PilotData
    represents a finite amount of physical space on a certain resource.
    It can be populated with L{DataUnit}s.

    The PilotData object can be used by the application to keep track
    of a pilot. A PilotData has state, can be queried, can be cancelled.
    """

    PD_ID_PREFIX = "pd-"

    def __init__(self, pilot_data_service=None, pilot_data_description=None, pd_url=None):
        """ Initialize PilotData at given service url::

                ssh://<hostname>
                gsissh://<hostname>
                go://<hostname>
                gs://google.com
                s3://aws.amazon.com

            In the future more SAGA/Bliss URL schemes/adaptors are supported.

            Two modes:
            1.) create a new Pilot Data: pilot_data_service (+ description) given
            2.) reconnect to an existing Pilot Data: pd_url given
        """
        self.id = None
        self.url = pd_url
        self.pilot_data_description = None
        self.pilot_data_service = pilot_data_service
        self.service_url = None
        self.size = None
        # Only DU URL references are stored (no direct DU objects) to avoid
        # circular dependencies between PD and DU.
        self.data_unit_urls = []
        self.security_context = None

        if pd_url == None and pilot_data_service != None:    # new pd
            self.id = self.PD_ID_PREFIX + str(uuid.uuid1())
            self.pilot_data_description = pilot_data_description
            self.url = CoordinationAdaptor.add_pd(
                CoordinationAdaptor.get_base_url(application_id) + ":" + pilot_data_service.id,
                self)
        elif pd_url != None:                                 # reconnect to existing pd
            logger.warn("Reconnect to PilotData: %s" % pd_url)
            dictionary = CoordinationAdaptor.get_pd(pd_url)
            if "security_context" in dictionary:
                self.security_context = dictionary["security_context"]
            # NOTE(review): eval() of coordination-service payloads is unsafe if
            # the backend is writable by untrusted parties -- consider
            # ast.literal_eval for these repr'd dict/list payloads.
            pd_dict = eval(dictionary["pilot_data"])
            for attribute in pd_dict:
                self.__setattr__(attribute, pd_dict[attribute])
            # A Pilot Data does not hold a direct reference to a Data Unit
            # (only URL refs are stored)
            self.data_unit_urls = eval(dictionary["data_unit_urls"])

        self.__initialize_pilot_data()
        CoordinationAdaptor.update_pd(self)

    def cancel(self):
        """ Cancel PilotData (currently a no-op; backend storage is kept). """
        #self.__filemanager.delete_pilotdata()
        pass

    def get_url(self):
        """ Get URL of PilotData. Used for reconnecting to PilotData. """
        return self.url

    def url_for_du(self, du):
        """ Get full URL to DataUnit within PilotData. """
        return self.service_url + "/" + str(du.id)

    def submit_data_unit(self, data_unit_description):
        """ Create a DataUnit object and initially import the data specified
            in data_unit_description.
        """
        du = DataUnit(pilot_data=self,
                      data_unit_description=data_unit_description)
        self.data_unit_urls.append(du.get_url())
        du.add_pilot_data(self)
        return du

    def list_data_units(self):
        """ List the URLs of all Data Units of this Pilot Data. """
        return self.data_unit_urls

    def get_state(self):
        """ Return current state of Pilot Data (delegated to the file backend). """
        return self.__filemanager.get_state()

    def get_du(self, du_url):
        """ Return DataUnit if part of this Pilot Data, otherwise None. """
        if du_url in self.data_unit_urls:
            return DataUnit(du_url=du_url)
        return None

    def wait(self):
        """ Wait until all DUs of this PD enter a final state
            (Done, Canceled or Failed).
        """
        while 1:
            finish_counter = 0
            result_map = {}
            # BUGFIX: was 'self.data_units_urls' (AttributeError) -- the
            # attribute is 'data_unit_urls' (also fixed in the two uses below).
            for du_url in self.data_unit_urls:
                du = DataUnit(du_url=du_url)
                du.wait()
                state = du.get_state()
                if state not in result_map:
                    result_map[state] = 1
                else:
                    result_map[state] = result_map[state] + 1
                if self.__has_finished(state) == True:
                    finish_counter = finish_counter + 1
            logger.debug("PD ID: " + str(self.id) + " Total DUs: %s States: %s"
                         % (len(self.data_unit_urls), str(result_map)))
            if finish_counter == len(self.data_unit_urls):
                break
            time.sleep(2)

    def export_du(self, du, target_url):
        """ Export Data Unit to a local directory (created if absent). """
        if target_url.startswith("/") and not os.path.exists(target_url):
            os.mkdir(target_url)
        logger.debug("Export Data-Unit to %s" % target_url)
        self.__filemanager.get_du(du, target_url)

    def put_du(self, du):
        """ Copy Data Unit to Pilot Data and register its URL. """
        logger.debug("Put DU: %s to Pilot-Data: %s" % (du.id, self.service_url))
        self.__filemanager.create_du(du.id)
        self.__filemanager.put_du(du)
        self.data_unit_urls.append(du.get_url())
        CoordinationAdaptor.update_pd(self)

    def remove_du(self, du):
        """ Remove Data Unit from Pilot Data. """
        if du.get_url() in self.data_unit_urls:
            self.__filemanager.remove_du(du)
            self.data_unit_urls.remove(du.get_url())
        CoordinationAdaptor.update_pd(self)

    def copy_du(self, du, pd_new):
        """ Copy DataUnit to another Pilot Data. """
        pd_new.create_du(du)
        self.__filemanager.copy_du(du, pd_new)

        # update meta data at pd_new
        pd_new.data_unit_urls.append(du.get_url())
        CoordinationAdaptor.update_pd(pd_new)

    # END API methods
    ###########################################################################
    # Auxiliary Methods

    def create_du(self, du):
        """ Create a new Data Unit within this Pilot. """
        self.__filemanager.create_du(du.id)

    def __initialize_pilot_data(self):
        """ Select and initialize the file management backend based on the
            URL scheme of pilot_data_description['service_url'].
            No-op when no description is set (e.g. bare PilotData()).
        """
        if self.pilot_data_description != None:
            self.service_url = self.pilot_data_description["service_url"]
            self.size = self.pilot_data_description["size"]

            # initialize file adaptor
            if self.service_url.startswith("ssh:"):
                logger.debug("Use SSH backend")
                self.__filemanager = SSHFileAdaptor(self.service_url,
                                                    self.security_context,
                                                    self.pilot_data_description)
            elif self.service_url.startswith("http:"):
                logger.debug("Use WebHDFS backend")
                self.__filemanager = WebHDFSFileAdaptor(self.service_url)
            elif self.service_url.startswith("go:"):
                logger.debug("Use Globus Online backend")
                self.__filemanager = GlobusOnlineFileAdaptor(self.service_url)
            elif self.service_url.startswith("gs:"):
                logger.debug("Use Google Cloud Storage backend")
                self.__filemanager = GSFileAdaptor(self.service_url, self.security_context)
            elif self.service_url.startswith("irods:"):
                logger.debug("Use iRods Storage backend")
                self.__filemanager = iRodsFileAdaptor(self.service_url, self.security_context)
            elif self.service_url.startswith("s3:") \
                or self.service_url.startswith("walrus:") \
                or self.service_url.startswith("swift:"):
                logger.debug("Use Amazon S3/Eucalyptus Walrus/SWIFT Storage backend")
                self.__filemanager = S3FileAdaptor(self.service_url,
                                                   self.security_context,
                                                   self.pilot_data_description)
            else:
                raise PilotError("No File Plugin found.")

            self.__filemanager.initialize_pilotdata()
            self.__filemanager.get_pilotdata_size()

            # Update security context
            self.security_context = self.__filemanager.get_security_context()

    def __get_pd_id(self, pd_url):
        """ Extract the 'pd-<uuid>' component from a Pilot Data URL. """
        start = pd_url.index(self.PD_ID_PREFIX)
        end = pd_url.index("/", start)
        return pd_url[start:end]

    def to_dict(self):
        """ Internal method that returns a dict with all data contained in
            this Pilot Data.
        """
        pd_dict = {}
        pd_dict["id"] = self.id
        pd_dict["url"] = self.url
        pd_dict["pilot_data_description"] = self.pilot_data_description
        logger.debug("PilotData Dictionary: " + str(pd_dict))
        return pd_dict

    def __repr__(self):
        """ Returns Pilot Data URL. """
        return self.service_url

    def __has_finished(self, state):
        """ True if 'state' is final for a DU. 'running' counts as final
            because a Running DU is persistent in at least one pilot data
            (see DataUnit state model).
        """
        state = state.lower()
        return state == "running" or state == "failed" or state == "canceled"

    @classmethod
    def create_pilot_data_from_dict(cls, pd_dict):
        """ Restore a Pilot Data object from its dictionary representation. """
        pd = PilotData()
        for i in pd_dict.keys():
            pd.__setattr__(i, pd_dict[i])
        pd.__initialize_pilot_data()
        logger.debug("created pd " + str(pd))
        return pd
310 311 312 ############################################################################### 313 COORDINATION_URL = "redis://localhost"
class PilotDataService(PilotDataService):
    """ B{PilotDataService (PDS).}

    Factory for creating Pilot Data.
    """

    PDS_ID_PREFIX = "pds-"

    def __init__(self, coordination_url=COORDINATION_URL, pds_url=None):
        """ Create a PilotDataService.

            Keyword arguments:
            coordination_url -- URL of the coordination backend (Redis)
            pds_url -- reconnect to the PDS at this URL instead of creating one
        """
        self.pilot_data = {}
        CoordinationAdaptor.configure_base_url(coordination_url)
        if pds_url == None:      # new pds
            self.id = self.PDS_ID_PREFIX + str(uuid.uuid1())
            application_url = CoordinationAdaptor.get_base_url(application_id)
            self.url = CoordinationAdaptor.add_pds(application_url, self)
        else:                    # reconnect to existing pds
            self.id = self.__get_pds_id(pds_url)

    def create_pilot(self, pilot_data_description):
        """ Create a PilotData.

            Keyword arguments:
            pilot_data_description -- PilotData Description::

                {
                    'service_url': "ssh://<hostname>/base-url/",
                    'size': "1000"
                }

            Return value:
            A PilotData object
        """
        pd = PilotData(pilot_data_service=self,
                       pilot_data_description=pilot_data_description)
        self.pilot_data[pd.id] = pd

        # store pilot data in central data space
        CoordinationAdaptor.add_pd(self.url, pd)
        return pd

    def get_pilot(self, pd_id):
        """ Return the locally managed PilotData with the given id, or None. """
        if pd_id in self.pilot_data:
            return self.pilot_data[pd_id]
        return None

    def list_pilots(self):
        """ List all PDs of this PDS. """
        return self.pilot_data.values()

    def cancel(self):
        """ Cancel the PilotDataService.
            Releases all Pilot Data created by this service.
        """
        for pd in self.pilot_data.values():
            pd.cancel()

    def wait(self):
        """ Wait until all managed PDs (of this Pilot Data Service) enter a
            final state.
        """
        for pd in self.pilot_data.values():
            pd.wait()

    def get_url(self):
        """ Returns URL of Pilot Data Service. """
        return self.url

    ###########################################################################
    # Non-API methods
    def to_dict(self):
        """ Return a Python dictionary containing the representation of the
            PDS (internal method, not part of the Pilot API).
        """
        pds_dict = self.__dict__
        pds_dict["id"] = self.id
        return pds_dict

    def __del__(self):
        """ Releases all Pilot Data created by this Pilot Data Service. """
        # BUGFIX: never let exceptions escape a destructor -- __del__ may run
        # during interpreter shutdown when module globals are already gone.
        try:
            self.cancel()
        except Exception:
            pass

    def __get_pds_id(self, pds_url):
        """ Extract the 'pds-<uuid>' component from a PDS URL. """
        start = pds_url.index(self.PDS_ID_PREFIX)
        end = pds_url.index("/", start)
        return pds_url[start:end]

    def __restore_pd(self, pds_url):
        """ Reconnect all PDs listed under pds_url.
            TODO: restore loop is not implemented yet (body is a stub).
        """
        pd_list = CoordinationAdaptor.list_pd(pds_url)
        for i in pd_list:
            pass
class DataUnit(DataUnit):
    """ B{DataUnit (DU).}

    This is the object that is returned by the ComputeDataService when a
    new DataUnit is created based on a DataUnitDescription.

    The DataUnit object can be used by the application to keep track
    of a DataUnit.

    A DataUnit has state, can be queried and can be cancelled.

    State model:
        - New: DU object created
        - Pending: DU object is currently updated
        - Running: at least 1 replica of the DU is persistent in a pilot data
    """

    ## TODO
    # DUs are stored as top-level objects in Redis:
    #     redis://localhost/<application-id>/du-<id>
    #
    # In the future a DU can possibly be bound to multiple PDs; thus, it
    # should be a top-level entity and the lower levels of the hierarchy
    # will only store references to the DU.

    DU_ID_PREFIX = "du-"

    def __init__(self, pilot_data=None, data_unit_description=None, du_url=None):
        """
        1.) create a new Data Unit: data_unit_description required
        2.) reconnect to an existing Data Unit: du_url required
        """
        if du_url == None:       # new du
            self.id = self.DU_ID_PREFIX + str(uuid.uuid1())
            self.data_unit_description = data_unit_description
            self.pilot_data = []
            self.state = State.New
            self.data_unit_items = []
            if "file_urls" in self.data_unit_description:
                self.data_unit_items = DataUnitItem.create_data_unit_list(
                    self, self.data_unit_description["file_urls"])

            self.url = None

            # register a data unit as top-level entry in Redis
            application_url = CoordinationAdaptor.get_base_url(application_id)
            self.url = CoordinationAdaptor.add_du(application_url, self)
            CoordinationAdaptor.update_du(self)
        else:                    # reconnect to existing du
            self.id = DataUnit._get_du_id(du_url)
            self.url = du_url
            logger.debug("Restore du: %s" % self.id)
            self.__restore_state()

        self.transfer_threads = []

    def cancel(self):
        """ Cancel the Data Unit. """
        self.state = State.Done
        if len(self.pilot_data) > 0:
            CoordinationAdaptor.update_du(self)

    def add_files(self, file_url_list=None):
        """ Add files referenced in the list to the Data Unit and replicate
            them to all attached Pilot Data.

            BUGFIX: default was a mutable list ([]) shared across calls.
        """
        if file_url_list is None:
            file_url_list = []
        self._update_state(State.Pending)
        item_list = DataUnitItem.create_data_unit_from_urls(None, file_url_list)
        for item in item_list:
            self.data_unit_items.append(item)
        CoordinationAdaptor.update_du(self)
        if len(self.pilot_data) > 0:
            for pd in self.pilot_data:
                logger.debug("Update Pilot Data %s" % (pd.get_url()))
                pd.put_du(self)
        self._update_state(State.Running)
        CoordinationAdaptor.update_du(self)

    def remove_files(self, file_urls):
        """ Remove files from Data Unit (NOT implemented yet). """
        # TODO
        #self.data_unit_items.remove(input_data_unit)
        if len(self.pilot_data) > 0:
            CoordinationAdaptor.update_du(self)

    def list(self):
        """ List all items contained in the DU::

                {
                    "filename" : {
                        "pilot_data" : [url1, url2],
                        "local" : url
                    }
                }
        """
        self.__refresh()
        base_urls = [pd.url_for_du(self) for pd in self.get_pilot_data()]
        result_dict = {}
        for item in self.data_unit_items:
            logger.debug("Process file: %s" % (item.filename))
            result_dict[item.filename] = {
                "pilot_data": [os.path.join(base, item.filename) for base in base_urls],
                "local": item.local_url
            }
        return result_dict

    def get_state(self):
        """ Return current state of DataUnit (refreshed from the coordination
            service).
        """
        du_dict = CoordinationAdaptor.get_du(self.url)
        self.state = du_dict["state"]
        return self.state

    def wait(self):
        """ Wait until the DU is in Running (or Failed) state. """
        logger.debug("DU: %s wait()" % (str(self.id)))
        # Wait for all transfers to finish
        for thread in self.transfer_threads:
            thread.join()

        # Wait for state to change
        while True:
            self.state = self.get_state()
            if self.state == State.Running or self.state == State.Failed:
                break
            logger.debug("Waiting DU %s State: %s" % (self.get_url(), self.state))
            time.sleep(2)

    def add_pilot_data(self, pilot_data):
        """ Add this DU (self) to a certain pilot data; the data is moved
            asynchronously by a background transfer thread.
        """
        transfer_thread = threading.Thread(target=self.__add_pilot_data, args=[pilot_data])
        transfer_thread.start()
        self.transfer_threads.append(transfer_thread)

    def get_pilot_data(self):
        """ Get a list of pilot data that have a copy of this DU. """
        self.__restore_state()
        return self.pilot_data

    def export(self, target_url):
        """ Simple implementation of export:
            copies files from the first pilot data to the local machine.
        """
        if self.get_state() != State.Running:
            self.wait()
        if len(self.pilot_data) > 0:
            self.pilot_data[0].export_du(self, target_url)
        else:
            logger.error("No Pilot Data for PD found")

    def get_url(self):
        """ Return URL that can be used to reconnect to the Data Unit. """
        return self.url

    ###########################################################################
    # BigData Internal Methods
    def to_dict(self):
        """ Internal method that returns a dict with all data contained in
            this DataUnit.
        """
        du_dict = self.__dict__
        du_dict["id"] = self.id
        return du_dict

    def _update_state(self, state):
        """ Internal method for updating state. """
        self.state = state
        logger.debug("Update DU state to " + state + " Number of PD: " + str(len(self.pilot_data)))
        if len(self.pilot_data) > 0:
            CoordinationAdaptor.update_du(self)

    def __add_pilot_data(self, pilot_data):
        """ Transfer-thread body: replicate the DU's content into pilot_data. """
        logger.debug("add du to pilot data")
        if len(self.pilot_data) > 0:    # copy files from another pilot data
            self.pilot_data[0].copy_du(self, pilot_data)
        else:                           # copy files from original location
            pilot_data.put_du(self)
        self.pilot_data.append(pilot_data)
        self._update_state(State.Running)
        CoordinationAdaptor.update_du(self)

    @classmethod
    def _get_du_id(cls, du_url):
        """ Extract the 'du-<uuid>' component from a DU URL, or None if the
            URL is not valid.
        """
        try:
            start = du_url.index(cls.DU_ID_PREFIX)
            end = du_url.find("/", start)
            if end == -1:
                end = du_url.find("?", start)
            if end == -1:
                end = len(du_url)
            return du_url[start:end]
        except Exception:    # BUGFIX: was a bare except (caught SystemExit too)
            logger.error("No valid PD URL")
            return None

    def __refresh(self):
        """ Update the list of data unit items from the coordination service
            (best effort -- failures are logged, not raised).
        """
        try:
            if self.url != None:
                du_dict = CoordinationAdaptor.get_du(self.url)
                data_unit_dict_list = eval(du_dict["data_unit_items"])
                self.data_unit_items = [DataUnitItem.create_data_unit_from_dict(i)
                                        for i in data_unit_dict_list]
        except Exception:    # BUGFIX: was a bare except
            logger.warn("Refresh of DU %s failed" % (self.get_url()))

    def __restore_state(self):
        """ Restore description, state, items and pilot-data references from
            the coordination service.
        """
        du_dict = CoordinationAdaptor.get_du(self.url)
        # Restore Data Unit
        # NOTE(review): eval() of coordination-service payloads is unsafe if
        # the backend is writable by untrusted parties -- consider
        # ast.literal_eval.
        self.data_unit_description = eval(du_dict["data_unit_description"])
        self.state = du_dict["state"]

        # Restore DataUnitItems
        data_unit_dict_list = eval(du_dict["data_unit_items"])
        self.data_unit_items = [DataUnitItem.create_data_unit_from_dict(i)
                                for i in data_unit_dict_list]

        # Restore Pilot Data
        pd_list = eval(du_dict["pilot_data"])
        self.pilot_data = []
        for pd_url in pd_list:
            logger.debug("PD: " + str(pd_url))
            self.pilot_data.append(PilotData(pd_url=str(pd_url)))

    def __repr__(self):
        # BUGFIX: the continuation lines after 'return' were separate,
        # unreachable statements, so only the URL part was ever returned.
        return ("PD: " + str(self.url)
                + " \nData Units: " + str(self.data_unit_items)
                + " \nPilot Data: " + str(self.pilot_data))
686
class DataUnitItem(object):
    """ DataUnitItem: a single file tracked inside a DataUnit. """

    DUI_ID_PREFIX = "dui-"

    def __init__(self, pd=None, local_url=None):
        """ Create a DataUnitItem for the file at local_url.
            'pd' is currently unused (kept for interface compatibility).
            A bare DataUnitItem() sets no attributes (used by
            create_data_unit_from_dict for restore).
        """
        if local_url != None:
            self.id = self.DUI_ID_PREFIX + str(uuid.uuid1())
            self.local_url = local_url
            self.filename = os.path.basename(local_url)

    @classmethod
    def __exists_file(cls, url):
        """ Return True if the file at url exists (remote hosts are assumed
            to exist), otherwise False.
        """
        file_url = SAGAUrl(url)
        if file_url.host == "":
            return os.path.exists(str(file_url))
        elif file_url.host == "localhost":
            return os.path.exists(file_url.path)
        else:
            # remote URL: existence cannot be checked locally
            return True

    def __repr__(self):
        return str(self.__dict__)

    ###########################################################################
    # Auxiliary Methods
    @classmethod
    def create_data_unit_list(cls, pd=None, urls=None):
        """ Creates a list of DataUnitItems from a URL list; URLs whose files
            do not exist are skipped.
        """
        return [DataUnitItem(pd, url) for url in urls if cls.__exists_file(url)]

    @classmethod
    def create_data_unit_from_urls(cls, pd=None, urls=None):
        """ Creates a list of DataUnitItems from a URL list.

            CONSISTENCY FIX: this was a byte-for-byte duplicate of
            create_data_unit_list -- it now delegates to it.
        """
        return cls.create_data_unit_list(pd, urls)

    @classmethod
    def create_data_unit_from_dict(cls, du_dict):
        """ Restore a DataUnitItem from its dictionary representation. """
        du = DataUnitItem()
        logger.debug("Restore DU: " + str(du_dict))
        for i in du_dict.keys():
            logger.debug("Set attribute: %s", i)
            du.__setattr__(i, du_dict[i])
        return du

    def to_dict(self):
        """ Return a dict with all data contained in this DataUnitItem. """
        du_dict = self.__dict__
        du_dict["id"] = self.id
        return du_dict
###################################################################################################
# Tests
# Auxiliary testing methods
def __get_pd_url(du_url):
    """ Strip the trailing ':du-...' component from a DU URL, yielding the
        URL of the enclosing Pilot Data.
    """
    cut = du_url.index(":du-")
    return du_url[:cut]
772
def __get_du_id(du_url):
    """ Return the 'du-...' suffix of a Data Unit URL. """
    return du_url[du_url.index("du-"):]
776
# Tests
def test_pd_reconnect():
    """ Reconnect to a known Pilot Data, list its Data Units and look one up. """
    du_url = "redis://localhost/bigdata:pds-f31a670c-e3f6-11e1-afaf-705681b3df0f:pd-f31c47b8-e3f6-11e1-af44-705681b3df0f:du-f4debce8-e3f6-11e1-8399-705681b3df0f"
    pd = PilotData(pd_url=__get_pd_url(du_url))
    print(str(pd.list_data_units()))
    du = pd.get_du(du_url)

    #du = DataUnit(du_url="redis://localhost/bigdata:pds-32d63b2e-df05-11e1-a329-705681b3df0f:pd-37674138-df05-11e1-80d0-705681b3df0f:du-3b8d428c-df05-11e1-af2a-705681b3df0f")
    logger.debug(str(du.list()))
787
def test_du_reconnect():
    """ Reconnect to a known Data Unit, list its content and export it. """
    du = DataUnit(du_url="redis://localhost/bigdata:du-1d1b7078-229f-11e2-834e-705681b3df0f")
    logger.debug(str(du.list()))
    du.export("/tmp/export-test")
794
def test_data_unit_add_file():
    """ End-to-end check: create a PDS/PD, submit an (initially empty) output
        DU, reconnect through fresh objects, then attach a file to the DU.
    """
    pilot_data_service = PilotDataService(coordination_url="redis://localhost/")
    pd = pilot_data_service.create_pilot(pilot_data_description={
        "service_url": "ssh://localhost/tmp/pilot-" + str(uuid.uuid1()),
        "size": 100
    })

    # create data unit for output data
    output_data_unit = pd.submit_data_unit({
        "file_urls": [],
        "file_url_patterns": ["test.txt"]
    })
    output_data_unit.wait()
    logger.debug("Output DU: " + output_data_unit.get_url())

    # reconnect via URLs only and add a file through the reconnected objects
    pd_reconnect = PilotData(pd_url=pd.get_url())
    du_reconnect = pd_reconnect.get_du(output_data_unit.get_url())
    du_reconnect.add_files(["test.txt"])


if __name__ == "__main__":
    #test_data_unit_add_file()
    test_du_reconnect()