How to implement an automatically expiring variable in python? For example, Let the program running For one hour. I want implement an array of 6 variables, each variable in array will be automatically deleted themselves after 10 mins. And after 1 hour, there will be no variable in the array.
You can use the time
module to clear the "array" every 10 minutes, by checking the time interval from when the script starts.
The last example on http://effbot.org/librarybook/time.htm point you in the right direction.
Hmmm, seems weird, but possible.
Sounds like you need a class which records the time when __init__
is called. Then, implement __getitem__
to check the time when it is called, and only return the item if it's not too late. (It's probably easier to do this than to have a process "running in the background" which actively deletes items even when you don't ask for them.)
It sounds like the items in your array know about each other, because otherwise they'll all expire at the same time.
I think you want to create a subclass of list which deletes its contents after a certain time.
I actually had to do this for dictionaries. Maybe you'll find the code useful:
"""Cache which has data that expires after a given period of time."""
from datetime import datetime, timedelta
class KeyExpiredError(KeyError): pass
def __hax():
class NoArg: pass
return NoArg()
NoArg = __hax()
class DataCache(object):
def __init__(self, defaultExpireTime=timedelta(1, 0, 0), dbg=True):
self.defaultExpireTime = defaultExpireTime
self.cache = {}
self.dbg = dbg
self.processExpires = True
def setProcessExpires(self, b):
self.processExpires = b
def __getitem__(self, key):
c = self.cache[key]
n = datetime.now()
if (n - c['timestamp']) < c['expireTime'] or not self.processExpires:
return c['data']
del self.cache[key]
if self.dbg:
print "DataCache: Key %s expired" % repr(key)
raise KeyExpiredError(key)
def __contains__(self, key):
try:
self[key]
return True
except KeyError:
return False
def __setitem__(self, key, val):
self.cache[key] = {
'data': val,
'timestamp': datetime.now(),
'expireTime': self.defaultExpireTime,
}
def items(self):
keys = list(self.cache)
for k in keys:
try:
val = self[k]
yield (k, val)
except:
pass
def get(self, key, default=NoArg, expired=NoArg):
try:
return self[key]
except KeyExpiredError:
if expired is NoArg and default is not NoArg:
return default
if expired is NoArg: return None
return expired
except KeyError:
if default is NoArg: return None
return default
def set(self, key, val, expireTime=None):
if expireTime is None:
expireTime = self.defaultExpireTime
self.cache[key] = {
'data': val,
'timestamp': datetime.now(),
'expireTime': expireTime,
}
def tryremove(self, key):
if key in self.cache:
del self.cache[key]
return True
return False
#the following you can call without triggering any expirations
def getTotalExpireTime(self, key):
"""Get the total amount of time the key will be in the cache for"""
c = self.cache[key]
return c['expireTime']
def getExpirationTime(self, key):
"""Return the datetime when the given key will expire"""
c = self.cache[key]
return c['timestamp'] + c['expireTime']
def getTimeRemaining(self, key):
"""Get the time left until the item will expire"""
return self.getExpirationTime(key) - datetime.now()
def getTimestamp(self, key):
return self.cache[key]['timestamp']
def __len__(self):
return len(self.cache)
Usage:
>>> dc = DataCache(timedelta(0, 5, 0)) #expire in 5 seconds
>>> dc[4] = 3
>>> dc[4]
3
>>> import time
>>> time.sleep(5)
>>> dc[4]
DataCache: Key 4 expired
Traceback (most recent call last):
File "<pyshell#5>", line 1, in <module>
dc[4]
File "datacache.py", line 35, in __getitem__
raise KeyExpiredError(key)
KeyExpiredError: 4
>>>
you can create a background process that check how much time it's passed, and del the right item... or if you want to create a subclass of list wich deletes it's contens after a certain time you can do the same thing, just calling it in init
def __init__(self, time):
#run subprocess to chek_espired elements
edit:
i wrote an example, but it can be done much better!
class MyList(list):
def __init__(self,elems, expires_time):
list.__init__(self, elems)
self.created = time.time()
self.expires_time = expires_time
def __getitem__(self, index):
t = time.time()
print t - self.created
if t - self.created > self.expires_time:
self.created += self.expires_time
self.pop(index)
self.__getitem__(index)
return list.__getitem__(self, index)
ps of course you can easily raise a personal error if the program try to get the index from an empty list
import sched
import time
import threading
a = [1, 2, 3, 4, 5, 6]
scheduler = sched.scheduler(time.time, time.sleep)
def delete(_list):
del _list[0]
for i in range(len(a)):
scheduler.enter(60*10*i, 1, delete, (a,))
t = threading.Thread(target=scheduler.run)
t.start()