1 import logging
2 import saga
3 import json
4 import pdb
5
6 from pilot import *
7 from bigjob import logger
10 """
BigData persists its data in a central data space (e.g. the Advert service)
to facilitate distributed coordination:
13
14 advert://advert.cct.lsu.edu/pilot/3d0d5960-296d-11e1-8896-00264a13ca4c/data/ => namespace for pilot data
15
16 advert://advert.cct.lsu.edu/pilot/3d0d5960-296d-11e1-8896-00264a13ca4c/data/pds => pilot data service
17 advert://advert.cct.lsu.edu/pilot/3d0d5960-296d-11e1-8896-00264a13ca4c/data/pds/pilot-data-description => pilot data description
18 ...
19
20
advert://advert.cct.lsu.edu/pilot/3d0d5960-296d-11e1-8896-00264a13ca4c/data/pds/ => pilot data service
22 advert://advert.cct.lsu.edu/pilot/3d0d5960-296d-11e1-8896-00264a13ca4c/data/pds/pilot-data-description => pilot data description
23
This class is stateless: the application's base URL (or a URL derived from it) must be passed into every method.
25 """
# Root of the advert namespace, and the query string that carries the
# advert dbtype (sqlite3).
BASE_URL = "advert://localhost/"
BASE_URL_QUERY_STRING = "?dbtype=sqlite3"

# Relative namespace paths; every service path hangs off PILOT_DATA_PATH.
PILOT_PATH = "pilot"
PILOT_DATA_PATH = PILOT_PATH
PILOT_DATA_SERVICE_PATH = "%s/pds" % PILOT_DATA_PATH
DATA_UNIT_SERVICE_PATH = "%s/dus" % PILOT_DATA_PATH
COMPUTE_DATA_SERVICE_PATH = "%s/cds" % PILOT_DATA_PATH
34
35
36
37
38
# NOTE(review): the `def` line of this classmethod is missing from this
# listing. The body reads `application_id` without assigning it, so that is
# presumably a parameter.
# Builds "<scheme>://<host>/<application_id>/" from cls.BASE_URL. Only
# surl.scheme and surl.host are used, so any path or query of BASE_URL is
# not carried over.
39 @classmethod
41 surl = saga.url(cls.BASE_URL)
42 base_url = surl.scheme + "://" + surl.host + "/" + application_id + "/"
# Logged through the root `logging` module, whereas the rest of this listing
# uses the imported `logger`.
43 logging.debug(base_url)
44 return base_url
45
46
47
48
@classmethod
def add_pds(cls, application_url, pds):
    """Create the advert directory backing a pilot data service (PDS).

    :param application_url: application URL handed to ``get_pds_url``.
    :param pds: the PDS object; only its ``id`` attribute is read.
    :returns: the URL produced by ``get_pds_url`` -- i.e. the form *before*
        ``__get_url`` is applied -- so callers store the plain URL.
    """
    plain_url = cls.get_pds_url(application_url, pds.id)
    advert_url = cls.__get_url(plain_url)
    logger.debug("Create PDS directory at %s" % advert_url)
    # Opening the directory with Create/CreateParents is what materializes
    # it in the advert store; the returned handle is not needed afterwards.
    open_flags = (saga.advert.Create
                  | saga.advert.CreateParents
                  | saga.advert.ReadWrite)
    saga.advert.directory(advert_url, open_flags)
    return plain_url
58
59
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body expects a `pds_url` argument.
# Recursively removes the advert directory at pds_url, after passing the
# URL through __get_url.
60 @classmethod
62 pds_url = cls.__get_url(pds_url)
# Opened with Create | CreateParents, so a missing directory is presumably
# created here and then removed rather than raising -- confirm intended.
63 pds_dir = saga.advert.directory(saga.url(pds_url),
64 saga.advert.Create |
65 saga.advert.CreateParents |
66 saga.advert.ReadWrite)
# The handle is opened on pds_url and then asked to remove pds_url itself.
67 pds_dir.remove(pds_url, saga.name_space.Recursive)
68
69
70
71
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `pds_url` and `pd` (pd.id, pd.data_unit_description).
# Stores pd.data_unit_description as JSON in the advert entry
# "<pds_url without query>/<pd.id>/description". It then returns
# "<pds_url without query>/<pd.id>" as built, without applying __get_url.
72 @classmethod
74 pds_url = cls.__remove_dbtype(pds_url)
75 pd_url =pds_url+"/" + pd.id
76 pd_description_url = cls.__get_url(pd_url + "/description")
77 logger.debug("PDS URL: %s, PD Description URL: %s"%(pds_url, pd_description_url))
78
# The entry (and missing parents) is opened with Create | CreateParents.
79 pd_desc_entry = saga.advert.entry(saga.url(pd_description_url),
80 saga.advert.Create | saga.advert.CreateParents | saga.advert.ReadWrite)
# Log text says "pds" although the entry belongs to a single PD.
81 logger.debug("initialized advert entry for pds: " + pd_description_url)
82 pd_desc_entry.store_string(json.dumps(pd.data_unit_description))
83 return pd_url
84
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `pd`.
# Persists a PD's state as JSON entries under its URL (query stripped):
#   "<pd.url>/data-units" <- the `url` of every value in pd.data_units
#   "<pd.url>/pilot-data" <- pd.to_dict()
85 @classmethod
# NOTE(review): when pd.data_units is empty, "data-units" is not rewritten,
# so a previously stored list would remain -- confirm this is intended.
87 if len(pd.data_units) > 0:
88 du_urls = [i.url for i in pd.data_units.values()]
89 cls.__store_entry(cls.__remove_dbtype(pd.url)+"/data-units", du_urls)
90 cls.__store_entry(cls.__remove_dbtype(pd.url)+"/pilot-data", pd.to_dict())
91
92
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `pds_url`.
# Returns {"pilot_data": <decoded JSON at "<pds_url without query>/pilot-data">}.
# "/pilot-data" is the entry this class fills from pd.to_dict() under a PD's
# URL, so despite its name pds_url here appears to be a PD URL -- verify.
93 @classmethod
95 logger.debug("GET PD: " + pds_url)
96 pd_dict={}
97
98 pd_dict["pilot_data"] = cls.__retrieve_entry(cls.__remove_dbtype(pds_url)+"/pilot-data")
99 return pd_dict
100
101
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `pds_url`.
# Opens the directory at pds_url (Create | CreateParents | ReadWrite) and
# returns one URL per listed child, formed as "<pds_url>/<child>".
102 @classmethod
104 """ return a list of urls to pd managed by the PDS """
# NOTE(review): pds_url is replaced by its __get_url form and then used as
# the prefix for every child URL below. __remove_dbtype clears a URL's query,
# which suggests __get_url appends "?dbtype=..."; if so, the results look like
# "<dir>?dbtype=sqlite3/<child>" -- verify.
105 pds_url = cls.__get_url(pds_url)
106 logger.debug("List PD at %s"%pds_url)
107 pds_dir = saga.advert.directory(pds_url, saga.advert.Create |
108 saga.advert.CreateParents |
109 saga.advert.ReadWrite)
110
111 pd_list = pds_dir.list()
112 pd_full_urls = []
113 for i in pd_list:
114 pd_full_urls.append(pds_url + "/" + i)
115 return pd_full_urls
116
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `pds_url`, although the handle is named pd_dir.
# Recursively removes the advert directory at pds_url, after passing the URL
# through __get_url. The body is the same as the PDS delete method's.
117 @classmethod
119 pds_url = cls.__get_url(pds_url)
# Opened with Create | CreateParents, so a missing directory is presumably
# created and then removed rather than raising -- confirm intended.
120 pd_dir = saga.advert.directory(saga.url(pds_url),
121 saga.advert.Create |
122 saga.advert.CreateParents |
123 saga.advert.ReadWrite)
124 pd_dir.remove(pds_url, saga.name_space.Recursive)
125
126
127
128
@classmethod
def add_cds(cls, application_url, cds):
    """Create the advert directory backing a compute data service (CDS).

    :param application_url: application URL handed to ``get_cds_url``.
    :param cds: the CDS object; only its ``id`` attribute is read.
    :returns: the URL produced by ``get_cds_url`` -- i.e. the form *before*
        ``__get_url`` is applied.
    """
    plain_url = cls.get_cds_url(application_url, cds.id)
    advert_url = cls.__get_url(plain_url)
    logger.debug("Create CDS directory at %s" % advert_url)
    # The directory exists once it has been opened with Create/CreateParents;
    # the handle itself is discarded.
    open_flags = (saga.advert.Create
                  | saga.advert.CreateParents
                  | saga.advert.ReadWrite)
    saga.advert.directory(advert_url, open_flags)
    return plain_url
138
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `cds_url` and `cds`.
# Stores the CDS's member URLs as JSON lists in entries under
# "<cds_url without query>/":
#   "pds/" <- pilot data service URLs (their query stripped)
#   "cds/" <- pilot job service URLs
#   "du/"  <- data unit URLs   (held in `pd_urls`)
#   "cu/"  <- compute unit URLs (held in `wu_urls`)
# Only the pilot data service URLs pass through __remove_dbtype.
139 @classmethod
141
142
143 pds_urls = [cls.__remove_dbtype(i.url) for i in cds.pilot_data_services]
144 cls.__store_entry(cls.__remove_dbtype(cds_url)+"/pds/", pds_urls)
145
# NOTE(review): pilot *job* service URLs are written to the "cds/" entry;
# "pjs/" may have been intended -- check readers before changing.
146 pjs_urls = [i.url for i in cds.pilot_job_services]
147 cls.__store_entry(cls.__remove_dbtype(cds_url)+"/cds/", pjs_urls)
148
149
150 pd_urls = [i.url for i in cds.data_units.values()]
151 cls.__store_entry(cls.__remove_dbtype(cds_url)+"/du/", pd_urls)
152
153 wu_urls = [i.url for i in cds.compute_units.values()]
154 cls.__store_entry(cls.__remove_dbtype(cds_url)+"/cu/", wu_urls)
155
156
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `cds_url`.
# Normalizes cds_url (__remove_dbtype, then __get_url) and opens the
# directory there with Create | CreateParents | ReadWrite.
# NOTE(review): unlike the PDS/PD/DU deletion bodies, no remove() call follows
# in this listing, so nothing is deleted and cds_dir is unused -- confirm.
157 @classmethod
159 cds_url = cls.__get_url(cls.__remove_dbtype(cds_url))
160 cds_dir = saga.advert.directory(saga.url(cds_url),
161 saga.advert.Create |
162 saga.advert.CreateParents |
163 saga.advert.ReadWrite)
164
165
166
167
168
169
170
@classmethod
def add_du(cls, dus_url, du):
    """Compose the advert URL of data unit *du* inside DU service *dus_url*.

    Nothing is written to the advert store here. The query of *dus_url* is
    stripped, ``"/" + du.id`` is appended, and the result is passed through
    ``__get_url``.

    NOTE(review): unlike add_pds/add_cds, this returns the ``__get_url``
    form rather than the plain URL. The other DU methods in this class pass
    du.url / du_url through ``__remove_dbtype`` before using it.
    """
    child_url = cls.__remove_dbtype(dus_url) + "/" + du.id
    return cls.__get_url(child_url)
180
181
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `du_url`.
# Loads a DU's persisted state from JSON entries under
# "<du_url without query>/" and returns them in a dict:
#   "description" -> "data_unit_description"
#   "state"       -> "state"
#   "data-units"  -> "data_units"
#   "pilot-data"  -> "pilot_data"
# The log messages say "PD"/"pilot data" even though a DU is being read.
182 @classmethod
184 logger.debug("**** GET PD: " + du_url)
185 du_dict={}
186 du_dict["data_unit_description" ]= cls.__retrieve_entry(cls.__remove_dbtype(du_url)+"/description")
187 du_dict["state"] = cls.__retrieve_entry(cls.__remove_dbtype(du_url)+"/state")
188 du_dict["data_units"] = cls.__retrieve_entry(cls.__remove_dbtype(du_url)+"/data-units")
189 du_dict["pilot_data"] = cls.__retrieve_entry(cls.__remove_dbtype(du_url)+"/pilot-data")
190 logger.debug("Open pilot data at: " + du_url + " State: " + str(du_dict))
191 return du_dict
192
193
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `du`.
# Writes a DU's state as JSON entries under "<du.url without query>/":
#   "description" <- du.data_unit_description
#   "state"       <- du.state
#   "pilot-data"  <- the `url` of each element of du.pilot_data
#   "data-units"  <- to_dict() of each element of du.data_unit_items
194 @classmethod
196 logger.debug("**** Update pilot data at: " + du.url)
197 cls.__store_entry(cls.__remove_dbtype(du.url)+"/description", du.data_unit_description)
198 cls.__store_entry(cls.__remove_dbtype(du.url)+"/state", du.state)
199
# `du_urls` holds pilot data URLs, not data unit URLs.
200 du_urls = [i.url for i in du.pilot_data]
201 cls.__store_entry(cls.__remove_dbtype(du.url)+"/pilot-data", du_urls)
202
203 du_dict_list = [i.to_dict() for i in du.data_unit_items]
204 cls.__store_entry(cls.__remove_dbtype(du.url)+"/data-units", du_dict_list)
205
206
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `dus_url`.
# Opens the directory at dus_url (Create | CreateParents | ReadWrite) and
# returns one URL per listed child, formed as "<dus_url>/<child>". The
# docstring and log text still say "PDS" although DUs are listed.
207 @classmethod
209 """ return a list of urls to du managed by the PDS """
# NOTE(review): dus_url is replaced by its __get_url form and then used as
# the prefix for each child URL. If __get_url appends the "?dbtype=..."
# query, the results look like "<dir>?dbtype=sqlite3/<child>" -- verify.
210 dus_url = cls.__get_url(dus_url)
211 logger.debug("List PDS at %s"%dus_url)
212 dus_dir = saga.advert.directory(dus_url, saga.advert.Create |
213 saga.advert.CreateParents |
214 saga.advert.ReadWrite)
215
216 du_list = dus_dir.list()
217 du_full_urls = []
218 for i in du_list:
219 du_full_urls.append(dus_url + "/" + i)
220 return du_full_urls
221
222
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `du_url`.
# Recursively removes the advert directory at du_url, after passing the URL
# through __get_url.
223 @classmethod
225 du_url = cls.__get_url(du_url)
# Opened with Create | CreateParents, so a missing directory is presumably
# created and then removed rather than raising -- confirm intended.
226 du_dir = saga.advert.directory(saga.url(du_url),
227 saga.advert.Create |
228 saga.advert.CreateParents |
229 saga.advert.ReadWrite)
230 du_dir.remove(du_url, saga.name_space.Recursive)
231
232
233
234
235
236
# NOTE(review): each of the three `@classmethod` decorators below has lost its
# `def` line and body in this listing. These methods must be restored from
# the original source before this class can be used.
237 @classmethod
242
243 @classmethod
248
249
250
251
252 @classmethod
257
# NOTE(review): the `def` line of this classmethod is missing from this
# listing; the body reads `url`. Other methods call cls.__remove_dbtype, and
# this body (clearing the URL's query) presumably belongs to that name.
# Parses url with saga.url, empties its query component, and returns the
# resulting URL string.
258 @classmethod
260 surl = saga.url(url)
261 surl.query = ""
262 return surl.get_string()
263
@classmethod
def __store_entry(cls, entry_url, content):
    """Serialize *content* to JSON and write it to the advert entry at *entry_url*.

    *entry_url* is passed through ``__get_url`` first. The entry is opened
    with Create | CreateParents | ReadWrite, so (per those flags) the entry
    and any missing parent directories are created as needed.
    """
    advert_url = saga.url(cls.__get_url(entry_url))
    handle = saga.advert.entry(
        advert_url,
        saga.advert.Create | saga.advert.CreateParents | saga.advert.ReadWrite,
    )
    handle.store_string(json.dumps(content))
273
274
275
276 @classmethod
277 - def __retrieve_entry(cls, entry_url):
278 entry_url = cls.__get_url(entry_url)
279
280
281 entry = saga.advert.entry(saga.url(entry_url),
282 saga.advert.Create |
283 saga.advert.CreateParents | saga.advert.ReadWrite)
284 content = json.loads(entry.retrieve_string())
285
286
287 return content
288