
    i{0                     l   d dl Z d dlmZ d dlmZmZmZmZmZ d dl	Z	ddl
mZmZ ddlmZ ddlmZ dd	e	j"                  d
ee	j$                     de	j"                  fdZ	 	 dde	j"                  de	j"                  ded
ee	j$                     de	j"                  f
dZe G d de             Z G d dee      Zy)    N)	dataclass)ListLiteralOptionalTupleUnion   )ConfigMixinregister_to_config)
BaseOutput   )SchedulerMixint	generatorreturnc                 J   ||j                   n| j                   }t        j                  | |      j                  dd|      j	                  | j                         }t        j
                  t        j
                  |j                  d             j                  d             S )a  
    Generate Gumbel noise for sampling.

    Args:
        t (`torch.Tensor`):
            Input tensor to match the shape and dtype of the output noise.
        generator (`torch.Generator`, *optional*):
            A random number generator for reproducible sampling.

    Returns:
        `torch.Tensor`:
            Gumbel-distributed noise with the same shape, dtype, and device as the input tensor.
    devicer   r   r   #B;)r   torch
zeros_likeuniform_tologclamp)r   r   r   noises       p/home/obispo/Crisostomo_bridge/mision_env/lib/python3.12/site-packages/diffusers/schedulers/scheduling_amused.pygumbel_noiser      s     "+!6YAHHFQv.771	7RUUVWV^V^_EII		%++e"455<<UCDDD    mask_lenprobstemperaturec                    t        j                  |j                  d            |t        ||      z  z   }t        j                  |d      j
                  }t        j                  |d| j                               }||k  }|S )a  
    Mask tokens by selecting the top-k lowest confidence scores with temperature-based randomness.

    Args:
        mask_len (`torch.Tensor`):
            Number of tokens to mask per sample in the batch.
        probs (`torch.Tensor`):
            Probability scores for each token.
        temperature (`float`, *optional*, defaults to 1.0):
            Temperature parameter for controlling randomness in the masking process.
        generator (`torch.Generator`, *optional*):
            A random number generator for reproducible sampling.

    Returns:
        `torch.Tensor`:
            Boolean mask indicating which tokens should be masked.
    r   r   dimr   )r   r   r   r   sortvaluesgatherlong)r!   r"   r#   r   
confidencesorted_confidencecut_offmaskings           r   mask_by_random_topkr0      sl    . 5;;u-.|E]f?g1ggJ

:26==ll,aAG7"GNr    c                   X    e Zd ZU dZej
                  ed<   dZeej
                     ed<   y)AmusedSchedulerOutputa  
    Output class for the scheduler's `step` function output.

    Args:
        prev_sample (`torch.LongTensor` of shape `(batch_size, height, width)` or `(batch_size, sequence_length)`):
            Computed sample `(x_{t-1})` of previous timestep with token IDs. `prev_sample` should be used as next model
            input in the denoising loop.
        pred_original_sample (`torch.LongTensor` of shape `(batch_size, height, width)` or `(batch_size, sequence_length)`, *optional*):
            The predicted fully denoised sample `(x_{0})` with token IDs based on the model output from the current
            timestep. `pred_original_sample` can be used to preview progress or for guidance.
    prev_sampleNpred_original_sample)	__name__
__module____qualname____doc__r   Tensor__annotations__r4   r    r    r   r2   r2   =   s'    
 37(5<<07r    r2   c                      e Zd ZU dZdZeej                     ed<   eej                     ed<   e		 dde
ded   fd       Z	 	 dd
e
deeeeef   ee   f   deeeej$                  f      dd	fdZ	 	 	 ddej                  de
dej(                  dedeej*                     dedeeeej                  ej                  f   f   fdZ	 ddej(                  de
deej*                     dej(                  fdZy	)AmusedSchedulera  
    A scheduler for masked token generation as used in [`AmusedPipeline`].

    This scheduler iteratively unmasks tokens based on their confidence scores, following either a cosine or linear
    schedule. Unlike traditional diffusion schedulers that work with continuous pixel values, this scheduler operates
    on discrete token IDs, making it suitable for autoregressive and non-autoregressive masked token generation models.

    This scheduler inherits from [`SchedulerMixin`] and [`ConfigMixin`]. Check the superclass documentation for the
    generic methods the library implements for all schedulers such as loading and saving.

    Args:
        mask_token_id (`int`):
            The token ID used to represent masked tokens in the sequence.
        masking_schedule (`Literal["cosine", "linear"]`, *optional*, defaults to `"cosine"`):
            The schedule type for determining the mask ratio at each timestep. Can be either `"cosine"` or `"linear"`.
    r   temperatures	timestepsmask_token_idmasking_schedule)cosinelinearc                      d | _         d | _        y N)r>   r?   )selfr@   rA   s      r   __init__zAmusedScheduler.__init__f   s     !r    Nnum_inference_stepsr#   r   r   c                    t        j                  ||      j                  d      | _        t	        |t
        t        f      r%t        j                  |d   |d   ||      | _        yt        j                  |d||      | _        y)a  
        Set the discrete timesteps used for the diffusion chain (to be run before inference).

        Args:
            num_inference_steps (`int`):
                The number of diffusion steps used when generating samples with a pre-trained model.
            temperature (`Union[float, Tuple[float, float], List[float]]`, *optional*, defaults to `(2, 0)`):
                Temperature parameter(s) for controlling the randomness of sampling. If a tuple or list is provided,
                temperatures will be linearly interpolated between the first and second values across all timesteps. If
                a single value is provided, temperatures will be linearly interpolated from that value to 0.01.
            device (`str` or `torch.device`, *optional*):
                The device to which the timesteps and temperatures should be moved to. If `None`, the timesteps are not
                moved.
        r   r   r   g{Gz?N)	r   arangeflipr?   
isinstancetuplelistlinspacer>   )rF   rH   r#   r   s       r   set_timestepszAmusedScheduler.set_timestepso   sj    ( &9&INNqQkE4=1 %{1~{1~Obkq rD %{DBU^d eDr    model_outputtimestepsamplestarting_mask_ratior   return_dictc                    |j                   dk(  xr |j                   dk(  }|rM|j                  \  }}	}
}|j                  ||
|z        }|j                  ||	|
|z        j                  ddd      }|| j                  j
                  k(  }|j                  d      }|j                  }||j                  |j                        n|}|j                  j                  d	k(  r-|j                  t        j                  k7  r|j                         }|j                  d|j                  d            }t        j                  |d|
      j                  |      } |dddf   j                   |j                  dd  }t        j"                  |||      }|dk(  r|}n|j                  d   }| j$                  |k(  j'                         }|dz   t)        | j$                        z  }| j                  j*                  dk(  r*t        j,                  |t.        j0                  z  dz        }nA| j                  j*                  dk(  rd|z
  }n"t3        d| j                  j*                         ||z  }||z  j5                         }t        j6                  |j9                  dd      dz
  |      }t        j:                  t        j<                  dg|j                        |      }t        j>                  |d|dddddf         dddddf   }t        j"                  ||t        j@                  |j                        j:                        }tC        ||| jD                  |   |      }t        j"                  || j                  j
                  |      }|r&|j                  
      }|j                  ||
|      }|s||fS tG        ||      S )ad  
        Predict the sample at the previous timestep by masking tokens based on confidence scores.

        Args:
            model_output (`torch.Tensor`):
                The direct output from the learned diffusion model. Typically of shape `(batch_size, num_tokens,
                codebook_size)` or `(batch_size, codebook_size, height, width)` for 2D inputs.
            timestep (`int`):
                The current discrete timestep in the diffusion chain.
            sample (`torch.LongTensor`):
                A current instance of a sample created by the diffusion process. Contains token IDs, with masked
                positions indicated by `mask_token_id`.
            starting_mask_ratio (`float`, *optional*, defaults to 1.0):
                A multiplier applied to the mask ratio schedule. Values less than 1.0 will result in fewer tokens being
                masked at each step.
            generator (`torch.Generator`, *optional*):
                A random number generator for reproducible sampling.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether to return an [`~schedulers.scheduling_amused.AmusedSchedulerOutput`] or a plain tuple.

        Returns:
            [`~schedulers.scheduling_amused.AmusedSchedulerOutput`] or `tuple`:
                If `return_dict` is `True`, [`~schedulers.scheduling_amused.AmusedSchedulerOutput`] is returned,
                otherwise a tuple is returned where the first element is the sample tensor (`prev_sample`) and the
                second element is the predicted original sample tensor (`pred_original_sample`).
              r   r	   r   r%   r&   Ncpur   r   rB   rC   unknown masking schedule T)r'   keepdim)$ndimshapereshapepermuteconfigr@   softmaxr   r   typedtyper   float32floatsizemultinomialviewwherer?   nonzerolenrA   cosmathpi
ValueErrorfloorminsummaxtensorr*   finfor0   r>   r2   )rF   rQ   rR   rS   rT   r   rU   two_dim_input
batch_sizecodebook_sizeheightwidthunknown_mapr"   r   probs_r4   r3   seq_lenstep_idxratio
mask_ratior!   selected_probsr/   s                            r   stepzAmusedScheduler.step   sT   F q(C\->->!-C7C7I7I4Jvu^^J?F'//
M6TY>ZbbcdfgijkL 9 99$$$,/8/D)**+%==&6<<5==+H\\^FEJJrN3$00iPSS[aSb>3AqD9>>CR@PQ${{;8LfUq=.Kll1oG(2;;=H\S%88E{{++x7"YYutww':;
--9Y
 #<T[[=Y=Y<Z![\\,z9J*,335HyyR!F!JHUHyyqc,:M:M!NPXYH"\\%5I!QPT*5UVWXZ[]^W^_N"[[nekkR`RfRfFgFkFklN)(NDDUDUV^D_ajkG  ++gt{{/H/HJ^_K%--j&%HK#7#?#?
FTY#Z !566$[2FGGr    c                 |   | j                   |k(  j                         }|dz   t        | j                         z  }| j                  j                  dk(  r*t        j                  |t        j                  z  dz        }nA| j                  j                  dk(  rd|z
  }n"t        d| j                  j                         t        j                  |j                  ||j                  n|j                  |      j                  |j                        |k  }|j                         }| j                  j                  ||<   |S )a  
        Add noise to a sample by randomly masking tokens according to the masking schedule.

        Args:
            sample (`torch.LongTensor`):
                The input sample containing token IDs to be partially masked.
            timesteps (`int`):
                The timestep that determines how much masking to apply. Higher timesteps result in more masking.
            generator (`torch.Generator`, *optional*):
                A random number generator for reproducible masking.

        Returns:
            `torch.LongTensor`:
                The sample with some tokens replaced by `mask_token_id` according to the masking schedule.
        r   rB   r	   rC   rZ   )r   r   )r?   rj   rk   r`   rA   r   rl   rm   rn   ro   randr]   r   r   cloner@   )	rF   rS   r?   r   r~   r   r   mask_indicesmasked_samples	            r   	add_noisezAmusedScheduler.add_noise   s   * NNi/88:AT^^!44;;''835477?Q#67J[[))X5UJ89U9U8VWXX JJ9NY%5%5TZTaTamvb 	 &*kk&?&?l#r    )rB   ))r	   r   N)      ?NTrE   )r5   r6   r7   r8   orderr   r   r9   r:   r   intr   rG   r   re   r   r   strr   rP   
LongTensor	Generatorboolr2   r   r   r;   r    r   r=   r=   O   s   " E5<<((%% 9A ""45  GM59	f f 5%u"5tE{BCf sELL012	f
 
f@ &)/3 ]Hll]H ]H   	]H
 #]H EOO,]H ]H 
$eELL%,,,F&GG	H]HF 04	*  * * EOO,	*
 
		*r    r=   rE   )r   N)rm   dataclassesr   typingr   r   r   r   r   r   configuration_utilsr
   r   utilsr   scheduling_utilsr   r9   r   r   re   r0   r2   r=   r;   r    r   <module>r      s     ! 8 8  A  ,EELL EXeoo-F ERWR^R^ E, +/	ll<<  (	
 \\< 8J 8 8"Dnk Dr    