1818import logging
1919import threading
2020import time
21+ from pathlib import Path
2122from typing import Optional
2223
24+ import pyarrow
25+ import pyarrow .fs
26+ from pypaimon .common .options .config import CatalogOptions , OssOptions
2327from pyarrow ._fs import FileSystem
2428
2529from pypaimon .api .rest_api import RESTApi
30+ from pypaimon .common .options import Options
31+ from pypaimon .api .rest_util import RESTUtil
2632from pypaimon .catalog .rest .rest_token import RESTToken
2733from pypaimon .common .file_io import FileIO
2834from pypaimon .common .identifier import Identifier
29- from pypaimon . common . options import Options
30- from pypaimon . common . options . config import CatalogOptions , OssOptions
35+
36+ from cachetools import TTLCache
3137
3238
class RESTTokenFileIO(FileIO):
    # Process-wide cache of PyArrow filesystems keyed by RESTToken;
    # entries expire after 10 hours so filesystems built from stale
    # credentials are eventually dropped.
    _FILESYSTEM_CACHE: TTLCache = TTLCache(maxsize=1000, ttl=36000)  # 10 hours TTL
    _CACHE_LOCK = threading.Lock()
    # Shared token cache plus one lock per table identifier, so many
    # instances of the same table refresh a token only once.
    _TOKEN_CACHE: dict = {}
    _TOKEN_LOCKS: dict = {}
    _TOKEN_LOCKS_LOCK = threading.Lock()
3445
3546 def __init__ (self , identifier : Identifier , path : str ,
3647 catalog_options : Optional [Options ] = None ):
3748 self .identifier = identifier
3849 self .path = path
3950 self .token : Optional [RESTToken ] = None
4051 self .api_instance : Optional [RESTApi ] = None
41- self .lock = threading .Lock ()
52+ self .lock = threading .RLock ()
4253 self .log = logging .getLogger (__name__ )
4354 super ().__init__ (path , catalog_options )
4455
@@ -53,33 +64,241 @@ def __getstate__(self):
5364 def __setstate__ (self , state ):
5465 self .__dict__ .update (state )
5566 # Recreate lock after deserialization
56- self .lock = threading .Lock ()
67+ self .lock = threading .RLock ()
5768 # api_instance will be recreated when needed
5869 self .api_instance = None
5970
    def _initialize_oss_fs(self, path, properties: Optional[Options] = None) -> FileSystem:
        """Build an OSS filesystem, optionally from explicit properties.

        When ``properties`` is given, it temporarily replaces
        ``self.properties`` under the lock so the parent implementation
        reads it; otherwise the current data token is merged into the
        catalog options first and the same swap is performed.
        """
        if properties is not None:
            with self.lock:
                original_properties = self.properties
                self.properties = properties
                try:
                    return super()._initialize_oss_fs(path)
                finally:
                    # Always restore, even if the parent call raises.
                    self.properties = original_properties

        self.try_to_refresh_token()
        # No token available (e.g. static credentials configured):
        # fall back to the plain catalog options.
        if self.token is None:
            return super()._initialize_oss_fs(path)

        # NOTE(review): self.properties is read here without the lock;
        # a concurrent thread inside the swap above could briefly expose
        # its merged view — TODO confirm whether this race matters.
        merged_properties = RESTUtil.merge(
            self.properties.to_map() if self.properties else {},
            self._merge_token_with_catalog_options(self.token.token)
        )
        merged_options = Options(merged_properties)

        with self.lock:
            original_properties = self.properties
            self.properties = merged_options
            try:
                return super()._initialize_oss_fs(path)
            finally:
                self.properties = original_properties
98+
    def _initialize_s3_fs(self, properties: Optional[Options] = None) -> FileSystem:
        """Build an S3 filesystem, optionally from explicit properties.

        Mirrors ``_initialize_oss_fs``: explicit ``properties`` are
        swapped into ``self.properties`` under the lock for the duration
        of the parent call; otherwise the current data token is merged
        into the catalog options first.
        """
        if properties is not None:
            with self.lock:
                original_properties = self.properties
                self.properties = properties
                try:
                    return super()._initialize_s3_fs()
                finally:
                    # Always restore, even if the parent call raises.
                    self.properties = original_properties

        self.try_to_refresh_token()
        # No token available: use the plain catalog options.
        if self.token is None:
            return super()._initialize_s3_fs()

        # NOTE(review): self.properties read without the lock here —
        # same potential race as in _initialize_oss_fs; TODO confirm.
        merged_properties = RESTUtil.merge(
            self.properties.to_map() if self.properties else {},
            self._merge_token_with_catalog_options(self.token.token)
        )
        merged_options = Options(merged_properties)

        with self.lock:
            original_properties = self.properties
            self.properties = merged_options
            try:
                return super()._initialize_s3_fs()
            finally:
                self.properties = original_properties
65126
66127 def _merge_token_with_catalog_options (self , token : dict ) -> dict :
67128 """Merge token with catalog options, DLF OSS endpoint should override the standard OSS endpoint."""
68129 merged_token = dict (token )
69- dlf_oss_endpoint = self .properties .get (CatalogOptions .DLF_OSS_ENDPOINT )
130+ with self .lock :
131+ dlf_oss_endpoint = self .properties .get (CatalogOptions .DLF_OSS_ENDPOINT )
70132 if dlf_oss_endpoint and dlf_oss_endpoint .strip ():
71133 merged_token [OssOptions .OSS_ENDPOINT .key ()] = dlf_oss_endpoint
72134 return merged_token
73135
136+ def get_merged_properties (self ) -> Options :
137+ self .try_to_refresh_token ()
138+ if self .token is None :
139+ with self .lock :
140+ return self .properties
141+
142+ with self .lock :
143+ properties_map = self .properties .to_map () if self .properties else {}
144+
145+ merged_properties = RESTUtil .merge (
146+ properties_map ,
147+ self .token .token
148+ )
149+ return Options (merged_properties )
150+
151+ def _get_filesystem (self ) -> FileSystem :
152+ self .try_to_refresh_token ()
153+
154+ if self .token is None :
155+ return self .filesystem
156+
157+ with self ._CACHE_LOCK :
158+ filesystem = self ._FILESYSTEM_CACHE .get (self .token )
159+ if filesystem is not None :
160+ return filesystem
161+
162+ with self .lock :
163+ properties_map = self .properties .to_map () if self .properties else {}
164+
165+ merged_properties = RESTUtil .merge (
166+ properties_map ,
167+ self .token .token
168+ )
169+ merged_options = Options (merged_properties )
170+
171+ scheme , netloc , _ = self .parse_location (self .path )
172+ if scheme in {"oss" }:
173+ filesystem = self ._initialize_oss_fs (self .path , merged_options )
174+ elif scheme in {"s3" , "s3a" , "s3n" }:
175+ filesystem = self ._initialize_s3_fs (merged_options )
176+ else :
177+ filesystem = self .filesystem
178+
179+ self ._FILESYSTEM_CACHE [self .token ] = filesystem
180+ return filesystem
181+
182+ def new_input_stream (self , path : str ):
183+ filesystem = self ._get_filesystem ()
184+ path_str = self .to_filesystem_path (path )
185+ return filesystem .open_input_file (path_str )
186+
74187 def new_output_stream (self , path : str ):
75- # Call parent class method to ensure path conversion and parent directory creation
76- return super ().new_output_stream (path )
188+ filesystem = self ._get_filesystem ()
189+ path_str = self .to_filesystem_path (path )
190+ parent_dir = Path (path_str ).parent
191+ if str (parent_dir ) and not self .exists (str (parent_dir )):
192+ self .mkdirs (str (parent_dir ))
193+ return filesystem .open_output_stream (path_str )
194+
195+ def get_file_status (self , path : str ):
196+ filesystem = self ._get_filesystem ()
197+ path_str = self .to_filesystem_path (path )
198+ file_infos = filesystem .get_file_info ([path_str ])
199+ return file_infos [0 ]
200+
201+ def list_status (self , path : str ):
202+ filesystem = self ._get_filesystem ()
203+ path_str = self .to_filesystem_path (path )
204+ selector = pyarrow .fs .FileSelector (path_str , recursive = False , allow_not_found = True )
205+ return filesystem .get_file_info (selector )
206+
207+ def exists (self , path : str ) -> bool :
208+ filesystem = self ._get_filesystem ()
209+ path_str = self .to_filesystem_path (path )
210+ file_info = filesystem .get_file_info ([path_str ])[0 ]
211+ return file_info .type != pyarrow .fs .FileType .NotFound
212+
213+ def delete (self , path : str , recursive : bool = False ) -> bool :
214+ filesystem = self ._get_filesystem ()
215+ path_str = self .to_filesystem_path (path )
216+ file_info = filesystem .get_file_info ([path_str ])[0 ]
217+ if file_info .type == pyarrow .fs .FileType .NotFound :
218+ return False
219+ if file_info .type == pyarrow .fs .FileType .Directory :
220+ if recursive :
221+ filesystem .delete_dir_contents (path_str )
222+ else :
223+ filesystem .delete_dir (path_str )
224+ else :
225+ filesystem .delete_file (path_str )
226+ return True
227+
228+ def mkdirs (self , path : str ) -> bool :
229+ filesystem = self ._get_filesystem ()
230+ path_str = self .to_filesystem_path (path )
231+ filesystem .create_dir (path_str , recursive = True )
232+ return True
233+
234+ def rename (self , src : str , dst : str ) -> bool :
235+ filesystem = self ._get_filesystem ()
236+ dst_str = self .to_filesystem_path (dst )
237+ dst_parent = Path (dst_str ).parent
238+ if str (dst_parent ) and not self .exists (str (dst_parent )):
239+ self .mkdirs (str (dst_parent ))
240+
241+ src_str = self .to_filesystem_path (src )
242+ filesystem .move (src_str , dst_str )
243+ return True
244+
245+ def copy_file (self , source_path : str , target_path : str , overwrite : bool = False ):
246+ if not overwrite and self .exists (target_path ):
247+ raise FileExistsError (f"Target file { target_path } already exists and overwrite=False" )
248+
249+ filesystem = self ._get_filesystem ()
250+ source_str = self .to_filesystem_path (source_path )
251+ target_str = self .to_filesystem_path (target_path )
252+ filesystem .copy_file (source_str , target_str )
77253
78254 def try_to_refresh_token (self ):
79- if self .should_refresh ():
80- with self .lock :
81- if self .should_refresh ():
82- self .refresh_token ()
255+ identifier_str = str (self .identifier )
256+
257+ # Fast path 1: Check instance token
258+ if self .token is not None and not self ._is_token_expired (self .token ):
259+ return
260+
261+ # Fast path 2: Check global cache
262+ cached_token = self ._get_cached_token (identifier_str )
263+ if cached_token and not self ._is_token_expired (cached_token ):
264+ self .token = cached_token
265+ return
266+
267+ # Slow path: Acquire global lock for this identifier
268+ global_lock = self ._get_global_token_lock ()
269+ with global_lock :
270+ cached_token = self ._get_cached_token (identifier_str )
271+ if cached_token and not self ._is_token_expired (cached_token ):
272+ self .token = cached_token
273+ return
274+
275+ token_to_check = cached_token if cached_token else self .token
276+ if token_to_check is None or self ._is_token_expired (token_to_check ):
277+ self .refresh_token ()
278+ self ._set_cached_token (identifier_str , self .token )
279+ else :
280+ self .token = cached_token if cached_token else self .token
281+
282+ def _get_cached_token (self , identifier_str : str ) -> Optional [RESTToken ]:
283+ with self ._TOKEN_LOCKS_LOCK :
284+ return self ._TOKEN_CACHE .get (identifier_str )
285+
286+ def _set_cached_token (self , identifier_str : str , token : RESTToken ):
287+ with self ._TOKEN_LOCKS_LOCK :
288+ self ._TOKEN_CACHE [identifier_str ] = token
289+
290+ def _get_global_token_lock (self ) -> threading .Lock :
291+ identifier_str = str (self .identifier )
292+ with self ._TOKEN_LOCKS_LOCK :
293+ if identifier_str not in self ._TOKEN_LOCKS :
294+ self ._TOKEN_LOCKS [identifier_str ] = threading .Lock ()
295+ return self ._TOKEN_LOCKS [identifier_str ]
296+
297+ def _is_token_expired (self , token : RESTToken ) -> bool :
298+ if token is None :
299+ return True
300+ current_time = int (time .time () * 1000 )
301+ return (token .expire_at_millis - current_time ) < RESTApi .TOKEN_EXPIRATION_SAFE_TIME_MILLIS
83302
84303 def should_refresh (self ):
85304 if self .token is None :
@@ -90,13 +309,50 @@ def should_refresh(self):
90309 def refresh_token (self ):
91310 self .log .info (f"begin refresh data token for identifier [{ self .identifier } ]" )
92311 if self .api_instance is None :
93- self .api_instance = RESTApi (self .properties , False )
312+ if not self .properties :
313+ raise RuntimeError (
314+ "Cannot refresh token: properties is None or empty. "
315+ "This may indicate a serialization issue when passing RESTTokenFileIO to Ray workers."
316+ )
317+
318+ if not self .properties .contains (CatalogOptions .URI ):
319+ available_keys = list (self .properties .data .keys ()) if self .properties else []
320+ raise RuntimeError (
321+ f"Cannot refresh token: missing required configuration '{ CatalogOptions .URI } ' in properties. "
322+ "This is required to create RESTApi for token refresh. "
323+ f"Available configuration keys: { available_keys } . "
324+ "This may indicate that catalog options were not properly serialized when passing to Ray workers."
325+ )
326+
327+ uri = self .properties .get (CatalogOptions .URI )
328+ if not uri or not uri .strip ():
329+ raise RuntimeError (
330+ f"Cannot refresh token: '{ CatalogOptions .URI } ' is empty or whitespace. "
331+ f"Value: '{ uri } '. Please ensure the REST catalog URI is properly configured."
332+ )
333+
334+ try :
335+ self .api_instance = RESTApi (self .properties , False )
336+ except Exception as e :
337+ raise RuntimeError (
338+ f"Failed to create RESTApi for token refresh: { e } . "
339+ "Please check that all required catalog options are properly configured. "
340+ f"Identifier: { self .identifier } "
341+ ) from e
342+
343+ try :
344+ response = self .api_instance .load_table_token (self .identifier )
345+ except Exception as e :
346+ raise RuntimeError (
347+ f"Failed to load table token from REST API: { e } . "
348+ f"Identifier: { self .identifier } , URI: { self .properties .get (CatalogOptions .URI )} "
349+ ) from e
94350
95- response = self .api_instance .load_table_token (self .identifier )
96351 self .log .info (
97352 f"end refresh data token for identifier [{ self .identifier } ] expiresAtMillis [{ response .expires_at_millis } ]"
98353 )
99- self .token = RESTToken (response .token , response .expires_at_millis )
354+ merged_token = self ._merge_token_with_catalog_options (response .token )
355+ self .token = RESTToken (merged_token , response .expires_at_millis )
100356
101357 def valid_token (self ):
102358 self .try_to_refresh_token ()
0 commit comments