From 3e852c0b9f7ebc9688c75ec768f25b18ddabacc6 Mon Sep 17 00:00:00 2001 From: Charles-Henri Decultot Date: Tue, 25 Mar 2025 10:59:32 +0100 Subject: [PATCH 1/3] feat: option to display subscriptions in the booking calendar --- .../doctype/item_booking/item_booking.py | 1984 +++++++++++++++++ .../venue_settings/venue_settings.json | 241 ++ .../doctype/venue_settings/venue_settings.py | 434 ++++ 3 files changed, 2659 insertions(+) create mode 100644 bookings/bookings/doctype/item_booking/item_booking.py create mode 100644 bookings/bookings/doctype/venue_settings/venue_settings.json create mode 100644 bookings/bookings/doctype/venue_settings/venue_settings.py diff --git a/bookings/bookings/doctype/item_booking/item_booking.py b/bookings/bookings/doctype/item_booking/item_booking.py new file mode 100644 index 00000000..9d0124fb --- /dev/null +++ b/bookings/bookings/doctype/item_booking/item_booking.py @@ -0,0 +1,1984 @@ +# Copyright (c) 2025, Dokos SAS and contributors +# For license information, please see license.txt + +import calendar +import datetime +import json +from collections.abc import Iterable +from datetime import timedelta +from functools import cached_property +from typing import TYPE_CHECKING, TypedDict + +import frappe +from erpnext.accounts.party import get_party_account_currency +from erpnext.controllers.website_list_for_contact import get_customers_suppliers +from erpnext.setup.utils import get_exchange_rate +from frappe import _ +from frappe.desk.calendar import process_recurring_events +from frappe.integrations.doctype.google_calendar.google_calendar import ( + format_date_according_to_google_calendar, + get_google_calendar_object, + get_timezone_naive_datetime, +) +from frappe.model.document import Document +from frappe.model.mapper import get_mapped_doc +from frappe.utils import ( + add_days, + cint, + date_diff, + flt, + get_datetime, + get_time, + getdate, + is_desk, + now, + now_datetime, + sbool, + time_diff_in_minutes, +) +from frappe.utils.deprecations import deprecated +from googleapiclient.errors import HttpError + +from bookings.bookings.doctype.booking_credit.booking_credit import get_booking_credit_types_for_item +from bookings.bookings.doctype.item_booking_calendar.item_booking_calendar import ( + ItemBookingExceptionEngine, +) +from bookings.bookings.utils.customer import get_linked_customers + +if TYPE_CHECKING: + from collections.abc import Callable + from typing import TypeVar + + from erpnext.selling.doctype.quotation.quotation import Quotation + + class schedule_line_t(TypedDict): + start_time: str + end_time: str + day: str + whole: bool | None # Ignore the duration of the UOM and fill the full schedule line with a single slot. + + class item_calendar_t(TypedDict): + type: str + calendar: list[schedule_line_t] + name: str | None + minimum_duration_in_seconds: int | None # Minimal duration of a slot in seconds. + + class prepared_schedule_line_t(TypedDict): + start: datetime.datetime + end: datetime.datetime + whole: bool | None + base_start: datetime.datetime + + T = TypeVar("T") + + +def util_split_list(items: "list[T]", condition: "Callable[[T], bool]") -> "tuple[list[T], list[T]]": + """ + Splits a list into two lists based on a condition. + Similar to: + ```py + left = [item for item in items if not condition(item)] + right = [item for item in items if condition(item)] + ``` + """ + items_no, items_yes = [], [] + for item in items: + (items_yes if condition(item) else items_no).append(item) + return items_no, items_yes + + +class BookingException(frappe.ValidationError): + @classmethod + def throw(cls, *args, **kwargs): + if is_desk(): + cls.throw_desk(*args, **kwargs) + else: + cls.throw_website(*args, **kwargs) + + @classmethod + def throw_website(cls, *args, **kwargs): + raise cls(*args, **kwargs) + + @classmethod + def throw_desk(cls, *args, **kwargs): + raise cls(*args, **kwargs) + + +class ExceptionBookingOverlap(BookingException): + @classmethod + def overlaps_to_html(cls, overlaps: list): + from frappe.utils import get_link_to_form + + conflicts_str = "

" + conflicts_str += "

" + _("Existing Bookings") + "

" + conflicts_str += "" + return conflicts_str + + @classmethod + def throw_website(cls, doc: "ItemBooking", overlaps: list): + frappe.throw(_("This slot is no longer bookable."), exc=cls) + + @classmethod + def throw_desk(cls, doc: "ItemBooking", overlaps: list): + msg = _( + "An existing item booking or subscription for this item is overlapping with this document. Please change its dates to save it, or change your settings in Venue Settings." + ) + + msg = msg + cls.overlaps_to_html(overlaps) + frappe.throw(msg, exc=cls) + + +class ExceptionTooManyBookings(ExceptionBookingOverlap): + @classmethod + def throw_website(cls, doc: "ItemBooking", overlaps: list): + frappe.throw(_("This slot is no longer bookable."), exc=cls) + + @classmethod + def throw_desk(cls, doc: "ItemBooking", overlaps: list): + frappe.throw( + _("The maximum number of simultaneous bookings allowed for this item has been reached.") + + cls.overlaps_to_html(overlaps), + exc=cls, + ) + +class ItemBooking(Document): + # begin: auto-generated types + # This code is auto-generated. Do not modify anything in this block. + + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from frappe.types import DF + + all_day: DF.Check + amended_from: DF.Link | None + booking_resource: DF.Link + color: DF.Color | None + deduct_booking_credits: DF.Check + ends_on: DF.Datetime + event: DF.Link | None + google_calendar: DF.Link | None + google_calendar_event_id: DF.SmallText | None + google_calendar_id: DF.Data | None + item: DF.Link | None + naming_series: DF.Literal["STO-BOOK-.YYYY.-"] + notes: DF.SmallText | None + parent_item_booking: DF.Link | None + party_name: DF.DynamicLink | None + party_type: DF.Link | None + repeat_this_event: DF.Check + repeat_till: DF.Date | None + rrule: DF.SmallText | None + starts_on: DF.Datetime + status: DF.Literal["In cart", "Not confirmed", "Confirmed", "Cancelled"] + sync_with_google_calendar: DF.Check + title: DF.Data | None + uom: DF.Link | None + user: DF.Link | None + # end: auto-generated types + + def before_insert(self): + if self.parent_item_booking: + self.google_calendar = self.google_calendar_id = None + + def validate(self): + self.validate_linked_item() + self.set_title() + + if self.sync_with_google_calendar and not self.google_calendar: + self.google_calendar = frappe.db.get_value("Item", self.item, "google_calendar") + + if self.google_calendar and not self.google_calendar_id: + self.google_calendar_id = frappe.db.get_value( + "Google Calendar", self.google_calendar, "google_calendar_id" + ) + + if isinstance(self.rrule, list) and self.rrule > 1: + self.rrule = self.rrule[0] + + if get_datetime(self.starts_on) > get_datetime(self.ends_on): + frappe.throw(_("Please make sure the end time is greater than the start time")) + + if not self.color: + self.color = frappe.db.get_value("Item", self.item, "calendar_color") + + if not (self.party_type and self.party_name) and self.user: + self.party_type, self.party_name = get_corresponding_party(self.user) + + self.check_overlaps() + + def validate_linked_item(self): + if not self.item: + return + + item_doc = frappe.get_value("Item", self.item, ["enable_item_booking"], as_dict=True) + if not item_doc: + return # Link validation will catch this + + from frappe.utils import get_link_to_form + + item_link = get_link_to_form("Item", self.item) + + if not item_doc["enable_item_booking"]: + msg = _("Booking is not enabled for this item.") + msg = _("{0}: {1}").format(item_link, msg) + if is_desk(): + frappe.msgprint(msg) + else: + frappe.throw(msg) + + def check_overlaps(self): + overlapping_bookings = self.get_overlapping_bookings() + overlapping_subscriptions = self.get_overlapping_subscriptions() + overlaps = overlapping_bookings + overlapping_subscriptions + + # Split overlaps into repeating and non-repeating in a single line of code using functools + non_repeating_overlaps, repeating_overlaps = util_split_list( + overlaps, lambda x: x.get("repeat_this_event", False) + ) + + overlaps = non_repeating_overlaps + self_start = get_datetime(self.starts_on) + self_end = get_datetime(self.ends_on) + for rep in repeating_overlaps: + # Transform the recurring event into a list of instances + # Note that the starting timestamp for the processing is the 00:00 of the day of the current booking (self) + # NOTE: Only events that start on the same day as the current booking are considered for overlap. + # TODO: Handle the case where the recurring event starts 1 day or more before the current booking. + # Example: A recurring event that starts at 23:00 on the day before the current booking. + + time_window = (self_start.date(), self.ends_on) + recurring = list(process_recurring_events(rep, *time_window, "starts_on", "ends_on", "rrule")) + + # If the recurring event does not have any instances in the time window, at least keep the original event. + if not recurring: + recurring = [rep] + + def filt(other: dict): + """Filter out instances of the recurring event that do not overlap with the current event.""" + other_start = get_datetime(other.get("starts_on")) + other_end = get_datetime(other.get("ends_on")) + return (other_start < self_end) and (other_end > self_start) + + recurring = [other for other in recurring if filt(other)] + if recurring: + overlaps.extend(recurring) + + # Process the overlaps, which is a list of existing documents that overlap with the current booking. + if not overlaps: + # It is always possible to book an item if there are no overlaps. + return + + # Get the number of simultaneous bookings allowed for this item. + simultaneous_bookings_allowed = 0 + if frappe.db.get_single_value("Venue Settings", "enable_simultaneous_booking"): + # If simultaneous bookings are enabled, get the number of simultaneous bookings allowed for this item. + simultaneous_bookings_allowed = cint( + frappe.db.get_value("Item", self.item, "simultaneous_bookings_allowed") # type: ignore + ) + + # Get if overlaps are disallowed for desk users + no_overlap_per_item = frappe.db.get_single_value("Venue Settings", "no_overlap_per_item") + + # At this point, there are overlaps, so we need to check if there are too many bookings. + + if no_overlap_per_item or not is_desk(): + if simultaneous_bookings_allowed <= 0: + # Overlaps are not allowed, and there is already a booking/subscription. + ExceptionBookingOverlap.throw(self, overlaps) + elif len(overlaps) >= simultaneous_bookings_allowed: + # There would be too many bookings if we allowed this one. + ExceptionTooManyBookings.throw(self, overlaps) + else: + if len(overlaps) >= simultaneous_bookings_allowed: + # NOTE: This always run for simultaneous_bookings_allowed=0 + frappe.publish_realtime("booking_overlap") + + def get_overlapping_bookings(self): + from pypika import Criterion + from pypika import functions as fn + + if not self.item: + return [] + + IB = frappe.qb.DocType("Item Booking") + # https://stackoverflow.com/questions/13390333/two-rectangles-intersection + item_name = self.item if isinstance(self.item, str) else self.item.name + query = ( + frappe.qb.select(IB.name, IB.repeat_this_event, IB.rrule, IB.starts_on, IB.ends_on) + .from_(IB) + .where( + Criterion.all( + [ + IB.name != self.name, + IB.item == item_name, + IB.status != "Cancelled", + ] + ) + ) + .where( + Criterion.any( + [ + (IB.starts_on < self.ends_on) & (IB.ends_on > self.starts_on), + # Check overlaps with recurring events. + # TODO: Check overlaps with other events when this booking (self) is recurring. + (IB.starts_on < self.ends_on) + & (IB.repeat_this_event == 1) + & (fn.Coalesce(IB.repeat_till, "9999-01-01") > self.starts_on), + ] + ) + ) + ) + return [{"doctype": "Item Booking", **booking} for booking in query.run(as_dict=True)] + + def get_overlapping_subscriptions(self): + from pypika import Criterion + from pypika import functions as fn + + if not self.item: + return [] + + Subscription = frappe.qb.DocType("Subscription") + SubscriptionPlanDetail = frappe.qb.DocType("Subscription Plan Detail") + start_field = fn.Coalesce(SubscriptionPlanDetail.from_date, Subscription.start, "0000-00-00") + end_field = fn.Coalesce(SubscriptionPlanDetail.to_date, Subscription.cancellation_date, "9999-01-01") + item_name = self.item if isinstance(self.item, str) else self.item.name + query = ( + frappe.qb.select(Subscription.name) + .from_(Subscription) + .join(SubscriptionPlanDetail) + .on( + Criterion.all( + [ + Subscription.name == SubscriptionPlanDetail.parent, + SubscriptionPlanDetail.parenttype == "Subscription", + ] + ) + ) + .where( + Criterion.all( + [ + start_field < self.ends_on, + end_field > self.starts_on, + SubscriptionPlanDetail.booked_item == item_name, + ] + ) + ) + .limit(1) + ) + return [{"doctype": "Subscription", "name": sub[0]} for sub in query.run()] + + def set_title(self): + if self.meta.get_field("title").hidden or not self.title: + self.title = self.booking_resource or "" + if self.user: + user_name = frappe.db.get_value("User", self.user, "full_name") + self.title += " - " + (user_name or self.user) + + elif self.party_name and self.party_type: + self.title += " - " + frappe.get_doc(self.party_type, self.party_name).get_title() or "" + + def set_status(self, status): + self.db_set("status", status, update_modified=True, notify=True) + + gcalendar_method = ( + delete_event_in_google_calendar if status == "Cancelled" else update_event_in_google_calendar + ) + gcalendar_method(self) + + for child in frappe.get_all( + "Item Booking", filters=dict(parent_item_booking=self.name), pluck="name" + ): + child = frappe.get_doc("Item Booking", child) + child.set_status(status) + gcalendar_method(child) + + def on_update(self): + self.synchronize_child_bookings() + + def synchronize_child_bookings(self): + def update_child(item, childname=None): + child = frappe.get_doc("Item Booking", childname) if childname else frappe.new_doc("Item Booking") + child.update( + { + key: value + for key, value in frappe.copy_doc(self).as_dict().items() + if (value is not None and not key.startswith("__")) + } + ) + child.item = item + child.parent_item_booking = self.name + child.save() + + if frappe.db.exists("Product Bundle", dict(new_item_code=self.item)): + doc = frappe.get_doc("Product Bundle", dict(new_item_code=self.item)) + for item in doc.items: + childnames = frappe.db.get_all( + "Item Booking", dict(item=item.item_code, parent_item_booking=self.name), pluck="name" + ) + for childname in childnames: + update_child(item.item_code, childname) + + if not childnames: + for _value in range(int(item.qty)): + update_child(item.item_code) + + elif frappe.db.exists("Item Booking", dict(parent_item_booking=self.name)): + for child in frappe.get_all( + "Item Booking", filters=dict(parent_item_booking=self.name), fields=["name", "item"] + ): + update_child(child.item, child.name) + + def credits_have_been_deducted(self): + return bool( + frappe.db.get_all( + "Booking Credit Usage", filters={"item_booking": self.name, "docstatus": 1}, limit=1 + ) + ) + + def get_deducted_credits(self): + return sum( + frappe.db.get_all( + "Booking Credit Usage", filters={"item_booking": self.name, "docstatus": 1}, pluck="quantity" + ) + ) + + def cancel_appointment(self, ignore_links=False): + if ignore_links: + self.flags.ignore_links = True + + # Check that the user is allowed to cancel the appointment + allow_event_cancellation = frappe.db.get_single_value("Venue Settings", "allow_event_cancellation") + if not allow_event_cancellation: + return frappe.throw("Item Booking cancellation is not allowed") + + if self.status == "Cancelled": + return frappe.throw("Item Booking already cancelled") + + # Cancellation delay is in seconds + cancellation_delay = cint(frappe.db.get_single_value("Venue Settings", "cancellation_delay")) + if time_diff_in_minutes(self.starts_on, now_datetime()) < (cancellation_delay / 60): + formatted_duration = frappe.format(cancellation_delay, "Duration") + return frappe.throw( + _("Cancellation is only possible up to {0} before the start of the booking").format( + formatted_duration + ) + ) + + self.set_status("Cancelled") + + @frappe.whitelist() + def end_now(self): + new_end = now_datetime() + old_start = get_datetime(self.starts_on) + old_end = get_datetime(self.ends_on) + assert old_start and old_end # for typechecking + + if old_start <= new_end <= old_end: + self.ends_on = new_end + return + else: + frappe.throw( + _("Date must be between {0} and {1}").format( + self.get_formatted("starts_on"), + self.get_formatted("ends_on"), + ) + ) + + def has_website_permission(self, ptype, user, verbose=False): + if ptype == "read": + # Read-only, user has to use the website to cancel the bookings. + return has_booking_permission(self, ptype=ptype, user=user, raise_exception=False) + return False + + +def get_list_context(context=None): + allow_event_cancellation = frappe.db.get_single_value("Venue Settings", "allow_event_cancellation") + + cancellation_delay = ( + cint(frappe.db.get_single_value("Venue Settings", "cancellation_delay")) / 60 + if allow_event_cancellation + else 0 + ) + context.update( + { + "show_sidebar": True, + "show_search": True, + "no_breadcrumbs": True, + "title": _("Bookings"), + "get_list": get_bookings_list, + "row_template": "templates/includes/item_booking/item_booking_row.html", + "can_cancel": allow_event_cancellation, + "cancellation_delay": cancellation_delay, + "header_action": frappe.render_template( + "templates/includes/item_booking/item_booking_list_action.html", + {"can_cancel": allow_event_cancellation, "cancellation_delay": cancellation_delay}, + ), + "list_footer": frappe.render_template( + "templates/includes/item_booking/item_booking_list_footer.html", {} + ), + } + ) + + +def get_bookings_list(doctype, txt, filters, limit_start, limit_page_length=20, order_by=None): + from frappe.www.list import get_list + + user = frappe.session.user + + if user == "Guest": + return [] + + customers = set() + or_filters = [] + + contact = frappe.db.get_value("Contact", {"user": user}, "name") + if contact: + contact_doc = frappe.get_doc("Contact", contact) + if customer := contact_doc.get_link_for("Customer"): + customers.add(customer) + + all_customers, _ = get_customers_suppliers("Customer", user) + customers.update(all_customers or []) + + or_filters.append(["user", "=", user]) + if customers: + or_filters.append(["party_name", "in", customers]) + + return get_list( + doctype, + txt, + filters, + limit_start, + limit_page_length, + ignore_permissions=True, + or_filters=or_filters, + order_by="starts_on desc", + ) + + +@frappe.whitelist(allow_guest=True) +def get_bookings_list_for_map(start, end): + bookings_list = _get_events(getdate(start), getdate(end), item=None, user=frappe.session.user) + + def get_title(x): + return x.get("title", x.get("booking_resource", x.name)) + + def is_all_day(x): + return x.get("all_day", x.get("allDay", False)) + + def get_color(x): + if x.ends_on < frappe.utils.now_datetime(): + return "darkgrey" + if x.status == "Cancelled": + return "#ff4d4d" + elif x.status == "Confirmed": + return "#6195ff" + elif x.status == "In cart": + return "#b67890" + return "#ff7846" + + return [ + dict( + start=x.starts_on, + end=x.ends_on, + title=get_title(x), + status=x.status, + id=x.name, + allDay=is_all_day(x), + item_name=x.get("booking_resource"), + backgroundColor=get_color(x), + borderColor="darkgrey", + ) + for x in bookings_list + ] + + +@frappe.whitelist() +def update_linked_transaction(transaction_type, line_item, item_booking): + has_booking_permission(item_booking, raise_exception=True) + return frappe.db.set_value(f"{transaction_type} Item", line_item, "item_booking", item_booking) + + +@frappe.whitelist() +def get_transactions_items(transaction_type, transactions): + frappe.has_permission(transaction_type, "read", throw=True) + transactions = frappe.parse_json(transactions) + output = [] + for transaction in transactions: + doc = frappe.get_doc(transaction_type, transaction) + output.extend(doc.items) + + return output + + +def has_booking_permission(doc: ItemBooking | str, ptype="write", user="", raise_exception=False): + from frappe.permissions import has_permission + + if raise_exception: + if not has_booking_permission(doc, ptype, user, raise_exception=False): + frappe.throw("Not allowed", frappe.PermissionError) + + user = user or frappe.session.user + if isinstance(doc, str): + doc: ItemBooking = frappe.get_doc("Item Booking", doc) # type: ignore + + if has_permission(doc.doctype, ptype, doc=doc, print_logs=False, user=user): + return True + + if doc.user == user: + return True + + customers, _ = get_customers_suppliers("Customer", user) + if customers: + for c in customers: + if doc.party_name == c: + return True + + return False + + +@frappe.whitelist() +def cancel_appointment(id, force=False, render_row=False): + booking: ItemBooking = frappe.get_doc("Item Booking", id) # type: ignore + has_booking_permission(booking, raise_exception=True) + booking.flags.ignore_permissions = True + booking.cancel_appointment(ignore_links=force) + booking.save() + if render_row: + return do_render_row(booking) + + +@frappe.whitelist() +def end_booking_now(id, render_row=False): + booking: ItemBooking = frappe.get_doc("Item Booking", id) # type: ignore + has_booking_permission(booking, raise_exception=True) + booking.flags.ignore_permissions = True + booking.end_now() + booking.save() + if render_row: + return do_render_row(booking) + + +def do_render_row(doc: ItemBooking): + ctx = {"doc": doc} + get_list_context(ctx) + return frappe.render_template(ctx["row_template"], ctx) + + +@frappe.whitelist(allow_guest=True) +def get_item_uoms(item_code): + return { + "uoms": frappe.get_all( + "UOM Conversion Detail", + filters={"parent": item_code}, + fields=["distinct uom"], + order_by="idx desc", + as_list=1, + ), + "sales_uom": frappe.get_cached_value("Item", item_code, "sales_uom"), + } + + +@frappe.whitelist() +@deprecated +def book_new_slot(**kwargs): + frappe.log_error( + "book_new_slot is deprecated: use webshop.webshop.shopping_cart.booking.book_new_slot instead" + ) + try: + doc = frappe.get_doc( + { + "doctype": "Item Booking", + "item": kwargs.get("item"), + "starts_on": kwargs.get("start"), + "ends_on": kwargs.get("end"), + "user": kwargs.get("user"), + "status": kwargs.get("status") or "In cart", + "event": kwargs.get("event"), + "all_day": kwargs.get("all_day") or 0, + "uom": kwargs.get("uom"), + "sync_with_google_calendar": kwargs.get("sync_with_google_calendar") + or frappe.db.get_single_value("Venue Settings", "sync_with_google_calendar"), + "deduct_booking_credits": sbool(kwargs.get("with_credits")), + } + ).insert(ignore_permissions=True) + + return doc + except Exception: + if frappe.db.get_value("User", frappe.session.user, "user_type") != "System User": + frappe.log_error(_("New item booking error")) + + +@frappe.whitelist() +def book_new_slot_from_event(**kwargs): + frappe.only_for("Desk User") + doc = frappe.new_doc("Item Booking") + doc.update( + { + "item": kwargs.get("item"), + "starts_on": kwargs.get("start"), + "ends_on": kwargs.get("end"), + "user": kwargs.get("user"), + "status": kwargs.get("status") or "In cart", + "event": kwargs.get("event"), + "all_day": kwargs.get("all_day") or 0, + "uom": kwargs.get("uom"), + "sync_with_google_calendar": kwargs.get("sync_with_google_calendar") + or frappe.db.get_single_value("Venue Settings", "sync_with_google_calendar"), + "deduct_booking_credits": sbool(kwargs.get("with_credits")), + } + ) + doc.insert(ignore_permissions=True) + return doc + + +@frappe.whitelist() +def remove_booked_slot(name): + has_booking_permission(name, raise_exception=True) + try: + for dt in ["Quotation", "Sales Order"]: + linked_docs = frappe.get_all( + f"{dt} Item", filters={"item_booking": name, "parenttype": dt}, fields=["name", "parent"] + ) + for d in linked_docs: + doc = frappe.get_doc(dt, d.get("parent")) + if len(doc.items) > 1: + doc.items = [i for i in doc.items if i.item_booking != name] + doc.flags.ignore_permissions = True + doc.save() + else: + frappe.delete_doc(dt, doc.name, ignore_permissions=True, force=True) + + return frappe.delete_doc("Item Booking", name, ignore_permissions=True, force=True) + except frappe.TimestampMismatchError: + frappe.get_doc("Item Booking", name).reload() + remove_booked_slot(name) + + +# TODO: refactor with a class and add an option to get monthly availabilities +@frappe.whitelist(allow_guest=True) +def get_availabilities(item: str, start, end, uom: str | None = None, user: str | None = None, limit=0): + return ItemBookingAvailabilities( + item=item, start=start, end=end, uom=uom, user=user, limit=limit + ).get_available_slots() + + +def _parse_date_or_datetime(s: str): + from frappe.utils.data import convert_utc_to_system_timezone + + if isinstance(s, str): + if s.endswith("Z"): + d = get_datetime(s.rstrip("Z")) + d = convert_utc_to_system_timezone(d) + return d.replace(tzinfo=None) + + if len(s) <= 10: + return datetime.datetime.strptime(s, "%Y-%m-%d") + + return get_datetime(s) + + +class ItemBookingAvailabilities: + def __init__( + self, + *, + item: str, + start: str | datetime.datetime | None, + end: str | datetime.datetime | None, + uom: str | None = None, + user: str | None = None, + limit: int = 0, # does NOT work well with exceptions + **kwargs, + ): + self.item = item + self.start = start + self.end = end + self.limit = int(limit) + self.init = _parse_date_or_datetime(self.start) + self.finish = _parse_date_or_datetime(self.end) + self.user = user or frappe.session.user + + if user == "*" and not frappe.has_permission("Item Booking", "read"): + user = frappe.session.user + + self.item_doc = frappe.db.get_value( + "Item", + self.item, + ["name", "sales_uom", "enable_item_booking", "simultaneous_bookings_allowed"], + as_dict=True, + ) + + self.uom = uom or self.item_doc.sales_uom or self.item_doc.stock_uom + self.duration = get_uom_in_minutes(self.uom) + + if self.item_doc.enable_item_booking and self.duration == 0: + if not self.uom: + frappe.throw(_("UOM is not set for Item {0}").format(self.item)) + frappe.throw(_("UOM {0} is not supported").format(self.uom)) + + def get_available_slots(self): + if not self.item_doc.enable_item_booking or not self.duration: + return [] + if not self.init or not self.finish: + return [] + + output = [] + + seen: set[str] = set() + for dt in daterange_including_start(self.init, self.finish): + # For each day, get the available slots + for slot in self._check_availability(dt): + slot_id = slot.get("id") + if (not slot_id) or (slot_id not in seen): + # Deduplicate events that span multiple days + seen.add(slot_id) + output.append(slot) + if self.limit and len(output) >= self.limit: + break + if self.limit and len(output) >= self.limit: + break + + # filter out exclusions + if cal_name := self.item_calendar.get("name"): + eng = ItemBookingExceptionEngine(cal_name) + + for op in eng.get_operations(): + if op.dt_start > self.finish or op.dt_end < self.init: + continue + if op.type == "+": + sch: prepared_schedule_line_t = { + "start": op.dt_start, + "base_start": op.dt_start, + "end": op.dt_end, + "whole": False, + } + slots = self._get_availability_from_schedule([sch]) + slots = eng.filter_keep_in_range(self.init, self.finish, slots) + output.extend(slots) + elif op.type == "-": + output = eng.filter_from_op(op, output) + + if self.limit: + output = output[: self.limit] + return output + + @cached_property + def item_calendar(self): + return get_item_calendar(self.item, self.uom) + + @cached_property + def minimum_duration_in_seconds(self): + return frappe.cint(self.item_calendar.get("minimum_duration_in_seconds") or 1) + + def _check_availability(self, date): + date = getdate(date) + if not date: + return [] + + schedules = self.generate_schedules_for_date(date) + yield from self._get_availability_from_schedule(schedules) + + def generate_schedules_for_date(self, date: datetime.date): + day = calendar.day_name[date.weekday()] + + dt_now = now_datetime() + if cal := self.item_calendar["calendar"]: + schedule_for_the_day = filter(lambda x: x.get("day") == day, cal) + for line in schedule_for_the_day: + line_start, line_end = self.get_schedule_line_as_datetime_tuple(date, line) + + if dt_now >= line_end: + continue # The line already ended, no slot can be booked + + start = line_start + if dt_now > line_start: + # The line already started, some slots need to be skipped + new_start = self._round_datetime_in_slot(dt_now, line_start) + if new_start < line_end: + start = new_start + else: + start = line_end + + whole = line.get("whole", None) + sch: "prepared_schedule_line_t" = { + "start": start, + "end": line_end, + "whole": whole, + "base_start": line_start, + } + yield sch + + def get_schedule_line_as_datetime_tuple(self, date: datetime.date, line): + day_start = datetime.datetime.combine(date, get_time(line.start_time)) + if line.end_time <= line.start_time: + # When the end time is before the start time, it means that the schedule ends the next day. + date = add_days(date, 1) # type: ignore + day_end = datetime.datetime.combine(date, get_time(line.end_time)) + return (day_start, day_end) + + def _round_datetime_in_slot( + self, + dt: datetime.datetime, + slot_start: datetime.datetime, + interval_in_minutes: int | None = None, + ): + from math import ceil, floor + + assert ( + dt >= slot_start + ), "_round_datetime_in_slot: Datetime to round should be after the beginning of the slot." + interval_in_minutes = interval_in_minutes or int( + datetime.timedelta(minutes=cint(self.duration)).total_seconds() / 60 + ) + if not interval_in_minutes: + return dt + + offset = dt - slot_start # How far into the slot is the current time? + offset_in_minutes = offset.total_seconds() / 60 + + if self.minimum_duration_in_seconds: + rounded_offset_in_minutes = interval_in_minutes * floor(offset_in_minutes / interval_in_minutes) + new_dt = slot_start + datetime.timedelta(minutes=rounded_offset_in_minutes) + if (dt - new_dt).total_seconds() > self.minimum_duration_in_seconds: + return new_dt # There is enough time in this slot + + # Round the offset to the nearest interval + rounded_offset_in_minutes = interval_in_minutes * ceil(offset_in_minutes / interval_in_minutes) + new_dt = slot_start + datetime.timedelta(minutes=rounded_offset_in_minutes) + return new_dt + + def _get_availability_from_schedule(self, schedules: "Iterable[prepared_schedule_line_t]"): + for line in schedules: + # We try to get all the events between line.base_start (start of day) and line.end (end of day). + # The base_start is the specified start date in the booking calendar, regardless of the current time. + booked_items = _get_events(line.get("base_start"), line.get("end"), item=self.item_doc) + scheduled_items = [] + for event in booked_items: + e_start = get_datetime(event.get("starts_on")) + e_end = get_datetime(event.get("ends_on")) + # Only keep booked events that overlap the schedule slot + if e_start and e_end and (e_start < line.get("end")) and (e_end > line.get("base_start")): + scheduled_items.append(event) + + yield from self._find_available_slot(line, scheduled_items) + + def _has_whole_slot_overlap( + self, + line_start: datetime.datetime, + line_end: datetime.datetime, + current_schedule: list[tuple[datetime.datetime, datetime.datetime]], + ): + simultaneous_bookings_allowed = self.item_doc.get("simultaneous_bookings_allowed") or 1 + n_overlaps = 1 # Offset by 1 because we take into account the future slot that can be booked + for start, end in current_schedule: + if line_start <= end and start <= line_end: + # For each overlapping item + n_overlaps += 1 + if n_overlaps >= simultaneous_bookings_allowed: + return True + return False + + @cached_property + def simultaneous_booking_allowed(self): + return frappe.db.get_single_value("Venue Settings", "enable_simultaneous_booking") + + def _find_available_slot(self, line: "prepared_schedule_line_t", scheduled_items): + slots = [] + output = [] + + if self.user == "*": + user_scheduled_items = scheduled_items + else: + user_scheduled_items = (x for x in scheduled_items if x.get("user") == self.user) + + simultaneous_booking_allowed = self.simultaneous_booking_allowed # TODO: Remove global setting + if simultaneous_booking_allowed: + scheduled_items = self.check_simultaneaous_bookings(scheduled_items) + + slots.extend( + self._get_all_slots( + line, + simultaneous_booking_allowed, + scheduled_items, + ) + ) + + if not slots and not scheduled_items: + slots.extend(self._get_all_slots(line, simultaneous_booking_allowed)) + + for slot in slots: + out = self.get_available_dict(slot) + try: + count = get_booking_count(self.item, out.get("start"), out.get("end")) + out.remaining = count.get("remaining") + except Exception: + pass + output.append(out) + + for scheduled_item in user_scheduled_items: + added = False + + if scheduled_item.get("status") == "In cart": + status = "selected" + elif scheduled_item.get("status") == "Confirmed": + status = "confirmed" + else: + status = "blocked" + + for out in output: + if ( + out.get("start") == scheduled_item.get("starts_on").isoformat() + and out.get("end") == scheduled_item.get("ends_on").isoformat() + ): + out.id = scheduled_item.get("name") + out.status = status + out.number += 1 + added = True + + # if getdate(out.get("start")) == getdate(scheduled_item.get("starts_on")) or getdate(out.get("end")) == getdate(scheduled_item.get("ends_on")): + # out.color = "red" + + if not added: + out = self.get_available_dict(scheduled_item, status) + out.allDay = scheduled_item.get("all_day") + # out.color = "green" + out.number += 1 + output.append(out) + + return output + + def check_simultaneaous_bookings(self, scheduled_items): + import itertools + from operator import itemgetter + + simultaneous_bookings = self.item_doc.get("simultaneous_bookings_allowed") or 1 + sorted_schedule = sorted(scheduled_items, key=itemgetter("starts_on")) # always sort + for _key, group in itertools.groupby(sorted_schedule, key=lambda x: x["starts_on"]): + grouped_sch = [x.get("name") for x in list(group)] + if len(grouped_sch) == simultaneous_bookings: + scheduled_items = [x for x in scheduled_items if x.get("name") not in grouped_sch[:-1]] + elif len(grouped_sch) < simultaneous_bookings: + scheduled_items = [x for x in scheduled_items if x.get("name") not in grouped_sch] + + return scheduled_items + + def _get_all_slots( + self, line: "prepared_schedule_line_t", simultaneous_booking_allowed, scheduled_items=None + ): + line_start = line.get("start") + line_end = line.get("end") + + assert isinstance(line_start, datetime.datetime) + assert isinstance(line_end, datetime.datetime) + + # Compute existing items + if not scheduled_items: + scheduled_items = [] + + current_schedule = [] + for scheduled_item in scheduled_items: + sch_start = get_datetime(scheduled_item.get("starts_on")) + sch_end = get_datetime(scheduled_item.get("ends_on")) + if not sch_start or not sch_end: + continue # For some reason this fails + if sch_start < line_start: + # Ok, but the scheduled item begins before the current slot, trim it. + current_schedule.append((line_start, sch_end)) + elif sch_start < line_end: + # Ok, the scheduled item ends before the end of the slot, keep it. + current_schedule.append((sch_start, sch_end)) + + if line.get("whole"): + # Too small to fit a slot + if (line_end - line_start).total_seconds() < self.minimum_duration_in_seconds: + return [] + + if self._has_whole_slot_overlap(line_start, line_end, current_schedule): + return [] + + return [{"starts_on": line_start, "ends_on": line_end}] + + interval = datetime.timedelta(minutes=cint(self.duration)) + + slots = sorted([(line_start, line_start), (line_end, line_end)]) + + if simultaneous_booking_allowed: + vanilla_start_times = [] + for start, end in ((slots[i][0], slots[i + 1][0]) for i in range(len(slots) - 1)): + while start + interval <= end: + vanilla_start_times.append(start) + start += interval + + if current_schedule: + sorted_schedule = list(reduced(sorted(current_schedule, key=lambda x: x[0]))) + slots = sorted([(line_start, line_start), *sorted_schedule, (line_end, line_end)]) + + free_slots = [] + for start, end in ((slots[i][1], slots[i + 1][0]) for i in range(len(slots) - 1)): + while start + interval <= end: + if simultaneous_booking_allowed: + if start not in vanilla_start_times: + vanilla_start = [x for x in vanilla_start_times if start + interval <= x] + if vanilla_start: + start = vanilla_start[0] + free_slots.append({"starts_on": start, "ends_on": start + interval}) + start += interval + + return free_slots + + def get_available_dict(self, slot, status=None): + """ + Status can be: + - available + - selected + """ + return frappe._dict( + { + "start": slot.get("starts_on").isoformat(), + "end": slot.get("ends_on").isoformat(), + "id": slot.get("name") or frappe.generate_hash(length=8), + "status": status or "available", + "number": 0, + "total_available": self.item_doc.get("simultaneous_bookings_allowed"), + # "display": "background", + # "color": None, + # "allDay": 1, + } + ) + + +@frappe.whitelist() +def get_events_for_calendar(doctype, start, end, field_map, filters=None, fields=None): + assert doctype in ( + None, + "", + "Item Booking", + ), "get_events_for_calendar: expected the doctype to be Item Booking" + # Note: we ignore the doctype because we return Item Booking and Subscription objects + if isinstance(field_map, str): + field_map: dict = frappe.parse_json(field_map) + + if fields and isinstance(fields, str): + fields: list = frappe.parse_json(fields) + + fields = fields or [] # default value + + for f in field_map.values(): + dt = doctype + doc_meta = frappe.get_meta(dt) + if doc_meta.has_field(f): + fields.append(f) + + if filters and isinstance(filters, str): + filters: dict | list = frappe.parse_json(filters) + if isinstance(filters, list): + # Normalize the filters to [table, field, operator, value] + for i, f in enumerate(filters): + if len(f) >= 4: + f = [f[0], f[1], f[2], f[3]] + elif len(f) == 3: + f = [doctype, f[0], f[1], f[2]] + filters[i] = f + + events: list = _get_events(start, end, item=None, user=None, filters=filters, fields=fields) + return events + + +def _get_events( + start, end, item=None, user=None, filters: list | dict | None = None, fields: list | None = None +): + from pypika import Criterion + from pypika import functions as fn + + if user == "Guest": + return [] + + assert (not fields) or isinstance( + fields, (list | tuple | set) + ), "`fields` parameters must be a list, tuple, set, or None" + filters = filters or [] + + IB = frappe.qb.DocType("Item Booking") + all_fields = list( + { + "starts_on", + "ends_on", + "all_day", + IB.item.as_("item_name"), + "name", + "repeat_this_event", + "rrule", + "user", + "status", + *(fields or []), + } + ) + + time_condition_1 = (IB.starts_on < end) & (IB.ends_on > start) + time_condition_2 = ( + (IB.starts_on < end) + & (IB.repeat_this_event == 1) + & (fn.Coalesce(IB.repeat_till, "3000-01-01") > start) + ) + + extra_conditions = [] + if item: + item_name = item if isinstance(item, str) else item.name + extra_conditions.append(IB.item == item_name) + if user: + customers, _ = get_customers_suppliers("Customer", user) + cond = IB.user == user + if customers: + cond |= IB.party_name.isin(customers) + extra_conditions.append(cond) + + query = ( + frappe.qb.get_query("Item Booking", filters=filters) + .select(*all_fields) + .where(IB.status != "Cancelled") + .where(time_condition_1 | time_condition_2) + .where(Criterion.all(extra_conditions)) + ) + events = query.run(as_dict=1) + + # Note: do not forward the fields/filters arguments to _get_subscriptions_as_events + subscriptions_as_events = _get_subscriptions_as_events( + start, end, item=item, user=user, fields=None, filters=None + ) + events += subscriptions_as_events + + result = [] + + for event in events: + if event.get("repeat_this_event") == 1: + result.extend(process_recurring_events(event, start, end, "starts_on", "ends_on", "rrule")) + else: + result.append(event) + + return result + + +def _get_subscriptions_as_events(start, end, item=None, user=None, fields=None, filters=None): + subscriptions = _get_booking_subscriptions_between( + start, end, item=item, user=user, fields=fields, filters=filters + ) + display_subscriptions = bool(frappe.db.get_single_value("Venue Settings", "display_subscriptions_in_calendar")) + events = [] + for sub in subscriptions: + qty = sub["qty"] + booked_item = sub["booked_item"] + customer = sub["customer"] + + title = booked_item + if qty > 1: + title = f"{qty} × {title}" + if customer: + title += " - " + customer + + title = frappe._("{0}: {1}").format( + frappe._("Subscription"), + title, + ) + + events.append( + frappe._dict( + { + **sub, + "starts_on": get_datetime(sub["start"]), + "ends_on": get_datetime(sub["end"]), + "item_name": booked_item, + "title": title, + "name": sub["name"], + "doctype": "Subscription", + # "repeat_this_event": 1, + # "rrule": "RRULE:FREQ=HOURLY", + # "user": sub["_customers"][0] if sub["_customers"] and len(sub["_customers"]) > 0, + "status": "Confirmed", + "all_day": 1, + "startEditable": False, + "durationEditable": False, + "eventDisplay": display_subscriptions + } + ) + ) + return events + + +def _get_booking_subscriptions_between( + after_date, + before_date, + item: str | None = None, + user: str | None = None, + fields: list | None = None, + filters: list | dict | None = None, +): + from pypika import Criterion, Field + from pypika import functions as fn + + Subscription = frappe.qb.DocType("Subscription") + SubscriptionPlanDetail = frappe.qb.DocType("Subscription Plan Detail") + + doc_meta = frappe.get_meta("Subscription") + all_fields = [] + if fields: + fields.extend(fieldname for fieldname in fields if doc_meta.has_field(fieldname)) + for d in doc_meta.fields: + if d.fieldtype == "Color": + all_fields.append(Subscription.field(d.fieldname).as_("color")) + break + + start_field = fn.Coalesce(SubscriptionPlanDetail.from_date, Subscription.start, "0000-00-00") + end_field = fn.Coalesce(SubscriptionPlanDetail.to_date, Subscription.cancellation_date, "9999-01-01") + + item_field: Field = SubscriptionPlanDetail.booked_item + + all_filters = [ + start_field < before_date, + end_field > after_date, + ] + if item: + item_name = item if isinstance(item, str) else item.name + all_filters.append(item_field == item_name) # Must book this exact item + else: + all_filters.append(item_field.isnotnull()) # Must be a booking subscription + + if user: + customers, _ = get_customers_suppliers("Customer", user) + if customers: + all_filters.append(Subscription.customer.isin(customers)) + else: + return [] + + all_fields.extend( + ( + Subscription.name.as_("name"), + SubscriptionPlanDetail.name.as_("plan_detail_name"), + SubscriptionPlanDetail.qty, + start_field.as_("start"), + end_field.as_("end"), + item_field.as_("booked_item"), + Subscription.customer.as_("customer"), + ) + ) + + query = ( + frappe.qb.get_query("Subscription", filters=(filters or [])) + .select(*all_fields) + .join(SubscriptionPlanDetail) + .on( + (Subscription.name == SubscriptionPlanDetail.parent) + & (SubscriptionPlanDetail.parenttype == "Subscription") + ) # NOTE: Plans are present in both Subscription and Subscription template + .where(Criterion.all(all_filters)) + ) + + subscriptions = query.run(as_dict=True) + + # Update subscriptions to strip the time component in the datetime + for s in subscriptions: + if abs(round(s["qty"]) - s["qty"]) > 1e-6: + raise ValueError("Non integer quantity of booked slots.") + s["start"] = getdate(s["start"]) + s["end"] = getdate(s["end"]) + s["qty"] = int(s["qty"]) + s["color"] = s.get("color", "#77bbff") + + return subscriptions + + +def get_item_calendar(item: str | None = None, uom: str | None = None) -> "item_calendar_t": + # Override the calendar with a custom one + for hook in frappe.get_hooks("get_item_booking_calendar"): + calendar = frappe.call(hook, item=item, uom=uom) + if not calendar: + continue + if isinstance(calendar, str): + calendar = frappe.get_doc("Item Booking Calendar", calendar) + if calendar: + lines = calendar.get("booking_calendar") + lines = list(map(lambda x: x.as_dict(), lines)) + return { + "type": str(calendar.get("type")), + "calendar": lines, # type: ignore + "name": str(calendar.name), + "minimum_duration_in_seconds": calendar.get("minimum_duration_in_seconds"), + } + + if item and not uom: + uom = frappe.get_cached_value("Item", item, "sales_uom") + + item = item or "" + uom = uom or "" + + for filters in [ + dict(item=item, uom=uom), + dict(item=item, uom=""), + dict(item="", uom=uom), + dict(item="", uom=""), + ]: + filtered_calendars = frappe.get_all( + "Item Booking Calendar", + fields=["name", "item", "uom"], + filters=filters, + limit=1, + ) + if filtered_calendars: + return { + "type": "Daily", + "calendar": frappe.get_all( + "Item Booking Calendars", + filters={"parent": filtered_calendars[0].name, "parenttype": "Item Booking Calendar"}, + fields=["start_time", "end_time", "day", "whole"], + ), + "name": filtered_calendars[0].name, + } + + return {"type": "Daily", "calendar": [], "name": None} + + +def get_uom_in_minutes(uom=None, minute_uom=None): + minute_uom = minute_uom or frappe.db.get_single_value("Venue Settings", "minute_uom") + if uom == minute_uom: + return 1 + + return frappe.db.get_value("UOM Conversion Factor", dict(from_uom=uom, to_uom=minute_uom), "value") or 0 + + +def get_sales_qty(item, start, end): + minute_uom = frappe.db.get_single_value("Venue Settings", "minute_uom") + sales_uom = frappe.get_cached_value("Item", item, "sales_uom") or frappe.get_cached_value( + "Item", item, "stock_uom" + ) + duration = time_diff_in_minutes(end, start) + + if sales_uom == minute_uom: + return duration + + conversion_factor = ( + frappe.db.get_value("UOM Conversion Factor", dict(from_uom=sales_uom, to_uom=minute_uom), "value") + or 1 + ) + + return flt(duration) / flt(conversion_factor) + + +def daterange(start_date, end_date): + if start_date < get_datetime(now()): + start_date = datetime.datetime.now().replace( + hour=0, minute=0, second=0, microsecond=0 + ) + datetime.timedelta(days=1) + for n in range(int((end_date - start_date).days)): + yield start_date + timedelta(n) + + +def daterange_including_start(start_date, end_date): + if start_date < get_datetime(now()): + start_date = datetime.datetime.now() + start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0) + for n in range(int((end_date - start_date).days)): + yield start_date + timedelta(n) + + +def reduced(timeseries): + prev = datetime.datetime.min + for start, end in timeseries: + if end > prev: + prev = end + yield start, end + + +def delete_linked_item_bookings(doc, method): + for item in doc.items: + if item.item_booking: + frappe.delete_doc("Item Booking", item.item_booking, ignore_permissions=True, force=True) + + +def confirm_linked_item_bookings(doc, method): + confirm_after_payment = cint( + frappe.db.get_single_value("Venue Settings", "confirm_booking_after_payment") + ) + for item in doc.items: + if item.item_booking: + slot = frappe.get_doc("Item Booking", item.item_booking) + slot.flags.ignore_permissions = True + slot.set_status("Not confirmed" if confirm_after_payment else "Confirmed") + + +def clear_draft_bookings(): + drafts = frappe.get_all("Item Booking", filters={"status": "In cart"}, fields=["name", "modified"]) + if not drafts: + return + + clearing_duration = frappe.db.get_value("Venue Settings", None, "clear_item_booking_draft_duration") + + if cint(clearing_duration) <= 0: + return + + for draft in drafts: + if now_datetime() > draft.get("modified") + datetime.timedelta(minutes=cint(clearing_duration)): + remove_booked_slot(draft.get("name")) + + +@frappe.whitelist() +def make_quotation(source_name, target_doc=None): + def set_missing_values(source, target): + quotation: "Quotation" = frappe.get_doc(target) + quotation.order_type = "Maintenance" + company_currency = frappe.get_cached_value("Company", quotation.company, "default_currency") + + if quotation.quotation_to == "Customer" and quotation.party_name: + party_account_currency = get_party_account_currency( + "Customer", quotation.party_name, quotation.company + ) + else: + party_account_currency = company_currency + + quotation.currency = party_account_currency or company_currency + + if company_currency == quotation.currency: + exchange_rate = 1 + else: + exchange_rate = get_exchange_rate( + quotation.currency, company_currency, quotation.transaction_date, args="for_selling" + ) + + quotation.conversion_rate = exchange_rate + + # add item + quotation.append( + "items", + { + "item_code": source.item, + "qty": get_sales_qty(source.item, source.starts_on, source.ends_on), + "uom": frappe.get_cached_value("Item", source.item, "sales_uom"), + "item_booking": source.name, + }, + ) + + quotation.run_method("set_missing_values") + quotation.run_method("set_other_charges") + quotation.run_method("calculate_taxes_and_totals") + + doclist = get_mapped_doc( + "Item Booking", + source_name, + { + "Item Booking": { + "doctype": "Quotation", + "field_map": {"party_type": "quotation_to"}, + "field_no_map": ["status"], + } + }, + target_doc, + set_missing_values, + ) + + return doclist + + +@frappe.whitelist() +def make_sales_order(source_name, target_doc=None): + def set_missing_values(source, target): + from erpnext.controllers.accounts_controller import get_default_taxes_and_charges + + sales_order = frappe.get_doc(target) + sales_order.order_type = "Maintenance" + company_currency = frappe.get_cached_value("Company", sales_order.company, "default_currency") + + party_account_currency = get_party_account_currency( + "Customer", sales_order.customer, sales_order.company + ) + + sales_order.currency = party_account_currency or company_currency + + if company_currency == sales_order.currency: + exchange_rate = 1 + else: + exchange_rate = get_exchange_rate( + sales_order.currency, company_currency, sales_order.transaction_date, args="for_selling" + ) + + sales_order.conversion_rate = exchange_rate + + # add item + sales_order.append( + "items", + { + "item_code": source.item, + "qty": get_sales_qty(source.item, source.starts_on, source.ends_on), + "uom": frappe.get_cached_value("Item", source.item, "sales_uom"), + "item_booking": source.name, + }, + ) + + # get default taxes + taxes = get_default_taxes_and_charges("Sales Taxes and Charges Template", company=sales_order.company) + if taxes.get("taxes"): + sales_order.update(taxes) + + sales_order.run_method("set_missing_values") + sales_order.run_method("calculate_taxes_and_totals") + + doclist = get_mapped_doc( + "Item Booking", + source_name, + {"Item Booking": {"doctype": "Sales Order", "field_map": {"party_name": "customer"}}}, + target_doc, + set_missing_values, + ) + + return doclist + + +@frappe.whitelist() +def make_booking_credit_usage(source_name, target_doc=None): + def set_missing_values(source, target): + result = get_booking_credit_types_for_item(source.item, source.uom) + if result: + target.booking_credit_type = result[0] + + doclist = get_mapped_doc( + "Item Booking", + source_name, + {"Item Booking": {"doctype": "Booking Credit Usage", "field_map": {"party_name": "customer"}}}, + target_doc, + set_missing_values, + ) + + return doclist + + +def register_google_calendar_item(doc, method, *args, **kwargs): + if method == "on_trash": + for item_name in frappe.get_all("Item", filters={"google_calendar": doc.name}): + frappe.get_doc("Item", item_name.name).update({"google_calendar": ""}).save() + return + + if doc.reference_document != "Item Booking": + return + elif doc.booking_item_code: + frappe.get_doc("Item", doc.booking_item_code).update({"google_calendar": doc.name}).save() + doc.booking_item_code = "" # The field is just there to make it easier to setup + + +def get_calendar_item(account): + return frappe.db.get_value( + "Item", dict(google_calendar=account.name, disabled=0), ["item_code", "calendar_color"] + ) + + +def insert_event_to_calendar(account, event, recurrence=None): + """ + Inserts event in Dokos Calendar during Sync + """ + start = event.get("start") + end = event.get("end") + + if item_values := get_calendar_item(account): + item, color = item_values + else: + frappe.throw(_("Item not found for Google Calendar {0}").format(repr(account.name))) + + calendar_event = { + "doctype": "Item Booking", + "item": item, + "color": color, + "notes": event.get("description"), + "sync_with_google_calendar": 1, + "google_calendar": account.name, + "google_calendar_id": account.google_calendar_id, + "google_calendar_event_id": event.get("id"), + "rrule": recurrence, + "starts_on": get_datetime(start.get("date")) + if start.get("date") + else get_timezone_naive_datetime(start), + "ends_on": get_datetime(end.get("date")) if end.get("date") else get_timezone_naive_datetime(end), + "all_day": 1 if start.get("date") else 0, + "repeat_this_event": 1 if recurrence else 0, + "status": "Confirmed", + } + doc = frappe.get_doc(calendar_event) + doc.flags.pulled_from_google_calendar = True + doc.insert(ignore_permissions=True) + + +def update_event_in_calendar(account, event, recurrence=None): + """ + Updates Event in Dokos Calendar if any existing Google Calendar Event is updated + """ + start = event.get("start") + end = event.get("end") + + calendar_event = frappe.get_doc("Item Booking", {"google_calendar_event_id": event.get("id")}) + item, _ = get_calendar_item(account) + + updated_event = { + "item": item, + "notes": event.get("description"), + "rrule": recurrence, + "starts_on": get_datetime(start.get("date")) + if start.get("date") + else get_timezone_naive_datetime(start), + "ends_on": get_datetime(end.get("date")) if end.get("date") else get_timezone_naive_datetime(end), + "all_day": 1 if start.get("date") else 0, + "repeat_this_event": 1 if recurrence else 0, + "status": "Confirmed", + } + + update = False + for field in updated_event: + if field == "rrule" and recurrence: + update = calendar_event.get(field) is None or ( + set(calendar_event.get(field).split(";")) != set(updated_event.get(field).split(";")) + ) + else: + update = str(calendar_event.get(field)) != str(updated_event.get(field)) + if update: + break + + if update: + calendar_event.update(updated_event) + calendar_event.flags.pulled_from_google_calendar = True + calendar_event.save() + + +def cancel_event_in_calendar(account, event): + # If any synced Google Calendar Event is cancelled, then close the Event + add_comment = False + + if booking_event := frappe.db.exists( + "Item Booking", + {"google_calendar_id": account.google_calendar_id, "google_calendar_event_id": event.get("id")}, + ): + booking = frappe.get_doc("Item Booking", booking_event) + + try: + booking.flags.pulled_from_google_calendar = True + booking.delete() + add_comment = False + except frappe.LinkExistsError: + # Try to delete event, but only if it has no links + add_comment = True + + if add_comment: + frappe.get_doc( + { + "doctype": "Comment", + "comment_type": "Info", + "reference_doctype": "Item Booking", + "reference_name": booking.get("name"), + "content": " {}".format(_("- Event deleted from Google Calendar.")), + } + ).insert(ignore_permissions=True) + + +def insert_event_in_google_calendar(doc, method=None): + """ + Insert Events in Google Calendar if sync_with_google_calendar is checked. + """ + if ( + not doc.sync_with_google_calendar + or doc.flags.pulled_from_google_calendar + or not frappe.db.exists("Google Calendar", {"name": doc.google_calendar, "push_to_google_calendar": 1}) + ): + return + + google_calendar, account = get_google_calendar_object(doc.google_calendar) + + event = { + "summary": doc.title, + "description": doc.notes, + "recurrence": [doc.rrule] if doc.repeat_this_event and doc.rrule else [], + } + event.update( + format_date_according_to_google_calendar( + doc.get("all_day", 0), get_datetime(doc.starts_on), get_datetime(doc.ends_on) + ) + ) + + try: + event = google_calendar.events().insert(calendarId=doc.google_calendar_id, body=event).execute() + doc.db_set("google_calendar_event_id", event.get("id"), update_modified=False) + frappe.publish_realtime( + "event_synced", {"message": _("Event Synced with Google Calendar.")}, user=frappe.session.user + ) + except HttpError as err: + frappe.msgprint(f'{_("Google Error")}: {json.loads(err.content)["error"]["message"]}') + frappe.throw( + _("Google Calendar - Could not insert event in Google Calendar {0}, error code {1}.").format( + account.name, err.resp.status + ) + ) + + +def update_event_in_google_calendar(doc, method=None): + """ + Updates Events in Google Calendar if any existing event is modified in Dokos Calendar + """ + # Workaround to avoid triggering update when Event is being inserted since + # creation and modified are same when inserting doc + if ( + doc.flags.pulled_from_google_calendar + or doc.modified == doc.creation + or not doc.sync_with_google_calendar + or not frappe.db.exists("Google Calendar", {"name": doc.google_calendar, "push_to_google_calendar": 1}) + ): + return + + if doc.sync_with_google_calendar and not doc.google_calendar_event_id: + # If sync_with_google_calendar is checked later, then insert the event rather than updating it. + return insert_event_in_google_calendar(doc) + + google_calendar, dummy = get_google_calendar_object(doc.google_calendar) + + try: + event = ( + google_calendar.events() + .get(calendarId=doc.google_calendar_id, eventId=doc.google_calendar_event_id) + .execute() + ) + event["summary"] = doc.title + event["description"] = doc.notes + event["recurrence"] = [doc.rrule] if doc.repeat_this_event and doc.rrule else [] + event["status"] = "cancelled" if doc.status == "Cancelled" else "confirmed" + event.update( + format_date_according_to_google_calendar( + doc.get("all_day", 0), get_datetime(doc.starts_on), get_datetime(doc.ends_on) + ) + ) + + google_calendar.events().update( + calendarId=doc.google_calendar_id, eventId=doc.google_calendar_event_id, body=event + ).execute() + frappe.publish_realtime( + "event_synced", {"message": _("Event Synced with Google Calendar.")}, user=frappe.session.user + ) + except HttpError as err: + frappe.msgprint(f'{_("Google Error")}: {json.loads(err.content)["error"]["message"]}') + frappe.throw( + _("Google Calendar - Could not update Event {0} in Google Calendar, error code {1}.").format( + doc.name, err.resp.status + ) + ) + + +def delete_event_in_google_calendar(doc, method=None): + """ + Delete Events from Google Calendar if Item Booking is deleted. + """ + + if ( + not doc.google_calendar_event_id + or doc.flags.pulled_from_google_calendar + or not doc.sync_with_google_calendar + or not frappe.db.exists("Google Calendar", {"name": doc.google_calendar, "push_to_google_calendar": 1}) + ): + return + + google_calendar, account = get_google_calendar_object(doc.google_calendar) + + try: + event = ( + google_calendar.events() + .get(calendarId=doc.google_calendar_id, eventId=doc.google_calendar_event_id) + .execute() + ) + event["recurrence"] = None + event["status"] = "cancelled" + + google_calendar.events().update( + calendarId=doc.google_calendar_id, eventId=doc.google_calendar_event_id, body=event + ).execute() + except HttpError as err: + frappe.msgprint(f'{_("Google Error")}: {json.loads(err.content)["error"]["message"]}') + frappe.msgprint( + _("Google Calendar - Could not delete Event {0} from Google Calendar, error code {1}.").format( + doc.name, err.resp.status + ) + ) + + +@frappe.whitelist() +def get_corresponding_party(user): + customers, leads = get_linked_customers(user) + party_type = party_name = None + if customers: + party_type = "Customer" + party_name = customers[0] + + elif leads: + party_type = "Lead" + party_name = leads[0] + + return party_type, party_name + + +def move_booking_with_event(doc, method): + doc_before_save = doc.get_doc_before_save() + if doc_before_save and getdate(doc_before_save.starts_on) != getdate(doc.starts_on): + days = date_diff(doc.starts_on, doc_before_save.starts_on) + bookings = frappe.get_all( + "Item Booking", filters={"event": doc.name}, fields=["name", "starts_on", "ends_on"] + ) + + for booking in bookings: + doc = frappe.get_doc("Item Booking", booking.name) + doc.starts_on = add_days(booking.starts_on, days) + doc.ends_on = add_days(booking.ends_on, days) + doc.flags.ignore_permissions = True + doc.save() + + +@frappe.whitelist() +def get_booking_count(item=None, starts_on=None, ends_on=None): + if not item: + return + + if not starts_on: + starts_on = now_datetime() + if not ends_on: + ends_on = starts_on + + starts_on, ends_on = get_datetime(starts_on), get_datetime(ends_on) + + simultaneous_bookings_enabled = frappe.db.get_single_value( + "Venue Settings", "enable_simultaneous_booking" + ) + item_doc = frappe.get_doc("Item", item) + + if simultaneous_bookings_enabled: + capacity = cint(item_doc.get("simultaneous_bookings_allowed")) # type: ignore + else: + capacity = 1 + + events = _get_events(starts_on, ends_on, item=item_doc) + current = len(events) + + return {"capacity": capacity, "current": current, "remaining": capacity - current} + + +# def get_simultaneous_bookings(scheduled_items, timeslot, simultaneous_bookings=None): +# import itertools +# from operator import itemgetter +# count = 0 +# if cint(simultaneous_bookings) > 1: +# sorted_schedule = sorted(scheduled_items, key=itemgetter("starts_on")) +# for key, group in itertools.groupby(sorted_schedule, key=lambda x: x["starts_on"]): +# group_count = 0 +# for slot in group: +# if get_datetime(timeslot[1]) > slot.get("starts_on") and get_datetime(timeslot[0]) < slot.get( +# "ends_on" +# ): +# group_count += 1 +# count = max(count, group_count) +# return count diff --git a/bookings/bookings/doctype/venue_settings/venue_settings.json b/bookings/bookings/doctype/venue_settings/venue_settings.json new file mode 100644 index 00000000..7e777b33 --- /dev/null +++ b/bookings/bookings/doctype/venue_settings/venue_settings.json @@ -0,0 +1,241 @@ +{ + "actions": [], + "creation": "2020-08-03 08:10:42.541814", + "doctype": "DocType", + "editable_grid": 1, + "engine": "InnoDB", + "field_order": [ + "item_booking_section", + "clear_item_booking_draft_duration", + "confirm_booking_after_payment", + "column_break_4", + "enable_simultaneous_booking", + "no_overlap_per_item", + "sync_with_google_calendar", + "allow_event_cancellation", + "display_subscriptions_in_calendar", + "cancellation_delay", + "role_allowed_to_skip_cart", + "event_registration_section", + "registration_item_code", + "multi_venue_section", + "enable_multi_companies", + "cart_settings_overrides", + "unit_of_measure_tab", + "short_bookings_section", + "minute_uom", + "venue_units_of_measure", + "section_break_20", + "month_uom", + "column_break_uuvm", + "week_uom", + "column_break_jmrs", + "day_uom", + "section_break_lnrr", + "venue_long_uoms" + ], + "fields": [ + { + "fieldname": "item_booking_section", + "fieldtype": "Section Break", + "label": "Item Booking" + }, + { + "default": "15", + "description": "Minimum 3 minutes.
Set 0 to disable this functionality.", + "fieldname": "clear_item_booking_draft_duration", + "fieldtype": "Int", + "label": "Clear drafts after x minutes" + }, + { + "default": "0", + "description": "If not set, the booking will be considered confirmed when the order is placed", + "fieldname": "confirm_booking_after_payment", + "fieldtype": "Check", + "label": "Confirm booking after payment" + }, + { + "fieldname": "column_break_4", + "fieldtype": "Column Break" + }, + { + "default": "0", + "fieldname": "enable_simultaneous_booking", + "fieldtype": "Check", + "label": "Enable simultaneous booking" + }, + { + "default": "0", + "fieldname": "no_overlap_per_item", + "fieldtype": "Check", + "label": "Do not allow overlapping bookings for the same item on desk" + }, + { + "default": "0", + "description": "Applicable for shopping cart bookings", + "fieldname": "sync_with_google_calendar", + "fieldtype": "Check", + "label": "Automatically synchronize with Google Calendar" + }, + { + "default": "0", + "fieldname": "allow_event_cancellation", + "fieldtype": "Check", + "label": "Allow cancellation of item bookings on the portal" + }, + { + "default": "86400", + "depends_on": "eval:doc.allow_event_cancellation", + "description": "Users will not be able to cancel later than this delay before the appointment", + "fieldname": "cancellation_delay", + "fieldtype": "Duration", + "label": "Cancellation delay" + }, + { + "description": "Users with this role will be able to make bookings without having to validate a shopping cart and create a sales order", + "fieldname": "role_allowed_to_skip_cart", + "fieldtype": "Link", + "label": "Role allowed to skip shopping cart", + "options": "Role" + }, + { + "fieldname": "event_registration_section", + "fieldtype": "Section Break", + "label": "Event Registration" + }, + { + "fieldname": "registration_item_code", + "fieldtype": "Link", + "label": "Billed Item for registration", + "options": "Item" + }, + { + "fieldname": "multi_venue_section", + "fieldtype": "Tab Break", + "label": "Multi-venue mode" + }, + { + "default": "0", + "fieldname": "enable_multi_companies", + "fieldtype": "Check", + "label": "Enable" + }, + { + "depends_on": "enable_multi_companies", + "fieldname": "cart_settings_overrides", + "fieldtype": "Table", + "label": "Allowed companies", + "mandatory_depends_on": "enable_multi_companies", + "options": "Venue Cart Settings" + }, + { + "fieldname": "unit_of_measure_tab", + "fieldtype": "Tab Break", + "label": "Units of Measure" + }, + { + "fieldname": "short_bookings_section", + "fieldtype": "Section Break", + "label": "Short Bookings" + }, + { + "description": "Used for slots calculation.
Please add an UOM conversion factor between each UOM used in booked items and this UOM.", + "fieldname": "minute_uom", + "fieldtype": "Link", + "label": "Minute UOM", + "mandatory_depends_on": "eval:doc.venue_units_of_measure?.length", + "options": "UOM" + }, + { + "description": "Example: For the unit Half-Day set a duration of 4 hours.", + "fieldname": "venue_units_of_measure", + "fieldtype": "Table", + "label": "Conversion Table", + "options": "Venue Units of Measure" + }, + { + "fieldname": "section_break_20", + "fieldtype": "Section Break", + "hide_border": 1, + "label": "Long Bookings" + }, + { + "fieldname": "month_uom", + "fieldtype": "Link", + "label": "Month UOM", + "options": "UOM" + }, + { + "fieldname": "column_break_uuvm", + "fieldtype": "Column Break" + }, + { + "fieldname": "week_uom", + "fieldtype": "Link", + "label": "Week UOM", + "options": "UOM" + }, + { + "fieldname": "column_break_jmrs", + "fieldtype": "Column Break" + }, + { + "fieldname": "day_uom", + "fieldtype": "Link", + "label": "Day UOM", + "options": "UOM" + }, + { + "fieldname": "section_break_lnrr", + "fieldtype": "Section Break" + }, + { + "description": "Example: For the unit '3 month' set a quantity of 3 and a target unit of Month.", + "fieldname": "venue_long_uoms", + "fieldtype": "Table", + "label": "Conversion Table", + "options": "Venue UOM Conversion" + }, + { + "default": "1", + "description": "If checked, the subscriptions linked to a resource will be displayed on the desk calendar", + "fieldname": "display_subscriptions_in_calendar", + "fieldtype": "Check", + "label": "Display subscriptions in the booking calendar" + } + ], + "grid_page_length": 50, + "issingle": 1, + "links": [], + "modified": "2025-03-25 10:57:38.144654", + "modified_by": "Administrator", + "module": "Bookings", + "name": "Venue Settings", + "owner": "Administrator", + "permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "print": 1, + "read": 1, + "role": "System Manager", + "share": 1, + "write": 1 + }, + { + "create": 1, + "delete": 1, + "email": 1, + "print": 1, + "read": 1, + "role": "Venue Manager", + "share": 1, + "write": 1 + } + ], + "row_format": "Dynamic", + "sort_field": "modified", + "sort_order": "DESC", + "states": [] +} diff --git a/bookings/bookings/doctype/venue_settings/venue_settings.py b/bookings/bookings/doctype/venue_settings/venue_settings.py new file mode 100644 index 00000000..031fdbcc --- /dev/null +++ b/bookings/bookings/doctype/venue_settings/venue_settings.py @@ -0,0 +1,434 @@ +# Copyright (c) 2025, Dokos SAS and contributors +# For license information, please see license.txt + +import datetime +from dataclasses import dataclass +from typing import Literal +from urllib.parse import unquote + +import frappe +from frappe import _ +from frappe.model.document import Document +from frappe.utils import cint + + +@dataclass +class venue_uom_info_t(frappe._dict): + selector: Literal["long", "short"] + target_type: Literal["Minute", "Day", "Week", "Month", "Year"] + value: int | float + from_uom: str + to_uom: str + +class VenueSettings(Document): + # begin: auto-generated types + # This code is auto-generated. Do not modify anything in this block. + + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from bookings.bookings.doctype.venue_cart_settings.venue_cart_settings import VenueCartSettings + from bookings.bookings.doctype.venue_units_of_measure.venue_units_of_measure import VenueUnitsofMeasure + from bookings.bookings.doctype.venue_uom_conversion.venue_uom_conversion import VenueUOMConversion + from frappe.types import DF + + allow_event_cancellation: DF.Check + cancellation_delay: DF.Duration | None + cart_settings_overrides: DF.Table[VenueCartSettings] + clear_item_booking_draft_duration: DF.Int + confirm_booking_after_payment: DF.Check + day_uom: DF.Link | None + display_subscriptions_in_calendar: DF.Check + enable_multi_companies: DF.Check + enable_simultaneous_booking: DF.Check + minute_uom: DF.Link | None + month_uom: DF.Link | None + no_overlap_per_item: DF.Check + registration_item_code: DF.Link | None + role_allowed_to_skip_cart: DF.Link | None + sync_with_google_calendar: DF.Check + venue_long_uoms: DF.Table[VenueUOMConversion] + venue_units_of_measure: DF.Table[VenueUnitsofMeasure] + week_uom: DF.Link | None + # end: auto-generated types + + def onload(self): + # see: webshop_settings.py + self.get("__onload").quotation_series = frappe.get_meta("Quotation").get_options("naming_series") + + def validate(self): + # check that all selected companies are unique in the cart_settings_overrides, + # even if disabled to avoid mistakes + unique_companies = set() + for override in self.cart_settings_overrides: + if override.company in unique_companies: + frappe.throw( + frappe._("Company {0} is used more than once in the cart settings overrides").format( + override.company + ) + ) + unique_companies.add(override.company) + + if self.enable_multi_companies and not unique_companies: + frappe.throw(frappe._("You must select at least one company in the cart settings overrides")) + + def configure_uom_conversions(self): + if self.minute_uom: + for row in self.venue_units_of_measure: + self.venue_upsert_uom_conversion( + row.unit_of_measure, self.minute_uom, cint(row.duration) / 60 + ) + + self.venue_configure_long_uom_conversions() + + def venue_upsert_uom_conversion(self, from_uom: str, to_uom: str, value: int | float): + if not from_uom or not to_uom or value <= 0: + return + + conversion = frappe.db.get_value( + "UOM Conversion Factor", + filters={"from_uom": from_uom, "to_uom": to_uom}, + fieldname=["name", "value"], + as_dict=True, + for_update=True, + ) + if conversion and conversion.value == value: + return + + if conversion: + conversion = frappe.get_doc("UOM Conversion Factor", conversion) + else: + conversion = frappe.new_doc("UOM Conversion Factor") + + conversion.category = self.venue_make_time_category() + conversion.from_uom = from_uom + conversion.to_uom = to_uom + conversion.value = value + conversion.save(ignore_permissions=True) + + def get_uom_aliases(self): + return { + "Minute": self.minute_uom, + "Day": self.day_uom, + "Week": self.week_uom, + "Month": self.month_uom, + "Year": None, + } + + def venue_configure_long_uom_conversions(self): + uom_aliases = self.get_uom_aliases() + for row in self.venue_long_uoms: + to_uom = uom_aliases[row.target_type] + if to_uom: + self.venue_upsert_uom_conversion(row.from_uom, to_uom, row.value) + + def venue_make_time_category(self): + category = frappe.db.exists("UOM Category", "Time") + if not category: + category = frappe.db.exists("UOM Category", _("Time")) + + if not category: + category_doc = frappe.new_doc("UOM Category") + category_doc.category_name = _("Time") + category_doc.insert(ignore_permissions=True) + category = category_doc.name + + return category + + def get_uom_infos(self) -> dict[str, venue_uom_info_t]: + uom_aliases = self.get_uom_aliases() + uom_infos = {} + for long_uom in self.venue_long_uoms: + base_uom = uom_aliases[long_uom.target_type] + if not base_uom: + continue + uom_infos[long_uom.from_uom] = venue_uom_info_t( + selector="long", + target_type=long_uom.target_type, + value=long_uom.value, + from_uom=long_uom.from_uom, + to_uom=base_uom, + ) + for target_type in ("Month", "Week", "Day", "Year"): + if uom := getattr(self, f"{target_type.lower()}_uom", None): + uom_infos[uom] = venue_uom_info_t( + selector="long", + target_type=target_type, + value=1, + from_uom=uom, + to_uom=uom, + ) + if minute_uom := self.minute_uom: + uom_infos[minute_uom] = venue_uom_info_t( + selector="short", + target_type="Minute", + value=1, + from_uom=minute_uom, + to_uom=minute_uom, + ) + # conversions = frappe.get_all( + # "UOM Conversion Factor", + # filters={"to_uom": minute_uom}, + # fields=["from_uom", "to_uom", "value"], + # order_by="modified desc", + # ) + # for conv in conversions: + # if conv.from_uom not in uom_infos: + # uom_infos[conv.from_uom] = venue_uom_info_t( + # selector="short", + # target_type="Minute", + # value=conv.value, + # from_uom=conv.from_uom, + # to_uom=minute_uom, + # ) + return uom_infos + + def clear_cache(self): + frappe.cache.delete_value("venue_settings_enable_multi_companies") + return super().clear_cache() + + def on_update(self): + old_doc = self.get_doc_before_save() + did_change = False + if old_doc: + did_change = old_doc.enable_multi_companies != self.enable_multi_companies + else: + did_change = True + if did_change: + if self.enable_multi_companies: + multicompany_create_custom_fields(self) + else: + multicompany_delete_custom_fields(self) + + self.configure_uom_conversions() + + ## Type hints for fields + enable_multi_companies: bool + cart_settings_overrides: dict + + + ## Helpers + def multicompany_is_company_allowed(self, company): + if not self.enable_multi_companies: + return True # all companies are allowed if the feature is disabled + + for override in self.cart_settings_overrides: + if override.company == company: + # return override.enabled + return True + + return False + + def multicompany_get_allowed_companies(self) -> list: + return [override.company for override in self.cart_settings_overrides] + + def multicompany_get_dropdown(self, selected_company: str | None = None) -> list: + selected_company = selected_company or self.multicompany_get_current_company() + return [ + { + "label": override.get("_label") or override.company, + "value": override.company, + "selected": override.company == selected_company, + } + for override in self.cart_settings_overrides + ] + + def multicompany_get_current_company(self): + if self.enable_multi_companies: + if company := multicompany_read_cookie(self): + if self.multicompany_is_company_allowed(company): + return company + + def multicompany_get_item_filter(self): + NOT_ALLOWED = ["Venue Selected Company", "company", "=", ""] + if self.enable_multi_companies: + if company := self.multicompany_get_current_company(): + return ["Venue Selected Company", "company", "=", company] + return NOT_ALLOWED + return None + + def multicompany_get_item_filter_for_company(self, for_company=None): + NOT_ALLOWED = ["Venue Selected Company", "company", "=", ""] + if self.enable_multi_companies: + if self.multicompany_is_company_allowed(for_company): + return ["Venue Selected Company", "company", "=", for_company] + return NOT_ALLOWED + return None + +MULTICOMPANY_COOKIE_NAME = "company" +MULTICOMPANY_FLAG_NAME = "multicompany_current_company" +MULTICOMPANY_CONTEXT_DROPDOWN = "multicompany_dropdown" +MULTICOMPANY_CONTEXT_CURRENT_COMPANY = "multicompany_current" + + +def multicompany_read_cookie(venue_settings=None): + return multicompany_read_and_update_cookie(venue_settings) + + +def multicompany_read_and_update_cookie(venue_settings: "VenueSettings | None" = None): + cached = frappe.flags.get(MULTICOMPANY_FLAG_NAME, 0) + if cached != 0: + return cached + + venue_settings: "VenueSettings" = venue_settings or frappe.get_single("Venue Settings") + + if not venue_settings.enable_multi_companies: + multicompany_clear_cookie() # clear the cookie + set cache + return None + + # Read the selected company from the query string parameters + # to overwrite the cookie "company" (MULTICOMPANY_COOKIE_NAME) + # if valid. + from_query = frappe.form_dict.get("selected_company", None) + if from_query: + is_valid = venue_settings.multicompany_is_company_allowed(from_query) + if is_valid: + multicompany_write_cookie(from_query) # overwrite the cookie + return from_query + else: + frappe.local.flags.redirect_location = "/" + raise frappe.Redirect + elif from_query == "": + # front-end wants to clear of the cookie + multicompany_clear_cookie() # clear the cookie + set cache + return None + + try: + from_cookie = frappe.request.cookies.get(MULTICOMPANY_COOKIE_NAME, None) + from_cookie = unquote(from_cookie) if from_cookie else None + if venue_settings.multicompany_is_company_allowed(from_cookie): + multicompany_write_cookie(from_cookie) # refresh the cookie + return from_cookie + except RuntimeError: + # frappe.request is not available in some contexts + multicompany_clear_cookie() # clear the cookie + return None + + multicompany_clear_cookie() # fallback: clear the cookie + return None + + +def multicompany_write_cookie(value): + frappe.flags[MULTICOMPANY_FLAG_NAME] = value + if hasattr(frappe.local, "cookie_manager"): + expires = datetime.datetime.now() + datetime.timedelta(days=14) + frappe.local.cookie_manager.set_cookie(MULTICOMPANY_COOKIE_NAME, value, expires=expires) + + +def multicompany_clear_cookie(): + frappe.flags[MULTICOMPANY_FLAG_NAME] = None + if hasattr(frappe.local, "cookie_manager"): + frappe.local.cookie_manager.delete_cookie(MULTICOMPANY_COOKIE_NAME) + + +## Custom fields for multi company +def get_custom_fields_for_multicompany(): + def _get_fields(insert_after: str, depends_on: str = None): + # hint for translation + # frappe._('Show only for selected companies') + # frappe._('Multi-venue mode') + + return [{ + 'insert_after': insert_after, + 'fieldname': '_section_break_multicompany', + 'fieldtype': 'Section Break', + 'label': 'Multi-venue mode', + 'collapsible': 0, + 'depends_on': depends_on, + }, { + 'insert_after': '_section_break_multicompany', + 'fieldname': 'only_companies', + 'fieldtype': 'Table MultiSelect', + 'label': 'Show only for the following companies', + 'options': 'Venue Selected Company', + }] + return { + 'Item Group': _get_fields(insert_after='website_specifications', depends_on='show_in_website'), + 'Website Item': _get_fields(insert_after='brand', depends_on='published'), + } + +def multicompany_create_custom_fields(venue_settings = None): + if not venue_settings: + venue_settings = frappe.get_single("Venue Settings") + if not getattr(venue_settings, "enable_multi_companies", False): + return + + from frappe.custom.doctype.custom_field.custom_field import create_custom_fields + custom_fields = get_custom_fields_for_multicompany() + create_custom_fields(custom_fields) + +def multicompany_delete_custom_fields(venue_settings): + custom_fields = get_custom_fields_for_multicompany() + for doctype, fields in custom_fields.items(): + for df in fields: + docname = frappe.db.get_value('Custom Field', { + 'dt': doctype, + 'fieldname': df['fieldname'] + }) + + if docname: + frappe.delete_doc('Custom Field', docname) + + + +@frappe.whitelist() +def create_role_profile_fields(): + from frappe.custom.doctype.custom_field.custom_field import create_custom_fields + + # For translations + __ = _("All users associated with this customer will be attributed this role profile") + __ = _("Role Profile") + + custom_fields = {} + for dt, insert_after in { + "Customer": "customer_primary_contact", + "Subscription": "contact_person", + }.items(): + df = dict( + doctype=dt, + fieldname="role_profile_name", + label="Role Profile", + fieldtype="Link", + insert_after=insert_after, + options="Role Profile", + description="All users associated with this customer will be attributed this role profile", + ) + custom_fields[dt] = [df] + + create_custom_fields(custom_fields) + + +@frappe.whitelist() +def get_duration_for_uom(uom, minute_uom): + from bookings.bookings.doctype.item_booking.item_booking import get_uom_in_minutes + + return get_uom_in_minutes(uom, minute_uom) * 60 + + +@frappe.whitelist() +def simultaneous_booking_enabled(): + return bool(frappe.db.get_single_value("Venue Settings", "enable_simultaneous_booking")) + + +@frappe.whitelist() +def get_booking_uoms(*args): + valid_uoms: list[str] = [] + + venue_settings = frappe.get_single("Venue Settings") + for fieldname in ["minute_uom", "day_uom", "week_uom", "month_uom"]: + if uom := venue_settings.get(fieldname): + valid_uoms.append(uom) + + all_uoms = valid_uoms + frappe.get_all( + "UOM Conversion Factor", filters={"to_uom": ("in", valid_uoms)}, pluck="from_uom" + ) + + out = [] + seen = set() + for uom in all_uoms: + if uom not in seen: + out.append([uom]) + seen.add(uom) + + out.sort() + return out -- GitLab From d2312e692455f0c85f2d9b8c90c0a9203f1bff38 Mon Sep 17 00:00:00 2001 From: Charles-Henri Decultot Date: Tue, 25 Mar 2025 11:07:19 +0100 Subject: [PATCH 2/3] fix: correct property for event --- bookings/bookings/doctype/item_booking/item_booking.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bookings/bookings/doctype/item_booking/item_booking.py b/bookings/bookings/doctype/item_booking/item_booking.py index 9d0124fb..fae93f6b 100644 --- a/bookings/bookings/doctype/item_booking/item_booking.py +++ b/bookings/bookings/doctype/item_booking/item_booking.py @@ -1255,6 +1255,7 @@ def _get_events( subscriptions_as_events = _get_subscriptions_as_events( start, end, item=item, user=user, fields=None, filters=None ) + print(subscriptions_as_events) events += subscriptions_as_events result = [] @@ -1273,6 +1274,7 @@ def _get_subscriptions_as_events(start, end, item=None, user=None, fields=None, start, end, item=item, user=user, fields=fields, filters=filters ) display_subscriptions = bool(frappe.db.get_single_value("Venue Settings", "display_subscriptions_in_calendar")) + events = [] for sub in subscriptions: qty = sub["qty"] @@ -1307,7 +1309,7 @@ def _get_subscriptions_as_events(start, end, item=None, user=None, fields=None, "all_day": 1, "startEditable": False, "durationEditable": False, - "eventDisplay": display_subscriptions + "display": 'auto' if display_subscriptions else 'none' } ) ) -- GitLab From 085bef9928382d41edaf5e9413f2943b2bbc3536 Mon Sep 17 00:00:00 2001 From: Charles-Henri Decultot Date: Tue, 25 Mar 2025 11:16:38 +0100 Subject: [PATCH 3/3] fix: remove print statement --- bookings/bookings/doctype/item_booking/item_booking.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bookings/bookings/doctype/item_booking/item_booking.py b/bookings/bookings/doctype/item_booking/item_booking.py index fae93f6b..b0eb9a35 100644 --- a/bookings/bookings/doctype/item_booking/item_booking.py +++ b/bookings/bookings/doctype/item_booking/item_booking.py @@ -1255,7 +1255,6 @@ def _get_events( subscriptions_as_events = _get_subscriptions_as_events( start, end, item=item, user=user, fields=None, filters=None ) - print(subscriptions_as_events) events += subscriptions_as_events result = [] -- GitLab