1  import logging 
  2  import pdb 
  3  from pilot import * 
  4  from bigjob import logger 
  5  import bliss.saga as saga 
  6   
  7  try: 
  8      import json 
  9  except ImportError: 
 10      import simplejson as json 
 14      """ 
 15          Dummy Adaptor - No distributed coordination done     
 16      """ 
 17      BASE_URL="redis://localhost/" 
 18      SEPARATOR=":" 
 19       
 20      PILOT_PATH="pilot" 
 21      PILOT_DATA_PATH=PILOT_PATH 
 22      PILOT_DATA_SERVICE_PATH=PILOT_DATA_PATH + SEPARATOR + "pds" 
 23      DATA_UNIT_SERVICE_PATH=PILOT_DATA_PATH + SEPARATOR +"dus" 
 24      COMPUTE_DATA_SERVICE_PATH = PILOT_DATA_PATH + SEPARATOR + "cds" 
 25   
 26       
 27       
 28       
 29        
 30      @classmethod 
 33       
 34      @classmethod 
 36          if cls.BASE_URL==None: 
 37              logger.error("Coordination URL not set. Exiting Pilot-Data.") 
 38              raise Exception("Coordination URL not set. Exiting Pilot-Data.") 
 39          surl = saga.Url(cls.BASE_URL) 
 40          base_url = surl.scheme + "://" + surl.host + "/" + application_id  
 41          logger.debug(base_url) 
 42          return base_url 
  43       
 44       
 45       
 46       
 47      @classmethod   
 48 -    def add_pds(cls, application_url, pds): 
  49          pds_url_no_dbtype = cls.get_pds_url(application_url, pds.id) 
 50          pds_url = cls.__get_url(pds_url_no_dbtype) 
 51          logger.debug("Create PDS directory at %s"%pds_url) 
 52          return pds_url_no_dbtype 
  53       
 54       
 55      @classmethod 
 57          pds_url = cls.__get_url(pds_url) 
  58           
 59           
 60       
 61       
 62       
 63       
 64      @classmethod 
 68       
 69       
 70      @classmethod 
 72          du_urls=None 
 73          if len(pd.data_unit_urls) > 0: 
 74              du_urls = pd.data_unit_urls 
 75           
 76          pd_dict={ 
 77                   "data_unit_urls": du_urls, 
 78                   "pilot_data": pd.to_dict(), 
 79                   "pilot_data_description": pd.pilot_data_description, 
 80                   "security_context": pd.security_context 
 81                   } 
 82           
 83          cls.__store_entry(pd.url+RedisCoordinationAdaptor.SEPARATOR + "info", pd_dict) 
  84       
 85           
 86      @classmethod 
 91           
 92       
 93      @classmethod 
 95          """ return a list of urls to pd managed by the PDS """ 
 96          pds_url = cls.__get_url(pds_url) 
 97          logger.debug("List PD at %s"%pds_url) 
  98           
 99       
100      @classmethod 
102          pds_url = cls.__get_url(pds_url) 
 103           
104           
105           
106           
107           
108       
109           
110       
111       
112      @classmethod   
113 -    def add_cds(cls, application_url, cds): 
 114          cds_url_no_dbtype = cls.get_cds_url(application_url, cds.id) 
115          cds_url = cls.__get_url(cds_url_no_dbtype) 
116          logger.debug("Create CDS directory at %s"%cds_url) 
117           
118           
119           
120          return cds_url_no_dbtype 
 121       
122       
123      @classmethod   
125           
126           
127          pds_urls = [i.url for i in cds.pilot_data_services] 
128           
129           
130          pjs_urls = [i.url for i in cds.pilot_job_services] 
131           
132           
133           
134          pd_urls = [i.url for i in cds.data_units.values()] 
135           
136           
137          wu_urls = [i.url for i in cds.compute_units.values()] 
 138           
139               
140           
141      @classmethod 
143          cds_url = cls.__get_url(cds_url) 
 144           
145           
146           
147           
148           
149       
150       
151       
152           
153       
154       
155      @classmethod 
156 -    def add_du(cls, root_url, du): 
 160   
161       
162      @classmethod 
164          logger.debug("**** GET DU: " + str(du_url)) 
165          du_dict=cls.__retrieve_entry(du_url+ RedisCoordinationAdaptor.SEPARATOR + "info")    
166          logger.debug("Retrieved DU: " + du_url + " Content: " + str(du_dict))      
167          return du_dict 
 168       
169        
170      @classmethod   
172          logger.debug("**** Update data unit at: " + du.url) 
173          du_dict_list = [i.to_dict() for i in du.data_unit_items] 
174          du_urls = [i.url for i in du.pilot_data] 
175          du_dict = { 
176                  "data_unit_description":du.data_unit_description, 
177                  "state": du.state, 
178                  "pilot_data": du_urls, 
179                  "data_unit_items": du_dict_list 
180                  } 
181          cls.__store_entry(du.url + RedisCoordinationAdaptor.SEPARATOR + "info", du_dict) 
 182           
183          
184      @classmethod 
186          """ return a list of urls to du managed by the PDS """ 
187          pd_url = cls.__get_url(pd_url) 
188          logger.debug("List Data-Units of Pilot-Data at %s"%pd_url) 
189          dus = cls.__list_keys(pd_url+":du-*") 
190          return dus 
 191       
192       
193      @classmethod 
195          du_url = cls.__get_url(du_url) 
 196           
197           
198           
199           
200           
201       
202       
203       
204       
205       
206      @classmethod 
211       
212       
213      @classmethod 
218       
219       
220       
221       
222      @classmethod 
224          import redis 
225          ''' Initialize Redis API Client     ''' 
226          saga_url = saga.Url(RedisCoordinationAdaptor.BASE_URL) 
227          username = saga_url.username 
228          server = saga_url.host 
229          server_port = saga_url.port 
230          if username==None or username=="": 
231              redis_client = redis.Redis(host=server, port=server_port, db=0) 
232          else: 
233              redis_client = redis.Redis(host=server, port=server_port, password=username, db=0) 
234           
235          try: 
236              redis_client.ping() 
237          except: 
238              logger.error("Please start Redis server!") 
239              raise Exception("Please start Redis server!") 
240          return redis_client 
 241       
242       
243      @classmethod 
246       
247       
248      @classmethod 
250          redis_client = cls.__get_redis_api_client() 
251          keys = redis_client.keys(search_url) 
252          keys_normalized = [i[:i.index(":info")] for i in keys] 
253          return keys_normalized 
 254           
255           
256      @classmethod 
257 -    def __store_entry(cls, entry_url, content): 
 258          entry_url = cls.__get_url(entry_url) 
259          redis_client = cls.__get_redis_api_client() 
260          redis_client.hmset(entry_url, content) 
261           
262          logger.debug("Store Redis entry at: " + entry_url  
263                        + " Content: " + str(json.dumps(content))) 
 264           
265      @classmethod 
266 -    def __retrieve_entry(cls, entry_url): 
 267          entry_url = cls.__get_url(entry_url) 
268          redis_client = cls.__get_redis_api_client() 
269          content = redis_client.hgetall(entry_url) 
270           
271          logger.debug("Retrieve Redis entry at: " + entry_url  
272                        + " Content: " + str(json.dumps(content))) 
273          return content 
  274