|
@@ -0,0 +1,103 @@
|
|
|
+from typing import Any, AnyStr, BinaryIO, Dict, List, Mapping, Optional, Tuple
|
|
|
+
|
|
|
+from twisted.cred.credentials import IUsernameDigestHash as IUsernameDigestHash
|
|
|
+from twisted.internet.defer import Deferred
|
|
|
+from twisted.internet.interfaces import (
|
|
|
+ IAddress,
|
|
|
+ IConsumer,
|
|
|
+ IOpenSSLClientConnectionCreator,
|
|
|
+ IProtocol,
|
|
|
+ IPushProducer,
|
|
|
+ IStreamClientEndpoint,
|
|
|
+)
|
|
|
+from twisted.python.urlpath import URLPath
|
|
|
+from twisted.web.client import URI
|
|
|
+from twisted.web.http_headers import Headers
|
|
|
+from typing_extensions import Literal
|
|
|
+from zope.interface import Interface
|
|
|
+
|
|
|
+class IClientRequest(Interface):
|
|
|
+ method: bytes
|
|
|
+ absoluteURI: Optional[bytes]
|
|
|
+ headers: Headers
|
|
|
+
|
|
|
+class IRequest(Interface):
|
|
|
+ method: bytes
|
|
|
+ uri: bytes
|
|
|
+ path: bytes
|
|
|
+ args: Mapping[bytes, List[bytes]]
|
|
|
+ prepath: List[bytes]
|
|
|
+ postpath: List[bytes]
|
|
|
+ requestHeaders: Headers
|
|
|
+ content: BinaryIO
|
|
|
+ responseHeaders: Headers
|
|
|
+ def getHeader(key: AnyStr) -> Optional[AnyStr]: ...
|
|
|
+ def getCookie(key: bytes) -> Optional[bytes]: ...
|
|
|
+ def getAllHeaders() -> Dict[bytes, bytes]: ...
|
|
|
+ def getRequestHostname() -> bytes: ...
|
|
|
+ def getHost() -> IAddress: ...
|
|
|
+ def getClientAddress() -> IAddress: ...
|
|
|
+ def getClientIP() -> Optional[str]: ...
|
|
|
+ def getUser() -> str: ...
|
|
|
+ def getPassword() -> str: ...
|
|
|
+ def isSecure() -> bool: ...
|
|
|
+ def getSession(sessionInterface: Any | None = ...) -> Any: ...
|
|
|
+ def URLPath() -> URLPath: ...
|
|
|
+ def prePathURL() -> bytes: ...
|
|
|
+ def rememberRootURL() -> None: ...
|
|
|
+ def getRootURL() -> bytes: ...
|
|
|
+ def finish() -> None: ...
|
|
|
+ def write(data: bytes) -> None: ...
|
|
|
+ def addCookie(
|
|
|
+ k: AnyStr,
|
|
|
+ v: AnyStr,
|
|
|
+ expires: Optional[AnyStr] = ...,
|
|
|
+ domain: Optional[AnyStr] = ...,
|
|
|
+ path: Optional[AnyStr] = ...,
|
|
|
+ max_age: Optional[AnyStr] = ...,
|
|
|
+ comment: Optional[AnyStr] = ...,
|
|
|
+ secure: Optional[bool] = ...,
|
|
|
+ ) -> None: ...
|
|
|
+ def setResponseCode(code: int, message: Optional[bytes] = ...) -> None: ...
|
|
|
+ def setHeader(k: AnyStr, v: AnyStr) -> None: ...
|
|
|
+ def redirect(url: AnyStr) -> None: ...
|
|
|
+ # returns http.CACHED or False. http.CACHED is a string constant, but we
|
|
|
+ # treat it as an opaque object, similar to UNKNOWN_LENGTH.
|
|
|
+ def setLastModified(when: float) -> object | Literal[False]: ...
|
|
|
+ def setETag(etag: str) -> object | Literal[False]: ...
|
|
|
+ def setHost(host: bytes, port: int, ssl: bool = ...) -> None: ...
|
|
|
+
|
|
|
+class IBodyProducer(IPushProducer):
|
|
|
+ # Length is either `int` or the opaque object UNKNOWN_LENGTH.
|
|
|
+ length: int | object
|
|
|
+ def startProducing(consumer: IConsumer) -> Deferred[None]: ...
|
|
|
+ def stopProducing() -> None: ...
|
|
|
+
|
|
|
+class IResponse(Interface):
|
|
|
+ version: Tuple[str, int, int]
|
|
|
+ code: int
|
|
|
+ phrase: str
|
|
|
+ headers: Headers
|
|
|
+ length: int | object
|
|
|
+ request: IClientRequest
|
|
|
+ previousResponse: Optional[IResponse]
|
|
|
+ def deliverBody(protocol: IProtocol) -> None: ...
|
|
|
+ def setPreviousResponse(response: IResponse) -> None: ...
|
|
|
+
|
|
|
+class IAgent(Interface):
|
|
|
+ def request(
|
|
|
+ method: bytes,
|
|
|
+ uri: bytes,
|
|
|
+ headers: Optional[Headers] = ...,
|
|
|
+ bodyProducer: Optional[IBodyProducer] = ...,
|
|
|
+ ) -> Deferred[IResponse]: ...
|
|
|
+
|
|
|
+class IPolicyForHTTPS(Interface):
|
|
|
+ def creatorForNetloc(
|
|
|
+ hostname: bytes, port: int
|
|
|
+ ) -> IOpenSSLClientConnectionCreator: ...
|
|
|
+
|
|
|
+class IAgentEndpointFactory(Interface):
|
|
|
+ def endpointForURI(uri: URI) -> IStreamClientEndpoint: ...
|
|
|
+
|
|
|
+UNKNOWN_LENGTH: object
|