'''
Amazon S3 based File Transfer Implementation
'''
import urlparse

import errno
import sys
import os
import stat
import logging
import traceback
import time


sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))

import bliss.saga as saga
from pilot.api import State
from bigjob import logger


from boto.s3.connection import S3Connection, OrdinaryCallingFormat
from boto.s3.key import Key
from boto.s3.connection import Location


AWS_ACCESS_KEY_ID = None
AWS_SECRET_ACCESS_KEY = None

40 """ BigData File Management for Pilot Data
41 Supports pilot data on top of S3 and Eucalyptus Walrus
42
43 s3://
44 walrus://<endpoint-ip>
45
46 Amazon S3 supported regions:
47
48 Default is an empty string "" => us-east-1
49
50 'ap-northeast-1'
51 'ap-southeast-1'
52 'ap-southeast-2'
53 'EU'
54 'sa-east-1'
55 'us-west-1'
56 'us-west-2'
57
58 """

    def __init__(self, resource_url, security_context=None, pilot_data_description=None):
        self.resource_url = saga.Url(resource_url)
        self.bucket_name = self.__get_bucket_name(resource_url)
        self.__state = State.New
        self.pilot_data_description = pilot_data_description
        self.s3_region = None
        aws_access_key_id = None
        aws_secret_access_key = None

        # Credentials can be passed in via the pilot data description ...
        if self.pilot_data_description != None and \
           self.pilot_data_description.has_key("access_key_id") and \
           self.pilot_data_description.has_key("secret_access_key"):
            aws_access_key_id = self.pilot_data_description["access_key_id"]
            aws_secret_access_key = self.pilot_data_description["secret_access_key"]

        # ... or restored from a security context exported for a remote node
        if security_context != None:
            logger.debug("Attempt to restore credentials from security context: " + str(security_context))
            security_context = eval(security_context)
            aws_access_key_id = security_context["aws_access_key_id"]
            aws_secret_access_key = security_context["aws_secret_access_key"]

        self.s3_conn = None
        if self.resource_url.scheme == "walrus" or self.resource_url.scheme == "swift":
            calling_format = OrdinaryCallingFormat()
            logger.debug("Access Key: %s Secret: %s Host: %s" % (aws_access_key_id,
                                                                 aws_secret_access_key,
                                                                 self.resource_url.host))
            port = 8773
            if self.resource_url.port != None:
                port = self.resource_url.port

            path = "/"
            if self.resource_url.scheme == "walrus":
                path = "/services/Walrus"

            self.s3_conn = S3Connection(aws_access_key_id=aws_access_key_id,
                                        aws_secret_access_key=aws_secret_access_key,
                                        is_secure=False,
                                        host=self.resource_url.host,
                                        port=port,
                                        calling_format=calling_format,
                                        path=path)
        else:
            # plain Amazon S3; an optional region can be requested in the description
            if self.pilot_data_description != None and \
               self.pilot_data_description.has_key("region"):
                self.s3_region = self.pilot_data_description["region"]

            self.s3_conn = S3Connection(
                aws_access_key_id,
                aws_secret_access_key,
            )

    def get_security_context(self):
        """ Returns security context that needs to be available on the distributed
            node in order to access this Pilot Data """
        return {"aws_access_key_id": self.s3_conn.aws_access_key_id,
                "aws_secret_access_key": self.s3_conn.aws_secret_access_key}

    def initialize_pilotdata(self):
        # Create the bucket for this Pilot Data; if that fails (e.g. the bucket
        # already exists), fall back to the existing bucket of the same name.
        try:
            if self.s3_region == None:
                logger.debug("Use default S3 region.")
                self.s3_region = ""  # default region => us-east-1
            self.bucket = self.s3_conn.create_bucket(self.bucket_name, location=self.s3_region)
        except:
            self.bucket = self.s3_conn.get_bucket(self.bucket_name)

        self.__state = State.Running

    def create_du(self, du_id):
        logger.debug("create object: " + du_id)
        k = Key(self.bucket)
        k.key = str(du_id) + "/du_info"
        k.set_contents_from_string(du_id)

    def put_du(self, du):
        logger.debug("Copy DU to S3/Walrus")
        du_items = du.list()
        for i in du_items.keys():
            try:
                local_filename = du_items[i]["local"]
                remote_path = os.path.join(str(du.id), os.path.basename(local_filename))
                logger.debug("copy %s to %s" % (local_filename, remote_path))
                self._put_file(local_filename, remote_path)
            except:
                logger.debug("Could not copy: " + str(i))

    def get_du(self, du, target_url):
        du_id = du.id
        logger.debug("Get DU: " + str(du_id))
        result = self.bucket.list(prefix=du_id)
        logger.debug("Result Bucket List: " + str(result))
        for key in result:
            logger.debug(str(key))
            full_filename = key.name
            if full_filename != None:
                logger.debug("Process file: " + full_filename)
                if not full_filename.endswith("/"):
                    self._get_file(full_filename,
                                   os.path.join(target_url, os.path.basename(full_filename)))

    def remove_du(self, du):
        # delete all objects stored under this DU's prefix
        for key in self.bucket.list(prefix=str(du.id)):
            key.delete()
206 logger.debug("Put file: %s to %s"%(source, target))
207 if self.__starts_with_valid_prefix(source):
208 logger.debug("Copy file from S3/Walrus")
209 source_bucket_name = self.__get_bucket_name(source)
210 source_key_name = self.__get_key_name(source)
211 self.bucket.copy_key(target, source_bucket_name, source_key_name)
212
213
214 else:
215 logger.debug("Copy file from Local")
216 k = Key(self.bucket)
217 k.key=target
218 k.set_contents_from_filename(source)
219 logger.debug("Put file result: %s"%source)
220
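
    # Example (illustrative paths/keys): a source that itself starts with "s3"/"walrus"
    # is copied server-side via copy_key, e.g.
    #   adaptor._put_file("s3://other-bucket/du-123/input.txt", "du-456/input.txt")
    # while anything else is treated as a local path and uploaded, e.g.
    #   adaptor._put_file("/tmp/input.txt", "du-456/input.txt")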

    def _get_file(self, source, target):
        logger.debug("GET file: %s to %s" % (source, target))
        k = self.bucket.get_key(source)
        k.key = source
        k.get_contents_to_filename(target)

    def transfer(self, source_url, target_url):
        pass

    def __starts_with_valid_prefix(self, url):
        valid_prefix = ["s3", "walrus"]
        result = False
        for i in valid_prefix:
            result = url.startswith(i)
            if result == True:
                break
        return result

    def __get_bucket_name(self, resource_url):
        surl = saga.Url(resource_url)
        if surl.scheme.startswith("s3"):
            # s3://<bucket>/<key...> => strip the scheme and everything after the first "/"
            bucket_name = resource_url.replace("s3://", "")
            try:
                bucket_name = bucket_name[:bucket_name.index("/")]
            except:
                pass
        else:
            # walrus/swift URLs carry the bucket as the first path element
            bucket_name = surl.path[1:]
        return bucket_name

    def __get_key_name(self, resource_url):
        surl = saga.Url(resource_url)
        if surl.scheme.startswith("s3"):
            bucket_name = resource_url.replace("s3://", "")
        else:
            bucket_name = surl.path[1:]

        # key name is everything after the first "/" (None if there is no key part)
        key_name = None
        try:
            key_name = bucket_name[bucket_name.index("/") + 1:]
        except:
            pass

        return key_name
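
    # Illustrative parsing results (hypothetical URLs):
    #   "s3://my-bucket/du-123/file.txt"      -> bucket "my-bucket", key "du-123/file.txt"
    #   "walrus://192.0.2.10:8773/my-bucket"  -> bucket "my-bucket", key None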

    def __print_traceback(self):
        exc_type, exc_value, exc_traceback = sys.exc_info()
        print "*** print_tb:"
        traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
        print "*** print_exception:"
        traceback.print_exception(exc_type, exc_value, exc_traceback,
                                  limit=2, file=sys.stdout)


def test_walrus():
    s3 = S3FileAdaptor("walrus://149.165.146.135/pilot-data-c4eb26eb-ed0c-11e1-ac98-705681b3df0f",
                       pilot_data_description={"access_key_id": "8MCXRAMXMHDYKWNKXZ8WF",
                                               "secret_access_key": "YrcUqSw2Arxshrh3ZtenkxerWwCWdMTKvZYoLPAo"})
    s3.initialize_pilotdata()
    s3._put_file("test.txt", "du-7370d7b5-ed0b-11e1-95df-705681b3df0f/test.txt")
    s3._get_file("du-7370d7b5-ed0b-11e1-95df-705681b3df0f/test.txt", "test2.txt")
    s3.get_du("du-7370d7b5-ed0b-11e1-95df-705681b3df0f", ".")


def test_swift():
    s3 = S3FileAdaptor("swift://149.165.146.50:3333/pilot-data-c4eb26eb-ed0c-11e1-ac98-705681b3df0f",
                       pilot_data_description={"access_key_id": "f9716a49c92a4a4cbedb6aba5e78d682",
                                               "secret_access_key": "bcdff54b7fe94d63b4412c762e823a84"})
    s3.initialize_pilotdata()
    s3._put_file("test.txt", "du-7370d7b5-ed0b-11e1-95df-705681b3df0f/test.txt")
    s3._get_file("du-7370d7b5-ed0b-11e1-95df-705681b3df0f/test.txt", "test2.txt")
    s3.get_du("du-7370d7b5-ed0b-11e1-95df-705681b3df0f", ".")


def test_s3import():
    s3 = S3FileAdaptor("s3://pilot-data-andre-test-create-from-s3-url",
                       pilot_data_description={"access_key_id": "AKIAJPGNDJRYIG5LIEUA",
                                               "secret_access_key": "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"})
    s3.initialize_pilotdata()
    s3._put_file("s3://pilot-data-05d88e40-f65b-11e1-a327-00215ec9e3ac/du-3624837e-f66f-11e1-a327-00215ec9e3ac/WRT54GS_UG_WEB_20070529.pdf", "bla/test.pdf")
312 COORDINATION_URL="redis://ILikeBigJob_wITH-REdIS@gw68.quarry.iu.teragrid.org:6379"
313 from pilot import PilotComputeService, PilotDataService, ComputeDataService, State
314 pilot_data_service = PilotDataService(coordination_url=COORDINATION_URL)
315
316
317
318
319 pilot_data_description_aws={
320 "service_url": "s3://pilot-data-andre-workflow",
321 "size": 100,
322 "affinity_datacenter_label": "us-east-1",
323 "affinity_machine_label": "" ,
324 "access_key_id": "AKIAJPGNDJRYIG5LIEUA",
325 "secret_access_key":"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",
326 }
327
328 pd = pilot_data_service.create_pilot(pilot_data_description=pilot_data_description_aws)
329
330 data_unit_description = {
331 "file_urls": ['s3://pilot-data-cec5d816-fa8f-11e1-ab5e-e61f1322a75c/du-67b4c762-fa90-11e1-ab5e-e61f1322a75c/ip-10-84-173-21512MB_2.input-chunk-02'],
332 "affinity_datacenter_label": "us-east-1",
333 "affinity_machine_label": ""
334 }
335
336
337 input_data_unit = pd.submit_data_unit(data_unit_description)
338 input_data_unit.wait()
339
340


if __name__ == "__main__":
    test_s3import_via_pilotapi()