
Source Code for Module pilot.filemanagement.s3_adaptor

'''
Amazon S3 based File Transfer Implementation
'''
import sys
import os
import traceback

# This is for local debugging!
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))

import bliss.saga as saga
from pilot.api import State
from bigjob import logger

##################

from boto.s3.connection import S3Connection, OrdinaryCallingFormat
from boto.s3.key import Key

# Authentication
# Please use the ~/.boto file to configure your security credentials (if possible);
# see http://boto.readthedocs.org/en/latest/boto_config_tut.html
#
# [Credentials]
# aws_access_key_id = <your access key>
# aws_secret_access_key = <your secret key>
#
# Alternatively you can use these two variables:
AWS_ACCESS_KEY_ID = None
AWS_SECRET_ACCESS_KEY = None

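# A minimal usage sketch (the bucket name and keys below are placeholders, not
# real credentials): keys can also be passed per pilot via the
# pilot_data_description dictionary, in which case they are handed directly to
# S3Connection and ~/.boto is not consulted for them:
#
#   s3 = S3FileAdaptor("s3://my-example-bucket",
#                      pilot_data_description={
#                          "access_key_id": "<your access key>",
#                          "secret_access_key": "<your secret key>"})
#   s3.initialize_pilotdata()
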
class S3FileAdaptor(object):
    """ BigData File Management for Pilot Data

        Supports pilot data on top of S3 and Eucalyptus Walrus:

            s3://
            walrus://<endpoint-ip>

        Amazon S3 supported regions (default is an empty string "" => us-east-1):

            'ap-northeast-1'
            'ap-southeast-1'
            'ap-southeast-2'
            'EU'
            'sa-east-1'
            'us-west-1'
            'us-west-2'
    """

    def __init__(self, resource_url, security_context=None, pilot_data_description=None):
        self.resource_url = saga.Url(resource_url)
        self.bucket_name = self.__get_bucket_name(resource_url)
        self.__state = State.New
        self.pilot_data_description = pilot_data_description
        aws_access_key_id = None
        aws_secret_access_key = None

        # try to recover keys from pilot_data_description
        if self.pilot_data_description != None and \
           "access_key_id" in self.pilot_data_description and \
           "secret_access_key" in self.pilot_data_description:
            aws_access_key_id = self.pilot_data_description["access_key_id"]
            aws_secret_access_key = self.pilot_data_description["secret_access_key"]

        # try to recover keys from security context
        if security_context != None:
            logger.debug("Attempt to restore credentials from security context: " + str(security_context))
            security_context = eval(security_context)
            aws_access_key_id = security_context["aws_access_key_id"]
            aws_secret_access_key = security_context["aws_secret_access_key"]

        self.s3_conn = None
        self.s3_region = None  # set for s3:// URLs below; read by initialize_pilotdata()
        if self.resource_url.scheme == "walrus" or self.resource_url.scheme == "swift":
            calling_format = OrdinaryCallingFormat()
            logger.debug("Access Key: %s Secret: %s Host: %s" % (aws_access_key_id,
                                                                 aws_secret_access_key,
                                                                 self.resource_url.host))
            port = 8773
            if self.resource_url.port != None:
                port = self.resource_url.port

            path = "/"
            if self.resource_url.scheme == "walrus":
                path = "/services/Walrus"

            self.s3_conn = S3Connection(aws_access_key_id=aws_access_key_id,
                                        aws_secret_access_key=aws_secret_access_key,
                                        is_secure=False,
                                        host=self.resource_url.host,
                                        port=port,
                                        calling_format=calling_format,
                                        path=path)
        else:  # s3:// URLs
            # Region specifier according to the Amazon API:
            # http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html
            if self.pilot_data_description != None and \
               "region" in self.pilot_data_description:
                self.s3_region = self.pilot_data_description["region"]

            self.s3_conn = S3Connection(aws_access_key_id,
                                        aws_secret_access_key)

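    # Endpoints resulting from the branches above (illustrative hosts; walrus
    # and swift connections are plain HTTP since is_secure=False):
    #
    #   walrus://149.165.146.135    -> http://149.165.146.135:8773/services/Walrus
    #   swift://149.165.146.50:3333 -> http://149.165.146.50:3333/
    #   s3://my-bucket              -> the default boto S3 endpoint
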
    def get_security_context(self):
        """ Returns the security context that needs to be available on the
            distributed node in order to access this Pilot Data """
        return {"aws_access_key_id": self.s3_conn.aws_access_key_id,
                "aws_secret_access_key": self.s3_conn.aws_secret_access_key}

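    # Round-trip sketch (hypothetical variable names): the context is shipped
    # to the remote node as a string and eval()ed back into a dict by
    # __init__ above:
    #
    #   ctx = str(s3.get_security_context())
    #   remote = S3FileAdaptor("s3://my-example-bucket", security_context=ctx)
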
    def initialize_pilotdata(self):
        # Create bucket
        try:
            if self.s3_region == None:
                logger.debug("Use default S3 region.")
                self.s3_region = ""  # Default for US East
            self.bucket = self.s3_conn.create_bucket(self.bucket_name, location=self.s3_region)
        except:
            # bucket already exists
            #self.__print_traceback()
            self.bucket = self.s3_conn.get_bucket(self.bucket_name)

        self.__state = State.Running

    def get_pilotdata_size(self):
        # unlimited size
        return None

    def delete_pilotdata(self):
        self.__state = State.Done

    def get_state(self):
        return self.__state

    def create_du(self, du_id):
        logger.debug("create object: " + du_id)
        k = Key(self.bucket)
        k.key = str(du_id) + "/du_info"
        k.set_contents_from_string(du_id)

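    # Resulting key layout inside the bucket (<du_id> is illustrative):
    #
    #   <du_id>/du_info       marker object written by create_du()
    #   <du_id>/<filename>    one object per file, written by put_du()
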
    def put_du(self, du):
        logger.debug("Copy DU to S3/Walrus")
        du_items = du.list()
        for i in du_items.keys():
            try:
                local_filename = du_items[i]["local"]
                remote_path = os.path.join(str(du.id), os.path.basename(local_filename))
                logger.debug("copy %s to %s" % (local_filename, remote_path))
                self._put_file(local_filename, remote_path)
            except:
                logger.debug("Could not copy: " + str(i))

    def get_du(self, du, target_url):
        # accept either a DU object or a plain DU id string
        du_id = du.id if hasattr(du, "id") else du
        logger.debug("Get DU: " + str(du_id))
        result = self.bucket.list(prefix=du_id)
        logger.debug("Result Bucket List: " + str(result))
        for key in result:
            logger.debug(str(key))
            full_filename = key.name
            if full_filename != None:
                logger.debug("Process file: " + full_filename)
                if not full_filename.endswith("/"):
                    self._get_file(full_filename, os.path.join(target_url, os.path.basename(full_filename)))

    def copy_du(self, du, pd_new):
        remote_url = str(pd_new.resource_url) + "/" + str(du.id)
        local_url = str(self.resource_url) + "/" + str(du.id)
        self.copy_du_to_url(du, local_url, remote_url)

    def remove_du(self, du):
        # delete every object stored under this DU's id prefix
        for key in self.bucket.list(prefix=str(du.id)):
            key.delete()


    ###########################################################################
    # Pure File Management APIs

    def _put_file(self, source, target):
        logger.debug("Put file: %s to %s" % (source, target))
        if self.__starts_with_valid_prefix(source):
            logger.debug("Copy file from S3/Walrus")
            source_bucket_name = self.__get_bucket_name(source)
            source_key_name = self.__get_key_name(source)
            self.bucket.copy_key(target, source_bucket_name, source_key_name)
        else:
            logger.debug("Copy file from Local")
            k = Key(self.bucket)
            k.key = target
            k.set_contents_from_filename(source)
        logger.debug("Put file result: %s" % source)

    def _get_file(self, source, target):
        logger.debug("GET file: %s to %s" % (source, target))
        k = self.bucket.get_key(source)
        if k == None:
            raise IOError("Key %s not found in bucket %s" % (source, self.bucket_name))
        k.get_contents_to_filename(target)

    def transfer(self, source_url, target_url):
        # not implemented for the S3 adaptor
        pass

    def create_remote_directory(self, target_url):
        # S3 keys are created on demand; there are no real directories
        return True


    ###########################################################################
    def __starts_with_valid_prefix(self, url):
        valid_prefix = ["s3", "walrus"]
        return any(url.startswith(i) for i in valid_prefix)

    def __get_bucket_name(self, resource_url):
        surl = saga.Url(resource_url)
        if surl.scheme.startswith("s3"):
            # for s3:// URLs the first path component is the bucket
            bucket_name = resource_url.replace("s3://", "")
            try:
                bucket_name = bucket_name[:bucket_name.index("/")]
            except ValueError:
                pass
        else:
            # for walrus:// and swift:// URLs the bucket follows the host
            bucket_name = surl.path[1:]
        return bucket_name

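    # Examples (illustrative URLs):
    #   __get_bucket_name("s3://my-bucket/du-1/file.txt")    -> "my-bucket"
    #   __get_bucket_name("walrus://1.2.3.4:8773/my-bucket") -> "my-bucket"
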
    def __get_key_name(self, resource_url):
        surl = saga.Url(resource_url)
        # get path out of URL
        if surl.scheme.startswith("s3"):
            bucket_name = resource_url.replace("s3://", "")
        else:
            bucket_name = surl.path[1:]

        # get key path out of URL (everything after the bucket name)
        key_name = None
        try:
            key_name = bucket_name[bucket_name.index("/") + 1:]
        except ValueError:
            pass

        return key_name

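    # Example (illustrative URL):
    #   __get_key_name("s3://my-bucket/du-1/file.txt") -> "du-1/file.txt"
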
    def __print_traceback(self):
        exc_type, exc_value, exc_traceback = sys.exc_info()
        print "*** print_tb:"
        traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
        print "*** print_exception:"
        traceback.print_exception(exc_type, exc_value, exc_traceback,
                                  limit=2, file=sys.stdout)

def test_walrus():
    s3 = S3FileAdaptor("walrus://149.165.146.135/pilot-data-c4eb26eb-ed0c-11e1-ac98-705681b3df0f",
                       pilot_data_description={"access_key_id": "8MCXRAMXMHDYKWNKXZ8WF",
                                               "secret_access_key": "YrcUqSw2Arxshrh3ZtenkxerWwCWdMTKvZYoLPAo"})
    s3.initialize_pilotdata()
    s3._put_file("test.txt", "du-7370d7b5-ed0b-11e1-95df-705681b3df0f/test.txt")
    s3._get_file("du-7370d7b5-ed0b-11e1-95df-705681b3df0f/test.txt", "test2.txt")
    s3.get_du("du-7370d7b5-ed0b-11e1-95df-705681b3df0f", ".")

def test_swift():
    s3 = S3FileAdaptor("swift://149.165.146.50:3333/pilot-data-c4eb26eb-ed0c-11e1-ac98-705681b3df0f",
                       pilot_data_description={"access_key_id": "f9716a49c92a4a4cbedb6aba5e78d682",
                                               "secret_access_key": "bcdff54b7fe94d63b4412c762e823a84"})
    s3.initialize_pilotdata()
    s3._put_file("test.txt", "du-7370d7b5-ed0b-11e1-95df-705681b3df0f/test.txt")
    s3._get_file("du-7370d7b5-ed0b-11e1-95df-705681b3df0f/test.txt", "test2.txt")
    s3.get_du("du-7370d7b5-ed0b-11e1-95df-705681b3df0f", ".")

def test_s3import():
    s3 = S3FileAdaptor("s3://pilot-data-andre-test-create-from-s3-url",
                       pilot_data_description={"access_key_id": "AKIAJPGNDJRYIG5LIEUA",
                                               "secret_access_key": "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"})
    s3.initialize_pilotdata()
    s3._put_file("s3://pilot-data-05d88e40-f65b-11e1-a327-00215ec9e3ac/du-3624837e-f66f-11e1-a327-00215ec9e3ac/WRT54GS_UG_WEB_20070529.pdf", "bla/test.pdf")

def test_s3import_via_pilotapi():
    COORDINATION_URL = "redis://ILikeBigJob_wITH-REdIS@gw68.quarry.iu.teragrid.org:6379"
    from pilot import PilotComputeService, PilotDataService, ComputeDataService, State
    pilot_data_service = PilotDataService(coordination_url=COORDINATION_URL)

    ###################################################################################################
    # Pick one of the Pilot Data Descriptions below

    pilot_data_description_aws = {
        "service_url": "s3://pilot-data-andre-workflow",
        "size": 100,
        "affinity_datacenter_label": "us-east-1",
        "affinity_machine_label": "",
        "access_key_id": "AKIAJPGNDJRYIG5LIEUA",
        "secret_access_key": "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",
    }

    pd = pilot_data_service.create_pilot(pilot_data_description=pilot_data_description_aws)

    data_unit_description = {
        "file_urls": ['s3://pilot-data-cec5d816-fa8f-11e1-ab5e-e61f1322a75c/du-67b4c762-fa90-11e1-ab5e-e61f1322a75c/ip-10-84-173-21512MB_2.input-chunk-02'],
        "affinity_datacenter_label": "us-east-1",
        "affinity_machine_label": ""
    }

    # submit the data unit to the pilot store
    input_data_unit = pd.submit_data_unit(data_unit_description)
    input_data_unit.wait()


if __name__ == "__main__":
    test_s3import_via_pilotapi()