Package pilot :: Package coordination :: Module redis_adaptor

Source Code for Module pilot.coordination.redis_adaptor

  1  import logging 
  2  import pdb 
  3  from pilot import * 
  4  from bigjob import logger 
  5  import bliss.saga as saga 
  6   
  7  try: 
  8      import json 
  9  except ImportError: 
 10      import simplejson as json 
class RedisCoordinationAdaptor:
    """
    Coordination adaptor that keeps Pilot-Data and Data-Unit state in Redis.

    Every entity is addressed by a key built from its parent's key, the
    ``SEPARATOR`` and the entity id.  The state of an entity is stored as a
    Redis hash under ``<entity key><SEPARATOR>info``.

    All methods are classmethods; the Redis server is taken from
    ``BASE_URL``, which can be changed with ``configure_base_url``.
    """
    BASE_URL = "redis://localhost/"
    SEPARATOR = ":"

    # Port used when BASE_URL does not name one (the standard Redis port).
    DEFAULT_REDIS_PORT = 6379

    PILOT_PATH = "pilot"
    PILOT_DATA_PATH = PILOT_PATH
    PILOT_DATA_SERVICE_PATH = PILOT_DATA_PATH + SEPARATOR + "pds"
    DATA_UNIT_SERVICE_PATH = PILOT_DATA_PATH + SEPARATOR + "dus"
    COMPUTE_DATA_SERVICE_PATH = PILOT_DATA_PATH + SEPARATOR + "cds"

    ###########################################################################
    # Construct a base url for an application

    @classmethod
    def configure_base_url(cls, base_url):
        """ Set the coordination URL used by all subsequent calls. """
        cls.BASE_URL = base_url

    @classmethod
    def get_base_url(cls, application_id):
        """
        Return ``<scheme>://<host>/<application_id>`` derived from BASE_URL.

        Raises Exception if no coordination URL is configured.
        """
        if cls.BASE_URL is None:
            logger.error("Coordination URL not set. Exiting Pilot-Data.")
            raise Exception("Coordination URL not set. Exiting Pilot-Data.")
        surl = saga.Url(cls.BASE_URL)
        base_url = surl.scheme + "://" + surl.host + "/" + application_id
        logger.debug(base_url)
        return base_url

    ###########################################################################
    # Pilot Store Service related methods

    @classmethod
    def add_pds(cls, application_url, pds):
        """ Return the key for `pds` below `application_url` (nothing is stored). """
        pds_url_no_dbtype = cls.get_pds_url(application_url, pds.id)
        pds_url = cls.__get_url(pds_url_no_dbtype)
        logger.debug("Create PDS directory at %s" % pds_url)
        return pds_url_no_dbtype

    @classmethod
    def delete_pds(cls, pds_url):
        """ Placeholder: removal of a PDS is not implemented; nothing is deleted. """
        pds_url = cls.__get_url(pds_url)
        # TODO: remove the keys below pds_url (the former advert-based
        # recursive remove is disabled).

    ###########################################################################
    # Pilot Data related methods

    @classmethod
    def add_pd(cls, pds_url, pd):
        """ Return the key for `pd` below `pds_url` (nothing is stored). """
        return pds_url + cls.SEPARATOR + pd.id

    @classmethod
    def update_pd(cls, pd):
        """ Store the current state of `pd` as a hash under ``<pd.url>:info``. """
        # An empty list of data unit urls is recorded as None.
        du_urls = pd.data_unit_urls if len(pd.data_unit_urls) > 0 else None

        pd_dict = {
            "data_unit_urls": du_urls,
            "pilot_data": pd.to_dict(),
            "pilot_data_description": pd.pilot_data_description,
            "security_context": pd.security_context,
        }
        cls.__store_entry(pd.url + cls.SEPARATOR + "info", pd_dict)

    @classmethod
    def get_pd(cls, pd_url):
        """ Return the hash stored under ``<pd_url>:info``. """
        logger.debug("GET PD: " + pd_url)
        return cls.__retrieve_entry(pd_url + cls.SEPARATOR + "info")

    @classmethod
    def list_pd(cls, pds_url):
        """
        Intended to return a list of urls to pd managed by the PDS.

        NOTE(review): the lookup is not implemented; the method only logs
        and returns None.
        """
        pds_url = cls.__get_url(pds_url)
        logger.debug("List PD at %s" % pds_url)

    @classmethod
    def delete_pd(cls, pds_url):
        """ Placeholder: removal of a PD is not implemented; nothing is deleted. """
        pds_url = cls.__get_url(pds_url)
        # TODO: remove the keys below pds_url (the former advert-based
        # recursive remove is disabled).

    ###########################################################################
    # Compute Data Service related methods

    @classmethod
    def add_cds(cls, application_url, cds):
        """ Return the key for `cds` below `application_url` (nothing is stored). """
        cds_url_no_dbtype = cls.get_cds_url(application_url, cds.id)
        cds_url = cls.__get_url(cds_url_no_dbtype)
        logger.debug("Create CDS directory at %s" % cds_url)
        return cds_url_no_dbtype

    @classmethod
    def update_cds(cls, cds_url, cds):
        """
        Collect the urls of everything managed by `cds`.

        NOTE(review): persisting these lists is disabled (see the
        commented-out store calls), so the collected values are discarded.
        """
        # Storage and Compute Resources
        pds_urls = [i.url for i in cds.pilot_data_services]
        #cls.__store_entry(cls.__remove_dbtype(cds_url)+"/pds/", pds_urls)

        pjs_urls = [i.url for i in cds.pilot_job_services]
        #cls.__store_entry(cls.__remove_dbtype(cds_url)+"/cds/", pjs_urls)

        # currently managed PDs and WUs
        pd_urls = [i.url for i in cds.data_units.values()]
        #cls.__store_entry(cls.__remove_dbtype(cds_url)+"/du/", pd_urls)

        wu_urls = [i.url for i in cds.compute_units.values()]
        #cls.__store_entry(cls.__remove_dbtype(cds_url)+"/cu/", wu_urls)

    @classmethod
    def delete_cds(cls, cds_url):
        """ Placeholder: removal of a CDS is not implemented; nothing is deleted. """
        cds_url = cls.__get_url(cds_url)
        # TODO: remove the keys below cds_url (the former advert-based
        # recursive remove is disabled).

    ###########################################################################
    # Data Unit related methods

    @classmethod
    def add_du(cls, root_url, du):
        """ Return the key for `du` below `root_url` (nothing is stored). """
        du_url = root_url + cls.SEPARATOR + du.id
        return cls.__get_url(du_url)

    @classmethod
    def get_du(cls, du_url):
        """ Return the hash stored under ``<du_url>:info``. """
        logger.debug("**** GET DU: " + str(du_url))
        du_dict = cls.__retrieve_entry(du_url + cls.SEPARATOR + "info")
        logger.debug("Retrieved DU: " + du_url + " Content: " + str(du_dict))
        return du_dict

    @classmethod
    def update_du(cls, du):
        """ Store the current state of `du` as a hash under ``<du.url>:info``. """
        logger.debug("**** Update data unit at: " + du.url)
        du_dict_list = [i.to_dict() for i in du.data_unit_items]
        du_urls = [i.url for i in du.pilot_data]
        du_dict = {
            "data_unit_description": du.data_unit_description,
            "state": du.state,
            "pilot_data": du_urls,
            "data_unit_items": du_dict_list,
        }
        cls.__store_entry(du.url + cls.SEPARATOR + "info", du_dict)

    @classmethod
    def list_du(cls, pd_url):
        """ return a list of urls to du managed by the PD at `pd_url` """
        pd_url = cls.__get_url(pd_url)
        logger.debug("List Data-Units of Pilot-Data at %s" % pd_url)
        return cls.__list_keys(pd_url + ":du-*")

    @classmethod
    def delete_du(cls, du_url):
        """ Placeholder: removal of a DU is not implemented; nothing is deleted. """
        du_url = cls.__get_url(du_url)
        # TODO: remove the keys below du_url (the former advert-based
        # recursive remove is disabled).

    ###########################################################################
    # URL Tweaking

    @classmethod
    def get_pds_url(cls, application_url, pds_id):
        """ Return ``<application_url><SEPARATOR><pds_id>``. """
        pds_url = application_url + cls.SEPARATOR + pds_id
        logger.debug("PDS URL: %s" % (pds_url))
        return pds_url

    @classmethod
    def get_cds_url(cls, application_url, cds_id):
        """ Return ``<application_url><SEPARATOR><cds_id>``. """
        cds_url = application_url + cls.SEPARATOR + cds_id
        logger.debug("CDS URL: %s" % (cds_url))
        return cds_url

    ###########################################################################
    # internal Redis-related methods

    @classmethod
    def __get_redis_api_client(cls):
        """
        Create a Redis client for BASE_URL and verify the server answers.

        The user-info part of the URL, if present, is passed to Redis as
        the password.  Raises Exception if the server cannot be pinged.
        """
        # Function-local import: redis-py is only needed by this method.
        import redis

        saga_url = saga.Url(cls.BASE_URL)
        username = saga_url.username
        server = saga_url.host
        # A URL such as the default "redis://localhost/" names no port;
        # fall back to the standard one instead of passing an empty port on.
        server_port = saga_url.port or cls.DEFAULT_REDIS_PORT

        if username:
            redis_client = redis.Redis(host=server, port=server_port,
                                       password=username, db=0)
        else:
            redis_client = redis.Redis(host=server, port=server_port, db=0)

        try:
            redis_client.ping()
        except Exception:
            # Not a bare except: Ctrl-C / SystemExit must still propagate.
            logger.error("Please start Redis server!")
            raise Exception("Please start Redis server!")
        return redis_client

    @classmethod
    def __get_url(cls, url):
        """ Hook for url rewriting; this adaptor uses the url unchanged. """
        return url

    @classmethod
    def __strip_info_suffix(cls, key):
        """
        Return `key` cut at its first ``<SEPARATOR>info`` marker.

        Keys that arrive as bytes are decoded to text first; keys without
        the marker are returned unchanged instead of raising ValueError.
        """
        if not isinstance(key, str):
            key = key.decode("utf-8")
        marker_pos = key.find(cls.SEPARATOR + "info")
        if marker_pos < 0:
            return key
        return key[:marker_pos]

    @classmethod
    def __format_for_log(cls, content):
        """
        Return `content` as a string for debug output.

        JSON is preferred; content that cannot be serialized falls back to
        ``str`` so that a log statement can never fail the caller.
        """
        try:
            return str(json.dumps(content))
        except (TypeError, ValueError):
            return str(content)

    @classmethod
    def __list_keys(cls, search_url):
        """ Return the entity keys matching the Redis pattern `search_url`. """
        redis_client = cls.__get_redis_api_client()
        keys = redis_client.keys(search_url)
        return [cls.__strip_info_suffix(i) for i in keys]

    @classmethod
    def __store_entry(cls, entry_url, content):
        """
        Store the dict `content` as a Redis hash at `entry_url`.

        NOTE(review): values are handed to ``hmset`` as they are; nested
        dicts/lists and None are presumably stringified by the client -
        confirm against the redis-py version in use.
        """
        entry_url = cls.__get_url(entry_url)
        redis_client = cls.__get_redis_api_client()
        redis_client.hmset(entry_url, content)

        logger.debug("Store Redis entry at: " + entry_url
                     + " Content: " + cls.__format_for_log(content))

    @classmethod
    def __retrieve_entry(cls, entry_url):
        """ Return the Redis hash stored at `entry_url` as a dict. """
        entry_url = cls.__get_url(entry_url)
        redis_client = cls.__get_redis_api_client()
        content = redis_client.hgetall(entry_url)

        logger.debug("Retrieve Redis entry at: " + entry_url
                     + " Content: " + cls.__format_for_log(content))
        return content
274