BucketReaperTask.java (eucalyptus-4.4.1) | : | BucketReaperTask.java (eucalyptus-4.4.2) | ||
---|---|---|---|---|
skipping to change at line 25 | skipping to change at line 25 | |||
* | * | |||
* Please contact Eucalyptus Systems, Inc., 6755 Hollister Ave., Goleta | * Please contact Eucalyptus Systems, Inc., 6755 Hollister Ave., Goleta | |||
* CA 93117, USA or visit http://www.eucalyptus.com/licenses/ if you need | * CA 93117, USA or visit http://www.eucalyptus.com/licenses/ if you need | |||
* additional information or have any questions. | * additional information or have any questions. | |||
*/ | */ | |||
package com.eucalyptus.objectstorage.asynctask; | package com.eucalyptus.objectstorage.asynctask; | |||
import java.util.List; | import java.util.List; | |||
import java.util.Random; | import java.util.Random; | |||
import java.util.concurrent.TimeUnit; | ||||
import java.util.concurrent.atomic.AtomicLong; | ||||
import org.apache.log4j.Level; | ||||
import org.apache.log4j.Logger; | import org.apache.log4j.Logger; | |||
import com.eucalyptus.objectstorage.BucketMetadataManagers; | import com.eucalyptus.objectstorage.BucketMetadataManagers; | |||
import com.eucalyptus.objectstorage.BucketState; | import com.eucalyptus.objectstorage.BucketState; | |||
import com.eucalyptus.objectstorage.ObjectMetadataManagers; | import com.eucalyptus.objectstorage.ObjectMetadataManagers; | |||
import com.eucalyptus.objectstorage.OsgBucketFactory; | import com.eucalyptus.objectstorage.OsgBucketFactory; | |||
import com.eucalyptus.objectstorage.PaginatedResult; | import com.eucalyptus.objectstorage.PaginatedResult; | |||
import com.eucalyptus.objectstorage.entities.Bucket; | import com.eucalyptus.objectstorage.entities.Bucket; | |||
import com.eucalyptus.objectstorage.entities.ObjectEntity; | import com.eucalyptus.objectstorage.entities.ObjectEntity; | |||
import com.eucalyptus.objectstorage.entities.ObjectStorageGlobalConfiguration; | import com.eucalyptus.objectstorage.entities.ObjectStorageGlobalConfiguration; | |||
import com.eucalyptus.objectstorage.providers.ObjectStorageProviders; | import com.eucalyptus.objectstorage.providers.ObjectStorageProviders; | |||
import com.eucalyptus.storage.config.ConfigurationCache; | import com.eucalyptus.storage.config.ConfigurationCache; | |||
import com.google.common.base.Supplier; | ||||
import com.google.common.base.Suppliers; | ||||
/** | /** | |||
* Scans metadata for each objects in a bucket and cleans history for each Many | * Scans metadata for each objects in a bucket and cleans history for each. | |||
of these may be running concurrently. Has a self-imposed timeout of 30 | ||||
* seconds. | ||||
* | ||||
* This should be more than sufficient given that it only manipulates metadata a | ||||
nd never interacts with the backend. | ||||
* | ||||
*/ | */ | |||
public class BucketReaperTask implements Runnable { | public class BucketReaperTask { | |||
private static final Logger LOG = Logger.getLogger(BucketReaperTask.class); | private static final Logger LOG = Logger.getLogger(BucketReaperTask.class); | |||
private long startTime; | private static final long TASK_INTERVAL = 30 * 1000; // 30 seconds | |||
private static final long MAX_TASK_DURATION = 30 * 1000; // 30 seconds | private static final long TIMEOUT_LOG_INTERVAL = TimeUnit.HOURS.toMillis( 1 ); | |||
private static final AtomicLong TIMEOUT_LOG_TOKEN = new AtomicLong( ); | ||||
private static final Random rand = new Random(System.currentTimeMillis()); | private static final Random rand = new Random(System.currentTimeMillis()); | |||
private boolean interrupted = false; | ||||
private volatile long startTime; | ||||
private volatile long timeoutTime; | ||||
private volatile boolean interrupted = false; | ||||
public BucketReaperTask() {} | public BucketReaperTask() {} | |||
// Does a single scan of all objects in the bucket and does history cleanup on each | // Does a single scan of all objects in the bucket and does history cleanup on each | |||
@Override | public void run( final long nextRun ) { | |||
public void run() { | startTime = System.currentTimeMillis( ); | |||
startTime = System.currentTimeMillis(); | timeoutTime = nextRun - TASK_INTERVAL; | |||
Supplier<Level> timeoutLevelSupplier = Suppliers.memoize(() -> logTimeout( ) | ||||
? Level.WARN : Level.TRACE); | ||||
int bucketsResolved = 0; | ||||
int bucketsCleaned = 0; | ||||
boolean cleaningStarted = false; | ||||
try { | try { | |||
LOG.trace("Initiating bucket cleanup task"); | LOG.trace("Initiating bucket cleanup task"); | |||
final List<Bucket> buckets = BucketMetadataManagers.getInstance().lookupBu cketsByState(null); | final List<Bucket> buckets = BucketMetadataManagers.getInstance().lookupBu cketsByState(null); | |||
if (buckets == null || buckets.size() <= 0) { | if (buckets == null || buckets.size() <= 0) { | |||
LOG.trace("No buckets found to clean. Cleanup task complete"); | LOG.trace("No buckets found to clean. Cleanup task complete"); | |||
return; | return; | |||
} | } | |||
// Resolve all bucket states (fast) before cleaning histories (slow, could | ||||
time out) | ||||
for (Bucket bucket : buckets) { | ||||
if (!isTimedOut() && !interrupted) { | ||||
resolveBucketState(bucket); | ||||
bucketsResolved++; | ||||
} else { | ||||
LOG.warn("Timed out while cleaning up bucket states after processing " | ||||
+ bucketsResolved + " buckets."); | ||||
break; | ||||
} | ||||
} | ||||
LOG.trace("Finished resolving " + bucketsResolved + " bucket states."); | ||||
// Randomly iterate through the buckets so they all have equal chance of r | ||||
unning before a timeout | ||||
Bucket b; | Bucket b; | |||
// Randomly iterate through | ||||
int idx; | int idx; | |||
cleaningStarted = true; | ||||
while (buckets.size() > 0 && !isTimedOut() && !interrupted) { | while (buckets.size() > 0 && !isTimedOut() && !interrupted) { | |||
idx = rand.nextInt(buckets.size()); | idx = rand.nextInt(buckets.size()); | |||
b = buckets.get(idx); | b = buckets.get(idx); | |||
cleanObjectHistoriesInBucket(b); | cleanObjectHistoriesInBucket(b,timeoutLevelSupplier); | |||
resolveBucketState(b); | ||||
buckets.remove(idx); | buckets.remove(idx); | |||
bucketsCleaned++; | ||||
} | } | |||
} catch (final Throwable f) { | } catch (final Throwable f) { | |||
LOG.error("Error during bucket cleanup execution. Will retry later", f); | LOG.error("Error during bucket cleanup execution. Will retry later", f); | |||
} finally { | } finally { | |||
try { | try { | |||
long endTime = System.currentTimeMillis(); | long endTime = System.currentTimeMillis(); | |||
LOG.trace("Bucket cleanup execution task took " + Long.toString(endTime - startTime) + "ms to complete"); | LOG.trace("Bucket cleanup execution task took " + Long.toString(endTime - startTime) + "ms to complete"); | |||
if (isTimedOut() || interrupted) { | ||||
if (!cleaningStarted) { | ||||
LOG.log(timeoutLevelSupplier.get(), | ||||
"Timed out while cleaning up bucket states after processing " + | ||||
bucketsResolved + " buckets. " + | ||||
"No object records were cleaned up."); | ||||
} else { | ||||
LOG.log(timeoutLevelSupplier.get(), | ||||
"Timed out while cleaning up object records after processing " + | ||||
bucketsCleaned + " buckets."); | ||||
} | ||||
} | ||||
} catch (final Throwable f) { | } catch (final Throwable f) { | |||
// Do nothing, but don't allow exceptions out | // Do nothing, but don't allow exceptions out | |||
} | } | |||
} | } | |||
} | } | |||
public void interrupt() { | public void interrupt() { | |||
this.interrupted = true; | this.interrupted = true; | |||
} | } | |||
skipping to change at line 107 | skipping to change at line 139 | |||
this.interrupted = false; | this.interrupted = false; | |||
} | } | |||
/** | /** | |||
* Fixes the state of the bucket. If in 'deleting' state, will issue deletion to backend. And remove the bucket. If in 'creating' state that is | * Fixes the state of the bucket. If in 'deleting' state, will issue deletion to backend. And remove the bucket. If in 'creating' state that is | |||
* expired (by timestamp), will issue delete to backend and update state | * expired (by timestamp), will issue delete to backend and update state | |||
* | * | |||
* @param bucket | * @param bucket | |||
*/ | */ | |||
private void resolveBucketState(Bucket bucket) { | private void resolveBucketState(Bucket bucket) { | |||
LOG.trace("Resolving bucket state for bucket uuid " + bucket.getBucketUuid() ); | LOG.trace("Resolving bucket state for bucket uuid " + bucket.getBucketUuid() + ", name " + bucket.getBucketName()); | |||
if (BucketState.deleting.equals(bucket.getState()) | if (BucketState.deleting.equals(bucket.getState()) | |||
|| !bucket.stateStillValid(ConfigurationCache.getConfiguration(ObjectSto rageGlobalConfiguration.class) | || !bucket.stateStillValid(ConfigurationCache.getConfiguration(ObjectSto rageGlobalConfiguration.class) | |||
.getBucket_creation_wait_interval_seconds())) { | .getBucket_creation_wait_interval_seconds())) { | |||
// Clean-up a bucket marked for deletion. This usually indicates a failed delete operation previously | // Clean-up a bucket marked for deletion. This usually indicates a failed delete operation previously | |||
LOG.trace("Deleting backend bucket for bucket uuid " + bucket.getBucketUui d() + " during bucket cleanup"); | LOG.trace("Deleting backend bucket for bucket uuid " + bucket.getBucketUui d() + " during bucket cleanup"); | |||
try { | try { | |||
OsgBucketFactory.getFactory().deleteBucket(ObjectStorageProviders.getIns tance(), bucket, null, null); | OsgBucketFactory.getFactory().deleteBucket(ObjectStorageProviders.getIns tance(), bucket, null, null); | |||
} catch (Exception e) { | } catch (Exception e) { | |||
LOG.error("Error cleaning deletion marked bucketuuid " + bucket.getBucke tUuid(), e); | LOG.error("Error cleaning deletion marked bucketuuid " + bucket.getBucke tUuid(), e); | |||
} | } | |||
} | } | |||
} | } | |||
private boolean isTimedOut() { | private boolean isTimedOut() { | |||
return System.currentTimeMillis() - startTime >= MAX_TASK_DURATION; | return System.currentTimeMillis() >= timeoutTime; | |||
} | } | |||
protected void cleanObjectHistoriesInBucket(Bucket b) { | protected void cleanObjectHistoriesInBucket( final Bucket b, final Supplier<Le vel> timeoutLevelSupplier ) { | |||
String nextKey = null; | String nextKey = null; | |||
final int chunkSize = 1000; | final int chunkSize = 1000; | |||
int objectsProcessed = 0; | ||||
PaginatedResult<ObjectEntity> result = null; | PaginatedResult<ObjectEntity> result = null; | |||
LOG.trace("Cleaning object histories for bucket uuid " + b.getBucketUuid()); | LOG.trace("Cleaning object histories for bucket uuid " + b.getBucketUuid() + ", name " + b.getBucketName()); | |||
do { | do { | |||
try { | try { | |||
result = ObjectMetadataManagers.getInstance().listPaginated(b, chunkSize , null, null, nextKey); | result = ObjectMetadataManagers.getInstance().listPaginated(b, chunkSize , null, null, nextKey); | |||
} catch (final Throwable f) { | } catch (final Throwable f) { | |||
LOG.error("Could not get object listing for bucket " + b.getBucketName() + " with next marker: " + nextKey); | LOG.error("Could not get object listing for bucket " + b.getBucketName() + " with next marker: " + nextKey); | |||
nextKey = null; | ||||
result = null; | ||||
break; | break; | |||
} | } | |||
INNER: for (ObjectEntity obj : result.getEntityList()) { | INNER: for (ObjectEntity obj : result.getEntityList()) { | |||
try { | try { | |||
ObjectMetadataManagers.getInstance().cleanupInvalidObjects(b, obj.getO bjectKey()); | ObjectMetadataManagers.getInstance().cleanupInvalidObjects(b, obj.getO bjectKey()); | |||
objectsProcessed++; | ||||
if ((objectsProcessed % 1000) == 0) { | ||||
LOG.trace("Processed " + objectsProcessed + " objects for bucket uui | ||||
d " + b.getBucketUuid() + | ||||
", name " + b.getBucketName()); | ||||
} | ||||
} catch (final Throwable f) { | } catch (final Throwable f) { | |||
LOG.error("Error doing async repair of object " + b.getBucketName() + "/" + obj.getObjectKey() + " Continuing to next object", f); | LOG.error("Error doing async repair of object " + b.getBucketName() + "/" + obj.getObjectKey() + " Continuing to next object", f); | |||
} | } | |||
if (interrupted) { | if (interrupted) { | |||
break INNER; | break INNER; | |||
} | } | |||
} | } | |||
if (!interrupted && result.getIsTruncated()) { | if (!interrupted && result.getIsTruncated()) { | |||
nextKey = ((ObjectEntity) result.getLastEntry()).getObjectKey(); | nextKey = ((ObjectEntity) result.getLastEntry()).getObjectKey(); | |||
} else { | } else { | |||
nextKey = null; | nextKey = null; | |||
} | } | |||
} while (nextKey != null && !isTimedOut()); | } while (nextKey != null && !isTimedOut()); | |||
if (interrupted || isTimedOut()) { | ||||
LOG.log( timeoutLevelSupplier.get( ), | ||||
"Timed out while cleaning up object records in bucket uuid " + | ||||
b.getBucketUuid() + ", name " + b.getBucketName() + | ||||
" after processing " + objectsProcessed + " objects."); | ||||
} else { | ||||
LOG.trace("Finished cleaning " + objectsProcessed + " object histories for | ||||
bucket uuid " + | ||||
b.getBucketUuid() + ", name " + b.getBucketName()); | ||||
} | ||||
} | ||||
private boolean logTimeout( ) { | ||||
final long lastToken = TIMEOUT_LOG_TOKEN.get( ); | ||||
final long currentToken = System.currentTimeMillis( ) / TIMEOUT_LOG_INTERVAL | ||||
; | ||||
return lastToken!=currentToken && TIMEOUT_LOG_TOKEN.compareAndSet( lastToken | ||||
, currentToken ); | ||||
} | } | |||
} | } | |||
End of changes. 23 change blocks. | ||||
24 lines changed or deleted | 83 lines changed or added |