Package pilot :: Package coordination :: Module advert_adaptor

Source Code for Module pilot.coordination.advert_adaptor

  1  import logging 
  2  import saga 
  3  import json 
  4  import pdb 
  5   
  6  from pilot import * 
  7  from bigjob import logger 
class AdvertCoordinationAdaptor:
    """Persist pilot-data coordination state in a SAGA Advert service.

    BigData persists its data in a central data space, e.g. the Advert
    service, to facilitate distributed coordination. Example layout:

    advert://advert.cct.lsu.edu/pilot/3d0d5960-296d-11e1-8896-00264a13ca4c/data/
        => namespace for pilot data
    advert://advert.cct.lsu.edu/pilot/3d0d5960-296d-11e1-8896-00264a13ca4c/data/pds
        => pilot data service
    advert://advert.cct.lsu.edu/pilot/3d0d5960-296d-11e1-8896-00264a13ca4c/data/pds/pilot-data-description
        => pilot data description
    ...
    advert://advert.cct.lsu.edu/pilot/3d0d5960-296d-11e1-8896-00264a13ca4c/data/pds/
        => pilot store service
    advert://advert.cct.lsu.edu/pilot/3d0d5960-296d-11e1-8896-00264a13ca4c/data/pds/pilot-data-description
        => pilot data description

    This class is stateless - the application's base_url needs to be passed
    into every method. All methods are classmethods.

    URL convention used throughout: URLs handed to ``saga.advert`` carry the
    ``BASE_URL_QUERY_STRING`` suffix (added by ``__get_url``); most URLs handed
    back to callers do not (``add_du`` is the exception, see its docstring).
    """

    BASE_URL = "advert://localhost/"
    BASE_URL_QUERY_STRING = "?dbtype=sqlite3"

    PILOT_PATH = "pilot"
    PILOT_DATA_PATH = PILOT_PATH
    PILOT_DATA_SERVICE_PATH = PILOT_DATA_PATH + "/pds"
    DATA_UNIT_SERVICE_PATH = PILOT_DATA_PATH + "/dus"
    COMPUTE_DATA_SERVICE_PATH = PILOT_DATA_PATH + "/cds"

    ###########################################################################
    # Construct a base url for an application

    @classmethod
    def get_base_url(cls, application_id):
        """Return ``<scheme>://<host>/<application_id>/`` derived from BASE_URL."""
        surl = saga.url(cls.BASE_URL)
        base_url = surl.scheme + "://" + surl.host + "/" + application_id + "/"
        # Use the package logger like every other method in this class.
        logger.debug(base_url)
        return base_url

    ###########################################################################
    # Pilot Store Service related methods

    @classmethod
    def add_pds(cls, application_url, pds):
        """Create the advert directory for a pilot data service.

        Returns the PDS URL *without* the dbtype query string.
        """
        pds_url_no_dbtype = cls.get_pds_url(application_url, pds.id)
        pds_url = cls.__get_url(pds_url_no_dbtype)
        logger.debug("Create PDS directory at %s", pds_url)
        saga.advert.directory(pds_url, cls.__open_flags())
        return pds_url_no_dbtype

    @classmethod
    def delete_pds(cls, pds_url):
        """Recursively remove the advert directory of a pilot data service."""
        pds_url = cls.__get_url(pds_url)
        pds_dir = saga.advert.directory(saga.url(pds_url), cls.__open_flags())
        pds_dir.remove(pds_url, saga.name_space.Recursive)

    ###########################################################################
    # Pilot Data related methods

    @classmethod
    def add_pd(cls, pds_url, pd):
        """Store the description of ``pd`` below ``pds_url``.

        The JSON-encoded ``pd.data_unit_description`` is written to
        ``<pds_url>/<pd.id>/description``. Returns ``<pds_url>/<pd.id>``
        without the dbtype query string.
        """
        pds_url = cls.__remove_dbtype(pds_url)
        pd_url = pds_url + "/" + pd.id
        pd_description_url = cls.__get_url(pd_url + "/description")
        logger.debug("PDS URL: %s, PD Description URL: %s",
                     pds_url, pd_description_url)
        # Parent directories are created implicitly (CreateParents flag).
        pd_desc_entry = saga.advert.entry(saga.url(pd_description_url),
                                          cls.__open_flags())
        logger.debug("initialized advert entry for pds: %s", pd_description_url)
        pd_desc_entry.store_string(json.dumps(pd.data_unit_description))
        return pd_url

    @classmethod
    def update_pd(cls, pd):
        """Write the data-unit URLs (if any) and the dict form of ``pd``."""
        pd_url = cls.__remove_dbtype(pd.url)
        if pd.data_units:
            du_urls = [du.url for du in pd.data_units.values()]
            cls.__store_entry(pd_url + "/data-units", du_urls)
        # NOTE(review): written regardless of whether data units exist;
        # confirm this store was not meant to be guarded by the check above.
        cls.__store_entry(pd_url + "/pilot-data", pd.to_dict())

    @classmethod
    def get_pd(cls, pds_url):
        """Return ``{"pilot_data": <decoded content of <pds_url>/pilot-data>}``."""
        logger.debug("GET PD: %s", pds_url)
        pilot_data = cls.__retrieve_entry(
            cls.__remove_dbtype(pds_url) + "/pilot-data")
        return {"pilot_data": pilot_data}

    @classmethod
    def list_pd(cls, pds_url):
        """ return a list of urls to pd managed by the PDS """
        # Child URLs are built from the query-free URL: appending to a URL
        # that already ends in "?dbtype=..." would bury the child name inside
        # the query string, where __remove_dbtype would later discard it.
        base_url = cls.__remove_dbtype(pds_url)
        dir_url = cls.__get_url(base_url)
        logger.debug("List PD at %s", dir_url)
        pds_dir = saga.advert.directory(dir_url, cls.__open_flags())
        return ["%s/%s" % (base_url, name) for name in pds_dir.list()]

    @classmethod
    def delete_pd(cls, pds_url):
        """Recursively remove the advert directory at ``pds_url``."""
        pds_url = cls.__get_url(pds_url)
        pd_dir = saga.advert.directory(saga.url(pds_url), cls.__open_flags())
        pd_dir.remove(pds_url, saga.name_space.Recursive)

    ###########################################################################
    # Compute Data Service related methods

    @classmethod
    def add_cds(cls, application_url, cds):
        """Create the advert directory for a compute data service.

        Returns the CDS URL *without* the dbtype query string.
        """
        cds_url_no_dbtype = cls.get_cds_url(application_url, cds.id)
        cds_url = cls.__get_url(cds_url_no_dbtype)
        logger.debug("Create CDS directory at %s", cds_url)
        saga.advert.directory(cds_url, cls.__open_flags())
        return cds_url_no_dbtype

    @classmethod
    def update_cds(cls, cds_url, cds):
        """Store the URLs of everything currently managed by ``cds``.

        Four JSON lists are written below ``cds_url``: ``pds/``, ``cds/``,
        ``du/`` and ``cu/``.
        """
        base_url = cls.__remove_dbtype(cds_url)

        # Storage and Compute Resources
        pds_urls = [cls.__remove_dbtype(pds.url)
                    for pds in cds.pilot_data_services]
        cls.__store_entry(base_url + "/pds/", pds_urls)

        # NOTE(review): pilot job service URLs are stored under "cds/", not
        # e.g. "pjs/" - kept as in the original; confirm this key is intended.
        pjs_urls = [pjs.url for pjs in cds.pilot_job_services]
        cls.__store_entry(base_url + "/cds/", pjs_urls)

        # currently managed data units and compute units
        du_urls = [du.url for du in cds.data_units.values()]
        cls.__store_entry(base_url + "/du/", du_urls)

        cu_urls = [cu.url for cu in cds.compute_units.values()]
        cls.__store_entry(base_url + "/cu/", cu_urls)

    @classmethod
    def delete_cds(cls, cds_url):
        """Open the CDS advert directory; removal is currently disabled."""
        cds_url = cls.__get_url(cls.__remove_dbtype(cds_url))
        cds_dir = saga.advert.directory(saga.url(cds_url), cls.__open_flags())
        # NOTE(review): the recursive removal below is commented out in the
        # original, so nothing is deleted here - confirm before re-enabling.
        # cds_dir.remove(cds_url, saga.name_space.Recursive)

    ###########################################################################
    # Data Unit related methods

    @classmethod
    def add_du(cls, dus_url, du):
        """Return the advert URL for ``du`` below ``dus_url``.

        No advert entry is created here. Unlike ``add_pd``, the returned URL
        *includes* the dbtype query string.
        """
        du_url = cls.__remove_dbtype(dus_url) + "/" + du.id
        return cls.__get_url(du_url)

    @classmethod
    def get_du(cls, du_url):
        """Return the stored state of a data unit as a dict.

        Keys: ``data_unit_description``, ``state``, ``data_units`` and
        ``pilot_data``, each decoded from the matching entry below ``du_url``.
        """
        logger.debug("**** GET PD: %s", du_url)
        base_url = cls.__remove_dbtype(du_url)
        du_dict = {}
        du_dict["data_unit_description"] = cls.__retrieve_entry(
            base_url + "/description")
        du_dict["state"] = cls.__retrieve_entry(base_url + "/state")
        du_dict["data_units"] = cls.__retrieve_entry(base_url + "/data-units")
        du_dict["pilot_data"] = cls.__retrieve_entry(base_url + "/pilot-data")
        logger.debug("Open pilot data at: %s State: %s", du_url, du_dict)
        return du_dict

    @classmethod
    def update_du(cls, du):
        """Write description, state, pilot-data URLs and items of ``du``."""
        logger.debug("**** Update pilot data at: %s", du.url)
        base_url = cls.__remove_dbtype(du.url)
        cls.__store_entry(base_url + "/description", du.data_unit_description)
        cls.__store_entry(base_url + "/state", du.state)

        pd_urls = [pd.url for pd in du.pilot_data]
        cls.__store_entry(base_url + "/pilot-data", pd_urls)

        du_dict_list = [item.to_dict() for item in du.data_unit_items]
        cls.__store_entry(base_url + "/data-units", du_dict_list)

    @classmethod
    def list_du(cls, dus_url):
        """ return a list of urls to du managed by the PDS """
        # See list_pd: children are appended to the query-free URL.
        base_url = cls.__remove_dbtype(dus_url)
        dir_url = cls.__get_url(base_url)
        logger.debug("List PDS at %s", dir_url)
        dus_dir = saga.advert.directory(dir_url, cls.__open_flags())
        return ["%s/%s" % (base_url, name) for name in dus_dir.list()]

    @classmethod
    def delete_du(cls, du_url):
        """Recursively remove the advert directory of a data unit."""
        du_url = cls.__get_url(du_url)
        du_dir = saga.advert.directory(saga.url(du_url), cls.__open_flags())
        du_dir.remove(du_url, saga.name_space.Recursive)

    ###########################################################################
    # URL Tweaking

    @classmethod
    def get_pds_url(cls, application_url, pds_id):
        """Return ``<application_url>pilot/pds/<pds_id>`` (no query string)."""
        pds_url = application_url + cls.PILOT_DATA_SERVICE_PATH + "/" + pds_id
        logger.debug("PDS URL: %s", pds_url)
        return pds_url

    @classmethod
    def get_cds_url(cls, application_url, cds_id):
        """Return ``<application_url>pilot/cds/<cds_id>`` (no query string)."""
        cds_url = application_url + cls.COMPUTE_DATA_SERVICE_PATH + "/" + cds_id
        logger.debug("CDS URL: %s", cds_url)
        return cds_url

    ###########################################################################
    # internal methods

    @classmethod
    def __open_flags(cls):
        """Return the flag mask used for every advert directory/entry open."""
        # Built lazily (not as a class constant) so that merely defining the
        # class does not touch the saga module.
        return (saga.advert.Create
                | saga.advert.CreateParents
                | saga.advert.ReadWrite)

    @classmethod
    def __get_url(cls, url):
        """ appends advert querystring for dbtype to url """
        # Idempotent: some callers pass URLs that already carry the suffix
        # (e.g. the value returned by add_du); appending again would yield
        # "...?dbtype=sqlite3?dbtype=sqlite3".
        if url.endswith(cls.BASE_URL_QUERY_STRING):
            return url
        return url + cls.BASE_URL_QUERY_STRING

    @classmethod
    def __remove_dbtype(cls, url):
        """Return ``url`` with its query component cleared."""
        surl = saga.url(url)
        surl.query = ""
        return surl.get_string()

    @classmethod
    def __store_entry(cls, entry_url, content):
        """JSON-encode ``content`` and store it in the advert entry."""
        entry_url = cls.__get_url(entry_url)
        # Parent directories are created implicitly (CreateParents flag).
        entry = saga.advert.entry(saga.url(entry_url), cls.__open_flags())
        entry.store_string(json.dumps(content))

    @classmethod
    def __retrieve_entry(cls, entry_url):
        """Return the JSON-decoded content of the advert entry."""
        entry_url = cls.__get_url(entry_url)
        # NOTE(review): the entry is opened with the same create flags as for
        # writing; presumably a missing entry is therefore created rather than
        # reported, and json.loads then sees whatever an empty entry returns -
        # confirm against the advert service behaviour.
        entry = saga.advert.entry(saga.url(entry_url), cls.__open_flags())
        return json.loads(entry.retrieve_string())
288