1493 lines
57 KiB
Python
1493 lines
57 KiB
Python
#!/usr/bin/env python
|
|
import binascii
|
|
import calendar
|
|
import copy
|
|
import datetime
|
|
import math
|
|
import platform
|
|
import random
|
|
import re
|
|
import struct
|
|
import sys
|
|
import traceback as _traceback
|
|
from time import time
|
|
from typing import Any, Literal, Optional, Union
|
|
|
|
from dateutil.relativedelta import relativedelta
|
|
from dateutil.tz import datetime_exists, tzutc
|
|
|
|
# Type alias for one expanded cron field: a list whose items are integers,
# the wildcard "*" or the "l" marker.
ExpandedExpression = list[Union[int, Literal["*", "l"]]]
|
|
|
|
|
|
def is_32bit() -> bool:
    """
    Report whether the running interpreter is a 32-bit build.

    Three indicators are consulted and any positive one is sufficient:
    the size of a C pointer, the architecture string reported by
    ``platform.architecture()`` and the magnitude of ``sys.maxsize``.

    Returns True if running on 32-bit Python, False for 64-bit.
    """
    # Indicator 1: width of a native pointer, in bits.
    pointer_bits = 8 * struct.calcsize("P")

    # Indicator 2: architecture label; a failing probe is simply ignored.
    try:
        arch_label = platform.architecture()[0]
    except RuntimeError:
        arch_label = None

    # Indicator 3: largest native index value.
    small_maxsize = sys.maxsize <= 2**32

    return bool(
        pointer_bits == 32
        or (arch_label and "32" in arch_label)
        or small_maxsize
    )
|
|
|
|
|
|
# Decide once, at import time, whether timestamp conversion has to use the
# degraded (overflow-safe) code path. The probe is only attempted when
# is_32bit() reports a 32-bit interpreter; otherwise the flag is False.
try:
    # https://github.com/python/cpython/issues/101069 detection
    if is_32bit():
        # Raises OverflowError on affected builds (timestamp beyond 2038).
        datetime.datetime.fromtimestamp(3999999999)
    OVERFLOW32B_MODE = False
except OverflowError:
    OVERFLOW32B_MODE = True
|
|
|
|
|
|
# Standard-library UTC tzinfo, used for all UTC comparisons in this module.
UTC_DT = datetime.timezone.utc
# Timezone-aware datetime for UNIX timestamp 0.
EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT)
|
|
|
|
# Three-letter month abbreviations mapped to their month number (1-12).
M_ALPHAS: dict[str, Union[int, str]] = {
    "jan": 1,
    "feb": 2,
    "mar": 3,
    "apr": 4,
    "may": 5,
    "jun": 6,
    "jul": 7,
    "aug": 8,
    "sep": 9,
    "oct": 10,
    "nov": 11,
    "dec": 12,
}
# Three-letter weekday abbreviations mapped to their number (Sunday is 0).
DOW_ALPHAS: dict[str, Union[int, str]] = {
    "sun": 0,
    "mon": 1,
    "tue": 2,
    "wed": 3,
    "thu": 4,
    "fri": 5,
    "sat": 6,
}
|
|
|
|
# Index of each field inside an expanded expression. Seconds and year come
# last so that the first five indices line up with a classic 5-field cron.
MINUTE_FIELD = 0
HOUR_FIELD = 1
DAY_FIELD = 2
MONTH_FIELD = 3
DOW_FIELD = 4
SECOND_FIELD = 5
YEAR_FIELD = 6

# Field index tuples for the 5-, 6- and 7-field expression layouts.
UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD)
SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD)
YEAR_FIELDS = (
    MINUTE_FIELD,
    HOUR_FIELD,
    DAY_FIELD,
    MONTH_FIELD,
    DOW_FIELD,
    SECOND_FIELD,
    YEAR_FIELD,
)
|
|
|
|
# Matches "<low>-<high>" optionally followed by "/<digits>".
step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$")
# Matches a string made only of digits.
only_int_re = re.compile(r"^\d+$")

# Days per month for a non-leap year, January first.
DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
# Regex alternations ("sun|mon|..." / "jan|feb|...") built from the alias tables.
WEEKDAYS = "|".join(DOW_ALPHAS.keys())
MONTHS = "|".join(M_ALPHAS.keys())
# Matches either a run of digits or a single "*".
star_or_int_re = re.compile(r"^(\d+|\*)$")
# Matches "<something>#<digits>" or "l<digits>"; the trailing digits are
# captured in the "last" group.
special_dow_re = re.compile(
    rf"^(?P<pre>((?P<he>(({WEEKDAYS})(-({WEEKDAYS}))?)"
    rf"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P<last>\d+)$"
)
# Matches "<digits>w" or "w<digits>".
nearest_weekday_re = re.compile(r"^(?:(\d+)w|w(\d+))$")
# Matches a literal "*" (used with .match, i.e. at the start of a string).
re_star = re.compile("[*]")
# Matches "h" or "r", optionally followed by "(<begin>-<end>)" and/or "/<divisor>".
hash_expression_re = re.compile(
    r"^(?P<hash_type>h|r)(\((?P<range_begin>\d+)-(?P<range_end>\d+)\))?(\/(?P<divisor>\d+))?$"
)
|
|
|
|
# Field layouts, addressable either by name or by number of fields.
CRON_FIELDS = {
    "unix": UNIX_FIELDS,
    "second": SECOND_FIELDS,
    "year": YEAR_FIELDS,
    len(UNIX_FIELDS): UNIX_FIELDS,
    len(SECOND_FIELDS): SECOND_FIELDS,
    len(YEAR_FIELDS): YEAR_FIELDS,
}
# Number of fields in each supported layout (5, 6 and 7 respectively).
UNIX_CRON_LEN = len(UNIX_FIELDS)
SECOND_CRON_LEN = len(SECOND_FIELDS)
YEAR_CRON_LEN = len(YEAR_FIELDS)
# retrocompat
# Set of accepted field counts, i.e. the integer keys of CRON_FIELDS.
VALID_LEN_EXPRESSION = {a for a in CRON_FIELDS if isinstance(a, int)}

# Unique sentinel meaning "argument not supplied" where None is a valid value.
MARKER = object()
|
|
|
|
|
|
def datetime_to_timestamp(d):
    """
    Convert the datetime `d` into a UNIX timestamp (float seconds).

    An aware datetime is first shifted by its UTC offset; a naive datetime is
    measured against 1970-01-01 00:00:00 as it stands.
    """
    naive_utc = d
    if d.tzinfo is not None:
        # Drop the tzinfo and compensate for the offset it carried.
        naive_utc = d.replace(tzinfo=None) - d.utcoffset()

    unix_epoch = datetime.datetime(1970, 1, 1)
    elapsed = naive_utc - unix_epoch
    return elapsed.total_seconds()
|
|
|
|
|
|
def _is_leap(year: int) -> bool:
|
|
return year % 400 == 0 or (year % 4 == 0 and year % 100 != 0)
|
|
|
|
|
|
def _last_day_of_month(year: int, month: int) -> int:
    """Return the number of the last day of `month` in `year` (leap-year aware)."""
    # February gains one day in leap years; every other month uses the table.
    leap_bonus = 1 if month == 2 and _is_leap(year) else 0
    return DAYS[month - 1] + leap_bonus
|
|
|
|
|
|
def _is_successor(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> bool:
    """Tell whether `date` follows `previous_date` in the search direction.

    Both datetimes are compared in UTC. When `is_prev` is true "follows"
    means strictly earlier, otherwise strictly later.
    """
    candidate_utc = date.astimezone(UTC_DT)
    reference_utc = previous_date.astimezone(UTC_DT)
    if not is_prev:
        return candidate_utc > reference_utc
    return candidate_utc < reference_utc
|
|
|
|
|
|
def _timezone_delta(date1: datetime.datetime, date2: datetime.datetime) -> datetime.timedelta:
|
|
"""Calculate the timezone difference of the given dates."""
|
|
offset1 = date1.utcoffset()
|
|
offset2 = date2.utcoffset()
|
|
assert offset1 is not None
|
|
assert offset2 is not None
|
|
return offset2 - offset1
|
|
|
|
|
|
def _add_tzinfo(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> tuple[datetime.datetime, bool]:
    """Add the tzinfo from the previous date to the given date.

    In case the new date is ambiguous, determine the correct date
    based on it being closer to the previous date but still a successor
    (after/before based on `is_prev`).

    In case the date does not exist, jump forward to the next existing date.

    Args:
        date: the local date to which the tzinfo is attached.
        previous_date: aware datetime supplying the tzinfo and the reference
            point for the successor check.
        is_prev: True when searching backwards in time.

    Returns:
        A tuple ``(aware_date, exists)``; ``exists`` is False when `date`
        did not exist in the timezone and was moved forward minute by minute.
    """
    # tzinfo objects exposing `localize` are handled through the pytz API.
    localize = getattr(previous_date.tzinfo, "localize", None)
    if localize is not None:
        # pylint: disable-next=import-outside-toplevel
        import pytz

        try:
            # is_dst=None makes localize raise for non-existent/ambiguous times.
            result = localize(date, is_dst=None)
        except pytz.NonExistentTimeError:
            # Advance one minute at a time until the local time exists.
            while True:
                date += datetime.timedelta(minutes=1)
                try:
                    result = localize(date, is_dst=None)
                except pytz.NonExistentTimeError:
                    continue
                break
            return result, False
        except pytz.AmbiguousTimeError:
            closer = localize(date, is_dst=not is_prev)
            farther = localize(date, is_dst=is_prev)
            # TODO: Check negative DST
            assert (closer.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
            # Prefer the interpretation nearest to previous_date, as long as
            # it is still a successor in the search direction.
            if _is_successor(closer, previous_date, is_prev):
                result = closer
            else:
                assert _is_successor(farther, previous_date, is_prev)
                result = farther
        return result, True

    # Other tzinfo implementations: disambiguate with the `fold` attribute.
    result = date.replace(fold=1 if is_prev else 0, tzinfo=previous_date.tzinfo)
    if not datetime_exists(result):
        # Advance one minute at a time until the local time exists.
        while not datetime_exists(result):
            result += datetime.timedelta(minutes=1)
        return result, False

    # result is closer to the previous date
    farther = date.replace(fold=0 if is_prev else 1, tzinfo=previous_date.tzinfo)
    # Comparing the UTC offsets in the check for the date being ambiguous.
    if result.utcoffset() != farther.utcoffset():
        # TODO: Check negative DST
        assert (result.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
        if not _is_successor(result, previous_date, is_prev):
            assert _is_successor(farther, previous_date, is_prev)
            result = farther
    return result, True
|
|
|
|
|
|
class CroniterError(ValueError):
    """General top-level Croniter base exception (a `ValueError` subclass)."""
|
|
|
|
|
|
class CroniterBadTypeRangeError(TypeError):
    """Croniter-specific `TypeError`.

    NOTE(review): presumably raised for range arguments of an unsupported
    type; confirm against the code that raises it.
    """
|
|
|
|
|
|
class CroniterBadCronError(CroniterError):
    """Syntax, unknown value, or range error within a cron expression."""
|
|
|
|
|
|
class CroniterUnsupportedSyntaxError(CroniterBadCronError):
    """Valid cron syntax, but likely to produce inaccurate results"""

    # Extending CroniterBadCronError, which may be contradictory, but this allows
    # catching both errors with a single exception. From a user perspective
    # these will likely be handled the same way.
|
|
|
|
|
|
class CroniterBadDateError(CroniterError):
    """Unable to find next/prev timestamp match."""
|
|
|
|
|
|
class CroniterNotAlphaError(CroniterBadCronError):
    """Cron syntax contains an invalid day or month abbreviation."""
|
|
|
|
|
|
class croniter:
    """Compute the next or previous datetimes matching a cron expression."""

    MONTHS_IN_YEAR = 12

    # This helps with expanding `*` fields into `lower-upper` ranges. Each item
    # in this tuple maps to the corresponding field index
    RANGES = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6), (0, 59), (1970, 2099))

    # Per-field tables translating alphabetic tokens to their value; the
    # month and weekday tables are private copies of the module-level ones.
    ALPHACONV: tuple[dict[str, Union[int, str]], ...] = (
        {},  # 0: min
        {},  # 1: hour
        {"l": "l"},  # 2: dom
        # 3: mon
        copy.deepcopy(M_ALPHAS),
        # 4: dow
        copy.deepcopy(DOW_ALPHAS),
        # 5: second
        {},
        # 6: year
        {},
    )

    # Per-field value aliases applied by value_alias(), e.g. weekday 7 -> 0.
    LOWMAP: tuple[dict[int, int], ...] = ({}, {}, {0: 1}, {0: 1}, {7: 0}, {}, {})

    # NOTE(review): presumably the number of distinct values per field at which
    # a field is considered to cover everything; each entry equals the size of
    # the matching RANGES interval. Confirm against the expansion code.
    LEN_MEANS_ALL = (60, 24, 31, 12, 7, 60, 130)
|
|
|
|
def __init__(
    self,
    expr_format: str,
    start_time: Optional[Union[datetime.datetime, float]] = None,
    ret_type: type = float,
    day_or: bool = True,
    max_years_between_matches: Optional[int] = None,
    is_prev: bool = False,
    hash_id: Optional[Union[bytes, str]] = None,
    implement_cron_bug: bool = False,
    second_at_beginning: bool = False,
    expand_from_start_time: bool = False,
) -> None:
    """Parse `expr_format` and position the iterator at `start_time`.

    Args:
        expr_format: the cron expression to iterate over.
        start_time: starting point as datetime or UNIX timestamp; the
            current time is used when omitted.
        ret_type: default return type of the iteration methods.
        day_or: stored as-is; consulted when both the day-of-month and the
            day-of-week fields are restricted.
        max_years_between_matches: search horizon in years; defaults to 50
            and is clamped to at least 1.
        is_prev: default iteration direction (True means backwards).
        hash_id: optional identifier forwarded to the expression expansion;
            text is encoded as UTF-8.
        implement_cron_bug: stored as-is; consulted by the day-of-month /
            day-of-week handling.
        second_at_beginning: forwarded to the expression expansion.
        expand_from_start_time: when true, the start timestamp is forwarded
            to the expression expansion.

    Raises:
        TypeError: if `hash_id` is neither bytes nor str.
    """
    self._ret_type = ret_type
    self._day_or = day_or
    self._implement_cron_bug = implement_cron_bug
    self.second_at_beginning = bool(second_at_beginning)
    self._expand_from_start_time = expand_from_start_time

    # Normalise hash_id to bytes (or leave it as None).
    if hash_id is not None and not isinstance(hash_id, (bytes, str)):
        raise TypeError("hash_id must be bytes or UTF-8 string")
    if hash_id is not None and not isinstance(hash_id, bytes):
        hash_id = hash_id.encode("UTF-8")

    # Remember whether the caller chose the horizon; all_next/all_prev use it.
    self._max_years_btw_matches_explicitly_set = max_years_between_matches is not None
    horizon = 50 if max_years_between_matches is None else max_years_between_matches
    self._max_years_between_matches = max(int(horizon), 1)

    if start_time is None:
        start_time = time()

    self.tzinfo: Optional[datetime.tzinfo] = None

    # Placeholders; set_current() below installs the real values.
    self.start_time = 0.0
    self.dst_start_time = 0.0
    self.cur = 0.0
    self.set_current(start_time, force=True)

    expansion_origin = self.dst_start_time if self._expand_from_start_time else None
    parsed = self._expand(
        expr_format,
        hash_id=hash_id,
        from_timestamp=expansion_origin,
        second_at_beginning=second_at_beginning,
    )
    (
        self.expanded,
        self.nth_weekday_of_month,
        self.expressions,
        self.nearest_weekday,
    ) = parsed
    self.fields = CRON_FIELDS[len(self.expanded)]
    self._is_prev = is_prev
|
|
|
|
@classmethod
def _alphaconv(cls, index, key, expressions):
    """Translate the alphabetic token `key` of field `index` to its value.

    Raises:
        CroniterNotAlphaError: if `key` is unknown for that field; the
            message quotes the space-joined `expressions`.
    """
    aliases = cls.ALPHACONV[index]
    try:
        converted = aliases[key]
    except KeyError:
        raise CroniterNotAlphaError(f"[{' '.join(expressions)}] is not acceptable")
    return converted
|
|
|
|
def get_next(self, ret_type=None, start_time=None, update_current=True):
    """Return the next match after the current position.

    Args:
        ret_type: `float` or `datetime` return type; the instance default is
            used when omitted.
        start_time: optional new starting point for the search.
        update_current: whether the result becomes the new current position.

    Raises:
        ValueError: if `start_time` is given while the instance was created
            with ``expand_from_start_time=True``.
    """
    # Compare against None rather than relying on truthiness: a start_time
    # of 0 (the epoch) is a legitimate value and must be rejected as well.
    if start_time is not None and self._expand_from_start_time:
        raise ValueError(
            "start_time is not supported when using expand_from_start_time = True."
        )
    return self._get_next(
        ret_type=ret_type, start_time=start_time, is_prev=False, update_current=update_current
    )
|
|
|
|
def get_prev(self, ret_type=None, start_time=None, update_current=True):
    """Return the closest match before the current position.

    The arguments are forwarded unchanged to `_get_next`, with the search
    direction forced to "previous".
    """
    search_backwards = True
    return self._get_next(
        ret_type=ret_type,
        start_time=start_time,
        is_prev=search_backwards,
        update_current=update_current,
    )
|
|
|
|
def get_current(self, ret_type=None):
    """Return the current position.

    The result is a datetime when `ret_type` (or the instance default) is a
    `datetime.datetime` subclass, otherwise the stored timestamp.
    """
    wanted_type = ret_type or self._ret_type
    if not issubclass(wanted_type, datetime.datetime):
        return self.cur
    return self.timestamp_to_datetime(self.cur)
|
|
|
|
def set_current(
    self, start_time: Optional[Union[datetime.datetime, float]], force: bool = True
) -> float:
    """Move the iterator to `start_time` and return the current timestamp.

    Nothing changes when `start_time` is None, or when `force` is false and
    a current position is already set. A datetime `start_time` also
    replaces the remembered tzinfo and is converted to a timestamp.
    """
    may_update = force or (self.cur is None)
    if not may_update or start_time is None:
        return self.cur

    if isinstance(start_time, datetime.datetime):
        self.tzinfo = start_time.tzinfo
        start_time = self.datetime_to_timestamp(start_time)

    self.start_time = self.dst_start_time = self.cur = start_time
    return self.cur
|
|
|
|
@staticmethod
def datetime_to_timestamp(d: datetime.datetime) -> float:
    """
    Converts a `datetime` object `d` into a UNIX timestamp.

    Thin wrapper that delegates to the module-level
    `datetime_to_timestamp` function.
    """
    return datetime_to_timestamp(d)

# Older, underscore-prefixed name kept as an alias.
_datetime_to_timestamp = datetime_to_timestamp  # retrocompat
|
|
|
|
def timestamp_to_datetime(self, timestamp: float, tzinfo: Any = MARKER) -> datetime.datetime:
    """
    Converts a UNIX `timestamp` into a `datetime` object.

    The result is naive (UTC wall time) unless a timezone applies, in which
    case it is expressed in that timezone. `tzinfo` defaults to the
    instance's own tzinfo.
    """
    # MARKER (not None) means "argument omitted", so that tzinfo=None can be
    # passed explicitly even if self.tzinfo is set.
    target_tz = self.tzinfo if tzinfo is MARKER else tzinfo

    if not OVERFLOW32B_MODE:
        naive_utc = datetime.datetime.fromtimestamp(timestamp, tz=tzutc()).replace(tzinfo=None)
    else:
        # degraded mode to workaround Y2038
        # see https://github.com/python/cpython/issues/101069
        naive_utc = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp)

    if not target_tz:
        return naive_utc
    return naive_utc.replace(tzinfo=UTC_DT).astimezone(target_tz)


_timestamp_to_datetime = timestamp_to_datetime  # retrocompat
|
|
|
|
def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current=None):
|
|
if update_current is None:
|
|
update_current = True
|
|
self.set_current(start_time, force=True)
|
|
if is_prev is None:
|
|
is_prev = self._is_prev
|
|
self._is_prev = is_prev
|
|
|
|
ret_type = ret_type or self._ret_type
|
|
|
|
if not issubclass(ret_type, (float, datetime.datetime)):
|
|
raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.")
|
|
|
|
result = self._calc_next(is_prev)
|
|
timestamp = self.datetime_to_timestamp(result)
|
|
if update_current:
|
|
self.cur = timestamp
|
|
if issubclass(ret_type, datetime.datetime):
|
|
return result
|
|
return timestamp
|
|
|
|
# iterator protocol, to enable direct use of croniter
# objects in a loop, like "for dt in croniter('5 0 * * *'): ..."
# or for combining multiple croniters into a single
# dates feed using the 'itertools' module
|
|
def all_next(self, ret_type=None, start_time=None, update_current=None):
    """
    Returns a generator yielding consecutive dates.

    May be used instead of an implicit call to __iter__ whenever a
    non-default `ret_type` needs to be specified.

    `start_time` only applies to the first value produced. When no further
    date can be found, the generator ends quietly if the search horizon was
    chosen explicitly by the caller; otherwise the error propagates.
    """
    pending_start = start_time
    try:
        while True:
            # Re-assert the direction on every step.
            self._is_prev = False
            yield self._get_next(
                ret_type=ret_type, start_time=pending_start, update_current=update_current
            )
            pending_start = None
    except CroniterBadDateError:
        if not self._max_years_btw_matches_explicitly_set:
            raise
        return
|
|
|
|
def all_prev(self, ret_type=None, start_time=None, update_current=None):
    """
    Returns a generator yielding previous dates.

    `start_time` only applies to the first value produced. When no further
    date can be found, the generator ends quietly if the search horizon was
    chosen explicitly by the caller; otherwise the error propagates.
    """
    pending_start = start_time
    try:
        while True:
            # Re-assert the direction on every step.
            self._is_prev = True
            yield self._get_next(
                ret_type=ret_type, start_time=pending_start, update_current=update_current
            )
            pending_start = None
    except CroniterBadDateError:
        if not self._max_years_btw_matches_explicitly_set:
            raise
        return
|
|
|
|
def iter(self, *args, **kwargs):
    """Return the generator method matching the instance's direction.

    NOTE(review): the bound method itself is returned (it is not called) and
    the positional/keyword arguments are ignored; confirm this is intended.
    """
    if self._is_prev:
        return self.all_prev
    return self.all_next
|
|
|
|
def __iter__(self):
    """Return the instance itself; it acts as its own iterator."""
    return self

# Each iteration step (and the legacy `next` name) is a call to `_get_next`
# with its default arguments.
__next__ = next = _get_next
|
|
|
|
def _calc_next(self, is_prev: bool) -> datetime.datetime:
    """Compute the match adjacent to the current position.

    When both the day-of-month and the day-of-week fields are restricted and
    `day_or` is enabled, the two fields are evaluated separately (each with
    the other one replaced by "*") and the nearer result wins. In every
    other case a single combined search is performed.
    """
    now = self.timestamp_to_datetime(self.cur)
    fields = self.expanded[:]
    nth_map = self.nth_weekday_of_month.copy()

    # exception to support day of month and day of week as defined in cron
    both_days_restricted = fields[DAY_FIELD][0] != "*" and fields[DOW_FIELD][0] != "*"
    if not (both_days_restricted and self._day_or):
        return self._calc(now, fields, nth_map, is_prev)

    # If requested, handle a bug in vixie cron/ISC cron where day_of_month and
    # day_of_week form an intersection (AND) instead of a union (OR) if either
    # field is an asterisk or starts with an asterisk (https://crontab.guru/cron-bug.html)
    emulate_cron_bug = self._implement_cron_bug and (
        re_star.match(self.expressions[DAY_FIELD])
        or re_star.match(self.expressions[DOW_FIELD])
    )
    if emulate_cron_bug:
        # Skip the union and use the plain (intersecting) search instead.
        return self._calc(now, fields, nth_map, is_prev)

    # First pass: day-of-month only.
    dow_spec = fields[DOW_FIELD]
    fields[DOW_FIELD] = ["*"]
    by_day_of_month = self._calc(now, fields, nth_map, is_prev)

    # Second pass: day-of-week only.
    fields[DOW_FIELD] = dow_spec
    fields[DAY_FIELD] = ["*"]
    by_day_of_week = self._calc(now, fields, nth_map, is_prev)

    if is_prev:
        return by_day_of_month if by_day_of_month > by_day_of_week else by_day_of_week
    return by_day_of_month if by_day_of_month < by_day_of_week else by_day_of_week
|
|
|
|
def _calc(
    self,
    now: datetime.datetime,
    expanded: list[ExpandedExpression],
    nth_weekday_of_month: dict[int, set[int]],
    is_prev: bool,
) -> datetime.datetime:
    """Find the match closest to `now` in the requested direction.

    The search runs on a naive copy of `now`. A list of per-field
    processors is applied in order (year, month, day, weekday, hour,
    minute, second); as soon as one of them moves the candidate, the scan
    restarts from the first processor. When no processor changes the
    candidate, it is a match; for an aware `now` the tzinfo is then
    re-attached and DST transitions are handled.

    Args:
        now: reference datetime (naive or aware).
        expanded: expanded cron fields, indexed by the *_FIELD constants.
        nth_weekday_of_month: weekday -> set of requested occurrences.
        is_prev: True to search backwards in time.

    Returns:
        The matching datetime; aware if `now` is aware, naive otherwise.

    Raises:
        CroniterBadDateError: if no match is found within
            `self._max_years_between_matches` years, or the year field
            has no further candidate.
    """
    # Step off `now` so that it is never returned itself.
    if is_prev:
        nearest_diff_method = self._get_prev_nearest_diff
        offset = relativedelta(microseconds=-1)
    else:
        nearest_diff_method = self._get_next_nearest_diff
        if len(expanded) > UNIX_CRON_LEN:
            offset = relativedelta(seconds=1)
        else:
            offset = relativedelta(minutes=1)
    # Calculate the next cron time in local time a.k.a. timezone unaware time.
    unaware_time = now.replace(tzinfo=None) + offset
    # Truncate to the finest resolution the expression supports.
    if len(expanded) > UNIX_CRON_LEN:
        unaware_time = unaware_time.replace(microsecond=0)
    else:
        unaware_time = unaware_time.replace(second=0, microsecond=0)

    # `month` and `year` are read by the processors below and refreshed by
    # the main loop every time the candidate changes.
    month = unaware_time.month
    year = current_year = unaware_time.year

    # Each processor takes the candidate `d` and returns (changed, d):
    # True  -> the candidate was moved, restart the scan;
    # False -> the field already matches;
    # None  -> no further candidate exists (year field only).

    def proc_year(d):
        if len(expanded) == YEAR_CRON_LEN:
            try:
                expanded[YEAR_FIELD].index("*")
            except ValueError:
                # use None as range_val to indicate no loop
                diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None)
                if diff_year is None:
                    return None, d
                if diff_year != 0:
                    # Jump to the far end (prev) or the start (next) of the year.
                    if is_prev:
                        d += relativedelta(
                            years=diff_year, month=12, day=31, hour=23, minute=59, second=59
                        )
                    else:
                        d += relativedelta(
                            years=diff_year, month=1, day=1, hour=0, minute=0, second=0
                        )
                    return True, d
        return False, d

    def proc_month(d):
        try:
            expanded[MONTH_FIELD].index("*")
        except ValueError:
            diff_month = nearest_diff_method(
                d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR
            )
            reset_day = 1

            if diff_month is not None and diff_month != 0:
                if is_prev:
                    # Move first, then snap to the last second of that month.
                    d += relativedelta(months=diff_month)
                    reset_day = _last_day_of_month(d.year, d.month)
                    d += relativedelta(day=reset_day, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(
                        months=diff_month, day=reset_day, hour=0, minute=0, second=0
                    )
                return True, d
        return False, d

    def proc_day_of_month(d):
        try:
            expanded[DAY_FIELD].index("*")
        except ValueError:
            days = _last_day_of_month(year, month)
            # "l" is satisfied when the candidate is the month's last day.
            if "l" in expanded[DAY_FIELD] and days == d.day:
                return False, d

            if is_prev:
                # Going backwards wraps into the previous month, so its
                # length is the relevant range.
                prev_month = (month - 2) % self.MONTHS_IN_YEAR + 1
                prev_year = year - 1 if month == 1 else year
                days_in_prev_month = _last_day_of_month(prev_year, prev_month)
                diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month)
            else:
                diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days)

            if diff_day is not None and diff_day != 0:
                if is_prev:
                    d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
                return True, d
        return False, d

    def proc_day_of_week(d):
        try:
            expanded[DOW_FIELD].index("*")
        except ValueError:
            # isoweekday() % 7 maps Sunday (7) to 0, matching DOW_ALPHAS.
            diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7)
            if diff_day_of_week is not None and diff_day_of_week != 0:
                if is_prev:
                    d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0)
                return True, d
        return False, d

    def proc_day_of_week_nth(d):
        # Spread a "*" entry over all seven weekdays, then drop it.
        if "*" in nth_weekday_of_month:
            s = nth_weekday_of_month["*"]
            for i in range(0, 7):
                if i in nth_weekday_of_month:
                    nth_weekday_of_month[i].update(s)
                else:
                    nth_weekday_of_month[i] = s
            del nth_weekday_of_month["*"]

        # Collect the days of the candidate's month that satisfy a requested
        # occurrence and lie in the search direction.
        candidates = []
        for wday, nth in nth_weekday_of_month.items():
            c = self._get_nth_weekday_of_month(d.year, d.month, wday)
            for n in nth:
                if n == "l":
                    candidate = c[-1]
                elif len(c) < n:
                    # The month has fewer than n occurrences of this weekday.
                    continue
                else:
                    candidate = c[n - 1]
                if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
                    candidates.append(candidate)

        if not candidates:
            # Nothing left in this month: move to the adjacent month.
            if is_prev:
                d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
            else:
                days = _last_day_of_month(year, month)
                d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
            return True, d

        candidates.sort()
        diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
        if diff_day != 0:
            if is_prev:
                d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
            else:
                d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
            return True, d
        return False, d

    def proc_nearest_weekday(d):
        """Process W (nearest weekday) day-of-month entries."""
        candidates = []
        for w_day in self.nearest_weekday:
            candidate = self._get_nearest_weekday(d.year, d.month, w_day)
            if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
                candidates.append(candidate)

        if not candidates:
            # Nothing left in this month: move to the adjacent month.
            if is_prev:
                d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
            else:
                days = _last_day_of_month(year, month)
                d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
            return True, d

        candidates.sort()
        diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
        if diff_day != 0:
            if is_prev:
                d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
            else:
                d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
            return True, d
        return False, d

    def proc_hour(d):
        try:
            expanded[HOUR_FIELD].index("*")
        except ValueError:
            diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24)
            if diff_hour is not None and diff_hour != 0:
                if is_prev:
                    d += relativedelta(hours=diff_hour, minute=59, second=59)
                else:
                    d += relativedelta(hours=diff_hour, minute=0, second=0)
                return True, d
        return False, d

    def proc_minute(d):
        try:
            expanded[MINUTE_FIELD].index("*")
        except ValueError:
            diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60)
            if diff_min is not None and diff_min != 0:
                if is_prev:
                    d += relativedelta(minutes=diff_min, second=59)
                else:
                    d += relativedelta(minutes=diff_min, second=0)
                return True, d
        return False, d

    def proc_second(d):
        if len(expanded) > UNIX_CRON_LEN:
            try:
                expanded[SECOND_FIELD].index("*")
            except ValueError:
                diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60)
                if diff_sec is not None and diff_sec != 0:
                    d += relativedelta(seconds=diff_sec)
                    return True, d
        else:
            # No seconds field: results always fall on second 0.
            d += relativedelta(second=0)
        return False, d

    # Ordered from the coarsest to the finest field; the day and weekday
    # slots use the specialised processors when W / nth entries are present.
    procs = [
        proc_year,
        proc_month,
        (proc_nearest_weekday if self.nearest_weekday else proc_day_of_month),
        (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week),
        proc_hour,
        proc_minute,
        proc_second,
    ]

    # Give up once the candidate drifts too many years away from the start.
    while abs(year - current_year) <= self._max_years_between_matches:
        next = False
        stop = False
        for proc in procs:
            (changed, unaware_time) = proc(unaware_time)
            # `None` can be set mostly for year processing
            # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff
            if changed is None:
                stop = True
                break
            if changed:
                month, year = unaware_time.month, unaware_time.year
                next = True
                break
        if stop:
            break
        if next:
            continue

        # Every processor reported "no change": the candidate matches.
        unaware_time = unaware_time.replace(microsecond=0)
        if now.tzinfo is None:
            return unaware_time

        # Add timezone information back and handle DST changes
        aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

        if not exists and (
            not _is_successor(aware_time, now, is_prev) or "*" in expanded[HOUR_FIELD]
        ):
            # The calculated local date does not exist and moving the time forward
            # to the next valid time isn't the correct solution. Search for the
            # next matching cron time that exists.
            while not exists:
                unaware_time = self._calc(
                    unaware_time, expanded, nth_weekday_of_month, is_prev
                )
                aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

        offset_delta = _timezone_delta(now, aware_time)
        if not offset_delta:
            # There was no DST change.
            return aware_time

        # There was a DST change. So check if there is a alternative cron time
        # for the other UTC offset.
        alternative_unaware_time = now.replace(tzinfo=None) + offset_delta
        alternative_unaware_time = self._calc(
            alternative_unaware_time, expanded, nth_weekday_of_month, is_prev
        )
        alternative_aware_time, exists = _add_tzinfo(alternative_unaware_time, now, is_prev)

        if not _is_successor(alternative_aware_time, now, is_prev):
            # The alternative time is an ancestor of now. Thus it is not an alternative.
            return aware_time

        # Return whichever of the two candidates is nearer to `now`.
        if _is_successor(aware_time, alternative_aware_time, is_prev):
            return alternative_aware_time

        return aware_time

    if is_prev:
        raise CroniterBadDateError("failed to find prev date")
    raise CroniterBadDateError("failed to find next date")
|
|
|
|
@staticmethod
|
|
def _get_next_nearest_diff(x, to_check, range_val):
|
|
"""
|
|
`range_val` is the range of a field.
|
|
If no available time, we can move to next loop(like next month).
|
|
`range_val` can also be set to `None` to indicate that there is no loop.
|
|
( Currently, should only used for `year` field )
|
|
"""
|
|
for i, d in enumerate(to_check):
|
|
if range_val is not None:
|
|
if d == "l":
|
|
# if 'l' then it is the last day of month
|
|
# => its value of range_val
|
|
d = range_val
|
|
elif d > range_val:
|
|
continue
|
|
if d >= x:
|
|
return d - x
|
|
# When range_val is None and x not exists in to_check,
|
|
# `None` will be returned to suggest no more available time
|
|
if range_val is None:
|
|
return None
|
|
return to_check[0] - x + range_val
|
|
|
|
@staticmethod
|
|
def _get_prev_nearest_diff(x, to_check, range_val):
|
|
"""
|
|
`range_val` is the range of a field.
|
|
If no available time, we can move to previous loop(like previous month).
|
|
Range_val can also be set to `None` to indicate that there is no loop.
|
|
( Currently should only used for `year` field )
|
|
"""
|
|
candidates = to_check[:]
|
|
candidates.reverse()
|
|
for d in candidates:
|
|
if d != "l" and d <= x:
|
|
return d - x
|
|
if "l" in candidates:
|
|
return -x
|
|
# When range_val is None and x not exists in to_check,
|
|
# `None` will be returned to suggest no more available time
|
|
if range_val is None:
|
|
return None
|
|
candidate = candidates[0]
|
|
for c in candidates:
|
|
# fixed: c < range_val
|
|
# this code will reject all 31 day of month, 12 month, 59 second,
|
|
# 23 hour and so on.
|
|
# if candidates has just a element, this will not harmful.
|
|
# but candidates have multiple elements, then values equal to
|
|
# range_val will rejected.
|
|
if c <= range_val:
|
|
candidate = c
|
|
break
|
|
# fix crontab "0 6 30 3 *" condidates only a element, then get_prev error
|
|
# return 2021-03-02 06:00:00
|
|
if candidate > range_val:
|
|
return -range_val
|
|
return candidate - x - range_val
|
|
|
|
@staticmethod
|
|
def _get_nth_weekday_of_month(year: int, month: int, day_of_week: int) -> tuple[int, ...]:
|
|
"""For a given year/month return a list of days in nth-day-of-month order.
|
|
The last weekday of the month is always [-1].
|
|
"""
|
|
w = (day_of_week + 6) % 7
|
|
c = calendar.Calendar(w).monthdayscalendar(year, month)
|
|
if c[0][0] == 0:
|
|
c.pop(0)
|
|
return tuple(i[0] for i in c)
|
|
|
|
@staticmethod
def _get_nearest_weekday(year, month, day):
    """Return the weekday (Mon-Fri) nearest to `day` within the same month.

    `day` is first capped at the month's last day. Then:
    - a weekday is returned unchanged;
    - a Saturday becomes the Friday before, unless it is the 1st, in which
      case the following Monday (the 3rd) is used;
    - a Sunday becomes the Monday after, unless it is the last day of the
      month, in which case the preceding Friday is used.
    """
    month_end = _last_day_of_month(year, month)
    target = min(day, month_end)
    weekday = calendar.weekday(year, month, target)  # 0=Mon, 6=Sun

    if weekday == 5:  # Saturday
        return target - 1 if target > 1 else target + 2
    if weekday == 6:  # Sunday
        return target + 1 if target < month_end else target - 2
    return target
|
|
|
|
@classmethod
def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN):
    """Map `val` to its canonical value for the given field, if aliased.

    `len_expressions` is the number of fields of the expression, or a
    list/dict/tuple/set whose length is used. The alias table is `LOWMAP`;
    the alias is not applied for the field/length combinations listed
    below, in which case `val` is returned unchanged.
    """
    if isinstance(len_expressions, (list, dict, tuple, set)):
        len_expressions = len(len_expressions)

    aliases = cls.LOWMAP[field_index]
    if val not in aliases:
        return val

    # do not support 0 as a month either for classical 5 fields cron,
    # 6fields second repeat form or 7 fields year form
    # but still let conversion happen if day field is shifted
    if len_expressions == UNIX_CRON_LEN and field_index in [DAY_FIELD, MONTH_FIELD]:
        return val
    if len_expressions == SECOND_CRON_LEN and field_index in [MONTH_FIELD, DOW_FIELD]:
        return val
    if len_expressions == YEAR_CRON_LEN and field_index in [DAY_FIELD, MONTH_FIELD, DOW_FIELD]:
        return val

    return aliases[val]
|
|
|
|
# Maximum days in each month (non-leap year for Feb), keyed by month number
# (1 = January ... 12 = December).
DAYS_IN_MONTH = {1: 31, 2: 28, 3: 31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31}
|
|
|
|
@classmethod
def _expand(
    cls,
    expr_format,
    hash_id=None,
    second_at_beginning=False,
    from_timestamp=None,
    strict=False,
    strict_year=None,
):
    """Parse a cron expression into per-field lists of allowed values.

    Args:
        expr_format: The cron expression, or one of the ``@``-aliases
            (``@midnight``, ``@hourly``, ``@daily``, ``@weekly``,
            ``@monthly``, ``@yearly``, ``@annually``).  It is lower-cased
            for parsing but used verbatim in error messages.
        hash_id: Forwarded to the registered expanders.  When it is not
            ``None`` the hashed variant of an ``@``-alias is used.
        second_at_beginning: For expressions with more than
            ``UNIX_CRON_LEN`` fields, treat the first field as the seconds
            field and move it to index ``SECOND_FIELD`` before parsing.
        from_timestamp: When truthy, the lower bound of every range/step
            term is replaced by the value computed by
            ``_get_low_from_current_date_number`` for that timestamp.
        strict: When true, reject expressions whose day-of-month values can
            never occur in the listed months (e.g. ``0 0 31 2 *``).  The
            check only fails when the smallest listed day exceeds the
            length of the longest listed month.
        strict_year: Optional year (``int``) or iterable of years used by
            the strict check to decide whether February may have 29 days.
            When omitted, an explicit year field is consulted if present;
            otherwise a leap year is assumed possible.

    Returns:
        A 4-tuple ``(expanded, nth_weekday_of_month, expressions,
        nearest_weekday)``:

        - ``expanded``: one list per field, either ``["*"]`` or the sorted
          allowed values;
        - ``nth_weekday_of_month``: maps a day-of-week value to the set of
          ``nth`` markers collected from ``dow#nth`` / ``l<dow>`` terms;
        - ``expressions``: the raw, lower-cased field strings (after the
          optional seconds reordering);
        - ``nearest_weekday``: the set of day numbers declared with ``W``.

    Raises:
        CroniterBadCronError: The expression is malformed or out of range.
        CroniterUnsupportedSyntaxError: The day-of-week field mixes literal
            values with the nth-day-of-week syntax.
    """
    # Split the expression in components, and normalize L -> l, MON -> mon,
    # etc. Keep expr_format untouched so we can use it in the exception
    # messages.
    # Each alias maps to (plain form, hashed form).
    expr_aliases = {
        "@midnight": ("0 0 * * *", "h h(0-2) * * * h"),
        "@hourly": ("0 * * * *", "h * * * * h"),
        "@daily": ("0 0 * * *", "h h * * * h"),
        "@weekly": ("0 0 * * 0", "h h * * h h"),
        "@monthly": ("0 0 1 * *", "h h h * * h"),
        "@yearly": ("0 0 1 1 *", "h h h h * h"),
        "@annually": ("0 0 1 1 *", "h h h h * h"),
    }

    efl = expr_format.lower()
    hash_id_expr = 1 if hash_id is not None else 0
    try:
        efl = expr_aliases[efl][hash_id_expr]
    except KeyError:
        # Not an alias: parse the expression as given.
        pass

    expressions = efl.split()

    if len(expressions) not in VALID_LEN_EXPRESSION:
        raise CroniterBadCronError(
            "Exactly 5, 6 or 7 columns has to be specified for iterator expression."
        )

    if len(expressions) > UNIX_CRON_LEN and second_at_beginning:
        # move second to it's own(6th) field to process by same logical
        expressions.insert(SECOND_FIELD, expressions.pop(0))

    expanded = []
    nth_weekday_of_month = {}
    nearest_weekday = set()

    for field_index, expr in enumerate(expressions):
        # Let every registered expander rewrite the field first.
        for expander in EXPANDERS.values():
            expr = expander(cls).expand(
                efl, field_index, expr, hash_id=hash_id, from_timestamp=from_timestamp
            )

        if "?" in expr:
            if expr != "?":
                raise CroniterBadCronError(
                    f"[{expr_format}] is not acceptable."
                    f" Question mark can not used with other characters"
                )
            if field_index not in [DAY_FIELD, DOW_FIELD]:
                raise CroniterBadCronError(
                    f"[{expr_format}] is not acceptable. "
                    f"Question mark can only used in day_of_month or day_of_week"
                )
            # currently just trade `?` as `*`
            expr = "*"

        # e_list is a work queue: range terms push their individual values
        # back onto it, and single values end up in res.
        e_list = expr.split(",")
        res = []

        while len(e_list) > 0:
            e = e_list.pop()
            nth = None

            if field_index == DOW_FIELD:
                # Handle special case in the dow expression: 2#3, l3
                special_dow_rem = special_dow_re.match(str(e))
                if special_dow_rem:
                    g = special_dow_rem.groupdict()
                    he, last = g.get("he", ""), g.get("last", "")
                    if he:
                        e = he
                        try:
                            nth = int(last)
                        except (KeyError, ValueError):
                            nth = None
                        # Explicit bound check instead of `assert`, which is
                        # stripped when Python runs with -O.
                        if nth is None or not 1 <= nth <= 5:
                            raise CroniterBadCronError(
                                f"[{expr_format}] is not acceptable."
                                f" Invalid day_of_week value: '{nth}'"
                            )
                    elif last:
                        e = last
                        nth = g["pre"]  # 'l'

            if field_index == DAY_FIELD:
                # Handle W (nearest weekday) in day-of-month: 15w, w15
                w_match = nearest_weekday_re.match(str(e))
                if w_match:
                    w_day = int(w_match.group(1) or w_match.group(2))
                    if w_day < 1 or w_day > 31:
                        raise CroniterBadCronError(
                            f"[{expr_format}] is not acceptable,"
                            f" nearest weekday day value '{w_day}' out of range"
                        )
                    if len(e_list) > 0 or len(res) > 0:
                        raise CroniterBadCronError(
                            f"[{expr_format}] is not acceptable."
                            f" 'W' can only be used with a single day value,"
                            f" not in a list or range"
                        )
                    nearest_weekday.add(w_day)
                    res.append(w_day)
                    continue

            # Before matching step_search_re, normalize "*" to "{min}-{max}".
            # Example: in the minute field, "*/5" normalizes to "0-59/5"
            t = re.sub(
                r"^\*(\/.+)$",
                r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]),
                str(e),
            )
            m = step_search_re.search(t)

            if not m:
                # Before matching step_search_re,
                # normalize "{start}/{step}" to "{start}-{max}/{step}".
                # Example: in the minute field, "10/5" normalizes to "10-59/5"
                t = re.sub(r"^(.+)\/(.+)$", r"\1-%d/\2" % (cls.RANGES[field_index][1]), str(e))
                m = step_search_re.search(t)

            if m:
                # early abort if low/high are out of bounds
                (low, high, step) = m.group(1), m.group(2), m.group(4) or 1
                if field_index == DAY_FIELD and high == "l":
                    high = "31"

                if not only_int_re.search(low):
                    low = str(cls._alphaconv(field_index, low, expressions))

                if not only_int_re.search(high):
                    high = str(cls._alphaconv(field_index, high, expressions))

                # normally, it's already guarded by the RE that should not accept
                # not-int values.
                if not only_int_re.search(str(step)):
                    raise CroniterBadCronError(
                        f"[{expr_format}] step '{step}'"
                        f" in field {field_index} is not acceptable"
                    )
                step = int(step)

                for band in low, high:
                    if not only_int_re.search(str(band)):
                        raise CroniterBadCronError(
                            f"[{expr_format}] bands '{low}-{high}'"
                            f" in field {field_index} are not acceptable"
                        )

                low, high = (
                    cls.value_alias(int(_val), field_index, expressions)
                    for _val in (low, high)
                )

                if max(low, high) > max(
                    cls.RANGES[field_index][0], cls.RANGES[field_index][1]
                ):
                    raise CroniterBadCronError(f"{expr_format} is out of bands")

                if from_timestamp:
                    low = cls._get_low_from_current_date_number(
                        field_index, int(step), int(from_timestamp)
                    )

                # Handle when the second bound of the range is in backtracking order:
                # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH
                if low > high:
                    whole_field_range = list(
                        range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, 1)
                    )
                    # Add FirstBound -> ENDRANGE, respecting step
                    rng = list(range(low, cls.RANGES[field_index][1] + 1, step))
                    # Then 0 -> SecondBound, but skipping n first occurences according to step
                    # EG to respect such expressions : Apr-Jan/3
                    to_skip = 0
                    if rng:
                        already_skipped = list(reversed(whole_field_range)).index(rng[-1])
                        curpos = whole_field_range.index(rng[-1])
                        if ((curpos + step) > len(whole_field_range)) and (
                            already_skipped < step
                        ):
                            to_skip = step - already_skipped
                    rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step))
                # if we include a range type: Jan-Jan, or Sun-Sun,
                # it means the whole cycle (all days of week, # all monthes of year, etc)
                elif low == high:
                    rng = list(
                        range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, step)
                    )
                else:
                    try:
                        rng = list(range(low, high + 1, step))
                    except ValueError as exc:
                        raise CroniterBadCronError(f"invalid range: {exc}") from exc

                if field_index == DOW_FIELD and nth and nth != "l":
                    # Re-attach the nth marker so each value is parsed again
                    # as a "dow#nth" term.
                    rng = [f"{item}#{nth}" for item in rng]
                e_list += [a for a in rng if a not in e_list]
            else:
                if t.startswith("-"):
                    raise CroniterBadCronError(
                        f"[{expr_format}] is not acceptable, negative numbers not allowed"
                    )
                if not star_or_int_re.search(t):
                    t = cls._alphaconv(field_index, t, expressions)

                try:
                    t = int(t)
                except ValueError:
                    # "*" and "l" stay as strings.
                    pass

                t = cls.value_alias(t, field_index, expressions)

                if t not in ["*", "l"] and (
                    int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1]
                ):
                    raise CroniterBadCronError(
                        f"[{expr_format}] is not acceptable, out of range"
                    )

                res.append(t)

                if field_index == DOW_FIELD and nth:
                    if t not in nth_weekday_of_month:
                        nth_weekday_of_month[t] = set()
                    nth_weekday_of_month[t].add(nth)

        res = set(res)
        # Ints are compared through a zero-padded string so they can be
        # ordered together with the "*" / "l" string markers.
        res = sorted(res, key=lambda i: f"{i:02}" if isinstance(i, int) else i)
        if len(res) == cls.LEN_MEANS_ALL[field_index]:
            # Make sure the wildcard is used in the correct way (avoid over-optimization)
            if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or (
                field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD]
            ):
                pass
            else:
                res = ["*"]

        expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res)

    # Check to make sure the dow combo in use is supported
    if nth_weekday_of_month:
        dow_expanded_set = set(expanded[DOW_FIELD])
        dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys())
        dow_expanded_set.discard("*")
        # Skip: if it's all weeks instead of wildcard
        if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]:
            raise CroniterUnsupportedSyntaxError(
                f"day-of-week field does not support mixing literal values and nth"
                f" day of week syntax. Cron: '{expr_format}'"
                f" dow={dow_expanded_set} vs nth={nth_weekday_of_month}"
            )

    if strict:
        # Cross-validate day-of-month against month (and optionally year)
        # to reject impossible combinations like "0 0 31 2 *" (Feb 31st).
        days = expanded[DAY_FIELD]
        months = expanded[MONTH_FIELD]
        if days != ["*"] and days != ["l"] and months != ["*"]:
            int_days = [d for d in days if isinstance(d, int)]
            int_months = [m for m in months if isinstance(m, int)]
            if int_days and int_months:
                # Determine max days per month, accounting for leap years
                days_in_month = dict(cls.DAYS_IN_MONTH)
                if 2 in int_months:
                    has_leap_year = True  # assume possible by default
                    if strict_year is not None:
                        # Year explicitly provided as parameter
                        if isinstance(strict_year, int):
                            has_leap_year = calendar.isleap(strict_year)
                        else:
                            has_leap_year = any(calendar.isleap(y) for y in strict_year)
                    elif len(expanded) > YEAR_FIELD:
                        years = expanded[YEAR_FIELD]
                        if years != ["*"]:
                            int_years = [y for y in years if isinstance(y, int)]
                            if int_years:
                                has_leap_year = any(calendar.isleap(y) for y in int_years)
                    if has_leap_year:
                        days_in_month[2] = 29
                min_day = min(int_days)
                max_possible = max(days_in_month[m] for m in int_months)
                if min_day > max_possible:
                    raise CroniterBadCronError(
                        f"[{expr_format}] is not acceptable. Day(s) {int_days}"
                        f" can never occur in month(s) {int_months}"
                    )

    return expanded, nth_weekday_of_month, expressions, nearest_weekday
|
|
|
|
@classmethod
def expand(
    cls,
    expr_format: str,
    hash_id: Optional[Union[bytes, str]] = None,
    second_at_beginning: bool = False,
    from_timestamp: Optional[float] = None,
    strict: bool = False,
    strict_year: Optional[Union[int, list[int]]] = None,
) -> tuple[list[ExpandedExpression], dict[int, set[int]]]:
    """
    Expand a cron expression into a normalized
    list[list[int | 'l' | '*']].  The outer list has one entry per field of
    the expression; each inner list holds the values allowed for that field.

    Returns a 2-tuple: the expanded expression list, followed by the
    `nth_weekday_of_month` mapping.  All arguments are forwarded unchanged
    to `_expand`.

    Any `ValueError` that is not already a `CroniterError` is re-raised as
    `CroniterBadCronError` carrying the formatted traceback as its message.

    Examples:

    # Every minute
    >>> croniter.expand('* * * * *')
    ([['*'], ['*'], ['*'], ['*'], ['*']], {})

    # Every day at midnight
    >>> croniter.expand('0 0 * * *')
    ([[0], [0], ['*'], ['*'], ['*']], {})

    # Minutes 0-5 and 10, monday through friday
    >>> croniter.expand('0-5,10 * * * mon-fri')
    ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {})

    Some special values, such as the nth day of week, are expanded into a
    separate mapping for later processing:

    # Every minute on the 3rd tuesday of the month
    >>> croniter.expand('* * * * 2#3')
    ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}})

    # Every hour on the last day of the month
    >>> croniter.expand('0 * l * *')
    ([[0], ['*'], ['l'], ['*'], ['*']], {})

    # Every 15 seconds during minute 0 of hour 0
    >>> croniter.expand('0 0 * * * */15')
    ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {})
    """
    try:
        fields, nth_map, _raw_fields, _nearest = cls._expand(
            expr_format,
            hash_id=hash_id,
            second_at_beginning=second_at_beginning,
            from_timestamp=from_timestamp,
            strict=strict,
            strict_year=strict_year,
        )
    except CroniterError:
        # Already one of our own errors: let it through untouched.
        raise
    except ValueError:
        # Any other ValueError is reported as a bad cron expression.
        raise CroniterBadCronError(_traceback.format_exc())
    return fields, nth_map
|
|
|
|
@classmethod
def _get_low_from_current_date_number(cls, field_index, step, from_timestamp):
    """Return the start value of a stepped series anchored on a timestamp.

    The result ``low`` is chosen so that ``low, low + step, low + 2*step, ...``
    passes through the matching component of ``from_timestamp`` (read as a
    UTC datetime).

    Args:
        field_index: One of ``MINUTE_FIELD``, ``HOUR_FIELD``, ``DAY_FIELD``,
            ``MONTH_FIELD`` or ``DOW_FIELD``.
        step: Positive step of the series.
        from_timestamp: POSIX timestamp used as the anchor.

    Returns:
        The lowest series value for the field.  Day and month are 1-based,
        so their result is always at least 1.

    Raises:
        ValueError: ``step`` is not positive, or ``field_index`` is not one
            of the five supported fields.
    """
    if step <= 0:
        # Without this guard the modulo below would raise ZeroDivisionError,
        # which callers that handle ValueError would not catch.
        raise ValueError(f"step must be a positive integer, got {step}")

    dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT)
    if field_index == MINUTE_FIELD:
        return dt.minute % step
    if field_index == HOUR_FIELD:
        return dt.hour % step
    if field_index == DAY_FIELD:
        # Days start at 1: shift to 0-based for the modulo, then back.
        return ((dt.day - 1) % step) + 1
    if field_index == MONTH_FIELD:
        # Months start at 1 as well.  A plain `dt.month % step` yields 0
        # whenever the month is a multiple of the step, which is not a
        # valid month number.
        return ((dt.month - 1) % step) + 1
    if field_index == DOW_FIELD:
        # datetime.weekday() is 0 for Monday; +1 gives 1=Mon ... 7=Sun.
        return (dt.weekday() + 1) % step

    raise ValueError("Can't get current date number for index larger than 4")
|
|
|
|
@classmethod
def is_valid(
    cls,
    expression,
    hash_id=None,
    encoding="UTF-8",
    second_at_beginning=False,
    strict=False,
    strict_year=None,
):
    """Tell whether ``expression`` can be expanded without a croniter error.

    A truthy ``hash_id`` must be ``bytes`` or ``str``; a ``str`` is encoded
    with ``encoding`` before being handed to ``expand``.  The remaining
    options are forwarded to ``expand`` unchanged.

    Returns ``True`` when ``expand`` succeeds and ``False`` when it raises
    a ``CroniterError``.

    Raises:
        TypeError: ``hash_id`` is truthy but neither ``bytes`` nor ``str``.
    """
    if hash_id:
        if isinstance(hash_id, str):
            hash_id = hash_id.encode(encoding)
        elif not isinstance(hash_id, bytes):
            raise TypeError("hash_id must be bytes or UTF-8 string")

    try:
        cls.expand(
            expression,
            hash_id=hash_id,
            second_at_beginning=second_at_beginning,
            strict=strict,
            strict_year=strict_year,
        )
    except CroniterError:
        return False
    else:
        return True
|
|
|
|
@classmethod
def match(
    cls,
    cron_expression,
    testdate,
    day_or=True,
    second_at_beginning=False,
    precision_in_seconds=None,
):
    """Tell whether ``cron_expression`` matches the single instant ``testdate``.

    This is ``match_range`` applied to the zero-length range that starts
    and ends at ``testdate``; every other argument is passed through as is.
    """
    range_start = range_end = testdate
    return cls.match_range(
        cron_expression,
        range_start,
        range_end,
        day_or,
        second_at_beginning,
        precision_in_seconds,
    )
|
|
|
|
@classmethod
def match_range(
    cls,
    cron_expression,
    from_datetime,
    to_datetime,
    day_or=True,
    second_at_beginning=False,
    precision_in_seconds=None,
):
    """Tell whether ``cron_expression`` fires between two datetimes.

    An iterator is built at ``to_datetime`` and stepped back once with
    ``get_prev()``.  The range matches when the distance between that
    previous occurrence and the probe instant is smaller than the length of
    the range plus ``precision_in_seconds``.

    ``precision_in_seconds`` defaults to 1 when the expanded expression has
    more than ``UNIX_CRON_LEN`` fields, and to 60 otherwise.

    Returns ``False`` when ``get_prev()`` raises ``CroniterBadDateError``.
    """
    cron = cls(
        cron_expression,
        to_datetime,
        ret_type=datetime.datetime,
        day_or=day_or,
        second_at_beginning=second_at_beginning,
    )

    probe = cron.get_current(datetime.datetime)
    if not probe.microsecond:
        # NOTE(review): presumably nudged forward so that an occurrence
        # exactly at `to_datetime` is still returned by get_prev() -- confirm.
        probe += relativedelta(microseconds=1)
    cron.set_current(probe, force=True)

    try:
        previous = cron.get_prev()
    except CroniterBadDateError:
        return False

    if precision_in_seconds is None:
        precision_in_seconds = 60 if len(cron.expanded) <= UNIX_CRON_LEN else 1

    tolerance = (to_datetime - from_datetime).total_seconds() + precision_in_seconds
    gap = abs(probe - previous).total_seconds()
    return gap < tolerance
|
|
|
|
|
|
def croniter_range(
    start,
    stop,
    expr_format,
    ret_type=None,
    day_or=True,
    exclude_ends=False,
    _croniter=None,
    second_at_beginning=False,
    expand_from_start_time=False,
):
    """
    Yield every time between start and stop that matches the cron expression.

    The bounds themselves are yielded too when they match, unless
    'exclude_ends=True' is given.  When start is later than stop the matches
    are produced in reverse chronological order.

    start and stop must be of the same (or a compatible) type.  Numbers are
    read as POSIX timestamps, and in that case results default to floats;
    otherwise they default to datetimes.

    Think of it as the datetime sibling of the builtin range(start, stop, step),
    where 'step' is a cron expression.
    """
    _croniter = _croniter or croniter

    # The identity test on the types comes first because it is the cheap,
    # common case; isinstance() is only consulted when the types differ.
    if type(start) is not type(stop):
        if not isinstance(start, type(stop)) and not isinstance(stop, type(start)):
            raise CroniterBadTypeRangeError(
                f"The start and stop must be same type. {type(start)} != {type(stop)}"
            )

    timestamps_given = isinstance(start, (float, int))
    if timestamps_given:
        start, stop = [
            datetime.datetime.fromtimestamp(t, tzutc()).replace(tzinfo=None) for t in (start, stop)
        ]
    if ret_type is None:
        ret_type = float if timestamps_given else datetime.datetime

    if not exclude_ends:
        # Widen the window by one microsecond at both ends so that matches
        # falling exactly on start or stop are included.
        tick = relativedelta(microseconds=1)
        if start < stop:  # chronological order
            start, stop = start - tick, stop + tick
        else:  # reverse order
            start, stop = start + tick, stop - tick

    cron = _croniter(
        expr_format,
        start,
        ret_type=datetime.datetime,
        day_or=day_or,
        max_years_between_matches=abs(stop.year - start.year) + 1,
        second_at_beginning=second_at_beginning,
        expand_from_start_time=expand_from_start_time,
    )

    # Pick the stepping method and the "still inside the window" predicate
    # according to the direction of travel.
    if start < stop:
        advance = cron.get_next

        def in_window(moment):
            return moment < stop

    else:
        advance = cron.get_prev

        def in_window(moment):
            return moment > stop

    try:
        moment = advance()
        while in_window(moment):
            yield cron.get_current(float) if ret_type is float else moment
            moment = advance()
    except CroniterBadDateError:
        # No further match within the allowed year span: end the iteration.
        return
|
|
|
|
|
|
class HashExpander:
    """Rewrites hashed/random cron terms into ordinary numeric terms.

    The instance keeps a reference to the cron class (or object) it was
    created with and reads the per-field bounds from its ``RANGES``.
    """

    def __init__(self, cronit):
        self.cron = cronit

    def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None):
        """Return a hashed/random integer inside an inclusive range.

        Missing bounds default to ``self.cron.RANGES[idx]``.  With
        ``hash_type == "r"`` the seed is a random 32-bit integer; otherwise
        it is the CRC32 of ``hash_id``.  The seed is shifted right by ``idx``
        bits before being reduced into the range.
        """
        upper = self.cron.RANGES[idx][1] if range_end is None else range_end
        lower = self.cron.RANGES[idx][0] if range_begin is None else range_begin
        seed = (
            random.randint(0, 0xFFFFFFFF)
            if hash_type == "r"
            else binascii.crc32(hash_id) & 0xFFFFFFFF
        )
        width = upper - lower + 1
        return (seed >> idx) % width + lower

    def match(self, efl, idx, expr, hash_id=None, **kw):
        """Match ``expr`` against the hashed-expression pattern."""
        return hash_expression_re.match(expr)

    def expand(self, efl, idx, expr, hash_id=None, match="", **kw):
        """Expand a hashed/random expression to its normal representation.

        ``match`` may carry a pre-computed match object; the default ``""``
        means "run ``self.match`` now".  A falsy match returns ``expr``
        unchanged.

        Raises:
            CroniterBadCronError: an ``h`` term is used without ``hash_id``,
                the range end is not greater than the range begin, or the
                divisor is zero.
        """
        if match == "":
            match = self.match(efl, idx, expr, hash_id, **kw)
        if not match:
            return expr

        spec = match.groupdict()
        kind = spec["hash_type"]

        if kind == "h" and hash_id is None:
            raise CroniterBadCronError("Hashed definitions must include hash_id")

        bounded = bool(spec["range_begin"] and spec["range_end"])
        if bounded and int(spec["range_begin"]) >= int(spec["range_end"]):
            raise CroniterBadCronError("Range end must be greater than range begin")

        if spec["divisor"]:
            # Examples: H(30-59)/10 -> 34-59/10 (i.e. 34,44,54)
            #           H/15        -> 7-59/15  (i.e. 7,22,37,52)
            divisor = int(spec["divisor"])
            if divisor == 0:
                raise CroniterBadCronError(f"Bad expression: {expr}")

            if bounded:
                first, last = int(spec["range_begin"]), int(spec["range_end"])
            else:
                first, last = self.cron.RANGES[idx][0], self.cron.RANGES[idx][1]

            # The starting offset is drawn from the first `divisor` values
            # of the range.
            offset = self.do(
                idx,
                hash_type=kind,
                hash_id=hash_id,
                range_begin=first,
                range_end=divisor - 1 + first,
            )
            return f"{offset}-{last}/{divisor}"

        if bounded:
            # Example: H(0-29) -> 12
            picked = self.do(
                idx,
                hash_type=kind,
                hash_id=hash_id,
                range_end=int(spec["range_end"]),
                range_begin=int(spec["range_begin"]),
            )
            return str(picked)

        # Example: H -> 32
        return str(self.do(idx, hash_type=kind, hash_id=hash_id))
|
|
|
|
|
|
# Registry of expression pre-processors, keyed by name.  `_expand` instantiates
# every registered class and passes each cron field through its `expand()`.
EXPANDERS = {"hash": HashExpander}
|