Package pilot :: Package coordination :: Module nocoord_adaptor

Source Code for Module pilot.coordination.nocoord_adaptor

  1  import logging 
  2  import json 
  3  import pdb 
  4   
  5  from pilot import * 
  6  from bigjob import logger 
  7  from bliss.saga import Url as SAGAUrl 
class NoCoordinationAdaptor:
    """
        Dummy Adaptor - No distributed coordination done

        All methods are classmethods. They only derive URLs and write debug
        log messages; nothing is stored or retrieved. Consequently the
        update_*/delete_* methods are no-ops, the list_* methods return an
        empty list and the get_* methods return an empty dict.
    """
    BASE_URL = "nocoord://localhost/"
    BASE_URL_QUERY_STRING = "?dbtype=sqlite3"

    PILOT_PATH = "pilot"
    PILOT_DATA_PATH = PILOT_PATH
    PILOT_DATA_SERVICE_PATH = PILOT_DATA_PATH + "/pds"
    DATA_UNIT_SERVICE_PATH = PILOT_DATA_PATH + "/dus"
    COMPUTE_DATA_SERVICE_PATH = PILOT_DATA_PATH + "/cds"

    ###########################################################################
    # Construct a base url for an application

    @classmethod
    def get_base_url(cls, application_id):
        """ return "<scheme>://<host>/<application_id>/" built from BASE_URL """
        surl = SAGAUrl(cls.BASE_URL)
        base_url = surl.scheme + "://" + surl.host + "/" + application_id + "/"
        logger.debug(base_url)
        return base_url

    ###########################################################################
    # Pilot Store Service related methods

    @classmethod
    def add_pds(cls, application_url, pds):
        """ return the PDS url (without dbtype query string) for pds.id

            The url with the query string is only used for the log message;
            no directory is created.
        """
        pds_url_no_dbtype = cls.get_pds_url(application_url, pds.id)
        pds_url = cls.__get_url(pds_url_no_dbtype)
        logger.debug("Create PDS directory at %s", pds_url)
        return pds_url_no_dbtype

    @classmethod
    def delete_pds(cls, pds_url):
        """ no-op: nothing is stored by this adaptor, so nothing is removed """
        return None

    ###########################################################################
    # Pilot Data related methods

    @classmethod
    def add_pd(cls, pds_url, pd):
        """ return the url of pd below pds_url: "<pds_url>/<pd.id>"

            The description url is only computed for the log message; the
            pilot data description is not stored anywhere.
        """
        pds_url = cls.__remove_dbtype(pds_url)
        pd_url = pds_url + "/" + pd.id
        pd_description_url = cls.__get_url(pd_url + "/description")
        logger.debug("PDS URL: %s, PD Description URL: %s",
                     pds_url, pd_description_url)
        return pd_url

    @classmethod
    def update_pd(cls, pd):
        """ no-op: the state of pd is not published anywhere """
        return None

    @classmethod
    def get_pd(cls, pds_url):
        """ return the stored state of a pilot data as dict

            Nothing is ever stored by this adaptor, so the dict is always
            empty (same contract as get_du).
        """
        logger.debug("GET PD: %s", pds_url)
        return {}

    @classmethod
    def list_pd(cls, pds_url):
        """ return a list of urls to pd managed by the PDS

            Always empty, since this adaptor keeps no registry of pilot data.
        """
        pds_url = cls.__get_url(pds_url)
        logger.debug("List PD at %s", pds_url)
        return []

    @classmethod
    def delete_pd(cls, pds_url):
        """ no-op: nothing is stored by this adaptor, so nothing is removed """
        return None

    ###########################################################################
    # Compute Data Service related methods

    @classmethod
    def add_cds(cls, application_url, cds):
        """ return the CDS url (without dbtype query string) for cds.id

            The url with the query string is only used for the log message;
            no directory is created.
        """
        cds_url_no_dbtype = cls.get_cds_url(application_url, cds.id)
        cds_url = cls.__get_url(cds_url_no_dbtype)
        logger.debug("Create CDS directory at %s", cds_url)
        return cds_url_no_dbtype

    @classmethod
    def update_cds(cls, cds_url, cds):
        """ no-op: resources and units managed by cds are not published """
        return None

    @classmethod
    def delete_cds(cls, cds_url):
        """ no-op: nothing is stored by this adaptor, so nothing is removed """
        return None

    ###########################################################################
    # Data Unit related methods

    @classmethod
    def add_du(cls, dus_url, du):
        """ return the url of du below dus_url

            The result is "<dus_url>/<du.id>" with BASE_URL_QUERY_STRING
            appended; no directory is created.
        """
        du_url = cls.__remove_dbtype(dus_url) + "/" + du.id
        return cls.__get_url(du_url)

    @classmethod
    def get_du(cls, du_url):
        """ return the stored state of a data unit as dict

            Nothing is ever stored by this adaptor, so the dict is always
            empty.
        """
        logger.debug("**** GET PD: %s", du_url)
        return {}

    @classmethod
    def update_du(cls, du):
        """ log the update request; the state of du is not published """
        logger.debug("**** Update pilot data at: %s", du.url)

    @classmethod
    def list_du(cls, dus_url):
        """ return a list of urls to du managed by the PDS

            Always empty, since this adaptor keeps no registry of data units.
        """
        dus_url = cls.__get_url(dus_url)
        logger.debug("List PDS at %s", dus_url)
        return []

    @classmethod
    def delete_du(cls, du_url):
        """ no-op: nothing is stored by this adaptor, so nothing is removed """
        return None

    ###########################################################################
    # URL Tweaking

    @classmethod
    def get_pds_url(cls, application_url, pds_id):
        """ return application_url + PILOT_DATA_SERVICE_PATH + "/" + pds_id """
        pds_url = application_url + cls.PILOT_DATA_SERVICE_PATH + "/" + pds_id
        logger.debug("PDS URL: %s", pds_url)
        return pds_url

    @classmethod
    def get_cds_url(cls, application_url, cds_id):
        """ return application_url + COMPUTE_DATA_SERVICE_PATH + "/" + cds_id """
        cds_url = application_url + cls.COMPUTE_DATA_SERVICE_PATH + "/" + cds_id
        logger.debug("CDS URL: %s", cds_url)
        return cds_url

    ###########################################################################
    # internal methods

    @classmethod
    def __get_url(cls, url):
        """ appends advert querystring for dbtype to url """
        return url + cls.BASE_URL_QUERY_STRING

    @classmethod
    def __remove_dbtype(cls, url):
        """ return url after a round trip through SAGAUrl

            Note: the query string is not modified here; the url is only
            parsed and converted back to a string.
        """
        return str(SAGAUrl(url))

    @classmethod
    def __store_entry(cls, entry_url, content):
        """ no-op: content is not persisted by this adaptor """
        return None

    @classmethod
    def __retrieve_entry(cls, entry_url):
        """ return the content stored at entry_url

            Always None, because this adaptor never stores any entry.
        """
        return None
274