1 """ B{PilotData Module}: Implementation of L{PilotData}, L{PilotDataService} and L{DataUnit}
2 """
3 import sys
4 import os
5 import logging
6 import uuid
7 import random
8 import threading
9 import time
10 import pdb
11 import Queue
12 from pilot.api.api import PilotError
13
14 sys.path.append(os.path.join(os.path.dirname(__file__), "../.."))
15 from bigjob import logger
16 from pilot.api import PilotData, DataUnit, PilotDataService, State
17
18
""" Load file management adaptors.

Each backend adaptor is optional: a missing third-party dependency only
disables that backend, so import failures are logged and tolerated.
Catch ImportError specifically -- a bare except would also swallow
SystemExit/KeyboardInterrupt and hide unrelated bugs in the adaptor modules.
"""
from pilot.filemanagement.ssh_adaptor import SSHFileAdaptor

try:
    from pilot.filemanagement.webhdfs_adaptor import WebHDFSFileAdaptor
except ImportError:
    logger.warn("WebHDFS package not found.")

try:
    from pilot.filemanagement.globusonline_adaptor import GlobusOnlineFileAdaptor
except ImportError:
    logger.warn("Globus Online package not found.")

try:
    from pilot.filemanagement.gs_adaptor import GSFileAdaptor
except ImportError:
    logger.warn("Google Storage package not found.")

try:
    from pilot.filemanagement.s3_adaptor import S3FileAdaptor
except ImportError:
    logger.warn("Amazon S3 package not found.")

try:
    from pilot.filemanagement.irods_adaptor import iRodsFileAdaptor
except ImportError:
    logger.warn("iRods Storage package not found.")

from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor
from bliss.saga import Url as SAGAUrl
52
53
54
55 application_id = "bigdata"
56
57 """
58 Implementation considerations:
59
60 PilotDataService can have n PilotData
61 PilotData can have n DataUnits (only DU URLs is held to avoid circular dependencies)
62 A DataUnit can be part of n PilotData
63 """
66 """ B{PilotData (PD).}
67
68 This is the object that is returned by the PilotDataService when a
69 new PilotData is created based on a PilotDataDescription. A PilotData represents
70 a finite amount of physical space on a certain resource. It can be populated
71 with L{DataUnit}s.
72
73 The PilotData object can be used by the application to keep track
74 of a pilot. A PilotData has state, can be queried, can be cancelled.
75
76 """
77
78 PD_ID_PREFIX="pd-"
79
80
def __init__(self, pilot_data_service=None, pilot_data_description=None, pd_url=None):
    """Initialize a PilotData at the given service URL.

    Supported service URL schemes::

        ssh://<hostname>
        gsissh://<hostname>
        go://<hostname>
        gs://google.com
        s3://aws.amazon.com

    In the future more SAGA/Bliss URL schemes/adaptors are supported.

    Two modes:
      1. Creation: pass pilot_data_service and pilot_data_description.
      2. Reconnection: pass pd_url of an existing Pilot Data.
    """
    self.id = None
    self.url = pd_url
    self.pilot_data_description = None
    self.pilot_data_service = pilot_data_service
    self.service_url = None
    self.size = None
    self.data_unit_urls = []
    self.security_context = None

    if pd_url is None and pilot_data_service is not None:
        # Fresh Pilot Data: mint an id and register with the coordination service.
        self.id = self.PD_ID_PREFIX + str(uuid.uuid1())
        self.pilot_data_description = pilot_data_description
        self.url = CoordinationAdaptor.add_pd(
            CoordinationAdaptor.get_base_url(application_id) + ":" + pilot_data_service.id,
            self)
    elif pd_url is not None:
        # Reconnect: restore state previously persisted in the coordination service.
        logger.warn("Reconnect to PilotData: %s" % pd_url)
        dictionary = CoordinationAdaptor.get_pd(pd_url)
        if "security_context" in dictionary:
            self.security_context = dictionary["security_context"]
        # NOTE(review): eval() on strings from the coordination backend is unsafe
        # if that backend is writable by untrusted parties -- consider ast.literal_eval.
        pd_dict = eval(dictionary["pilot_data"])
        for attribute in pd_dict:
            self.__setattr__(attribute, pd_dict[attribute])
        self.data_unit_urls = eval(dictionary["data_unit_urls"])

    self.__initialize_pilot_data()
    CoordinationAdaptor.update_pd(self)
120
def cancel(self):
    """Cancel the PilotData.

    Not implemented yet -- currently a no-op placeholder.
    """
    pass
125
126
def get_url(self):
    """Return the URL of this PilotData; used for reconnecting to it later."""
    return self.url
130
131
def url_for_du(self, du):
    """Return the full URL of DataUnit *du* inside this Pilot Data's storage."""
    return "%s/%s" % (self.service_url, str(du.id))
135
136
def submit_data_unit(self, data_unit_description):
    """Create a DataUnit from *data_unit_description* and import its data into this PD."""
    new_du = DataUnit(pilot_data=self,
                      data_unit_description=data_unit_description)
    # Track the DU URL locally, then kick off the (asynchronous) replication.
    self.data_unit_urls.append(new_du.get_url())
    new_du.add_pilot_data(self)
    return new_du
144
145
def list_data_units(self):
    """Return the URLs of all Data Units held by this Pilot Data."""
    return self.data_unit_urls
149
150
152 """ Return current state of Pilot Data """
153 return self.__filemanager.get_state()
154
155
def get_du(self, du_url):
    """Return a DataUnit handle for *du_url* if it is part of this PD, else None."""
    if self.data_unit_urls.count(du_url) > 0:
        return DataUnit(du_url=du_url)
    return None
162
163
def wait(self):
    """Wait until this PD enters a final state (Done, Canceled or Failed).

    Polls every DU of this PD; a DU counts as finished once __has_finished()
    accepts its state (Running, Failed or Canceled).
    """
    while True:
        finish_counter = 0
        result_map = {}
        # BUGFIX: attribute is self.data_unit_urls (was misspelled
        # self.data_units_urls, which raised AttributeError at runtime).
        for du_url in self.data_unit_urls:
            du = DataUnit(du_url=du_url)
            du.wait()
            state = du.get_state()
            # Tally states for the progress log below.
            result_map[state] = result_map.get(state, 0) + 1
            if self.__has_finished(state):
                finish_counter = finish_counter + 1
        logger.debug("PD ID: " + str(self.id) + " Total DUs: %s States: %s"
                     % (len(self.data_unit_urls), str(result_map)))
        if finish_counter == len(self.data_unit_urls):
            break
        time.sleep(2)
184
185
def export_du(self, du, target_url):
    """Export Data Unit *du* into the local directory *target_url*."""
    # Create the target directory for absolute local paths if it is missing.
    if target_url.startswith("/") and not os.path.exists(target_url):
        os.mkdir(target_url)
    logger.debug("Export Data-Unit to %s" % target_url)
    self.__filemanager.get_du(du, target_url)
192
193
195 """Copy Data Unit to Pilot Data"""
196 logger.debug("Put DU: %s to Pilot-Data: %s"%(du.id,self.service_url))
197 self.__filemanager.create_du(du.id)
198 self.__filemanager.put_du(du)
199 self.data_unit_urls.append(du.get_url())
200 CoordinationAdaptor.update_pd(self)
201
202
204 """ Remove Data Unit from Pilot Data """
205 if self.data_unit_urls.count(du.get_url())>0:
206 self.__filemanager.remove_du(du)
207 self.data_unit_urls.remove(du.get_url())
208 CoordinationAdaptor.update_pd(self)
209
210
212 """ Copy DataUnit to another Pilot Data """
213 pd_new.create_du(du)
214 self.__filemanager.copy_du(du, pd_new)
215
216
217
218 pd_new.data_unit_urls.append(du.get_url())
219 CoordinationAdaptor.update_pd(pd_new)
220
221
222
223
224
225
def create_du(self, du):
    """Allocate storage for Data Unit *du* within this Pilot."""
    self.__filemanager.create_du(du.id)
229
230
270
271
273 start = pd_url.index(self.PD_ID_PREFIX)
274 end =pd_url.index("/", start)
275 return pd_url[start:end]
276
277
278
280 """ Internal method that returns a dict with all data contained in this Pilot Data"""
281 pd_dict = {}
282 pd_dict["id"]=self.id
283 pd_dict["url"]=self.url
284 pd_dict["pilot_data_description"]=self.pilot_data_description
285 logger.debug("PilotData Dictionary: " + str(pd_dict))
286 return pd_dict
287
288
290 """Returns Pilot Data URL"""
291 return self.service_url
292
293
def __has_finished(self, state):
    """Return True if *state* (case-insensitive) is final: running, failed or canceled."""
    return state.lower() in ("running", "failed", "canceled")
300
301 @classmethod
303 """Restore Pilot Data from dictionary"""
304 pd = PilotData()
305 for i in pd_dict.keys():
306 pd.__setattr__(i, pd_dict[i])
307 pd.__initialize_pilot_data()
308 logger.debug("created pd " + str(pd))
309 return pd
310
311
312
313 COORDINATION_URL = "redis://localhost"
316 """ B{PilotDataService (PDS).}
317
318 Factory for creating Pilot Data.
319
320 """
321
322 PDS_ID_PREFIX="pds-"
323
def __init__(self, coordination_url=COORDINATION_URL, pds_url=None):
    """Create (or reconnect to) a PilotDataService.

    Keyword arguments:
    coordination_url -- URL of the coordination backend (default: local Redis)
    pds_url -- reconnect to the PDS at this URL instead of creating a new one
    """
    self.pilot_data = {}
    CoordinationAdaptor.configure_base_url(coordination_url)
    if pds_url is None:
        # New service: mint an id and register it with the coordination service.
        self.id = self.PDS_ID_PREFIX + str(uuid.uuid1())
        application_url = CoordinationAdaptor.get_base_url(application_id)
        self.url = CoordinationAdaptor.add_pds(application_url, self)
    else:
        # Reconnect: recover the id from the URL; pilot_data stays empty
        # until PDs are re-attached.
        self.id = self.__get_pds_id(pds_url)
338
339
340
def create_pilot(self, pilot_data_description):
    """Create a PilotData.

    Keyword arguments:
    pilot_data_description -- PilotData Description::

        {
            'service_url': "ssh://<hostname>/base-url/",
            'size': "1000"
        }

    Return value:
    A PilotData object
    """
    pilot = PilotData(pilot_data_service=self,
                      pilot_data_description=pilot_data_description)
    self.pilot_data[pilot.id] = pilot
    # Register the new PD under this service in the coordination backend.
    CoordinationAdaptor.add_pd(self.url, pilot)
    return pilot
362
363
365 """ Reconnect to an existing pilot. """
366 if self.pilot_data.has_key(pd_id):
367 return self.pilot_data[pd_id]
368 return None
369
370
372 """ List all PDs of PDS """
373 return self.pilot_data.values()
374
375
def cancel(self):
    """Cancel the PilotDataService and release all Pilot Data it created.

    Keyword arguments:
    None

    Return value:
    Result of operation
    """
    for pilot in self.pilot_data.values():
        pilot.cancel()
387
388
390 """ Wait until all managed PD (of this Pilot Data Service) enter a final state"""
391
392 for i in self.pilot_data.values():
393 i.wait()
394
395
397 """ Returns URL of Pilot Data Service """
398 return self.url
399
400
401
403 """ Return a Python dictionary containing the representation of the PDS
404 (internal method not part of Pilot API)
405 """
406 pds_dict = self.__dict__
407 pds_dict["id"]=self.id
408 return pds_dict
409
410
412 """Releases all Pilot Data created by this Pilot Data Service."""
413 self.cancel()
414
415
def __get_pds_id(self, pds_url):
    """Extract the "pds-<uuid>" id component from a PDS URL."""
    begin = pds_url.index(self.PDS_ID_PREFIX)
    return pds_url[begin:pds_url.index("/", begin)]
420
421
423 pd_list=CoordinationAdaptor.list_pd(pds_url)
424 for i in pd_list:
425 pass
426
429 """ B{DataUnit (DU).}
430
431 This is the object that is returned by the ComputeDataService when a
432 new DataUnit is created based on a DataUnitDescription.
433
434 The DataUnit object can be used by the application to keep track
435 of a DataUnit.
436
437 A DataUnit has state, can be queried and can be cancelled.
438
439
440
441 State model:
442 - New: PD object created
443 - Pending: PD object is currently updated
444 - Running: At least 1 replica of PD is persistent in a pilot data
445 """
446
447
448
449
450
451
452
453
454
455
456 DU_ID_PREFIX="du-"
457
def __init__(self, pilot_data=None, data_unit_description=None, du_url=None):
    """Create or reconnect a Data Unit.

    1.) create a new Data Unit: data_unit_description required
    2.) reconnect to an existing Data Unit: du_url required
    """
    if du_url is None:
        # New DU: mint an id, build item list and register with coordination.
        self.id = self.DU_ID_PREFIX + str(uuid.uuid1())
        self.data_unit_description = data_unit_description
        self.pilot_data = []
        self.state = State.New
        self.data_unit_items = []
        if "file_urls" in self.data_unit_description:
            self.data_unit_items = DataUnitItem.create_data_unit_list(
                self, self.data_unit_description["file_urls"])

        self.url = None

        application_url = CoordinationAdaptor.get_base_url(application_id)
        self.url = CoordinationAdaptor.add_du(application_url, self)
        CoordinationAdaptor.update_du(self)
    else:
        # Reconnect: derive the id from the URL and restore persisted state.
        self.id = DataUnit._get_du_id(du_url)
        self.url = du_url
        logger.debug("Restore du: %s" % self.id)
        self.__restore_state()

    self.transfer_threads = []
494
500
501
515
516
518 """Remove files from Data Unit (NOT implemented yet"""
519
520
521 if len(self.pilot_data) > 0:
522 CoordinationAdaptor.update_du(self)
523
524
def list(self):
    """List all items contained in the DU::

        {
            "filename" : {
                "pilot_data" : [url1, url2],
                "local" : url
            }
        }
    """
    self.__refresh()
    base_urls = [pd.url_for_du(self) for pd in self.get_pilot_data()]
    listing = {}
    for item in self.data_unit_items:
        logger.debug("Process file: %s" % (item.filename))
        listing[item.filename] = {
            "pilot_data": [os.path.join(base, item.filename) for base in base_urls],
            "local": item.local_url,
        }
    return listing
544
545
546
def get_state(self):
    """Return the current DataUnit state, refreshed from the coordination service."""
    entry = CoordinationAdaptor.get_du(self.url)
    self.state = entry["state"]
    return self.state
553
554
def wait(self):
    """Block until this DU reaches Running (or Failed) state."""
    logger.debug("DU: %s wait()" % (str(self.id)))
    # First join any in-flight transfer threads started by add_pilot_data().
    for transfer in self.transfer_threads:
        transfer.join()
    # Then poll the coordination service until a final state is reached.
    while True:
        self.state = self.get_state()
        if self.state in (State.Running, State.Failed):
            break
        logger.debug("Waiting DU %s State: %s" % (self.get_url(), self.state))
        time.sleep(2)
571
572
def add_pilot_data(self, pilot_data):
    """Replicate this DU into *pilot_data* (data is moved into that PD).

    The transfer runs in a background thread; wait() joins it.
    """
    transfer = threading.Thread(target=self.__add_pilot_data, args=[pilot_data])
    transfer.start()
    self.transfer_threads.append(transfer)
580
581
def get_pilot_data(self):
    """Return the list of Pilot Data that hold a copy of this DU."""
    self.__restore_state()
    return self.pilot_data
586
587
def export(self, target_url):
    """Export this DU to *target_url*.

    Simple implementation: wait until the DU is Running, then copy the
    files from the first Pilot Data that holds a replica.
    """
    if self.get_state() != State.Running:
        self.wait()
    if not self.pilot_data:
        logger.error("No Pilot Data for PD found")
        return
    self.pilot_data[0].export_du(self, target_url)
598
599
def get_url(self):
    """Return the URL that can be used to reconnect to this Data Unit."""
    return self.url
603
604
605
606
607
609 """ Internal method that returns a dict with all data contained in this DataUnit"""
610 du_dict = self.__dict__
611 du_dict["id"]=self.id
612 return du_dict
613
614
616 """ Internal method for updating state"""
617 self.state=state
618 logger.debug("Update DU state to " + state + " Number of PD: " + str(len(self.pilot_data)))
619 if len(self.pilot_data) > 0:
620 CoordinationAdaptor.update_du(self)
621
622
634
635
@classmethod
def _get_du_id(cls, du_url):
    """Extract the "du-<uuid>" id component from a DU URL.

    Returns None (and logs an error) if the URL contains no DU id.
    """
    try:
        start = du_url.index(cls.DU_ID_PREFIX)
    # str.index raises ValueError when the prefix is absent; catch only
    # that instead of a bare except that would hide unrelated bugs.
    except ValueError:
        logger.error("No valid PD URL")
        return None
    end = du_url.find("/", start)
    if end == -1:
        end = du_url.find("?", start)
    if end == -1:
        end = len(du_url)
    return du_url[start:end]
649
650
def __refresh(self):
    """Update the list of data unit items from the coordination service.

    Best-effort: failures are logged as warnings, never raised.
    """
    try:
        if self.url is not None:
            du_dict = CoordinationAdaptor.get_du(self.url)
            data_unit_dict_list = eval(du_dict["data_unit_items"])
            self.data_unit_items = [DataUnitItem.create_data_unit_from_dict(i)
                                    for i in data_unit_dict_list]
    # Exception (not bare except) so SystemExit/KeyboardInterrupt still propagate.
    except Exception:
        logger.warn("Refresh of DU %s failed" % (self.get_url()))
661
662
def __restore_state(self):
    """Reload description, state, items and parent PDs from the coordination service."""
    du_dict = CoordinationAdaptor.get_du(self.url)
    # NOTE(review): eval() of persisted strings is unsafe if the backend
    # is writable by untrusted parties -- consider ast.literal_eval.
    self.data_unit_description = eval(du_dict["data_unit_description"])
    self.state = du_dict["state"]
    self.data_unit_items = [DataUnitItem.create_data_unit_from_dict(entry)
                            for entry in eval(du_dict["data_unit_items"])]
    self.pilot_data = []
    for pd_url in eval(du_dict["pilot_data"]):
        logger.debug("PD: " + str(pd_url))
        self.pilot_data.append(PilotData(pd_url=str(pd_url)))
680
681
683 return "PD: " + str(self.url)
684 + " \nData Units: " + str(self.data_unit_items)
685 + " \nPilot Data: " + str(self.pilot_data)
686
690 """ DataUnitItem """
691 DUI_ID_PREFIX="dui-"
692
def __init__(self, pd=None, local_url=None):
    """Create a DataUnitItem tracking one local file.

    *pd* is accepted for API symmetry but not stored; only the local URL
    and its basename are recorded. With local_url=None the instance stays
    empty (used by create_data_unit_from_dict to restore attributes).
    """
    if local_url is not None:
        self.id = self.DUI_ID_PREFIX + str(uuid.uuid1())
        self.local_url = local_url
        self.filename = os.path.basename(local_url)
699
700
701
@classmethod
def __exists_file(cls, url):
    """Return True if the file at *url* exists.

    Remote (non-localhost) hosts are assumed to exist without checking.
    """
    file_url = SAGAUrl(url)
    if file_url.host == "":
        return os.path.exists(str(file_url))
    elif file_url.host == "localhost":
        return os.path.exists(file_url.path)
    return True
718
719
721 return str(self.__dict__)
722
723
724
725
@classmethod
def create_data_unit_list(cls, pd=None, urls=None):
    """Build a DataUnitItem for every URL in *urls* that exists."""
    items = []
    for url in urls:
        if cls.__exists_file(url):
            items.append(DataUnitItem(pd, url))
    return items
737
738 @classmethod
740 """ Creates a list of DUs from URL list
741 """
742 du_item_list = []
743 for i in urls:
744 if cls.__exists_file(i):
745 du = DataUnitItem(pd, i)
746 du_item_list.append(du)
747
748 return du_item_list
749
750
@classmethod
def create_data_unit_from_dict(cls, du_dict):
    """Restore a DataUnitItem from its dictionary representation."""
    item = DataUnitItem()
    logger.debug("Restore DU: " + str(du_dict))
    for key in du_dict.keys():
        logger.debug("Set attribute: %s", key)
        item.__setattr__(key, du_dict[key])
    return item
759
760
762 du_dict = self.__dict__
763 du_dict["id"]=self.id
764 return du_dict
765
770 url = du_url[:du_url.index(":du-")]
771 return url
772
774 du_id = du_url[du_url.index("du-"):]
775 return du_id
776
779 du_url = "redis://localhost/bigdata:pds-f31a670c-e3f6-11e1-afaf-705681b3df0f:pd-f31c47b8-e3f6-11e1-af44-705681b3df0f:du-f4debce8-e3f6-11e1-8399-705681b3df0f"
780 pd_url = __get_pd_url(du_url)
781 pd = PilotData(pd_url=pd_url)
782 print str(pd.list_data_units())
783 du = pd.get_du(du_url)
784
785
786 logger.debug(str(du.list()))
787
790 du_url = "redis://localhost/bigdata:du-1d1b7078-229f-11e2-834e-705681b3df0f"
791 du = DataUnit(du_url=du_url)
792 logger.debug(str(du.list()))
793 du.export("/tmp/export-test")
794
797 pilot_data_service = PilotDataService(coordination_url="redis://localhost/")
798 pilot_data_description = {
799 "service_url": "ssh://localhost/tmp/pilot-" + str(uuid.uuid1()),
800 "size": 100
801 }
802 pd = pilot_data_service.create_pilot(pilot_data_description=pilot_data_description)
803
804
805 output_data_unit_description = {
806 "file_urls": [],
807 "file_url_patterns": ["test.txt"]
808 }
809 output_data_unit = pd.submit_data_unit(output_data_unit_description)
810 output_data_unit.wait()
811 logger.debug("Output DU: " + output_data_unit.get_url())
812 pd_reconnect_url = pd.get_url()
813 du_url = output_data_unit.get_url()
814 pd_reconnect = PilotData(pd_url=pd_reconnect_url)
815 du_reconnect = pd_reconnect.get_du(du_url)
816 du_reconnect.add_files(["test.txt"])
817
818
819
820 if __name__ == "__main__":
821
822 test_du_reconnect()
823